iota_node/lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::{BTreeSet, HashMap, HashSet},
9    fmt,
10    net::SocketAddr,
11    path::PathBuf,
12    str::FromStr,
13    sync::{Arc, Weak},
14    time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19    callback::CallbackLayer,
20    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::TryFutureExt;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_config::{
29    ConsensusConfig, NodeConfig,
30    node::{DBCheckpointConfig, RunWithRange},
31    node_config_metrics::NodeConfigMetrics,
32    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
33};
34use iota_core::{
35    authority::{
36        AuthorityState, AuthorityStore, CHAIN_IDENTIFIER, RandomnessRoundReceiver,
37        authority_per_epoch_store::AuthorityPerEpochStore,
38        authority_store_tables::AuthorityPerpetualTables,
39        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
40    },
41    authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
42    authority_client::NetworkAuthorityClient,
43    authority_server::{ValidatorService, ValidatorServiceMetrics},
44    checkpoints::{
45        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
46        SubmitCheckpointToConsensus,
47        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
48    },
49    connection_monitor::ConnectionMonitor,
50    consensus_adapter::{
51        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
52        ConsensusClient,
53    },
54    consensus_handler::ConsensusHandlerInitializer,
55    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
56    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
57    db_checkpoint_handler::DBCheckpointHandler,
58    epoch::{
59        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
60        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
61        reconfiguration::ReconfigurationInitiator,
62    },
63    execution_cache::build_execution_cache,
64    module_cache_metrics::ResolverMetrics,
65    overload_monitor::overload_monitor,
66    rest_index::RestIndexStore,
67    safe_client::SafeClientMetricsBase,
68    signature_verifier::SignatureVerifierMetrics,
69    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
70    storage::{RestReadStore, RocksDbStore},
71    traffic_controller::metrics::TrafficControllerMetrics,
72    transaction_orchestrator::TransactionOrchestrator,
73    validator_tx_finalizer::ValidatorTxFinalizer,
74};
75use iota_json_rpc::{
76    JsonRpcServerBuilder, bridge_api::BridgeReadApi, coin_api::CoinReadApi,
77    governance_api::GovernanceReadApi, indexer_api::IndexerApi, move_utils::MoveUtils,
78    read_api::ReadApi, transaction_builder_api::TransactionBuilderApi,
79    transaction_execution_api::TransactionExecutionApi,
80};
81use iota_json_rpc_api::JsonRpcMetrics;
82use iota_macros::{fail_point, fail_point_async, replay_log};
83use iota_metrics::{
84    RegistryService,
85    hardware_metrics::register_hardware_metrics,
86    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
87    server_timing_middleware, spawn_monitored_task,
88};
89use iota_network::{
90    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
91};
92use iota_network_stack::server::ServerBuilder;
93use iota_protocol_config::ProtocolConfig;
94use iota_rest_api::RestMetrics;
95use iota_snapshot::uploader::StateSnapshotUploader;
96use iota_storage::{
97    FileCompression, IndexStore, StorageFormat,
98    http_key_value_store::HttpKVStore,
99    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
100    key_value_store_metrics::KeyValueStoreMetrics,
101};
102use iota_types::{
103    base_types::{AuthorityName, ConciseableName, EpochId},
104    committee::Committee,
105    crypto::{KeypairTraits, RandomnessRound},
106    digests::ChainIdentifier,
107    error::{IotaError, IotaResult},
108    execution_config_utils::to_binary_config,
109    iota_system_state::{
110        IotaSystemState, IotaSystemStateTrait,
111        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
112    },
113    messages_consensus::{AuthorityCapabilitiesV1, ConsensusTransaction, check_total_jwk_size},
114    quorum_driver_types::QuorumDriverEffectsQueueResult,
115    supported_protocol_versions::SupportedProtocolVersions,
116    transaction::Transaction,
117};
118use prometheus::Registry;
119#[cfg(msim)]
120pub use simulator::set_jwk_injector;
121#[cfg(msim)]
122use simulator::*;
123use tap::tap::TapFallible;
124use tokio::{
125    runtime::Handle,
126    sync::{Mutex, broadcast, mpsc, watch},
127    task::{JoinHandle, JoinSet},
128};
129use tower::ServiceBuilder;
130use tracing::{Instrument, debug, error, error_span, info, warn};
131use typed_store::{DBMetrics, rocks::default_db_options};
132
133use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
134
135pub mod admin;
136mod handle;
137pub mod metrics;
138
139pub struct ValidatorComponents {
140    validator_server_handle: JoinHandle<Result<()>>,
141    validator_overload_monitor_handle: Option<JoinHandle<()>>,
142    consensus_manager: ConsensusManager,
143    consensus_store_pruner: ConsensusStorePruner,
144    consensus_adapter: Arc<ConsensusAdapter>,
145    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
146    checkpoint_service_tasks: JoinSet<()>,
147    checkpoint_metrics: Arc<CheckpointMetrics>,
148    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
149}
150
151#[cfg(msim)]
152mod simulator {
153    use std::sync::atomic::AtomicBool;
154
155    use super::*;
156
157    pub(super) struct SimState {
158        pub sim_node: iota_simulator::runtime::NodeHandle,
159        pub sim_safe_mode_expected: AtomicBool,
160        _leak_detector: iota_simulator::NodeLeakDetector,
161    }
162
163    impl Default for SimState {
164        fn default() -> Self {
165            Self {
166                sim_node: iota_simulator::runtime::NodeHandle::current(),
167                sim_safe_mode_expected: AtomicBool::new(false),
168                _leak_detector: iota_simulator::NodeLeakDetector::new(),
169            }
170        }
171    }
172
173    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
174        + Send
175        + Sync
176        + 'static;
177
178    fn default_fetch_jwks(
179        _authority: AuthorityName,
180        _provider: &OIDCProvider,
181    ) -> IotaResult<Vec<(JwkId, JWK)>> {
182        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
183        // Just load a default Twitch jwk for testing.
184        parse_jwks(
185            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
186            &OIDCProvider::Twitch,
187        )
188        .map_err(|_| IotaError::JWKRetrieval)
189    }
190
191    thread_local! {
192        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
193    }
194
195    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
196        JWK_INJECTOR.with(|injector| injector.borrow().clone())
197    }
198
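    /// Replaces the JWK fetcher used by simtest nodes.
    ///
    /// A minimal sketch, assuming a test wants every provider to return no
    /// keys (hypothetical test code):
    /// ```ignore
    /// set_jwk_injector(Arc::new(|_authority, _provider| Ok(vec![])));
    /// ```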
199    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
200        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
201    }
202}
203
204pub struct IotaNode {
205    config: NodeConfig,
206    validator_components: Mutex<Option<ValidatorComponents>>,
207    /// The HTTP server responsible for serving JSON-RPC as well as the
208    /// experimental REST service.
209    _http_server: Option<tokio::task::JoinHandle<()>>,
210    state: Arc<AuthorityState>,
211    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
212    registry_service: RegistryService,
213    metrics: Arc<IotaNodeMetrics>,
214
215    _discovery: discovery::Handle,
216    state_sync_handle: state_sync::Handle,
217    randomness_handle: randomness::Handle,
218    checkpoint_store: Arc<CheckpointStore>,
219    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
220    connection_monitor_status: Arc<ConnectionMonitorStatus>,
221
222    /// Broadcast channel to send the starting system state for the next epoch.
223    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,
224
225    /// Broadcast channel to notify the [`DiscoveryEventLoop`] about new
226    /// validator peers.
227    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,
228
229    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
230
231    #[cfg(msim)]
232    sim_state: SimState,
233
234    _state_archive_handle: Option<broadcast::Sender<()>>,
235
236    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
237    // Channel to allow signaling upstream to shut down iota-node.
238    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
239
240    /// AuthorityAggregator of the network, created at startup and at the
241    /// beginning of each epoch. Uses ArcSwap so that we can mutate it
242    /// without taking a mut reference.
243    // TODO: Eventually we can make this auth aggregator a shared reference so that this
244    // update will automatically propagate to other uses.
245    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
246}
247
248impl fmt::Debug for IotaNode {
249    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
250        f.debug_struct("IotaNode")
251            .field("name", &self.state.name.concise())
252            .finish()
253    }
254}
255
256static MAX_JWK_KEYS_PER_FETCH: usize = 100;
257
258impl IotaNode {
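    /// Convenience wrapper around [`Self::start_async`] that reports the
    /// software version as "unknown".
    ///
    /// A minimal sketch of how a caller might boot a node (hypothetical caller
    /// code; `config` is a `NodeConfig` loaded elsewhere):
    /// ```ignore
    /// let registry_service = RegistryService::new(prometheus::Registry::new());
    /// let node = IotaNode::start(config, registry_service, None).await?;
    /// ```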
259    pub async fn start(
260        config: NodeConfig,
261        registry_service: RegistryService,
262        custom_rpc_runtime: Option<Handle>,
263    ) -> Result<Arc<IotaNode>> {
264        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
265    }
266
267    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
268    /// configuration.
269    /// This function ensures continuous fetching, validation, and submission of
270    /// JWKs, maintaining up-to-date keys for the specified providers.
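    /// A fetched JWK is accepted only if its `iss` maps back to the provider it
    /// was fetched from and it passes the total-size check (see `validate_jwk`
    /// below); accepted keys are then submitted to consensus.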
271    fn start_jwk_updater(
272        config: &NodeConfig,
273        metrics: Arc<IotaNodeMetrics>,
274        authority: AuthorityName,
275        epoch_store: Arc<AuthorityPerEpochStore>,
276        consensus_adapter: Arc<ConsensusAdapter>,
277    ) {
278        let epoch = epoch_store.epoch();
279
280        let supported_providers = config
281            .zklogin_oauth_providers
282            .get(&epoch_store.get_chain_identifier().chain())
283            .unwrap_or(&BTreeSet::new())
284            .iter()
285            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
286            .collect::<Vec<_>>();
287
288        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
289
290        info!(
291            ?fetch_interval,
292            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
293        );
294
295        fn validate_jwk(
296            metrics: &Arc<IotaNodeMetrics>,
297            provider: &OIDCProvider,
298            id: &JwkId,
299            jwk: &JWK,
300        ) -> bool {
301            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
302                warn!(
303                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
304                    id.iss, provider
305                );
306                metrics
307                    .invalid_jwks
308                    .with_label_values(&[&provider.to_string()])
309                    .inc();
310                return false;
311            };
312
313            if iss_provider != *provider {
314                warn!(
315                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
316                    id.iss, provider, iss_provider
317                );
318                metrics
319                    .invalid_jwks
320                    .with_label_values(&[&provider.to_string()])
321                    .inc();
322                return false;
323            }
324
325            if !check_total_jwk_size(id, jwk) {
326                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
327                metrics
328                    .invalid_jwks
329                    .with_label_values(&[&provider.to_string()])
330                    .inc();
331                return false;
332            }
333
334            true
335        }
336
337        // metrics is:
338        //  pub struct IotaNodeMetrics {
339        //      pub jwk_requests: IntCounterVec,
340        //      pub jwk_request_errors: IntCounterVec,
341        //      pub total_jwks: IntCounterVec,
342        //      pub unique_jwks: IntCounterVec,
343        //  }
344
345        for p in supported_providers.into_iter() {
346            let provider_str = p.to_string();
347            let epoch_store = epoch_store.clone();
348            let consensus_adapter = consensus_adapter.clone();
349            let metrics = metrics.clone();
350            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
351                async move {
352                    // Note: restart-safe de-duplication happens after consensus; this is
353                    // just a best-effort attempt to reduce unneeded submissions.
354                    let mut seen = HashSet::new();
355                    loop {
356                        info!("fetching JWK for provider {:?}", p);
357                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
358                        match Self::fetch_jwks(authority, &p).await {
359                            Err(e) => {
360                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
361                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
362                                // Retry in 30 seconds
363                                tokio::time::sleep(Duration::from_secs(30)).await;
364                                continue;
365                            }
366                            Ok(mut keys) => {
367                                metrics.total_jwks
368                                    .with_label_values(&[&provider_str])
369                                    .inc_by(keys.len() as u64);
370
371                                keys.retain(|(id, jwk)| {
372                                    validate_jwk(&metrics, &p, id, jwk) &&
373                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
374                                    seen.insert((id.clone(), jwk.clone()))
375                                });
376
377                                metrics.unique_jwks
378                                    .with_label_values(&[&provider_str])
379                                    .inc_by(keys.len() as u64);
380
381                                // prevent oauth providers from sending too many keys,
382                                // inadvertently or otherwise
383                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
384                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
385                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
386                                }
387
388                                for (id, jwk) in keys.into_iter() {
389                                    info!("Submitting JWK to consensus: {:?}", id);
390
391                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
392                                    consensus_adapter.submit(txn, None, &epoch_store)
393                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
394                                        .ok();
395                                }
396                            }
397                        }
398                        tokio::time::sleep(fetch_interval).await;
399                    }
400                }
401                .instrument(error_span!("jwk_updater_task", epoch)),
402            ));
403        }
404    }
405
406    pub async fn start_async(
407        config: NodeConfig,
408        registry_service: RegistryService,
409        custom_rpc_runtime: Option<Handle>,
410        software_version: &'static str,
411    ) -> Result<Arc<IotaNode>> {
412        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
413        let mut config = config.clone();
414        if config.supported_protocol_versions.is_none() {
415            info!(
416                "populating config.supported_protocol_versions with default {:?}",
417                SupportedProtocolVersions::SYSTEM_DEFAULT
418            );
419            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
420        }
421
422        let run_with_range = config.run_with_range;
423        let is_validator = config.consensus_config().is_some();
424        let is_full_node = !is_validator;
425        let prometheus_registry = registry_service.default_registry();
426
427        info!(node =? config.authority_public_key(),
428            "Initializing iota-node listening on {}", config.network_address
429        );
430
431        let genesis = config.genesis()?.clone();
432
433        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
434        // It's ok if the value is already set due to data races.
435        let _ = CHAIN_IDENTIFIER.set(chain_identifier);
436        info!("IOTA chain identifier: {chain_identifier}");
437
438        // Initialize metrics to track db usage before creating any stores
439        DBMetrics::init(&prometheus_registry);
440
441        // Initialize IOTA metrics.
442        iota_metrics::init_metrics(&prometheus_registry);
443        // Unsupported (because of the use of a static variable) and unnecessary in
444        // simtests.
445        #[cfg(not(msim))]
446        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
447
448        // Register hardware metrics.
449        register_hardware_metrics(&registry_service, &config.db_path)
450            .expect("Failed registering hardware metrics");
451        // Register uptime metric
452        prometheus_registry
453            .register(iota_metrics::uptime_metric(
454                if is_validator {
455                    "validator"
456                } else {
457                    "fullnode"
458                },
459                software_version,
460                &chain_identifier.to_string(),
461            ))
462            .expect("Failed registering uptime metric");
463
464        // If genesis comes with migration data, load it into memory from the
465        // file path specified in the config.
466        let migration_tx_data = if genesis.contains_migrations() {
467            // Here the load already verifies that the content of the migration blob is
468            // valid with respect to the content found in genesis.
469            Some(config.load_migration_tx_data()?)
470        } else {
471            None
472        };
473
474        let secret = Arc::pin(config.authority_key_pair().copy());
475        let genesis_committee = genesis.committee()?;
476        let committee_store = Arc::new(CommitteeStore::new(
477            config.db_path().join("epochs"),
478            &genesis_committee,
479            None,
480        ));
481
482        let perpetual_options = default_db_options().optimize_db_for_write_throughput(4);
483        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
484            &config.db_path().join("store"),
485            Some(perpetual_options.options),
486        ));
487        let is_genesis = perpetual_tables
488            .database_is_empty()
489            .expect("Database read should not fail at init.");
490        let store = AuthorityStore::open(
491            perpetual_tables,
492            &genesis,
493            &config,
494            &prometheus_registry,
495            migration_tx_data.as_ref(),
496        )
497        .await?;
498
499        let cur_epoch = store.get_recovery_epoch_at_restart()?;
500        let committee = committee_store
501            .get_committee(&cur_epoch)?
502            .expect("Committee of the current epoch must exist");
503        let epoch_start_configuration = store
504            .get_epoch_start_configuration()?
505            .expect("EpochStartConfiguration of the current epoch must exist");
506        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
507        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
508
509        let cache_traits =
510            build_execution_cache(&epoch_start_configuration, &prometheus_registry, &store);
511
512        let auth_agg = {
513            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
514            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
515            Arc::new(ArcSwap::new(Arc::new(
516                AuthorityAggregator::new_from_epoch_start_state(
517                    epoch_start_configuration.epoch_start_state(),
518                    &committee_store,
519                    safe_client_metrics_base,
520                    auth_agg_metrics,
521                ),
522            )))
523        };
524
525        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
526        let epoch_store = AuthorityPerEpochStore::new(
527            config.authority_public_key(),
528            committee.clone(),
529            &config.db_path().join("store"),
530            Some(epoch_options.options),
531            EpochMetrics::new(&registry_service.default_registry()),
532            epoch_start_configuration,
533            cache_traits.backing_package_store.clone(),
534            cache_traits.object_store.clone(),
535            cache_metrics,
536            signature_verifier_metrics,
537            &config.expensive_safety_check_config,
538            ChainIdentifier::from(*genesis.checkpoint().digest()),
539        );
540
541        info!("created epoch store");
542
543        replay_log!(
544            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
545            epoch_store.epoch(),
546            epoch_store.protocol_config()
547        );
548
549        // the database is empty at genesis time
550        if is_genesis {
551            info!("checking IOTA conservation at genesis");
552            // When we are opening the db table, the only time when it's safe to
553            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
554            // an epoch and the IOTA conservation check will fail. This also initializes
555            // the expected_network_iota_amount table.
556            cache_traits
557                .reconfig_api
558                .expensive_check_iota_conservation(&epoch_store, None)
559                .expect("IOTA conservation check cannot fail at genesis");
560        }
561
562        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
563        let default_buffer_stake = epoch_store
564            .protocol_config()
565            .buffer_stake_for_protocol_upgrade_bps();
566        if effective_buffer_stake != default_buffer_stake {
567            warn!(
568                ?effective_buffer_stake,
569                ?default_buffer_stake,
570                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
571            );
572        }
573
574        info!("creating checkpoint store");
575
576        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
577        checkpoint_store.insert_genesis_checkpoint(
578            genesis.checkpoint(),
579            genesis.checkpoint_contents().clone(),
580            &epoch_store,
581        );
582
583        info!("creating state sync store");
584        let state_sync_store = RocksDbStore::new(
585            cache_traits.clone(),
586            committee_store.clone(),
587            checkpoint_store.clone(),
588        );
589
590        let index_store = if is_full_node && config.enable_index_processing {
591            info!("creating index store");
592            Some(Arc::new(IndexStore::new(
593                config.db_path().join("indexes"),
594                &prometheus_registry,
595                epoch_store
596                    .protocol_config()
597                    .max_move_identifier_len_as_option(),
598                config.remove_deprecated_tables,
599            )))
600        } else {
601            None
602        };
603
604        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
605        {
606            Some(Arc::new(RestIndexStore::new(
607                config.db_path().join("rest_index"),
608                &store,
609                &checkpoint_store,
610                &epoch_store,
611                &cache_traits.backing_package_store,
612            )))
613        } else {
614            None
615        };
616
617        info!("creating archive reader");
618        // Create network
619        // TODO only configure validators as seed/preferred peers for validators and not
620        // for fullnodes once we've had a chance to re-work fullnode
621        // configuration generation.
622        let archive_readers =
623            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
624        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
625        let (randomness_tx, randomness_rx) = mpsc::channel(
626            config
627                .p2p_config
628                .randomness
629                .clone()
630                .unwrap_or_default()
631                .mailbox_capacity(),
632        );
633        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
634            Self::create_p2p_network(
635                &config,
636                state_sync_store.clone(),
637                chain_identifier,
638                trusted_peer_change_rx,
639                archive_readers.clone(),
640                randomness_tx,
641                &prometheus_registry,
642            )?;
643
644        // We must explicitly send this instead of relying on the initial value to
645        // trigger a watch value change, so that state-sync is able to process it.
646        send_trusted_peer_change(
647            &config,
648            &trusted_peer_change_tx,
649            epoch_store.epoch_start_state(),
650        );
651
652        info!("start state archival");
653        // Start archiving local state to remote store
654        let state_archive_handle =
655            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
656                .await?;
657
658        info!("start snapshot upload");
659        // Start uploading state snapshot to remote store
660        let state_snapshot_handle =
661            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
662
663        // Start uploading db checkpoints to remote store
664        info!("start db checkpoint");
665        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
666            &config,
667            &prometheus_registry,
668            state_snapshot_handle.is_some(),
669        )?;
670
671        let mut genesis_objects = genesis.objects().to_vec();
672        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
673            genesis_objects.extend(migration_tx_data.get_objects());
674        }
675
676        let authority_name = config.authority_public_key();
677        let validator_tx_finalizer =
678            config
679                .enable_validator_tx_finalizer
680                .then_some(Arc::new(ValidatorTxFinalizer::new(
681                    auth_agg.clone(),
682                    authority_name,
683                    &prometheus_registry,
684                )));
685
686        info!("create authority state");
687        let state = AuthorityState::new(
688            authority_name,
689            secret,
690            config.supported_protocol_versions.unwrap(),
691            store.clone(),
692            cache_traits.clone(),
693            epoch_store.clone(),
694            committee_store.clone(),
695            index_store.clone(),
696            rest_index,
697            checkpoint_store.clone(),
698            &prometheus_registry,
699            &genesis_objects,
700            &db_checkpoint_config,
701            config.clone(),
702            config.indirect_objects_threshold,
703            archive_readers,
704            validator_tx_finalizer,
705        )
706        .await;
707
708        // ensure genesis and migration txs were executed
709        if epoch_store.epoch() == 0 {
710            let genesis_tx = &genesis.transaction();
711            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
712            // Execute genesis transaction
713            Self::execute_transaction_immediately_at_zero_epoch(
714                &state,
715                &epoch_store,
716                genesis_tx,
717                span,
718            )
719            .await;
720
721            // Execute migration transactions if present
722            if let Some(migration_tx_data) = migration_tx_data {
723                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
724                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
725                    Self::execute_transaction_immediately_at_zero_epoch(
726                        &state,
727                        &epoch_store,
728                        tx,
729                        span,
730                    )
731                    .await;
732                }
733            }
734        }
735
736        checkpoint_store
737            .reexecute_local_checkpoints(&state, &epoch_store)
738            .await;
739
740        // Start the loop that receives new randomness and generates transactions for
741        // it.
742        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
743
744        if config
745            .expensive_safety_check_config
746            .enable_secondary_index_checks()
747        {
748            if let Some(indexes) = state.indexes.clone() {
749                iota_core::verify_indexes::verify_indexes(
750                    state.get_accumulator_store().as_ref(),
751                    indexes,
752                )
753                .expect("secondary indexes are inconsistent");
754            }
755        }
756
757        let (end_of_epoch_channel, end_of_epoch_receiver) =
758            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
759
760        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
761            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
762                auth_agg.load_full(),
763                state.clone(),
764                end_of_epoch_receiver,
765                &config.db_path(),
766                &prometheus_registry,
767            )))
768        } else {
769            None
770        };
771
772        let http_server = build_http_server(
773            state.clone(),
774            state_sync_store,
775            &transaction_orchestrator.clone(),
776            &config,
777            &prometheus_registry,
778            custom_rpc_runtime,
779            software_version,
780        )
781        .await?;
782
783        let accumulator = Arc::new(StateAccumulator::new(
784            cache_traits.accumulator_store.clone(),
785            StateAccumulatorMetrics::new(&prometheus_registry),
786        ));
787
788        let authority_names_to_peer_ids = epoch_store
789            .epoch_start_state()
790            .get_authority_names_to_peer_ids();
791
792        let network_connection_metrics =
793            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
794
795        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
796
797        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
798            p2p_network.downgrade(),
799            network_connection_metrics,
800            HashMap::new(),
801            None,
802        );
803
804        let connection_monitor_status = ConnectionMonitorStatus {
805            connection_statuses,
806            authority_names_to_peer_ids,
807        };
808
809        let connection_monitor_status = Arc::new(connection_monitor_status);
810        let iota_node_metrics =
811            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
812
813        let validator_components = if state.is_validator(&epoch_store) {
814            let components = Self::construct_validator_components(
815                config.clone(),
816                state.clone(),
817                committee,
818                epoch_store.clone(),
819                checkpoint_store.clone(),
820                state_sync_handle.clone(),
821                randomness_handle.clone(),
822                Arc::downgrade(&accumulator),
823                connection_monitor_status.clone(),
824                &registry_service,
825                iota_node_metrics.clone(),
826            )
827            .await?;
828            // This is only needed during cold start.
829            components.consensus_adapter.submit_recovered(&epoch_store);
830
831            Some(components)
832        } else {
833            None
834        };
835
836        // Set up the shutdown channel.
837        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
838
839        let node = Self {
840            config,
841            validator_components: Mutex::new(validator_components),
842            _http_server: http_server,
843            state,
844            transaction_orchestrator,
845            registry_service,
846            metrics: iota_node_metrics,
847
848            _discovery: discovery_handle,
849            state_sync_handle,
850            randomness_handle,
851            checkpoint_store,
852            accumulator: Mutex::new(Some(accumulator)),
853            end_of_epoch_channel,
854            connection_monitor_status,
855            trusted_peer_change_tx,
856
857            _db_checkpoint_handle: db_checkpoint_handle,
858
859            #[cfg(msim)]
860            sim_state: Default::default(),
861
862            _state_archive_handle: state_archive_handle,
863            _state_snapshot_uploader_handle: state_snapshot_handle,
864            shutdown_channel_tx: shutdown_channel,
865
866            auth_agg,
867        };
868
869        info!("IotaNode started!");
870        let node = Arc::new(node);
871        let node_copy = node.clone();
872        spawn_monitored_task!(async move {
873            let result = Self::monitor_reconfiguration(node_copy).await;
874            if let Err(error) = result {
875                warn!("Reconfiguration finished with error {:?}", error);
876            }
877        });
878
879        Ok(node)
880    }
881
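    /// Returns a receiver on the broadcast channel that publishes the starting
    /// [`IotaSystemState`] for each new epoch.
    ///
    /// A sketch of hypothetical caller code (assumes `IotaSystemStateTrait` is
    /// in scope for `epoch()`):
    /// ```ignore
    /// let mut epoch_rx = node.subscribe_to_epoch_change();
    /// while let Ok(system_state) = epoch_rx.recv().await {
    ///     info!("entering epoch {}", system_state.epoch());
    /// }
    /// ```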
882    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
883        self.end_of_epoch_channel.subscribe()
884    }
885
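    /// Returns a receiver that is notified when the node is signaled to shut
    /// down, carrying an optional [`RunWithRange`] value.
    ///
    /// Hypothetical caller code (a sketch):
    /// ```ignore
    /// let mut shutdown_rx = node.subscribe_to_shutdown_channel();
    /// let run_with_range = shutdown_rx.recv().await.ok().flatten();
    /// ```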
886    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
887        self.shutdown_channel_tx.subscribe()
888    }
889
890    pub fn current_epoch_for_testing(&self) -> EpochId {
891        self.state.current_epoch_for_testing()
892    }
893
894    pub fn db_checkpoint_path(&self) -> PathBuf {
895        self.config.db_checkpoint_path()
896    }
897
898    // Initiates the reconfiguration process by starting to reject user certs.
899    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
900        info!("close_epoch (current epoch = {})", epoch_store.epoch());
901        self.validator_components
902            .lock()
903            .await
904            .as_ref()
905            .ok_or_else(|| IotaError::from("Node is not a validator"))?
906            .consensus_adapter
907            .close_epoch(epoch_store);
908        Ok(())
909    }
910
911    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
912        self.state
913            .clear_override_protocol_upgrade_buffer_stake(epoch)
914    }
915
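    /// Overrides the buffer stake (in basis points) used when deciding whether
    /// to upgrade the protocol version in the given epoch.
    ///
    /// A sketch with hypothetical values (5_000 bps corresponds to 50%):
    /// ```ignore
    /// node.set_override_protocol_upgrade_buffer_stake(epoch, 5_000)?;
    /// ```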
916    pub fn set_override_protocol_upgrade_buffer_stake(
917        &self,
918        epoch: EpochId,
919        buffer_stake_bps: u64,
920    ) -> IotaResult {
921        self.state
922            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
923    }
924
925    // Testing-only API to start epoch close process.
926    // For production code, please use the non-testing version.
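    // A minimal sketch of test usage (assumes the node was started as a validator):
    //   node.close_epoch_for_testing().await?;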
927    pub async fn close_epoch_for_testing(&self) -> IotaResult {
928        let epoch_store = self.state.epoch_store_for_testing();
929        self.close_epoch(&epoch_store).await
930    }
931
932    async fn start_state_archival(
933        config: &NodeConfig,
934        prometheus_registry: &Registry,
935        state_sync_store: RocksDbStore,
936    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
937        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
938            let local_store_config = ObjectStoreConfig {
939                object_store: Some(ObjectStoreType::File),
940                directory: Some(config.archive_path()),
941                ..Default::default()
942            };
943            let archive_writer = ArchiveWriter::new(
944                local_store_config,
945                remote_store_config.clone(),
946                FileCompression::Zstd,
947                StorageFormat::Blob,
948                Duration::from_secs(600),
949                256 * 1024 * 1024,
950                prometheus_registry,
951            )
952            .await?;
953            Ok(Some(archive_writer.start(state_sync_store).await?))
954        } else {
955            Ok(None)
956        }
957    }
958
959    /// Creates a StateSnapshotUploader and starts it if the StateSnapshotConfig
960    /// is set.
961    fn start_state_snapshot(
962        config: &NodeConfig,
963        prometheus_registry: &Registry,
964        checkpoint_store: Arc<CheckpointStore>,
965    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
966        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
967            let snapshot_uploader = StateSnapshotUploader::new(
968                &config.db_checkpoint_path(),
969                &config.snapshot_path(),
970                remote_store_config.clone(),
971                60,
972                prometheus_registry,
973                checkpoint_store,
974            )?;
975            Ok(Some(snapshot_uploader.start()))
976        } else {
977            Ok(None)
978        }
979    }
980
981    fn start_db_checkpoint(
982        config: &NodeConfig,
983        prometheus_registry: &Registry,
984        state_snapshot_enabled: bool,
985    ) -> Result<(
986        DBCheckpointConfig,
987        Option<tokio::sync::broadcast::Sender<()>>,
988    )> {
989        let checkpoint_path = Some(
990            config
991                .db_checkpoint_config
992                .checkpoint_path
993                .clone()
994                .unwrap_or_else(|| config.db_checkpoint_path()),
995        );
996        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
997            DBCheckpointConfig {
998                checkpoint_path,
999                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1000                    true
1001                } else {
1002                    config
1003                        .db_checkpoint_config
1004                        .perform_db_checkpoints_at_epoch_end
1005                },
1006                ..config.db_checkpoint_config.clone()
1007            }
1008        } else {
1009            config.db_checkpoint_config.clone()
1010        };
1011
1012        match (
1013            db_checkpoint_config.object_store_config.as_ref(),
1014            state_snapshot_enabled,
1015        ) {
1016            // If the db checkpoint object store is not specified but the
1017            // state snapshot object store is, create the handler anyway so
1018            // db checkpoints can be marked as completed and uploaded as
1019            // state snapshots.
1020            (None, false) => Ok((db_checkpoint_config, None)),
1021            (_, _) => {
1022                let handler = DBCheckpointHandler::new(
1023                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1024                    db_checkpoint_config.object_store_config.as_ref(),
1025                    60,
1026                    db_checkpoint_config
1027                        .prune_and_compact_before_upload
1028                        .unwrap_or(true),
1029                    config.indirect_objects_threshold,
1030                    config.authority_store_pruning_config.clone(),
1031                    prometheus_registry,
1032                    state_snapshot_enabled,
1033                )?;
1034                Ok((
1035                    db_checkpoint_config,
1036                    Some(DBCheckpointHandler::start(handler)),
1037                ))
1038            }
1039        }
1040    }
1041
1042    fn create_p2p_network(
1043        config: &NodeConfig,
1044        state_sync_store: RocksDbStore,
1045        chain_identifier: ChainIdentifier,
1046        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1047        archive_readers: ArchiveReaderBalancer,
1048        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1049        prometheus_registry: &Registry,
1050    ) -> Result<(
1051        Network,
1052        discovery::Handle,
1053        state_sync::Handle,
1054        randomness::Handle,
1055    )> {
1056        let (state_sync, state_sync_server) = state_sync::Builder::new()
1057            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1058            .store(state_sync_store)
1059            .archive_readers(archive_readers)
1060            .with_metrics(prometheus_registry)
1061            .build();
1062
1063        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1064            .config(config.p2p_config.clone())
1065            .build();
1066
1067        let (randomness, randomness_router) =
1068            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1069                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1070                .with_metrics(prometheus_registry)
1071                .build();
1072
1073        let p2p_network = {
1074            let routes = anemo::Router::new()
1075                .add_rpc_service(discovery_server)
1076                .add_rpc_service(state_sync_server);
1077            let routes = routes.merge(randomness_router);
1078
1079            let inbound_network_metrics =
1080                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1081            let outbound_network_metrics =
1082                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1083
1084            let service = ServiceBuilder::new()
1085                .layer(
1086                    TraceLayer::new_for_server_errors()
1087                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1088                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1089                )
1090                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1091                    Arc::new(inbound_network_metrics),
1092                    config.p2p_config.excessive_message_size(),
1093                )))
1094                .service(routes);
1095
1096            let outbound_layer = ServiceBuilder::new()
1097                .layer(
1098                    TraceLayer::new_for_client_and_server_errors()
1099                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1100                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1101                )
1102                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1103                    Arc::new(outbound_network_metrics),
1104                    config.p2p_config.excessive_message_size(),
1105                )))
1106                .into_inner();
1107
1108            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1109            // Set max_frame_size to 1 GiB (1 << 30 bytes) to work around the issue of
1110            // there being too many staking events in the epoch change txn.
1111            anemo_config.max_frame_size = Some(1 << 30);
1112
1113            // Set a higher default value for socket send/receive buffers if not already
1114            // configured.
1115            let mut quic_config = anemo_config.quic.unwrap_or_default();
1116            if quic_config.socket_send_buffer_size.is_none() {
1117                quic_config.socket_send_buffer_size = Some(20 << 20);
1118            }
1119            if quic_config.socket_receive_buffer_size.is_none() {
1120                quic_config.socket_receive_buffer_size = Some(20 << 20);
1121            }
1122            quic_config.allow_failed_socket_buffer_size_setting = true;
1123
1124            // Set high-performance defaults for quinn transport.
1125            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
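            // (Throughput bound is roughly receive_window / RTT: 200 MiB / 0.5 s = 400 MiB/s.)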
1126            if quic_config.max_concurrent_bidi_streams.is_none() {
1127                quic_config.max_concurrent_bidi_streams = Some(500);
1128            }
1129            if quic_config.max_concurrent_uni_streams.is_none() {
1130                quic_config.max_concurrent_uni_streams = Some(500);
1131            }
1132            if quic_config.stream_receive_window.is_none() {
1133                quic_config.stream_receive_window = Some(100 << 20);
1134            }
1135            if quic_config.receive_window.is_none() {
1136                quic_config.receive_window = Some(200 << 20);
1137            }
1138            if quic_config.send_window.is_none() {
1139                quic_config.send_window = Some(200 << 20);
1140            }
1141            if quic_config.crypto_buffer_size.is_none() {
1142                quic_config.crypto_buffer_size = Some(1 << 20);
1143            }
1144            if quic_config.max_idle_timeout_ms.is_none() {
1145                quic_config.max_idle_timeout_ms = Some(30_000);
1146            }
1147            if quic_config.keep_alive_interval_ms.is_none() {
1148                quic_config.keep_alive_interval_ms = Some(5_000);
1149            }
1150            anemo_config.quic = Some(quic_config);
1151
1152            let server_name = format!("iota-{}", chain_identifier);
1153            let network = Network::bind(config.p2p_config.listen_address)
1154                .server_name(&server_name)
1155                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1156                .config(anemo_config)
1157                .outbound_request_layer(outbound_layer)
1158                .start(service)?;
1159            info!(
1160                server_name = server_name,
1161                "P2p network started on {}",
1162                network.local_addr()
1163            );
1164
1165            network
1166        };
1167
1168        let discovery_handle = discovery.start(p2p_network.clone());
1169        let state_sync_handle = state_sync.start(p2p_network.clone());
1170        let randomness_handle = randomness.start(p2p_network.clone());
1171
1172        Ok((
1173            p2p_network,
1174            discovery_handle,
1175            state_sync_handle,
1176            randomness_handle,
1177        ))
1178    }
1179
1180    /// Asynchronously constructs and initializes the components necessary for
1181    /// the validator node.
1182    async fn construct_validator_components(
1183        config: NodeConfig,
1184        state: Arc<AuthorityState>,
1185        committee: Arc<Committee>,
1186        epoch_store: Arc<AuthorityPerEpochStore>,
1187        checkpoint_store: Arc<CheckpointStore>,
1188        state_sync_handle: state_sync::Handle,
1189        randomness_handle: randomness::Handle,
1190        accumulator: Weak<StateAccumulator>,
1191        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1192        registry_service: &RegistryService,
1193        iota_node_metrics: Arc<IotaNodeMetrics>,
1194    ) -> Result<ValidatorComponents> {
1195        let mut config_clone = config.clone();
1196        let consensus_config = config_clone
1197            .consensus_config
1198            .as_mut()
1199            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1200
1201        let client = Arc::new(UpdatableConsensusClient::new());
1202        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1203            &committee,
1204            consensus_config,
1205            state.name,
1206            connection_monitor_status.clone(),
1207            &registry_service.default_registry(),
1208            client.clone(),
1209        ));
1210        let consensus_manager =
1211            ConsensusManager::new(&config, consensus_config, registry_service, client);
1212
1213        // This only gets started up once, not on every epoch; the call to prune old
1214        // epochs is made during each reconfiguration.
1215        let consensus_store_pruner = ConsensusStorePruner::new(
1216            consensus_manager.get_storage_base_path(),
1217            consensus_config.db_retention_epochs(),
1218            consensus_config.db_pruner_period(),
1219            &registry_service.default_registry(),
1220        );
1221
1222        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
1223        let iota_tx_validator_metrics =
1224            IotaTxValidatorMetrics::new(&registry_service.default_registry());
1225
1226        let validator_server_handle = Self::start_grpc_validator_service(
1227            &config,
1228            state.clone(),
1229            consensus_adapter.clone(),
1230            &registry_service.default_registry(),
1231        )
1232        .await?;
1233
1234        // Starts an overload monitor that monitors the execution of the authority.
1235        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1236        let validator_overload_monitor_handle = if config
1237            .authority_overload_config
1238            .max_load_shedding_percentage
1239            > 0
1240        {
1241            let authority_state = Arc::downgrade(&state);
1242            let overload_config = config.authority_overload_config.clone();
1243            fail_point!("starting_overload_monitor");
1244            Some(spawn_monitored_task!(overload_monitor(
1245                authority_state,
1246                overload_config,
1247            )))
1248        } else {
1249            None
1250        };
1251
1252        Self::start_epoch_specific_validator_components(
1253            &config,
1254            state.clone(),
1255            consensus_adapter,
1256            checkpoint_store,
1257            epoch_store,
1258            state_sync_handle,
1259            randomness_handle,
1260            consensus_manager,
1261            consensus_store_pruner,
1262            accumulator,
1263            validator_server_handle,
1264            validator_overload_monitor_handle,
1265            checkpoint_metrics,
1266            iota_node_metrics,
1267            iota_tx_validator_metrics,
1268        )
1269        .await
1270    }
1271
1272    /// Initializes and starts components specific to the current
1273    /// epoch for the validator node.
1274    async fn start_epoch_specific_validator_components(
1275        config: &NodeConfig,
1276        state: Arc<AuthorityState>,
1277        consensus_adapter: Arc<ConsensusAdapter>,
1278        checkpoint_store: Arc<CheckpointStore>,
1279        epoch_store: Arc<AuthorityPerEpochStore>,
1280        state_sync_handle: state_sync::Handle,
1281        randomness_handle: randomness::Handle,
1282        consensus_manager: ConsensusManager,
1283        consensus_store_pruner: ConsensusStorePruner,
1284        accumulator: Weak<StateAccumulator>,
1285        validator_server_handle: JoinHandle<Result<()>>,
1286        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1287        checkpoint_metrics: Arc<CheckpointMetrics>,
1288        iota_node_metrics: Arc<IotaNodeMetrics>,
1289        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1290    ) -> Result<ValidatorComponents> {
1291        let (checkpoint_service, checkpoint_service_tasks) = Self::start_checkpoint_service(
1292            config,
1293            consensus_adapter.clone(),
1294            checkpoint_store,
1295            epoch_store.clone(),
1296            state.clone(),
1297            state_sync_handle,
1298            accumulator,
1299            checkpoint_metrics.clone(),
1300        );
1301
1302        // Create a new map that gets injected into both the consensus handler and the
1303        // consensus adapter. The consensus handler will write values forwarded
1304        // from consensus, and the consensus adapter will read the values to
1305        // make decisions about which validator submits a transaction to consensus.
1306        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1307
1308        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1309
1310        let randomness_manager = RandomnessManager::try_new(
1311            Arc::downgrade(&epoch_store),
1312            Box::new(consensus_adapter.clone()),
1313            randomness_handle,
1314            config.authority_key_pair(),
1315        )
1316        .await;
1317        if let Some(randomness_manager) = randomness_manager {
1318            epoch_store
1319                .set_randomness_manager(randomness_manager)
1320                .await?;
1321        }
1322
1323        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1324            state.clone(),
1325            checkpoint_service.clone(),
1326            epoch_store.clone(),
1327            low_scoring_authorities,
1328        );
1329
1330        consensus_manager
1331            .start(
1332                config,
1333                epoch_store.clone(),
1334                consensus_handler_initializer,
1335                IotaTxValidator::new(
1336                    epoch_store.clone(),
1337                    checkpoint_service.clone(),
1338                    state.transaction_manager().clone(),
1339                    iota_tx_validator_metrics.clone(),
1340                ),
1341            )
1342            .await;
1343
1344        if epoch_store.authenticator_state_enabled() {
1345            Self::start_jwk_updater(
1346                config,
1347                iota_node_metrics,
1348                state.name,
1349                epoch_store.clone(),
1350                consensus_adapter.clone(),
1351            );
1352        }
1353
1354        Ok(ValidatorComponents {
1355            validator_server_handle,
1356            validator_overload_monitor_handle,
1357            consensus_manager,
1358            consensus_store_pruner,
1359            consensus_adapter,
1360            checkpoint_service_tasks,
1361            checkpoint_metrics,
1362            iota_tx_validator_metrics,
1363        })
1364    }
1365
1366    /// Starts the checkpoint service for the validator node, initializing
1367    /// the components it depends on.
1368    /// The service submits locally built checkpoints to consensus, forwards
1369    /// certified checkpoints to state sync, and enforces the per-epoch limits
1370    /// on checkpoint size and transaction count taken from the protocol
1371    /// config.
1372    fn start_checkpoint_service(
1373        config: &NodeConfig,
1374        consensus_adapter: Arc<ConsensusAdapter>,
1375        checkpoint_store: Arc<CheckpointStore>,
1376        epoch_store: Arc<AuthorityPerEpochStore>,
1377        state: Arc<AuthorityState>,
1378        state_sync_handle: state_sync::Handle,
1379        accumulator: Weak<StateAccumulator>,
1380        checkpoint_metrics: Arc<CheckpointMetrics>,
1381    ) -> (Arc<CheckpointService>, JoinSet<()>) {
1382        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1383        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1384
1385        debug!(
1386            "Starting checkpoint service with epoch start timestamp {} \
1387            and epoch duration {}",
1388            epoch_start_timestamp_ms, epoch_duration_ms
1389        );
1390
1391        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1392            sender: consensus_adapter,
1393            signer: state.secret.clone(),
1394            authority: config.authority_public_key(),
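            // Next reconfiguration time = epoch start + epoch duration; an
            // overflow here is treated as unrecoverable.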
1395            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1396                .checked_add(epoch_duration_ms)
1397                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1398            metrics: checkpoint_metrics.clone(),
1399        });
1400
1401        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1402        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1403        let max_checkpoint_size_bytes =
1404            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1405
1406        CheckpointService::spawn(
1407            state.clone(),
1408            checkpoint_store,
1409            epoch_store,
1410            state.get_transaction_cache_reader().clone(),
1411            accumulator,
1412            checkpoint_output,
1413            Box::new(certified_checkpoint_output),
1414            checkpoint_metrics,
1415            max_tx_per_checkpoint,
1416            max_checkpoint_size_bytes,
1417        )
1418    }
1419
1420    fn construct_consensus_adapter(
1421        committee: &Committee,
1422        consensus_config: &ConsensusConfig,
1423        authority: AuthorityName,
1424        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1425        prometheus_registry: &Registry,
1426        consensus_client: Arc<dyn ConsensusClient>,
1427    ) -> ConsensusAdapter {
1428        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1429        // The consensus adapter allows the authority to send user certificates through
1430        // consensus.
1431
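        // Worked example for the second limit passed below (illustrative numbers
        // only): with max_pending_transactions = 20_000 and a committee of 100
        // members, the per-validator bound evaluates to 20_000 * 2 / 100 = 400.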
1432        ConsensusAdapter::new(
1433            consensus_client,
1434            authority,
1435            connection_monitor_status,
1436            consensus_config.max_pending_transactions(),
1437            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1438            consensus_config.max_submit_position,
1439            consensus_config.submit_delay_step_override(),
1440            ca_metrics,
1441        )
1442    }
1443
1444    async fn start_grpc_validator_service(
1445        config: &NodeConfig,
1446        state: Arc<AuthorityState>,
1447        consensus_adapter: Arc<ConsensusAdapter>,
1448        prometheus_registry: &Registry,
1449    ) -> Result<tokio::task::JoinHandle<Result<()>>> {
1450        let validator_service = ValidatorService::new(
1451            state.clone(),
1452            consensus_adapter,
1453            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1454            TrafficControllerMetrics::new(prometheus_registry),
1455            config.policy_config.clone(),
1456            config.firewall_config.clone(),
1457        );
1458
1459        let mut server_conf = iota_network_stack::config::Config::new();
1460        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1461        server_conf.load_shed = config.grpc_load_shed;
1462        let mut server_builder =
1463            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1464
1465        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1466
1467        let server = server_builder
1468            .bind(config.network_address())
1469            .await
1470            .map_err(|err| anyhow!(err.to_string()))?;
1471        let local_addr = server.local_addr();
1472        info!("Listening to traffic on {local_addr}");
1473        let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));
1474
1475        Ok(grpc_server)
1476    }
1477
1478    pub fn state(&self) -> Arc<AuthorityState> {
1479        self.state.clone()
1480    }
1481
1482    // Only used for testing because of how epoch store is loaded.
1483    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1484        self.state.reference_gas_price_for_testing()
1485    }
1486
1487    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1488        self.state.committee_store().clone()
1489    }
1490
1491    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1492    // self.state.db()
1493    // }
1494
1495    /// Clone an AuthorityAggregator currently used in this node's
1496    /// QuorumDriver, if the node is a fullnode. After reconfig,
1497    /// QuorumDriver builds a new AuthorityAggregator. The caller
1498    /// of this function will most likely want to call this again
1499    /// to get a fresh one.
1500    pub fn clone_authority_aggregator(
1501        &self,
1502    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1503        self.transaction_orchestrator
1504            .as_ref()
1505            .map(|to| to.clone_authority_aggregator())
1506    }
1507
1508    pub fn transaction_orchestrator(
1509        &self,
1510    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1511        self.transaction_orchestrator.clone()
1512    }
1513
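    /// Subscribes to the effects queue of the Transaction Orchestrator, if the
    /// orchestrator is enabled on this node (fullnodes only). A minimal usage
    /// sketch, assuming `node` is an `Arc<IotaNode>` (illustrative, not a
    /// doctest):
    ///
    /// ```ignore
    /// let mut rx = node.subscribe_to_transaction_orchestrator_effects()?;
    /// while let Ok(effects) = rx.recv().await {
    ///     // handle the QuorumDriverEffectsQueueResult
    /// }
    /// ```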
1514    pub fn subscribe_to_transaction_orchestrator_effects(
1515        &self,
1516    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1517        self.transaction_orchestrator
1518            .as_ref()
1519            .map(|to| to.subscribe_to_effects_queue())
1520            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1521    }
1522
1523    /// This function awaits the completion of checkpoint execution of the
1524    /// current epoch, after which it initiates reconfiguration of the
1525    /// entire system. It also handles role changes for the node when the
1526    /// epoch changes and advertises capabilities to the committee if the node
1527    /// is a validator.
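    ///
    /// At a high level, each loop iteration:
    /// 1. runs the checkpoint executor until the end of the current epoch,
    /// 2. reads the latest IOTA system state object and derives the next
    ///    committee,
    /// 3. rebuilds the authority aggregator and the state accumulator, and
    /// 4. tears down and/or (re)starts the validator components depending on
    ///    whether the node is a validator in the new epoch.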
1528    pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
1529        let checkpoint_executor_metrics =
1530            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1531
1532        loop {
1533            let mut accumulator_guard = self.accumulator.lock().await;
1534            let accumulator = accumulator_guard.take().unwrap();
1535            let mut checkpoint_executor = CheckpointExecutor::new(
1536                self.state_sync_handle.subscribe_to_synced_checkpoints(),
1537                self.checkpoint_store.clone(),
1538                self.state.clone(),
1539                accumulator.clone(),
1540                self.config.checkpoint_executor_config.clone(),
1541                checkpoint_executor_metrics.clone(),
1542            );
1543
1544            let run_with_range = self.config.run_with_range;
1545
1546            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1547
1548            // Advertise capabilities to committee, if we are a validator.
1549            if let Some(components) = &*self.validator_components.lock().await {
1550                // TODO: without this sleep, the consensus message is not delivered reliably.
1551                tokio::time::sleep(Duration::from_millis(1)).await;
1552
1553                let config = cur_epoch_store.protocol_config();
1554                let binary_config = to_binary_config(config);
1555                let transaction = ConsensusTransaction::new_capability_notification_v1(
1556                    AuthorityCapabilitiesV1::new(
1557                        self.state.name,
1558                        cur_epoch_store.get_chain_identifier().chain(),
1559                        self.config
1560                            .supported_protocol_versions
1561                            .expect("Supported versions should be populated")
1562                            // no need to send digests of versions less than the current version
1563                            .truncate_below(config.version),
1564                        self.state
1565                            .get_available_system_packages(&binary_config)
1566                            .await,
1567                    ),
1568                );
1569                info!(?transaction, "submitting capabilities to consensus");
1570                components
1571                    .consensus_adapter
1572                    .submit(transaction, None, &cur_epoch_store)?;
1573            }
1574
1575            let stop_condition = checkpoint_executor
1576                .run_epoch(cur_epoch_store.clone(), run_with_range)
1577                .await;
1578            drop(checkpoint_executor);
1579
1580            if stop_condition == StopReason::RunWithRangeCondition {
1581                IotaNode::shutdown(&self).await;
1582                self.shutdown_channel_tx
1583                    .send(run_with_range)
1584                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1585                return Ok(());
1586            }
1587
1588            // Safe to call because we are in the middle of reconfiguration.
1589            let latest_system_state = self
1590                .state
1591                .get_object_cache_reader()
1592                .get_iota_system_state_object_unsafe()
1593                .expect("Read IOTA System State object cannot fail");
1594
1595            #[cfg(msim)]
1596            if !self
1597                .sim_state
1598                .sim_safe_mode_expected
1599                .load(Ordering::Relaxed)
1600            {
1601                debug_assert!(!latest_system_state.safe_mode());
1602            }
1603
1604            #[cfg(not(msim))]
1605            debug_assert!(!latest_system_state.safe_mode());
1606
1607            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1608                if self.state.is_fullnode(&cur_epoch_store) {
1609                    warn!(
1610                        "Failed to send end of epoch notification to subscriber: {:?}",
1611                        err
1612                    );
1613                }
1614            }
1615
1616            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1617            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1618
1619            self.auth_agg.store(Arc::new(
1620                self.auth_agg
1621                    .load()
1622                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1623            ));
1624
1625            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1626            let next_epoch = next_epoch_committee.epoch();
1627            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1628
1629            info!(
1630                next_epoch,
1631                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1632            );
1633
1634            fail_point_async!("reconfig_delay");
1635
1636            // We save the connection monitor status map regardless of validator / fullnode
1637            // status so that we don't need to restart the connection monitor
1638            // every epoch. Update the mappings that will be used by the
1639            // consensus adapter if it exists or is about to be created.
1640            let authority_names_to_peer_ids =
1641                new_epoch_start_state.get_authority_names_to_peer_ids();
1642            self.connection_monitor_status
1643                .update_mapping_for_epoch(authority_names_to_peer_ids);
1644
1645            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1646
1647            send_trusted_peer_change(
1648                &self.config,
1649                &self.trusted_peer_change_tx,
1650                &new_epoch_start_state,
1651            );
1652
1653            // The following code handles 4 different cases, depending on whether the node
1654            // was a validator in the previous epoch, and whether the node is a validator
1655            // in the new epoch.
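            //
            // 1. validator -> validator: shut down consensus and restart the
            //    epoch-specific validator components for the new epoch.
            // 2. validator -> fullnode:  shut down consensus and drop the
            //    validator components.
            // 3. fullnode  -> validator: construct validator components from
            //    scratch (including the gRPC validator service).
            // 4. fullnode  -> fullnode:  nothing extra to do.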
1656
1657            let new_validator_components = if let Some(ValidatorComponents {
1658                validator_server_handle,
1659                validator_overload_monitor_handle,
1660                consensus_manager,
1661                consensus_store_pruner,
1662                consensus_adapter,
1663                mut checkpoint_service_tasks,
1664                checkpoint_metrics,
1665                iota_tx_validator_metrics,
1666            }) = self.validator_components.lock().await.take()
1667            {
1668                info!("Reconfiguring the validator.");
1669                // Cancel the old checkpoint service tasks.
1670                // Waiting for checkpoint builder to finish gracefully is not possible, because
1671                // it may wait on transactions while consensus on peers has
1672                // already shut down.
1673                checkpoint_service_tasks.abort_all();
1674                while let Some(result) = checkpoint_service_tasks.join_next().await {
1675                    if let Err(err) = result {
1676                        if err.is_panic() {
1677                            std::panic::resume_unwind(err.into_panic());
1678                        }
1679                        warn!("Error in checkpoint service task: {:?}", err);
1680                    }
1681                }
1682                info!("Checkpoint service has shut down.");
1683
1684                consensus_manager.shutdown().await;
1685                info!("Consensus has shut down.");
1686
1687                let new_epoch_store = self
1688                    .reconfigure_state(
1689                        &self.state,
1690                        &cur_epoch_store,
1691                        next_epoch_committee.clone(),
1692                        new_epoch_start_state,
1693                        accumulator.clone(),
1694                    )
1695                    .await?;
1696                info!("Epoch store finished reconfiguration.");
1697
1698                // No other components should be holding a strong reference to state accumulator
1699                // at this point. Confirm here before we swap in the new accumulator.
1700                let accumulator_metrics = Arc::into_inner(accumulator)
1701                    .expect("Accumulator should have no other references at this point")
1702                    .metrics();
1703                let new_accumulator = Arc::new(StateAccumulator::new(
1704                    self.state.get_accumulator_store().clone(),
1705                    accumulator_metrics,
1706                ));
1707                let weak_accumulator = Arc::downgrade(&new_accumulator);
1708                *accumulator_guard = Some(new_accumulator);
1709
1710                consensus_store_pruner.prune(next_epoch).await;
1711
1712                if self.state.is_validator(&new_epoch_store) {
1713                    // Only restart consensus if this node is still a validator in the new epoch.
1714                    Some(
1715                        Self::start_epoch_specific_validator_components(
1716                            &self.config,
1717                            self.state.clone(),
1718                            consensus_adapter,
1719                            self.checkpoint_store.clone(),
1720                            new_epoch_store.clone(),
1721                            self.state_sync_handle.clone(),
1722                            self.randomness_handle.clone(),
1723                            consensus_manager,
1724                            consensus_store_pruner,
1725                            weak_accumulator,
1726                            validator_server_handle,
1727                            validator_overload_monitor_handle,
1728                            checkpoint_metrics,
1729                            self.metrics.clone(),
1730                            iota_tx_validator_metrics,
1731                        )
1732                        .await?,
1733                    )
1734                } else {
1735                    info!("This node is no longer a validator after reconfiguration");
1736
1737                    consensus_adapter.unregister_consensus_adapter_metrics(
1738                        &self.registry_service.default_registry(),
1739                    );
1740                    debug!("Unregistered consensus adapter metrics");
1741                    None
1742                }
1743            } else {
1744                let new_epoch_store = self
1745                    .reconfigure_state(
1746                        &self.state,
1747                        &cur_epoch_store,
1748                        next_epoch_committee.clone(),
1749                        new_epoch_start_state,
1750                        accumulator.clone(),
1751                    )
1752                    .await?;
1753
1754                // No other components should be holding a strong reference to state accumulator
1755                // at this point. Confirm here before we swap in the new accumulator.
1756                let accumulator_metrics = Arc::into_inner(accumulator)
1757                    .expect("Accumulator should have no other references at this point")
1758                    .metrics();
1759                let new_accumulator = Arc::new(StateAccumulator::new(
1760                    self.state.get_accumulator_store().clone(),
1761                    accumulator_metrics,
1762                ));
1763                let weak_accumulator = Arc::downgrade(&new_accumulator);
1764                *accumulator_guard = Some(new_accumulator);
1765
1766                if self.state.is_validator(&new_epoch_store) {
1767                    info!("Promoting the node from fullnode to validator, starting grpc server");
1768
1769                    Some(
1770                        Self::construct_validator_components(
1771                            self.config.clone(),
1772                            self.state.clone(),
1773                            Arc::new(next_epoch_committee.clone()),
1774                            new_epoch_store.clone(),
1775                            self.checkpoint_store.clone(),
1776                            self.state_sync_handle.clone(),
1777                            self.randomness_handle.clone(),
1778                            weak_accumulator,
1779                            self.connection_monitor_status.clone(),
1780                            &self.registry_service,
1781                            self.metrics.clone(),
1782                        )
1783                        .await?,
1784                    )
1785                } else {
1786                    None
1787                }
1788            };
1789            *self.validator_components.lock().await = new_validator_components;
1790
1791            // Force releasing current epoch store DB handle, because the
1792            // Arc<AuthorityPerEpochStore> may linger.
1793            cur_epoch_store.release_db_handles();
1794
1795            if cfg!(msim)
1796                && !matches!(
1797                    self.config
1798                        .authority_store_pruning_config
1799                        .num_epochs_to_retain_for_checkpoints(),
1800                    None | Some(u64::MAX) | Some(0)
1801                )
1802            {
1803                self.state
1804                .prune_checkpoints_for_eligible_epochs_for_testing(
1805                    self.config.clone(),
1806                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1807                )
1808                .await?;
1809            }
1810
1811            info!("Reconfiguration finished");
1812        }
1813    }
1814
1815    async fn shutdown(&self) {
1816        if let Some(validator_components) = &*self.validator_components.lock().await {
1817            validator_components.consensus_manager.shutdown().await;
1818        }
1819    }
1820
1821    /// Asynchronously reconfigures the state of the authority node for the next
1822    /// epoch.
1823    async fn reconfigure_state(
1824        &self,
1825        state: &Arc<AuthorityState>,
1826        cur_epoch_store: &AuthorityPerEpochStore,
1827        next_epoch_committee: Committee,
1828        next_epoch_start_system_state: EpochStartSystemState,
1829        accumulator: Arc<StateAccumulator>,
1830    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
1831        let next_epoch = next_epoch_committee.epoch();
1832
1833        let last_checkpoint = self
1834            .checkpoint_store
1835            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1836            .expect("Error loading last checkpoint for current epoch")
1837            .expect("Could not load last checkpoint for current epoch");
1838        let epoch_supply_change = last_checkpoint
1839            .end_of_epoch_data
1840            .as_ref()
1841            .ok_or_else(|| {
1842                IotaError::from("last checkpoint in epoch should contain end of epoch data")
1843            })?
1844            .epoch_supply_change;
1845
1846        let epoch_start_configuration = EpochStartConfiguration::new(
1847            next_epoch_start_system_state,
1848            *last_checkpoint.digest(),
1849            state.get_object_store().as_ref(),
1850            EpochFlag::default_flags_for_new_epoch(&state.config),
1851        )
1852        .expect("EpochStartConfiguration construction cannot fail");
1853
1854        let new_epoch_store = self
1855            .state
1856            .reconfigure(
1857                cur_epoch_store,
1858                self.config.supported_protocol_versions.unwrap(),
1859                next_epoch_committee,
1860                epoch_start_configuration,
1861                accumulator,
1862                &self.config.expensive_safety_check_config,
1863                epoch_supply_change,
1864            )
1865            .await
1866            .expect("Reconfigure authority state cannot fail");
1867        info!(next_epoch, "Node State has been reconfigured");
1868        assert_eq!(next_epoch, new_epoch_store.epoch());
1869        self.state.get_reconfig_api().update_epoch_flags_metrics(
1870            cur_epoch_store.epoch_start_config().flags(),
1871            new_epoch_store.epoch_start_config().flags(),
1872        );
1873
1874        Ok(new_epoch_store)
1875    }
1876
1877    pub fn get_config(&self) -> &NodeConfig {
1878        &self.config
1879    }
1880
1881    async fn execute_transaction_immediately_at_zero_epoch(
1882        state: &Arc<AuthorityState>,
1883        epoch_store: &Arc<AuthorityPerEpochStore>,
1884        tx: &Transaction,
1885        span: tracing::Span,
1886    ) {
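        // The transaction is wrapped as already-verified and tagged with a
        // placeholder checkpoint proof of (0, 0); presumably this is only sound
        // here because the helper is used for epoch-zero (genesis-time)
        // execution, before any certificates exist.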
1887        let transaction =
1888            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
1889                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
1890                    tx.data().clone(),
1891                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
1892                ),
1893            );
1894        state
1895            .try_execute_immediately(&transaction, None, epoch_store)
1896            .instrument(span)
1897            .await
1898            .unwrap();
1899    }
1900
1901    pub fn randomness_handle(&self) -> randomness::Handle {
1902        self.randomness_handle.clone()
1903    }
1904}
1905
1906#[cfg(not(msim))]
1907impl IotaNode {
1908    async fn fetch_jwks(
1909        _authority: AuthorityName,
1910        provider: &OIDCProvider,
1911    ) -> IotaResult<Vec<(JwkId, JWK)>> {
1912        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
1913        let client = reqwest::Client::new();
1914        fetch_jwks(provider, &client)
1915            .await
1916            .map_err(|_| IotaError::JWKRetrieval)
1917    }
1918}
1919
1920#[cfg(msim)]
1921impl IotaNode {
1922    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
1923        self.sim_state.sim_node.id()
1924    }
1925
1926    pub fn set_safe_mode_expected(&self, new_value: bool) {
1927        info!("Setting safe mode expected to {}", new_value);
1928        self.sim_state
1929            .sim_safe_mode_expected
1930            .store(new_value, Ordering::Relaxed);
1931    }
1932
1933    async fn fetch_jwks(
1934        authority: AuthorityName,
1935        provider: &OIDCProvider,
1936    ) -> IotaResult<Vec<(JwkId, JWK)>> {
1937        get_jwk_injector()(authority, provider)
1938    }
1939}
1940
1941/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
1942/// available.
1943fn send_trusted_peer_change(
1944    config: &NodeConfig,
1945    sender: &watch::Sender<TrustedPeerChangeEvent>,
1946    new_epoch_start_state: &EpochStartSystemState,
1947) {
1948    let new_committee =
1949        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
1950
1951    sender.send_modify(|event| {
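        // Rotate the committees: the previous `new_committee` becomes
        // `old_committee`, then the freshly derived peer list is installed.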
1952        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
1953        event.new_committee = new_committee;
1954    })
1955}
1956
1957fn build_kv_store(
1958    state: &Arc<AuthorityState>,
1959    config: &NodeConfig,
1960    registry: &Registry,
1961) -> Result<Arc<TransactionKeyValueStore>> {
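    // Always serve from the local RocksDB-backed store; if a base URL is
    // configured, wrap it in a fallback store that retries misses over HTTP.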
1962    let metrics = KeyValueStoreMetrics::new(registry);
1963    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
1964
1965    let base_url = &config.transaction_kv_store_read_config.base_url;
1966
1967    if base_url.is_empty() {
1968        info!("no http kv store url provided, using local db only");
1969        return Ok(Arc::new(db_store));
1970    }
1971
1972    base_url.parse::<url::Url>().tap_err(|e| {
1973        error!(
1974            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
1975            base_url, e
1976        )
1977    })?;
1978
1979    let http_store = HttpKVStore::new_kv(base_url, metrics.clone())?;
1980    info!("using local key-value store with fallback to http key-value store");
1981    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
1982        db_store,
1983        http_store,
1984        metrics,
1985        "json_rpc_fallback",
1986    )))
1987}
1988
1989/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
1990/// REST APIs based on the node's configuration.
1991///
1992/// This function performs the following tasks:
1993/// 1. Checks if the node is a validator by inspecting the consensus
1994///    configuration; if so, it returns early as validators do not expose these
1995///    APIs.
1996/// 2. Creates an Axum router to handle HTTP requests.
1997/// 3. Initializes the JSON-RPC server and registers various RPC modules based
1998///    on the node's state and configuration, including CoinApi,
1999///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
2000///    IndexerApi.
2001/// 4. Optionally, if the REST API is enabled, nests the REST API router under
2002///    the `/api/v1` path.
2003/// 5. Binds the server to the specified JSON-RPC address and starts listening
2004///    for incoming connections.
2005pub async fn build_http_server(
2006    state: Arc<AuthorityState>,
2007    store: RocksDbStore,
2008    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2009    config: &NodeConfig,
2010    prometheus_registry: &Registry,
2011    _custom_runtime: Option<Handle>,
2012    software_version: &'static str,
2013) -> Result<Option<tokio::task::JoinHandle<()>>> {
2014    // Validators do not expose these APIs
2015    if config.consensus_config().is_some() {
2016        return Ok(None);
2017    }
2018
2019    let mut router = axum::Router::new();
2020
2021    let json_rpc_router = {
2022        let mut server = JsonRpcServerBuilder::new(
2023            env!("CARGO_PKG_VERSION"),
2024            prometheus_registry,
2025            config.policy_config.clone(),
2026            config.firewall_config.clone(),
2027        );
2028
2029        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2030
2031        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2032        server.register_module(ReadApi::new(
2033            state.clone(),
2034            kv_store.clone(),
2035            metrics.clone(),
2036        ))?;
2037        server.register_module(CoinReadApi::new(
2038            state.clone(),
2039            kv_store.clone(),
2040            metrics.clone(),
2041        )?)?;
2042
2043        // If run_with_range is enabled, we want to prevent any transaction
2044        // submission; run_with_range = None is the normal operating condition.
2045        if config.run_with_range.is_none() {
2046            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2047        }
2048        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2049        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2050
2051        if let Some(transaction_orchestrator) = transaction_orchestrator {
2052            server.register_module(TransactionExecutionApi::new(
2053                state.clone(),
2054                transaction_orchestrator.clone(),
2055                metrics.clone(),
2056            ))?;
2057        }
2058
2059        // TODO: Init from chain if config is not set once `IotaNamesConfig::from_chain`
2060        // is implemented
2061        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();
2062
2063        server.register_module(IndexerApi::new(
2064            state.clone(),
2065            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2066            kv_store,
2067            metrics,
2068            iota_names_config,
2069            config.indexer_max_subscriptions,
2070        ))?;
2071        server.register_module(MoveUtils::new(state.clone()))?;
2072
2073        let server_type = config.jsonrpc_server_type();
2074
2075        server.to_router(server_type).await?
2076    };
2077
2078    router = router.merge(json_rpc_router);
2079
2080    if config.enable_rest_api {
2081        let mut rest_service = iota_rest_api::RestService::new(
2082            Arc::new(RestReadStore::new(state, store)),
2083            software_version,
2084        );
2085
2086        rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2087
2088        if let Some(transaction_orchestrator) = transaction_orchestrator {
2089            rest_service.with_executor(transaction_orchestrator.clone())
2090        }
2091
2092        router = router.merge(rest_service.into_router());
2093    }
2094
2095    let listener = tokio::net::TcpListener::bind(&config.json_rpc_address)
2096        .await
2097        .unwrap();
2098    let addr = listener.local_addr().unwrap();
2099
2100    router = router.layer(axum::middleware::from_fn(server_timing_middleware));
2101
2102    let handle = tokio::spawn(async move {
2103        axum::serve(
2104            listener,
2105            router.into_make_service_with_connect_info::<SocketAddr>(),
2106        )
2107        .await
2108        .unwrap()
2109    });
2110
2111    info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");
2112
2113    Ok(Some(handle))
2114}
2115
2116#[cfg(not(test))]
2117fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2118    protocol_config.max_transactions_per_checkpoint() as usize
2119}
2120
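// Under `cfg(test)` the limit is pinned to a tiny value, presumably so that
// checkpoint-boundary handling is exercised frequently in tests.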
2121#[cfg(test)]
2122fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2123    2
2124}