1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::{BTreeSet, HashMap, HashSet},
9 fmt,
10 future::Future,
11 path::PathBuf,
12 str::FromStr,
13 sync::{Arc, Weak},
14 time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19 callback::CallbackLayer,
20 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::{TryFutureExt, future::BoxFuture};
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30 ConsensusConfig, NodeConfig,
31 node::{DBCheckpointConfig, RunWithRange},
32 node_config_metrics::NodeConfigMetrics,
33 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36 authority::{
37 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38 authority_per_epoch_store::AuthorityPerEpochStore,
39 authority_store_pruner::ObjectsCompactionFilter,
40 authority_store_tables::{
41 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42 },
43 backpressure::BackpressureManager,
44 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45 },
46 authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
47 authority_client::NetworkAuthorityClient,
48 authority_server::{ValidatorService, ValidatorServiceMetrics},
49 checkpoints::{
50 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
51 SubmitCheckpointToConsensus,
52 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
53 },
54 connection_monitor::ConnectionMonitor,
55 consensus_adapter::{
56 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
57 ConsensusClient,
58 },
59 consensus_handler::ConsensusHandlerInitializer,
60 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
61 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
62 db_checkpoint_handler::DBCheckpointHandler,
63 epoch::{
64 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
65 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
66 reconfiguration::ReconfigurationInitiator,
67 },
68 execution_cache::build_execution_cache,
69 jsonrpc_index::IndexStore,
70 module_cache_metrics::ResolverMetrics,
71 overload_monitor::overload_monitor,
72 rest_index::RestIndexStore,
73 safe_client::SafeClientMetricsBase,
74 signature_verifier::SignatureVerifierMetrics,
75 state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
76 storage::{RestReadStore, RocksDbStore},
77 traffic_controller::metrics::TrafficControllerMetrics,
78 transaction_orchestrator::TransactionOrchestrator,
79 validator_tx_finalizer::ValidatorTxFinalizer,
80};
81use iota_json_rpc::{
82 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
83 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
84 transaction_builder_api::TransactionBuilderApi,
85 transaction_execution_api::TransactionExecutionApi,
86};
87use iota_json_rpc_api::JsonRpcMetrics;
88use iota_macros::{fail_point, fail_point_async, replay_log};
89use iota_metrics::{
90 RegistryID, RegistryService,
91 hardware_metrics::register_hardware_metrics,
92 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
93 server_timing_middleware, spawn_monitored_task,
94};
95use iota_network::{
96 api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
97};
98use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
99use iota_protocol_config::ProtocolConfig;
100use iota_rest_api::RestMetrics;
101use iota_snapshot::uploader::StateSnapshotUploader;
102use iota_storage::{
103 FileCompression, StorageFormat,
104 http_key_value_store::HttpKVStore,
105 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
106 key_value_store_metrics::KeyValueStoreMetrics,
107};
108use iota_types::{
109 base_types::{AuthorityName, ConciseableName, EpochId},
110 committee::Committee,
111 crypto::{KeypairTraits, RandomnessRound},
112 digests::ChainIdentifier,
113 error::{IotaError, IotaResult},
114 executable_transaction::VerifiedExecutableTransaction,
115 execution_config_utils::to_binary_config,
116 iota_system_state::{
117 IotaSystemState, IotaSystemStateTrait,
118 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
119 },
120 messages_consensus::{
121 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
122 check_total_jwk_size,
123 },
124 quorum_driver_types::QuorumDriverEffectsQueueResult,
125 supported_protocol_versions::SupportedProtocolVersions,
126 transaction::{Transaction, VerifiedCertificate},
127};
128use prometheus::Registry;
129#[cfg(msim)]
130pub use simulator::set_jwk_injector;
131#[cfg(msim)]
132use simulator::*;
133use tap::tap::TapFallible;
134use tokio::{
135 runtime::Handle,
136 sync::{Mutex, broadcast, mpsc, watch},
137 task::{JoinHandle, JoinSet},
138};
139use tower::ServiceBuilder;
140use tracing::{Instrument, debug, error, error_span, info, warn};
141use typed_store::{
142 DBMetrics,
143 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
144};
145
146use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
147
148pub mod admin;
149mod handle;
150pub mod metrics;
151
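/// Components and task handles that only exist while the node is acting as a
/// validator. They are torn down and rebuilt on reconfiguration.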
152pub struct ValidatorComponents {
153 validator_server_spawn_handle: SpawnOnce,
154 validator_server_handle: iota_http::ServerHandle,
155 validator_overload_monitor_handle: Option<JoinHandle<()>>,
156 consensus_manager: ConsensusManager,
157 consensus_store_pruner: ConsensusStorePruner,
158 consensus_adapter: Arc<ConsensusAdapter>,
159 checkpoint_service_tasks: JoinSet<()>,
161 checkpoint_metrics: Arc<CheckpointMetrics>,
162 iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
163 validator_registry_id: RegistryID,
164}
165
166#[cfg(msim)]
167mod simulator {
168 use std::sync::atomic::AtomicBool;
169
170 use super::*;
171
172 pub(super) struct SimState {
173 pub sim_node: iota_simulator::runtime::NodeHandle,
174 pub sim_safe_mode_expected: AtomicBool,
175 _leak_detector: iota_simulator::NodeLeakDetector,
176 }
177
178 impl Default for SimState {
179 fn default() -> Self {
180 Self {
181 sim_node: iota_simulator::runtime::NodeHandle::current(),
182 sim_safe_mode_expected: AtomicBool::new(false),
183 _leak_detector: iota_simulator::NodeLeakDetector::new(),
184 }
185 }
186 }
187
188 type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
189 + Send
190 + Sync
191 + 'static;
192
193 fn default_fetch_jwks(
194 _authority: AuthorityName,
195 _provider: &OIDCProvider,
196 ) -> IotaResult<Vec<(JwkId, JWK)>> {
197 use fastcrypto_zkp::bn254::zk_login::parse_jwks;
198 parse_jwks(
200 iota_types::zk_login_util::DEFAULT_JWK_BYTES,
201 &OIDCProvider::Twitch,
202 )
203 .map_err(|_| IotaError::JWKRetrieval)
204 }
205
206 thread_local! {
207 static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
208 }
209
210 pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
211 JWK_INJECTOR.with(|injector| injector.borrow().clone())
212 }
213
214 pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
215 JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
216 }
217}
218
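/// A running IOTA node: holds the authority state, networking handles
/// (discovery, state sync, randomness), optional validator components, and the
/// channels used to signal epoch changes and shutdown.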
219pub struct IotaNode {
220 config: NodeConfig,
221 validator_components: Mutex<Option<ValidatorComponents>>,
222 _http_server: Option<iota_http::ServerHandle>,
225 state: Arc<AuthorityState>,
226 transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
227 registry_service: RegistryService,
228 metrics: Arc<IotaNodeMetrics>,
229
230 _discovery: discovery::Handle,
231 state_sync_handle: state_sync::Handle,
232 randomness_handle: randomness::Handle,
233 checkpoint_store: Arc<CheckpointStore>,
234 accumulator: Mutex<Option<Arc<StateAccumulator>>>,
235 connection_monitor_status: Arc<ConnectionMonitorStatus>,
236
237 end_of_epoch_channel: broadcast::Sender<IotaSystemState>,
239
240 trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,
243
244 backpressure_manager: Arc<BackpressureManager>,
245
246 _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
247
248 #[cfg(msim)]
249 sim_state: SimState,
250
251 _state_archive_handle: Option<broadcast::Sender<()>>,
252
253 _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
254 shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
256
257 auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
263}
264
265impl fmt::Debug for IotaNode {
266 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
267 f.debug_struct("IotaNode")
268 .field("name", &self.state.name.concise())
269 .finish()
270 }
271}
272
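/// Maximum number of JWKs accepted from a single provider per fetch; any extra
/// keys are dropped.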
273static MAX_JWK_KEYS_PER_FETCH: usize = 100;
274
275impl IotaNode {
276 pub async fn start(
277 config: NodeConfig,
278 registry_service: RegistryService,
279 custom_rpc_runtime: Option<Handle>,
280 ) -> Result<Arc<IotaNode>> {
281 Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
282 }
283
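/// Spawns one background task per supported OIDC provider that periodically
/// fetches JWKs, validates and de-duplicates them, and submits newly observed
/// keys to consensus. The tasks run for the lifetime of the current epoch.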
284 fn start_jwk_updater(
289 config: &NodeConfig,
290 metrics: Arc<IotaNodeMetrics>,
291 authority: AuthorityName,
292 epoch_store: Arc<AuthorityPerEpochStore>,
293 consensus_adapter: Arc<ConsensusAdapter>,
294 ) {
295 let epoch = epoch_store.epoch();
296
297 let supported_providers = config
298 .zklogin_oauth_providers
299 .get(&epoch_store.get_chain_identifier().chain())
300 .unwrap_or(&BTreeSet::new())
301 .iter()
302 .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
303 .collect::<Vec<_>>();
304
305 let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
306
307 info!(
308 ?fetch_interval,
309 "Starting JWK updater tasks with supported providers: {:?}", supported_providers
310 );
311
312 fn validate_jwk(
313 metrics: &Arc<IotaNodeMetrics>,
314 provider: &OIDCProvider,
315 id: &JwkId,
316 jwk: &JWK,
317 ) -> bool {
318 let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
319 warn!(
320 "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
321 id.iss, provider
322 );
323 metrics
324 .invalid_jwks
325 .with_label_values(&[&provider.to_string()])
326 .inc();
327 return false;
328 };
329
330 if iss_provider != *provider {
331 warn!(
332 "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
333 id.iss, provider, iss_provider
334 );
335 metrics
336 .invalid_jwks
337 .with_label_values(&[&provider.to_string()])
338 .inc();
339 return false;
340 }
341
342 if !check_total_jwk_size(id, jwk) {
343 warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
344 metrics
345 .invalid_jwks
346 .with_label_values(&[&provider.to_string()])
347 .inc();
348 return false;
349 }
350
351 true
352 }
353
354 for p in supported_providers.into_iter() {
363 let provider_str = p.to_string();
364 let epoch_store = epoch_store.clone();
365 let consensus_adapter = consensus_adapter.clone();
366 let metrics = metrics.clone();
367 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
368 async move {
369 let mut seen = HashSet::new();
372 loop {
373 info!("fetching JWK for provider {:?}", p);
374 metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
375 match Self::fetch_jwks(authority, &p).await {
376 Err(e) => {
377 metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
378 warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
379 tokio::time::sleep(Duration::from_secs(30)).await;
381 continue;
382 }
383 Ok(mut keys) => {
384 metrics.total_jwks
385 .with_label_values(&[&provider_str])
386 .inc_by(keys.len() as u64);
387
388 keys.retain(|(id, jwk)| {
389 validate_jwk(&metrics, &p, id, jwk) &&
390 !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
391 seen.insert((id.clone(), jwk.clone()))
392 });
393
394 metrics.unique_jwks
395 .with_label_values(&[&provider_str])
396 .inc_by(keys.len() as u64);
397
398 if keys.len() > MAX_JWK_KEYS_PER_FETCH {
401 warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
402 keys.truncate(MAX_JWK_KEYS_PER_FETCH);
403 }
404
405 for (id, jwk) in keys.into_iter() {
406 info!("Submitting JWK to consensus: {:?}", id);
407
408 let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
409 consensus_adapter.submit(txn, None, &epoch_store)
410 .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
411 .ok();
412 }
413 }
414 }
415 tokio::time::sleep(fetch_interval).await;
416 }
417 }
418 .instrument(error_span!("jwk_updater_task", epoch)),
419 ));
420 }
421 }
422
423 pub async fn start_async(
424 config: NodeConfig,
425 registry_service: RegistryService,
426 custom_rpc_runtime: Option<Handle>,
427 software_version: &'static str,
428 ) -> Result<Arc<IotaNode>> {
429 NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
430 let mut config = config.clone();
431 if config.supported_protocol_versions.is_none() {
432 info!(
433 "populating config.supported_protocol_versions with default {:?}",
434 SupportedProtocolVersions::SYSTEM_DEFAULT
435 );
436 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
437 }
438
439 let run_with_range = config.run_with_range;
440 let is_validator = config.consensus_config().is_some();
441 let is_full_node = !is_validator;
442 let prometheus_registry = registry_service.default_registry();
443
444 info!(node =? config.authority_public_key(),
445 "Initializing iota-node listening on {}", config.network_address
446 );
447
448 let genesis = config.genesis()?.clone();
449
450 let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
451 info!("IOTA chain identifier: {chain_identifier}");
452
453 let db_corrupted_path = &config.db_path().join("status");
455 if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
456 panic!("Failed to check database corruption: {err}");
457 }
458
459 DBMetrics::init(&prometheus_registry);
461
462 iota_metrics::init_metrics(&prometheus_registry);
464 #[cfg(not(msim))]
467 iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
468
470 register_hardware_metrics(&registry_service, &config.db_path)
471 .expect("Failed registering hardware metrics");
472 prometheus_registry
474 .register(iota_metrics::uptime_metric(
475 if is_validator {
476 "validator"
477 } else {
478 "fullnode"
479 },
480 software_version,
481 &chain_identifier.to_string(),
482 ))
483 .expect("Failed registering uptime metric");
484
485 let migration_tx_data = if genesis.contains_migrations() {
488 Some(config.load_migration_tx_data()?)
491 } else {
492 None
493 };
494
495 let secret = Arc::pin(config.authority_key_pair().copy());
496 let genesis_committee = genesis.committee()?;
497 let committee_store = Arc::new(CommitteeStore::new(
498 config.db_path().join("epochs"),
499 &genesis_committee,
500 None,
501 ));
502
503 let mut pruner_db = None;
504 if config
505 .authority_store_pruning_config
506 .enable_compaction_filter
507 {
508 pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
509 &config.db_path().join("store"),
510 )));
511 }
512 let compaction_filter = pruner_db
513 .clone()
514 .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
515
516 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
518 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
519 enable_write_stall,
520 compaction_filter,
521 };
522 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
523 &config.db_path().join("store"),
524 Some(perpetual_tables_options),
525 ));
526 let is_genesis = perpetual_tables
527 .database_is_empty()
528 .expect("Database read should not fail at init.");
529 let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
530 let backpressure_manager =
531 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
532
533 let store = AuthorityStore::open(
534 perpetual_tables,
535 &genesis,
536 &config,
537 &prometheus_registry,
538 migration_tx_data.as_ref(),
539 )
540 .await?;
541
542 let cur_epoch = store.get_recovery_epoch_at_restart()?;
543 let committee = committee_store
544 .get_committee(&cur_epoch)?
545 .expect("Committee of the current epoch must exist");
546 let epoch_start_configuration = store
547 .get_epoch_start_configuration()?
548 .expect("EpochStartConfiguration of the current epoch must exist");
549 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
550 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
551
552 let cache_traits = build_execution_cache(
553 &config.execution_cache_config,
554 &epoch_start_configuration,
555 &prometheus_registry,
556 &store,
557 backpressure_manager.clone(),
558 );
559
560 let auth_agg = {
561 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
562 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
563 Arc::new(ArcSwap::new(Arc::new(
564 AuthorityAggregator::new_from_epoch_start_state(
565 epoch_start_configuration.epoch_start_state(),
566 &committee_store,
567 safe_client_metrics_base,
568 auth_agg_metrics,
569 ),
570 )))
571 };
572
573 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
574 let epoch_store = AuthorityPerEpochStore::new(
575 config.authority_public_key(),
576 committee.clone(),
577 &config.db_path().join("store"),
578 Some(epoch_options.options),
579 EpochMetrics::new(&registry_service.default_registry()),
580 epoch_start_configuration,
581 cache_traits.backing_package_store.clone(),
582 cache_traits.object_store.clone(),
583 cache_metrics,
584 signature_verifier_metrics,
585 &config.expensive_safety_check_config,
586 ChainIdentifier::from(*genesis.checkpoint().digest()),
587 checkpoint_store
588 .get_highest_executed_checkpoint_seq_number()
589 .expect("checkpoint store read cannot fail")
590 .unwrap_or(0),
591 );
592
593 info!("created epoch store");
594
595 replay_log!(
596 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
597 epoch_store.epoch(),
598 epoch_store.protocol_config()
599 );
600
601 if is_genesis {
603 info!("checking IOTA conservation at genesis");
604 cache_traits
609 .reconfig_api
610 .try_expensive_check_iota_conservation(&epoch_store, None)
611 .expect("IOTA conservation check cannot fail at genesis");
612 }
613
614 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
615 let default_buffer_stake = epoch_store
616 .protocol_config()
617 .buffer_stake_for_protocol_upgrade_bps();
618 if effective_buffer_stake != default_buffer_stake {
619 warn!(
620 ?effective_buffer_stake,
621 ?default_buffer_stake,
622 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
623 );
624 }
625
626 checkpoint_store.insert_genesis_checkpoint(
627 genesis.checkpoint(),
628 genesis.checkpoint_contents().clone(),
629 &epoch_store,
630 );
631
632 unmark_db_corruption(db_corrupted_path)?;
634
635 info!("creating state sync store");
636 let state_sync_store = RocksDbStore::new(
637 cache_traits.clone(),
638 committee_store.clone(),
639 checkpoint_store.clone(),
640 );
641
642 let index_store = if is_full_node && config.enable_index_processing {
643 info!("creating index store");
644 Some(Arc::new(IndexStore::new(
645 config.db_path().join("indexes"),
646 &prometheus_registry,
647 epoch_store
648 .protocol_config()
649 .max_move_identifier_len_as_option(),
650 config.remove_deprecated_tables,
651 )))
652 } else {
653 None
654 };
655
656 let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
657 {
658 Some(Arc::new(RestIndexStore::new(
659 config.db_path().join("rest_index"),
660 &store,
661 &checkpoint_store,
662 &epoch_store,
663 &cache_traits.backing_package_store,
664 )))
665 } else {
666 None
667 };
668
669 info!("creating archive reader");
670 let archive_readers =
675 ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
676 let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
677 let (randomness_tx, randomness_rx) = mpsc::channel(
678 config
679 .p2p_config
680 .randomness
681 .clone()
682 .unwrap_or_default()
683 .mailbox_capacity(),
684 );
685 let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
686 Self::create_p2p_network(
687 &config,
688 state_sync_store.clone(),
689 chain_identifier,
690 trusted_peer_change_rx,
691 archive_readers.clone(),
692 randomness_tx,
693 &prometheus_registry,
694 )?;
695
696 send_trusted_peer_change(
699 &config,
700 &trusted_peer_change_tx,
701 epoch_store.epoch_start_state(),
702 );
703
704 info!("start state archival");
705 let state_archive_handle =
707 Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
708 .await?;
709
710 info!("start snapshot upload");
711 let state_snapshot_handle =
713 Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
714
715 info!("start db checkpoint");
717 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
718 &config,
719 &prometheus_registry,
720 state_snapshot_handle.is_some(),
721 )?;
722
723 let mut genesis_objects = genesis.objects().to_vec();
724 if let Some(migration_tx_data) = migration_tx_data.as_ref() {
725 genesis_objects.extend(migration_tx_data.get_objects());
726 }
727
728 let authority_name = config.authority_public_key();
729 let validator_tx_finalizer =
730 config
731 .enable_validator_tx_finalizer
732 .then_some(Arc::new(ValidatorTxFinalizer::new(
733 auth_agg.clone(),
734 authority_name,
735 &prometheus_registry,
736 )));
737
738 info!("create authority state");
739 let state = AuthorityState::new(
740 authority_name,
741 secret,
742 config.supported_protocol_versions.unwrap(),
743 store.clone(),
744 cache_traits.clone(),
745 epoch_store.clone(),
746 committee_store.clone(),
747 index_store.clone(),
748 rest_index,
749 checkpoint_store.clone(),
750 &prometheus_registry,
751 &genesis_objects,
752 &db_checkpoint_config,
753 config.clone(),
754 archive_readers,
755 validator_tx_finalizer,
756 chain_identifier,
757 pruner_db,
758 )
759 .await;
760
761 if epoch_store.epoch() == 0 {
763 let genesis_tx = &genesis.transaction();
764 let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
765 Self::execute_transaction_immediately_at_zero_epoch(
767 &state,
768 &epoch_store,
769 genesis_tx,
770 span,
771 )
772 .await;
773
774 if let Some(migration_tx_data) = migration_tx_data {
776 for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
777 let span = error_span!("migration_txn", tx_digest = ?tx_digest);
778 Self::execute_transaction_immediately_at_zero_epoch(
779 &state,
780 &epoch_store,
781 tx,
782 span,
783 )
784 .await;
785 }
786 }
787 }
788
789 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
792
793 if config
794 .expensive_safety_check_config
795 .enable_secondary_index_checks()
796 {
797 if let Some(indexes) = state.indexes.clone() {
798 iota_core::verify_indexes::verify_indexes(
799 state.get_accumulator_store().as_ref(),
800 indexes,
801 )
802 .expect("secondary indexes are inconsistent");
803 }
804 }
805
806 let (end_of_epoch_channel, end_of_epoch_receiver) =
807 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
808
809 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
810 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
811 auth_agg.load_full(),
812 state.clone(),
813 end_of_epoch_receiver,
814 &config.db_path(),
815 &prometheus_registry,
816 )))
817 } else {
818 None
819 };
820
821 let http_server = build_http_server(
822 state.clone(),
823 state_sync_store,
824 &transaction_orchestrator.clone(),
825 &config,
826 &prometheus_registry,
827 custom_rpc_runtime,
828 software_version,
829 )
830 .await?;
831
832 let accumulator = Arc::new(StateAccumulator::new(
833 cache_traits.accumulator_store.clone(),
834 StateAccumulatorMetrics::new(&prometheus_registry),
835 ));
836
837 let authority_names_to_peer_ids = epoch_store
838 .epoch_start_state()
839 .get_authority_names_to_peer_ids();
840
841 let network_connection_metrics =
842 NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
843
844 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
845
846 let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
847 p2p_network.downgrade(),
848 network_connection_metrics,
849 HashMap::new(),
850 None,
851 );
852
853 let connection_monitor_status = ConnectionMonitorStatus {
854 connection_statuses,
855 authority_names_to_peer_ids,
856 };
857
858 let connection_monitor_status = Arc::new(connection_monitor_status);
859 let iota_node_metrics =
860 Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
861
862 let validator_components = if state.is_validator(&epoch_store) {
863 let (components, _) = futures::join!(
864 Self::construct_validator_components(
865 config.clone(),
866 state.clone(),
867 committee,
868 epoch_store.clone(),
869 checkpoint_store.clone(),
870 state_sync_handle.clone(),
871 randomness_handle.clone(),
872 Arc::downgrade(&accumulator),
873 backpressure_manager.clone(),
874 connection_monitor_status.clone(),
875 &registry_service,
876 iota_node_metrics.clone(),
877 ),
878 Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
879 );
880 let mut components = components?;
881
882 components.consensus_adapter.submit_recovered(&epoch_store);
883
884 components.validator_server_spawn_handle =
886 components.validator_server_spawn_handle.start();
887
888 Some(components)
889 } else {
890 None
891 };
892
893 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
895
896 let node = Self {
897 config,
898 validator_components: Mutex::new(validator_components),
899 _http_server: http_server,
900 state,
901 transaction_orchestrator,
902 registry_service,
903 metrics: iota_node_metrics,
904
905 _discovery: discovery_handle,
906 state_sync_handle,
907 randomness_handle,
908 checkpoint_store,
909 accumulator: Mutex::new(Some(accumulator)),
910 end_of_epoch_channel,
911 connection_monitor_status,
912 trusted_peer_change_tx,
913 backpressure_manager,
914
915 _db_checkpoint_handle: db_checkpoint_handle,
916
917 #[cfg(msim)]
918 sim_state: Default::default(),
919
920 _state_archive_handle: state_archive_handle,
921 _state_snapshot_uploader_handle: state_snapshot_handle,
922 shutdown_channel_tx: shutdown_channel,
923
924 auth_agg,
925 };
926
927 info!("IotaNode started!");
928 let node = Arc::new(node);
929 let node_copy = node.clone();
930 spawn_monitored_task!(async move {
931 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
932 if let Err(error) = result {
933 warn!("Reconfiguration finished with error {:?}", error);
934 }
935 });
936
937 Ok(node)
938 }
939
940 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
941 self.end_of_epoch_channel.subscribe()
942 }
943
944 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
945 self.shutdown_channel_tx.subscribe()
946 }
947
948 pub fn current_epoch_for_testing(&self) -> EpochId {
949 self.state.current_epoch_for_testing()
950 }
951
952 pub fn db_checkpoint_path(&self) -> PathBuf {
953 self.config.db_checkpoint_path()
954 }
955
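/// Asks the consensus adapter to close the current epoch. Returns an error if
/// this node is not currently a validator.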
956 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
958 info!("close_epoch (current epoch = {})", epoch_store.epoch());
959 self.validator_components
960 .lock()
961 .await
962 .as_ref()
963 .ok_or_else(|| IotaError::from("Node is not a validator"))?
964 .consensus_adapter
965 .close_epoch(epoch_store);
966 Ok(())
967 }
968
969 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
970 self.state
971 .clear_override_protocol_upgrade_buffer_stake(epoch)
972 }
973
974 pub fn set_override_protocol_upgrade_buffer_stake(
975 &self,
976 epoch: EpochId,
977 buffer_stake_bps: u64,
978 ) -> IotaResult {
979 self.state
980 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
981 }
982
983 pub async fn close_epoch_for_testing(&self) -> IotaResult {
986 let epoch_store = self.state.epoch_store_for_testing();
987 self.close_epoch(&epoch_store).await
988 }
989
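/// Starts the state archive writer if an archive object store is configured,
/// returning the sender used to signal its shutdown (or `None` when archival
/// is disabled).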
990 async fn start_state_archival(
991 config: &NodeConfig,
992 prometheus_registry: &Registry,
993 state_sync_store: RocksDbStore,
994 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
995 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
996 let local_store_config = ObjectStoreConfig {
997 object_store: Some(ObjectStoreType::File),
998 directory: Some(config.archive_path()),
999 ..Default::default()
1000 };
1001 let archive_writer = ArchiveWriter::new(
1002 local_store_config,
1003 remote_store_config.clone(),
1004 FileCompression::Zstd,
1005 StorageFormat::Blob,
1006 Duration::from_secs(600),
1007 256 * 1024 * 1024,
1008 prometheus_registry,
1009 )
1010 .await?;
1011 Ok(Some(archive_writer.start(state_sync_store).await?))
1012 } else {
1013 Ok(None)
1014 }
1015 }
1016
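/// Starts the periodic state snapshot uploader if a snapshot object store is
/// configured, returning the sender used to signal its shutdown (or `None`
/// when snapshots are disabled).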
1017 fn start_state_snapshot(
1020 config: &NodeConfig,
1021 prometheus_registry: &Registry,
1022 checkpoint_store: Arc<CheckpointStore>,
1023 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1024 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1025 let snapshot_uploader = StateSnapshotUploader::new(
1026 &config.db_checkpoint_path(),
1027 &config.snapshot_path(),
1028 remote_store_config.clone(),
1029 60,
1030 prometheus_registry,
1031 checkpoint_store,
1032 )?;
1033 Ok(Some(snapshot_uploader.start()))
1034 } else {
1035 Ok(None)
1036 }
1037 }
1038
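/// Resolves the DB checkpoint configuration and, when an object store is
/// configured or state snapshots are enabled, starts the `DBCheckpointHandler`
/// responsible for uploading and pruning database checkpoints.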
1039 fn start_db_checkpoint(
1040 config: &NodeConfig,
1041 prometheus_registry: &Registry,
1042 state_snapshot_enabled: bool,
1043 ) -> Result<(
1044 DBCheckpointConfig,
1045 Option<tokio::sync::broadcast::Sender<()>>,
1046 )> {
1047 let checkpoint_path = Some(
1048 config
1049 .db_checkpoint_config
1050 .checkpoint_path
1051 .clone()
1052 .unwrap_or_else(|| config.db_checkpoint_path()),
1053 );
1054 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1055 DBCheckpointConfig {
1056 checkpoint_path,
1057 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1058 true
1059 } else {
1060 config
1061 .db_checkpoint_config
1062 .perform_db_checkpoints_at_epoch_end
1063 },
1064 ..config.db_checkpoint_config.clone()
1065 }
1066 } else {
1067 config.db_checkpoint_config.clone()
1068 };
1069
1070 match (
1071 db_checkpoint_config.object_store_config.as_ref(),
1072 state_snapshot_enabled,
1073 ) {
1074 (None, false) => Ok((db_checkpoint_config, None)),
1079 (_, _) => {
1080 let handler = DBCheckpointHandler::new(
1081 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1082 db_checkpoint_config.object_store_config.as_ref(),
1083 60,
1084 db_checkpoint_config
1085 .prune_and_compact_before_upload
1086 .unwrap_or(true),
1087 config.authority_store_pruning_config.clone(),
1088 prometheus_registry,
1089 state_snapshot_enabled,
1090 )?;
1091 Ok((
1092 db_checkpoint_config,
1093 Some(DBCheckpointHandler::start(handler)),
1094 ))
1095 }
1096 }
1097 }
1098
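/// Builds the anemo peer-to-peer network with the discovery, state sync and
/// randomness services, applying QUIC tuning defaults wherever the node config
/// does not override them, and returns the network plus the service handles.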
1099 fn create_p2p_network(
1100 config: &NodeConfig,
1101 state_sync_store: RocksDbStore,
1102 chain_identifier: ChainIdentifier,
1103 trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1104 archive_readers: ArchiveReaderBalancer,
1105 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1106 prometheus_registry: &Registry,
1107 ) -> Result<(
1108 Network,
1109 discovery::Handle,
1110 state_sync::Handle,
1111 randomness::Handle,
1112 )> {
1113 let (state_sync, state_sync_server) = state_sync::Builder::new()
1114 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1115 .store(state_sync_store)
1116 .archive_readers(archive_readers)
1117 .with_metrics(prometheus_registry)
1118 .build();
1119
1120 let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1121 .config(config.p2p_config.clone())
1122 .build();
1123
1124 let (randomness, randomness_router) =
1125 randomness::Builder::new(config.authority_public_key(), randomness_tx)
1126 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1127 .with_metrics(prometheus_registry)
1128 .build();
1129
1130 let p2p_network = {
1131 let routes = anemo::Router::new()
1132 .add_rpc_service(discovery_server)
1133 .add_rpc_service(state_sync_server);
1134 let routes = routes.merge(randomness_router);
1135
1136 let inbound_network_metrics =
1137 NetworkMetrics::new("iota", "inbound", prometheus_registry);
1138 let outbound_network_metrics =
1139 NetworkMetrics::new("iota", "outbound", prometheus_registry);
1140
1141 let service = ServiceBuilder::new()
1142 .layer(
1143 TraceLayer::new_for_server_errors()
1144 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1145 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1146 )
1147 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1148 Arc::new(inbound_network_metrics),
1149 config.p2p_config.excessive_message_size(),
1150 )))
1151 .service(routes);
1152
1153 let outbound_layer = ServiceBuilder::new()
1154 .layer(
1155 TraceLayer::new_for_client_and_server_errors()
1156 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1157 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1158 )
1159 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1160 Arc::new(outbound_network_metrics),
1161 config.p2p_config.excessive_message_size(),
1162 )))
1163 .into_inner();
1164
1165 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1166 anemo_config.max_frame_size = Some(1 << 30);
1169
1170 let mut quic_config = anemo_config.quic.unwrap_or_default();
1173 if quic_config.socket_send_buffer_size.is_none() {
1174 quic_config.socket_send_buffer_size = Some(20 << 20);
1175 }
1176 if quic_config.socket_receive_buffer_size.is_none() {
1177 quic_config.socket_receive_buffer_size = Some(20 << 20);
1178 }
1179 quic_config.allow_failed_socket_buffer_size_setting = true;
1180
1181 if quic_config.max_concurrent_bidi_streams.is_none() {
1184 quic_config.max_concurrent_bidi_streams = Some(500);
1185 }
1186 if quic_config.max_concurrent_uni_streams.is_none() {
1187 quic_config.max_concurrent_uni_streams = Some(500);
1188 }
1189 if quic_config.stream_receive_window.is_none() {
1190 quic_config.stream_receive_window = Some(100 << 20);
1191 }
1192 if quic_config.receive_window.is_none() {
1193 quic_config.receive_window = Some(200 << 20);
1194 }
1195 if quic_config.send_window.is_none() {
1196 quic_config.send_window = Some(200 << 20);
1197 }
1198 if quic_config.crypto_buffer_size.is_none() {
1199 quic_config.crypto_buffer_size = Some(1 << 20);
1200 }
1201 if quic_config.max_idle_timeout_ms.is_none() {
1202 quic_config.max_idle_timeout_ms = Some(30_000);
1203 }
1204 if quic_config.keep_alive_interval_ms.is_none() {
1205 quic_config.keep_alive_interval_ms = Some(5_000);
1206 }
1207 anemo_config.quic = Some(quic_config);
1208
1209 let server_name = format!("iota-{chain_identifier}");
1210 let network = Network::bind(config.p2p_config.listen_address)
1211 .server_name(&server_name)
1212 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1213 .config(anemo_config)
1214 .outbound_request_layer(outbound_layer)
1215 .start(service)?;
1216 info!(
1217 server_name = server_name,
1218 "P2p network started on {}",
1219 network.local_addr()
1220 );
1221
1222 network
1223 };
1224
1225 let discovery_handle =
1226 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1227 let state_sync_handle = state_sync.start(p2p_network.clone());
1228 let randomness_handle = randomness.start(p2p_network.clone());
1229
1230 Ok((
1231 p2p_network,
1232 discovery_handle,
1233 state_sync_handle,
1234 randomness_handle,
1235 ))
1236 }
1237
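/// Constructs all validator components (consensus adapter and manager,
/// consensus store pruner, gRPC validator service, optional overload monitor)
/// together with a dedicated metrics registry, then delegates to
/// `start_epoch_specific_validator_components`.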
1238 async fn construct_validator_components(
1241 config: NodeConfig,
1242 state: Arc<AuthorityState>,
1243 committee: Arc<Committee>,
1244 epoch_store: Arc<AuthorityPerEpochStore>,
1245 checkpoint_store: Arc<CheckpointStore>,
1246 state_sync_handle: state_sync::Handle,
1247 randomness_handle: randomness::Handle,
1248 accumulator: Weak<StateAccumulator>,
1249 backpressure_manager: Arc<BackpressureManager>,
1250 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1251 registry_service: &RegistryService,
1252 iota_node_metrics: Arc<IotaNodeMetrics>,
1253 ) -> Result<ValidatorComponents> {
1254 let mut config_clone = config.clone();
1255 let consensus_config = config_clone
1256 .consensus_config
1257 .as_mut()
1258 .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1259 let validator_registry = Registry::new();
1260 let validator_registry_id = registry_service.add(validator_registry.clone());
1261
1262 let client = Arc::new(UpdatableConsensusClient::new());
1263 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1264 &committee,
1265 consensus_config,
1266 state.name,
1267 connection_monitor_status.clone(),
1268 &validator_registry,
1269 client.clone(),
1270 checkpoint_store.clone(),
1271 ));
1272 let consensus_manager = ConsensusManager::new(
1273 &config,
1274 consensus_config,
1275 registry_service,
1276 &validator_registry,
1277 client,
1278 );
1279
1280 let consensus_store_pruner = ConsensusStorePruner::new(
1283 consensus_manager.get_storage_base_path(),
1284 consensus_config.db_retention_epochs(),
1285 consensus_config.db_pruner_period(),
1286 &validator_registry,
1287 );
1288
1289 let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1290 let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1291
1292 let (validator_server_spawn_handle, validator_server_handle) =
1293 Self::start_grpc_validator_service(
1294 &config,
1295 state.clone(),
1296 consensus_adapter.clone(),
1297 &validator_registry,
1298 )
1299 .await?;
1300
1301 let validator_overload_monitor_handle = if config
1304 .authority_overload_config
1305 .max_load_shedding_percentage
1306 > 0
1307 {
1308 let authority_state = Arc::downgrade(&state);
1309 let overload_config = config.authority_overload_config.clone();
1310 fail_point!("starting_overload_monitor");
1311 Some(spawn_monitored_task!(overload_monitor(
1312 authority_state,
1313 overload_config,
1314 )))
1315 } else {
1316 None
1317 };
1318
1319 Self::start_epoch_specific_validator_components(
1320 &config,
1321 state.clone(),
1322 consensus_adapter,
1323 checkpoint_store,
1324 epoch_store,
1325 state_sync_handle,
1326 randomness_handle,
1327 consensus_manager,
1328 consensus_store_pruner,
1329 accumulator,
1330 backpressure_manager,
1331 validator_server_spawn_handle,
1332 validator_server_handle,
1333 validator_overload_monitor_handle,
1334 checkpoint_metrics,
1335 iota_node_metrics,
1336 iota_tx_validator_metrics,
1337 validator_registry_id,
1338 )
1339 .await
1340 }
1341
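/// Starts the validator components that must be recreated for each epoch: the
/// checkpoint service, the randomness manager, consensus (via the consensus
/// manager), and the JWK updater tasks when the authenticator state is enabled.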
1342 async fn start_epoch_specific_validator_components(
1345 config: &NodeConfig,
1346 state: Arc<AuthorityState>,
1347 consensus_adapter: Arc<ConsensusAdapter>,
1348 checkpoint_store: Arc<CheckpointStore>,
1349 epoch_store: Arc<AuthorityPerEpochStore>,
1350 state_sync_handle: state_sync::Handle,
1351 randomness_handle: randomness::Handle,
1352 consensus_manager: ConsensusManager,
1353 consensus_store_pruner: ConsensusStorePruner,
1354 accumulator: Weak<StateAccumulator>,
1355 backpressure_manager: Arc<BackpressureManager>,
1356 validator_server_spawn_handle: SpawnOnce,
1357 validator_server_handle: iota_http::ServerHandle,
1358 validator_overload_monitor_handle: Option<JoinHandle<()>>,
1359 checkpoint_metrics: Arc<CheckpointMetrics>,
1360 iota_node_metrics: Arc<IotaNodeMetrics>,
1361 iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1362 validator_registry_id: RegistryID,
1363 ) -> Result<ValidatorComponents> {
1364 let checkpoint_service = Self::build_checkpoint_service(
1365 config,
1366 consensus_adapter.clone(),
1367 checkpoint_store.clone(),
1368 epoch_store.clone(),
1369 state.clone(),
1370 state_sync_handle,
1371 accumulator,
1372 checkpoint_metrics.clone(),
1373 );
1374
1375 let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1380
1381 consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1382
1383 let randomness_manager = RandomnessManager::try_new(
1384 Arc::downgrade(&epoch_store),
1385 Box::new(consensus_adapter.clone()),
1386 randomness_handle,
1387 config.authority_key_pair(),
1388 )
1389 .await;
1390 if let Some(randomness_manager) = randomness_manager {
1391 epoch_store
1392 .set_randomness_manager(randomness_manager)
1393 .await?;
1394 }
1395
1396 let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1397 state.clone(),
1398 checkpoint_service.clone(),
1399 epoch_store.clone(),
1400 low_scoring_authorities,
1401 backpressure_manager,
1402 );
1403
1404 info!("Starting consensus manager");
1405
1406 consensus_manager
1407 .start(
1408 config,
1409 epoch_store.clone(),
1410 consensus_handler_initializer,
1411 IotaTxValidator::new(
1412 epoch_store.clone(),
1413 checkpoint_service.clone(),
1414 state.transaction_manager().clone(),
1415 iota_tx_validator_metrics.clone(),
1416 ),
1417 )
1418 .await;
1419
1420 if !epoch_store
1421 .epoch_start_config()
1422 .is_data_quarantine_active_from_beginning_of_epoch()
1423 {
1424 checkpoint_store
1425 .reexecute_local_checkpoints(&state, &epoch_store)
1426 .await;
1427 }
1428
1429 info!("Spawning checkpoint service");
1430 let checkpoint_service_tasks = checkpoint_service.spawn().await;
1431
1432 if epoch_store.authenticator_state_enabled() {
1433 Self::start_jwk_updater(
1434 config,
1435 iota_node_metrics,
1436 state.name,
1437 epoch_store.clone(),
1438 consensus_adapter.clone(),
1439 );
1440 }
1441
1442 Ok(ValidatorComponents {
1443 validator_server_spawn_handle,
1444 validator_server_handle,
1445 validator_overload_monitor_handle,
1446 consensus_manager,
1447 consensus_store_pruner,
1448 consensus_adapter,
1449 checkpoint_service_tasks,
1450 checkpoint_metrics,
1451 iota_tx_validator_metrics,
1452 validator_registry_id,
1453 })
1454 }
1455
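/// Builds the checkpoint service that assembles checkpoints from executed
/// transactions, submits them to consensus for signing, and forwards certified
/// checkpoints to state sync.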
1456 fn build_checkpoint_service(
1463 config: &NodeConfig,
1464 consensus_adapter: Arc<ConsensusAdapter>,
1465 checkpoint_store: Arc<CheckpointStore>,
1466 epoch_store: Arc<AuthorityPerEpochStore>,
1467 state: Arc<AuthorityState>,
1468 state_sync_handle: state_sync::Handle,
1469 accumulator: Weak<StateAccumulator>,
1470 checkpoint_metrics: Arc<CheckpointMetrics>,
1471 ) -> Arc<CheckpointService> {
1472 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1473 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1474
1475 debug!(
1476 "Starting checkpoint service with epoch start timestamp {}
1477 and epoch duration {}",
1478 epoch_start_timestamp_ms, epoch_duration_ms
1479 );
1480
1481 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1482 sender: consensus_adapter,
1483 signer: state.secret.clone(),
1484 authority: config.authority_public_key(),
1485 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1486 .checked_add(epoch_duration_ms)
1487 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1488 metrics: checkpoint_metrics.clone(),
1489 });
1490
1491 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1492 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1493 let max_checkpoint_size_bytes =
1494 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1495
1496 CheckpointService::build(
1497 state.clone(),
1498 checkpoint_store,
1499 epoch_store,
1500 state.get_transaction_cache_reader().clone(),
1501 accumulator,
1502 checkpoint_output,
1503 Box::new(certified_checkpoint_output),
1504 checkpoint_metrics,
1505 max_tx_per_checkpoint,
1506 max_checkpoint_size_bytes,
1507 )
1508 }
1509
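/// Creates the `ConsensusAdapter` used to submit transactions to consensus,
/// with pending-transaction limits derived from the consensus config and the
/// committee size.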
1510 fn construct_consensus_adapter(
1511 committee: &Committee,
1512 consensus_config: &ConsensusConfig,
1513 authority: AuthorityName,
1514 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1515 prometheus_registry: &Registry,
1516 consensus_client: Arc<dyn ConsensusClient>,
1517 checkpoint_store: Arc<CheckpointStore>,
1518 ) -> ConsensusAdapter {
1519 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1520 ConsensusAdapter::new(
1524 consensus_client,
1525 checkpoint_store,
1526 authority,
1527 connection_monitor_status,
1528 consensus_config.max_pending_transactions(),
1529 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1530 consensus_config.max_submit_position,
1531 consensus_config.submit_delay_step_override(),
1532 ca_metrics,
1533 )
1534 }
1535
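/// Creates the gRPC `ValidatorService`, binds it with TLS to the configured
/// network address, and returns a deferred spawn handle for the server task
/// along with the handle used to trigger its shutdown.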
1536 async fn start_grpc_validator_service(
1537 config: &NodeConfig,
1538 state: Arc<AuthorityState>,
1539 consensus_adapter: Arc<ConsensusAdapter>,
1540 prometheus_registry: &Registry,
1541 ) -> Result<(SpawnOnce, iota_http::ServerHandle)> {
1542 let validator_service = ValidatorService::new(
1543 state.clone(),
1544 consensus_adapter,
1545 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1546 TrafficControllerMetrics::new(prometheus_registry),
1547 config.policy_config.clone(),
1548 config.firewall_config.clone(),
1549 );
1550
1551 let mut server_conf = iota_network_stack::config::Config::new();
1552 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1553 server_conf.load_shed = config.grpc_load_shed;
1554 let mut server_builder =
1555 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1556
1557 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1558
1559 let tls_config = iota_tls::create_rustls_server_config(
1560 config.network_key_pair().copy().private(),
1561 IOTA_TLS_SERVER_NAME.to_string(),
1562 );
1563
1564 let server = server_builder
1565 .bind(config.network_address(), Some(tls_config))
1566 .await
1567 .map_err(|err| anyhow!(err.to_string()))?;
1568
1569 let local_addr = server.local_addr();
1570 info!("Listening to traffic on {local_addr}");
1571
1572 let server_handle = server.handle().clone();
1574
1575 Ok((
1576 SpawnOnce::new(server.serve().map_err(Into::into)),
1577 server_handle,
1578 ))
1579 }
1580
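/// Re-enqueues owned-object certificates that were submitted to consensus but
/// not yet executed before the node restarted, then waits (with a timeout) for
/// their effects to become available, raising a debug_fatal if they do not
/// complete in time.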
1581 async fn reexecute_pending_consensus_certs(
1599 epoch_store: &Arc<AuthorityPerEpochStore>,
1600 state: &Arc<AuthorityState>,
1601 ) {
1602 let mut pending_consensus_certificates = Vec::new();
1603 let mut additional_certs = Vec::new();
1604
1605 for tx in epoch_store.get_all_pending_consensus_transactions() {
1606 match tx.kind {
1607 ConsensusTransactionKind::CertifiedTransaction(tx)
1610 if !tx.contains_shared_object() =>
1611 {
1612 let tx = *tx;
1613 let tx = VerifiedExecutableTransaction::new_from_certificate(
1616 VerifiedCertificate::new_unchecked(tx),
1617 );
1618 if let Some(fx_digest) = epoch_store
1621 .get_signed_effects_digest(tx.digest())
1622 .expect("db error")
1623 {
1624 pending_consensus_certificates.push((tx, fx_digest));
1625 } else {
1626 additional_certs.push(tx);
1627 }
1628 }
1629 _ => (),
1630 }
1631 }
1632
1633 let digests = pending_consensus_certificates
1634 .iter()
1635 .map(|(tx, _)| *tx.digest())
1636 .collect::<Vec<_>>();
1637
1638 info!(
1639 "reexecuting {} pending consensus certificates: {:?}",
1640 digests.len(),
1641 digests
1642 );
1643
1644 state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1645 state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1646
1647 let timeout = if cfg!(msim) { 120 } else { 60 };
1653 if tokio::time::timeout(
1654 std::time::Duration::from_secs(timeout),
1655 state
1656 .get_transaction_cache_reader()
1657 .try_notify_read_executed_effects_digests(&digests),
1658 )
1659 .await
1660 .is_err()
1661 {
1662 if let Ok(executed_effects_digests) = state
1664 .get_transaction_cache_reader()
1665 .try_multi_get_executed_effects_digests(&digests)
1666 {
1667 let pending_digests = digests
1668 .iter()
1669 .zip(executed_effects_digests.iter())
1670 .filter_map(|(digest, executed_effects_digest)| {
1671 if executed_effects_digest.is_none() {
1672 Some(digest)
1673 } else {
1674 None
1675 }
1676 })
1677 .collect::<Vec<_>>();
1678 debug_fatal!(
1679 "Timed out waiting for effects digests to be executed: {:?}",
1680 pending_digests
1681 );
1682 } else {
1683 debug_fatal!(
1684 "Timed out waiting for effects digests to be executed, digests not found"
1685 );
1686 }
1687 }
1688 }
1689
1690 pub fn state(&self) -> Arc<AuthorityState> {
1691 self.state.clone()
1692 }
1693
1694 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1696 self.state.reference_gas_price_for_testing()
1697 }
1698
1699 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1700 self.state.committee_store().clone()
1701 }
1702
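/// Returns the authority aggregator held by the transaction orchestrator, if
/// this node runs one.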
1703 pub fn clone_authority_aggregator(
1713 &self,
1714 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1715 self.transaction_orchestrator
1716 .as_ref()
1717 .map(|to| to.clone_authority_aggregator())
1718 }
1719
1720 pub fn transaction_orchestrator(
1721 &self,
1722 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1723 self.transaction_orchestrator.clone()
1724 }
1725
1726 pub fn subscribe_to_transaction_orchestrator_effects(
1727 &self,
1728 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1729 self.transaction_orchestrator
1730 .as_ref()
1731 .map(|to| to.subscribe_to_effects_queue())
1732 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1733 }
1734
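/// Drives the node across epoch boundaries: runs the checkpoint executor for
/// the current epoch and, when it finishes, reconfigures the authority state,
/// restarts or tears down the validator components, and repeats for the next
/// epoch. Returns only when a `RunWithRange` stop condition triggers shutdown.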
1735 pub async fn monitor_reconfiguration(
1741 self: Arc<Self>,
1742 mut epoch_store: Arc<AuthorityPerEpochStore>,
1743 ) -> Result<()> {
1744 let checkpoint_executor_metrics =
1745 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1746
1747 loop {
1748 let mut accumulator_guard = self.accumulator.lock().await;
1749 let accumulator = accumulator_guard.take().unwrap();
1750 info!(
1751 "Creating checkpoint executor for epoch {}",
1752 epoch_store.epoch()
1753 );
1754 let checkpoint_executor = CheckpointExecutor::new(
1755 epoch_store.clone(),
1756 self.checkpoint_store.clone(),
1757 self.state.clone(),
1758 accumulator.clone(),
1759 self.backpressure_manager.clone(),
1760 self.config.checkpoint_executor_config.clone(),
1761 checkpoint_executor_metrics.clone(),
1762 );
1763
1764 let run_with_range = self.config.run_with_range;
1765
1766 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1767
1768 if let Some(components) = &*self.validator_components.lock().await {
1770 tokio::time::sleep(Duration::from_millis(1)).await;
1772
1773 let config = cur_epoch_store.protocol_config();
1774 let binary_config = to_binary_config(config);
1775 let transaction = ConsensusTransaction::new_capability_notification_v1(
1776 AuthorityCapabilitiesV1::new(
1777 self.state.name,
1778 cur_epoch_store.get_chain_identifier().chain(),
1779 self.config
1780 .supported_protocol_versions
1781 .expect("Supported versions should be populated")
1782 .truncate_below(config.version),
1784 self.state
1785 .get_available_system_packages(&binary_config)
1786 .await,
1787 ),
1788 );
1789 info!(?transaction, "submitting capabilities to consensus");
1790 components
1791 .consensus_adapter
1792 .submit(transaction, None, &cur_epoch_store)?;
1793 }
1794
1795 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1796
1797 if stop_condition == StopReason::RunWithRangeCondition {
1798 IotaNode::shutdown(&self).await;
1799 self.shutdown_channel_tx
1800 .send(run_with_range)
1801 .expect("RunWithRangeCondition met but failed to send shutdown message");
1802 return Ok(());
1803 }
1804
1805 let latest_system_state = self
1807 .state
1808 .get_object_cache_reader()
1809 .try_get_iota_system_state_object_unsafe()
1810 .expect("Read IOTA System State object cannot fail");
1811
1812 #[cfg(msim)]
1813 if !self
1814 .sim_state
1815 .sim_safe_mode_expected
1816 .load(Ordering::Relaxed)
1817 {
1818 debug_assert!(!latest_system_state.safe_mode());
1819 }
1820
1821 #[cfg(not(msim))]
1822 debug_assert!(!latest_system_state.safe_mode());
1823
1824 if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1825 if self.state.is_fullnode(&cur_epoch_store) {
1826 warn!(
1827 "Failed to send end of epoch notification to subscriber: {:?}",
1828 err
1829 );
1830 }
1831 }
1832
1833 cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1834 let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1835
1836 self.auth_agg.store(Arc::new(
1837 self.auth_agg
1838 .load()
1839 .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1840 ));
1841
1842 let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1843 let next_epoch = next_epoch_committee.epoch();
1844 assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1845
1846 info!(
1847 next_epoch,
1848 "Finished executing all checkpoints in epoch. About to reconfigure the system."
1849 );
1850
1851 fail_point_async!("reconfig_delay");
1852
1853 let authority_names_to_peer_ids =
1858 new_epoch_start_state.get_authority_names_to_peer_ids();
1859 self.connection_monitor_status
1860 .update_mapping_for_epoch(authority_names_to_peer_ids);
1861
1862 cur_epoch_store.record_epoch_reconfig_start_time_metric();
1863
1864 send_trusted_peer_change(
1865 &self.config,
1866 &self.trusted_peer_change_tx,
1867 &new_epoch_start_state,
1868 );
1869
1870 let mut validator_components_lock_guard = self.validator_components.lock().await;
1871
1872 let new_epoch_store = self
1876 .reconfigure_state(
1877 &self.state,
1878 &cur_epoch_store,
1879 next_epoch_committee.clone(),
1880 new_epoch_start_state,
1881 accumulator.clone(),
1882 )
1883 .await?;
1884
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_spawn_handle,
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_spawn_handle,
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.trigger_shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_spawn_handle =
                        components.validator_server_spawn_handle.start();

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

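    /// Reconfigures the authority state for the next epoch: loads the last
    /// checkpoint of the current epoch (including the epoch supply change from
    /// its end-of-epoch data), builds the new `EpochStartConfiguration`, and
    /// returns the `AuthorityPerEpochStore` for the next epoch.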
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

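    /// Executes a transaction directly against the epoch-zero store, wrapping
    /// it in an unchecked `VerifiedExecutableTransaction` backed by a
    /// checkpoint proof of `(0, 0)`, and panics if execution fails.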
    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let _guard = span.enter();
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
}

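// Production (non-simulator) JWK fetching: queries the OIDC provider with a
// fresh `reqwest` client via `fastcrypto_zkp`'s `fetch_jwks`.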
#[cfg(not(msim))]
impl IotaNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client)
            .await
            .map_err(|_| IotaError::JWKRetrieval)
    }
}

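// Simulator-only (`msim`) helpers: expose the simulated node id, set whether
// safe mode is expected, and route JWK fetching through the injectable
// `get_jwk_injector` hook.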
#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

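/// A future that is spawned onto the runtime at most once. It is created in
/// the `Unstarted` state and moves to `Started` when `start` is called;
/// calling `start` on an already started value is a no-op.
///
/// A minimal usage sketch (the future shown here is a stand-in, not code from
/// this crate):
///
/// ```ignore
/// let server = SpawnOnce::new(async { anyhow::Ok(()) });
/// // ... later, once the node decides to actually run the task:
/// let server = server.start();
/// ```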
enum SpawnOnce {
    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
    #[allow(unused)]
    Started(JoinHandle<Result<()>>),
}

impl SpawnOnce {
    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    pub fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

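/// Notifies the p2p layer about the validator set of the new epoch: the
/// previous committee becomes `old_committee` and the peers derived from the
/// new epoch start state become `new_committee` on the watch channel.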
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

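/// Builds the transaction key-value store used by the JSON-RPC layer. The
/// local RocksDB-backed store is always created; if
/// `transaction_kv_store_read_config.base_url` is set and parses as a URL, the
/// local store is wrapped in a fallback store that can also consult the HTTP
/// key-value store.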
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

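/// Builds the HTTP server exposing the JSON-RPC API, the optional REST API,
/// and the `/health` endpoint, bound to `config.json_rpc_address`. Returns
/// `Ok(None)` on nodes with a consensus config (validators), which do not
/// serve these interfaces.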
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<iota_http::ServerHandle>> {
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state.clone(), store)),
            software_version,
        );

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}

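/// Query parameters accepted by the `/health` endpoint.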
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}

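/// Handles `/health`. Without a query parameter the node always reports `up`;
/// with `?threshold_seconds=N` it additionally reports `down` (503) when the
/// timestamp of the highest executed checkpoint is older than `N` seconds.
///
/// A minimal check against a locally running node might look like this (the
/// host and port are whatever `json_rpc_address` is configured to):
///
/// ```text
/// curl "http://127.0.0.1:9000/health?threshold_seconds=30"
/// ```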
async fn health_check_handler(
    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
) -> impl axum::response::IntoResponse {
    if let Some(threshold_seconds) = threshold_seconds {
        let summary = match state
            .get_checkpoint_store()
            .get_highest_executed_checkpoint()
        {
            Ok(Some(summary)) => summary,
            Ok(None) => {
                warn!("Highest executed checkpoint not found");
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
            Err(err) => {
                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
        };

        let latest_chain_time = summary.timestamp();
        let threshold =
            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);

        if latest_chain_time < threshold {
            warn!(
                ?latest_chain_time,
                ?threshold,
                "failing health check due to checkpoint lag"
            );
            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
        }
    }
    (axum::http::StatusCode::OK, "up")
}

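/// Maximum number of transactions per checkpoint: taken from the protocol
/// config in production builds and pinned to 2 in test builds.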
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}