iota_node/lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::{BTreeSet, HashMap, HashSet},
9    fmt,
10    future::Future,
11    path::PathBuf,
12    str::FromStr,
13    sync::{Arc, Weak},
14    time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19    callback::CallbackLayer,
20    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::{TryFutureExt, future::BoxFuture};
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30    ConsensusConfig, NodeConfig,
31    node::{DBCheckpointConfig, RunWithRange},
32    node_config_metrics::NodeConfigMetrics,
33    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36    authority::{
37        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38        authority_per_epoch_store::AuthorityPerEpochStore,
39        authority_store_pruner::ObjectsCompactionFilter,
40        authority_store_tables::{
41            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42        },
43        backpressure::BackpressureManager,
44        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45    },
46    authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
47    authority_client::NetworkAuthorityClient,
48    authority_server::{ValidatorService, ValidatorServiceMetrics},
49    checkpoints::{
50        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
51        SubmitCheckpointToConsensus,
52        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
53    },
54    connection_monitor::ConnectionMonitor,
55    consensus_adapter::{
56        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
57        ConsensusClient,
58    },
59    consensus_handler::ConsensusHandlerInitializer,
60    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
61    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
62    db_checkpoint_handler::DBCheckpointHandler,
63    epoch::{
64        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
65        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
66        reconfiguration::ReconfigurationInitiator,
67    },
68    execution_cache::build_execution_cache,
69    jsonrpc_index::IndexStore,
70    module_cache_metrics::ResolverMetrics,
71    overload_monitor::overload_monitor,
72    rest_index::RestIndexStore,
73    safe_client::SafeClientMetricsBase,
74    signature_verifier::SignatureVerifierMetrics,
75    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
76    storage::{RestReadStore, RocksDbStore},
77    traffic_controller::metrics::TrafficControllerMetrics,
78    transaction_orchestrator::TransactionOrchestrator,
79    validator_tx_finalizer::ValidatorTxFinalizer,
80};
81use iota_json_rpc::{
82    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
83    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
84    transaction_builder_api::TransactionBuilderApi,
85    transaction_execution_api::TransactionExecutionApi,
86};
87use iota_json_rpc_api::JsonRpcMetrics;
88use iota_macros::{fail_point, fail_point_async, replay_log};
89use iota_metrics::{
90    RegistryID, RegistryService,
91    hardware_metrics::register_hardware_metrics,
92    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
93    server_timing_middleware, spawn_monitored_task,
94};
95use iota_network::{
96    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
97};
98use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
99use iota_protocol_config::ProtocolConfig;
100use iota_rest_api::RestMetrics;
101use iota_snapshot::uploader::StateSnapshotUploader;
102use iota_storage::{
103    FileCompression, StorageFormat,
104    http_key_value_store::HttpKVStore,
105    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
106    key_value_store_metrics::KeyValueStoreMetrics,
107};
108use iota_types::{
109    base_types::{AuthorityName, ConciseableName, EpochId},
110    committee::Committee,
111    crypto::{KeypairTraits, RandomnessRound},
112    digests::ChainIdentifier,
113    error::{IotaError, IotaResult},
114    executable_transaction::VerifiedExecutableTransaction,
115    execution_config_utils::to_binary_config,
116    iota_system_state::{
117        IotaSystemState, IotaSystemStateTrait,
118        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
119    },
120    messages_consensus::{
121        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
122        check_total_jwk_size,
123    },
124    quorum_driver_types::QuorumDriverEffectsQueueResult,
125    supported_protocol_versions::SupportedProtocolVersions,
126    transaction::{Transaction, VerifiedCertificate},
127};
128use prometheus::Registry;
129#[cfg(msim)]
130pub use simulator::set_jwk_injector;
131#[cfg(msim)]
132use simulator::*;
133use tap::tap::TapFallible;
134use tokio::{
135    runtime::Handle,
136    sync::{Mutex, broadcast, mpsc, watch},
137    task::{JoinHandle, JoinSet},
138};
139use tower::ServiceBuilder;
140use tracing::{Instrument, debug, error, error_span, info, warn};
141use typed_store::{
142    DBMetrics,
143    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
144};
145
146use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
147
148pub mod admin;
149mod handle;
150pub mod metrics;
151
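/// Components that only run while the node is acting as a validator. They are
/// created at startup or at epoch change and are shut down during
/// reconfiguration.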
152pub struct ValidatorComponents {
153    validator_server_spawn_handle: SpawnOnce,
154    validator_server_handle: iota_http::ServerHandle,
155    validator_overload_monitor_handle: Option<JoinHandle<()>>,
156    consensus_manager: ConsensusManager,
157    consensus_store_pruner: ConsensusStorePruner,
158    consensus_adapter: Arc<ConsensusAdapter>,
159    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
160    checkpoint_service_tasks: JoinSet<()>,
161    checkpoint_metrics: Arc<CheckpointMetrics>,
162    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
163    validator_registry_id: RegistryID,
164}
165
166#[cfg(msim)]
167mod simulator {
168    use std::sync::atomic::AtomicBool;
169
170    use super::*;
171
172    pub(super) struct SimState {
173        pub sim_node: iota_simulator::runtime::NodeHandle,
174        pub sim_safe_mode_expected: AtomicBool,
175        _leak_detector: iota_simulator::NodeLeakDetector,
176    }
177
178    impl Default for SimState {
179        fn default() -> Self {
180            Self {
181                sim_node: iota_simulator::runtime::NodeHandle::current(),
182                sim_safe_mode_expected: AtomicBool::new(false),
183                _leak_detector: iota_simulator::NodeLeakDetector::new(),
184            }
185        }
186    }
187
188    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
189        + Send
190        + Sync
191        + 'static;
192
193    fn default_fetch_jwks(
194        _authority: AuthorityName,
195        _provider: &OIDCProvider,
196    ) -> IotaResult<Vec<(JwkId, JWK)>> {
197        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
198        // Just load a default Twitch jwk for testing.
199        parse_jwks(
200            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
201            &OIDCProvider::Twitch,
202        )
203        .map_err(|_| IotaError::JWKRetrieval)
204    }
205
206    thread_local! {
207        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
208    }
209
210    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
211        JWK_INJECTOR.with(|injector| injector.borrow().clone())
212    }
213
214    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
215        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
216    }
217}
218
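/// A running IOTA node. Depending on its configuration it acts as a validator
/// (with `validator_components` populated) or as a full node, and it holds the
/// authority state, networking handles, and background task handles.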
219pub struct IotaNode {
220    config: NodeConfig,
221    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The HTTP server responsible for serving JSON-RPC as well as the
    /// experimental REST service.
224    _http_server: Option<iota_http::ServerHandle>,
225    state: Arc<AuthorityState>,
226    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
227    registry_service: RegistryService,
228    metrics: Arc<IotaNodeMetrics>,
229
230    _discovery: discovery::Handle,
231    state_sync_handle: state_sync::Handle,
232    randomness_handle: randomness::Handle,
233    checkpoint_store: Arc<CheckpointStore>,
234    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
235    connection_monitor_status: Arc<ConnectionMonitorStatus>,
236
237    /// Broadcast channel to send the starting system state for the next epoch.
238    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,
239
    /// Broadcast channel to notify the [`DiscoveryEventLoop`] of new validator
    /// peers.
242    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,
243
244    backpressure_manager: Arc<BackpressureManager>,
245
246    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
247
248    #[cfg(msim)]
249    sim_state: SimState,
250
251    _state_archive_handle: Option<broadcast::Sender<()>>,
252
253    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down the iota-node.
255    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
256
    /// AuthorityAggregator of the network, created at startup and at the
    /// beginning of each epoch. Wrapped in an ArcSwap so that it can be
    /// replaced without taking a mutable reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
262    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
263}
264
265impl fmt::Debug for IotaNode {
266    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
267        f.debug_struct("IotaNode")
268            .field("name", &self.state.name.concise())
269            .finish()
270    }
271}
272
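/// Upper bound on the number of JWKs accepted from a single provider fetch in
/// the JWK updater task; any keys beyond this limit are dropped.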
273static MAX_JWK_KEYS_PER_FETCH: usize = 100;
274
275impl IotaNode {
276    pub async fn start(
277        config: NodeConfig,
278        registry_service: RegistryService,
279        custom_rpc_runtime: Option<Handle>,
280    ) -> Result<Arc<IotaNode>> {
281        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
282    }
283
284    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
285    /// configuration.
286    /// This function ensures continuous fetching, validation, and submission of
287    /// JWKs, maintaining up-to-date keys for the specified providers.
288    fn start_jwk_updater(
289        config: &NodeConfig,
290        metrics: Arc<IotaNodeMetrics>,
291        authority: AuthorityName,
292        epoch_store: Arc<AuthorityPerEpochStore>,
293        consensus_adapter: Arc<ConsensusAdapter>,
294    ) {
295        let epoch = epoch_store.epoch();
296
297        let supported_providers = config
298            .zklogin_oauth_providers
299            .get(&epoch_store.get_chain_identifier().chain())
300            .unwrap_or(&BTreeSet::new())
301            .iter()
302            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
303            .collect::<Vec<_>>();
304
305        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
306
307        info!(
308            ?fetch_interval,
309            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
310        );
311
312        fn validate_jwk(
313            metrics: &Arc<IotaNodeMetrics>,
314            provider: &OIDCProvider,
315            id: &JwkId,
316            jwk: &JWK,
317        ) -> bool {
318            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
319                warn!(
320                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
321                    id.iss, provider
322                );
323                metrics
324                    .invalid_jwks
325                    .with_label_values(&[&provider.to_string()])
326                    .inc();
327                return false;
328            };
329
330            if iss_provider != *provider {
331                warn!(
332                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
333                    id.iss, provider, iss_provider
334                );
335                metrics
336                    .invalid_jwks
337                    .with_label_values(&[&provider.to_string()])
338                    .inc();
339                return false;
340            }
341
342            if !check_total_jwk_size(id, jwk) {
343                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
344                metrics
345                    .invalid_jwks
346                    .with_label_values(&[&provider.to_string()])
347                    .inc();
348                return false;
349            }
350
351            true
352        }
353
354        // metrics is:
355        //  pub struct IotaNodeMetrics {
356        //      pub jwk_requests: IntCounterVec,
357        //      pub jwk_request_errors: IntCounterVec,
358        //      pub total_jwks: IntCounterVec,
359        //      pub unique_jwks: IntCounterVec,
360        //  }
361
362        for p in supported_providers.into_iter() {
363            let provider_str = p.to_string();
364            let epoch_store = epoch_store.clone();
365            let consensus_adapter = consensus_adapter.clone();
366            let metrics = metrics.clone();
367            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
368                async move {
                    // Note: restart-safe de-duplication happens after consensus; this is
                    // just a best-effort filter to reduce unneeded submissions.
371                    let mut seen = HashSet::new();
372                    loop {
373                        info!("fetching JWK for provider {:?}", p);
374                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
375                        match Self::fetch_jwks(authority, &p).await {
376                            Err(e) => {
377                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
378                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
379                                // Retry in 30 seconds
380                                tokio::time::sleep(Duration::from_secs(30)).await;
381                                continue;
382                            }
383                            Ok(mut keys) => {
384                                metrics.total_jwks
385                                    .with_label_values(&[&provider_str])
386                                    .inc_by(keys.len() as u64);
387
388                                keys.retain(|(id, jwk)| {
389                                    validate_jwk(&metrics, &p, id, jwk) &&
390                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
391                                    seen.insert((id.clone(), jwk.clone()))
392                                });
393
394                                metrics.unique_jwks
395                                    .with_label_values(&[&provider_str])
396                                    .inc_by(keys.len() as u64);
397
398                                // prevent oauth providers from sending too many keys,
399                                // inadvertently or otherwise
400                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
401                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
402                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
403                                }
404
405                                for (id, jwk) in keys.into_iter() {
406                                    info!("Submitting JWK to consensus: {:?}", id);
407
408                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
409                                    consensus_adapter.submit(txn, None, &epoch_store)
410                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
411                                        .ok();
412                                }
413                            }
414                        }
415                        tokio::time::sleep(fetch_interval).await;
416                    }
417                }
418                .instrument(error_span!("jwk_updater_task", epoch)),
419            ));
420        }
421    }
422
423    pub async fn start_async(
424        config: NodeConfig,
425        registry_service: RegistryService,
426        custom_rpc_runtime: Option<Handle>,
427        software_version: &'static str,
428    ) -> Result<Arc<IotaNode>> {
429        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
430        let mut config = config.clone();
431        if config.supported_protocol_versions.is_none() {
432            info!(
433                "populating config.supported_protocol_versions with default {:?}",
434                SupportedProtocolVersions::SYSTEM_DEFAULT
435            );
436            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
437        }
438
439        let run_with_range = config.run_with_range;
440        let is_validator = config.consensus_config().is_some();
441        let is_full_node = !is_validator;
442        let prometheus_registry = registry_service.default_registry();
443
444        info!(node =? config.authority_public_key(),
445            "Initializing iota-node listening on {}", config.network_address
446        );
447
448        let genesis = config.genesis()?.clone();
449
450        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
451        info!("IOTA chain identifier: {chain_identifier}");
452
453        // Check and set the db_corrupted flag
454        let db_corrupted_path = &config.db_path().join("status");
455        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
456            panic!("Failed to check database corruption: {err}");
457        }
458
459        // Initialize metrics to track db usage before creating any stores
460        DBMetrics::init(&prometheus_registry);
461
462        // Initialize IOTA metrics.
463        iota_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of a static variable) and unnecessary in
        // simtests.
466        #[cfg(not(msim))]
467        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
468
469        // Register hardware metrics.
470        register_hardware_metrics(&registry_service, &config.db_path)
471            .expect("Failed registering hardware metrics");
472        // Register uptime metric
473        prometheus_registry
474            .register(iota_metrics::uptime_metric(
475                if is_validator {
476                    "validator"
477                } else {
478                    "fullnode"
479                },
480                software_version,
481                &chain_identifier.to_string(),
482            ))
483            .expect("Failed registering uptime metric");
484
        // If the genesis comes with migration data, load it into memory from the
        // file path specified in the config.
487        let migration_tx_data = if genesis.contains_migrations() {
            // Here the load already verifies that the content of the migration blob is
            // valid with respect to the content found in genesis.
490            Some(config.load_migration_tx_data()?)
491        } else {
492            None
493        };
494
495        let secret = Arc::pin(config.authority_key_pair().copy());
496        let genesis_committee = genesis.committee()?;
497        let committee_store = Arc::new(CommitteeStore::new(
498            config.db_path().join("epochs"),
499            &genesis_committee,
500            None,
501        ));
502
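        // When the compaction filter is enabled in the pruning config, open the
        // pruner tables that back the objects compaction filter.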
503        let mut pruner_db = None;
504        if config
505            .authority_store_pruning_config
506            .enable_compaction_filter
507        {
508            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
509                &config.db_path().join("store"),
510            )));
511        }
512        let compaction_filter = pruner_db
513            .clone()
514            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
515
        // By default, only enable write stalls on validators for the perpetual db.
517        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
518        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
519            enable_write_stall,
520            compaction_filter,
521        };
522        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
523            &config.db_path().join("store"),
524            Some(perpetual_tables_options),
525        ));
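        // An empty perpetual store means this node is starting from genesis for
        // the first time.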
526        let is_genesis = perpetual_tables
527            .database_is_empty()
528            .expect("Database read should not fail at init.");
529        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
530        let backpressure_manager =
531            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
532
533        let store = AuthorityStore::open(
534            perpetual_tables,
535            &genesis,
536            &config,
537            &prometheus_registry,
538            migration_tx_data.as_ref(),
539        )
540        .await?;
541
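        // Recover the epoch this node was in when it last stopped; the committee
        // and epoch start configuration for that epoch must already be stored.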
542        let cur_epoch = store.get_recovery_epoch_at_restart()?;
543        let committee = committee_store
544            .get_committee(&cur_epoch)?
545            .expect("Committee of the current epoch must exist");
546        let epoch_start_configuration = store
547            .get_epoch_start_configuration()?
548            .expect("EpochStartConfiguration of the current epoch must exist");
549        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
550        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
551
552        let cache_traits = build_execution_cache(
553            &config.execution_cache_config,
554            &epoch_start_configuration,
555            &prometheus_registry,
556            &store,
557            backpressure_manager.clone(),
558        );
559
560        let auth_agg = {
561            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
562            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
563            Arc::new(ArcSwap::new(Arc::new(
564                AuthorityAggregator::new_from_epoch_start_state(
565                    epoch_start_configuration.epoch_start_state(),
566                    &committee_store,
567                    safe_client_metrics_base,
568                    auth_agg_metrics,
569                ),
570            )))
571        };
572
573        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
574        let epoch_store = AuthorityPerEpochStore::new(
575            config.authority_public_key(),
576            committee.clone(),
577            &config.db_path().join("store"),
578            Some(epoch_options.options),
579            EpochMetrics::new(&registry_service.default_registry()),
580            epoch_start_configuration,
581            cache_traits.backing_package_store.clone(),
582            cache_traits.object_store.clone(),
583            cache_metrics,
584            signature_verifier_metrics,
585            &config.expensive_safety_check_config,
586            ChainIdentifier::from(*genesis.checkpoint().digest()),
587            checkpoint_store
588                .get_highest_executed_checkpoint_seq_number()
589                .expect("checkpoint store read cannot fail")
590                .unwrap_or(0),
591        );
592
593        info!("created epoch store");
594
595        replay_log!(
596            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
597            epoch_store.epoch(),
598            epoch_store.protocol_config()
599        );
600
601        // the database is empty at genesis time
602        if is_genesis {
603            info!("checking IOTA conservation at genesis");
            // When we are opening the db table, the only time when it's safe to
            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the IOTA conservation check will fail. This also initializes
            // the expected_network_iota_amount table.
608            cache_traits
609                .reconfig_api
610                .try_expensive_check_iota_conservation(&epoch_store, None)
611                .expect("IOTA conservation check cannot fail at genesis");
612        }
613
614        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
615        let default_buffer_stake = epoch_store
616            .protocol_config()
617            .buffer_stake_for_protocol_upgrade_bps();
618        if effective_buffer_stake != default_buffer_stake {
619            warn!(
620                ?effective_buffer_stake,
621                ?default_buffer_stake,
622                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
623            );
624        }
625
626        checkpoint_store.insert_genesis_checkpoint(
627            genesis.checkpoint(),
628            genesis.checkpoint_contents().clone(),
629            &epoch_store,
630        );
631
        // The database has everything from genesis, so reset the corrupted key to 0.
633        unmark_db_corruption(db_corrupted_path)?;
634
635        info!("creating state sync store");
636        let state_sync_store = RocksDbStore::new(
637            cache_traits.clone(),
638            committee_store.clone(),
639            checkpoint_store.clone(),
640        );
641
642        let index_store = if is_full_node && config.enable_index_processing {
643            info!("creating index store");
644            Some(Arc::new(IndexStore::new(
645                config.db_path().join("indexes"),
646                &prometheus_registry,
647                epoch_store
648                    .protocol_config()
649                    .max_move_identifier_len_as_option(),
650                config.remove_deprecated_tables,
651            )))
652        } else {
653            None
654        };
655
656        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
657        {
658            Some(Arc::new(RestIndexStore::new(
659                config.db_path().join("rest_index"),
660                &store,
661                &checkpoint_store,
662                &epoch_store,
663                &cache_traits.backing_package_store,
664            )))
665        } else {
666            None
667        };
668
669        info!("creating archive reader");
        // Create the network.
        // TODO: only configure validators as seed/preferred peers for validators, and not
        // for fullnodes, once we've had a chance to re-work fullnode
        // configuration generation.
674        let archive_readers =
675            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
676        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
677        let (randomness_tx, randomness_rx) = mpsc::channel(
678            config
679                .p2p_config
680                .randomness
681                .clone()
682                .unwrap_or_default()
683                .mailbox_capacity(),
684        );
685        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
686            Self::create_p2p_network(
687                &config,
688                state_sync_store.clone(),
689                chain_identifier,
690                trusted_peer_change_rx,
691                archive_readers.clone(),
692                randomness_tx,
693                &prometheus_registry,
694            )?;
695
        // We must send this explicitly instead of relying on the initial value to
        // trigger the watch value change, so that state-sync is able to process it.
698        send_trusted_peer_change(
699            &config,
700            &trusted_peer_change_tx,
701            epoch_store.epoch_start_state(),
702        );
703
704        info!("start state archival");
705        // Start archiving local state to remote store
706        let state_archive_handle =
707            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
708                .await?;
709
710        info!("start snapshot upload");
711        // Start uploading state snapshot to remote store
712        let state_snapshot_handle =
713            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
714
715        // Start uploading db checkpoints to remote store
716        info!("start db checkpoint");
717        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
718            &config,
719            &prometheus_registry,
720            state_snapshot_handle.is_some(),
721        )?;
722
723        let mut genesis_objects = genesis.objects().to_vec();
724        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
725            genesis_objects.extend(migration_tx_data.get_objects());
726        }
727
728        let authority_name = config.authority_public_key();
729        let validator_tx_finalizer =
730            config
731                .enable_validator_tx_finalizer
732                .then_some(Arc::new(ValidatorTxFinalizer::new(
733                    auth_agg.clone(),
734                    authority_name,
735                    &prometheus_registry,
736                )));
737
738        info!("create authority state");
739        let state = AuthorityState::new(
740            authority_name,
741            secret,
742            config.supported_protocol_versions.unwrap(),
743            store.clone(),
744            cache_traits.clone(),
745            epoch_store.clone(),
746            committee_store.clone(),
747            index_store.clone(),
748            rest_index,
749            checkpoint_store.clone(),
750            &prometheus_registry,
751            &genesis_objects,
752            &db_checkpoint_config,
753            config.clone(),
754            archive_readers,
755            validator_tx_finalizer,
756            chain_identifier,
757            pruner_db,
758        )
759        .await;
760
761        // ensure genesis and migration txs were executed
762        if epoch_store.epoch() == 0 {
763            let genesis_tx = &genesis.transaction();
764            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
765            // Execute genesis transaction
766            Self::execute_transaction_immediately_at_zero_epoch(
767                &state,
768                &epoch_store,
769                genesis_tx,
770                span,
771            )
772            .await;
773
774            // Execute migration transactions if present
775            if let Some(migration_tx_data) = migration_tx_data {
776                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
777                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
778                    Self::execute_transaction_immediately_at_zero_epoch(
779                        &state,
780                        &epoch_store,
781                        tx,
782                        span,
783                    )
784                    .await;
785                }
786            }
787        }
788
789        // Start the loop that receives new randomness and generates transactions for
790        // it.
791        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
792
793        if config
794            .expensive_safety_check_config
795            .enable_secondary_index_checks()
796        {
797            if let Some(indexes) = state.indexes.clone() {
798                iota_core::verify_indexes::verify_indexes(
799                    state.get_accumulator_store().as_ref(),
800                    indexes,
801                )
802                .expect("secondary indexes are inconsistent");
803            }
804        }
805
806        let (end_of_epoch_channel, end_of_epoch_receiver) =
807            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
808
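        // The transaction orchestrator is only built on full nodes that are not
        // running with a RunWithRange limit; it submits transactions received
        // over RPC to the validator committee via the authority aggregator.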
809        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
810            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
811                auth_agg.load_full(),
812                state.clone(),
813                end_of_epoch_receiver,
814                &config.db_path(),
815                &prometheus_registry,
816            )))
817        } else {
818            None
819        };
820
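        // Build the HTTP server that serves the JSON-RPC and REST APIs.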
821        let http_server = build_http_server(
822            state.clone(),
823            state_sync_store,
824            &transaction_orchestrator.clone(),
825            &config,
826            &prometheus_registry,
827            custom_rpc_runtime,
828            software_version,
829        )
830        .await?;
831
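        // Create the state accumulator; a weak reference is handed to the
        // validator components, while the strong reference is kept behind a
        // mutex so it can be replaced across epochs.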
832        let accumulator = Arc::new(StateAccumulator::new(
833            cache_traits.accumulator_store.clone(),
834            StateAccumulatorMetrics::new(&prometheus_registry),
835        ));
836
837        let authority_names_to_peer_ids = epoch_store
838            .epoch_start_state()
839            .get_authority_names_to_peer_ids();
840
841        let network_connection_metrics =
842            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
843
844        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
845
846        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
847            p2p_network.downgrade(),
848            network_connection_metrics,
849            HashMap::new(),
850            None,
851        );
852
853        let connection_monitor_status = ConnectionMonitorStatus {
854            connection_statuses,
855            authority_names_to_peer_ids,
856        };
857
858        let connection_monitor_status = Arc::new(connection_monitor_status);
859        let iota_node_metrics =
860            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
861
862        let validator_components = if state.is_validator(&epoch_store) {
863            let (components, _) = futures::join!(
864                Self::construct_validator_components(
865                    config.clone(),
866                    state.clone(),
867                    committee,
868                    epoch_store.clone(),
869                    checkpoint_store.clone(),
870                    state_sync_handle.clone(),
871                    randomness_handle.clone(),
872                    Arc::downgrade(&accumulator),
873                    backpressure_manager.clone(),
874                    connection_monitor_status.clone(),
875                    &registry_service,
876                    iota_node_metrics.clone(),
877                ),
878                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
879            );
880            let mut components = components?;
881
882            components.consensus_adapter.submit_recovered(&epoch_store);
883
884            // Start the gRPC server
885            components.validator_server_spawn_handle =
886                components.validator_server_spawn_handle.start();
887
888            Some(components)
889        } else {
890            None
891        };
892
893        // setup shutdown channel
894        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
895
896        let node = Self {
897            config,
898            validator_components: Mutex::new(validator_components),
899            _http_server: http_server,
900            state,
901            transaction_orchestrator,
902            registry_service,
903            metrics: iota_node_metrics,
904
905            _discovery: discovery_handle,
906            state_sync_handle,
907            randomness_handle,
908            checkpoint_store,
909            accumulator: Mutex::new(Some(accumulator)),
910            end_of_epoch_channel,
911            connection_monitor_status,
912            trusted_peer_change_tx,
913            backpressure_manager,
914
915            _db_checkpoint_handle: db_checkpoint_handle,
916
917            #[cfg(msim)]
918            sim_state: Default::default(),
919
920            _state_archive_handle: state_archive_handle,
921            _state_snapshot_uploader_handle: state_snapshot_handle,
922            shutdown_channel_tx: shutdown_channel,
923
924            auth_agg,
925        };
926
927        info!("IotaNode started!");
928        let node = Arc::new(node);
929        let node_copy = node.clone();
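        // Spawn the long-running task that monitors the end of the current epoch
        // and drives reconfiguration for the lifetime of the node.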
930        spawn_monitored_task!(async move {
931            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
932            if let Err(error) = result {
933                warn!("Reconfiguration finished with error {:?}", error);
934            }
935        });
936
937        Ok(node)
938    }
939
940    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
941        self.end_of_epoch_channel.subscribe()
942    }
943
944    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
945        self.shutdown_channel_tx.subscribe()
946    }
947
948    pub fn current_epoch_for_testing(&self) -> EpochId {
949        self.state.current_epoch_for_testing()
950    }
951
952    pub fn db_checkpoint_path(&self) -> PathBuf {
953        self.config.db_checkpoint_path()
954    }
955
    // Initiate the reconfiguration process by starting to reject user certs.
957    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
958        info!("close_epoch (current epoch = {})", epoch_store.epoch());
959        self.validator_components
960            .lock()
961            .await
962            .as_ref()
963            .ok_or_else(|| IotaError::from("Node is not a validator"))?
964            .consensus_adapter
965            .close_epoch(epoch_store);
966        Ok(())
967    }
968
969    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
970        self.state
971            .clear_override_protocol_upgrade_buffer_stake(epoch)
972    }
973
974    pub fn set_override_protocol_upgrade_buffer_stake(
975        &self,
976        epoch: EpochId,
977        buffer_stake_bps: u64,
978    ) -> IotaResult {
979        self.state
980            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
981    }
982
983    // Testing-only API to start epoch close process.
984    // For production code, please use the non-testing version.
985    pub async fn close_epoch_for_testing(&self) -> IotaResult {
986        let epoch_store = self.state.epoch_store_for_testing();
987        self.close_epoch(&epoch_store).await
988    }
989
990    async fn start_state_archival(
991        config: &NodeConfig,
992        prometheus_registry: &Registry,
993        state_sync_store: RocksDbStore,
994    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
995        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
996            let local_store_config = ObjectStoreConfig {
997                object_store: Some(ObjectStoreType::File),
998                directory: Some(config.archive_path()),
999                ..Default::default()
1000            };
1001            let archive_writer = ArchiveWriter::new(
1002                local_store_config,
1003                remote_store_config.clone(),
1004                FileCompression::Zstd,
1005                StorageFormat::Blob,
1006                Duration::from_secs(600),
1007                256 * 1024 * 1024,
1008                prometheus_registry,
1009            )
1010            .await?;
1011            Ok(Some(archive_writer.start(state_sync_store).await?))
1012        } else {
1013            Ok(None)
1014        }
1015    }
1016
    /// Creates a StateSnapshotUploader and starts it if the StateSnapshotConfig
    /// is set.
1019    fn start_state_snapshot(
1020        config: &NodeConfig,
1021        prometheus_registry: &Registry,
1022        checkpoint_store: Arc<CheckpointStore>,
1023    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1024        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1025            let snapshot_uploader = StateSnapshotUploader::new(
1026                &config.db_checkpoint_path(),
1027                &config.snapshot_path(),
1028                remote_store_config.clone(),
1029                60,
1030                prometheus_registry,
1031                checkpoint_store,
1032            )?;
1033            Ok(Some(snapshot_uploader.start()))
1034        } else {
1035            Ok(None)
1036        }
1037    }
1038
1039    fn start_db_checkpoint(
1040        config: &NodeConfig,
1041        prometheus_registry: &Registry,
1042        state_snapshot_enabled: bool,
1043    ) -> Result<(
1044        DBCheckpointConfig,
1045        Option<tokio::sync::broadcast::Sender<()>>,
1046    )> {
1047        let checkpoint_path = Some(
1048            config
1049                .db_checkpoint_config
1050                .checkpoint_path
1051                .clone()
1052                .unwrap_or_else(|| config.db_checkpoint_path()),
1053        );
1054        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1055            DBCheckpointConfig {
1056                checkpoint_path,
1057                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1058                    true
1059                } else {
1060                    config
1061                        .db_checkpoint_config
1062                        .perform_db_checkpoints_at_epoch_end
1063                },
1064                ..config.db_checkpoint_config.clone()
1065            }
1066        } else {
1067            config.db_checkpoint_config.clone()
1068        };
1069
1070        match (
1071            db_checkpoint_config.object_store_config.as_ref(),
1072            state_snapshot_enabled,
1073        ) {
            // If the db checkpoint object store is not specified but the
            // state snapshot object store is specified, create the handler
            // anyway so that db checkpoints are marked as completed and
            // can be uploaded as state snapshots.
1078            (None, false) => Ok((db_checkpoint_config, None)),
1079            (_, _) => {
1080                let handler = DBCheckpointHandler::new(
1081                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1082                    db_checkpoint_config.object_store_config.as_ref(),
1083                    60,
1084                    db_checkpoint_config
1085                        .prune_and_compact_before_upload
1086                        .unwrap_or(true),
1087                    config.authority_store_pruning_config.clone(),
1088                    prometheus_registry,
1089                    state_snapshot_enabled,
1090                )?;
1091                Ok((
1092                    db_checkpoint_config,
1093                    Some(DBCheckpointHandler::start(handler)),
1094                ))
1095            }
1096        }
1097    }
1098
1099    fn create_p2p_network(
1100        config: &NodeConfig,
1101        state_sync_store: RocksDbStore,
1102        chain_identifier: ChainIdentifier,
1103        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1104        archive_readers: ArchiveReaderBalancer,
1105        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1106        prometheus_registry: &Registry,
1107    ) -> Result<(
1108        Network,
1109        discovery::Handle,
1110        state_sync::Handle,
1111        randomness::Handle,
1112    )> {
1113        let (state_sync, state_sync_server) = state_sync::Builder::new()
1114            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1115            .store(state_sync_store)
1116            .archive_readers(archive_readers)
1117            .with_metrics(prometheus_registry)
1118            .build();
1119
1120        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1121            .config(config.p2p_config.clone())
1122            .build();
1123
1124        let (randomness, randomness_router) =
1125            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1126                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1127                .with_metrics(prometheus_registry)
1128                .build();
1129
1130        let p2p_network = {
1131            let routes = anemo::Router::new()
1132                .add_rpc_service(discovery_server)
1133                .add_rpc_service(state_sync_server);
1134            let routes = routes.merge(randomness_router);
1135
1136            let inbound_network_metrics =
1137                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1138            let outbound_network_metrics =
1139                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1140
1141            let service = ServiceBuilder::new()
1142                .layer(
1143                    TraceLayer::new_for_server_errors()
1144                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1145                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1146                )
1147                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1148                    Arc::new(inbound_network_metrics),
1149                    config.p2p_config.excessive_message_size(),
1150                )))
1151                .service(routes);
1152
1153            let outbound_layer = ServiceBuilder::new()
1154                .layer(
1155                    TraceLayer::new_for_client_and_server_errors()
1156                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1157                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1158                )
1159                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1160                    Arc::new(outbound_network_metrics),
1161                    config.p2p_config.excessive_message_size(),
1162                )))
1163                .into_inner();
1164
1165            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1166            // Set the max_frame_size to be 1 GB to work around the issue of there being too
1167            // many staking events in the epoch change txn.
1168            anemo_config.max_frame_size = Some(1 << 30);
1169
1170            // Set a higher default value for socket send/receive buffers if not already
1171            // configured.
1172            let mut quic_config = anemo_config.quic.unwrap_or_default();
1173            if quic_config.socket_send_buffer_size.is_none() {
1174                quic_config.socket_send_buffer_size = Some(20 << 20);
1175            }
1176            if quic_config.socket_receive_buffer_size.is_none() {
1177                quic_config.socket_receive_buffer_size = Some(20 << 20);
1178            }
1179            quic_config.allow_failed_socket_buffer_size_setting = true;
1180
1181            // Set high-performance defaults for quinn transport.
1182            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1183            if quic_config.max_concurrent_bidi_streams.is_none() {
1184                quic_config.max_concurrent_bidi_streams = Some(500);
1185            }
1186            if quic_config.max_concurrent_uni_streams.is_none() {
1187                quic_config.max_concurrent_uni_streams = Some(500);
1188            }
1189            if quic_config.stream_receive_window.is_none() {
1190                quic_config.stream_receive_window = Some(100 << 20);
1191            }
1192            if quic_config.receive_window.is_none() {
1193                quic_config.receive_window = Some(200 << 20);
1194            }
1195            if quic_config.send_window.is_none() {
1196                quic_config.send_window = Some(200 << 20);
1197            }
1198            if quic_config.crypto_buffer_size.is_none() {
1199                quic_config.crypto_buffer_size = Some(1 << 20);
1200            }
1201            if quic_config.max_idle_timeout_ms.is_none() {
1202                quic_config.max_idle_timeout_ms = Some(30_000);
1203            }
1204            if quic_config.keep_alive_interval_ms.is_none() {
1205                quic_config.keep_alive_interval_ms = Some(5_000);
1206            }
1207            anemo_config.quic = Some(quic_config);
1208
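            // Bind the anemo network. The TLS server name is derived from the chain
            // identifier, so peers of other networks are rejected during the handshake.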
1209            let server_name = format!("iota-{chain_identifier}");
1210            let network = Network::bind(config.p2p_config.listen_address)
1211                .server_name(&server_name)
1212                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1213                .config(anemo_config)
1214                .outbound_request_layer(outbound_layer)
1215                .start(service)?;
1216            info!(
1217                server_name = server_name,
1218                "P2p network started on {}",
1219                network.local_addr()
1220            );
1221
1222            network
1223        };
1224
1225        let discovery_handle =
1226            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1227        let state_sync_handle = state_sync.start(p2p_network.clone());
1228        let randomness_handle = randomness.start(p2p_network.clone());
1229
1230        Ok((
1231            p2p_network,
1232            discovery_handle,
1233            state_sync_handle,
1234            randomness_handle,
1235        ))
1236    }
1237
1238    /// Asynchronously constructs and initializes the components necessary for
1239    /// the validator node.
1240    async fn construct_validator_components(
1241        config: NodeConfig,
1242        state: Arc<AuthorityState>,
1243        committee: Arc<Committee>,
1244        epoch_store: Arc<AuthorityPerEpochStore>,
1245        checkpoint_store: Arc<CheckpointStore>,
1246        state_sync_handle: state_sync::Handle,
1247        randomness_handle: randomness::Handle,
1248        accumulator: Weak<StateAccumulator>,
1249        backpressure_manager: Arc<BackpressureManager>,
1250        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1251        registry_service: &RegistryService,
1252        iota_node_metrics: Arc<IotaNodeMetrics>,
1253    ) -> Result<ValidatorComponents> {
1254        let mut config_clone = config.clone();
1255        let consensus_config = config_clone
1256            .consensus_config
1257            .as_mut()
1258            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
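        // Validator-specific metrics are kept in a dedicated registry so that they
        // can be removed from the registry service, via the stored
        // `validator_registry_id`, when these components are torn down.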
1259        let validator_registry = Registry::new();
1260        let validator_registry_id = registry_service.add(validator_registry.clone());
1261
1262        let client = Arc::new(UpdatableConsensusClient::new());
1263        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1264            &committee,
1265            consensus_config,
1266            state.name,
1267            connection_monitor_status.clone(),
1268            &validator_registry,
1269            client.clone(),
1270            checkpoint_store.clone(),
1271        ));
1272        let consensus_manager = ConsensusManager::new(
1273            &config,
1274            consensus_config,
1275            registry_service,
1276            &validator_registry,
1277            client,
1278        );
1279
1280        // This only gets started up once, not on every epoch. (Make call to remove
1281        // every epoch.)
1282        let consensus_store_pruner = ConsensusStorePruner::new(
1283            consensus_manager.get_storage_base_path(),
1284            consensus_config.db_retention_epochs(),
1285            consensus_config.db_pruner_period(),
1286            &validator_registry,
1287        );
1288
1289        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1290        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1291
1292        let (validator_server_spawn_handle, validator_server_handle) =
1293            Self::start_grpc_validator_service(
1294                &config,
1295                state.clone(),
1296                consensus_adapter.clone(),
1297                &validator_registry,
1298            )
1299            .await?;
1300
1301        // Starts an overload monitor that monitors the execution of the authority.
1302        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1303        let validator_overload_monitor_handle = if config
1304            .authority_overload_config
1305            .max_load_shedding_percentage
1306            > 0
1307        {
1308            let authority_state = Arc::downgrade(&state);
1309            let overload_config = config.authority_overload_config.clone();
1310            fail_point!("starting_overload_monitor");
1311            Some(spawn_monitored_task!(overload_monitor(
1312                authority_state,
1313                overload_config,
1314            )))
1315        } else {
1316            None
1317        };
1318
1319        Self::start_epoch_specific_validator_components(
1320            &config,
1321            state.clone(),
1322            consensus_adapter,
1323            checkpoint_store,
1324            epoch_store,
1325            state_sync_handle,
1326            randomness_handle,
1327            consensus_manager,
1328            consensus_store_pruner,
1329            accumulator,
1330            backpressure_manager,
1331            validator_server_spawn_handle,
1332            validator_server_handle,
1333            validator_overload_monitor_handle,
1334            checkpoint_metrics,
1335            iota_node_metrics,
1336            iota_tx_validator_metrics,
1337            validator_registry_id,
1338        )
1339        .await
1340    }
1341
1342    /// Initializes and starts components specific to the current
1343    /// epoch for the validator node.
1344    async fn start_epoch_specific_validator_components(
1345        config: &NodeConfig,
1346        state: Arc<AuthorityState>,
1347        consensus_adapter: Arc<ConsensusAdapter>,
1348        checkpoint_store: Arc<CheckpointStore>,
1349        epoch_store: Arc<AuthorityPerEpochStore>,
1350        state_sync_handle: state_sync::Handle,
1351        randomness_handle: randomness::Handle,
1352        consensus_manager: ConsensusManager,
1353        consensus_store_pruner: ConsensusStorePruner,
1354        accumulator: Weak<StateAccumulator>,
1355        backpressure_manager: Arc<BackpressureManager>,
1356        validator_server_spawn_handle: SpawnOnce,
1357        validator_server_handle: iota_http::ServerHandle,
1358        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1359        checkpoint_metrics: Arc<CheckpointMetrics>,
1360        iota_node_metrics: Arc<IotaNodeMetrics>,
1361        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1362        validator_registry_id: RegistryID,
1363    ) -> Result<ValidatorComponents> {
1364        let checkpoint_service = Self::build_checkpoint_service(
1365            config,
1366            consensus_adapter.clone(),
1367            checkpoint_store.clone(),
1368            epoch_store.clone(),
1369            state.clone(),
1370            state_sync_handle,
1371            accumulator,
1372            checkpoint_metrics.clone(),
1373        );
1374
1375        // Create a new map that gets injected into both the consensus handler and
1376        // the consensus adapter. The consensus handler will write values forwarded
1377        // from consensus, and the consensus adapter will read those values to
1378        // decide which validator submits a transaction to consensus.
1379        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1380
1381        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1382
1383        let randomness_manager = RandomnessManager::try_new(
1384            Arc::downgrade(&epoch_store),
1385            Box::new(consensus_adapter.clone()),
1386            randomness_handle,
1387            config.authority_key_pair(),
1388        )
1389        .await;
1390        if let Some(randomness_manager) = randomness_manager {
1391            epoch_store
1392                .set_randomness_manager(randomness_manager)
1393                .await?;
1394        }
1395
1396        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1397            state.clone(),
1398            checkpoint_service.clone(),
1399            epoch_store.clone(),
1400            low_scoring_authorities,
1401            backpressure_manager,
1402        );
1403
1404        info!("Starting consensus manager");
1405
1406        consensus_manager
1407            .start(
1408                config,
1409                epoch_store.clone(),
1410                consensus_handler_initializer,
1411                IotaTxValidator::new(
1412                    epoch_store.clone(),
1413                    checkpoint_service.clone(),
1414                    state.transaction_manager().clone(),
1415                    iota_tx_validator_metrics.clone(),
1416                ),
1417            )
1418            .await;
1419
1420        if !epoch_store
1421            .epoch_start_config()
1422            .is_data_quarantine_active_from_beginning_of_epoch()
1423        {
1424            checkpoint_store
1425                .reexecute_local_checkpoints(&state, &epoch_store)
1426                .await;
1427        }
1428
1429        info!("Spawning checkpoint service");
1430        let checkpoint_service_tasks = checkpoint_service.spawn().await;
1431
1432        if epoch_store.authenticator_state_enabled() {
1433            Self::start_jwk_updater(
1434                config,
1435                iota_node_metrics,
1436                state.name,
1437                epoch_store.clone(),
1438                consensus_adapter.clone(),
1439            );
1440        }
1441
1442        Ok(ValidatorComponents {
1443            validator_server_spawn_handle,
1444            validator_server_handle,
1445            validator_overload_monitor_handle,
1446            consensus_manager,
1447            consensus_store_pruner,
1448            consensus_adapter,
1449            checkpoint_service_tasks,
1450            checkpoint_metrics,
1451            iota_tx_validator_metrics,
1452            validator_registry_id,
1453        })
1454    }
1455
1456    /// Builds the checkpoint service for the validator node, wiring up
1457    /// checkpoint creation, submission of locally built checkpoints to
1458    /// consensus, and forwarding of certified checkpoints to state sync.
1459    /// The service is only constructed here; it is spawned separately by the
1460    /// caller once the remaining epoch-specific components have been
1461    /// initialized.
1462    fn build_checkpoint_service(
1463        config: &NodeConfig,
1464        consensus_adapter: Arc<ConsensusAdapter>,
1465        checkpoint_store: Arc<CheckpointStore>,
1466        epoch_store: Arc<AuthorityPerEpochStore>,
1467        state: Arc<AuthorityState>,
1468        state_sync_handle: state_sync::Handle,
1469        accumulator: Weak<StateAccumulator>,
1470        checkpoint_metrics: Arc<CheckpointMetrics>,
1471    ) -> Arc<CheckpointService> {
1472        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1473        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1474
1475        debug!(
1476            "Starting checkpoint service with epoch start timestamp {} \
1477             and epoch duration {}",
1478            epoch_start_timestamp_ms, epoch_duration_ms
1479        );
1480
1481        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1482            sender: consensus_adapter,
1483            signer: state.secret.clone(),
1484            authority: config.authority_public_key(),
1485            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1486                .checked_add(epoch_duration_ms)
1487                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1488            metrics: checkpoint_metrics.clone(),
1489        });
1490
1491        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1492        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1493        let max_checkpoint_size_bytes =
1494            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1495
1496        CheckpointService::build(
1497            state.clone(),
1498            checkpoint_store,
1499            epoch_store,
1500            state.get_transaction_cache_reader().clone(),
1501            accumulator,
1502            checkpoint_output,
1503            Box::new(certified_checkpoint_output),
1504            checkpoint_metrics,
1505            max_tx_per_checkpoint,
1506            max_checkpoint_size_bytes,
1507        )
1508    }
1509
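    /// Constructs the consensus adapter that allows the authority to submit
    /// user certificates through consensus, with submission limits derived
    /// from the consensus configuration and the committee size.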
1510    fn construct_consensus_adapter(
1511        committee: &Committee,
1512        consensus_config: &ConsensusConfig,
1513        authority: AuthorityName,
1514        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1515        prometheus_registry: &Registry,
1516        consensus_client: Arc<dyn ConsensusClient>,
1517        checkpoint_store: Arc<CheckpointStore>,
1518    ) -> ConsensusAdapter {
1519        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1520        // The consensus adapter allows the authority to send user certificates through
1521        // consensus.
1522
1523        ConsensusAdapter::new(
1524            consensus_client,
1525            checkpoint_store,
1526            authority,
1527            connection_monitor_status,
1528            consensus_config.max_pending_transactions(),
1529            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1530            consensus_config.max_submit_position,
1531            consensus_config.submit_delay_step_override(),
1532            ca_metrics,
1533        )
1534    }
1535
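    /// Builds the validator gRPC service and binds it, with TLS, to the
    /// configured network address. Returns the serve future wrapped in a
    /// `SpawnOnce` together with a server handle that can trigger shutdown.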
1536    async fn start_grpc_validator_service(
1537        config: &NodeConfig,
1538        state: Arc<AuthorityState>,
1539        consensus_adapter: Arc<ConsensusAdapter>,
1540        prometheus_registry: &Registry,
1541    ) -> Result<(SpawnOnce, iota_http::ServerHandle)> {
1542        let validator_service = ValidatorService::new(
1543            state.clone(),
1544            consensus_adapter,
1545            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1546            TrafficControllerMetrics::new(prometheus_registry),
1547            config.policy_config.clone(),
1548            config.firewall_config.clone(),
1549        );
1550
1551        let mut server_conf = iota_network_stack::config::Config::new();
1552        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1553        server_conf.load_shed = config.grpc_load_shed;
1554        let mut server_builder =
1555            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1556
1557        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1558
1559        let tls_config = iota_tls::create_rustls_server_config(
1560            config.network_key_pair().copy().private(),
1561            IOTA_TLS_SERVER_NAME.to_string(),
1562        );
1563
1564        let server = server_builder
1565            .bind(config.network_address(), Some(tls_config))
1566            .await
1567            .map_err(|err| anyhow!(err.to_string()))?;
1568
1569        let local_addr = server.local_addr();
1570        info!("Listening to traffic on {local_addr}");
1571
1572        // Clone the server handle for shutdown functionality
1573        let server_handle = server.handle().clone();
1574
1575        Ok((
1576            SpawnOnce::new(server.serve().map_err(Into::into)),
1577            server_handle,
1578        ))
1579    }
1580
1581    /// Re-executes pending consensus certificates, which may not have been
1582    /// committed to disk before the node restarted. This is necessary for
1583    /// the following reasons:
1584    ///
1585    /// 1. For any transaction for which we returned signed effects to a client,
1586    ///    we must ensure that we have re-executed the transaction before we
1587    ///    begin accepting grpc requests. Otherwise we would appear to have
1588    ///    forgotten about the transaction.
1589    /// 2. While this is running, we are concurrently waiting for all previously
1590    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1591    ///    either direction (from checkpointed consensus transactions to pending
1592    ///    consensus transactions, or vice versa), we must re-execute pending
1593    ///    consensus transactions to ensure that both processes can complete.
1594    /// 3. Also note that for any pending consensus transactions for which we
1595    ///    wrote a signed effects digest to disk, we must re-execute using that
1596    ///    digest as the expected effects digest, to ensure that we cannot
1597    ///    arrive at different effects than what we previously signed.
1598    async fn reexecute_pending_consensus_certs(
1599        epoch_store: &Arc<AuthorityPerEpochStore>,
1600        state: &Arc<AuthorityState>,
1601    ) {
1602        let mut pending_consensus_certificates = Vec::new();
1603        let mut additional_certs = Vec::new();
1604
1605        for tx in epoch_store.get_all_pending_consensus_transactions() {
1606            match tx.kind {
1607                // Shared object txns cannot be re-executed at this point, because we must wait for
1608                // consensus replay to assign shared object versions.
1609                ConsensusTransactionKind::CertifiedTransaction(tx)
1610                    if !tx.contains_shared_object() =>
1611                {
1612                    let tx = *tx;
1613                    // new_unchecked is safe because we never submit a transaction to consensus
1614                    // without verifying it
1615                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1616                        VerifiedCertificate::new_unchecked(tx),
1617                    );
1618                    // we only need to re-execute if we previously signed the effects (which
1619                    // indicates we returned the effects to a client).
1620                    if let Some(fx_digest) = epoch_store
1621                        .get_signed_effects_digest(tx.digest())
1622                        .expect("db error")
1623                    {
1624                        pending_consensus_certificates.push((tx, fx_digest));
1625                    } else {
1626                        additional_certs.push(tx);
1627                    }
1628                }
1629                _ => (),
1630            }
1631        }
1632
1633        let digests = pending_consensus_certificates
1634            .iter()
1635            .map(|(tx, _)| *tx.digest())
1636            .collect::<Vec<_>>();
1637
1638        info!(
1639            "reexecuting {} pending consensus certificates: {:?}",
1640            digests.len(),
1641            digests
1642        );
1643
1644        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1645        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1646
1647        // If this times out, the validator will still almost certainly start up fine,
1648        // but it may temporarily "forget" about transactions that it had previously
1649        // executed, which could confuse clients in some circumstances. However, the
1650        // transactions are still in pending_consensus_certificates, so no finality
1651        // guarantees can be lost.
1652        let timeout = if cfg!(msim) { 120 } else { 60 };
1653        if tokio::time::timeout(
1654            std::time::Duration::from_secs(timeout),
1655            state
1656                .get_transaction_cache_reader()
1657                .try_notify_read_executed_effects_digests(&digests),
1658        )
1659        .await
1660        .is_err()
1661        {
1662            // Log all the digests that were not executed to help debugging.
1663            if let Ok(executed_effects_digests) = state
1664                .get_transaction_cache_reader()
1665                .try_multi_get_executed_effects_digests(&digests)
1666            {
1667                let pending_digests = digests
1668                    .iter()
1669                    .zip(executed_effects_digests.iter())
1670                    .filter_map(|(digest, executed_effects_digest)| {
1671                        if executed_effects_digest.is_none() {
1672                            Some(digest)
1673                        } else {
1674                            None
1675                        }
1676                    })
1677                    .collect::<Vec<_>>();
1678                debug_fatal!(
1679                    "Timed out waiting for effects digests to be executed: {:?}",
1680                    pending_digests
1681                );
1682            } else {
1683                debug_fatal!(
1684                    "Timed out waiting for effects digests to be executed, digests not found"
1685                );
1686            }
1687        }
1688    }
1689
1690    pub fn state(&self) -> Arc<AuthorityState> {
1691        self.state.clone()
1692    }
1693
1694    // Only used for testing because of how the epoch store is loaded.
1695    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1696        self.state.reference_gas_price_for_testing()
1697    }
1698
1699    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1700        self.state.committee_store().clone()
1701    }
1702
1703    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1704    // self.state.db()
1705    // }
1706
1707    /// Clone the AuthorityAggregator currently used in this node's
1708    /// QuorumDriver, if the node is a fullnode. After reconfiguration,
1709    /// the QuorumDriver builds a new AuthorityAggregator, so the caller
1710    /// of this function will most likely want to call it again to get a
1711    /// fresh one.
1712    pub fn clone_authority_aggregator(
1713        &self,
1714    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1715        self.transaction_orchestrator
1716            .as_ref()
1717            .map(|to| to.clone_authority_aggregator())
1718    }
1719
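    /// Returns the node's TransactionOrchestrator, if one is enabled.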
1720    pub fn transaction_orchestrator(
1721        &self,
1722    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1723        self.transaction_orchestrator.clone()
1724    }
1725
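    /// Subscribes to the Transaction Orchestrator's effects queue. Returns an
    /// error if the Transaction Orchestrator is not enabled on this node.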
1726    pub fn subscribe_to_transaction_orchestrator_effects(
1727        &self,
1728    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1729        self.transaction_orchestrator
1730            .as_ref()
1731            .map(|to| to.subscribe_to_effects_queue())
1732            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1733    }
1734
1735    /// This function awaits the completion of checkpoint execution of the
1736    /// current epoch, after which it initiates reconfiguration of the
1737    /// entire system. It also handles role changes for the node when the
1738    /// epoch changes and advertises capabilities to the committee if the
1739    /// node is a validator.
1740    pub async fn monitor_reconfiguration(
1741        self: Arc<Self>,
1742        mut epoch_store: Arc<AuthorityPerEpochStore>,
1743    ) -> Result<()> {
1744        let checkpoint_executor_metrics =
1745            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1746
1747        loop {
1748            let mut accumulator_guard = self.accumulator.lock().await;
1749            let accumulator = accumulator_guard.take().unwrap();
1750            info!(
1751                "Creating checkpoint executor for epoch {}",
1752                epoch_store.epoch()
1753            );
1754            let checkpoint_executor = CheckpointExecutor::new(
1755                epoch_store.clone(),
1756                self.checkpoint_store.clone(),
1757                self.state.clone(),
1758                accumulator.clone(),
1759                self.backpressure_manager.clone(),
1760                self.config.checkpoint_executor_config.clone(),
1761                checkpoint_executor_metrics.clone(),
1762            );
1763
1764            let run_with_range = self.config.run_with_range;
1765
1766            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1767
1768            // Advertise capabilities to committee, if we are a validator.
1769            if let Some(components) = &*self.validator_components.lock().await {
1770                // TODO: without this sleep, the consensus message is not delivered reliably.
1771                tokio::time::sleep(Duration::from_millis(1)).await;
1772
1773                let config = cur_epoch_store.protocol_config();
1774                let binary_config = to_binary_config(config);
1775                let transaction = ConsensusTransaction::new_capability_notification_v1(
1776                    AuthorityCapabilitiesV1::new(
1777                        self.state.name,
1778                        cur_epoch_store.get_chain_identifier().chain(),
1779                        self.config
1780                            .supported_protocol_versions
1781                            .expect("Supported versions should be populated")
1782                            // no need to send digests of versions less than the current version
1783                            .truncate_below(config.version),
1784                        self.state
1785                            .get_available_system_packages(&binary_config)
1786                            .await,
1787                    ),
1788                );
1789                info!(?transaction, "submitting capabilities to consensus");
1790                components
1791                    .consensus_adapter
1792                    .submit(transaction, None, &cur_epoch_store)?;
1793            }
1794
1795            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1796
1797            if stop_condition == StopReason::RunWithRangeCondition {
1798                IotaNode::shutdown(&self).await;
1799                self.shutdown_channel_tx
1800                    .send(run_with_range)
1801                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1802                return Ok(());
1803            }
1804
1805            // Safe to call because we are in the middle of reconfiguration.
1806            let latest_system_state = self
1807                .state
1808                .get_object_cache_reader()
1809                .try_get_iota_system_state_object_unsafe()
1810                .expect("Read IOTA System State object cannot fail");
1811
1812            #[cfg(msim)]
1813            if !self
1814                .sim_state
1815                .sim_safe_mode_expected
1816                .load(Ordering::Relaxed)
1817            {
1818                debug_assert!(!latest_system_state.safe_mode());
1819            }
1820
1821            #[cfg(not(msim))]
1822            debug_assert!(!latest_system_state.safe_mode());
1823
1824            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1825                if self.state.is_fullnode(&cur_epoch_store) {
1826                    warn!(
1827                        "Failed to send end of epoch notification to subscriber: {:?}",
1828                        err
1829                    );
1830                }
1831            }
1832
1833            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1834            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1835
1836            self.auth_agg.store(Arc::new(
1837                self.auth_agg
1838                    .load()
1839                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1840            ));
1841
1842            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1843            let next_epoch = next_epoch_committee.epoch();
1844            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1845
1846            info!(
1847                next_epoch,
1848                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1849            );
1850
1851            fail_point_async!("reconfig_delay");
1852
1853            // We save the connection monitor status map regardless of validator / fullnode
1854            // status so that we don't need to restart the connection monitor
1855            // every epoch. Update the mappings that will be used by the
1856            // consensus adapter if it exists or is about to be created.
1857            let authority_names_to_peer_ids =
1858                new_epoch_start_state.get_authority_names_to_peer_ids();
1859            self.connection_monitor_status
1860                .update_mapping_for_epoch(authority_names_to_peer_ids);
1861
1862            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1863
1864            send_trusted_peer_change(
1865                &self.config,
1866                &self.trusted_peer_change_tx,
1867                &new_epoch_start_state,
1868            );
1869
1870            let mut validator_components_lock_guard = self.validator_components.lock().await;
1871
1872            // The following code handles 4 different cases, depending on whether the node
1873            // was a validator in the previous epoch, and whether the node is a validator
1874            // in the new epoch.
1875            let new_epoch_store = self
1876                .reconfigure_state(
1877                    &self.state,
1878                    &cur_epoch_store,
1879                    next_epoch_committee.clone(),
1880                    new_epoch_start_state,
1881                    accumulator.clone(),
1882                )
1883                .await?;
1884
1885            let new_validator_components = if let Some(ValidatorComponents {
1886                validator_server_spawn_handle,
1887                validator_server_handle,
1888                validator_overload_monitor_handle,
1889                consensus_manager,
1890                consensus_store_pruner,
1891                consensus_adapter,
1892                mut checkpoint_service_tasks,
1893                checkpoint_metrics,
1894                iota_tx_validator_metrics,
1895                validator_registry_id,
1896            }) = validator_components_lock_guard.take()
1897            {
1898                info!("Reconfiguring the validator.");
1899                // Cancel the old checkpoint service tasks.
1900                // Waiting for the checkpoint builder to finish gracefully is not
1901                // possible, because it may be waiting on transactions while consensus
1902                // on peers has already shut down.
1903                checkpoint_service_tasks.abort_all();
1904                while let Some(result) = checkpoint_service_tasks.join_next().await {
1905                    if let Err(err) = result {
1906                        if err.is_panic() {
1907                            std::panic::resume_unwind(err.into_panic());
1908                        }
1909                        warn!("Error in checkpoint service task: {:?}", err);
1910                    }
1911                }
1912                info!("Checkpoint service has shut down.");
1913
1914                consensus_manager.shutdown().await;
1915                info!("Consensus has shut down.");
1916
1917                info!("Epoch store finished reconfiguration.");
1918
1919                // No other components should be holding a strong reference to state accumulator
1920                // at this point. Confirm here before we swap in the new accumulator.
1921                let accumulator_metrics = Arc::into_inner(accumulator)
1922                    .expect("Accumulator should have no other references at this point")
1923                    .metrics();
1924                let new_accumulator = Arc::new(StateAccumulator::new(
1925                    self.state.get_accumulator_store().clone(),
1926                    accumulator_metrics,
1927                ));
1928                let weak_accumulator = Arc::downgrade(&new_accumulator);
1929                *accumulator_guard = Some(new_accumulator);
1930
1931                consensus_store_pruner.prune(next_epoch).await;
1932
1933                if self.state.is_validator(&new_epoch_store) {
1934                    // Only restart consensus if this node is still a validator in the new epoch.
1935                    Some(
1936                        Self::start_epoch_specific_validator_components(
1937                            &self.config,
1938                            self.state.clone(),
1939                            consensus_adapter,
1940                            self.checkpoint_store.clone(),
1941                            new_epoch_store.clone(),
1942                            self.state_sync_handle.clone(),
1943                            self.randomness_handle.clone(),
1944                            consensus_manager,
1945                            consensus_store_pruner,
1946                            weak_accumulator,
1947                            self.backpressure_manager.clone(),
1948                            validator_server_spawn_handle,
1949                            validator_server_handle,
1950                            validator_overload_monitor_handle,
1951                            checkpoint_metrics,
1952                            self.metrics.clone(),
1953                            iota_tx_validator_metrics,
1954                            validator_registry_id,
1955                        )
1956                        .await?,
1957                    )
1958                } else {
1959                    info!("This node is no longer a validator after reconfiguration");
1960                    if self.registry_service.remove(validator_registry_id) {
1961                        debug!("Removed validator metrics registry");
1962                    } else {
1963                        warn!("Failed to remove validator metrics registry");
1964                    }
1965                    validator_server_handle.trigger_shutdown();
1966                    debug!("Validator grpc server shutdown triggered");
1967
1968                    None
1969                }
1970            } else {
1971                // No other components should be holding a strong reference to state accumulator
1972                // at this point. Confirm here before we swap in the new accumulator.
1973                let accumulator_metrics = Arc::into_inner(accumulator)
1974                    .expect("Accumulator should have no other references at this point")
1975                    .metrics();
1976                let new_accumulator = Arc::new(StateAccumulator::new(
1977                    self.state.get_accumulator_store().clone(),
1978                    accumulator_metrics,
1979                ));
1980                let weak_accumulator = Arc::downgrade(&new_accumulator);
1981                *accumulator_guard = Some(new_accumulator);
1982
1983                if self.state.is_validator(&new_epoch_store) {
1984                    info!("Promoting the node from fullnode to validator, starting grpc server");
1985
1986                    let mut components = Self::construct_validator_components(
1987                        self.config.clone(),
1988                        self.state.clone(),
1989                        Arc::new(next_epoch_committee.clone()),
1990                        new_epoch_store.clone(),
1991                        self.checkpoint_store.clone(),
1992                        self.state_sync_handle.clone(),
1993                        self.randomness_handle.clone(),
1994                        weak_accumulator,
1995                        self.backpressure_manager.clone(),
1996                        self.connection_monitor_status.clone(),
1997                        &self.registry_service,
1998                        self.metrics.clone(),
1999                    )
2000                    .await?;
2001
2002                    components.validator_server_spawn_handle =
2003                        components.validator_server_spawn_handle.start();
2004
2005                    Some(components)
2006                } else {
2007                    None
2008                }
2009            };
2010            *validator_components_lock_guard = new_validator_components;
2011
2012            // Force release of the current epoch store's DB handles, because the
2013            // Arc<AuthorityPerEpochStore> may linger.
2014            cur_epoch_store.release_db_handles();
2015
2016            if cfg!(msim)
2017                && !matches!(
2018                    self.config
2019                        .authority_store_pruning_config
2020                        .num_epochs_to_retain_for_checkpoints(),
2021                    None | Some(u64::MAX) | Some(0)
2022                )
2023            {
2024                self.state
2025                .prune_checkpoints_for_eligible_epochs_for_testing(
2026                    self.config.clone(),
2027                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2028                )
2029                .await?;
2030            }
2031
2032            epoch_store = new_epoch_store;
2033            info!("Reconfiguration finished");
2034        }
2035    }
2036
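    /// Shuts down validator-specific components; currently this only stops the
    /// consensus manager, if validator components are running.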
2037    async fn shutdown(&self) {
2038        if let Some(validator_components) = &*self.validator_components.lock().await {
2039            validator_components.consensus_manager.shutdown().await;
2040        }
2041    }
2042
2043    /// Asynchronously reconfigures the state of the authority node for the next
2044    /// epoch.
2045    async fn reconfigure_state(
2046        &self,
2047        state: &Arc<AuthorityState>,
2048        cur_epoch_store: &AuthorityPerEpochStore,
2049        next_epoch_committee: Committee,
2050        next_epoch_start_system_state: EpochStartSystemState,
2051        accumulator: Arc<StateAccumulator>,
2052    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2053        let next_epoch = next_epoch_committee.epoch();
2054
2055        let last_checkpoint = self
2056            .checkpoint_store
2057            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2058            .expect("Error loading last checkpoint for current epoch")
2059            .expect("Could not load last checkpoint for current epoch");
2060        let epoch_supply_change = last_checkpoint
2061            .end_of_epoch_data
2062            .as_ref()
2063            .ok_or_else(|| {
2064                IotaError::from("last checkpoint in epoch should contain end of epoch data")
2065            })?
2066            .epoch_supply_change;
2067
2068        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2069
2070        assert_eq!(
2071            Some(last_checkpoint_seq),
2072            self.checkpoint_store
2073                .get_highest_executed_checkpoint_seq_number()
2074                .expect("Error loading highest executed checkpoint sequence number")
2075        );
2076
2077        let epoch_start_configuration = EpochStartConfiguration::new(
2078            next_epoch_start_system_state,
2079            *last_checkpoint.digest(),
2080            state.get_object_store().as_ref(),
2081            EpochFlag::default_flags_for_new_epoch(&state.config),
2082        )
2083        .expect("EpochStartConfiguration construction cannot fail");
2084
2085        let new_epoch_store = self
2086            .state
2087            .reconfigure(
2088                cur_epoch_store,
2089                self.config.supported_protocol_versions.unwrap(),
2090                next_epoch_committee,
2091                epoch_start_configuration,
2092                accumulator,
2093                &self.config.expensive_safety_check_config,
2094                epoch_supply_change,
2095                last_checkpoint_seq,
2096            )
2097            .await
2098            .expect("Reconfigure authority state cannot fail");
2099        info!(next_epoch, "Node State has been reconfigured");
2100        assert_eq!(next_epoch, new_epoch_store.epoch());
2101        self.state.get_reconfig_api().update_epoch_flags_metrics(
2102            cur_epoch_store.epoch_start_config().flags(),
2103            new_epoch_store.epoch_start_config().flags(),
2104        );
2105
2106        Ok(new_epoch_store)
2107    }
2108
2109    pub fn get_config(&self) -> &NodeConfig {
2110        &self.config
2111    }
2112
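    /// Executes a transaction directly against the authority state at epoch
    /// zero, wrapping it in an executable transaction with a checkpoint proof
    /// of (0, 0) instead of going through consensus.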
2113    async fn execute_transaction_immediately_at_zero_epoch(
2114        state: &Arc<AuthorityState>,
2115        epoch_store: &Arc<AuthorityPerEpochStore>,
2116        tx: &Transaction,
2117        span: tracing::Span,
2118    ) {
2119        let _guard = span.enter();
2120        let transaction =
2121            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2122                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2123                    tx.data().clone(),
2124                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2125                ),
2126            );
2127        state
2128            .try_execute_immediately(&transaction, None, epoch_store)
2129            .unwrap();
2130    }
2131
2132    pub fn randomness_handle(&self) -> randomness::Handle {
2133        self.randomness_handle.clone()
2134    }
2135}
2136
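// In regular builds, JWKs are fetched from the OIDC provider over HTTP; under
// `msim`, the injector-based implementation further below is used instead.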
2137#[cfg(not(msim))]
2138impl IotaNode {
2139    async fn fetch_jwks(
2140        _authority: AuthorityName,
2141        provider: &OIDCProvider,
2142    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2143        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2144        let client = reqwest::Client::new();
2145        fetch_jwks(provider, &client)
2146            .await
2147            .map_err(|_| IotaError::JWKRetrieval)
2148    }
2149}
2150
2151#[cfg(msim)]
2152impl IotaNode {
2153    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
2154        self.sim_state.sim_node.id()
2155    }
2156
2157    pub fn set_safe_mode_expected(&self, new_value: bool) {
2158        info!("Setting safe mode expected to {}", new_value);
2159        self.sim_state
2160            .sim_safe_mode_expected
2161            .store(new_value, Ordering::Relaxed);
2162    }
2163
2164    async fn fetch_jwks(
2165        authority: AuthorityName,
2166        provider: &OIDCProvider,
2167    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2168        get_jwk_injector()(authority, provider)
2169    }
2170}
2171
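/// A future stored in an unstarted state that can be spawned at most once.
/// Used to defer starting the validator gRPC server, e.g. until a fullnode is
/// promoted to validator during reconfiguration.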
2172enum SpawnOnce {
2173    // Mutex is only needed to make SpawnOnce Send
2174    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
2175    #[allow(unused)]
2176    Started(JoinHandle<Result<()>>),
2177}
2178
2179impl SpawnOnce {
2180    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
2181        Self::Unstarted(Mutex::new(Box::pin(future)))
2182    }
2183
2184    pub fn start(self) -> Self {
2185        match self {
2186            Self::Unstarted(future) => {
2187                let future = future.into_inner();
2188                let handle = tokio::spawn(future);
2189                Self::Started(handle)
2190            }
2191            Self::Started(_) => self,
2192        }
2193    }
2194}
2195
2196/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
2197/// available.
2198fn send_trusted_peer_change(
2199    config: &NodeConfig,
2200    sender: &watch::Sender<TrustedPeerChangeEvent>,
2201    new_epoch_start_state: &EpochStartSystemState,
2202) {
2203    let new_committee =
2204        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2205
2206    sender.send_modify(|event| {
2207        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2208        event.new_committee = new_committee;
2209    })
2210}
2211
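/// Builds the transaction key-value store used by the JSON-RPC APIs: a local
/// RocksDB-backed store, with an optional fallback to an HTTP key-value store
/// when a base URL is configured.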
2212fn build_kv_store(
2213    state: &Arc<AuthorityState>,
2214    config: &NodeConfig,
2215    registry: &Registry,
2216) -> Result<Arc<TransactionKeyValueStore>> {
2217    let metrics = KeyValueStoreMetrics::new(registry);
2218    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2219
2220    let base_url = &config.transaction_kv_store_read_config.base_url;
2221
2222    if base_url.is_empty() {
2223        info!("no http kv store url provided, using local db only");
2224        return Ok(Arc::new(db_store));
2225    }
2226
2227    base_url.parse::<url::Url>().tap_err(|e| {
2228        error!(
2229            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2230            base_url, e
2231        )
2232    })?;
2233
2234    let http_store = HttpKVStore::new_kv(
2235        base_url,
2236        config.transaction_kv_store_read_config.cache_size,
2237        metrics.clone(),
2238    )?;
2239    info!("using local key-value store with fallback to http key-value store");
2240    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2241        db_store,
2242        http_store,
2243        metrics,
2244        "json_rpc_fallback",
2245    )))
2246}
2247
2248/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
2249/// REST APIs based on the node's configuration.
2250///
2251/// This function performs the following tasks:
2252/// 1. Checks if the node is a validator by inspecting the consensus
2253///    configuration; if so, it returns early as validators do not expose these
2254///    APIs.
2255/// 2. Creates an Axum router to handle HTTP requests.
2256/// 3. Initializes the JSON-RPC server and registers the RPC modules based on
2257///    the node's state and configuration, including ReadApi, CoinReadApi,
2258///    TransactionBuilderApi, GovernanceReadApi, TransactionExecutionApi,
2259///    IndexerApi, and MoveUtils.
2260/// 4. Optionally, if the REST API is enabled, nests the REST API router under
2261///    the `/api/v1` path.
2262/// 5. Binds the server to the specified JSON-RPC address and starts listening
2263///    for incoming connections.
2264pub async fn build_http_server(
2265    state: Arc<AuthorityState>,
2266    store: RocksDbStore,
2267    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2268    config: &NodeConfig,
2269    prometheus_registry: &Registry,
2270    _custom_runtime: Option<Handle>,
2271    software_version: &'static str,
2272) -> Result<Option<iota_http::ServerHandle>> {
2273    // Validators do not expose these APIs
2274    if config.consensus_config().is_some() {
2275        return Ok(None);
2276    }
2277
2278    let mut router = axum::Router::new();
2279
2280    let json_rpc_router = {
2281        let mut server = JsonRpcServerBuilder::new(
2282            env!("CARGO_PKG_VERSION"),
2283            prometheus_registry,
2284            config.policy_config.clone(),
2285            config.firewall_config.clone(),
2286        );
2287
2288        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2289
2290        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2291        server.register_module(ReadApi::new(
2292            state.clone(),
2293            kv_store.clone(),
2294            metrics.clone(),
2295        ))?;
2296        server.register_module(CoinReadApi::new(
2297            state.clone(),
2298            kv_store.clone(),
2299            metrics.clone(),
2300        )?)?;
2301
2302        // If run_with_range is enabled, we want to prevent any new transactions from
2303        // being built; run_with_range = None is the normal operating condition.
2304        if config.run_with_range.is_none() {
2305            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2306        }
2307        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2308
2309        if let Some(transaction_orchestrator) = transaction_orchestrator {
2310            server.register_module(TransactionExecutionApi::new(
2311                state.clone(),
2312                transaction_orchestrator.clone(),
2313                metrics.clone(),
2314            ))?;
2315        }
2316
2317        // TODO: Init from chain if config is not set once `IotaNamesConfig::from_chain`
2318        // is implemented
2319        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();
2320
2321        server.register_module(IndexerApi::new(
2322            state.clone(),
2323            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2324            kv_store,
2325            metrics,
2326            iota_names_config,
2327            config.indexer_max_subscriptions,
2328        ))?;
2329        server.register_module(MoveUtils::new(state.clone()))?;
2330
2331        let server_type = config.jsonrpc_server_type();
2332
2333        server.to_router(server_type).await?
2334    };
2335
2336    router = router.merge(json_rpc_router);
2337
2338    if config.enable_rest_api {
2339        let mut rest_service = iota_rest_api::RestService::new(
2340            Arc::new(RestReadStore::new(state.clone(), store)),
2341            software_version,
2342        );
2343
2344        if let Some(config) = config.rest.clone() {
2345            rest_service.with_config(config);
2346        }
2347
2348        rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2349
2350        if let Some(transaction_orchestrator) = transaction_orchestrator {
2351            rest_service.with_executor(transaction_orchestrator.clone())
2352        }
2353
2354        router = router.merge(rest_service.into_router());
2355    }
2356
2357    // TODO: Remove this health check when experimental REST API becomes default
2358    // This is a copy of the health check in crates/iota-rest-api/src/health.rs
2359    router = router
2360        .route("/health", axum::routing::get(health_check_handler))
2361        .route_layer(axum::Extension(state));
2362
2363    let layers = ServiceBuilder::new()
2364        .map_request(|mut request: axum::http::Request<_>| {
2365            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
2366                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2367                request.extensions_mut().insert(axum_connect_info);
2368            }
2369            request
2370        })
2371        .layer(axum::middleware::from_fn(server_timing_middleware));
2372
2373    router = router.layer(layers);
2374
2375    let handle = iota_http::Builder::new()
2376        .serve(&config.json_rpc_address, router)
2377        .map_err(|e| anyhow::anyhow!("{e}"))?;
2378    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());
2379
2380    Ok(Some(handle))
2381}
2382
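/// Query parameters accepted by the `/health` endpoint.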
2383#[derive(Debug, serde::Serialize, serde::Deserialize)]
2384pub struct Threshold {
2385    pub threshold_seconds: Option<u32>,
2386}
2387
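/// Health check handler. Responds with "up" unless `threshold_seconds` is
/// provided and the highest executed checkpoint is older than that threshold,
/// in which case it responds with "down" and a 503 status.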
2388async fn health_check_handler(
2389    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2390    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2391) -> impl axum::response::IntoResponse {
2392    if let Some(threshold_seconds) = threshold_seconds {
2393        // Attempt to get the latest checkpoint
2394        let summary = match state
2395            .get_checkpoint_store()
2396            .get_highest_executed_checkpoint()
2397        {
2398            Ok(Some(summary)) => summary,
2399            Ok(None) => {
2400                warn!("Highest executed checkpoint not found");
2401                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2402            }
2403            Err(err) => {
2404                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2405                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2406            }
2407        };
2408
2409        // Calculate the threshold time based on the provided threshold_seconds
2410        let latest_chain_time = summary.timestamp();
2411        let threshold =
2412            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2413
2414        // Check if the latest checkpoint is within the threshold
2415        if latest_chain_time < threshold {
2416            warn!(
2417                ?latest_chain_time,
2418                ?threshold,
2419                "failing health check due to checkpoint lag"
2420            );
2421            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2422        }
2423    }
2424    // If the health endpoint is responding and no threshold is given, respond with success.
2425    (axum::http::StatusCode::OK, "up")
2426}
2427
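// In non-test builds the per-checkpoint transaction limit comes from the
// protocol config; unit tests use a small fixed limit of 2.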
2428#[cfg(not(test))]
2429fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2430    protocol_config.max_transactions_per_checkpoint() as usize
2431}
2432
2433#[cfg(test)]
2434fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2435    2
2436}