#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    future::Future,
    net::SocketAddr,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::{TryFutureExt, future::BoxFuture};
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, CHAIN_IDENTIFIER, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_tables::{AuthorityPerpetualTables, AuthorityPerpetualTablesOptions},
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    rest_index::RestIndexStore,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{RestReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::ProtocolConfig;
use iota_rest_api::RestMetrics;
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        check_total_jwk_size,
    },
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

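/// Services and handles that exist only while the node is a validator; they
/// are torn down and rebuilt on epoch reconfiguration.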
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

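/// Simulator-only state and test hooks, including a thread-local JWK injector
/// that replaces real OIDC fetches in `msim` builds.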
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

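/// A running IOTA node, wrapping the authority state together with
/// networking, storage, metrics, and background task handles.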
pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    _http_server: Option<tokio::task::JoinHandle<()>>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

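/// Maximum number of JWKs accepted from a single provider fetch; any excess
/// keys are dropped.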
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
    }

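    /// Spawns one background task per configured OIDC provider. Each task
    /// periodically fetches the provider's JWKs, validates and deduplicates
    /// them, and submits previously unseen keys to consensus; the tasks live
    /// for the duration of the current epoch.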
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

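    /// Main construction path for a node: opens the stores, builds the
    /// authority state, P2P network, and HTTP/RPC servers, starts validator
    /// components when a consensus config is present, and spawns the
    /// reconfiguration monitor before returning the node handle.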
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
        software_version: &'static str,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(
            node = ?config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        let _ = CHAIN_IDENTIFIER.set(chain_identifier);
        info!("IOTA chain identifier: {chain_identifier}");

        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                software_version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions { enable_write_stall };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits =
            build_execution_cache(&epoch_start_configuration, &prometheus_registry, &store);

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            ChainIdentifier::from(*genesis.checkpoint().digest()),
        );

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        info!("creating checkpoint store");

        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
        {
            Some(Arc::new(RestIndexStore::new(
                config.db_path().join("rest_index"),
                &store,
                &checkpoint_store,
                &epoch_store,
                &cache_traits.backing_package_store,
            )))
        } else {
            None
        };

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rest_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            config.indirect_objects_threshold,
            archive_readers,
            validator_tx_finalizer,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        checkpoint_store
            .reexecute_local_checkpoints(&state, &epoch_store)
            .await;

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            custom_rpc_runtime,
            software_version,
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        let validator_components = if state.is_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&accumulator),
                    connection_monitor_status.clone(),
                    &registry_service,
                    iota_node_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start();

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

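    /// Starts the archive writer when a remote object store is configured and
    /// returns its kill switch, or `None` when archival is disabled.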
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

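    /// Starts the state snapshot uploader when a remote object store is
    /// configured and returns its kill switch, or `None` when disabled.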
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

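    /// Resolves the DB checkpoint config (checkpointing at epoch end is forced
    /// on when state snapshots are enabled) and starts the checkpoint handler
    /// unless neither an object store nor state snapshots are configured.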
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.indirect_objects_threshold,
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

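    /// Builds the anemo P2P network together with the discovery, state-sync,
    /// and randomness services, applying generous QUIC defaults for any
    /// setting the node config leaves unset.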
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

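    /// Constructs everything a validator needs on top of a full node:
    /// consensus adapter and manager, the gRPC validator service, an optional
    /// overload monitor, and a per-validator metrics registry, then hands off
    /// to `start_epoch_specific_validator_components`.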
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let (validator_server_handle, validator_server_cancel_handle) =
            Self::start_grpc_validator_service(
                &config,
                state.clone(),
                consensus_adapter.clone(),
                &validator_registry,
            )
            .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            validator_server_handle,
            validator_server_cancel_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

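    /// Starts the validator components that must be rebuilt each epoch: the
    /// checkpoint service, the randomness manager (when available), consensus
    /// itself, and the JWK updater when authenticator state is enabled.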
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        validator_server_handle: SpawnOnce,
        validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store,
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
        );

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_server_cancel_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }

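    /// Assembles the checkpoint service, wiring checkpoint output to consensus
    /// submission and certified checkpoints to state sync, with size limits
    /// taken from the current protocol config.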
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        accumulator: Weak<StateAccumulator>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {}
            and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.authority_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            accumulator,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        consensus_client: Arc<dyn ConsensusClient>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
        )
    }

    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<(SpawnOnce, tokio::sync::oneshot::Sender<()>)> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            TrafficControllerMetrics::new(prometheus_registry),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let mut server_conf = iota_network_stack::config::Config::new();
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = iota_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            IOTA_TLS_SERVER_NAME.to_string(),
            iota_tls::AllowAll,
        );
        let mut server = server_builder
            .bind(config.network_address(), Some(tls_config))
            .await
            .map_err(|err| anyhow!(err.to_string()))?;
        let cancel_handle = server
            .take_cancel_handle()
            .expect("GRPC server should still have a cancel handle");
        let local_addr = server.local_addr();
        info!("Listening to traffic on {local_addr}");

        Ok((
            SpawnOnce::new(server.serve().map_err(Into::into)),
            cancel_handle,
        ))
    }

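    /// Re-enqueues certificates that were submitted to consensus but not yet
    /// executed before a restart (only those without shared objects, which do
    /// not need to be re-sequenced by consensus), then waits with a timeout
    /// for their effects to be executed.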
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

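    /// Clones the authority aggregator held by the transaction orchestrator,
    /// if this node runs one.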
    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

    pub fn subscribe_to_transaction_orchestrator_effects(
        &self,
    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.subscribe_to_effects_queue())
            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
    }

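    /// Long-running task that drives the node across epoch boundaries:
    /// executes checkpoints for the current epoch, then, at epoch end, reads
    /// the new system state, rebuilds the epoch store, state accumulator, and
    /// (for validators) the epoch-specific components, and repeats until a
    /// run-with-range condition requests shutdown.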
    pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            let mut checkpoint_executor = CheckpointExecutor::new(
                self.state_sync_handle.subscribe_to_synced_checkpoints(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            if let Some(components) = &*self.validator_components.lock().await {
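                // Give the just-(re)started consensus layer a brief moment to
                // come up before submitting the capability notification below.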
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            }

            let stop_condition = checkpoint_executor
                .run_epoch(cur_epoch_store.clone(), run_with_range)
                .await;
            drop(checkpoint_executor);

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_server_cancel_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = self.validator_components.lock().await.take()
            {
                info!("Reconfiguring the validator.");
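                // Stop the old checkpoint service tasks. Aborting (rather than
                // joining gracefully) is deliberate: the checkpoint builder
                // may otherwise wait on transactions while peers have already
                // shut down consensus.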
1835 checkpoint_service_tasks.abort_all();
1840 while let Some(result) = checkpoint_service_tasks.join_next().await {
1841 if let Err(err) = result {
1842 if err.is_panic() {
1843 std::panic::resume_unwind(err.into_panic());
1844 }
1845 warn!("Error in checkpoint service task: {:?}", err);
1846 }
1847 }
1848 info!("Checkpoint service has shut down.");
1849
1850 consensus_manager.shutdown().await;
1851 info!("Consensus has shut down.");
1852
1853 let new_epoch_store = self
1854 .reconfigure_state(
1855 &self.state,
1856 &cur_epoch_store,
1857 next_epoch_committee.clone(),
1858 new_epoch_start_state,
1859 accumulator.clone(),
1860 )
1861 .await?;
1862 info!("Epoch store finished reconfiguration.");
1863
1864 let accumulator_metrics = Arc::into_inner(accumulator)
1867 .expect("Accumulator should have no other references at this point")
1868 .metrics();
1869 let new_accumulator = Arc::new(StateAccumulator::new(
1870 self.state.get_accumulator_store().clone(),
1871 accumulator_metrics,
1872 ));
1873 let weak_accumulator = Arc::downgrade(&new_accumulator);
1874 *accumulator_guard = Some(new_accumulator);
1875
1876 consensus_store_pruner.prune(next_epoch).await;
1877
1878 if self.state.is_validator(&new_epoch_store) {
1879 Some(
1881 Self::start_epoch_specific_validator_components(
1882 &self.config,
1883 self.state.clone(),
1884 consensus_adapter,
1885 self.checkpoint_store.clone(),
1886 new_epoch_store.clone(),
1887 self.state_sync_handle.clone(),
1888 self.randomness_handle.clone(),
1889 consensus_manager,
1890 consensus_store_pruner,
1891 weak_accumulator,
1892 validator_server_handle,
1893 validator_server_cancel_handle,
1894 validator_overload_monitor_handle,
1895 checkpoint_metrics,
1896 self.metrics.clone(),
1897 iota_tx_validator_metrics,
1898 validator_registry_id,
1899 )
1900 .await?,
1901 )
1902 } else {
1903 info!("This node is no longer a validator after reconfiguration");
1904 if self.registry_service.remove(validator_registry_id) {
1905 debug!("Removed validator metrics registry");
1906 } else {
1907 warn!("Failed to remove validator metrics registry");
1908 }
1909 if validator_server_cancel_handle.send(()).is_ok() {
1910 debug!("Validator grpc server cancelled");
1911 } else {
1912 warn!("Failed to cancel validator grpc server");
1913 }
1914
1915 None
1916 }
            } else {
                let new_epoch_store = self
                    .reconfigure_state(
                        &self.state,
                        &cur_epoch_store,
                        next_epoch_committee.clone(),
                        new_epoch_start_state,
                        accumulator.clone(),
                    )
                    .await?;

                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start();

                    Some(components)
                } else {
                    None
                }
            };
            *self.validator_components.lock().await = new_validator_components;

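            // Release the old epoch store's DB handles so its per-epoch database
            // can be dropped once the remaining references go away.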
            cur_epoch_store.release_db_handles();

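            // Under the simulator, exercise the checkpoint pruner whenever the
            // config retains only a bounded number of epochs for checkpoints.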
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            info!("Reconfiguration finished");
        }
    }

    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

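    /// Reconfigures the node's state for the next epoch: loads the last
    /// checkpoint of the current epoch, extracts the epoch supply change from
    /// its end-of-epoch data, assembles the next `EpochStartConfiguration`,
    /// and hands everything to `AuthorityState::reconfigure`, returning the
    /// new epoch store.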
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

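    /// Executes a transaction directly against the epoch-0 store, bypassing
    /// consensus by wrapping it in an unchecked checkpoint certificate proof.
    /// This is only sound at genesis, before real certificates exist.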
    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .instrument(span)
            .await
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
}

#[cfg(not(msim))]
impl IotaNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client)
            .await
            .map_err(|_| IotaError::JWKRetrieval)
    }
}

#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

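/// A future that is spawned onto the runtime at most once; `start` is
/// idempotent, so a component (e.g. the validator gRPC server above) can be
/// built eagerly and only launched when the node actually becomes a validator.
///
/// A minimal usage sketch (illustrative only, not from the original source):
/// ```ignore
/// let server = SpawnOnce::new(async { Ok(()) });
/// // Later, once the component should run:
/// let server = server.start(); // spawns the future; starting twice is a no-op
/// ```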
enum SpawnOnce {
    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
    #[allow(unused)]
    Started(JoinHandle<Result<()>>),
}

impl SpawnOnce {
    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    pub fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

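/// Publishes the validator set of the new epoch as trusted P2P peers on the
/// watch channel. The previously announced committee is rotated into
/// `old_committee`, so peers from the outgoing epoch remain trusted during the
/// transition.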
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

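/// Builds the transaction key-value store used by the JSON-RPC APIs: always a
/// local RocksDB-backed store, optionally fronted with a fallback to an HTTP
/// key-value store when `transaction_kv_store_read_config.base_url` is set.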
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

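/// Builds and spawns the node's HTTP server. Returns `Ok(None)` on validators,
/// which do not expose JSON-RPC or REST. On fullnodes this assembles the
/// JSON-RPC modules (read, coin, governance, indexer, move utils, and, when a
/// transaction orchestrator is present, transaction execution), optionally
/// merges the REST API router, and serves everything on
/// `config.json_rpc_address`.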
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<tokio::task::JoinHandle<()>>> {
    // Validators (nodes with a consensus config) do not expose these APIs.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // When `run_with_range` is set the node halts at a target epoch or
        // checkpoint, so do not accept newly built transactions.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the default IOTA-Names settings when none are configured.
        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state.clone(), store)),
            software_version,
        );

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let listener = tokio::net::TcpListener::bind(&config.json_rpc_address)
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    router = router.layer(axum::middleware::from_fn(server_timing_middleware));

    let handle = tokio::spawn(async move {
        axum::serve(
            listener,
            router.into_make_service_with_connect_info::<SocketAddr>(),
        )
        .await
        .unwrap()
    });

    info!(local_addr = ?addr, "IOTA JSON-RPC server listening on {addr}");

    Ok(Some(handle))
}

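/// Query parameters for the `/health` endpoint. When `threshold_seconds` is
/// supplied (e.g. `GET /health?threshold_seconds=30`), the node reports itself
/// unhealthy if its highest executed checkpoint is older than that many
/// seconds.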
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}

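/// Handles `GET /health`, optionally enforcing a freshness threshold on the
/// highest executed checkpoint (see [`Threshold`]).
///
/// A hypothetical external probe (host and port depend on the node's
/// `json_rpc_address`):
/// ```ignore
/// let status = reqwest::get("http://127.0.0.1:9000/health?threshold_seconds=30")
///     .await?
///     .status();
/// assert!(status.is_success(), "node is down or lagging");
/// ```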
async fn health_check_handler(
    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
) -> impl axum::response::IntoResponse {
    if let Some(threshold_seconds) = threshold_seconds {
        // Fetch the summary of the highest executed checkpoint; without one we
        // cannot attest to freshness, so report the node as down.
        let summary = match state
            .get_checkpoint_store()
            .get_highest_executed_checkpoint()
        {
            Ok(Some(summary)) => summary,
            Ok(None) => {
                warn!("Highest executed checkpoint not found");
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
            Err(err) => {
                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
        };

        let latest_chain_time = summary.timestamp();
        let threshold =
            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);

        // The node is lagging if its latest executed checkpoint is older than
        // the caller-supplied threshold.
        if latest_chain_time < threshold {
            warn!(
                ?latest_chain_time,
                ?threshold,
                "failing health check due to checkpoint lag"
            );
            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
        }
    }
    (axum::http::StatusCode::OK, "up")
}

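// In tests the per-checkpoint transaction cap is fixed at 2 so that even small
// workloads are split across several checkpoints.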
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}