iota_node/lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::{BTreeSet, HashMap, HashSet},
9    fmt,
10    future::Future,
11    path::PathBuf,
12    str::FromStr,
13    sync::{Arc, Weak},
14    time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19    callback::CallbackLayer,
20    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::{TryFutureExt, future::BoxFuture};
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30    ConsensusConfig, NodeConfig,
31    node::{DBCheckpointConfig, RunWithRange},
32    node_config_metrics::NodeConfigMetrics,
33    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36    authority::{
37        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38        authority_per_epoch_store::AuthorityPerEpochStore,
39        authority_store_pruner::ObjectsCompactionFilter,
40        authority_store_tables::{
41            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42        },
43        backpressure::BackpressureManager,
44        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45    },
46    authority_aggregator::{
47        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
48    },
49    authority_client::NetworkAuthorityClient,
50    authority_server::{ValidatorService, ValidatorServiceMetrics},
51    checkpoints::{
52        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
53        SubmitCheckpointToConsensus,
54        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
55    },
56    connection_monitor::ConnectionMonitor,
57    consensus_adapter::{
58        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
59        ConsensusClient,
60    },
61    consensus_handler::ConsensusHandlerInitializer,
62    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
63    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
64    db_checkpoint_handler::DBCheckpointHandler,
65    epoch::{
66        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
67        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
68        reconfiguration::ReconfigurationInitiator,
69    },
70    execution_cache::build_execution_cache,
71    jsonrpc_index::IndexStore,
72    module_cache_metrics::ResolverMetrics,
73    overload_monitor::overload_monitor,
74    rest_index::RestIndexStore,
75    safe_client::SafeClientMetricsBase,
76    signature_verifier::SignatureVerifierMetrics,
77    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
78    storage::{RestReadStore, RocksDbStore},
79    traffic_controller::metrics::TrafficControllerMetrics,
80    transaction_orchestrator::TransactionOrchestrator,
81    validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87    transaction_builder_api::TransactionBuilderApi,
88    transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93    RegistryID, RegistryService,
94    hardware_metrics::register_hardware_metrics,
95    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96    server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
101};
102use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
103use iota_protocol_config::ProtocolConfig;
104use iota_rest_api::RestMetrics;
105use iota_snapshot::uploader::StateSnapshotUploader;
106use iota_storage::{
107    FileCompression, StorageFormat,
108    http_key_value_store::HttpKVStore,
109    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
110    key_value_store_metrics::KeyValueStoreMetrics,
111};
112use iota_types::{
113    base_types::{AuthorityName, ConciseableName, EpochId},
114    committee::Committee,
115    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
116    digests::ChainIdentifier,
117    error::{IotaError, IotaResult},
118    executable_transaction::VerifiedExecutableTransaction,
119    execution_config_utils::to_binary_config,
120    full_checkpoint_content::CheckpointData,
121    iota_system_state::{
122        IotaSystemState, IotaSystemStateTrait,
123        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
124    },
125    messages_checkpoint::CertifiedCheckpointSummary,
126    messages_consensus::{
127        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
128        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
129    },
130    messages_grpc::HandleCapabilityNotificationRequestV1,
131    quorum_driver_types::QuorumDriverEffectsQueueResult,
132    supported_protocol_versions::SupportedProtocolVersions,
133    transaction::{Transaction, VerifiedCertificate},
134};
135use prometheus::Registry;
136use shared_crypto::intent::{Intent, IntentMessage, IntentScope};
137#[cfg(msim)]
138pub use simulator::set_jwk_injector;
139#[cfg(msim)]
140use simulator::*;
141use tap::tap::TapFallible;
142use tokio::{
143    runtime::Handle,
144    sync::{Mutex, broadcast, mpsc, watch},
145    task::{JoinHandle, JoinSet},
146};
147use tokio_util::sync::CancellationToken;
148use tower::ServiceBuilder;
149use tracing::{Instrument, debug, error, error_span, info, warn};
150use typed_store::{
151    DBMetrics,
152    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
153};
154
155use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
156
157pub mod admin;
158mod handle;
159pub mod metrics;
160
161pub struct ValidatorComponents {
162    validator_server_spawn_handle: SpawnOnce,
163    validator_server_handle: iota_http::ServerHandle,
164    validator_overload_monitor_handle: Option<JoinHandle<()>>,
165    consensus_manager: ConsensusManager,
166    consensus_store_pruner: ConsensusStorePruner,
167    consensus_adapter: Arc<ConsensusAdapter>,
168    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
169    checkpoint_service_tasks: JoinSet<()>,
170    checkpoint_metrics: Arc<CheckpointMetrics>,
171    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
172    validator_registry_id: RegistryID,
173}
174
175#[cfg(msim)]
176mod simulator {
177    use std::sync::atomic::AtomicBool;
178
179    use super::*;
180
181    pub(super) struct SimState {
182        pub sim_node: iota_simulator::runtime::NodeHandle,
183        pub sim_safe_mode_expected: AtomicBool,
184        _leak_detector: iota_simulator::NodeLeakDetector,
185    }
186
187    impl Default for SimState {
188        fn default() -> Self {
189            Self {
190                sim_node: iota_simulator::runtime::NodeHandle::current(),
191                sim_safe_mode_expected: AtomicBool::new(false),
192                _leak_detector: iota_simulator::NodeLeakDetector::new(),
193            }
194        }
195    }
196
197    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
198        + Send
199        + Sync
200        + 'static;
201
202    fn default_fetch_jwks(
203        _authority: AuthorityName,
204        _provider: &OIDCProvider,
205    ) -> IotaResult<Vec<(JwkId, JWK)>> {
206        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
207        // Just load a default Twitch JWK for testing.
208        parse_jwks(
209            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
210            &OIDCProvider::Twitch,
211        )
212        .map_err(|_| IotaError::JWKRetrieval)
213    }
214
215    thread_local! {
216        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
217    }
218
219    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
220        JWK_INJECTOR.with(|injector| injector.borrow().clone())
221    }
222
223    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
224        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
225    }
226}
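
// Example (msim only): a simtest can replace the JWK source before nodes are
// started. This is a minimal sketch, assuming a test-local `my_test_jwks()`
// helper that returns `Vec<(JwkId, JWK)>`:
//
//     #[cfg(msim)]
//     fn inject_static_jwks() {
//         set_jwk_injector(Arc::new(|_authority, _provider| Ok(my_test_jwks())));
//     }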
227
228pub struct IotaNode {
229    config: NodeConfig,
230    validator_components: Mutex<Option<ValidatorComponents>>,
231    /// The HTTP server responsible for serving JSON-RPC as well as the
232    /// experimental REST service.
233    _http_server: Option<iota_http::ServerHandle>,
234    state: Arc<AuthorityState>,
235    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
236    registry_service: RegistryService,
237    metrics: Arc<IotaNodeMetrics>,
238
239    _discovery: discovery::Handle,
240    state_sync_handle: state_sync::Handle,
241    randomness_handle: randomness::Handle,
242    checkpoint_store: Arc<CheckpointStore>,
243    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
244    connection_monitor_status: Arc<ConnectionMonitorStatus>,
245
246    /// Broadcast channel to send the starting system state for the next epoch.
247    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,
248
249    /// Broadcast channel to notify [`DiscoveryEventLoop`] of new validator
250    /// peers.
251    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,
252
253    backpressure_manager: Arc<BackpressureManager>,
254
255    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
256
257    #[cfg(msim)]
258    sim_state: SimState,
259
260    _state_archive_handle: Option<broadcast::Sender<()>>,
261
262    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
263    // Channel to allow signaling upstream to shut down iota-node.
264    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
265
266    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
267    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,
268
269    /// AuthorityAggregator of the network, created at start and at the beginning
270    /// of each epoch. Uses ArcSwap so that we can mutate it without taking a
271    /// mutable reference.
272    // TODO: Eventually we can make this auth aggregator a shared reference so that this
273    // update will automatically propagate to other uses.
274    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
275}
276
277impl fmt::Debug for IotaNode {
278    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
279        f.debug_struct("IotaNode")
280            .field("name", &self.state.name.concise())
281            .finish()
282    }
283}
284
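/// Upper bound on the number of JWKs accepted from a single provider fetch;
/// `start_jwk_updater` keeps only the first `MAX_JWK_KEYS_PER_FETCH` keys and
/// logs a warning when a provider returns more.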
285static MAX_JWK_KEYS_PER_FETCH: usize = 100;
286
287impl IotaNode {
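    /// Convenience wrapper around [`Self::start_async`] that reports the
    /// software version as "unknown".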
288    pub async fn start(
289        config: NodeConfig,
290        registry_service: RegistryService,
291        custom_rpc_runtime: Option<Handle>,
292    ) -> Result<Arc<IotaNode>> {
293        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
294    }
295
296    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
297    /// configuration.
298    /// This function ensures continuous fetching, validation, and submission of
299    /// JWKs, maintaining up-to-date keys for the specified providers.
300    fn start_jwk_updater(
301        config: &NodeConfig,
302        metrics: Arc<IotaNodeMetrics>,
303        authority: AuthorityName,
304        epoch_store: Arc<AuthorityPerEpochStore>,
305        consensus_adapter: Arc<ConsensusAdapter>,
306    ) {
307        let epoch = epoch_store.epoch();
308
309        let supported_providers = config
310            .zklogin_oauth_providers
311            .get(&epoch_store.get_chain_identifier().chain())
312            .unwrap_or(&BTreeSet::new())
313            .iter()
314            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
315            .collect::<Vec<_>>();
316
317        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
318
319        info!(
320            ?fetch_interval,
321            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
322        );
323
324        fn validate_jwk(
325            metrics: &Arc<IotaNodeMetrics>,
326            provider: &OIDCProvider,
327            id: &JwkId,
328            jwk: &JWK,
329        ) -> bool {
330            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
331                warn!(
332                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
333                    id.iss, provider
334                );
335                metrics
336                    .invalid_jwks
337                    .with_label_values(&[&provider.to_string()])
338                    .inc();
339                return false;
340            };
341
342            if iss_provider != *provider {
343                warn!(
344                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
345                    id.iss, provider, iss_provider
346                );
347                metrics
348                    .invalid_jwks
349                    .with_label_values(&[&provider.to_string()])
350                    .inc();
351                return false;
352            }
353
354            if !check_total_jwk_size(id, jwk) {
355                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
356                metrics
357                    .invalid_jwks
358                    .with_label_values(&[&provider.to_string()])
359                    .inc();
360                return false;
361            }
362
363            true
364        }
365
366        // metrics is:
367        //  pub struct IotaNodeMetrics {
368        //      pub jwk_requests: IntCounterVec,
369        //      pub jwk_request_errors: IntCounterVec,
370        //      pub total_jwks: IntCounterVec,
371        //      pub unique_jwks: IntCounterVec,
372        //  }
373
374        for p in supported_providers.into_iter() {
375            let provider_str = p.to_string();
376            let epoch_store = epoch_store.clone();
377            let consensus_adapter = consensus_adapter.clone();
378            let metrics = metrics.clone();
379            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
380                async move {
381                    // Note: restart-safe de-duplication happens after consensus; this is
382                    // just best-effort to reduce unneeded submissions.
383                    let mut seen = HashSet::new();
384                    loop {
385                        info!("fetching JWK for provider {:?}", p);
386                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
387                        match Self::fetch_jwks(authority, &p).await {
388                            Err(e) => {
389                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
390                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
391                                // Retry in 30 seconds
392                                tokio::time::sleep(Duration::from_secs(30)).await;
393                                continue;
394                            }
395                            Ok(mut keys) => {
396                                metrics.total_jwks
397                                    .with_label_values(&[&provider_str])
398                                    .inc_by(keys.len() as u64);
399
400                                keys.retain(|(id, jwk)| {
401                                    validate_jwk(&metrics, &p, id, jwk) &&
402                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
403                                    seen.insert((id.clone(), jwk.clone()))
404                                });
405
406                                metrics.unique_jwks
407                                    .with_label_values(&[&provider_str])
408                                    .inc_by(keys.len() as u64);
409
410                                // prevent oauth providers from sending too many keys,
411                                // inadvertently or otherwise
412                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
413                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
414                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
415                                }
416
417                                for (id, jwk) in keys.into_iter() {
418                                    info!("Submitting JWK to consensus: {:?}", id);
419
420                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
421                                    consensus_adapter.submit(txn, None, &epoch_store)
422                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
423                                        .ok();
424                                }
425                            }
426                        }
427                        tokio::time::sleep(fetch_interval).await;
428                    }
429                }
430                .instrument(error_span!("jwk_updater_task", epoch)),
431            ));
432        }
433    }
434
435    pub async fn start_async(
436        config: NodeConfig,
437        registry_service: RegistryService,
438        custom_rpc_runtime: Option<Handle>,
439        software_version: &'static str,
440    ) -> Result<Arc<IotaNode>> {
441        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
442        let mut config = config.clone();
443        if config.supported_protocol_versions.is_none() {
444            info!(
445                "populating config.supported_protocol_versions with default {:?}",
446                SupportedProtocolVersions::SYSTEM_DEFAULT
447            );
448            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
449        }
450
451        let run_with_range = config.run_with_range;
452        let is_validator = config.consensus_config().is_some();
453        let is_full_node = !is_validator;
454        let prometheus_registry = registry_service.default_registry();
455
456        info!(node =? config.authority_public_key(),
457            "Initializing iota-node listening on {}", config.network_address
458        );
459
460        let genesis = config.genesis()?.clone();
461
462        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
463        info!("IOTA chain identifier: {chain_identifier}");
464
465        // Check and set the db_corrupted flag
466        let db_corrupted_path = &config.db_path().join("status");
467        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
468            panic!("Failed to check database corruption: {err}");
469        }
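        // The corruption marker written here is cleared again further below
        // (via `unmark_db_corruption`) once the genesis checkpoint has been
        // inserted into the checkpoint store.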
470
471        // Initialize metrics to track db usage before creating any stores
472        DBMetrics::init(&prometheus_registry);
473
474        // Initialize IOTA metrics.
475        iota_metrics::init_metrics(&prometheus_registry);
476        // Unsupported (because of the use of a static variable) and unnecessary in
477        // simtests.
478        #[cfg(not(msim))]
479        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
480
481        // Register hardware metrics.
482        register_hardware_metrics(&registry_service, &config.db_path)
483            .expect("Failed registering hardware metrics");
484        // Register uptime metric
485        prometheus_registry
486            .register(iota_metrics::uptime_metric(
487                if is_validator {
488                    "validator"
489                } else {
490                    "fullnode"
491                },
492                software_version,
493                &chain_identifier.to_string(),
494            ))
495            .expect("Failed registering uptime metric");
496
497        // If genesis comes with migration data, load it into memory from the file
498        // path specified in the config.
499        let migration_tx_data = if genesis.contains_migrations() {
500            // Here the load already verifies that the content of the migration blob is
501            // valid with respect to the content found in genesis.
502            Some(config.load_migration_tx_data()?)
503        } else {
504            None
505        };
506
507        let secret = Arc::pin(config.authority_key_pair().copy());
508        let genesis_committee = genesis.committee()?;
509        let committee_store = Arc::new(CommitteeStore::new(
510            config.db_path().join("epochs"),
511            &genesis_committee,
512            None,
513        ));
514
515        let mut pruner_db = None;
516        if config
517            .authority_store_pruning_config
518            .enable_compaction_filter
519        {
520            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
521                &config.db_path().join("store"),
522            )));
523        }
524        let compaction_filter = pruner_db
525            .clone()
526            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
527
528        // By default, only enable write stall on validators for perpetual db.
529        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
530        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
531            enable_write_stall,
532            compaction_filter,
533        };
534        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
535            &config.db_path().join("store"),
536            Some(perpetual_tables_options),
537        ));
538        let is_genesis = perpetual_tables
539            .database_is_empty()
540            .expect("Database read should not fail at init.");
541        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
542        let backpressure_manager =
543            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
544
545        let store = AuthorityStore::open(
546            perpetual_tables,
547            &genesis,
548            &config,
549            &prometheus_registry,
550            migration_tx_data.as_ref(),
551        )
552        .await?;
553
554        let cur_epoch = store.get_recovery_epoch_at_restart()?;
555        let committee = committee_store
556            .get_committee(&cur_epoch)?
557            .expect("Committee of the current epoch must exist");
558        let epoch_start_configuration = store
559            .get_epoch_start_configuration()?
560            .expect("EpochStartConfiguration of the current epoch must exist");
561        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
562        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
563
564        let cache_traits = build_execution_cache(
565            &config.execution_cache_config,
566            &epoch_start_configuration,
567            &prometheus_registry,
568            &store,
569            backpressure_manager.clone(),
570        );
571
572        let auth_agg = {
573            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
574            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
575            Arc::new(ArcSwap::new(Arc::new(
576                AuthorityAggregator::new_from_epoch_start_state(
577                    epoch_start_configuration.epoch_start_state(),
578                    &committee_store,
579                    safe_client_metrics_base,
580                    auth_agg_metrics,
581                ),
582            )))
583        };
584
585        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
586        let epoch_store = AuthorityPerEpochStore::new(
587            config.authority_public_key(),
588            committee.clone(),
589            &config.db_path().join("store"),
590            Some(epoch_options.options),
591            EpochMetrics::new(&registry_service.default_registry()),
592            epoch_start_configuration,
593            cache_traits.backing_package_store.clone(),
594            cache_traits.object_store.clone(),
595            cache_metrics,
596            signature_verifier_metrics,
597            &config.expensive_safety_check_config,
598            ChainIdentifier::from(*genesis.checkpoint().digest()),
599            checkpoint_store
600                .get_highest_executed_checkpoint_seq_number()
601                .expect("checkpoint store read cannot fail")
602                .unwrap_or(0),
603        );
604
605        info!("created epoch store");
606
607        replay_log!(
608            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
609            epoch_store.epoch(),
610            epoch_store.protocol_config()
611        );
612
613        // the database is empty at genesis time
614        if is_genesis {
615            info!("checking IOTA conservation at genesis");
616            // When we are opening the db tables, the only time when it's safe to
617            // check IOTA conservation is at genesis. Otherwise we may be in the middle
618            // of an epoch and the IOTA conservation check will fail. This also
619            // initializes the expected_network_iota_amount table.
620            cache_traits
621                .reconfig_api
622                .try_expensive_check_iota_conservation(&epoch_store, None)
623                .expect("IOTA conservation check cannot fail at genesis");
624        }
625
626        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
627        let default_buffer_stake = epoch_store
628            .protocol_config()
629            .buffer_stake_for_protocol_upgrade_bps();
630        if effective_buffer_stake != default_buffer_stake {
631            warn!(
632                ?effective_buffer_stake,
633                ?default_buffer_stake,
634                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
635            );
636        }
637
638        checkpoint_store.insert_genesis_checkpoint(
639            genesis.checkpoint(),
640            genesis.checkpoint_contents().clone(),
641            &epoch_store,
642        );
643
644        // Database has everything from genesis, set corrupted key to 0
645        unmark_db_corruption(db_corrupted_path)?;
646
647        info!("creating state sync store");
648        let state_sync_store = RocksDbStore::new(
649            cache_traits.clone(),
650            committee_store.clone(),
651            checkpoint_store.clone(),
652        );
653
654        let index_store = if is_full_node && config.enable_index_processing {
655            info!("creating index store");
656            Some(Arc::new(IndexStore::new(
657                config.db_path().join("indexes"),
658                &prometheus_registry,
659                epoch_store
660                    .protocol_config()
661                    .max_move_identifier_len_as_option(),
662                config.remove_deprecated_tables,
663            )))
664        } else {
665            None
666        };
667
668        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
669        {
670            Some(Arc::new(RestIndexStore::new(
671                config.db_path().join("rest_index"),
672                &store,
673                &checkpoint_store,
674                &epoch_store,
675                &cache_traits.backing_package_store,
676            )))
677        } else {
678            None
679        };
680
681        info!("creating archive reader");
682        // Create network
683        // TODO only configure validators as seed/preferred peers for validators and not
684        // for fullnodes once we've had a chance to re-work fullnode
685        // configuration generation.
686        let archive_readers =
687            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
688        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
689        let (randomness_tx, randomness_rx) = mpsc::channel(
690            config
691                .p2p_config
692                .randomness
693                .clone()
694                .unwrap_or_default()
695                .mailbox_capacity(),
696        );
697        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
698            Self::create_p2p_network(
699                &config,
700                state_sync_store.clone(),
701                chain_identifier,
702                trusted_peer_change_rx,
703                archive_readers.clone(),
704                randomness_tx,
705                &prometheus_registry,
706            )?;
707
708        // We must explicitly send this instead of relying on the initial value to
709        // trigger a watch value change, so that state-sync is able to process it.
710        send_trusted_peer_change(
711            &config,
712            &trusted_peer_change_tx,
713            epoch_store.epoch_start_state(),
714        );
715
716        info!("start state archival");
717        // Start archiving local state to remote store
718        let state_archive_handle =
719            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
720                .await?;
721
722        info!("start snapshot upload");
723        // Start uploading state snapshot to remote store
724        let state_snapshot_handle =
725            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
726
727        // Start uploading db checkpoints to remote store
728        info!("start db checkpoint");
729        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
730            &config,
731            &prometheus_registry,
732            state_snapshot_handle.is_some(),
733        )?;
734
735        let mut genesis_objects = genesis.objects().to_vec();
736        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
737            genesis_objects.extend(migration_tx_data.get_objects());
738        }
739
740        let authority_name = config.authority_public_key();
741        let validator_tx_finalizer =
742            config
743                .enable_validator_tx_finalizer
744                .then_some(Arc::new(ValidatorTxFinalizer::new(
745                    auth_agg.clone(),
746                    authority_name,
747                    &prometheus_registry,
748                )));
749
750        info!("create authority state");
751        let state = AuthorityState::new(
752            authority_name,
753            secret,
754            config.supported_protocol_versions.unwrap(),
755            store.clone(),
756            cache_traits.clone(),
757            epoch_store.clone(),
758            committee_store.clone(),
759            index_store.clone(),
760            rest_index,
761            checkpoint_store.clone(),
762            &prometheus_registry,
763            &genesis_objects,
764            &db_checkpoint_config,
765            config.clone(),
766            archive_readers,
767            validator_tx_finalizer,
768            chain_identifier,
769            pruner_db,
770        )
771        .await;
772
773        // ensure genesis and migration txs were executed
774        if epoch_store.epoch() == 0 {
775            let genesis_tx = &genesis.transaction();
776            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
777            // Execute genesis transaction
778            Self::execute_transaction_immediately_at_zero_epoch(
779                &state,
780                &epoch_store,
781                genesis_tx,
782                span,
783            )
784            .await;
785
786            // Execute migration transactions if present
787            if let Some(migration_tx_data) = migration_tx_data {
788                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
789                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
790                    Self::execute_transaction_immediately_at_zero_epoch(
791                        &state,
792                        &epoch_store,
793                        tx,
794                        span,
795                    )
796                    .await;
797                }
798            }
799        }
800
801        // Start the loop that receives new randomness and generates transactions for
802        // it.
803        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
804
805        if config
806            .expensive_safety_check_config
807            .enable_secondary_index_checks()
808        {
809            if let Some(indexes) = state.indexes.clone() {
810                iota_core::verify_indexes::verify_indexes(
811                    state.get_accumulator_store().as_ref(),
812                    indexes,
813                )
814                .expect("secondary indexes are inconsistent");
815            }
816        }
817
818        let (end_of_epoch_channel, end_of_epoch_receiver) =
819            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
820
821        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
822            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
823                auth_agg.load_full(),
824                state.clone(),
825                end_of_epoch_receiver,
826                &config.db_path(),
827                &prometheus_registry,
828            )))
829        } else {
830            None
831        };
832
833        let http_server = build_http_server(
834            state.clone(),
835            state_sync_store.clone(),
836            &transaction_orchestrator.clone(),
837            &config,
838            &prometheus_registry,
839            custom_rpc_runtime,
840            software_version,
841        )
842        .await?;
843
844        let accumulator = Arc::new(StateAccumulator::new(
845            cache_traits.accumulator_store.clone(),
846            StateAccumulatorMetrics::new(&prometheus_registry),
847        ));
848
849        let authority_names_to_peer_ids = epoch_store
850            .epoch_start_state()
851            .get_authority_names_to_peer_ids();
852
853        let network_connection_metrics =
854            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
855
856        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
857
858        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
859            p2p_network.downgrade(),
860            network_connection_metrics,
861            HashMap::new(),
862            None,
863        );
864
865        let connection_monitor_status = ConnectionMonitorStatus {
866            connection_statuses,
867            authority_names_to_peer_ids,
868        };
869
870        let connection_monitor_status = Arc::new(connection_monitor_status);
871        let iota_node_metrics =
872            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
873
874        let grpc_server_handle =
875            build_grpc_server(&config, state.clone(), state_sync_store.clone()).await?;
876
877        let validator_components = if state.is_committee_validator(&epoch_store) {
878            let (components, _) = futures::join!(
879                Self::construct_validator_components(
880                    config.clone(),
881                    state.clone(),
882                    committee,
883                    epoch_store.clone(),
884                    checkpoint_store.clone(),
885                    state_sync_handle.clone(),
886                    randomness_handle.clone(),
887                    Arc::downgrade(&accumulator),
888                    backpressure_manager.clone(),
889                    connection_monitor_status.clone(),
890                    &registry_service,
891                    iota_node_metrics.clone(),
892                ),
893                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
894            );
895            let mut components = components?;
896
897            components.consensus_adapter.submit_recovered(&epoch_store);
898
899            // Start the validator gRPC server.
900            components.validator_server_spawn_handle =
901                components.validator_server_spawn_handle.start();
902
903            Some(components)
904        } else {
905            None
906        };
907
908        // setup shutdown channel
909        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
910
911        let node = Self {
912            config,
913            validator_components: Mutex::new(validator_components),
914            _http_server: http_server,
915            state,
916            transaction_orchestrator,
917            registry_service,
918            metrics: iota_node_metrics,
919
920            _discovery: discovery_handle,
921            state_sync_handle,
922            randomness_handle,
923            checkpoint_store,
924            accumulator: Mutex::new(Some(accumulator)),
925            end_of_epoch_channel,
926            connection_monitor_status,
927            trusted_peer_change_tx,
928            backpressure_manager,
929
930            _db_checkpoint_handle: db_checkpoint_handle,
931
932            #[cfg(msim)]
933            sim_state: Default::default(),
934
935            _state_archive_handle: state_archive_handle,
936            _state_snapshot_uploader_handle: state_snapshot_handle,
937            shutdown_channel_tx: shutdown_channel,
938
939            grpc_server_handle: Mutex::new(grpc_server_handle),
940
941            auth_agg,
942        };
943
944        info!("IotaNode started!");
945        let node = Arc::new(node);
946        let node_copy = node.clone();
947        spawn_monitored_task!(async move {
948            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
949            if let Err(error) = result {
950                warn!("Reconfiguration finished with error {:?}", error);
951            }
952        });
953
954        Ok(node)
955    }
956
957    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
958        self.end_of_epoch_channel.subscribe()
959    }
960
961    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
962        self.shutdown_channel_tx.subscribe()
963    }
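
    // A minimal usage sketch (hypothetical embedder code, not part of this
    // crate's control flow): start a node and wait for it to signal shutdown
    // through the broadcast channel.
    //
    //     let node = IotaNode::start(config, registry_service, None).await?;
    //     let mut shutdown_rx = node.subscribe_to_shutdown_channel();
    //     let run_with_range = shutdown_rx.recv().await?;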
964
965    pub fn current_epoch_for_testing(&self) -> EpochId {
966        self.state.current_epoch_for_testing()
967    }
968
969    pub fn db_checkpoint_path(&self) -> PathBuf {
970        self.config.db_checkpoint_path()
971    }
972
973    // Init reconfig process by starting to reject user certs
974    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
975        info!("close_epoch (current epoch = {})", epoch_store.epoch());
976        self.validator_components
977            .lock()
978            .await
979            .as_ref()
980            .ok_or_else(|| IotaError::from("Node is not a validator"))?
981            .consensus_adapter
982            .close_epoch(epoch_store);
983        Ok(())
984    }
985
986    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
987        self.state
988            .clear_override_protocol_upgrade_buffer_stake(epoch)
989    }
990
991    pub fn set_override_protocol_upgrade_buffer_stake(
992        &self,
993        epoch: EpochId,
994        buffer_stake_bps: u64,
995    ) -> IotaResult {
996        self.state
997            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
998    }
999
1000    // Testing-only API to start epoch close process.
1001    // For production code, please use the non-testing version.
1002    pub async fn close_epoch_for_testing(&self) -> IotaResult {
1003        let epoch_store = self.state.epoch_store_for_testing();
1004        self.close_epoch(&epoch_store).await
1005    }
1006
1007    async fn start_state_archival(
1008        config: &NodeConfig,
1009        prometheus_registry: &Registry,
1010        state_sync_store: RocksDbStore,
1011    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1012        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1013            let local_store_config = ObjectStoreConfig {
1014                object_store: Some(ObjectStoreType::File),
1015                directory: Some(config.archive_path()),
1016                ..Default::default()
1017            };
1018            let archive_writer = ArchiveWriter::new(
1019                local_store_config,
1020                remote_store_config.clone(),
1021                FileCompression::Zstd,
1022                StorageFormat::Blob,
1023                Duration::from_secs(600),
1024                256 * 1024 * 1024,
1025                prometheus_registry,
1026            )
1027            .await?;
1028            Ok(Some(archive_writer.start(state_sync_store).await?))
1029        } else {
1030            Ok(None)
1031        }
1032    }
1033
1034    /// Creates a StateSnapshotUploader and starts it if the StateSnapshotConfig
1035    /// is set.
1036    fn start_state_snapshot(
1037        config: &NodeConfig,
1038        prometheus_registry: &Registry,
1039        checkpoint_store: Arc<CheckpointStore>,
1040    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1041        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1042            let snapshot_uploader = StateSnapshotUploader::new(
1043                &config.db_checkpoint_path(),
1044                &config.snapshot_path(),
1045                remote_store_config.clone(),
1046                60,
1047                prometheus_registry,
1048                checkpoint_store,
1049            )?;
1050            Ok(Some(snapshot_uploader.start()))
1051        } else {
1052            Ok(None)
1053        }
1054    }
1055
1056    fn start_db_checkpoint(
1057        config: &NodeConfig,
1058        prometheus_registry: &Registry,
1059        state_snapshot_enabled: bool,
1060    ) -> Result<(
1061        DBCheckpointConfig,
1062        Option<tokio::sync::broadcast::Sender<()>>,
1063    )> {
1064        let checkpoint_path = Some(
1065            config
1066                .db_checkpoint_config
1067                .checkpoint_path
1068                .clone()
1069                .unwrap_or_else(|| config.db_checkpoint_path()),
1070        );
1071        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1072            DBCheckpointConfig {
1073                checkpoint_path,
1074                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1075                    true
1076                } else {
1077                    config
1078                        .db_checkpoint_config
1079                        .perform_db_checkpoints_at_epoch_end
1080                },
1081                ..config.db_checkpoint_config.clone()
1082            }
1083        } else {
1084            config.db_checkpoint_config.clone()
1085        };
1086
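        // Decision summary for the match below: a handler is created in every
        // case except when neither a db checkpoint object store nor state
        // snapshots are configured, i.e. only the (None, false) arm skips it.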
1087        match (
1088            db_checkpoint_config.object_store_config.as_ref(),
1089            state_snapshot_enabled,
1090        ) {
1091            // If the db checkpoint object store is not specified but the
1092            // state snapshot object store is specified, create a handler
1093            // anyway for marking db checkpoints as completed so that they
1094            // can be uploaded as state snapshots.
1095            (None, false) => Ok((db_checkpoint_config, None)),
1096            (_, _) => {
1097                let handler = DBCheckpointHandler::new(
1098                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1099                    db_checkpoint_config.object_store_config.as_ref(),
1100                    60,
1101                    db_checkpoint_config
1102                        .prune_and_compact_before_upload
1103                        .unwrap_or(true),
1104                    config.authority_store_pruning_config.clone(),
1105                    prometheus_registry,
1106                    state_snapshot_enabled,
1107                )?;
1108                Ok((
1109                    db_checkpoint_config,
1110                    Some(DBCheckpointHandler::start(handler)),
1111                ))
1112            }
1113        }
1114    }
1115
1116    fn create_p2p_network(
1117        config: &NodeConfig,
1118        state_sync_store: RocksDbStore,
1119        chain_identifier: ChainIdentifier,
1120        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1121        archive_readers: ArchiveReaderBalancer,
1122        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1123        prometheus_registry: &Registry,
1124    ) -> Result<(
1125        Network,
1126        discovery::Handle,
1127        state_sync::Handle,
1128        randomness::Handle,
1129    )> {
1130        let (state_sync, state_sync_server) = state_sync::Builder::new()
1131            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1132            .store(state_sync_store)
1133            .archive_readers(archive_readers)
1134            .with_metrics(prometheus_registry)
1135            .build();
1136
1137        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1138            .config(config.p2p_config.clone())
1139            .build();
1140
1141        let (randomness, randomness_router) =
1142            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1143                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1144                .with_metrics(prometheus_registry)
1145                .build();
1146
1147        let p2p_network = {
1148            let routes = anemo::Router::new()
1149                .add_rpc_service(discovery_server)
1150                .add_rpc_service(state_sync_server);
1151            let routes = routes.merge(randomness_router);
1152
1153            let inbound_network_metrics =
1154                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1155            let outbound_network_metrics =
1156                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1157
1158            let service = ServiceBuilder::new()
1159                .layer(
1160                    TraceLayer::new_for_server_errors()
1161                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1162                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1163                )
1164                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1165                    Arc::new(inbound_network_metrics),
1166                    config.p2p_config.excessive_message_size(),
1167                )))
1168                .service(routes);
1169
1170            let outbound_layer = ServiceBuilder::new()
1171                .layer(
1172                    TraceLayer::new_for_client_and_server_errors()
1173                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1174                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1175                )
1176                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1177                    Arc::new(outbound_network_metrics),
1178                    config.p2p_config.excessive_message_size(),
1179                )))
1180                .into_inner();
1181
1182            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1183            // Set the max_frame_size to be 1 GB to work around the issue of there being too
1184            // many staking events in the epoch change txn.
1185            anemo_config.max_frame_size = Some(1 << 30);
1186
1187            // Set a higher default value for socket send/receive buffers if not already
1188            // configured.
1189            let mut quic_config = anemo_config.quic.unwrap_or_default();
1190            if quic_config.socket_send_buffer_size.is_none() {
1191                quic_config.socket_send_buffer_size = Some(20 << 20);
1192            }
1193            if quic_config.socket_receive_buffer_size.is_none() {
1194                quic_config.socket_receive_buffer_size = Some(20 << 20);
1195            }
1196            quic_config.allow_failed_socket_buffer_size_setting = true;
1197
1198            // Set high-performance defaults for quinn transport.
1199            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
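            // (Worked example: 200 MiB window / 0.5 s RTT = 400 MiB/s upper
            // bound for a single connection; real throughput will be lower.)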
1200            if quic_config.max_concurrent_bidi_streams.is_none() {
1201                quic_config.max_concurrent_bidi_streams = Some(500);
1202            }
1203            if quic_config.max_concurrent_uni_streams.is_none() {
1204                quic_config.max_concurrent_uni_streams = Some(500);
1205            }
1206            if quic_config.stream_receive_window.is_none() {
1207                quic_config.stream_receive_window = Some(100 << 20);
1208            }
1209            if quic_config.receive_window.is_none() {
1210                quic_config.receive_window = Some(200 << 20);
1211            }
1212            if quic_config.send_window.is_none() {
1213                quic_config.send_window = Some(200 << 20);
1214            }
1215            if quic_config.crypto_buffer_size.is_none() {
1216                quic_config.crypto_buffer_size = Some(1 << 20);
1217            }
1218            if quic_config.max_idle_timeout_ms.is_none() {
1219                quic_config.max_idle_timeout_ms = Some(30_000);
1220            }
1221            if quic_config.keep_alive_interval_ms.is_none() {
1222                quic_config.keep_alive_interval_ms = Some(5_000);
1223            }
1224            anemo_config.quic = Some(quic_config);
1225
1226            let server_name = format!("iota-{chain_identifier}");
1227            let network = Network::bind(config.p2p_config.listen_address)
1228                .server_name(&server_name)
1229                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1230                .config(anemo_config)
1231                .outbound_request_layer(outbound_layer)
1232                .start(service)?;
1233            info!(
1234                server_name = server_name,
1235                "P2p network started on {}",
1236                network.local_addr()
1237            );
1238
1239            network
1240        };
1241
1242        let discovery_handle =
1243            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1244        let state_sync_handle = state_sync.start(p2p_network.clone());
1245        let randomness_handle = randomness.start(p2p_network.clone());
1246
1247        Ok((
1248            p2p_network,
1249            discovery_handle,
1250            state_sync_handle,
1251            randomness_handle,
1252        ))
1253    }
1254
1255    /// Asynchronously constructs and initializes the components necessary for
1256    /// the validator node.
1257    async fn construct_validator_components(
1258        config: NodeConfig,
1259        state: Arc<AuthorityState>,
1260        committee: Arc<Committee>,
1261        epoch_store: Arc<AuthorityPerEpochStore>,
1262        checkpoint_store: Arc<CheckpointStore>,
1263        state_sync_handle: state_sync::Handle,
1264        randomness_handle: randomness::Handle,
1265        accumulator: Weak<StateAccumulator>,
1266        backpressure_manager: Arc<BackpressureManager>,
1267        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1268        registry_service: &RegistryService,
1269        iota_node_metrics: Arc<IotaNodeMetrics>,
1270    ) -> Result<ValidatorComponents> {
1271        let mut config_clone = config.clone();
1272        let consensus_config = config_clone
1273            .consensus_config
1274            .as_mut()
1275            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1276        let validator_registry = Registry::new();
1277        let validator_registry_id = registry_service.add(validator_registry.clone());
1278
1279        let client = Arc::new(UpdatableConsensusClient::new());
1280        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1281            &committee,
1282            consensus_config,
1283            state.name,
1284            connection_monitor_status.clone(),
1285            &validator_registry,
1286            client.clone(),
1287            checkpoint_store.clone(),
1288        ));
1289        let consensus_manager = ConsensusManager::new(
1290            &config,
1291            consensus_config,
1292            registry_service,
1293            &validator_registry,
1294            client,
1295        );
1296
1297        // This only gets started up once, not on every epoch. (Make call to remove
1298        // every epoch.)
1299        let consensus_store_pruner = ConsensusStorePruner::new(
1300            consensus_manager.get_storage_base_path(),
1301            consensus_config.db_retention_epochs(),
1302            consensus_config.db_pruner_period(),
1303            &validator_registry,
1304        );
1305
1306        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1307        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1308
1309        let (validator_server_spawn_handle, validator_server_handle) =
1310            Self::start_grpc_validator_service(
1311                &config,
1312                state.clone(),
1313                consensus_adapter.clone(),
1314                &validator_registry,
1315            )
1316            .await?;
1317
1318        // Starts an overload monitor that monitors the execution of the authority.
1319        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1320        let validator_overload_monitor_handle = if config
1321            .authority_overload_config
1322            .max_load_shedding_percentage
1323            > 0
1324        {
1325            let authority_state = Arc::downgrade(&state);
1326            let overload_config = config.authority_overload_config.clone();
1327            fail_point!("starting_overload_monitor");
1328            Some(spawn_monitored_task!(overload_monitor(
1329                authority_state,
1330                overload_config,
1331            )))
1332        } else {
1333            None
1334        };
1335
1336        Self::start_epoch_specific_validator_components(
1337            &config,
1338            state.clone(),
1339            consensus_adapter,
1340            checkpoint_store,
1341            epoch_store,
1342            state_sync_handle,
1343            randomness_handle,
1344            consensus_manager,
1345            consensus_store_pruner,
1346            accumulator,
1347            backpressure_manager,
1348            validator_server_spawn_handle,
1349            validator_server_handle,
1350            validator_overload_monitor_handle,
1351            checkpoint_metrics,
1352            iota_node_metrics,
1353            iota_tx_validator_metrics,
1354            validator_registry_id,
1355        )
1356        .await
1357    }
1358
1359    /// Initializes and starts components specific to the current
1360    /// epoch for the validator node.
1361    async fn start_epoch_specific_validator_components(
1362        config: &NodeConfig,
1363        state: Arc<AuthorityState>,
1364        consensus_adapter: Arc<ConsensusAdapter>,
1365        checkpoint_store: Arc<CheckpointStore>,
1366        epoch_store: Arc<AuthorityPerEpochStore>,
1367        state_sync_handle: state_sync::Handle,
1368        randomness_handle: randomness::Handle,
1369        consensus_manager: ConsensusManager,
1370        consensus_store_pruner: ConsensusStorePruner,
1371        accumulator: Weak<StateAccumulator>,
1372        backpressure_manager: Arc<BackpressureManager>,
1373        validator_server_spawn_handle: SpawnOnce,
1374        validator_server_handle: iota_http::ServerHandle,
1375        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1376        checkpoint_metrics: Arc<CheckpointMetrics>,
1377        iota_node_metrics: Arc<IotaNodeMetrics>,
1378        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1379        validator_registry_id: RegistryID,
1380    ) -> Result<ValidatorComponents> {
1381        let checkpoint_service = Self::build_checkpoint_service(
1382            config,
1383            consensus_adapter.clone(),
1384            checkpoint_store.clone(),
1385            epoch_store.clone(),
1386            state.clone(),
1387            state_sync_handle,
1388            accumulator,
1389            checkpoint_metrics.clone(),
1390        );
1391
1392        // Create a new map that gets injected into both the consensus handler and the
1393        // consensus adapter. The consensus handler will write values forwarded from
1394        // consensus, and the consensus adapter will read them to decide which
1395        // validator submits a transaction to consensus.
1396        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1397
1398        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1399
1400        let randomness_manager = RandomnessManager::try_new(
1401            Arc::downgrade(&epoch_store),
1402            Box::new(consensus_adapter.clone()),
1403            randomness_handle,
1404            config.authority_key_pair(),
1405        )
1406        .await;
1407        if let Some(randomness_manager) = randomness_manager {
1408            epoch_store
1409                .set_randomness_manager(randomness_manager)
1410                .await?;
1411        }
1412
1413        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1414            state.clone(),
1415            checkpoint_service.clone(),
1416            epoch_store.clone(),
1417            low_scoring_authorities,
1418            backpressure_manager,
1419        );
1420
1421        info!("Starting consensus manager");
1422
1423        consensus_manager
1424            .start(
1425                config,
1426                epoch_store.clone(),
1427                consensus_handler_initializer,
1428                IotaTxValidator::new(
1429                    epoch_store.clone(),
1430                    checkpoint_service.clone(),
1431                    state.transaction_manager().clone(),
1432                    iota_tx_validator_metrics.clone(),
1433                ),
1434            )
1435            .await;
1436
1437        if !epoch_store
1438            .epoch_start_config()
1439            .is_data_quarantine_active_from_beginning_of_epoch()
1440        {
1441            checkpoint_store
1442                .reexecute_local_checkpoints(&state, &epoch_store)
1443                .await;
1444        }
1445
1446        info!("Spawning checkpoint service");
1447        let checkpoint_service_tasks = checkpoint_service.spawn().await;
1448
1449        if epoch_store.authenticator_state_enabled() {
1450            Self::start_jwk_updater(
1451                config,
1452                iota_node_metrics,
1453                state.name,
1454                epoch_store.clone(),
1455                consensus_adapter.clone(),
1456            );
1457        }
1458
1459        Ok(ValidatorComponents {
1460            validator_server_spawn_handle,
1461            validator_server_handle,
1462            validator_overload_monitor_handle,
1463            consensus_manager,
1464            consensus_store_pruner,
1465            consensus_adapter,
1466            checkpoint_service_tasks,
1467            checkpoint_metrics,
1468            iota_tx_validator_metrics,
1469            validator_registry_id,
1470        })
1471    }
1472
1473    /// Builds the checkpoint service for the validator node, wiring up the
1474    /// outputs that submit locally built checkpoints to consensus and forward
1475    /// certified checkpoints to state sync.
1476    /// The service is only constructed here; the caller spawns it separately
1477    /// once the remaining epoch-specific components are in place, so it is
1478    /// ready to handle checkpoint creation and submission to consensus.
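    /// The per-checkpoint transaction and byte limits are taken from the
    /// current protocol config.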
1479    fn build_checkpoint_service(
1480        config: &NodeConfig,
1481        consensus_adapter: Arc<ConsensusAdapter>,
1482        checkpoint_store: Arc<CheckpointStore>,
1483        epoch_store: Arc<AuthorityPerEpochStore>,
1484        state: Arc<AuthorityState>,
1485        state_sync_handle: state_sync::Handle,
1486        accumulator: Weak<StateAccumulator>,
1487        checkpoint_metrics: Arc<CheckpointMetrics>,
1488    ) -> Arc<CheckpointService> {
1489        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1490        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1491
1492        debug!(
1493            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
1494            epoch_start_timestamp_ms,
1495            epoch_duration_ms
1496        );
1497
1498        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1499            sender: consensus_adapter,
1500            signer: state.secret.clone(),
1501            authority: config.authority_public_key(),
1502            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1503                .checked_add(epoch_duration_ms)
1504                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1505            metrics: checkpoint_metrics.clone(),
1506        });
1507
1508        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1509        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1510        let max_checkpoint_size_bytes =
1511            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1512
1513        CheckpointService::build(
1514            state.clone(),
1515            checkpoint_store,
1516            epoch_store,
1517            state.get_transaction_cache_reader().clone(),
1518            accumulator,
1519            checkpoint_output,
1520            Box::new(certified_checkpoint_output),
1521            checkpoint_metrics,
1522            max_tx_per_checkpoint,
1523            max_checkpoint_size_bytes,
1524        )
1525    }
1526
1527    fn construct_consensus_adapter(
1528        committee: &Committee,
1529        consensus_config: &ConsensusConfig,
1530        authority: AuthorityName,
1531        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1532        prometheus_registry: &Registry,
1533        consensus_client: Arc<dyn ConsensusClient>,
1534        checkpoint_store: Arc<CheckpointStore>,
1535    ) -> ConsensusAdapter {
1536        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1537        // The consensus adapter allows the authority to send user certificates through
1538        // consensus.
1539
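        // The second limit below scales the global `max_pending_transactions` budget
        // down to roughly a per-committee-member share (2x the even split across
        // `committee.num_members()`).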
1540        ConsensusAdapter::new(
1541            consensus_client,
1542            checkpoint_store,
1543            authority,
1544            connection_monitor_status,
1545            consensus_config.max_pending_transactions(),
1546            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1547            consensus_config.max_submit_position,
1548            consensus_config.submit_delay_step_override(),
1549            ca_metrics,
1550        )
1551    }
1552
1553    async fn start_grpc_validator_service(
1554        config: &NodeConfig,
1555        state: Arc<AuthorityState>,
1556        consensus_adapter: Arc<ConsensusAdapter>,
1557        prometheus_registry: &Registry,
1558    ) -> Result<(SpawnOnce, iota_http::ServerHandle)> {
1559        let validator_service = ValidatorService::new(
1560            state.clone(),
1561            consensus_adapter,
1562            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1563            TrafficControllerMetrics::new(prometheus_registry),
1564            config.policy_config.clone(),
1565            config.firewall_config.clone(),
1566        );
1567
1568        let mut server_conf = iota_network_stack::config::Config::new();
1569        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1570        server_conf.load_shed = config.grpc_load_shed;
1571        let mut server_builder =
1572            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1573
1574        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1575
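        // Build the rustls server config from the node's network key pair and the
        // IOTA TLS server name.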
1576        let tls_config = iota_tls::create_rustls_server_config(
1577            config.network_key_pair().copy().private(),
1578            IOTA_TLS_SERVER_NAME.to_string(),
1579        );
1580
1581        let server = server_builder
1582            .bind(config.network_address(), Some(tls_config))
1583            .await
1584            .map_err(|err| anyhow!(err.to_string()))?;
1585
1586        let local_addr = server.local_addr();
1587        info!("Listening to traffic on {local_addr}");
1588
1589        // Clone the server handle for shutdown functionality
1590        let server_handle = server.handle().clone();
1591
1592        Ok((
1593            SpawnOnce::new(server.serve().map_err(Into::into)),
1594            server_handle,
1595        ))
1596    }
1597
1598    /// Re-executes pending consensus certificates, which may not have been
1599    /// committed to disk before the node restarted. This is necessary for
1600    /// the following reasons:
1601    ///
1602    /// 1. For any transaction for which we returned signed effects to a client,
1603    ///    we must ensure that we have re-executed the transaction before we
1604    ///    begin accepting grpc requests. Otherwise we would appear to have
1605    ///    forgotten about the transaction.
1606    /// 2. While this is running, we are concurrently waiting for all previously
1607    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1608    ///    either direction (from checkpointed consensus transactions to pending
1609    ///    consensus transactions, or vice versa), we must re-execute pending
1610    ///    consensus transactions to ensure that both processes can complete.
1611    /// 3. Also note that for any pending consensus transactions for which we
1612    ///    wrote a signed effects digest to disk, we must re-execute using that
1613    ///    digest as the expected effects digest, to ensure that we cannot
1614    ///    arrive at different effects than what we previously signed.
1615    async fn reexecute_pending_consensus_certs(
1616        epoch_store: &Arc<AuthorityPerEpochStore>,
1617        state: &Arc<AuthorityState>,
1618    ) {
1619        let mut pending_consensus_certificates = Vec::new();
1620        let mut additional_certs = Vec::new();
1621
1622        for tx in epoch_store.get_all_pending_consensus_transactions() {
1623            match tx.kind {
1624                // Shared object txns cannot be re-executed at this point, because we must wait for
1625                // consensus replay to assign shared object versions.
1626                ConsensusTransactionKind::CertifiedTransaction(tx)
1627                    if !tx.contains_shared_object() =>
1628                {
1629                    let tx = *tx;
1630                    // new_unchecked is safe because we never submit a transaction to consensus
1631                    // without verifying it
1632                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1633                        VerifiedCertificate::new_unchecked(tx),
1634                    );
1635                    // If we previously signed the effects (i.e. we returned them to a client),
1636                    // re-execute against that digest; otherwise enqueue for normal execution.
1637                    if let Some(fx_digest) = epoch_store
1638                        .get_signed_effects_digest(tx.digest())
1639                        .expect("db error")
1640                    {
1641                        pending_consensus_certificates.push((tx, fx_digest));
1642                    } else {
1643                        additional_certs.push(tx);
1644                    }
1645                }
1646                _ => (),
1647            }
1648        }
1649
1650        let digests = pending_consensus_certificates
1651            .iter()
1652            .map(|(tx, _)| *tx.digest())
1653            .collect::<Vec<_>>();
1654
1655        info!(
1656            "reexecuting {} pending consensus certificates: {:?}",
1657            digests.len(),
1658            digests
1659        );
1660
1661        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1662        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1663
1664        // If this times out, the validator will still almost certainly start up fine.
1665        // But it is possible that it may temporarily "forget" about
1666        // transactions that it had previously executed. This could confuse
1667        // clients in some circumstances. However, the transactions are still in
1668        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1669        let timeout = if cfg!(msim) { 120 } else { 60 };
1670        if tokio::time::timeout(
1671            std::time::Duration::from_secs(timeout),
1672            state
1673                .get_transaction_cache_reader()
1674                .try_notify_read_executed_effects_digests(&digests),
1675        )
1676        .await
1677        .is_err()
1678        {
1679            // Log all the digests that were not executed to help debugging.
1680            if let Ok(executed_effects_digests) = state
1681                .get_transaction_cache_reader()
1682                .try_multi_get_executed_effects_digests(&digests)
1683            {
1684                let pending_digests = digests
1685                    .iter()
1686                    .zip(executed_effects_digests.iter())
1687                    .filter_map(|(digest, executed_effects_digest)| {
1688                        if executed_effects_digest.is_none() {
1689                            Some(digest)
1690                        } else {
1691                            None
1692                        }
1693                    })
1694                    .collect::<Vec<_>>();
1695                debug_fatal!(
1696                    "Timed out waiting for effects digests to be executed: {:?}",
1697                    pending_digests
1698                );
1699            } else {
1700                debug_fatal!(
1701                    "Timed out waiting for effects digests to be executed, digests not found"
1702                );
1703            }
1704        }
1705    }
1706
1707    pub fn state(&self) -> Arc<AuthorityState> {
1708        self.state.clone()
1709    }
1710
1711    // Only used for testing because of how epoch store is loaded.
1712    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1713        self.state.reference_gas_price_for_testing()
1714    }
1715
1716    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1717        self.state.committee_store().clone()
1718    }
1719
1720    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1721    // self.state.db()
1722    // }
1723
1724    /// Clone an AuthorityAggregator currently used in this node's
1725    /// QuorumDriver, if the node is a fullnode. After reconfig,
1726    /// QuorumDriver builds a new AuthorityAggregator. The caller
1727    /// of this function will most likely want to call this again
1728    /// to get a fresh one.
1729    pub fn clone_authority_aggregator(
1730        &self,
1731    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1732        self.transaction_orchestrator
1733            .as_ref()
1734            .map(|to| to.clone_authority_aggregator())
1735    }
1736
1737    pub fn transaction_orchestrator(
1738        &self,
1739    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1740        self.transaction_orchestrator.clone()
1741    }
1742
1743    pub fn subscribe_to_transaction_orchestrator_effects(
1744        &self,
1745    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1746        self.transaction_orchestrator
1747            .as_ref()
1748            .map(|to| to.subscribe_to_effects_queue())
1749            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1750    }
1751
1752    /// This function awaits the completion of checkpoint execution of the
1753    /// current epoch, after which it initiates reconfiguration of the
1754    /// entire system. It also handles role changes for the node when the
1755    /// epoch changes and advertises capabilities to the committee if the node
1756    /// is a validator.
1757    pub async fn monitor_reconfiguration(
1758        self: Arc<Self>,
1759        mut epoch_store: Arc<AuthorityPerEpochStore>,
1760    ) -> Result<()> {
1761        let checkpoint_executor_metrics =
1762            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1763
1764        loop {
1765            let mut accumulator_guard = self.accumulator.lock().await;
1766            let accumulator = accumulator_guard.take().unwrap();
1767            info!(
1768                "Creating checkpoint executor for epoch {}",
1769                epoch_store.epoch()
1770            );
1771
1772            // Create closures that forward checkpoint summaries and data to the gRPC broadcast channels.
1773            let summary_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1774                guard.as_ref().map(|handle| {
1775                    let tx = handle.checkpoint_summary_broadcaster().clone();
1776                    Box::new(move |summary: &CertifiedCheckpointSummary| {
1777                        tx.send_traced(summary);
1778                    }) as Box<dyn Fn(&CertifiedCheckpointSummary) + Send + Sync>
1779                })
1780            } else {
1781                None
1782            };
1783            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1784                guard.as_ref().map(|handle| {
1785                    let tx = handle.checkpoint_data_broadcaster().clone();
1786                    Box::new(move |data: &CheckpointData| {
1787                        tx.send_traced(data);
1788                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
1789                })
1790            } else {
1791                None
1792            };
1793
1794            let checkpoint_executor = CheckpointExecutor::new(
1795                epoch_store.clone(),
1796                self.checkpoint_store.clone(),
1797                self.state.clone(),
1798                accumulator.clone(),
1799                self.backpressure_manager.clone(),
1800                self.config.checkpoint_executor_config.clone(),
1801                checkpoint_executor_metrics.clone(),
1802                summary_sender,
1803                data_sender,
1804            );
1805
1806            let run_with_range = self.config.run_with_range;
1807
1808            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1809
1810            // Advertise capabilities to committee, if we are a validator.
1811            if let Some(components) = &*self.validator_components.lock().await {
1812                // TODO: without this sleep, the consensus message is not delivered reliably.
1813                tokio::time::sleep(Duration::from_millis(1)).await;
1814
1815                let config = cur_epoch_store.protocol_config();
1816                let binary_config = to_binary_config(config);
1817                let transaction = ConsensusTransaction::new_capability_notification_v1(
1818                    AuthorityCapabilitiesV1::new(
1819                        self.state.name,
1820                        cur_epoch_store.get_chain_identifier().chain(),
1821                        self.config
1822                            .supported_protocol_versions
1823                            .expect("Supported versions should be populated")
1824                            // no need to send digests of versions less than the current version
1825                            .truncate_below(config.version),
1826                        self.state
1827                            .get_available_system_packages(&binary_config)
1828                            .await,
1829                    ),
1830                );
1831                info!(?transaction, "submitting capabilities to consensus");
1832                components
1833                    .consensus_adapter
1834                    .submit(transaction, None, &cur_epoch_store)?;
1835            } else if self.state.is_active_validator(&cur_epoch_store)
1836                && cur_epoch_store
1837                    .protocol_config()
1838                    .track_non_committee_eligible_validators()
1839            {
1840                // If we are a non-committee validator, send signed capabilities to the
1841                // committee validators in a separate task so the caller is not blocked.
1842                // Sending is done only if the supporting feature flag is enabled.
1843                let epoch_store = cur_epoch_store.clone();
1844                let node_clone = self.clone();
1845                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1846                    node_clone
1847                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1848                        .await;
1849                }));
1850            }
1851
1852            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1853
1854            if stop_condition == StopReason::RunWithRangeCondition {
1855                IotaNode::shutdown(&self).await;
1856                self.shutdown_channel_tx
1857                    .send(run_with_range)
1858                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1859                return Ok(());
1860            }
1861
1862            // Safe to call because we are in the middle of reconfiguration.
1863            let latest_system_state = self
1864                .state
1865                .get_object_cache_reader()
1866                .try_get_iota_system_state_object_unsafe()
1867                .expect("Read IOTA System State object cannot fail");
1868
1869            #[cfg(msim)]
1870            if !self
1871                .sim_state
1872                .sim_safe_mode_expected
1873                .load(Ordering::Relaxed)
1874            {
1875                debug_assert!(!latest_system_state.safe_mode());
1876            }
1877
1878            #[cfg(not(msim))]
1879            debug_assert!(!latest_system_state.safe_mode());
1880
1881            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1882                if self.state.is_fullnode(&cur_epoch_store) {
1883                    warn!(
1884                        "Failed to send end of epoch notification to subscriber: {:?}",
1885                        err
1886                    );
1887                }
1888            }
1889
1890            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1891            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1892
1893            self.auth_agg.store(Arc::new(
1894                self.auth_agg
1895                    .load()
1896                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1897            ));
1898
1899            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1900            let next_epoch = next_epoch_committee.epoch();
1901            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1902
1903            info!(
1904                next_epoch,
1905                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1906            );
1907
1908            fail_point_async!("reconfig_delay");
1909
1910            // We update the connection monitor status map regardless of validator /
1911            // fullnode status so that we don't need to restart the connection monitor
1912            // every epoch. These mappings are used by the consensus adapter if it
1913            // exists or is about to be created.
1914            let authority_names_to_peer_ids =
1915                new_epoch_start_state.get_authority_names_to_peer_ids();
1916            self.connection_monitor_status
1917                .update_mapping_for_epoch(authority_names_to_peer_ids);
1918
1919            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1920
1921            send_trusted_peer_change(
1922                &self.config,
1923                &self.trusted_peer_change_tx,
1924                &new_epoch_start_state,
1925            );
1926
1927            let mut validator_components_lock_guard = self.validator_components.lock().await;
1928
1929            // The following code handles 4 different cases, depending on whether the node
1930            // was a validator in the previous epoch, and whether the node is a validator
1931            // in the new epoch.
1932            let new_epoch_store = self
1933                .reconfigure_state(
1934                    &self.state,
1935                    &cur_epoch_store,
1936                    next_epoch_committee.clone(),
1937                    new_epoch_start_state,
1938                    accumulator.clone(),
1939                )
1940                .await?;
1941
1942            let new_validator_components = if let Some(ValidatorComponents {
1943                validator_server_spawn_handle,
1944                validator_server_handle,
1945                validator_overload_monitor_handle,
1946                consensus_manager,
1947                consensus_store_pruner,
1948                consensus_adapter,
1949                mut checkpoint_service_tasks,
1950                checkpoint_metrics,
1951                iota_tx_validator_metrics,
1952                validator_registry_id,
1953            }) = validator_components_lock_guard.take()
1954            {
1955                info!("Reconfiguring the validator.");
1956                // Cancel the old checkpoint service tasks.
1957                // Waiting for the checkpoint builder to finish gracefully is not possible,
1958                // because it may wait on transactions while consensus on peers has
1959                // already shut down.
1960                checkpoint_service_tasks.abort_all();
1961                while let Some(result) = checkpoint_service_tasks.join_next().await {
1962                    if let Err(err) = result {
1963                        if err.is_panic() {
1964                            std::panic::resume_unwind(err.into_panic());
1965                        }
1966                        warn!("Error in checkpoint service task: {:?}", err);
1967                    }
1968                }
1969                info!("Checkpoint service has shut down.");
1970
1971                consensus_manager.shutdown().await;
1972                info!("Consensus has shut down.");
1973
1974                info!("Epoch store finished reconfiguration.");
1975
1976                // No other components should be holding a strong reference to state accumulator
1977                // at this point. Confirm here before we swap in the new accumulator.
1978                let accumulator_metrics = Arc::into_inner(accumulator)
1979                    .expect("Accumulator should have no other references at this point")
1980                    .metrics();
1981                let new_accumulator = Arc::new(StateAccumulator::new(
1982                    self.state.get_accumulator_store().clone(),
1983                    accumulator_metrics,
1984                ));
1985                let weak_accumulator = Arc::downgrade(&new_accumulator);
1986                *accumulator_guard = Some(new_accumulator);
1987
1988                consensus_store_pruner.prune(next_epoch).await;
1989
1990                if self.state.is_committee_validator(&new_epoch_store) {
1991                    // Only restart consensus if this node is still a validator in the new epoch.
1992                    Some(
1993                        Self::start_epoch_specific_validator_components(
1994                            &self.config,
1995                            self.state.clone(),
1996                            consensus_adapter,
1997                            self.checkpoint_store.clone(),
1998                            new_epoch_store.clone(),
1999                            self.state_sync_handle.clone(),
2000                            self.randomness_handle.clone(),
2001                            consensus_manager,
2002                            consensus_store_pruner,
2003                            weak_accumulator,
2004                            self.backpressure_manager.clone(),
2005                            validator_server_spawn_handle,
2006                            validator_server_handle,
2007                            validator_overload_monitor_handle,
2008                            checkpoint_metrics,
2009                            self.metrics.clone(),
2010                            iota_tx_validator_metrics,
2011                            validator_registry_id,
2012                        )
2013                        .await?,
2014                    )
2015                } else {
2016                    info!("This node is no longer a validator after reconfiguration");
2017                    if self.registry_service.remove(validator_registry_id) {
2018                        debug!("Removed validator metrics registry");
2019                    } else {
2020                        warn!("Failed to remove validator metrics registry");
2021                    }
2022                    validator_server_handle.trigger_shutdown();
2023                    debug!("Validator grpc server shutdown triggered");
2024
2025                    None
2026                }
2027            } else {
2028                // No other components should be holding a strong reference to state accumulator
2029                // at this point. Confirm here before we swap in the new accumulator.
2030                let accumulator_metrics = Arc::into_inner(accumulator)
2031                    .expect("Accumulator should have no other references at this point")
2032                    .metrics();
2033                let new_accumulator = Arc::new(StateAccumulator::new(
2034                    self.state.get_accumulator_store().clone(),
2035                    accumulator_metrics,
2036                ));
2037                let weak_accumulator = Arc::downgrade(&new_accumulator);
2038                *accumulator_guard = Some(new_accumulator);
2039
2040                if self.state.is_committee_validator(&new_epoch_store) {
2041                    info!("Promoting the node from fullnode to validator, starting grpc server");
2042
2043                    let mut components = Self::construct_validator_components(
2044                        self.config.clone(),
2045                        self.state.clone(),
2046                        Arc::new(next_epoch_committee.clone()),
2047                        new_epoch_store.clone(),
2048                        self.checkpoint_store.clone(),
2049                        self.state_sync_handle.clone(),
2050                        self.randomness_handle.clone(),
2051                        weak_accumulator,
2052                        self.backpressure_manager.clone(),
2053                        self.connection_monitor_status.clone(),
2054                        &self.registry_service,
2055                        self.metrics.clone(),
2056                    )
2057                    .await?;
2058
2059                    components.validator_server_spawn_handle =
2060                        components.validator_server_spawn_handle.start();
2061
2062                    Some(components)
2063                } else {
2064                    None
2065                }
2066            };
2067            *validator_components_lock_guard = new_validator_components;
2068
2069            // Force releasing the current epoch store's DB handles, because the
2070            // Arc<AuthorityPerEpochStore> may linger.
2071            cur_epoch_store.release_db_handles();
2072
2073            if cfg!(msim)
2074                && !matches!(
2075                    self.config
2076                        .authority_store_pruning_config
2077                        .num_epochs_to_retain_for_checkpoints(),
2078                    None | Some(u64::MAX) | Some(0)
2079                )
2080            {
2081                self.state
2082                .prune_checkpoints_for_eligible_epochs_for_testing(
2083                    self.config.clone(),
2084                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2085                )
2086                .await?;
2087            }
2088
2089            epoch_store = new_epoch_store;
2090            info!("Reconfiguration finished");
2091        }
2092    }
2093
2094    async fn shutdown(&self) {
2095        if let Some(validator_components) = &*self.validator_components.lock().await {
2096            validator_components.consensus_manager.shutdown().await;
2097        }
2098
2099        // Shutdown the gRPC server if it's running
2100        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2101            info!("Shutting down gRPC server");
2102            if let Err(e) = grpc_handle.shutdown().await {
2103                warn!("Failed to gracefully shutdown gRPC server: {e}");
2104            }
2105        }
2106    }
2107
2108    /// Asynchronously reconfigures the state of the authority node for the next
2109    /// epoch.
2110    async fn reconfigure_state(
2111        &self,
2112        state: &Arc<AuthorityState>,
2113        cur_epoch_store: &AuthorityPerEpochStore,
2114        next_epoch_committee: Committee,
2115        next_epoch_start_system_state: EpochStartSystemState,
2116        accumulator: Arc<StateAccumulator>,
2117    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2118        let next_epoch = next_epoch_committee.epoch();
2119
2120        let last_checkpoint = self
2121            .checkpoint_store
2122            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2123            .expect("Error loading last checkpoint for current epoch")
2124            .expect("Could not load last checkpoint for current epoch");
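        // The last checkpoint's end-of-epoch data carries the epoch's supply change,
        // which is passed into the state reconfiguration below.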
2125        let epoch_supply_change = last_checkpoint
2126            .end_of_epoch_data
2127            .as_ref()
2128            .ok_or_else(|| {
2129                IotaError::from("last checkpoint in epoch should contain end of epoch data")
2130            })?
2131            .epoch_supply_change;
2132
2133        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2134
2135        assert_eq!(
2136            Some(last_checkpoint_seq),
2137            self.checkpoint_store
2138                .get_highest_executed_checkpoint_seq_number()
2139                .expect("Error loading highest executed checkpoint sequence number")
2140        );
2141
2142        let epoch_start_configuration = EpochStartConfiguration::new(
2143            next_epoch_start_system_state,
2144            *last_checkpoint.digest(),
2145            state.get_object_store().as_ref(),
2146            EpochFlag::default_flags_for_new_epoch(&state.config),
2147        )
2148        .expect("EpochStartConfiguration construction cannot fail");
2149
2150        let new_epoch_store = self
2151            .state
2152            .reconfigure(
2153                cur_epoch_store,
2154                self.config.supported_protocol_versions.unwrap(),
2155                next_epoch_committee,
2156                epoch_start_configuration,
2157                accumulator,
2158                &self.config.expensive_safety_check_config,
2159                epoch_supply_change,
2160                last_checkpoint_seq,
2161            )
2162            .await
2163            .expect("Reconfigure authority state cannot fail");
2164        info!(next_epoch, "Node State has been reconfigured");
2165        assert_eq!(next_epoch, new_epoch_store.epoch());
2166        self.state.get_reconfig_api().update_epoch_flags_metrics(
2167            cur_epoch_store.epoch_start_config().flags(),
2168            new_epoch_store.epoch_start_config().flags(),
2169        );
2170
2171        Ok(new_epoch_store)
2172    }
2173
2174    pub fn get_config(&self) -> &NodeConfig {
2175        &self.config
2176    }
2177
2178    async fn execute_transaction_immediately_at_zero_epoch(
2179        state: &Arc<AuthorityState>,
2180        epoch_store: &Arc<AuthorityPerEpochStore>,
2181        tx: &Transaction,
2182        span: tracing::Span,
2183    ) {
2184        let _guard = span.enter();
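        // Wrap the transaction as an executable transaction with a synthetic
        // Checkpoint(0, 0) certificate proof so it can be executed immediately at
        // epoch zero without a certificate.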
2185        let transaction =
2186            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2187                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2188                    tx.data().clone(),
2189                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2190                ),
2191            );
2192        state
2193            .try_execute_immediately(&transaction, None, epoch_store)
2194            .unwrap();
2195    }
2196
2197    pub fn randomness_handle(&self) -> randomness::Handle {
2198        self.randomness_handle.clone()
2199    }
2200
2201    /// Sends a signed capability notification to committee validators on behalf
2202    /// of this non-committee validator. This method implements retry logic to
2203    /// handle failed attempts to send the notification. It will retry sending
2204    /// the notification with an increasing interval until it receives a
2205    /// successful response from f+1 committee members or 2f+1 non-retryable errors.
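    /// Retries back off linearly (5s, 10s, 15s, ...), capped at 5 minutes.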
2206    async fn send_signed_capability_notification_to_committee_with_retry(
2207        &self,
2208        epoch_store: &Arc<AuthorityPerEpochStore>,
2209    ) {
2210        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2211        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2212        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
2213
2214        // Build the capability notification once, outside the retry loop below.
2215        let config = epoch_store.protocol_config();
2216        let binary_config = to_binary_config(config);
2217
2218        // Create the capability notification
2219        let capabilities = AuthorityCapabilitiesV1::new(
2220            self.state.name,
2221            epoch_store.get_chain_identifier().chain(),
2222            self.config
2223                .supported_protocol_versions
2224                .expect("Supported versions should be populated")
2225                .truncate_below(config.version),
2226            self.state
2227                .get_available_system_packages(&binary_config)
2228                .await,
2229        );
2230
2231        // Sign the capabilities using the authority key pair from config
2232        let signature = AuthoritySignature::new_secure(
2233            &IntentMessage::new(
2234                Intent::iota_app(IntentScope::AuthorityCapabilities),
2235                &capabilities,
2236            ),
2237            &epoch_store.epoch(),
2238            self.config.authority_key_pair(),
2239        );
2240
2241        let request = HandleCapabilityNotificationRequestV1 {
2242            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2243        };
2244
2245        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2246
2247        loop {
2248            let auth_agg = self.auth_agg.load();
2249            match auth_agg
2250                .send_capability_notification_to_quorum(request.clone())
2251                .await
2252            {
2253                Ok(_) => {
2254                    info!("Successfully sent capability notification to committee");
2255                    break;
2256                }
2257                Err(err) => {
2258                    match &err {
2259                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2260                            errors,
2261                        } => {
2262                            warn!(
2263                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2264                                retry_interval, errors
2265                            );
2266                        }
2267                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2268                            errors,
2269                        } => {
2270                            error!(
2271                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2272                                errors
2273                            );
2274                            break;
2275                        }
2276                    };
2277
2278                    // Wait before retrying
2279                    tokio::time::sleep(retry_interval).await;
2280
2281                    // Increase retry interval for the next attempt, capped at max
2282                    retry_interval = std::cmp::min(
2283                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2284                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2285                    );
2286                }
2287            }
2288        }
2289    }
2290}
2291
2292#[cfg(not(msim))]
2293impl IotaNode {
2294    async fn fetch_jwks(
2295        _authority: AuthorityName,
2296        provider: &OIDCProvider,
2297    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2298        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2299        let client = reqwest::Client::new();
2300        fetch_jwks(provider, &client)
2301            .await
2302            .map_err(|_| IotaError::JWKRetrieval)
2303    }
2304}
2305
2306#[cfg(msim)]
2307impl IotaNode {
2308    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
2309        self.sim_state.sim_node.id()
2310    }
2311
2312    pub fn set_safe_mode_expected(&self, new_value: bool) {
2313        info!("Setting safe mode expected to {}", new_value);
2314        self.sim_state
2315            .sim_safe_mode_expected
2316            .store(new_value, Ordering::Relaxed);
2317    }
2318
2319    async fn fetch_jwks(
2320        authority: AuthorityName,
2321        provider: &OIDCProvider,
2322    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2323        get_jwk_injector()(authority, provider)
2324    }
2325}
2326
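/// A future that is stored eagerly and spawned at most once.
///
/// The validator gRPC server is wrapped in a `SpawnOnce` so that it can be
/// constructed ahead of time and started exactly once, e.g. when a fullnode is
/// promoted to validator at reconfiguration (see `monitor_reconfiguration`).
/// A minimal usage sketch, assuming `server.serve()` yields a
/// `Future<Output = Result<()>>`:
///
/// ```ignore
/// let once = SpawnOnce::new(server.serve().map_err(Into::into));
/// // ... later, when the node actually becomes a validator:
/// let once = once.start(); // spawns the future; starting again is a no-op
/// ```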
2327enum SpawnOnce {
2328    // Mutex is only needed to make SpawnOnce Send
2329    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
2330    #[allow(unused)]
2331    Started(JoinHandle<Result<()>>),
2332}
2333
2334impl SpawnOnce {
2335    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
2336        Self::Unstarted(Mutex::new(Box::pin(future)))
2337    }
2338
2339    pub fn start(self) -> Self {
2340        match self {
2341            Self::Unstarted(future) => {
2342                let future = future.into_inner();
2343                let handle = tokio::spawn(future);
2344                Self::Started(handle)
2345            }
2346            Self::Started(_) => self,
2347        }
2348    }
2349}
2350
2351/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
2352/// available.
2353fn send_trusted_peer_change(
2354    config: &NodeConfig,
2355    sender: &watch::Sender<TrustedPeerChangeEvent>,
2356    new_epoch_start_state: &EpochStartSystemState,
2357) {
2358    let new_committee =
2359        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2360
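    // Rotate committees: the previous `new_committee` becomes `old_committee`
    // before the freshly computed peer list is installed as `new_committee`.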
2361    sender.send_modify(|event| {
2362        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2363        event.new_committee = new_committee;
2364    })
2365}
2366
2367fn build_kv_store(
2368    state: &Arc<AuthorityState>,
2369    config: &NodeConfig,
2370    registry: &Registry,
2371) -> Result<Arc<TransactionKeyValueStore>> {
2372    let metrics = KeyValueStoreMetrics::new(registry);
2373    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2374
2375    let base_url = &config.transaction_kv_store_read_config.base_url;
2376
2377    if base_url.is_empty() {
2378        info!("no http kv store url provided, using local db only");
2379        return Ok(Arc::new(db_store));
2380    }
2381
2382    base_url.parse::<url::Url>().tap_err(|e| {
2383        error!(
2384            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2385            base_url, e
2386        )
2387    })?;
2388
2389    let http_store = HttpKVStore::new_kv(
2390        base_url,
2391        config.transaction_kv_store_read_config.cache_size,
2392        metrics.clone(),
2393    )?;
2394    info!("using local key-value store with fallback to http key-value store");
2395    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2396        db_store,
2397        http_store,
2398        metrics,
2399        "json_rpc_fallback",
2400    )))
2401}
2402
2403/// Builds and starts the gRPC server for the IOTA node based on the node's
2404/// configuration.
2405///
2406/// This function performs the following tasks:
2407/// 1. Checks if the node is a validator by inspecting the consensus
2408///    configuration; if so, it returns early as validators do not expose gRPC
2409///    APIs.
2410/// 2. Checks if gRPC is enabled in the configuration.
2411/// 3. Builds a `GrpcReader` over the REST read store and uses the state's
2412///    subscription handler as the event subscriber for streaming.
2413/// 4. Starts the gRPC server with a cancellation token used for shutdown.
2414///
2415/// Returns a handle to the running gRPC server, or `None` if the node is a
2416/// validator or the gRPC API is disabled.
2417async fn build_grpc_server(
2418    config: &NodeConfig,
2419    state: Arc<AuthorityState>,
2420    state_sync_store: RocksDbStore,
2421) -> Result<Option<GrpcServerHandle>> {
2422    // Validators do not expose gRPC APIs
2423    if config.consensus_config().is_some() || !config.enable_grpc_api {
2424        return Ok(None);
2425    }
2426
2427    let Some(grpc_config) = &config.grpc_api_config else {
2428        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2429    };
2430
2431    let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));
2432
2433    // Create cancellation token for proper shutdown hierarchy
2434    let shutdown_token = CancellationToken::new();
2435
2436    // Create GrpcReader
2437    let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(rest_read_store));
2438
2439    // Get the subscription handler from the state for event streaming
2440    let event_subscriber =
2441        state.subscription_handler.clone() as Arc<dyn iota_grpc_server::EventSubscriber>;
2442
2443    // Start the gRPC server with the shutdown cancellation token so it can be
2444    // torn down cleanly later (see `IotaNode::shutdown`).
2445    let handle = start_grpc_server(
2446        grpc_reader,
2447        event_subscriber,
2448        grpc_config.clone(),
2449        shutdown_token,
2450    )
2451    .await?;
2452
2453    Ok(Some(handle))
2454}
2455
2456/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
2457/// REST APIs based on the node's configuration.
2458///
2459/// This function performs the following tasks:
2460/// 1. Checks if the node is a validator by inspecting the consensus
2461///    configuration; if so, it returns early as validators do not expose these
2462///    APIs.
2463/// 2. Creates an Axum router to handle HTTP requests.
2464/// 3. Initializes the JSON-RPC server and registers various RPC modules based
2465///    on the node's state and configuration, including CoinApi,
2466///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
2467///    IndexerApi.
2468/// 4. Optionally, if the REST API is enabled, nests the REST API router under
2469///    the `/api/v1` path.
2470/// 5. Binds the server to the specified JSON-RPC address and starts listening
2471///    for incoming connections.
2472pub async fn build_http_server(
2473    state: Arc<AuthorityState>,
2474    store: RocksDbStore,
2475    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2476    config: &NodeConfig,
2477    prometheus_registry: &Registry,
2478    _custom_runtime: Option<Handle>,
2479    software_version: &'static str,
2480) -> Result<Option<iota_http::ServerHandle>> {
2481    // Validators do not expose these APIs
2482    if config.consensus_config().is_some() {
2483        return Ok(None);
2484    }
2485
2486    let mut router = axum::Router::new();
2487
2488    let json_rpc_router = {
2489        let mut server = JsonRpcServerBuilder::new(
2490            env!("CARGO_PKG_VERSION"),
2491            prometheus_registry,
2492            config.policy_config.clone(),
2493            config.firewall_config.clone(),
2494        );
2495
2496        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2497
2498        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2499        server.register_module(ReadApi::new(
2500            state.clone(),
2501            kv_store.clone(),
2502            metrics.clone(),
2503        ))?;
2504        server.register_module(CoinReadApi::new(
2505            state.clone(),
2506            kv_store.clone(),
2507            metrics.clone(),
2508        )?)?;
2509
2510        // If run_with_range is enabled we want to prevent any transactions from being
2511        // submitted; run_with_range = None means normal operating conditions.
2512        if config.run_with_range.is_none() {
2513            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2514        }
2515        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2516
2517        if let Some(transaction_orchestrator) = transaction_orchestrator {
2518            server.register_module(TransactionExecutionApi::new(
2519                state.clone(),
2520                transaction_orchestrator.clone(),
2521                metrics.clone(),
2522            ))?;
2523        }
2524
2525        let iota_names_config = config
2526            .iota_names_config
2527            .clone()
2528            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));
2529
2530        server.register_module(IndexerApi::new(
2531            state.clone(),
2532            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2533            kv_store,
2534            metrics,
2535            iota_names_config,
2536            config.indexer_max_subscriptions,
2537        ))?;
2538        server.register_module(MoveUtils::new(state.clone()))?;
2539
2540        let server_type = config.jsonrpc_server_type();
2541
2542        server.to_router(server_type).await?
2543    };
2544
2545    router = router.merge(json_rpc_router);
2546
2547    if config.enable_rest_api {
2548        let mut rest_service = iota_rest_api::RestService::new(
2549            Arc::new(RestReadStore::new(state.clone(), store)),
2550            software_version,
2551        );
2552
2553        if let Some(config) = config.rest.clone() {
2554            rest_service.with_config(config);
2555        }
2556
2557        rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2558
2559        if let Some(transaction_orchestrator) = transaction_orchestrator {
2560            rest_service.with_executor(transaction_orchestrator.clone())
2561        }
2562
2563        router = router.merge(rest_service.into_router());
2564    }
2565
2566    // TODO: Remove this health check when experimental REST API becomes default
2567    // This is a copy of the health check in crates/iota-rest-api/src/health.rs
2568    router = router
2569        .route("/health", axum::routing::get(health_check_handler))
2570        .route_layer(axum::Extension(state));
2571
2572    let layers = ServiceBuilder::new()
2573        .map_request(|mut request: axum::http::Request<_>| {
2574            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
2575                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2576                request.extensions_mut().insert(axum_connect_info);
2577            }
2578            request
2579        })
2580        .layer(axum::middleware::from_fn(server_timing_middleware));
2581
2582    router = router.layer(layers);
2583
2584    let handle = iota_http::Builder::new()
2585        .serve(&config.json_rpc_address, router)
2586        .map_err(|e| anyhow::anyhow!("{e}"))?;
2587    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());
2588
2589    Ok(Some(handle))
2590}
2591
2592#[derive(Debug, serde::Serialize, serde::Deserialize)]
2593pub struct Threshold {
2594    pub threshold_seconds: Option<u32>,
2595}
2596
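/// Health check handler for `GET /health`.
///
/// Example: `GET /health?threshold_seconds=30` responds `200 "up"` only if the
/// highest executed checkpoint's timestamp is at most 30 seconds old; without
/// the query parameter the endpoint only confirms the node is serving requests.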
2597async fn health_check_handler(
2598    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2599    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2600) -> impl axum::response::IntoResponse {
2601    if let Some(threshold_seconds) = threshold_seconds {
2602        // Attempt to get the latest checkpoint
2603        let summary = match state
2604            .get_checkpoint_store()
2605            .get_highest_executed_checkpoint()
2606        {
2607            Ok(Some(summary)) => summary,
2608            Ok(None) => {
2609                warn!("Highest executed checkpoint not found");
2610                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2611            }
2612            Err(err) => {
2613                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2614                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2615            }
2616        };
2617
2618        // Calculate the threshold time based on the provided threshold_seconds
2619        let latest_chain_time = summary.timestamp();
2620        let threshold =
2621            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2622
2623        // Check if the latest checkpoint is within the threshold
2624        if latest_chain_time < threshold {
2625            warn!(
2626                ?latest_chain_time,
2627                ?threshold,
2628                "failing health check due to checkpoint lag"
2629            );
2630            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2631        }
2632    }
2633    // Either no threshold was given or the checkpoint lag check passed; respond with success.
2634    (axum::http::StatusCode::OK, "up")
2635}
2636
2637#[cfg(not(test))]
2638fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2639    protocol_config.max_transactions_per_checkpoint() as usize
2640}
2641
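// In tests, the per-checkpoint transaction limit is capped at 2, presumably so
// that small test workloads already exercise splitting across multiple checkpoints.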
2642#[cfg(test)]
2643fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2644    2
2645}