iota_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::{BTreeSet, HashMap, HashSet},
9    fmt,
10    net::SocketAddr,
11    path::PathBuf,
12    str::FromStr,
13    sync::{Arc, Weak},
14    time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19    callback::CallbackLayer,
20    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::TryFutureExt;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_config::{
29    ConsensusConfig, NodeConfig,
30    node::{DBCheckpointConfig, RunWithRange},
31    node_config_metrics::NodeConfigMetrics,
32    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
33};
34use iota_core::{
35    authority::{
36        AuthorityState, AuthorityStore, CHAIN_IDENTIFIER, RandomnessRoundReceiver,
37        authority_per_epoch_store::AuthorityPerEpochStore,
38        authority_store_tables::{AuthorityPerpetualTables, AuthorityPerpetualTablesOptions},
39        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
40    },
41    authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
42    authority_client::NetworkAuthorityClient,
43    authority_server::{ValidatorService, ValidatorServiceMetrics},
44    checkpoints::{
45        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
46        SubmitCheckpointToConsensus,
47        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
48    },
49    connection_monitor::ConnectionMonitor,
50    consensus_adapter::{
51        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
52        ConsensusClient,
53    },
54    consensus_handler::ConsensusHandlerInitializer,
55    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
56    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
57    db_checkpoint_handler::DBCheckpointHandler,
58    epoch::{
59        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
60        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
61        reconfiguration::ReconfigurationInitiator,
62    },
63    execution_cache::build_execution_cache,
64    module_cache_metrics::ResolverMetrics,
65    overload_monitor::overload_monitor,
66    rest_index::RestIndexStore,
67    safe_client::SafeClientMetricsBase,
68    signature_verifier::SignatureVerifierMetrics,
69    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
70    storage::{RestReadStore, RocksDbStore},
71    traffic_controller::metrics::TrafficControllerMetrics,
72    transaction_orchestrator::TransactionOrchestrator,
73    validator_tx_finalizer::ValidatorTxFinalizer,
74};
75use iota_json_rpc::{
76    JsonRpcServerBuilder, bridge_api::BridgeReadApi, coin_api::CoinReadApi,
77    governance_api::GovernanceReadApi, indexer_api::IndexerApi, move_utils::MoveUtils,
78    read_api::ReadApi, transaction_builder_api::TransactionBuilderApi,
79    transaction_execution_api::TransactionExecutionApi,
80};
81use iota_json_rpc_api::JsonRpcMetrics;
82use iota_macros::{fail_point, fail_point_async, replay_log};
83use iota_metrics::{
84    RegistryID, RegistryService,
85    hardware_metrics::register_hardware_metrics,
86    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
87    server_timing_middleware, spawn_monitored_task,
88};
89use iota_network::{
90    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
91};
92use iota_network_stack::server::ServerBuilder;
93use iota_protocol_config::ProtocolConfig;
94use iota_rest_api::RestMetrics;
95use iota_snapshot::uploader::StateSnapshotUploader;
96use iota_storage::{
97    FileCompression, IndexStore, StorageFormat,
98    http_key_value_store::HttpKVStore,
99    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
100    key_value_store_metrics::KeyValueStoreMetrics,
101};
102use iota_types::{
103    base_types::{AuthorityName, ConciseableName, EpochId},
104    committee::Committee,
105    crypto::{KeypairTraits, RandomnessRound},
106    digests::ChainIdentifier,
107    error::{IotaError, IotaResult},
108    execution_config_utils::to_binary_config,
109    iota_system_state::{
110        IotaSystemState, IotaSystemStateTrait,
111        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
112    },
113    messages_consensus::{AuthorityCapabilitiesV1, ConsensusTransaction, check_total_jwk_size},
114    quorum_driver_types::QuorumDriverEffectsQueueResult,
115    supported_protocol_versions::SupportedProtocolVersions,
116    transaction::Transaction,
117};
118use prometheus::Registry;
119#[cfg(msim)]
120pub use simulator::set_jwk_injector;
121#[cfg(msim)]
122use simulator::*;
123use tap::tap::TapFallible;
124use tokio::{
125    runtime::Handle,
126    sync::{Mutex, broadcast, mpsc, watch},
127    task::{JoinHandle, JoinSet},
128};
129use tower::ServiceBuilder;
130use tracing::{Instrument, debug, error, error_span, info, warn};
131use typed_store::{DBMetrics, rocks::default_db_options};
132
133use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
134
135pub mod admin;
136mod handle;
137pub mod metrics;
138
139pub struct ValidatorComponents {
140    validator_server_handle: JoinHandle<Result<()>>,
141    validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
142    validator_overload_monitor_handle: Option<JoinHandle<()>>,
143    consensus_manager: ConsensusManager,
144    consensus_store_pruner: ConsensusStorePruner,
145    consensus_adapter: Arc<ConsensusAdapter>,
146    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
147    checkpoint_service_tasks: JoinSet<()>,
148    checkpoint_metrics: Arc<CheckpointMetrics>,
149    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
150    validator_registry_id: RegistryID,
151}
152
153#[cfg(msim)]
154mod simulator {
155    use std::sync::atomic::AtomicBool;
156
157    use super::*;
158
159    pub(super) struct SimState {
160        pub sim_node: iota_simulator::runtime::NodeHandle,
161        pub sim_safe_mode_expected: AtomicBool,
162        _leak_detector: iota_simulator::NodeLeakDetector,
163    }
164
165    impl Default for SimState {
166        fn default() -> Self {
167            Self {
168                sim_node: iota_simulator::runtime::NodeHandle::current(),
169                sim_safe_mode_expected: AtomicBool::new(false),
170                _leak_detector: iota_simulator::NodeLeakDetector::new(),
171            }
172        }
173    }
174
175    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
176        + Send
177        + Sync
178        + 'static;
179
180    fn default_fetch_jwks(
181        _authority: AuthorityName,
182        _provider: &OIDCProvider,
183    ) -> IotaResult<Vec<(JwkId, JWK)>> {
184        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
185        // Just load a default Twitch jwk for testing.
186        parse_jwks(
187            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
188            &OIDCProvider::Twitch,
189        )
190        .map_err(|_| IotaError::JWKRetrieval)
191    }
192
193    thread_local! {
194        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
195    }
196
197    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
198        JWK_INJECTOR.with(|injector| injector.borrow().clone())
199    }
200
201    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
202        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
203    }
204}
205
206pub struct IotaNode {
207    config: NodeConfig,
208    validator_components: Mutex<Option<ValidatorComponents>>,
209    /// The http server responsible for serving JSON-RPC as well as the
210    /// experimental rest service
211    _http_server: Option<tokio::task::JoinHandle<()>>,
212    state: Arc<AuthorityState>,
213    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
214    registry_service: RegistryService,
215    metrics: Arc<IotaNodeMetrics>,
216
217    _discovery: discovery::Handle,
218    state_sync_handle: state_sync::Handle,
219    randomness_handle: randomness::Handle,
220    checkpoint_store: Arc<CheckpointStore>,
221    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
222    connection_monitor_status: Arc<ConnectionMonitorStatus>,
223
224    /// Broadcast channel to send the starting system state for the next epoch.
225    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,
226
227    /// Broadcast channel to notify [`DiscoveryEventLoop`] for new validator
228    /// peers.
229    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,
230
231    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
232
233    #[cfg(msim)]
234    sim_state: SimState,
235
236    _state_archive_handle: Option<broadcast::Sender<()>>,
237
238    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
239    // Channel to allow signaling upstream to shutdown iota-node
240    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
241
242    /// AuthorityAggregator of the network, created at start and beginning of
243    /// each epoch. Use ArcSwap so that we could mutate it without taking
244    /// mut reference.
245    // TODO: Eventually we can make this auth aggregator a shared reference so that this
246    // update will automatically propagate to other uses.
247    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
248}
249
250impl fmt::Debug for IotaNode {
251    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
252        f.debug_struct("IotaNode")
253            .field("name", &self.state.name.concise())
254            .finish()
255    }
256}
257
258static MAX_JWK_KEYS_PER_FETCH: usize = 100;
259
260impl IotaNode {
261    pub async fn start(
262        config: NodeConfig,
263        registry_service: RegistryService,
264        custom_rpc_runtime: Option<Handle>,
265    ) -> Result<Arc<IotaNode>> {
266        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
267    }
268
269    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
270    /// configuration.
271    /// This function ensures continuous fetching, validation, and submission of
272    /// JWKs, maintaining up-to-date keys for the specified providers.
273    fn start_jwk_updater(
274        config: &NodeConfig,
275        metrics: Arc<IotaNodeMetrics>,
276        authority: AuthorityName,
277        epoch_store: Arc<AuthorityPerEpochStore>,
278        consensus_adapter: Arc<ConsensusAdapter>,
279    ) {
280        let epoch = epoch_store.epoch();
281
282        let supported_providers = config
283            .zklogin_oauth_providers
284            .get(&epoch_store.get_chain_identifier().chain())
285            .unwrap_or(&BTreeSet::new())
286            .iter()
287            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
288            .collect::<Vec<_>>();
289
290        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
291
292        info!(
293            ?fetch_interval,
294            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
295        );
296
297        fn validate_jwk(
298            metrics: &Arc<IotaNodeMetrics>,
299            provider: &OIDCProvider,
300            id: &JwkId,
301            jwk: &JWK,
302        ) -> bool {
303            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
304                warn!(
305                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
306                    id.iss, provider
307                );
308                metrics
309                    .invalid_jwks
310                    .with_label_values(&[&provider.to_string()])
311                    .inc();
312                return false;
313            };
314
315            if iss_provider != *provider {
316                warn!(
317                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
318                    id.iss, provider, iss_provider
319                );
320                metrics
321                    .invalid_jwks
322                    .with_label_values(&[&provider.to_string()])
323                    .inc();
324                return false;
325            }
326
327            if !check_total_jwk_size(id, jwk) {
328                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
329                metrics
330                    .invalid_jwks
331                    .with_label_values(&[&provider.to_string()])
332                    .inc();
333                return false;
334            }
335
336            true
337        }
338
339        // metrics is:
340        //  pub struct IotaNodeMetrics {
341        //      pub jwk_requests: IntCounterVec,
342        //      pub jwk_request_errors: IntCounterVec,
343        //      pub total_jwks: IntCounterVec,
344        //      pub unique_jwks: IntCounterVec,
345        //  }
346
347        for p in supported_providers.into_iter() {
348            let provider_str = p.to_string();
349            let epoch_store = epoch_store.clone();
350            let consensus_adapter = consensus_adapter.clone();
351            let metrics = metrics.clone();
352            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
353                async move {
354                    // note: restart-safe de-duplication happens after consensus, this is
355                    // just best-effort to reduce unneeded submissions.
356                    let mut seen = HashSet::new();
357                    loop {
358                        info!("fetching JWK for provider {:?}", p);
359                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
360                        match Self::fetch_jwks(authority, &p).await {
361                            Err(e) => {
362                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
363                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
364                                // Retry in 30 seconds
365                                tokio::time::sleep(Duration::from_secs(30)).await;
366                                continue;
367                            }
368                            Ok(mut keys) => {
369                                metrics.total_jwks
370                                    .with_label_values(&[&provider_str])
371                                    .inc_by(keys.len() as u64);
372
373                                keys.retain(|(id, jwk)| {
374                                    validate_jwk(&metrics, &p, id, jwk) &&
375                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
376                                    seen.insert((id.clone(), jwk.clone()))
377                                });
378
379                                metrics.unique_jwks
380                                    .with_label_values(&[&provider_str])
381                                    .inc_by(keys.len() as u64);
382
383                                // prevent oauth providers from sending too many keys,
384                                // inadvertently or otherwise
385                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
386                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
387                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
388                                }
389
390                                for (id, jwk) in keys.into_iter() {
391                                    info!("Submitting JWK to consensus: {:?}", id);
392
393                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
394                                    consensus_adapter.submit(txn, None, &epoch_store)
395                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
396                                        .ok();
397                                }
398                            }
399                        }
400                        tokio::time::sleep(fetch_interval).await;
401                    }
402                }
403                .instrument(error_span!("jwk_updater_task", epoch)),
404            ));
405        }
406    }
407
408    pub async fn start_async(
409        config: NodeConfig,
410        registry_service: RegistryService,
411        custom_rpc_runtime: Option<Handle>,
412        software_version: &'static str,
413    ) -> Result<Arc<IotaNode>> {
414        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
415        let mut config = config.clone();
416        if config.supported_protocol_versions.is_none() {
417            info!(
418                "populating config.supported_protocol_versions with default {:?}",
419                SupportedProtocolVersions::SYSTEM_DEFAULT
420            );
421            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
422        }
423
424        let run_with_range = config.run_with_range;
425        let is_validator = config.consensus_config().is_some();
426        let is_full_node = !is_validator;
427        let prometheus_registry = registry_service.default_registry();
428
429        info!(node =? config.authority_public_key(),
430            "Initializing iota-node listening on {}", config.network_address
431        );
432
433        let genesis = config.genesis()?.clone();
434
435        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
436        // It's ok if the value is already set due to data races.
437        let _ = CHAIN_IDENTIFIER.set(chain_identifier);
438        info!("IOTA chain identifier: {chain_identifier}");
439
440        // Initialize metrics to track db usage before creating any stores
441        DBMetrics::init(&prometheus_registry);
442
443        // Initialize IOTA metrics.
444        iota_metrics::init_metrics(&prometheus_registry);
445        // Unsupported (because of the use of static variable) and unnecessary in
446        // simtests.
447        #[cfg(not(msim))]
448        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
449
450        // Register hardware metrics.
451        register_hardware_metrics(&registry_service, &config.db_path)
452            .expect("Failed registering hardware metrics");
453        // Register uptime metric
454        prometheus_registry
455            .register(iota_metrics::uptime_metric(
456                if is_validator {
457                    "validator"
458                } else {
459                    "fullnode"
460                },
461                software_version,
462                &chain_identifier.to_string(),
463            ))
464            .expect("Failed registering uptime metric");
465
466        // If genesis come with some migration data then load them into memory from the
467        // file path specified in config.
468        let migration_tx_data = if genesis.contains_migrations() {
469            // Here the load already verifies that the content of the migration blob is
470            // valid in respect to the content found in genesis
471            Some(config.load_migration_tx_data()?)
472        } else {
473            None
474        };
475
476        let secret = Arc::pin(config.authority_key_pair().copy());
477        let genesis_committee = genesis.committee()?;
478        let committee_store = Arc::new(CommitteeStore::new(
479            config.db_path().join("epochs"),
480            &genesis_committee,
481            None,
482        ));
483
484        // By default, only enable write stall on validators for perpetual db.
485        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
486        let perpetual_tables_options = AuthorityPerpetualTablesOptions { enable_write_stall };
487        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
488            &config.db_path().join("store"),
489            Some(perpetual_tables_options),
490        ));
491        let is_genesis = perpetual_tables
492            .database_is_empty()
493            .expect("Database read should not fail at init.");
494        let store = AuthorityStore::open(
495            perpetual_tables,
496            &genesis,
497            &config,
498            &prometheus_registry,
499            migration_tx_data.as_ref(),
500        )
501        .await?;
502
503        let cur_epoch = store.get_recovery_epoch_at_restart()?;
504        let committee = committee_store
505            .get_committee(&cur_epoch)?
506            .expect("Committee of the current epoch must exist");
507        let epoch_start_configuration = store
508            .get_epoch_start_configuration()?
509            .expect("EpochStartConfiguration of the current epoch must exist");
510        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
511        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
512
513        let cache_traits =
514            build_execution_cache(&epoch_start_configuration, &prometheus_registry, &store);
515
516        let auth_agg = {
517            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
518            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
519            Arc::new(ArcSwap::new(Arc::new(
520                AuthorityAggregator::new_from_epoch_start_state(
521                    epoch_start_configuration.epoch_start_state(),
522                    &committee_store,
523                    safe_client_metrics_base,
524                    auth_agg_metrics,
525                ),
526            )))
527        };
528
529        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
530        let epoch_store = AuthorityPerEpochStore::new(
531            config.authority_public_key(),
532            committee.clone(),
533            &config.db_path().join("store"),
534            Some(epoch_options.options),
535            EpochMetrics::new(&registry_service.default_registry()),
536            epoch_start_configuration,
537            cache_traits.backing_package_store.clone(),
538            cache_traits.object_store.clone(),
539            cache_metrics,
540            signature_verifier_metrics,
541            &config.expensive_safety_check_config,
542            ChainIdentifier::from(*genesis.checkpoint().digest()),
543        );
544
545        info!("created epoch store");
546
547        replay_log!(
548            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
549            epoch_store.epoch(),
550            epoch_store.protocol_config()
551        );
552
553        // the database is empty at genesis time
554        if is_genesis {
555            info!("checking IOTA conservation at genesis");
556            // When we are opening the db table, the only time when it's safe to
557            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
558            // an epoch and the IOTA conservation check will fail. This also initialize
559            // the expected_network_iota_amount table.
560            cache_traits
561                .reconfig_api
562                .expensive_check_iota_conservation(&epoch_store, None)
563                .expect("IOTA conservation check cannot fail at genesis");
564        }
565
566        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
567        let default_buffer_stake = epoch_store
568            .protocol_config()
569            .buffer_stake_for_protocol_upgrade_bps();
570        if effective_buffer_stake != default_buffer_stake {
571            warn!(
572                ?effective_buffer_stake,
573                ?default_buffer_stake,
574                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
575            );
576        }
577
578        info!("creating checkpoint store");
579
580        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
581        checkpoint_store.insert_genesis_checkpoint(
582            genesis.checkpoint(),
583            genesis.checkpoint_contents().clone(),
584            &epoch_store,
585        );
586
587        info!("creating state sync store");
588        let state_sync_store = RocksDbStore::new(
589            cache_traits.clone(),
590            committee_store.clone(),
591            checkpoint_store.clone(),
592        );
593
594        let index_store = if is_full_node && config.enable_index_processing {
595            info!("creating index store");
596            Some(Arc::new(IndexStore::new(
597                config.db_path().join("indexes"),
598                &prometheus_registry,
599                epoch_store
600                    .protocol_config()
601                    .max_move_identifier_len_as_option(),
602                config.remove_deprecated_tables,
603            )))
604        } else {
605            None
606        };
607
608        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
609        {
610            Some(Arc::new(RestIndexStore::new(
611                config.db_path().join("rest_index"),
612                &store,
613                &checkpoint_store,
614                &epoch_store,
615                &cache_traits.backing_package_store,
616            )))
617        } else {
618            None
619        };
620
621        info!("creating archive reader");
622        // Create network
623        // TODO only configure validators as seed/preferred peers for validators and not
624        // for fullnodes once we've had a chance to re-work fullnode
625        // configuration generation.
626        let archive_readers =
627            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
628        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
629        let (randomness_tx, randomness_rx) = mpsc::channel(
630            config
631                .p2p_config
632                .randomness
633                .clone()
634                .unwrap_or_default()
635                .mailbox_capacity(),
636        );
637        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
638            Self::create_p2p_network(
639                &config,
640                state_sync_store.clone(),
641                chain_identifier,
642                trusted_peer_change_rx,
643                archive_readers.clone(),
644                randomness_tx,
645                &prometheus_registry,
646            )?;
647
648        // We must explicitly send this instead of relying on the initial value to
649        // trigger watch value change, so that state-sync is able to process it.
650        send_trusted_peer_change(
651            &config,
652            &trusted_peer_change_tx,
653            epoch_store.epoch_start_state(),
654        );
655
656        info!("start state archival");
657        // Start archiving local state to remote store
658        let state_archive_handle =
659            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
660                .await?;
661
662        info!("start snapshot upload");
663        // Start uploading state snapshot to remote store
664        let state_snapshot_handle =
665            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
666
667        // Start uploading db checkpoints to remote store
668        info!("start db checkpoint");
669        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
670            &config,
671            &prometheus_registry,
672            state_snapshot_handle.is_some(),
673        )?;
674
675        let mut genesis_objects = genesis.objects().to_vec();
676        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
677            genesis_objects.extend(migration_tx_data.get_objects());
678        }
679
680        let authority_name = config.authority_public_key();
681        let validator_tx_finalizer =
682            config
683                .enable_validator_tx_finalizer
684                .then_some(Arc::new(ValidatorTxFinalizer::new(
685                    auth_agg.clone(),
686                    authority_name,
687                    &prometheus_registry,
688                )));
689
690        info!("create authority state");
691        let state = AuthorityState::new(
692            authority_name,
693            secret,
694            config.supported_protocol_versions.unwrap(),
695            store.clone(),
696            cache_traits.clone(),
697            epoch_store.clone(),
698            committee_store.clone(),
699            index_store.clone(),
700            rest_index,
701            checkpoint_store.clone(),
702            &prometheus_registry,
703            &genesis_objects,
704            &db_checkpoint_config,
705            config.clone(),
706            config.indirect_objects_threshold,
707            archive_readers,
708            validator_tx_finalizer,
709        )
710        .await;
711
712        // ensure genesis and migration txs were executed
713        if epoch_store.epoch() == 0 {
714            let genesis_tx = &genesis.transaction();
715            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
716            // Execute genesis transaction
717            Self::execute_transaction_immediately_at_zero_epoch(
718                &state,
719                &epoch_store,
720                genesis_tx,
721                span,
722            )
723            .await;
724
725            // Execute migration transactions if present
726            if let Some(migration_tx_data) = migration_tx_data {
727                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
728                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
729                    Self::execute_transaction_immediately_at_zero_epoch(
730                        &state,
731                        &epoch_store,
732                        tx,
733                        span,
734                    )
735                    .await;
736                }
737            }
738        }
739
740        checkpoint_store
741            .reexecute_local_checkpoints(&state, &epoch_store)
742            .await;
743
744        // Start the loop that receives new randomness and generates transactions for
745        // it.
746        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
747
748        if config
749            .expensive_safety_check_config
750            .enable_secondary_index_checks()
751        {
752            if let Some(indexes) = state.indexes.clone() {
753                iota_core::verify_indexes::verify_indexes(
754                    state.get_accumulator_store().as_ref(),
755                    indexes,
756                )
757                .expect("secondary indexes are inconsistent");
758            }
759        }
760
761        let (end_of_epoch_channel, end_of_epoch_receiver) =
762            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
763
764        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
765            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
766                auth_agg.load_full(),
767                state.clone(),
768                end_of_epoch_receiver,
769                &config.db_path(),
770                &prometheus_registry,
771            )))
772        } else {
773            None
774        };
775
776        let http_server = build_http_server(
777            state.clone(),
778            state_sync_store,
779            &transaction_orchestrator.clone(),
780            &config,
781            &prometheus_registry,
782            custom_rpc_runtime,
783            software_version,
784        )
785        .await?;
786
787        let accumulator = Arc::new(StateAccumulator::new(
788            cache_traits.accumulator_store.clone(),
789            StateAccumulatorMetrics::new(&prometheus_registry),
790        ));
791
792        let authority_names_to_peer_ids = epoch_store
793            .epoch_start_state()
794            .get_authority_names_to_peer_ids();
795
796        let network_connection_metrics =
797            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
798
799        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
800
801        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
802            p2p_network.downgrade(),
803            network_connection_metrics,
804            HashMap::new(),
805            None,
806        );
807
808        let connection_monitor_status = ConnectionMonitorStatus {
809            connection_statuses,
810            authority_names_to_peer_ids,
811        };
812
813        let connection_monitor_status = Arc::new(connection_monitor_status);
814        let iota_node_metrics =
815            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
816
817        let validator_components = if state.is_validator(&epoch_store) {
818            let components = Self::construct_validator_components(
819                config.clone(),
820                state.clone(),
821                committee,
822                epoch_store.clone(),
823                checkpoint_store.clone(),
824                state_sync_handle.clone(),
825                randomness_handle.clone(),
826                Arc::downgrade(&accumulator),
827                connection_monitor_status.clone(),
828                &registry_service,
829                iota_node_metrics.clone(),
830            )
831            .await?;
832            // This is only needed during cold start.
833            components.consensus_adapter.submit_recovered(&epoch_store);
834
835            Some(components)
836        } else {
837            None
838        };
839
840        // setup shutdown channel
841        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
842
843        let node = Self {
844            config,
845            validator_components: Mutex::new(validator_components),
846            _http_server: http_server,
847            state,
848            transaction_orchestrator,
849            registry_service,
850            metrics: iota_node_metrics,
851
852            _discovery: discovery_handle,
853            state_sync_handle,
854            randomness_handle,
855            checkpoint_store,
856            accumulator: Mutex::new(Some(accumulator)),
857            end_of_epoch_channel,
858            connection_monitor_status,
859            trusted_peer_change_tx,
860
861            _db_checkpoint_handle: db_checkpoint_handle,
862
863            #[cfg(msim)]
864            sim_state: Default::default(),
865
866            _state_archive_handle: state_archive_handle,
867            _state_snapshot_uploader_handle: state_snapshot_handle,
868            shutdown_channel_tx: shutdown_channel,
869
870            auth_agg,
871        };
872
873        info!("IotaNode started!");
874        let node = Arc::new(node);
875        let node_copy = node.clone();
876        spawn_monitored_task!(async move {
877            let result = Self::monitor_reconfiguration(node_copy).await;
878            if let Err(error) = result {
879                warn!("Reconfiguration finished with error {:?}", error);
880            }
881        });
882
883        Ok(node)
884    }
885
886    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
887        self.end_of_epoch_channel.subscribe()
888    }
889
890    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
891        self.shutdown_channel_tx.subscribe()
892    }
893
894    pub fn current_epoch_for_testing(&self) -> EpochId {
895        self.state.current_epoch_for_testing()
896    }
897
898    pub fn db_checkpoint_path(&self) -> PathBuf {
899        self.config.db_checkpoint_path()
900    }
901
902    // Init reconfig process by starting to reject user certs
903    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
904        info!("close_epoch (current epoch = {})", epoch_store.epoch());
905        self.validator_components
906            .lock()
907            .await
908            .as_ref()
909            .ok_or_else(|| IotaError::from("Node is not a validator"))?
910            .consensus_adapter
911            .close_epoch(epoch_store);
912        Ok(())
913    }
914
915    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
916        self.state
917            .clear_override_protocol_upgrade_buffer_stake(epoch)
918    }
919
920    pub fn set_override_protocol_upgrade_buffer_stake(
921        &self,
922        epoch: EpochId,
923        buffer_stake_bps: u64,
924    ) -> IotaResult {
925        self.state
926            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
927    }
928
929    // Testing-only API to start epoch close process.
930    // For production code, please use the non-testing version.
931    pub async fn close_epoch_for_testing(&self) -> IotaResult {
932        let epoch_store = self.state.epoch_store_for_testing();
933        self.close_epoch(&epoch_store).await
934    }
935
936    async fn start_state_archival(
937        config: &NodeConfig,
938        prometheus_registry: &Registry,
939        state_sync_store: RocksDbStore,
940    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
941        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
942            let local_store_config = ObjectStoreConfig {
943                object_store: Some(ObjectStoreType::File),
944                directory: Some(config.archive_path()),
945                ..Default::default()
946            };
947            let archive_writer = ArchiveWriter::new(
948                local_store_config,
949                remote_store_config.clone(),
950                FileCompression::Zstd,
951                StorageFormat::Blob,
952                Duration::from_secs(600),
953                256 * 1024 * 1024,
954                prometheus_registry,
955            )
956            .await?;
957            Ok(Some(archive_writer.start(state_sync_store).await?))
958        } else {
959            Ok(None)
960        }
961    }
962
963    /// Creates an StateSnapshotUploader and start it if the StateSnapshotConfig
964    /// is set.
965    fn start_state_snapshot(
966        config: &NodeConfig,
967        prometheus_registry: &Registry,
968        checkpoint_store: Arc<CheckpointStore>,
969    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
970        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
971            let snapshot_uploader = StateSnapshotUploader::new(
972                &config.db_checkpoint_path(),
973                &config.snapshot_path(),
974                remote_store_config.clone(),
975                60,
976                prometheus_registry,
977                checkpoint_store,
978            )?;
979            Ok(Some(snapshot_uploader.start()))
980        } else {
981            Ok(None)
982        }
983    }
984
985    fn start_db_checkpoint(
986        config: &NodeConfig,
987        prometheus_registry: &Registry,
988        state_snapshot_enabled: bool,
989    ) -> Result<(
990        DBCheckpointConfig,
991        Option<tokio::sync::broadcast::Sender<()>>,
992    )> {
993        let checkpoint_path = Some(
994            config
995                .db_checkpoint_config
996                .checkpoint_path
997                .clone()
998                .unwrap_or_else(|| config.db_checkpoint_path()),
999        );
1000        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1001            DBCheckpointConfig {
1002                checkpoint_path,
1003                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1004                    true
1005                } else {
1006                    config
1007                        .db_checkpoint_config
1008                        .perform_db_checkpoints_at_epoch_end
1009                },
1010                ..config.db_checkpoint_config.clone()
1011            }
1012        } else {
1013            config.db_checkpoint_config.clone()
1014        };
1015
1016        match (
1017            db_checkpoint_config.object_store_config.as_ref(),
1018            state_snapshot_enabled,
1019        ) {
1020            // If db checkpoint config object store not specified but
1021            // state snapshot object store is specified, create handler
1022            // anyway for marking db checkpoints as completed so that they
1023            // can be uploaded as state snapshots.
1024            (None, false) => Ok((db_checkpoint_config, None)),
1025            (_, _) => {
1026                let handler = DBCheckpointHandler::new(
1027                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1028                    db_checkpoint_config.object_store_config.as_ref(),
1029                    60,
1030                    db_checkpoint_config
1031                        .prune_and_compact_before_upload
1032                        .unwrap_or(true),
1033                    config.indirect_objects_threshold,
1034                    config.authority_store_pruning_config.clone(),
1035                    prometheus_registry,
1036                    state_snapshot_enabled,
1037                )?;
1038                Ok((
1039                    db_checkpoint_config,
1040                    Some(DBCheckpointHandler::start(handler)),
1041                ))
1042            }
1043        }
1044    }
1045
1046    fn create_p2p_network(
1047        config: &NodeConfig,
1048        state_sync_store: RocksDbStore,
1049        chain_identifier: ChainIdentifier,
1050        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1051        archive_readers: ArchiveReaderBalancer,
1052        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1053        prometheus_registry: &Registry,
1054    ) -> Result<(
1055        Network,
1056        discovery::Handle,
1057        state_sync::Handle,
1058        randomness::Handle,
1059    )> {
1060        let (state_sync, state_sync_server) = state_sync::Builder::new()
1061            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1062            .store(state_sync_store)
1063            .archive_readers(archive_readers)
1064            .with_metrics(prometheus_registry)
1065            .build();
1066
1067        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1068            .config(config.p2p_config.clone())
1069            .build();
1070
1071        let (randomness, randomness_router) =
1072            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1073                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1074                .with_metrics(prometheus_registry)
1075                .build();
1076
1077        let p2p_network = {
1078            let routes = anemo::Router::new()
1079                .add_rpc_service(discovery_server)
1080                .add_rpc_service(state_sync_server);
1081            let routes = routes.merge(randomness_router);
1082
1083            let inbound_network_metrics =
1084                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1085            let outbound_network_metrics =
1086                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1087
1088            let service = ServiceBuilder::new()
1089                .layer(
1090                    TraceLayer::new_for_server_errors()
1091                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1092                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1093                )
1094                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1095                    Arc::new(inbound_network_metrics),
1096                    config.p2p_config.excessive_message_size(),
1097                )))
1098                .service(routes);
1099
1100            let outbound_layer = ServiceBuilder::new()
1101                .layer(
1102                    TraceLayer::new_for_client_and_server_errors()
1103                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1104                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1105                )
1106                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1107                    Arc::new(outbound_network_metrics),
1108                    config.p2p_config.excessive_message_size(),
1109                )))
1110                .into_inner();
1111
1112            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1113            // Set the max_frame_size to be 1 GB to work around the issue of there being too
1114            // many staking events in the epoch change txn.
1115            anemo_config.max_frame_size = Some(1 << 30);
1116
1117            // Set a higher default value for socket send/receive buffers if not already
1118            // configured.
1119            let mut quic_config = anemo_config.quic.unwrap_or_default();
1120            if quic_config.socket_send_buffer_size.is_none() {
1121                quic_config.socket_send_buffer_size = Some(20 << 20);
1122            }
1123            if quic_config.socket_receive_buffer_size.is_none() {
1124                quic_config.socket_receive_buffer_size = Some(20 << 20);
1125            }
1126            quic_config.allow_failed_socket_buffer_size_setting = true;
1127
1128            // Set high-performance defaults for quinn transport.
1129            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1130            if quic_config.max_concurrent_bidi_streams.is_none() {
1131                quic_config.max_concurrent_bidi_streams = Some(500);
1132            }
1133            if quic_config.max_concurrent_uni_streams.is_none() {
1134                quic_config.max_concurrent_uni_streams = Some(500);
1135            }
1136            if quic_config.stream_receive_window.is_none() {
1137                quic_config.stream_receive_window = Some(100 << 20);
1138            }
1139            if quic_config.receive_window.is_none() {
1140                quic_config.receive_window = Some(200 << 20);
1141            }
1142            if quic_config.send_window.is_none() {
1143                quic_config.send_window = Some(200 << 20);
1144            }
1145            if quic_config.crypto_buffer_size.is_none() {
1146                quic_config.crypto_buffer_size = Some(1 << 20);
1147            }
1148            if quic_config.max_idle_timeout_ms.is_none() {
1149                quic_config.max_idle_timeout_ms = Some(30_000);
1150            }
1151            if quic_config.keep_alive_interval_ms.is_none() {
1152                quic_config.keep_alive_interval_ms = Some(5_000);
1153            }
1154            anemo_config.quic = Some(quic_config);
1155
1156            let server_name = format!("iota-{}", chain_identifier);
1157            let network = Network::bind(config.p2p_config.listen_address)
1158                .server_name(&server_name)
1159                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1160                .config(anemo_config)
1161                .outbound_request_layer(outbound_layer)
1162                .start(service)?;
1163            info!(
1164                server_name = server_name,
1165                "P2p network started on {}",
1166                network.local_addr()
1167            );
1168
1169            network
1170        };
1171
1172        let discovery_handle = discovery.start(p2p_network.clone());
1173        let state_sync_handle = state_sync.start(p2p_network.clone());
1174        let randomness_handle = randomness.start(p2p_network.clone());
1175
1176        Ok((
1177            p2p_network,
1178            discovery_handle,
1179            state_sync_handle,
1180            randomness_handle,
1181        ))
1182    }
1183
1184    /// Asynchronously constructs and initializes the components necessary for
1185    /// the validator node.
1186    async fn construct_validator_components(
1187        config: NodeConfig,
1188        state: Arc<AuthorityState>,
1189        committee: Arc<Committee>,
1190        epoch_store: Arc<AuthorityPerEpochStore>,
1191        checkpoint_store: Arc<CheckpointStore>,
1192        state_sync_handle: state_sync::Handle,
1193        randomness_handle: randomness::Handle,
1194        accumulator: Weak<StateAccumulator>,
1195        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1196        registry_service: &RegistryService,
1197        iota_node_metrics: Arc<IotaNodeMetrics>,
1198    ) -> Result<ValidatorComponents> {
1199        let mut config_clone = config.clone();
1200        let consensus_config = config_clone
1201            .consensus_config
1202            .as_mut()
1203            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1204        let validator_registry = Registry::new();
1205        let validator_registry_id = registry_service.add(validator_registry.clone());
1206
1207        let client = Arc::new(UpdatableConsensusClient::new());
1208        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1209            &committee,
1210            consensus_config,
1211            state.name,
1212            connection_monitor_status.clone(),
1213            &validator_registry,
1214            client.clone(),
1215        ));
1216        let consensus_manager = ConsensusManager::new(
1217            &config,
1218            consensus_config,
1219            registry_service,
1220            &validator_registry,
1221            client,
1222        );
1223
1224        // This only gets started up once, not on every epoch. (Make call to remove
1225        // every epoch.)
1226        let consensus_store_pruner = ConsensusStorePruner::new(
1227            consensus_manager.get_storage_base_path(),
1228            consensus_config.db_retention_epochs(),
1229            consensus_config.db_pruner_period(),
1230            &validator_registry,
1231        );
1232
1233        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1234        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1235
1236        let (validator_server_handle, validator_server_cancel_handle) =
1237            Self::start_grpc_validator_service(
1238                &config,
1239                state.clone(),
1240                consensus_adapter.clone(),
1241                &validator_registry,
1242            )
1243            .await?;
1244
1245        // Starts an overload monitor that monitors the execution of the authority.
1246        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1247        let validator_overload_monitor_handle = if config
1248            .authority_overload_config
1249            .max_load_shedding_percentage
1250            > 0
1251        {
1252            let authority_state = Arc::downgrade(&state);
1253            let overload_config = config.authority_overload_config.clone();
1254            fail_point!("starting_overload_monitor");
1255            Some(spawn_monitored_task!(overload_monitor(
1256                authority_state,
1257                overload_config,
1258            )))
1259        } else {
1260            None
1261        };
1262
1263        Self::start_epoch_specific_validator_components(
1264            &config,
1265            state.clone(),
1266            consensus_adapter,
1267            checkpoint_store,
1268            epoch_store,
1269            state_sync_handle,
1270            randomness_handle,
1271            consensus_manager,
1272            consensus_store_pruner,
1273            accumulator,
1274            validator_server_handle,
1275            validator_server_cancel_handle,
1276            validator_overload_monitor_handle,
1277            checkpoint_metrics,
1278            iota_node_metrics,
1279            iota_tx_validator_metrics,
1280            validator_registry_id,
1281        )
1282        .await
1283    }
1284
1285    /// Initializes and starts components specific to the current
1286    /// epoch for the validator node.
1287    async fn start_epoch_specific_validator_components(
1288        config: &NodeConfig,
1289        state: Arc<AuthorityState>,
1290        consensus_adapter: Arc<ConsensusAdapter>,
1291        checkpoint_store: Arc<CheckpointStore>,
1292        epoch_store: Arc<AuthorityPerEpochStore>,
1293        state_sync_handle: state_sync::Handle,
1294        randomness_handle: randomness::Handle,
1295        consensus_manager: ConsensusManager,
1296        consensus_store_pruner: ConsensusStorePruner,
1297        accumulator: Weak<StateAccumulator>,
1298        validator_server_handle: JoinHandle<Result<()>>,
1299        validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
1300        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1301        checkpoint_metrics: Arc<CheckpointMetrics>,
1302        iota_node_metrics: Arc<IotaNodeMetrics>,
1303        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1304        validator_registry_id: RegistryID,
1305    ) -> Result<ValidatorComponents> {
1306        let checkpoint_service = Self::start_checkpoint_service(
1307            config,
1308            consensus_adapter.clone(),
1309            checkpoint_store,
1310            epoch_store.clone(),
1311            state.clone(),
1312            state_sync_handle,
1313            accumulator,
1314            checkpoint_metrics.clone(),
1315        );
1316
1317        // create a new map that gets injected into both the consensus handler and the
1318        // consensus adapter the consensus handler will write values forwarded
1319        // from consensus, and the consensus adapter will read the values to
1320        // make decisions about which validator submits a transaction to consensus
1321        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1322
1323        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1324
1325        let randomness_manager = RandomnessManager::try_new(
1326            Arc::downgrade(&epoch_store),
1327            Box::new(consensus_adapter.clone()),
1328            randomness_handle,
1329            config.authority_key_pair(),
1330        )
1331        .await;
1332        if let Some(randomness_manager) = randomness_manager {
1333            epoch_store
1334                .set_randomness_manager(randomness_manager)
1335                .await?;
1336        }
1337
1338        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1339            state.clone(),
1340            checkpoint_service.clone(),
1341            epoch_store.clone(),
1342            low_scoring_authorities,
1343        );
1344
1345        consensus_manager
1346            .start(
1347                config,
1348                epoch_store.clone(),
1349                consensus_handler_initializer,
1350                IotaTxValidator::new(
1351                    epoch_store.clone(),
1352                    checkpoint_service.clone(),
1353                    state.transaction_manager().clone(),
1354                    iota_tx_validator_metrics.clone(),
1355                ),
1356            )
1357            .await;
1358
1359        let checkpoint_service_tasks = checkpoint_service.spawn().await;
1360
1361        if epoch_store.authenticator_state_enabled() {
1362            Self::start_jwk_updater(
1363                config,
1364                iota_node_metrics,
1365                state.name,
1366                epoch_store.clone(),
1367                consensus_adapter.clone(),
1368            );
1369        }
1370
1371        Ok(ValidatorComponents {
1372            validator_server_handle,
1373            validator_server_cancel_handle,
1374            validator_overload_monitor_handle,
1375            consensus_manager,
1376            consensus_store_pruner,
1377            consensus_adapter,
1378            checkpoint_service_tasks,
1379            checkpoint_metrics,
1380            iota_tx_validator_metrics,
1381            validator_registry_id,
1382        })
1383    }
1384
1385    /// Starts the checkpoint service for the validator node.
1386    /// The service submits newly built checkpoints to consensus and forwards
1387    /// certified checkpoints to state sync, using the epoch start timestamp and
1388    /// duration to compute the next reconfiguration time, and enforcing the
1389    /// protocol limits on the number of transactions and total size per
1390    /// checkpoint.
1391    fn start_checkpoint_service(
1392        config: &NodeConfig,
1393        consensus_adapter: Arc<ConsensusAdapter>,
1394        checkpoint_store: Arc<CheckpointStore>,
1395        epoch_store: Arc<AuthorityPerEpochStore>,
1396        state: Arc<AuthorityState>,
1397        state_sync_handle: state_sync::Handle,
1398        accumulator: Weak<StateAccumulator>,
1399        checkpoint_metrics: Arc<CheckpointMetrics>,
1400    ) -> Arc<CheckpointService> {
1401        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1402        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1403
1404        debug!(
1405            "Starting checkpoint service with epoch start timestamp {} \
1406             and epoch duration {}",
1407            epoch_start_timestamp_ms, epoch_duration_ms
1408        );
1409
1410        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1411            sender: consensus_adapter,
1412            signer: state.secret.clone(),
1413            authority: config.authority_public_key(),
1414            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1415                .checked_add(epoch_duration_ms)
1416                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1417            metrics: checkpoint_metrics.clone(),
1418        });
1419
1420        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1421        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1422        let max_checkpoint_size_bytes =
1423            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1424
1425        CheckpointService::build(
1426            state.clone(),
1427            checkpoint_store,
1428            epoch_store,
1429            state.get_transaction_cache_reader().clone(),
1430            accumulator,
1431            checkpoint_output,
1432            Box::new(certified_checkpoint_output),
1433            checkpoint_metrics,
1434            max_tx_per_checkpoint,
1435            max_checkpoint_size_bytes,
1436        )
1437    }
1438
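    /// Constructs the [`ConsensusAdapter`] through which this authority submits
    /// transactions to consensus, registering its metrics in the given registry
    /// and deriving submission limits from the consensus config and the
    /// committee size.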
1439    fn construct_consensus_adapter(
1440        committee: &Committee,
1441        consensus_config: &ConsensusConfig,
1442        authority: AuthorityName,
1443        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1444        prometheus_registry: &Registry,
1445        consensus_client: Arc<dyn ConsensusClient>,
1446    ) -> ConsensusAdapter {
1447        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1448        // The consensus adapter allows the authority to send user certificates through
1449        // consensus.
1450
1451        ConsensusAdapter::new(
1452            consensus_client,
1453            authority,
1454            connection_monitor_status,
1455            consensus_config.max_pending_transactions(),
1456            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1457            consensus_config.max_submit_position,
1458            consensus_config.submit_delay_step_override(),
1459            ca_metrics,
1460        )
1461    }
1462
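    /// Starts the gRPC validator service: builds a [`ValidatorService`] backed
    /// by the authority state and consensus adapter, binds a gRPC server to the
    /// configured network address, and spawns it. Returns the server's join
    /// handle together with a oneshot sender that cancels the server.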
1463    async fn start_grpc_validator_service(
1464        config: &NodeConfig,
1465        state: Arc<AuthorityState>,
1466        consensus_adapter: Arc<ConsensusAdapter>,
1467        prometheus_registry: &Registry,
1468    ) -> Result<(
1469        tokio::task::JoinHandle<Result<()>>,
1470        tokio::sync::oneshot::Sender<()>,
1471    )> {
1472        let validator_service = ValidatorService::new(
1473            state.clone(),
1474            consensus_adapter,
1475            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1476            TrafficControllerMetrics::new(prometheus_registry),
1477            config.policy_config.clone(),
1478            config.firewall_config.clone(),
1479        );
1480
1481        let mut server_conf = iota_network_stack::config::Config::new();
1482        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1483        server_conf.load_shed = config.grpc_load_shed;
1484        let mut server_builder =
1485            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1486
1487        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1488
1489        let mut server = server_builder
1490            .bind(config.network_address())
1491            .await
1492            .map_err(|err| anyhow!(err.to_string()))?;
1493        let cancel_handle = server
1494            .take_cancel_handle()
1495            .expect("GRPC server should still have a cancel handle");
1496        let local_addr = server.local_addr();
1497        info!("Listening for traffic on {local_addr}");
1498        let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));
1499
1500        Ok((grpc_server, cancel_handle))
1501    }
1502
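    /// Returns a handle to the node's [`AuthorityState`].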
1503    pub fn state(&self) -> Arc<AuthorityState> {
1504        self.state.clone()
1505    }
1506
1507    // Only used for testing because of how epoch store is loaded.
1508    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1509        self.state.reference_gas_price_for_testing()
1510    }
1511
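    /// Returns a clone of the node's [`CommitteeStore`].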
1512    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1513        self.state.committee_store().clone()
1514    }
1515
1516    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1517    // self.state.db()
1518    // }
1519
1520    /// Clone an AuthorityAggregator currently used in this node's
1521    /// QuorumDriver, if the node is a fullnode. After reconfig,
1522    /// QuorumDriver builds a new AuthorityAggregator. The caller
1523    /// of this function will most likely want to call this again
1524    /// to get a fresh one.
1525    pub fn clone_authority_aggregator(
1526        &self,
1527    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1528        self.transaction_orchestrator
1529            .as_ref()
1530            .map(|to| to.clone_authority_aggregator())
1531    }
1532
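    /// Returns the node's [`TransactionOrchestrator`], if it is enabled on this
    /// node.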
1533    pub fn transaction_orchestrator(
1534        &self,
1535    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1536        self.transaction_orchestrator.clone()
1537    }
1538
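    /// Subscribes to the Transaction Orchestrator's effects queue, or returns an
    /// error if the Transaction Orchestrator is not enabled on this node.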
1539    pub fn subscribe_to_transaction_orchestrator_effects(
1540        &self,
1541    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1542        self.transaction_orchestrator
1543            .as_ref()
1544            .map(|to| to.subscribe_to_effects_queue())
1545            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1546    }
1547
1548    /// This function awaits the completion of checkpoint execution of the
1549    /// current epoch, after which it initiates reconfiguration of the
1550    /// entire system. It also handles role changes for the node when the
1551    /// epoch changes and advertises capabilities to the committee if the node
1552    /// is a validator.
1553    pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
1554        let checkpoint_executor_metrics =
1555            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1556
1557        loop {
1558            let mut accumulator_guard = self.accumulator.lock().await;
1559            let accumulator = accumulator_guard.take().unwrap();
1560            let mut checkpoint_executor = CheckpointExecutor::new(
1561                self.state_sync_handle.subscribe_to_synced_checkpoints(),
1562                self.checkpoint_store.clone(),
1563                self.state.clone(),
1564                accumulator.clone(),
1565                self.config.checkpoint_executor_config.clone(),
1566                checkpoint_executor_metrics.clone(),
1567            );
1568
1569            let run_with_range = self.config.run_with_range;
1570
1571            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1572
1573            // Advertise capabilities to the committee, if we are a validator.
1574            if let Some(components) = &*self.validator_components.lock().await {
1575                // TODO: without this sleep, the consensus message is not delivered reliably.
1576                tokio::time::sleep(Duration::from_millis(1)).await;
1577
1578                let config = cur_epoch_store.protocol_config();
1579                let binary_config = to_binary_config(config);
1580                let transaction = ConsensusTransaction::new_capability_notification_v1(
1581                    AuthorityCapabilitiesV1::new(
1582                        self.state.name,
1583                        cur_epoch_store.get_chain_identifier().chain(),
1584                        self.config
1585                            .supported_protocol_versions
1586                            .expect("Supported versions should be populated")
1587                            // no need to send digests of versions less than the current version
1588                            .truncate_below(config.version),
1589                        self.state
1590                            .get_available_system_packages(&binary_config)
1591                            .await,
1592                    ),
1593                );
1594                info!(?transaction, "submitting capabilities to consensus");
1595                components
1596                    .consensus_adapter
1597                    .submit(transaction, None, &cur_epoch_store)?;
1598            }
1599
1600            let stop_condition = checkpoint_executor
1601                .run_epoch(cur_epoch_store.clone(), run_with_range)
1602                .await;
1603            drop(checkpoint_executor);
1604
1605            if stop_condition == StopReason::RunWithRangeCondition {
1606                IotaNode::shutdown(&self).await;
1607                self.shutdown_channel_tx
1608                    .send(run_with_range)
1609                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1610                return Ok(());
1611            }
1612
1613            // Safe to call because we are in the middle of reconfiguration.
1614            let latest_system_state = self
1615                .state
1616                .get_object_cache_reader()
1617                .get_iota_system_state_object_unsafe()
1618                .expect("Read IOTA System State object cannot fail");
1619
1620            #[cfg(msim)]
1621            if !self
1622                .sim_state
1623                .sim_safe_mode_expected
1624                .load(Ordering::Relaxed)
1625            {
1626                debug_assert!(!latest_system_state.safe_mode());
1627            }
1628
1629            #[cfg(not(msim))]
1630            debug_assert!(!latest_system_state.safe_mode());
1631
1632            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1633                if self.state.is_fullnode(&cur_epoch_store) {
1634                    warn!(
1635                        "Failed to send end of epoch notification to subscriber: {:?}",
1636                        err
1637                    );
1638                }
1639            }
1640
1641            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1642            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1643
1644            self.auth_agg.store(Arc::new(
1645                self.auth_agg
1646                    .load()
1647                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1648            ));
1649
1650            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1651            let next_epoch = next_epoch_committee.epoch();
1652            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1653
1654            info!(
1655                next_epoch,
1656                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1657            );
1658
1659            fail_point_async!("reconfig_delay");
1660
1661            // We save the connection monitor status map regardless of validator / fullnode
1662            // status so that we don't need to restart the connection monitor
1663            // every epoch. Update the mappings that will be used by the
1664            // consensus adapter if it exists or is about to be created.
1665            let authority_names_to_peer_ids =
1666                new_epoch_start_state.get_authority_names_to_peer_ids();
1667            self.connection_monitor_status
1668                .update_mapping_for_epoch(authority_names_to_peer_ids);
1669
1670            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1671
1672            send_trusted_peer_change(
1673                &self.config,
1674                &self.trusted_peer_change_tx,
1675                &new_epoch_start_state,
1676            );
1677
1678            // The following code handles 4 different cases, depending on whether the node
1679            // was a validator in the previous epoch, and whether the node is a validator
1680            // in the new epoch.
1681
1682            let new_validator_components = if let Some(ValidatorComponents {
1683                validator_server_handle,
1684                validator_server_cancel_handle,
1685                validator_overload_monitor_handle,
1686                consensus_manager,
1687                consensus_store_pruner,
1688                consensus_adapter,
1689                mut checkpoint_service_tasks,
1690                checkpoint_metrics,
1691                iota_tx_validator_metrics,
1692                validator_registry_id,
1693            }) = self.validator_components.lock().await.take()
1694            {
1695                info!("Reconfiguring the validator.");
1696                // Cancel the old checkpoint service tasks.
1697                // Waiting for checkpoint builder to finish gracefully is not possible, because
1698                // it may wait on transactions while consensus on peers has
1699                // already shut down.
1700                checkpoint_service_tasks.abort_all();
1701                while let Some(result) = checkpoint_service_tasks.join_next().await {
1702                    if let Err(err) = result {
1703                        if err.is_panic() {
1704                            std::panic::resume_unwind(err.into_panic());
1705                        }
1706                        warn!("Error in checkpoint service task: {:?}", err);
1707                    }
1708                }
1709                info!("Checkpoint service has shut down.");
1710
1711                consensus_manager.shutdown().await;
1712                info!("Consensus has shut down.");
1713
1714                let new_epoch_store = self
1715                    .reconfigure_state(
1716                        &self.state,
1717                        &cur_epoch_store,
1718                        next_epoch_committee.clone(),
1719                        new_epoch_start_state,
1720                        accumulator.clone(),
1721                    )
1722                    .await?;
1723                info!("Epoch store finished reconfiguration.");
1724
1725                // No other components should be holding a strong reference to state accumulator
1726                // at this point. Confirm here before we swap in the new accumulator.
1727                let accumulator_metrics = Arc::into_inner(accumulator)
1728                    .expect("Accumulator should have no other references at this point")
1729                    .metrics();
1730                let new_accumulator = Arc::new(StateAccumulator::new(
1731                    self.state.get_accumulator_store().clone(),
1732                    accumulator_metrics,
1733                ));
1734                let weak_accumulator = Arc::downgrade(&new_accumulator);
1735                *accumulator_guard = Some(new_accumulator);
1736
1737                consensus_store_pruner.prune(next_epoch).await;
1738
1739                if self.state.is_validator(&new_epoch_store) {
1740                    // Only restart consensus if this node is still a validator in the new epoch.
1741                    Some(
1742                        Self::start_epoch_specific_validator_components(
1743                            &self.config,
1744                            self.state.clone(),
1745                            consensus_adapter,
1746                            self.checkpoint_store.clone(),
1747                            new_epoch_store.clone(),
1748                            self.state_sync_handle.clone(),
1749                            self.randomness_handle.clone(),
1750                            consensus_manager,
1751                            consensus_store_pruner,
1752                            weak_accumulator,
1753                            validator_server_handle,
1754                            validator_server_cancel_handle,
1755                            validator_overload_monitor_handle,
1756                            checkpoint_metrics,
1757                            self.metrics.clone(),
1758                            iota_tx_validator_metrics,
1759                            validator_registry_id,
1760                        )
1761                        .await?,
1762                    )
1763                } else {
1764                    info!("This node is no longer a validator after reconfiguration");
1765                    if self.registry_service.remove(validator_registry_id) {
1766                        debug!("Removed validator metrics registry");
1767                    } else {
1768                        warn!("Failed to remove validator metrics registry");
1769                    }
1770                    if validator_server_cancel_handle.send(()).is_ok() {
1771                        debug!("Validator grpc server cancelled");
1772                    } else {
1773                        warn!("Failed to cancel validator grpc server");
1774                    }
1775
1776                    None
1777                }
1778            } else {
1779                let new_epoch_store = self
1780                    .reconfigure_state(
1781                        &self.state,
1782                        &cur_epoch_store,
1783                        next_epoch_committee.clone(),
1784                        new_epoch_start_state,
1785                        accumulator.clone(),
1786                    )
1787                    .await?;
1788
1789                // No other components should be holding a strong reference to state accumulator
1790                // at this point. Confirm here before we swap in the new accumulator.
1791                let accumulator_metrics = Arc::into_inner(accumulator)
1792                    .expect("Accumulator should have no other references at this point")
1793                    .metrics();
1794                let new_accumulator = Arc::new(StateAccumulator::new(
1795                    self.state.get_accumulator_store().clone(),
1796                    accumulator_metrics,
1797                ));
1798                let weak_accumulator = Arc::downgrade(&new_accumulator);
1799                *accumulator_guard = Some(new_accumulator);
1800
1801                if self.state.is_validator(&new_epoch_store) {
1802                    info!("Promoting the node from fullnode to validator, starting grpc server");
1803
1804                    Some(
1805                        Self::construct_validator_components(
1806                            self.config.clone(),
1807                            self.state.clone(),
1808                            Arc::new(next_epoch_committee.clone()),
1809                            new_epoch_store.clone(),
1810                            self.checkpoint_store.clone(),
1811                            self.state_sync_handle.clone(),
1812                            self.randomness_handle.clone(),
1813                            weak_accumulator,
1814                            self.connection_monitor_status.clone(),
1815                            &self.registry_service,
1816                            self.metrics.clone(),
1817                        )
1818                        .await?,
1819                    )
1820                } else {
1821                    None
1822                }
1823            };
1824            *self.validator_components.lock().await = new_validator_components;
1825
1826            // Force releasing the current epoch store DB handle, because the
1827            // Arc<AuthorityPerEpochStore> may linger.
1828            cur_epoch_store.release_db_handles();
1829
1830            if cfg!(msim)
1831                && !matches!(
1832                    self.config
1833                        .authority_store_pruning_config
1834                        .num_epochs_to_retain_for_checkpoints(),
1835                    None | Some(u64::MAX) | Some(0)
1836                )
1837            {
1838                self.state
1839                    .prune_checkpoints_for_eligible_epochs_for_testing(
1840                        self.config.clone(),
1841                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1842                    )
1843                    .await?;
1844            }
1845
1846            info!("Reconfiguration finished");
1847        }
1848    }
1849
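    /// Shuts down consensus if this node is currently running validator
    /// components.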
1850    async fn shutdown(&self) {
1851        if let Some(validator_components) = &*self.validator_components.lock().await {
1852            validator_components.consensus_manager.shutdown().await;
1853        }
1854    }
1855
1856    /// Asynchronously reconfigures the state of the authority node for the next
1857    /// epoch.
1858    async fn reconfigure_state(
1859        &self,
1860        state: &Arc<AuthorityState>,
1861        cur_epoch_store: &AuthorityPerEpochStore,
1862        next_epoch_committee: Committee,
1863        next_epoch_start_system_state: EpochStartSystemState,
1864        accumulator: Arc<StateAccumulator>,
1865    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
1866        let next_epoch = next_epoch_committee.epoch();
1867
1868        let last_checkpoint = self
1869            .checkpoint_store
1870            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1871            .expect("Error loading last checkpoint for current epoch")
1872            .expect("Could not load last checkpoint for current epoch");
1873        let epoch_supply_change = last_checkpoint
1874            .end_of_epoch_data
1875            .as_ref()
1876            .ok_or_else(|| {
1877                IotaError::from("last checkpoint in epoch should contain end of epoch data")
1878            })?
1879            .epoch_supply_change;
1880
1881        let epoch_start_configuration = EpochStartConfiguration::new(
1882            next_epoch_start_system_state,
1883            *last_checkpoint.digest(),
1884            state.get_object_store().as_ref(),
1885            EpochFlag::default_flags_for_new_epoch(&state.config),
1886        )
1887        .expect("EpochStartConfiguration construction cannot fail");
1888
1889        let new_epoch_store = self
1890            .state
1891            .reconfigure(
1892                cur_epoch_store,
1893                self.config.supported_protocol_versions.unwrap(),
1894                next_epoch_committee,
1895                epoch_start_configuration,
1896                accumulator,
1897                &self.config.expensive_safety_check_config,
1898                epoch_supply_change,
1899            )
1900            .await
1901            .expect("Reconfigure authority state cannot fail");
1902        info!(next_epoch, "Node State has been reconfigured");
1903        assert_eq!(next_epoch, new_epoch_store.epoch());
1904        self.state.get_reconfig_api().update_epoch_flags_metrics(
1905            cur_epoch_store.epoch_start_config().flags(),
1906            new_epoch_store.epoch_start_config().flags(),
1907        );
1908
1909        Ok(new_epoch_store)
1910    }
1911
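    /// Returns the node's configuration.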
1912    pub fn get_config(&self) -> &NodeConfig {
1913        &self.config
1914    }
1915
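    /// Executes a transaction immediately, without a certificate, by wrapping it
    /// in a `VerifiedExecutableTransaction` with a synthetic checkpoint proof;
    /// only intended for transactions executed at epoch zero.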
1916    async fn execute_transaction_immediately_at_zero_epoch(
1917        state: &Arc<AuthorityState>,
1918        epoch_store: &Arc<AuthorityPerEpochStore>,
1919        tx: &Transaction,
1920        span: tracing::Span,
1921    ) {
1922        let transaction =
1923            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
1924                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
1925                    tx.data().clone(),
1926                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
1927                ),
1928            );
1929        state
1930            .try_execute_immediately(&transaction, None, epoch_store)
1931            .instrument(span)
1932            .await
1933            .unwrap();
1934    }
1935
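    /// Returns a clone of the node's [`randomness::Handle`].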
1936    pub fn randomness_handle(&self) -> randomness::Handle {
1937        self.randomness_handle.clone()
1938    }
1939}
1940
1941#[cfg(not(msim))]
1942impl IotaNode {
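    /// Fetches the current JWKs for the given OIDC provider over HTTP.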
1943    async fn fetch_jwks(
1944        _authority: AuthorityName,
1945        provider: &OIDCProvider,
1946    ) -> IotaResult<Vec<(JwkId, JWK)>> {
1947        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
1948        let client = reqwest::Client::new();
1949        fetch_jwks(provider, &client)
1950            .await
1951            .map_err(|_| IotaError::JWKRetrieval)
1952    }
1953}
1954
1955#[cfg(msim)]
1956impl IotaNode {
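    /// Returns the id of this node in the simulator.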
1957    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
1958        self.sim_state.sim_node.id()
1959    }
1960
1961    pub fn set_safe_mode_expected(&self, new_value: bool) {
1962        info!("Setting safe mode expected to {}", new_value);
1963        self.sim_state
1964            .sim_safe_mode_expected
1965            .store(new_value, Ordering::Relaxed);
1966    }
1967
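    /// Returns JWKs for the given provider via the test JWK injector instead of
    /// fetching them over the network.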
1968    async fn fetch_jwks(
1969        authority: AuthorityName,
1970        provider: &OIDCProvider,
1971    ) -> IotaResult<Vec<(JwkId, JWK)>> {
1972        get_jwk_injector()(authority, provider)
1973    }
1974}
1975
1976/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
1977/// available.
1978fn send_trusted_peer_change(
1979    config: &NodeConfig,
1980    sender: &watch::Sender<TrustedPeerChangeEvent>,
1981    new_epoch_start_state: &EpochStartSystemState,
1982) {
1983    let new_committee =
1984        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
1985
1986    sender.send_modify(|event| {
1987        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
1988        event.new_committee = new_committee;
1989    })
1990}
1991
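/// Builds the [`TransactionKeyValueStore`] used by the JSON-RPC APIs: a local
/// RocksDB-backed store, with a fallback to an HTTP key-value store when a
/// base URL is configured.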
1992fn build_kv_store(
1993    state: &Arc<AuthorityState>,
1994    config: &NodeConfig,
1995    registry: &Registry,
1996) -> Result<Arc<TransactionKeyValueStore>> {
1997    let metrics = KeyValueStoreMetrics::new(registry);
1998    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
1999
2000    let base_url = &config.transaction_kv_store_read_config.base_url;
2001
2002    if base_url.is_empty() {
2003        info!("no http kv store url provided, using local db only");
2004        return Ok(Arc::new(db_store));
2005    }
2006
2007    base_url.parse::<url::Url>().tap_err(|e| {
2008        error!(
2009            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
2010            base_url, e
2011        )
2012    })?;
2013
2014    let http_store = HttpKVStore::new_kv(
2015        base_url,
2016        config.transaction_kv_store_read_config.cache_size,
2017        metrics.clone(),
2018    )?;
2019    info!("using local key-value store with fallback to http key-value store");
2020    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2021        db_store,
2022        http_store,
2023        metrics,
2024        "json_rpc_fallback",
2025    )))
2026}
2027
2028/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
2029/// REST APIs based on the node's configuration.
2030///
2031/// This function performs the following tasks:
2032/// 1. Checks if the node is a validator by inspecting the consensus
2033///    configuration; if so, it returns early as validators do not expose these
2034///    APIs.
2035/// 2. Creates an Axum router to handle HTTP requests.
2036/// 3. Initializes the JSON-RPC server and registers various RPC modules based
2037///    on the node's state and configuration, including ReadApi, CoinReadApi,
2038///    TransactionBuilderApi, GovernanceReadApi, BridgeReadApi,
2039///    TransactionExecutionApi, IndexerApi, and MoveUtils.
2040/// 4. Optionally, if the REST API is enabled, nests the REST API router under
2041///    the `/api/v1` path.
2042/// 5. Binds the server to the specified JSON-RPC address and starts listening
2043///    for incoming connections.
2044pub async fn build_http_server(
2045    state: Arc<AuthorityState>,
2046    store: RocksDbStore,
2047    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2048    config: &NodeConfig,
2049    prometheus_registry: &Registry,
2050    _custom_runtime: Option<Handle>,
2051    software_version: &'static str,
2052) -> Result<Option<tokio::task::JoinHandle<()>>> {
2053    // Validators do not expose these APIs
2054    if config.consensus_config().is_some() {
2055        return Ok(None);
2056    }
2057
2058    let mut router = axum::Router::new();
2059
2060    let json_rpc_router = {
2061        let mut server = JsonRpcServerBuilder::new(
2062            env!("CARGO_PKG_VERSION"),
2063            prometheus_registry,
2064            config.policy_config.clone(),
2065            config.firewall_config.clone(),
2066        );
2067
2068        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2069
2070        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2071        server.register_module(ReadApi::new(
2072            state.clone(),
2073            kv_store.clone(),
2074            metrics.clone(),
2075        ))?;
2076        server.register_module(CoinReadApi::new(
2077            state.clone(),
2078            kv_store.clone(),
2079            metrics.clone(),
2080        )?)?;
2081
2082        // If run_with_range is set, skip registering the TransactionBuilderApi to
2083        // prevent new transactions; run_with_range = None is normal operation.
2084        if config.run_with_range.is_none() {
2085            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2086        }
2087        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2088        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2089
2090        if let Some(transaction_orchestrator) = transaction_orchestrator {
2091            server.register_module(TransactionExecutionApi::new(
2092                state.clone(),
2093                transaction_orchestrator.clone(),
2094                metrics.clone(),
2095            ))?;
2096        }
2097
2098        // TODO: Init from chain if config is not set once `IotaNamesConfig::from_chain`
2099        // is implemented
2100        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();
2101
2102        server.register_module(IndexerApi::new(
2103            state.clone(),
2104            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2105            kv_store,
2106            metrics,
2107            iota_names_config,
2108            config.indexer_max_subscriptions,
2109        ))?;
2110        server.register_module(MoveUtils::new(state.clone()))?;
2111
2112        let server_type = config.jsonrpc_server_type();
2113
2114        server.to_router(server_type).await?
2115    };
2116
2117    router = router.merge(json_rpc_router);
2118
2119    if config.enable_rest_api {
2120        let mut rest_service = iota_rest_api::RestService::new(
2121            Arc::new(RestReadStore::new(state, store)),
2122            software_version,
2123        );
2124
2125        rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2126
2127        if let Some(transaction_orchestrator) = transaction_orchestrator {
2128            rest_service.with_executor(transaction_orchestrator.clone())
2129        }
2130
2131        router = router.merge(rest_service.into_router());
2132    }
2133
2134    let listener = tokio::net::TcpListener::bind(&config.json_rpc_address)
2135        .await
2136        .unwrap();
2137    let addr = listener.local_addr().unwrap();
2138
2139    router = router.layer(axum::middleware::from_fn(server_timing_middleware));
2140
2141    let handle = tokio::spawn(async move {
2142        axum::serve(
2143            listener,
2144            router.into_make_service_with_connect_info::<SocketAddr>(),
2145        )
2146        .await
2147        .unwrap()
2148    });
2149
2150    info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");
2151
2152    Ok(Some(handle))
2153}
2154
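/// Maximum number of transactions allowed in a single checkpoint, taken from
/// the protocol config (fixed to 2 in test builds).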
2155#[cfg(not(test))]
2156fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2157    protocol_config.max_transactions_per_checkpoint() as usize
2158}
2159
2160#[cfg(test)]
2161fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2162    2
2163}