iota_node/lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::{BTreeSet, HashMap, HashSet},
9    fmt,
10    future::Future,
11    net::SocketAddr,
12    path::PathBuf,
13    str::FromStr,
14    sync::{Arc, Weak},
15    time::Duration,
16};
17
18use anemo::Network;
19use anemo_tower::{
20    callback::CallbackLayer,
21    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
22};
23use anyhow::{Result, anyhow};
24use arc_swap::ArcSwap;
25use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
26use futures::{TryFutureExt, future::BoxFuture};
27pub use handle::IotaNodeHandle;
28use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
29use iota_common::debug_fatal;
30use iota_config::{
31    ConsensusConfig, NodeConfig,
32    node::{DBCheckpointConfig, RunWithRange},
33    node_config_metrics::NodeConfigMetrics,
34    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
35};
36use iota_core::{
37    authority::{
38        AuthorityState, AuthorityStore, CHAIN_IDENTIFIER, RandomnessRoundReceiver,
39        authority_per_epoch_store::AuthorityPerEpochStore,
40        authority_store_tables::{AuthorityPerpetualTables, AuthorityPerpetualTablesOptions},
41        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
42    },
43    authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
44    authority_client::NetworkAuthorityClient,
45    authority_server::{ValidatorService, ValidatorServiceMetrics},
46    checkpoints::{
47        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
48        SubmitCheckpointToConsensus,
49        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
50    },
51    connection_monitor::ConnectionMonitor,
52    consensus_adapter::{
53        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
54        ConsensusClient,
55    },
56    consensus_handler::ConsensusHandlerInitializer,
57    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
58    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
59    db_checkpoint_handler::DBCheckpointHandler,
60    epoch::{
61        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
62        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
63        reconfiguration::ReconfigurationInitiator,
64    },
65    execution_cache::build_execution_cache,
66    jsonrpc_index::IndexStore,
67    module_cache_metrics::ResolverMetrics,
68    overload_monitor::overload_monitor,
69    rest_index::RestIndexStore,
70    safe_client::SafeClientMetricsBase,
71    signature_verifier::SignatureVerifierMetrics,
72    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
73    storage::{RestReadStore, RocksDbStore},
74    traffic_controller::metrics::TrafficControllerMetrics,
75    transaction_orchestrator::TransactionOrchestrator,
76    validator_tx_finalizer::ValidatorTxFinalizer,
77};
78use iota_json_rpc::{
79    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
80    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
81    transaction_builder_api::TransactionBuilderApi,
82    transaction_execution_api::TransactionExecutionApi,
83};
84use iota_json_rpc_api::JsonRpcMetrics;
85use iota_macros::{fail_point, fail_point_async, replay_log};
86use iota_metrics::{
87    RegistryID, RegistryService,
88    hardware_metrics::register_hardware_metrics,
89    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
90    server_timing_middleware, spawn_monitored_task,
91};
92use iota_network::{
93    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
94};
95use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
96use iota_protocol_config::ProtocolConfig;
97use iota_rest_api::RestMetrics;
98use iota_snapshot::uploader::StateSnapshotUploader;
99use iota_storage::{
100    FileCompression, StorageFormat,
101    http_key_value_store::HttpKVStore,
102    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
103    key_value_store_metrics::KeyValueStoreMetrics,
104};
105use iota_types::{
106    base_types::{AuthorityName, ConciseableName, EpochId},
107    committee::Committee,
108    crypto::{KeypairTraits, RandomnessRound},
109    digests::ChainIdentifier,
110    error::{IotaError, IotaResult},
111    executable_transaction::VerifiedExecutableTransaction,
112    execution_config_utils::to_binary_config,
113    iota_system_state::{
114        IotaSystemState, IotaSystemStateTrait,
115        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
116    },
117    messages_consensus::{
118        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
119        check_total_jwk_size,
120    },
121    quorum_driver_types::QuorumDriverEffectsQueueResult,
122    supported_protocol_versions::SupportedProtocolVersions,
123    transaction::{Transaction, VerifiedCertificate},
124};
125use prometheus::Registry;
126#[cfg(msim)]
127pub use simulator::set_jwk_injector;
128#[cfg(msim)]
129use simulator::*;
130use tap::tap::TapFallible;
131use tokio::{
132    runtime::Handle,
133    sync::{Mutex, broadcast, mpsc, watch},
134    task::{JoinHandle, JoinSet},
135};
136use tower::ServiceBuilder;
137use tracing::{Instrument, debug, error, error_span, info, warn};
138use typed_store::{
139    DBMetrics,
140    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
141};
142
143use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
144
145pub mod admin;
146mod handle;
147pub mod metrics;
148
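/// Services and handles that are only present while the node runs as a validator
/// (gRPC validator server, consensus, checkpoint service, pruning and overload
/// monitoring). They are shut down and rebuilt during reconfiguration.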
149pub struct ValidatorComponents {
150    validator_server_handle: SpawnOnce,
151    validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
152    validator_overload_monitor_handle: Option<JoinHandle<()>>,
153    consensus_manager: ConsensusManager,
154    consensus_store_pruner: ConsensusStorePruner,
155    consensus_adapter: Arc<ConsensusAdapter>,
    // Keep the handles to the checkpoint service tasks so they can be shut down during reconfiguration.
157    checkpoint_service_tasks: JoinSet<()>,
158    checkpoint_metrics: Arc<CheckpointMetrics>,
159    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
160    validator_registry_id: RegistryID,
161}
162
163#[cfg(msim)]
164mod simulator {
165    use std::sync::atomic::AtomicBool;
166
167    use super::*;
168
169    pub(super) struct SimState {
170        pub sim_node: iota_simulator::runtime::NodeHandle,
171        pub sim_safe_mode_expected: AtomicBool,
172        _leak_detector: iota_simulator::NodeLeakDetector,
173    }
174
175    impl Default for SimState {
176        fn default() -> Self {
177            Self {
178                sim_node: iota_simulator::runtime::NodeHandle::current(),
179                sim_safe_mode_expected: AtomicBool::new(false),
180                _leak_detector: iota_simulator::NodeLeakDetector::new(),
181            }
182        }
183    }
184
185    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
186        + Send
187        + Sync
188        + 'static;
189
190    fn default_fetch_jwks(
191        _authority: AuthorityName,
192        _provider: &OIDCProvider,
193    ) -> IotaResult<Vec<(JwkId, JWK)>> {
194        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
195        // Just load a default Twitch jwk for testing.
196        parse_jwks(
197            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
198            &OIDCProvider::Twitch,
199        )
200        .map_err(|_| IotaError::JWKRetrieval)
201    }
202
203    thread_local! {
204        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
205    }
206
207    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
208        JWK_INJECTOR.with(|injector| injector.borrow().clone())
209    }
210
211    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
212        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
213    }
214}
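
// Example (simulator builds only): a minimal sketch of overriding the JWK fetcher
// via `set_jwk_injector`. The closure below, which simply returns an empty key
// set, is purely illustrative.
//
// #[cfg(msim)]
// fn disable_jwk_fetching_for_tests() {
//     set_jwk_injector(Arc::new(|_authority, _provider| Ok(vec![])));
// }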
215
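/// The top-level IOTA node: it owns the authority state together with the
/// long-running services built around it (p2p discovery, state sync, randomness,
/// the HTTP/JSON-RPC server and, when running as a validator, the consensus
/// components).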
216pub struct IotaNode {
217    config: NodeConfig,
218    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The HTTP server responsible for serving JSON-RPC as well as the
    /// experimental REST service.
221    _http_server: Option<tokio::task::JoinHandle<()>>,
222    state: Arc<AuthorityState>,
223    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
224    registry_service: RegistryService,
225    metrics: Arc<IotaNodeMetrics>,
226
227    _discovery: discovery::Handle,
228    state_sync_handle: state_sync::Handle,
229    randomness_handle: randomness::Handle,
230    checkpoint_store: Arc<CheckpointStore>,
231    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
232    connection_monitor_status: Arc<ConnectionMonitorStatus>,
233
234    /// Broadcast channel to send the starting system state for the next epoch.
235    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,
236
    /// Broadcast channel to notify the [`DiscoveryEventLoop`] of new validator
    /// peers.
239    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,
240
241    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
242
243    #[cfg(msim)]
244    sim_state: SimState,
245
246    _state_archive_handle: Option<broadcast::Sender<()>>,
247
248    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down the iota-node.
250    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
251
    /// AuthorityAggregator of the network, created at startup and at the
    /// beginning of each epoch. Uses ArcSwap so that it can be mutated
    /// without taking a mutable reference.
255    // TODO: Eventually we can make this auth aggregator a shared reference so that this
256    // update will automatically propagate to other uses.
257    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
258}
259
260impl fmt::Debug for IotaNode {
261    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
262        f.debug_struct("IotaNode")
263            .field("name", &self.state.name.concise())
264            .finish()
265    }
266}
267
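/// Maximum number of JWKs accepted from a single provider fetch; any keys beyond
/// this limit are dropped before being submitted to consensus (see
/// `start_jwk_updater`).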
268static MAX_JWK_KEYS_PER_FETCH: usize = 100;
269
270impl IotaNode {
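    /// Starts the node with the given configuration and metrics registry,
    /// delegating to [`Self::start_async`] with an "unknown" software version.
    ///
    /// # Example
    ///
    /// A minimal launch sketch; `config` and `registry_service` are assumed to
    /// have been built elsewhere:
    ///
    /// ```ignore
    /// let node = IotaNode::start(config, registry_service, None).await?;
    /// let mut epoch_rx = node.subscribe_to_epoch_change();
    /// ```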
271    pub async fn start(
272        config: NodeConfig,
273        registry_service: RegistryService,
274        custom_rpc_runtime: Option<Handle>,
275    ) -> Result<Arc<IotaNode>> {
276        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
277    }
278
279    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
280    /// configuration.
281    /// This function ensures continuous fetching, validation, and submission of
282    /// JWKs, maintaining up-to-date keys for the specified providers.
283    fn start_jwk_updater(
284        config: &NodeConfig,
285        metrics: Arc<IotaNodeMetrics>,
286        authority: AuthorityName,
287        epoch_store: Arc<AuthorityPerEpochStore>,
288        consensus_adapter: Arc<ConsensusAdapter>,
289    ) {
290        let epoch = epoch_store.epoch();
291
292        let supported_providers = config
293            .zklogin_oauth_providers
294            .get(&epoch_store.get_chain_identifier().chain())
295            .unwrap_or(&BTreeSet::new())
296            .iter()
297            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
298            .collect::<Vec<_>>();
299
300        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
301
302        info!(
303            ?fetch_interval,
304            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
305        );
306
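        // Returns true if the JWK is acceptable: its `iss` must map back to the
        // provider it was fetched from and its total size must stay within the
        // limit; otherwise the `invalid_jwks` counter is incremented and false is
        // returned.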
307        fn validate_jwk(
308            metrics: &Arc<IotaNodeMetrics>,
309            provider: &OIDCProvider,
310            id: &JwkId,
311            jwk: &JWK,
312        ) -> bool {
313            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
314                warn!(
315                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
316                    id.iss, provider
317                );
318                metrics
319                    .invalid_jwks
320                    .with_label_values(&[&provider.to_string()])
321                    .inc();
322                return false;
323            };
324
325            if iss_provider != *provider {
326                warn!(
327                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
328                    id.iss, provider, iss_provider
329                );
330                metrics
331                    .invalid_jwks
332                    .with_label_values(&[&provider.to_string()])
333                    .inc();
334                return false;
335            }
336
337            if !check_total_jwk_size(id, jwk) {
338                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
339                metrics
340                    .invalid_jwks
341                    .with_label_values(&[&provider.to_string()])
342                    .inc();
343                return false;
344            }
345
346            true
347        }
348
        // The relevant fields of `metrics` (IotaNodeMetrics) are:
        //  pub struct IotaNodeMetrics {
        //      pub jwk_requests: IntCounterVec,
        //      pub jwk_request_errors: IntCounterVec,
        //      pub total_jwks: IntCounterVec,
        //      pub unique_jwks: IntCounterVec,
        //  }
356
357        for p in supported_providers.into_iter() {
358            let provider_str = p.to_string();
359            let epoch_store = epoch_store.clone();
360            let consensus_adapter = consensus_adapter.clone();
361            let metrics = metrics.clone();
362            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
363                async move {
                    // Note: restart-safe de-duplication happens after consensus; this is
                    // just a best-effort check to reduce unneeded submissions.
366                    let mut seen = HashSet::new();
367                    loop {
368                        info!("fetching JWK for provider {:?}", p);
369                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
370                        match Self::fetch_jwks(authority, &p).await {
371                            Err(e) => {
372                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
373                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
374                                // Retry in 30 seconds
375                                tokio::time::sleep(Duration::from_secs(30)).await;
376                                continue;
377                            }
378                            Ok(mut keys) => {
379                                metrics.total_jwks
380                                    .with_label_values(&[&provider_str])
381                                    .inc_by(keys.len() as u64);
382
383                                keys.retain(|(id, jwk)| {
384                                    validate_jwk(&metrics, &p, id, jwk) &&
385                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
386                                    seen.insert((id.clone(), jwk.clone()))
387                                });
388
389                                metrics.unique_jwks
390                                    .with_label_values(&[&provider_str])
391                                    .inc_by(keys.len() as u64);
392
                                // Prevent OAuth providers from sending too many keys,
                                // inadvertently or otherwise.
395                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
396                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
397                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
398                                }
399
400                                for (id, jwk) in keys.into_iter() {
401                                    info!("Submitting JWK to consensus: {:?}", id);
402
403                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
404                                    consensus_adapter.submit(txn, None, &epoch_store)
405                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
406                                        .ok();
407                                }
408                            }
409                        }
410                        tokio::time::sleep(fetch_interval).await;
411                    }
412                }
413                .instrument(error_span!("jwk_updater_task", epoch)),
414            ));
415        }
416    }
417
418    pub async fn start_async(
419        config: NodeConfig,
420        registry_service: RegistryService,
421        custom_rpc_runtime: Option<Handle>,
422        software_version: &'static str,
423    ) -> Result<Arc<IotaNode>> {
424        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
425        let mut config = config.clone();
426        if config.supported_protocol_versions.is_none() {
427            info!(
428                "populating config.supported_protocol_versions with default {:?}",
429                SupportedProtocolVersions::SYSTEM_DEFAULT
430            );
431            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
432        }
433
434        let run_with_range = config.run_with_range;
435        let is_validator = config.consensus_config().is_some();
436        let is_full_node = !is_validator;
437        let prometheus_registry = registry_service.default_registry();
438
439        info!(node =? config.authority_public_key(),
440            "Initializing iota-node listening on {}", config.network_address
441        );
442
443        let genesis = config.genesis()?.clone();
444
445        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
446        // It's ok if the value is already set due to data races.
447        let _ = CHAIN_IDENTIFIER.set(chain_identifier);
448        info!("IOTA chain identifier: {chain_identifier}");
449
450        // Check and set the db_corrupted flag
451        let db_corrupted_path = &config.db_path().join("status");
452        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
453            panic!("Failed to check database corruption: {err}");
454        }
455
456        // Initialize metrics to track db usage before creating any stores
457        DBMetrics::init(&prometheus_registry);
458
459        // Initialize IOTA metrics.
460        iota_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of a static variable) and unnecessary
        // in simtests.
463        #[cfg(not(msim))]
464        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
465
466        // Register hardware metrics.
467        register_hardware_metrics(&registry_service, &config.db_path)
468            .expect("Failed registering hardware metrics");
469        // Register uptime metric
470        prometheus_registry
471            .register(iota_metrics::uptime_metric(
472                if is_validator {
473                    "validator"
474                } else {
475                    "fullnode"
476                },
477                software_version,
478                &chain_identifier.to_string(),
479            ))
480            .expect("Failed registering uptime metric");
481
        // If genesis comes with migration data, load it into memory from the
        // file path specified in the config.
484        let migration_tx_data = if genesis.contains_migrations() {
            // Loading here already verifies that the content of the migration blob
            // is valid with respect to the content found in genesis.
487            Some(config.load_migration_tx_data()?)
488        } else {
489            None
490        };
491
492        let secret = Arc::pin(config.authority_key_pair().copy());
493        let genesis_committee = genesis.committee()?;
494        let committee_store = Arc::new(CommitteeStore::new(
495            config.db_path().join("epochs"),
496            &genesis_committee,
497            None,
498        ));
499
500        // By default, only enable write stall on validators for perpetual db.
501        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
502        let perpetual_tables_options = AuthorityPerpetualTablesOptions { enable_write_stall };
503        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
504            &config.db_path().join("store"),
505            Some(perpetual_tables_options),
506        ));
507        let is_genesis = perpetual_tables
508            .database_is_empty()
509            .expect("Database read should not fail at init.");
510        let store = AuthorityStore::open(
511            perpetual_tables,
512            &genesis,
513            &config,
514            &prometheus_registry,
515            migration_tx_data.as_ref(),
516        )
517        .await?;
518
519        let cur_epoch = store.get_recovery_epoch_at_restart()?;
520        let committee = committee_store
521            .get_committee(&cur_epoch)?
522            .expect("Committee of the current epoch must exist");
523        let epoch_start_configuration = store
524            .get_epoch_start_configuration()?
525            .expect("EpochStartConfiguration of the current epoch must exist");
526        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
527        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
528
529        let cache_traits =
530            build_execution_cache(&epoch_start_configuration, &prometheus_registry, &store);
531
532        let auth_agg = {
533            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
534            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
535            Arc::new(ArcSwap::new(Arc::new(
536                AuthorityAggregator::new_from_epoch_start_state(
537                    epoch_start_configuration.epoch_start_state(),
538                    &committee_store,
539                    safe_client_metrics_base,
540                    auth_agg_metrics,
541                ),
542            )))
543        };
544
545        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
546        let epoch_store = AuthorityPerEpochStore::new(
547            config.authority_public_key(),
548            committee.clone(),
549            &config.db_path().join("store"),
550            Some(epoch_options.options),
551            EpochMetrics::new(&registry_service.default_registry()),
552            epoch_start_configuration,
553            cache_traits.backing_package_store.clone(),
554            cache_traits.object_store.clone(),
555            cache_metrics,
556            signature_verifier_metrics,
557            &config.expensive_safety_check_config,
558            ChainIdentifier::from(*genesis.checkpoint().digest()),
559        );
560
561        info!("created epoch store");
562
563        replay_log!(
564            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
565            epoch_store.epoch(),
566            epoch_store.protocol_config()
567        );
568
569        // the database is empty at genesis time
570        if is_genesis {
571            info!("checking IOTA conservation at genesis");
            // When opening the db tables, the only time it's safe to check IOTA
            // conservation is at genesis. Otherwise we may be in the middle of an
            // epoch and the IOTA conservation check will fail. This also initializes
            // the expected_network_iota_amount table.
576            cache_traits
577                .reconfig_api
578                .expensive_check_iota_conservation(&epoch_store, None)
579                .expect("IOTA conservation check cannot fail at genesis");
580        }
581
582        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
583        let default_buffer_stake = epoch_store
584            .protocol_config()
585            .buffer_stake_for_protocol_upgrade_bps();
586        if effective_buffer_stake != default_buffer_stake {
587            warn!(
588                ?effective_buffer_stake,
589                ?default_buffer_stake,
590                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
591            );
592        }
593
594        info!("creating checkpoint store");
595
596        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
597        checkpoint_store.insert_genesis_checkpoint(
598            genesis.checkpoint(),
599            genesis.checkpoint_contents().clone(),
600            &epoch_store,
601        );
602
603        // Database has everything from genesis, set corrupted key to 0
604        unmark_db_corruption(db_corrupted_path)?;
605
606        info!("creating state sync store");
607        let state_sync_store = RocksDbStore::new(
608            cache_traits.clone(),
609            committee_store.clone(),
610            checkpoint_store.clone(),
611        );
612
613        let index_store = if is_full_node && config.enable_index_processing {
614            info!("creating index store");
615            Some(Arc::new(IndexStore::new(
616                config.db_path().join("indexes"),
617                &prometheus_registry,
618                epoch_store
619                    .protocol_config()
620                    .max_move_identifier_len_as_option(),
621                config.remove_deprecated_tables,
622            )))
623        } else {
624            None
625        };
626
627        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
628        {
629            Some(Arc::new(RestIndexStore::new(
630                config.db_path().join("rest_index"),
631                &store,
632                &checkpoint_store,
633                &epoch_store,
634                &cache_traits.backing_package_store,
635            )))
636        } else {
637            None
638        };
639
640        info!("creating archive reader");
641        // Create network
642        // TODO only configure validators as seed/preferred peers for validators and not
643        // for fullnodes once we've had a chance to re-work fullnode
644        // configuration generation.
645        let archive_readers =
646            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
647        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
648        let (randomness_tx, randomness_rx) = mpsc::channel(
649            config
650                .p2p_config
651                .randomness
652                .clone()
653                .unwrap_or_default()
654                .mailbox_capacity(),
655        );
656        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
657            Self::create_p2p_network(
658                &config,
659                state_sync_store.clone(),
660                chain_identifier,
661                trusted_peer_change_rx,
662                archive_readers.clone(),
663                randomness_tx,
664                &prometheus_registry,
665            )?;
666
667        // We must explicitly send this instead of relying on the initial value to
668        // trigger watch value change, so that state-sync is able to process it.
669        send_trusted_peer_change(
670            &config,
671            &trusted_peer_change_tx,
672            epoch_store.epoch_start_state(),
673        );
674
675        info!("start state archival");
676        // Start archiving local state to remote store
677        let state_archive_handle =
678            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
679                .await?;
680
681        info!("start snapshot upload");
682        // Start uploading state snapshot to remote store
683        let state_snapshot_handle =
684            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
685
686        // Start uploading db checkpoints to remote store
687        info!("start db checkpoint");
688        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
689            &config,
690            &prometheus_registry,
691            state_snapshot_handle.is_some(),
692        )?;
693
694        let mut genesis_objects = genesis.objects().to_vec();
695        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
696            genesis_objects.extend(migration_tx_data.get_objects());
697        }
698
699        let authority_name = config.authority_public_key();
700        let validator_tx_finalizer =
701            config
702                .enable_validator_tx_finalizer
703                .then_some(Arc::new(ValidatorTxFinalizer::new(
704                    auth_agg.clone(),
705                    authority_name,
706                    &prometheus_registry,
707                )));
708
709        info!("create authority state");
710        let state = AuthorityState::new(
711            authority_name,
712            secret,
713            config.supported_protocol_versions.unwrap(),
714            store.clone(),
715            cache_traits.clone(),
716            epoch_store.clone(),
717            committee_store.clone(),
718            index_store.clone(),
719            rest_index,
720            checkpoint_store.clone(),
721            &prometheus_registry,
722            &genesis_objects,
723            &db_checkpoint_config,
724            config.clone(),
725            config.indirect_objects_threshold,
726            archive_readers,
727            validator_tx_finalizer,
728        )
729        .await;
730
731        // ensure genesis and migration txs were executed
732        if epoch_store.epoch() == 0 {
733            let genesis_tx = &genesis.transaction();
734            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
735            // Execute genesis transaction
736            Self::execute_transaction_immediately_at_zero_epoch(
737                &state,
738                &epoch_store,
739                genesis_tx,
740                span,
741            )
742            .await;
743
744            // Execute migration transactions if present
745            if let Some(migration_tx_data) = migration_tx_data {
746                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
747                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
748                    Self::execute_transaction_immediately_at_zero_epoch(
749                        &state,
750                        &epoch_store,
751                        tx,
752                        span,
753                    )
754                    .await;
755                }
756            }
757        }
758
759        checkpoint_store
760            .reexecute_local_checkpoints(&state, &epoch_store)
761            .await;
762
763        // Start the loop that receives new randomness and generates transactions for
764        // it.
765        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
766
767        if config
768            .expensive_safety_check_config
769            .enable_secondary_index_checks()
770        {
771            if let Some(indexes) = state.indexes.clone() {
772                iota_core::verify_indexes::verify_indexes(
773                    state.get_accumulator_store().as_ref(),
774                    indexes,
775                )
776                .expect("secondary indexes are inconsistent");
777            }
778        }
779
780        let (end_of_epoch_channel, end_of_epoch_receiver) =
781            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
782
783        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
784            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
785                auth_agg.load_full(),
786                state.clone(),
787                end_of_epoch_receiver,
788                &config.db_path(),
789                &prometheus_registry,
790            )))
791        } else {
792            None
793        };
794
795        let http_server = build_http_server(
796            state.clone(),
797            state_sync_store,
798            &transaction_orchestrator.clone(),
799            &config,
800            &prometheus_registry,
801            custom_rpc_runtime,
802            software_version,
803        )
804        .await?;
805
806        let accumulator = Arc::new(StateAccumulator::new(
807            cache_traits.accumulator_store.clone(),
808            StateAccumulatorMetrics::new(&prometheus_registry),
809        ));
810
811        let authority_names_to_peer_ids = epoch_store
812            .epoch_start_state()
813            .get_authority_names_to_peer_ids();
814
815        let network_connection_metrics =
816            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
817
818        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
819
820        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
821            p2p_network.downgrade(),
822            network_connection_metrics,
823            HashMap::new(),
824            None,
825        );
826
827        let connection_monitor_status = ConnectionMonitorStatus {
828            connection_statuses,
829            authority_names_to_peer_ids,
830        };
831
832        let connection_monitor_status = Arc::new(connection_monitor_status);
833        let iota_node_metrics =
834            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
835
836        let validator_components = if state.is_validator(&epoch_store) {
837            let (components, _) = futures::join!(
838                Self::construct_validator_components(
839                    config.clone(),
840                    state.clone(),
841                    committee,
842                    epoch_store.clone(),
843                    checkpoint_store.clone(),
844                    state_sync_handle.clone(),
845                    randomness_handle.clone(),
846                    Arc::downgrade(&accumulator),
847                    connection_monitor_status.clone(),
848                    &registry_service,
849                    iota_node_metrics.clone(),
850                ),
851                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
852            );
853            let mut components = components?;
854
855            components.consensus_adapter.submit_recovered(&epoch_store);
856
857            // Start the gRPC server
858            components.validator_server_handle = components.validator_server_handle.start();
859
860            Some(components)
861        } else {
862            None
863        };
864
        // Set up the shutdown channel.
866        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
867
868        let node = Self {
869            config,
870            validator_components: Mutex::new(validator_components),
871            _http_server: http_server,
872            state,
873            transaction_orchestrator,
874            registry_service,
875            metrics: iota_node_metrics,
876
877            _discovery: discovery_handle,
878            state_sync_handle,
879            randomness_handle,
880            checkpoint_store,
881            accumulator: Mutex::new(Some(accumulator)),
882            end_of_epoch_channel,
883            connection_monitor_status,
884            trusted_peer_change_tx,
885
886            _db_checkpoint_handle: db_checkpoint_handle,
887
888            #[cfg(msim)]
889            sim_state: Default::default(),
890
891            _state_archive_handle: state_archive_handle,
892            _state_snapshot_uploader_handle: state_snapshot_handle,
893            shutdown_channel_tx: shutdown_channel,
894
895            auth_agg,
896        };
897
898        info!("IotaNode started!");
899        let node = Arc::new(node);
900        let node_copy = node.clone();
901        spawn_monitored_task!(async move {
902            let result = Self::monitor_reconfiguration(node_copy).await;
903            if let Err(error) = result {
904                warn!("Reconfiguration finished with error {:?}", error);
905            }
906        });
907
908        Ok(node)
909    }
910
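    /// Subscribes to the broadcast channel on which the starting system state of
    /// each new epoch is published.
    ///
    /// ```ignore
    /// // Sketch (names are illustrative; receiver semantics are those of
    /// // tokio::sync::broadcast):
    /// let mut rx = node.subscribe_to_epoch_change();
    /// while let Ok(system_state) = rx.recv().await {
    ///     tracing::info!("epoch {} started", system_state.epoch());
    /// }
    /// ```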
911    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
912        self.end_of_epoch_channel.subscribe()
913    }
914
915    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
916        self.shutdown_channel_tx.subscribe()
917    }
918
919    pub fn current_epoch_for_testing(&self) -> EpochId {
920        self.state.current_epoch_for_testing()
921    }
922
923    pub fn db_checkpoint_path(&self) -> PathBuf {
924        self.config.db_checkpoint_path()
925    }
926
927    // Init reconfig process by starting to reject user certs
928    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
929        info!("close_epoch (current epoch = {})", epoch_store.epoch());
930        self.validator_components
931            .lock()
932            .await
933            .as_ref()
934            .ok_or_else(|| IotaError::from("Node is not a validator"))?
935            .consensus_adapter
936            .close_epoch(epoch_store);
937        Ok(())
938    }
939
940    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
941        self.state
942            .clear_override_protocol_upgrade_buffer_stake(epoch)
943    }
944
945    pub fn set_override_protocol_upgrade_buffer_stake(
946        &self,
947        epoch: EpochId,
948        buffer_stake_bps: u64,
949    ) -> IotaResult {
950        self.state
951            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
952    }
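
    // Sketch (testing/operational use): temporarily require a higher buffer stake
    // for a protocol upgrade in the given epoch, then clear the override again.
    // The 2_000 bps (20%) value below is illustrative.
    //
    // node.set_override_protocol_upgrade_buffer_stake(epoch, 2_000)?;
    // node.clear_override_protocol_upgrade_buffer_stake(epoch)?;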
953
954    // Testing-only API to start epoch close process.
955    // For production code, please use the non-testing version.
956    pub async fn close_epoch_for_testing(&self) -> IotaResult {
957        let epoch_store = self.state.epoch_store_for_testing();
958        self.close_epoch(&epoch_store).await
959    }
960
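
    /// Starts archiving local state to the remote object store configured in
    /// `state_archive_write_config`, if any. Returns a broadcast sender that can
    /// be used to signal the archival task to stop, or `None` when archival is
    /// not configured.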
961    async fn start_state_archival(
962        config: &NodeConfig,
963        prometheus_registry: &Registry,
964        state_sync_store: RocksDbStore,
965    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
966        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
967            let local_store_config = ObjectStoreConfig {
968                object_store: Some(ObjectStoreType::File),
969                directory: Some(config.archive_path()),
970                ..Default::default()
971            };
972            let archive_writer = ArchiveWriter::new(
973                local_store_config,
974                remote_store_config.clone(),
975                FileCompression::Zstd,
976                StorageFormat::Blob,
977                Duration::from_secs(600),
978                256 * 1024 * 1024,
979                prometheus_registry,
980            )
981            .await?;
982            Ok(Some(archive_writer.start(state_sync_store).await?))
983        } else {
984            Ok(None)
985        }
986    }
987
    /// Creates a StateSnapshotUploader and starts it if the StateSnapshotConfig
    /// is set.
990    fn start_state_snapshot(
991        config: &NodeConfig,
992        prometheus_registry: &Registry,
993        checkpoint_store: Arc<CheckpointStore>,
994    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
995        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
996            let snapshot_uploader = StateSnapshotUploader::new(
997                &config.db_checkpoint_path(),
998                &config.snapshot_path(),
999                remote_store_config.clone(),
1000                60,
1001                prometheus_registry,
1002                checkpoint_store,
1003            )?;
1004            Ok(Some(snapshot_uploader.start()))
1005        } else {
1006            Ok(None)
1007        }
1008    }
1009
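    /// Resolves the effective [`DBCheckpointConfig`] and, unless neither a db
    /// checkpoint object store is configured nor state snapshots are enabled,
    /// starts the [`DBCheckpointHandler`], returning the config together with the
    /// handler's shutdown sender.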
1010    fn start_db_checkpoint(
1011        config: &NodeConfig,
1012        prometheus_registry: &Registry,
1013        state_snapshot_enabled: bool,
1014    ) -> Result<(
1015        DBCheckpointConfig,
1016        Option<tokio::sync::broadcast::Sender<()>>,
1017    )> {
1018        let checkpoint_path = Some(
1019            config
1020                .db_checkpoint_config
1021                .checkpoint_path
1022                .clone()
1023                .unwrap_or_else(|| config.db_checkpoint_path()),
1024        );
1025        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1026            DBCheckpointConfig {
1027                checkpoint_path,
1028                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1029                    true
1030                } else {
1031                    config
1032                        .db_checkpoint_config
1033                        .perform_db_checkpoints_at_epoch_end
1034                },
1035                ..config.db_checkpoint_config.clone()
1036            }
1037        } else {
1038            config.db_checkpoint_config.clone()
1039        };
1040
1041        match (
1042            db_checkpoint_config.object_store_config.as_ref(),
1043            state_snapshot_enabled,
1044        ) {
            // If the db checkpoint object store is not specified but the state
            // snapshot object store is, create the handler anyway so that db
            // checkpoints can be marked as completed and uploaded as state
            // snapshots.
1049            (None, false) => Ok((db_checkpoint_config, None)),
1050            (_, _) => {
1051                let handler = DBCheckpointHandler::new(
1052                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1053                    db_checkpoint_config.object_store_config.as_ref(),
1054                    60,
1055                    db_checkpoint_config
1056                        .prune_and_compact_before_upload
1057                        .unwrap_or(true),
1058                    config.indirect_objects_threshold,
1059                    config.authority_store_pruning_config.clone(),
1060                    prometheus_registry,
1061                    state_snapshot_enabled,
1062                )?;
1063                Ok((
1064                    db_checkpoint_config,
1065                    Some(DBCheckpointHandler::start(handler)),
1066                ))
1067            }
1068        }
1069    }
1070
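    /// Builds the anemo peer-to-peer network and starts the discovery, state-sync
    /// and randomness services on top of it, returning the network together with
    /// their handles.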
1071    fn create_p2p_network(
1072        config: &NodeConfig,
1073        state_sync_store: RocksDbStore,
1074        chain_identifier: ChainIdentifier,
1075        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1076        archive_readers: ArchiveReaderBalancer,
1077        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1078        prometheus_registry: &Registry,
1079    ) -> Result<(
1080        Network,
1081        discovery::Handle,
1082        state_sync::Handle,
1083        randomness::Handle,
1084    )> {
1085        let (state_sync, state_sync_server) = state_sync::Builder::new()
1086            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1087            .store(state_sync_store)
1088            .archive_readers(archive_readers)
1089            .with_metrics(prometheus_registry)
1090            .build();
1091
1092        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1093            .config(config.p2p_config.clone())
1094            .build();
1095
1096        let (randomness, randomness_router) =
1097            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1098                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1099                .with_metrics(prometheus_registry)
1100                .build();
1101
1102        let p2p_network = {
1103            let routes = anemo::Router::new()
1104                .add_rpc_service(discovery_server)
1105                .add_rpc_service(state_sync_server);
1106            let routes = routes.merge(randomness_router);
1107
1108            let inbound_network_metrics =
1109                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1110            let outbound_network_metrics =
1111                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1112
1113            let service = ServiceBuilder::new()
1114                .layer(
1115                    TraceLayer::new_for_server_errors()
1116                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1117                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1118                )
1119                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1120                    Arc::new(inbound_network_metrics),
1121                    config.p2p_config.excessive_message_size(),
1122                )))
1123                .service(routes);
1124
1125            let outbound_layer = ServiceBuilder::new()
1126                .layer(
1127                    TraceLayer::new_for_client_and_server_errors()
1128                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1129                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1130                )
1131                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1132                    Arc::new(outbound_network_metrics),
1133                    config.p2p_config.excessive_message_size(),
1134                )))
1135                .into_inner();
1136
1137            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1138            // Set the max_frame_size to be 1 GB to work around the issue of there being too
1139            // many staking events in the epoch change txn.
1140            anemo_config.max_frame_size = Some(1 << 30);
1141
1142            // Set a higher default value for socket send/receive buffers if not already
1143            // configured.
1144            let mut quic_config = anemo_config.quic.unwrap_or_default();
1145            if quic_config.socket_send_buffer_size.is_none() {
1146                quic_config.socket_send_buffer_size = Some(20 << 20);
1147            }
1148            if quic_config.socket_receive_buffer_size.is_none() {
1149                quic_config.socket_receive_buffer_size = Some(20 << 20);
1150            }
1151            quic_config.allow_failed_socket_buffer_size_setting = true;
1152
1153            // Set high-performance defaults for quinn transport.
1154            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1155            if quic_config.max_concurrent_bidi_streams.is_none() {
1156                quic_config.max_concurrent_bidi_streams = Some(500);
1157            }
1158            if quic_config.max_concurrent_uni_streams.is_none() {
1159                quic_config.max_concurrent_uni_streams = Some(500);
1160            }
1161            if quic_config.stream_receive_window.is_none() {
1162                quic_config.stream_receive_window = Some(100 << 20);
1163            }
1164            if quic_config.receive_window.is_none() {
1165                quic_config.receive_window = Some(200 << 20);
1166            }
1167            if quic_config.send_window.is_none() {
1168                quic_config.send_window = Some(200 << 20);
1169            }
1170            if quic_config.crypto_buffer_size.is_none() {
1171                quic_config.crypto_buffer_size = Some(1 << 20);
1172            }
1173            if quic_config.max_idle_timeout_ms.is_none() {
1174                quic_config.max_idle_timeout_ms = Some(30_000);
1175            }
1176            if quic_config.keep_alive_interval_ms.is_none() {
1177                quic_config.keep_alive_interval_ms = Some(5_000);
1178            }
1179            anemo_config.quic = Some(quic_config);
1180
1181            let server_name = format!("iota-{chain_identifier}");
1182            let network = Network::bind(config.p2p_config.listen_address)
1183                .server_name(&server_name)
1184                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1185                .config(anemo_config)
1186                .outbound_request_layer(outbound_layer)
1187                .start(service)?;
1188            info!(
1189                server_name = server_name,
1190                "P2p network started on {}",
1191                network.local_addr()
1192            );
1193
1194            network
1195        };
1196
1197        let discovery_handle =
1198            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1199        let state_sync_handle = state_sync.start(p2p_network.clone());
1200        let randomness_handle = randomness.start(p2p_network.clone());
1201
1202        Ok((
1203            p2p_network,
1204            discovery_handle,
1205            state_sync_handle,
1206            randomness_handle,
1207        ))
1208    }
1209
1210    /// Asynchronously constructs and initializes the components necessary for
1211    /// the validator node.
1212    async fn construct_validator_components(
1213        config: NodeConfig,
1214        state: Arc<AuthorityState>,
1215        committee: Arc<Committee>,
1216        epoch_store: Arc<AuthorityPerEpochStore>,
1217        checkpoint_store: Arc<CheckpointStore>,
1218        state_sync_handle: state_sync::Handle,
1219        randomness_handle: randomness::Handle,
1220        accumulator: Weak<StateAccumulator>,
1221        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1222        registry_service: &RegistryService,
1223        iota_node_metrics: Arc<IotaNodeMetrics>,
1224    ) -> Result<ValidatorComponents> {
1225        let mut config_clone = config.clone();
1226        let consensus_config = config_clone
1227            .consensus_config
1228            .as_mut()
1229            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1230        let validator_registry = Registry::new();
1231        let validator_registry_id = registry_service.add(validator_registry.clone());
1232
1233        let client = Arc::new(UpdatableConsensusClient::new());
1234        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1235            &committee,
1236            consensus_config,
1237            state.name,
1238            connection_monitor_status.clone(),
1239            &validator_registry,
1240            client.clone(),
1241        ));
1242        let consensus_manager = ConsensusManager::new(
1243            &config,
1244            consensus_config,
1245            registry_service,
1246            &validator_registry,
1247            client,
1248        );
1249
        // This only gets started once, not on every epoch. (The pruner itself is
        // invoked to remove old epoch data at each epoch change.)
1252        let consensus_store_pruner = ConsensusStorePruner::new(
1253            consensus_manager.get_storage_base_path(),
1254            consensus_config.db_retention_epochs(),
1255            consensus_config.db_pruner_period(),
1256            &validator_registry,
1257        );
1258
1259        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1260        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1261
1262        let (validator_server_handle, validator_server_cancel_handle) =
1263            Self::start_grpc_validator_service(
1264                &config,
1265                state.clone(),
1266                consensus_adapter.clone(),
1267                &validator_registry,
1268            )
1269            .await?;
1270
1271        // Starts an overload monitor that monitors the execution of the authority.
1272        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1273        let validator_overload_monitor_handle = if config
1274            .authority_overload_config
1275            .max_load_shedding_percentage
1276            > 0
1277        {
1278            let authority_state = Arc::downgrade(&state);
1279            let overload_config = config.authority_overload_config.clone();
1280            fail_point!("starting_overload_monitor");
1281            Some(spawn_monitored_task!(overload_monitor(
1282                authority_state,
1283                overload_config,
1284            )))
1285        } else {
1286            None
1287        };
1288
1289        Self::start_epoch_specific_validator_components(
1290            &config,
1291            state.clone(),
1292            consensus_adapter,
1293            checkpoint_store,
1294            epoch_store,
1295            state_sync_handle,
1296            randomness_handle,
1297            consensus_manager,
1298            consensus_store_pruner,
1299            accumulator,
1300            validator_server_handle,
1301            validator_server_cancel_handle,
1302            validator_overload_monitor_handle,
1303            checkpoint_metrics,
1304            iota_node_metrics,
1305            iota_tx_validator_metrics,
1306            validator_registry_id,
1307        )
1308        .await
1309    }
1310
1311    /// Initializes and starts components specific to the current
1312    /// epoch for the validator node.
1313    async fn start_epoch_specific_validator_components(
1314        config: &NodeConfig,
1315        state: Arc<AuthorityState>,
1316        consensus_adapter: Arc<ConsensusAdapter>,
1317        checkpoint_store: Arc<CheckpointStore>,
1318        epoch_store: Arc<AuthorityPerEpochStore>,
1319        state_sync_handle: state_sync::Handle,
1320        randomness_handle: randomness::Handle,
1321        consensus_manager: ConsensusManager,
1322        consensus_store_pruner: ConsensusStorePruner,
1323        accumulator: Weak<StateAccumulator>,
1324        validator_server_handle: SpawnOnce,
1325        validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
1326        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1327        checkpoint_metrics: Arc<CheckpointMetrics>,
1328        iota_node_metrics: Arc<IotaNodeMetrics>,
1329        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1330        validator_registry_id: RegistryID,
1331    ) -> Result<ValidatorComponents> {
1332        let checkpoint_service = Self::build_checkpoint_service(
1333            config,
1334            consensus_adapter.clone(),
1335            checkpoint_store,
1336            epoch_store.clone(),
1337            state.clone(),
1338            state_sync_handle,
1339            accumulator,
1340            checkpoint_metrics.clone(),
1341        );
1342
        // Create a new map that gets injected into both the consensus handler and
        // the consensus adapter. The consensus handler will write values forwarded
        // from consensus, and the consensus adapter will read the values to make
        // decisions about which validator submits a transaction to consensus.
1347        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1348
1349        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1350
1351        let randomness_manager = RandomnessManager::try_new(
1352            Arc::downgrade(&epoch_store),
1353            Box::new(consensus_adapter.clone()),
1354            randomness_handle,
1355            config.authority_key_pair(),
1356        )
1357        .await;
1358        if let Some(randomness_manager) = randomness_manager {
1359            epoch_store
1360                .set_randomness_manager(randomness_manager)
1361                .await?;
1362        }
1363
1364        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1365            state.clone(),
1366            checkpoint_service.clone(),
1367            epoch_store.clone(),
1368            low_scoring_authorities,
1369        );
1370
1371        consensus_manager
1372            .start(
1373                config,
1374                epoch_store.clone(),
1375                consensus_handler_initializer,
1376                IotaTxValidator::new(
1377                    epoch_store.clone(),
1378                    checkpoint_service.clone(),
1379                    state.transaction_manager().clone(),
1380                    iota_tx_validator_metrics.clone(),
1381                ),
1382            )
1383            .await;
1384
1385        let checkpoint_service_tasks = checkpoint_service.spawn().await;
1386
1387        if epoch_store.authenticator_state_enabled() {
1388            Self::start_jwk_updater(
1389                config,
1390                iota_node_metrics,
1391                state.name,
1392                epoch_store.clone(),
1393                consensus_adapter.clone(),
1394            );
1395        }
1396
1397        Ok(ValidatorComponents {
1398            validator_server_handle,
1399            validator_server_cancel_handle,
1400            validator_overload_monitor_handle,
1401            consensus_manager,
1402            consensus_store_pruner,
1403            consensus_adapter,
1404            checkpoint_service_tasks,
1405            checkpoint_metrics,
1406            iota_tx_validator_metrics,
1407            validator_registry_id,
1408        })
1409    }
1410
1411    /// Builds the checkpoint service for the validator node, wiring up the
1412    /// outputs that submit checkpoints to consensus and forward certified
1413    /// checkpoints to state sync, together with the relevant metrics and
1414    /// size limits. The service itself is spawned later by the caller.
1417    fn build_checkpoint_service(
1418        config: &NodeConfig,
1419        consensus_adapter: Arc<ConsensusAdapter>,
1420        checkpoint_store: Arc<CheckpointStore>,
1421        epoch_store: Arc<AuthorityPerEpochStore>,
1422        state: Arc<AuthorityState>,
1423        state_sync_handle: state_sync::Handle,
1424        accumulator: Weak<StateAccumulator>,
1425        checkpoint_metrics: Arc<CheckpointMetrics>,
1426    ) -> Arc<CheckpointService> {
1427        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1428        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1429
1430        debug!(
1431            "Starting checkpoint service with epoch start timestamp {} and epoch \
1432            duration {}",
1433            epoch_start_timestamp_ms, epoch_duration_ms
1434        );
1435
1436        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1437            sender: consensus_adapter,
1438            signer: state.secret.clone(),
1439            authority: config.authority_public_key(),
1440            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1441                .checked_add(epoch_duration_ms)
1442                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1443            metrics: checkpoint_metrics.clone(),
1444        });
1445
1446        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1447        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1448        let max_checkpoint_size_bytes =
1449            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1450
1451        CheckpointService::build(
1452            state.clone(),
1453            checkpoint_store,
1454            epoch_store,
1455            state.get_transaction_cache_reader().clone(),
1456            accumulator,
1457            checkpoint_output,
1458            Box::new(certified_checkpoint_output),
1459            checkpoint_metrics,
1460            max_tx_per_checkpoint,
1461            max_checkpoint_size_bytes,
1462        )
1463    }
1464
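    /// Constructs the [`ConsensusAdapter`] through which this authority
    /// submits transactions to consensus, deriving its pending-transaction
    /// limits from the consensus configuration and the committee size.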
1465    fn construct_consensus_adapter(
1466        committee: &Committee,
1467        consensus_config: &ConsensusConfig,
1468        authority: AuthorityName,
1469        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1470        prometheus_registry: &Registry,
1471        consensus_client: Arc<dyn ConsensusClient>,
1472    ) -> ConsensusAdapter {
1473        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1474
1475        // The consensus adapter allows the authority to send user certificates
1476        // through consensus.
1477        ConsensusAdapter::new(
1478            consensus_client,
1479            authority,
1480            connection_monitor_status,
1481            consensus_config.max_pending_transactions(),
1482            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1483            consensus_config.max_submit_position,
1484            consensus_config.submit_delay_step_override(),
1485            ca_metrics,
1486        )
1487    }
1488
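    /// Builds the validator gRPC service with TLS and traffic-control policies
    /// and binds it to the configured network address. The returned
    /// [`SpawnOnce`] serves traffic only once started; the oneshot sender can
    /// be used to cancel the server.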
1489    async fn start_grpc_validator_service(
1490        config: &NodeConfig,
1491        state: Arc<AuthorityState>,
1492        consensus_adapter: Arc<ConsensusAdapter>,
1493        prometheus_registry: &Registry,
1494    ) -> Result<(SpawnOnce, tokio::sync::oneshot::Sender<()>)> {
1495        let validator_service = ValidatorService::new(
1496            state.clone(),
1497            consensus_adapter,
1498            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1499            TrafficControllerMetrics::new(prometheus_registry),
1500            config.policy_config.clone(),
1501            config.firewall_config.clone(),
1502        );
1503
1504        let mut server_conf = iota_network_stack::config::Config::new();
1505        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1506        server_conf.load_shed = config.grpc_load_shed;
1507        let mut server_builder =
1508            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1509
1510        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1511
1512        let tls_config = iota_tls::create_rustls_server_config(
1513            config.network_key_pair().copy().private(),
1514            IOTA_TLS_SERVER_NAME.to_string(),
1515            iota_tls::AllowAll,
1516        );
1517        let mut server = server_builder
1518            .bind(config.network_address(), Some(tls_config))
1519            .await
1520            .map_err(|err| anyhow!(err.to_string()))?;
1521        let cancel_handle = server
1522            .take_cancel_handle()
1523            .expect("GRPC server should still have a cancel handle");
1524        let local_addr = server.local_addr();
1525        info!("Listening to traffic on {local_addr}");
1526
1527        Ok((
1528            SpawnOnce::new(server.serve().map_err(Into::into)),
1529            cancel_handle,
1530        ))
1531    }
1532
1533    /// Re-executes pending consensus certificates, which may not have been
1534    /// committed to disk before the node restarted. This is necessary for
1535    /// the following reasons:
1536    ///
1537    /// 1. For any transaction for which we returned signed effects to a client,
1538    ///    we must ensure that we have re-executed the transaction before we
1539    ///    begin accepting grpc requests. Otherwise we would appear to have
1540    ///    forgotten about the transaction.
1541    /// 2. While this is running, we are concurrently waiting for all previously
1542    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1543    ///    either direction (from checkpointed consensus transactions to pending
1544    ///    consensus transactions, or vice versa), we must re-execute pending
1545    ///    consensus transactions to ensure that both processes can complete.
1546    /// 3. Also note that for any pending consensus transactions for which we
1547    ///    wrote a signed effects digest to disk, we must re-execute using that
1548    ///    digest as the expected effects digest, to ensure that we cannot
1549    ///    arrive at different effects than what we previously signed.
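    ///
    /// In short: among the pending non-shared-object certificates, those for
    /// which we previously signed effects are re-executed with that effects
    /// digest as the expected result, and the rest are simply re-enqueued for
    /// execution.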
1550    async fn reexecute_pending_consensus_certs(
1551        epoch_store: &Arc<AuthorityPerEpochStore>,
1552        state: &Arc<AuthorityState>,
1553    ) {
1554        let mut pending_consensus_certificates = Vec::new();
1555        let mut additional_certs = Vec::new();
1556
1557        for tx in epoch_store.get_all_pending_consensus_transactions() {
1558            match tx.kind {
1559                // Shared object txns cannot be re-executed at this point, because we must wait for
1560                // consensus replay to assign shared object versions.
1561                ConsensusTransactionKind::CertifiedTransaction(tx)
1562                    if !tx.contains_shared_object() =>
1563                {
1564                    let tx = *tx;
1565                    // new_unchecked is safe because we never submit a transaction to consensus
1566                    // without verifying it
1567                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1568                        VerifiedCertificate::new_unchecked(tx),
1569                    );
1570                    // we only need to re-execute if we previously signed the effects (which
1571                    // indicates we returned the effects to a client).
1572                    if let Some(fx_digest) = epoch_store
1573                        .get_signed_effects_digest(tx.digest())
1574                        .expect("db error")
1575                    {
1576                        pending_consensus_certificates.push((tx, fx_digest));
1577                    } else {
1578                        additional_certs.push(tx);
1579                    }
1580                }
1581                _ => (),
1582            }
1583        }
1584
1585        let digests = pending_consensus_certificates
1586            .iter()
1587            .map(|(tx, _)| *tx.digest())
1588            .collect::<Vec<_>>();
1589
1590        info!(
1591            "reexecuting {} pending consensus certificates: {:?}",
1592            digests.len(),
1593            digests
1594        );
1595
1596        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1597        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1598
1599        // If this times out, the validator will still almost certainly start up fine.
1600        // But it may temporarily "forget" about
1601        // transactions that it had previously executed. This could confuse
1602        // clients in some circumstances. However, the transactions are still in
1603        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1604        let timeout = if cfg!(msim) { 120 } else { 60 };
1605        if tokio::time::timeout(
1606            std::time::Duration::from_secs(timeout),
1607            state
1608                .get_transaction_cache_reader()
1609                .notify_read_executed_effects_digests(&digests),
1610        )
1611        .await
1612        .is_err()
1613        {
1614            // Log all the digests that were not executed to help debugging.
1615            if let Ok(executed_effects_digests) = state
1616                .get_transaction_cache_reader()
1617                .multi_get_executed_effects_digests(&digests)
1618            {
1619                let pending_digests = digests
1620                    .iter()
1621                    .zip(executed_effects_digests.iter())
1622                    .filter_map(|(digest, executed_effects_digest)| {
1623                        if executed_effects_digest.is_none() {
1624                            Some(digest)
1625                        } else {
1626                            None
1627                        }
1628                    })
1629                    .collect::<Vec<_>>();
1630                debug_fatal!(
1631                    "Timed out waiting for effects digests to be executed: {:?}",
1632                    pending_digests
1633                );
1634            } else {
1635                debug_fatal!(
1636                    "Timed out waiting for effects digests to be executed, digests not found"
1637                );
1638            }
1639        }
1640    }
1641
1642    pub fn state(&self) -> Arc<AuthorityState> {
1643        self.state.clone()
1644    }
1645
1646    // Only used for testing because of how epoch store is loaded.
1647    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1648        self.state.reference_gas_price_for_testing()
1649    }
1650
1651    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1652        self.state.committee_store().clone()
1653    }
1654
1655    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1656    // self.state.db()
1657    // }
1658
1659    /// Clone an AuthorityAggregator currently used in this node's
1660    /// QuorumDriver, if the node is a fullnode. After reconfig,
1661    /// QuorumDriver builds a new AuthorityAggregator. The caller
1662    /// of this function will most likely want to call this again
1663    /// to get a fresh one.
1664    pub fn clone_authority_aggregator(
1665        &self,
1666    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1667        self.transaction_orchestrator
1668            .as_ref()
1669            .map(|to| to.clone_authority_aggregator())
1670    }
1671
1672    pub fn transaction_orchestrator(
1673        &self,
1674    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1675        self.transaction_orchestrator.clone()
1676    }
1677
1678    pub fn subscribe_to_transaction_orchestrator_effects(
1679        &self,
1680    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1681        self.transaction_orchestrator
1682            .as_ref()
1683            .map(|to| to.subscribe_to_effects_queue())
1684            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1685    }
1686
1687    /// This function awaits the completion of checkpoint execution of the
1688    /// current epoch, after which it initiates reconfiguration of the
1689    /// entire system. It also handles node role changes when the epoch
1690    /// changes and advertises capabilities to the committee if the node
1691    /// is a validator.
1692    pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
1693        let checkpoint_executor_metrics =
1694            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1695
1696        loop {
1697            let mut accumulator_guard = self.accumulator.lock().await;
1698            let accumulator = accumulator_guard.take().unwrap();
1699            let mut checkpoint_executor = CheckpointExecutor::new(
1700                self.state_sync_handle.subscribe_to_synced_checkpoints(),
1701                self.checkpoint_store.clone(),
1702                self.state.clone(),
1703                accumulator.clone(),
1704                self.config.checkpoint_executor_config.clone(),
1705                checkpoint_executor_metrics.clone(),
1706            );
1707
1708            let run_with_range = self.config.run_with_range;
1709
1710            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1711
1712            // Advertise capabilities to the committee, if we are a validator.
1713            if let Some(components) = &*self.validator_components.lock().await {
1714                // TODO: without this sleep, the consensus message is not delivered reliably.
1715                tokio::time::sleep(Duration::from_millis(1)).await;
1716
1717                let config = cur_epoch_store.protocol_config();
1718                let binary_config = to_binary_config(config);
1719                let transaction = ConsensusTransaction::new_capability_notification_v1(
1720                    AuthorityCapabilitiesV1::new(
1721                        self.state.name,
1722                        cur_epoch_store.get_chain_identifier().chain(),
1723                        self.config
1724                            .supported_protocol_versions
1725                            .expect("Supported versions should be populated")
1726                            // no need to send digests of versions less than the current version
1727                            .truncate_below(config.version),
1728                        self.state
1729                            .get_available_system_packages(&binary_config)
1730                            .await,
1731                    ),
1732                );
1733                info!(?transaction, "submitting capabilities to consensus");
1734                components
1735                    .consensus_adapter
1736                    .submit(transaction, None, &cur_epoch_store)?;
1737            }
1738
1739            let stop_condition = checkpoint_executor
1740                .run_epoch(cur_epoch_store.clone(), run_with_range)
1741                .await;
1742            drop(checkpoint_executor);
1743
1744            if stop_condition == StopReason::RunWithRangeCondition {
1745                IotaNode::shutdown(&self).await;
1746                self.shutdown_channel_tx
1747                    .send(run_with_range)
1748                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1749                return Ok(());
1750            }
1751
1752            // Safe to call because we are in the middle of reconfiguration.
1753            let latest_system_state = self
1754                .state
1755                .get_object_cache_reader()
1756                .get_iota_system_state_object_unsafe()
1757                .expect("Read IOTA System State object cannot fail");
1758
1759            #[cfg(msim)]
1760            if !self
1761                .sim_state
1762                .sim_safe_mode_expected
1763                .load(Ordering::Relaxed)
1764            {
1765                debug_assert!(!latest_system_state.safe_mode());
1766            }
1767
1768            #[cfg(not(msim))]
1769            debug_assert!(!latest_system_state.safe_mode());
1770
1771            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1772                if self.state.is_fullnode(&cur_epoch_store) {
1773                    warn!(
1774                        "Failed to send end of epoch notification to subscriber: {:?}",
1775                        err
1776                    );
1777                }
1778            }
1779
1780            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1781            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1782
1783            self.auth_agg.store(Arc::new(
1784                self.auth_agg
1785                    .load()
1786                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1787            ));
1788
1789            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1790            let next_epoch = next_epoch_committee.epoch();
1791            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1792
1793            info!(
1794                next_epoch,
1795                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1796            );
1797
1798            fail_point_async!("reconfig_delay");
1799
1800            // We save the connection monitor status map regardless of validator / fullnode
1801            // status so that we don't need to restart the connection monitor
1802            // every epoch. Update the mappings that will be used by the
1803            // consensus adapter if it exists or is about to be created.
1804            let authority_names_to_peer_ids =
1805                new_epoch_start_state.get_authority_names_to_peer_ids();
1806            self.connection_monitor_status
1807                .update_mapping_for_epoch(authority_names_to_peer_ids);
1808
1809            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1810
1811            send_trusted_peer_change(
1812                &self.config,
1813                &self.trusted_peer_change_tx,
1814                &new_epoch_start_state,
1815            );
1816
1817            // The following code handles 4 different cases, depending on whether the node
1818            // was a validator in the previous epoch, and whether the node is a validator
1819            // in the new epoch.
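            //
            // 1. Validator -> validator: restart the epoch-specific validator
            //    components for the new epoch.
            // 2. Validator -> fullnode: drop the validator components, remove
            //    the validator metrics registry and cancel the gRPC server.
            // 3. Fullnode -> validator: construct the validator components and
            //    start the gRPC server.
            // 4. Fullnode -> fullnode: nothing to do.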
1820
1821            let new_validator_components = if let Some(ValidatorComponents {
1822                validator_server_handle,
1823                validator_server_cancel_handle,
1824                validator_overload_monitor_handle,
1825                consensus_manager,
1826                consensus_store_pruner,
1827                consensus_adapter,
1828                mut checkpoint_service_tasks,
1829                checkpoint_metrics,
1830                iota_tx_validator_metrics,
1831                validator_registry_id,
1832            }) = self.validator_components.lock().await.take()
1833            {
1834                info!("Reconfiguring the validator.");
1835                // Cancel the old checkpoint service tasks.
1836                // Waiting for checkpoint builder to finish gracefully is not possible, because
1837                // it may wait on transactions while consensus on peers has
1838                // already shut down.
1839                checkpoint_service_tasks.abort_all();
1840                while let Some(result) = checkpoint_service_tasks.join_next().await {
1841                    if let Err(err) = result {
1842                        if err.is_panic() {
1843                            std::panic::resume_unwind(err.into_panic());
1844                        }
1845                        warn!("Error in checkpoint service task: {:?}", err);
1846                    }
1847                }
1848                info!("Checkpoint service has shut down.");
1849
1850                consensus_manager.shutdown().await;
1851                info!("Consensus has shut down.");
1852
1853                let new_epoch_store = self
1854                    .reconfigure_state(
1855                        &self.state,
1856                        &cur_epoch_store,
1857                        next_epoch_committee.clone(),
1858                        new_epoch_start_state,
1859                        accumulator.clone(),
1860                    )
1861                    .await?;
1862                info!("Epoch store finished reconfiguration.");
1863
1864                // No other components should be holding a strong reference to state accumulator
1865                // at this point. Confirm here before we swap in the new accumulator.
1866                let accumulator_metrics = Arc::into_inner(accumulator)
1867                    .expect("Accumulator should have no other references at this point")
1868                    .metrics();
1869                let new_accumulator = Arc::new(StateAccumulator::new(
1870                    self.state.get_accumulator_store().clone(),
1871                    accumulator_metrics,
1872                ));
1873                let weak_accumulator = Arc::downgrade(&new_accumulator);
1874                *accumulator_guard = Some(new_accumulator);
1875
1876                consensus_store_pruner.prune(next_epoch).await;
1877
1878                if self.state.is_validator(&new_epoch_store) {
1879                    // Only restart consensus if this node is still a validator in the new epoch.
1880                    Some(
1881                        Self::start_epoch_specific_validator_components(
1882                            &self.config,
1883                            self.state.clone(),
1884                            consensus_adapter,
1885                            self.checkpoint_store.clone(),
1886                            new_epoch_store.clone(),
1887                            self.state_sync_handle.clone(),
1888                            self.randomness_handle.clone(),
1889                            consensus_manager,
1890                            consensus_store_pruner,
1891                            weak_accumulator,
1892                            validator_server_handle,
1893                            validator_server_cancel_handle,
1894                            validator_overload_monitor_handle,
1895                            checkpoint_metrics,
1896                            self.metrics.clone(),
1897                            iota_tx_validator_metrics,
1898                            validator_registry_id,
1899                        )
1900                        .await?,
1901                    )
1902                } else {
1903                    info!("This node is no longer a validator after reconfiguration");
1904                    if self.registry_service.remove(validator_registry_id) {
1905                        debug!("Removed validator metrics registry");
1906                    } else {
1907                        warn!("Failed to remove validator metrics registry");
1908                    }
1909                    if validator_server_cancel_handle.send(()).is_ok() {
1910                        debug!("Validator grpc server cancelled");
1911                    } else {
1912                        warn!("Failed to cancel validator grpc server");
1913                    }
1914
1915                    None
1916                }
1917            } else {
1918                let new_epoch_store = self
1919                    .reconfigure_state(
1920                        &self.state,
1921                        &cur_epoch_store,
1922                        next_epoch_committee.clone(),
1923                        new_epoch_start_state,
1924                        accumulator.clone(),
1925                    )
1926                    .await?;
1927
1928                // No other components should be holding a strong reference to state accumulator
1929                // at this point. Confirm here before we swap in the new accumulator.
1930                let accumulator_metrics = Arc::into_inner(accumulator)
1931                    .expect("Accumulator should have no other references at this point")
1932                    .metrics();
1933                let new_accumulator = Arc::new(StateAccumulator::new(
1934                    self.state.get_accumulator_store().clone(),
1935                    accumulator_metrics,
1936                ));
1937                let weak_accumulator = Arc::downgrade(&new_accumulator);
1938                *accumulator_guard = Some(new_accumulator);
1939
1940                if self.state.is_validator(&new_epoch_store) {
1941                    info!("Promoting the node from fullnode to validator, starting grpc server");
1942
1943                    let mut components = Self::construct_validator_components(
1944                        self.config.clone(),
1945                        self.state.clone(),
1946                        Arc::new(next_epoch_committee.clone()),
1947                        new_epoch_store.clone(),
1948                        self.checkpoint_store.clone(),
1949                        self.state_sync_handle.clone(),
1950                        self.randomness_handle.clone(),
1951                        weak_accumulator,
1952                        self.connection_monitor_status.clone(),
1953                        &self.registry_service,
1954                        self.metrics.clone(),
1955                    )
1956                    .await?;
1957
1958                    components.validator_server_handle = components.validator_server_handle.start();
1959
1960                    Some(components)
1961                } else {
1962                    None
1963                }
1964            };
1965            *self.validator_components.lock().await = new_validator_components;
1966
1967            // Force releasing current epoch store DB handle, because the
1968            // Arc<AuthorityPerEpochStore> may linger.
1969            cur_epoch_store.release_db_handles();
1970
1971            if cfg!(msim)
1972                && !matches!(
1973                    self.config
1974                        .authority_store_pruning_config
1975                        .num_epochs_to_retain_for_checkpoints(),
1976                    None | Some(u64::MAX) | Some(0)
1977                )
1978            {
1979                self.state
1980                .prune_checkpoints_for_eligible_epochs_for_testing(
1981                    self.config.clone(),
1982                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1983                )
1984                .await?;
1985            }
1986
1987            info!("Reconfiguration finished");
1988        }
1989    }
1990
1991    async fn shutdown(&self) {
1992        if let Some(validator_components) = &*self.validator_components.lock().await {
1993            validator_components.consensus_manager.shutdown().await;
1994        }
1995    }
1996
1997    /// Asynchronously reconfigures the state of the authority node for the next
1998    /// epoch.
1999    async fn reconfigure_state(
2000        &self,
2001        state: &Arc<AuthorityState>,
2002        cur_epoch_store: &AuthorityPerEpochStore,
2003        next_epoch_committee: Committee,
2004        next_epoch_start_system_state: EpochStartSystemState,
2005        accumulator: Arc<StateAccumulator>,
2006    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2007        let next_epoch = next_epoch_committee.epoch();
2008
2009        let last_checkpoint = self
2010            .checkpoint_store
2011            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2012            .expect("Error loading last checkpoint for current epoch")
2013            .expect("Could not load last checkpoint for current epoch");
2014        let epoch_supply_change = last_checkpoint
2015            .end_of_epoch_data
2016            .as_ref()
2017            .ok_or_else(|| {
2018                IotaError::from("last checkpoint in epoch should contain end of epoch data")
2019            })?
2020            .epoch_supply_change;
2021
2022        let epoch_start_configuration = EpochStartConfiguration::new(
2023            next_epoch_start_system_state,
2024            *last_checkpoint.digest(),
2025            state.get_object_store().as_ref(),
2026            EpochFlag::default_flags_for_new_epoch(&state.config),
2027        )
2028        .expect("EpochStartConfiguration construction cannot fail");
2029
2030        let new_epoch_store = self
2031            .state
2032            .reconfigure(
2033                cur_epoch_store,
2034                self.config.supported_protocol_versions.unwrap(),
2035                next_epoch_committee,
2036                epoch_start_configuration,
2037                accumulator,
2038                &self.config.expensive_safety_check_config,
2039                epoch_supply_change,
2040            )
2041            .await
2042            .expect("Reconfigure authority state cannot fail");
2043        info!(next_epoch, "Node State has been reconfigured");
2044        assert_eq!(next_epoch, new_epoch_store.epoch());
2045        self.state.get_reconfig_api().update_epoch_flags_metrics(
2046            cur_epoch_store.epoch_start_config().flags(),
2047            new_epoch_store.epoch_start_config().flags(),
2048        );
2049
2050        Ok(new_epoch_store)
2051    }
2052
2053    pub fn get_config(&self) -> &NodeConfig {
2054        &self.config
2055    }
2056
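    /// Wraps `tx` in an unchecked executable transaction carrying a
    /// `CertificateProof::Checkpoint(0, 0)` proof and executes it immediately
    /// against the given epoch store, bypassing consensus. As the name
    /// suggests, this is only intended for transactions executed at epoch
    /// zero.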
2057    async fn execute_transaction_immediately_at_zero_epoch(
2058        state: &Arc<AuthorityState>,
2059        epoch_store: &Arc<AuthorityPerEpochStore>,
2060        tx: &Transaction,
2061        span: tracing::Span,
2062    ) {
2063        let transaction =
2064            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2065                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2066                    tx.data().clone(),
2067                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2068                ),
2069            );
2070        state
2071            .try_execute_immediately(&transaction, None, epoch_store)
2072            .instrument(span)
2073            .await
2074            .unwrap();
2075    }
2076
2077    pub fn randomness_handle(&self) -> randomness::Handle {
2078        self.randomness_handle.clone()
2079    }
2080}
2081
2082#[cfg(not(msim))]
2083impl IotaNode {
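    /// Fetches the provider's current JWKs over the network, mapping any
    /// fetch error to `IotaError::JWKRetrieval`.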
2084    async fn fetch_jwks(
2085        _authority: AuthorityName,
2086        provider: &OIDCProvider,
2087    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2088        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2089        let client = reqwest::Client::new();
2090        fetch_jwks(provider, &client)
2091            .await
2092            .map_err(|_| IotaError::JWKRetrieval)
2093    }
2094}
2095
2096#[cfg(msim)]
2097impl IotaNode {
2098    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
2099        self.sim_state.sim_node.id()
2100    }
2101
2102    pub fn set_safe_mode_expected(&self, new_value: bool) {
2103        info!("Setting safe mode expected to {}", new_value);
2104        self.sim_state
2105            .sim_safe_mode_expected
2106            .store(new_value, Ordering::Relaxed);
2107    }
2108
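    /// In simulation builds, JWKs come from the configured test injector
    /// instead of a network fetch.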
2109    async fn fetch_jwks(
2110        authority: AuthorityName,
2111        provider: &OIDCProvider,
2112    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2113        get_jwk_injector()(authority, provider)
2114    }
2115}
2116
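/// A future that is constructed eagerly but spawned lazily, at most once.
///
/// Usage sketch (illustrative only): the validator gRPC server future is
/// wrapped via `SpawnOnce::new(server.serve().map_err(Into::into))` and only
/// spawned onto the runtime later with `.start()`, once the node knows it
/// should serve validator traffic. Starting an already-started instance is a
/// no-op.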
2117enum SpawnOnce {
2118    // Mutex is only needed to make SpawnOnce Send
2119    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
2120    #[allow(unused)]
2121    Started(JoinHandle<Result<()>>),
2122}
2123
2124impl SpawnOnce {
2125    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
2126        Self::Unstarted(Mutex::new(Box::pin(future)))
2127    }
2128
2129    pub fn start(self) -> Self {
2130        match self {
2131            Self::Unstarted(future) => {
2132                let future = future.into_inner();
2133                let handle = tokio::spawn(future);
2134                Self::Started(handle)
2135            }
2136            Self::Started(_) => self,
2137        }
2138    }
2139}
2140
2141/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
2142/// available.
2143fn send_trusted_peer_change(
2144    config: &NodeConfig,
2145    sender: &watch::Sender<TrustedPeerChangeEvent>,
2146    new_epoch_start_state: &EpochStartSystemState,
2147) {
2148    let new_committee =
2149        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2150
2151    sender.send_modify(|event| {
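        // The committee that was "new" until now becomes the old committee,
        // and the freshly derived peer set becomes the new one.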
2152        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2153        event.new_committee = new_committee;
2154    })
2155}
2156
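/// Builds the [`TransactionKeyValueStore`] used by the JSON-RPC APIs. The
/// local RocksDB-backed store is always used; if
/// `transaction_kv_store_read_config.base_url` is set to a valid URL, an HTTP
/// key-value store is added as a fallback for data that is missing locally.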
2157fn build_kv_store(
2158    state: &Arc<AuthorityState>,
2159    config: &NodeConfig,
2160    registry: &Registry,
2161) -> Result<Arc<TransactionKeyValueStore>> {
2162    let metrics = KeyValueStoreMetrics::new(registry);
2163    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2164
2165    let base_url = &config.transaction_kv_store_read_config.base_url;
2166
2167    if base_url.is_empty() {
2168        info!("no http kv store url provided, using local db only");
2169        return Ok(Arc::new(db_store));
2170    }
2171
2172    base_url.parse::<url::Url>().tap_err(|e| {
2173        error!(
2174            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2175            base_url, e
2176        )
2177    })?;
2178
2179    let http_store = HttpKVStore::new_kv(
2180        base_url,
2181        config.transaction_kv_store_read_config.cache_size,
2182        metrics.clone(),
2183    )?;
2184    info!("using local key-value store with fallback to http key-value store");
2185    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2186        db_store,
2187        http_store,
2188        metrics,
2189        "json_rpc_fallback",
2190    )))
2191}
2192
2193/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
2194/// REST APIs based on the node's configuration.
2195///
2196/// This function performs the following tasks:
2197/// 1. Checks if the node is a validator by inspecting the consensus
2198///    configuration; if so, it returns early as validators do not expose these
2199///    APIs.
2200/// 2. Creates an Axum router to handle HTTP requests.
2201/// 3. Initializes the JSON-RPC server and registers various RPC modules based
2202///    on the node's state and configuration, including CoinApi,
2203///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
2204///    IndexerApi.
2205/// 4. Optionally, if the REST API is enabled, nests the REST API router under
2206///    the `/api/v1` path.
2207/// 5. Binds the server to the specified JSON-RPC address and starts listening
2208///    for incoming connections.
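///
/// Returns `Ok(None)` when the node is a validator, otherwise `Ok(Some(handle))`
/// for the spawned server task.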
2209pub async fn build_http_server(
2210    state: Arc<AuthorityState>,
2211    store: RocksDbStore,
2212    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2213    config: &NodeConfig,
2214    prometheus_registry: &Registry,
2215    _custom_runtime: Option<Handle>,
2216    software_version: &'static str,
2217) -> Result<Option<tokio::task::JoinHandle<()>>> {
2218    // Validators do not expose these APIs
2219    if config.consensus_config().is_some() {
2220        return Ok(None);
2221    }
2222
2223    let mut router = axum::Router::new();
2224
2225    let json_rpc_router = {
2226        let mut server = JsonRpcServerBuilder::new(
2227            env!("CARGO_PKG_VERSION"),
2228            prometheus_registry,
2229            config.policy_config.clone(),
2230            config.firewall_config.clone(),
2231        );
2232
2233        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2234
2235        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2236        server.register_module(ReadApi::new(
2237            state.clone(),
2238            kv_store.clone(),
2239            metrics.clone(),
2240        ))?;
2241        server.register_module(CoinReadApi::new(
2242            state.clone(),
2243            kv_store.clone(),
2244            metrics.clone(),
2245        )?)?;
2246
2247        // If run_with_range is set, we want to prevent any transaction submission;
2248        // run_with_range = None means normal operating conditions.
2249        if config.run_with_range.is_none() {
2250            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2251        }
2252        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2253
2254        if let Some(transaction_orchestrator) = transaction_orchestrator {
2255            server.register_module(TransactionExecutionApi::new(
2256                state.clone(),
2257                transaction_orchestrator.clone(),
2258                metrics.clone(),
2259            ))?;
2260        }
2261
2262        // TODO: Init from chain if config is not set once `IotaNamesConfig::from_chain`
2263        // is implemented
2264        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();
2265
2266        server.register_module(IndexerApi::new(
2267            state.clone(),
2268            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2269            kv_store,
2270            metrics,
2271            iota_names_config,
2272            config.indexer_max_subscriptions,
2273        ))?;
2274        server.register_module(MoveUtils::new(state.clone()))?;
2275
2276        let server_type = config.jsonrpc_server_type();
2277
2278        server.to_router(server_type).await?
2279    };
2280
2281    router = router.merge(json_rpc_router);
2282
2283    if config.enable_rest_api {
2284        let mut rest_service = iota_rest_api::RestService::new(
2285            Arc::new(RestReadStore::new(state.clone(), store)),
2286            software_version,
2287        );
2288
2289        if let Some(config) = config.rest.clone() {
2290            rest_service.with_config(config);
2291        }
2292
2293        rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2294
2295        if let Some(transaction_orchestrator) = transaction_orchestrator {
2296            rest_service.with_executor(transaction_orchestrator.clone())
2297        }
2298
2299        router = router.merge(rest_service.into_router());
2300    }
2301
2302    // TODO: Remove this health check when experimental REST API becomes default
2303    // This is a copy of the health check in crates/iota-rest-api/src/health.rs
2304    router = router
2305        .route("/health", axum::routing::get(health_check_handler))
2306        .route_layer(axum::Extension(state));
2307
2308    let listener = tokio::net::TcpListener::bind(&config.json_rpc_address)
2309        .await
2310        .unwrap();
2311    let addr = listener.local_addr().unwrap();
2312
2313    router = router.layer(axum::middleware::from_fn(server_timing_middleware));
2314
2315    let handle = tokio::spawn(async move {
2316        axum::serve(
2317            listener,
2318            router.into_make_service_with_connect_info::<SocketAddr>(),
2319        )
2320        .await
2321        .unwrap()
2322    });
2323
2324    info!(local_addr = ?addr, "IOTA JSON-RPC server listening on {addr}");
2325
2326    Ok(Some(handle))
2327}
2328
2329#[derive(Debug, serde::Serialize, serde::Deserialize)]
2330pub struct Threshold {
2331    pub threshold_seconds: Option<u32>,
2332}
2333
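/// Handles `GET /health`. Without query parameters it only confirms that the
/// server is responding; with `?threshold_seconds=N` it additionally reports
/// `503` if the highest executed checkpoint is older than `N` seconds.
///
/// Illustrative request (the port depends on the configured
/// `json_rpc_address`): `curl "http://localhost:9000/health?threshold_seconds=60"`.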
2334async fn health_check_handler(
2335    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2336    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2337) -> impl axum::response::IntoResponse {
2338    if let Some(threshold_seconds) = threshold_seconds {
2339        // Attempt to get the latest checkpoint
2340        let summary = match state
2341            .get_checkpoint_store()
2342            .get_highest_executed_checkpoint()
2343        {
2344            Ok(Some(summary)) => summary,
2345            Ok(None) => {
2346                warn!("Highest executed checkpoint not found");
2347                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2348            }
2349            Err(err) => {
2350                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2351                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2352            }
2353        };
2354
2355        // Calculate the threshold time based on the provided threshold_seconds
2356        let latest_chain_time = summary.timestamp();
2357        let threshold =
2358            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2359
2360        // Check if the latest checkpoint is within the threshold
2361        if latest_chain_time < threshold {
2362            warn!(
2363                ?latest_chain_time,
2364                ?threshold,
2365                "failing health check due to checkpoint lag"
2366            );
2367            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2368        }
2369    }
2370    // If the health endpoint is responding and no threshold is given, report success.
2371    (axum::http::StatusCode::OK, "up")
2372}
2373
2374#[cfg(not(test))]
2375fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2376    protocol_config.max_transactions_per_checkpoint() as usize
2377}
2378
2379#[cfg(test)]
2380fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2381    2
2382}