// iota_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::{BTreeSet, HashMap, HashSet},
9    fmt,
10    future::Future,
11    path::PathBuf,
12    str::FromStr,
13    sync::{Arc, Weak},
14    time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19    callback::CallbackLayer,
20    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::future::BoxFuture;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30    ConsensusConfig, NodeConfig,
31    node::{DBCheckpointConfig, RunWithRange},
32    node_config_metrics::NodeConfigMetrics,
33    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36    authority::{
37        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38        authority_per_epoch_store::AuthorityPerEpochStore,
39        authority_store_pruner::ObjectsCompactionFilter,
40        authority_store_tables::{
41            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42        },
43        backpressure::BackpressureManager,
44        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45    },
46    authority_aggregator::{
47        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
48    },
49    authority_client::NetworkAuthorityClient,
50    authority_server::{ValidatorService, ValidatorServiceMetrics},
51    checkpoints::{
52        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
53        SubmitCheckpointToConsensus,
54        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
55    },
56    connection_monitor::ConnectionMonitor,
57    consensus_adapter::{
58        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
59        ConsensusClient,
60    },
61    consensus_handler::ConsensusHandlerInitializer,
62    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
63    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
64    db_checkpoint_handler::DBCheckpointHandler,
65    epoch::{
66        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
67        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
68        reconfiguration::ReconfigurationInitiator,
69    },
70    execution_cache::build_execution_cache,
71    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
72    jsonrpc_index::IndexStore,
73    module_cache_metrics::ResolverMetrics,
74    overload_monitor::overload_monitor,
75    safe_client::SafeClientMetricsBase,
76    signature_verifier::SignatureVerifierMetrics,
77    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
78    storage::{GrpcReadStore, RocksDbStore},
79    traffic_controller::metrics::TrafficControllerMetrics,
80    transaction_orchestrator::TransactionOrchestrator,
81    validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87    transaction_builder_api::TransactionBuilderApi,
88    transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93    RegistryID, RegistryService,
94    hardware_metrics::register_hardware_metrics,
95    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96    server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
101};
102use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
103use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
104use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
105use iota_snapshot::uploader::StateSnapshotUploader;
106use iota_storage::{
107    FileCompression, StorageFormat,
108    http_key_value_store::HttpKVStore,
109    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
110    key_value_store_metrics::KeyValueStoreMetrics,
111};
112use iota_types::{
113    base_types::{AuthorityName, ConciseableName, EpochId},
114    committee::Committee,
115    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
116    digests::ChainIdentifier,
117    error::{IotaError, IotaResult},
118    executable_transaction::VerifiedExecutableTransaction,
119    execution_config_utils::to_binary_config,
120    full_checkpoint_content::CheckpointData,
121    iota_system_state::{
122        IotaSystemState, IotaSystemStateTrait,
123        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
124    },
125    messages_consensus::{
126        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
127        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
128    },
129    messages_grpc::HandleCapabilityNotificationRequestV1,
130    quorum_driver_types::QuorumDriverEffectsQueueResult,
131    supported_protocol_versions::SupportedProtocolVersions,
132    transaction::{Transaction, VerifiedCertificate},
133};
134use prometheus::Registry;
135#[cfg(msim)]
136pub use simulator::set_jwk_injector;
137#[cfg(msim)]
138use simulator::*;
139use tap::tap::TapFallible;
140use tokio::{
141    sync::{Mutex, broadcast, mpsc, watch},
142    task::{JoinHandle, JoinSet},
143};
144use tokio_util::sync::CancellationToken;
145use tower::ServiceBuilder;
146use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
147use typed_store::{
148    DBMetrics,
149    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
150};
151
152use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
153
pub mod admin;
// Defines `IotaNodeHandle`, re-exported at the top of this file.
mod handle;
/// Node metric types (`IotaNodeMetrics`, `GrpcMetrics`).
pub mod metrics;
157
/// Components that only exist while the node serves as a validator.
/// Held as `Mutex<Option<ValidatorComponents>>` on [`IotaNode`], so they can
/// be torn down (e.g. during reconfiguration) and rebuilt.
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    // Handle to the overload monitor task, if one was started.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // ID of the validator-specific metrics registry within the
    // RegistryService (presumably used to deregister on teardown — confirm).
    validator_registry_id: RegistryID,
}
170
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Extra per-node state that exists only in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulator node this `IotaNode` runs on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Presumably set by tests that expect the node to enter safe mode —
        /// confirm at usage sites.
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its side effects: detects leaked nodes in simtests.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node handle and installs a fresh
        /// leak detector; safe mode is not expected by default.
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    /// Callback that produces the JWKs for a given provider; tests can swap
    /// it out via [`set_jwk_injector`] to inject arbitrary keys.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: ignores the authority and provider arguments and
    /// parses the bundled default JWK bytes as a Twitch key set.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        /// Currently-installed injector. Thread-local, so each simulator
        /// thread sees its own value.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the currently installed JWK injector for this thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    /// Replaces the JWK injector (test hook; re-exported at crate level).
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
223
/// Identity of the server binary, rendered as `bin/version` by its
/// `Display` impl (e.g. "iota-node/unknown").
#[derive(Clone)]
pub struct ServerVersion {
    /// Binary name, e.g. "iota-node".
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}
229
230impl ServerVersion {
231    pub fn new(bin: &'static str, version: &'static str) -> Self {
232        Self { bin, version }
233    }
234}
235
236impl std::fmt::Display for ServerVersion {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        f.write_str(self.bin)?;
239        f.write_str("/")?;
240        f.write_str(self.version)
241    }
242}
243
/// The main node type: owns the authority state plus every long-running
/// service (p2p discovery, state sync, randomness, JSON-RPC, gRPC, and —
/// on validators — the consensus components).
pub struct IotaNode {
    config: NodeConfig,
    /// Validator-only components; `None` while the node runs as a fullnode
    /// (a node is a fullnode when it has no consensus config).
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Broadcast channel to notify [`DiscoveryEventLoop`] for new validator
    /// peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    // Held (not read) to keep the db-checkpoint broadcast channel alive.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    // Simulator-only state (leak detector, node handle); see `mod simulator`.
    #[cfg(msim)]
    sim_state: SimState,

    // Held (not read) to keep the state-archival channel alive.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    // Held (not read) to keep the snapshot-uploader channel alive.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown iota-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and beginning of
    /// each epoch. Use ArcSwap so that we could mutate it without taking
    /// mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
291
impl fmt::Debug for IotaNode {
    // Intentionally minimal debug output: only the concise authority name,
    // e.g. `IotaNode { name: <concise name> }`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}
299
/// Upper bound on the number of JWKs accepted from a single provider fetch;
/// anything beyond this is truncated by the JWK updater (`start_jwk_updater`)
/// to keep misbehaving OAuth providers from flooding consensus.
// A plain immutable value with no interior mutability should be a `const`,
// not a `static` — it is inlined at use sites and needs no fixed address.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
301
302impl IotaNode {
303    pub async fn start(
304        config: NodeConfig,
305        registry_service: RegistryService,
306    ) -> Result<Arc<IotaNode>> {
307        Self::start_async(
308            config,
309            registry_service,
310            ServerVersion::new("iota-node", "unknown"),
311        )
312        .await
313    }
314
    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
    /// configuration.
    ///
    /// Spawns one monitored task per OIDC provider configured for the current
    /// chain in `config.zklogin_oauth_providers`. Each task loops: fetch the
    /// provider's JWKs, validate them, drop keys already active in the current
    /// epoch (or already submitted by this task), and submit the rest to
    /// consensus as `authority`. Tasks are bounded to the current epoch via
    /// `within_alive_epoch`.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Provider lists are keyed by chain; a chain without an entry yields
        // the empty set. A malformed provider string is a configuration bug
        // and panics at startup.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        /// Returns whether a fetched JWK should be kept: its `iss` must parse
        /// back to the provider it was fetched from, and the key must pass the
        /// total-size check. Every rejection bumps the per-provider
        /// `invalid_jwks` counter.
        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            // Guards against a provider serving keys under another
            // provider's issuer.
            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // Counters used below, all labelled by provider string:
        // jwk_requests, jwk_request_errors, total_jwks, unique_jwks
        // (plus invalid_jwks, bumped inside validate_jwk above).

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            // The task future is wrapped in `within_alive_epoch`, so it is
            // dropped when the epoch it was started in ends.
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only keys that validate, are not
                                // already active in this epoch, and have not
                                // been seen by this task before.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    // Submission failures are logged and
                                    // swallowed (`ok()`); the key stays in
                                    // `seen`, so this task instance will not
                                    // resubmit it.
                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
453
454    pub async fn start_async(
455        config: NodeConfig,
456        registry_service: RegistryService,
457        server_version: ServerVersion,
458    ) -> Result<Arc<IotaNode>> {
459        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
460        let mut config = config.clone();
461        if config.supported_protocol_versions.is_none() {
462            info!(
463                "populating config.supported_protocol_versions with default {:?}",
464                SupportedProtocolVersions::SYSTEM_DEFAULT
465            );
466            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
467        }
468
469        let run_with_range = config.run_with_range;
470        let is_validator = config.consensus_config().is_some();
471        let is_full_node = !is_validator;
472        let prometheus_registry = registry_service.default_registry();
473
474        info!(node =? config.authority_public_key(),
475            "Initializing iota-node listening on {}", config.network_address
476        );
477
478        let genesis = config.genesis()?.clone();
479
480        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
481        info!("IOTA chain identifier: {chain_identifier}");
482
483        // Check and set the db_corrupted flag
484        let db_corrupted_path = &config.db_path().join("status");
485        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
486            panic!("Failed to check database corruption: {err}");
487        }
488
489        // Initialize metrics to track db usage before creating any stores
490        DBMetrics::init(&prometheus_registry);
491
492        // Initialize IOTA metrics.
493        iota_metrics::init_metrics(&prometheus_registry);
494        // Unsupported (because of the use of static variable) and unnecessary in
495        // simtests.
496        #[cfg(not(msim))]
497        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
498
499        // Register hardware metrics.
500        register_hardware_metrics(&registry_service, &config.db_path)
501            .expect("Failed registering hardware metrics");
502        // Register uptime metric
503        prometheus_registry
504            .register(iota_metrics::uptime_metric(
505                if is_validator {
506                    "validator"
507                } else {
508                    "fullnode"
509                },
510                server_version.version,
511                &chain_identifier.to_string(),
512            ))
513            .expect("Failed registering uptime metric");
514
515        // If genesis come with some migration data then load them into memory from the
516        // file path specified in config.
517        let migration_tx_data = if genesis.contains_migrations() {
518            // Here the load already verifies that the content of the migration blob is
519            // valid in respect to the content found in genesis
520            Some(config.load_migration_tx_data()?)
521        } else {
522            None
523        };
524
525        let secret = Arc::pin(config.authority_key_pair().copy());
526        let genesis_committee = genesis.committee()?;
527        let committee_store = Arc::new(CommitteeStore::new(
528            config.db_path().join("epochs"),
529            &genesis_committee,
530            None,
531        ));
532
533        let mut pruner_db = None;
534        if config
535            .authority_store_pruning_config
536            .enable_compaction_filter
537        {
538            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
539                &config.db_path().join("store"),
540            )));
541        }
542        let compaction_filter = pruner_db
543            .clone()
544            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
545
546        // By default, only enable write stall on validators for perpetual db.
547        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
548        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
549            enable_write_stall,
550            compaction_filter,
551        };
552        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
553            &config.db_path().join("store"),
554            Some(perpetual_tables_options),
555        ));
556        let is_genesis = perpetual_tables
557            .database_is_empty()
558            .expect("Database read should not fail at init.");
559        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
560        let backpressure_manager =
561            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
562
563        let store = AuthorityStore::open(
564            perpetual_tables,
565            &genesis,
566            &config,
567            &prometheus_registry,
568            migration_tx_data.as_ref(),
569        )
570        .await?;
571
572        let cur_epoch = store.get_recovery_epoch_at_restart()?;
573        let committee = committee_store
574            .get_committee(&cur_epoch)?
575            .expect("Committee of the current epoch must exist");
576        let epoch_start_configuration = store
577            .get_epoch_start_configuration()?
578            .expect("EpochStartConfiguration of the current epoch must exist");
579        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
580        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
581
582        let cache_traits = build_execution_cache(
583            &config.execution_cache_config,
584            &epoch_start_configuration,
585            &prometheus_registry,
586            &store,
587            backpressure_manager.clone(),
588        );
589
590        let auth_agg = {
591            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
592            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
593            Arc::new(ArcSwap::new(Arc::new(
594                AuthorityAggregator::new_from_epoch_start_state(
595                    epoch_start_configuration.epoch_start_state(),
596                    &committee_store,
597                    safe_client_metrics_base,
598                    auth_agg_metrics,
599                ),
600            )))
601        };
602
603        let chain = match config.chain_override_for_testing {
604            Some(chain) => chain,
605            None => chain_identifier.chain(),
606        };
607
608        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
609        let epoch_store = AuthorityPerEpochStore::new(
610            config.authority_public_key(),
611            committee.clone(),
612            &config.db_path().join("store"),
613            Some(epoch_options.options),
614            EpochMetrics::new(&registry_service.default_registry()),
615            epoch_start_configuration,
616            cache_traits.backing_package_store.clone(),
617            cache_traits.object_store.clone(),
618            cache_metrics,
619            signature_verifier_metrics,
620            &config.expensive_safety_check_config,
621            (chain_identifier, chain),
622            checkpoint_store
623                .get_highest_executed_checkpoint_seq_number()
624                .expect("checkpoint store read cannot fail")
625                .unwrap_or(0),
626        )?;
627
628        info!("created epoch store");
629
630        replay_log!(
631            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
632            epoch_store.epoch(),
633            epoch_store.protocol_config()
634        );
635
636        // the database is empty at genesis time
637        if is_genesis {
638            info!("checking IOTA conservation at genesis");
639            // When we are opening the db table, the only time when it's safe to
640            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
641            // an epoch and the IOTA conservation check will fail. This also initialize
642            // the expected_network_iota_amount table.
643            cache_traits
644                .reconfig_api
645                .try_expensive_check_iota_conservation(&epoch_store, None)
646                .expect("IOTA conservation check cannot fail at genesis");
647        }
648
649        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
650        let default_buffer_stake = epoch_store
651            .protocol_config()
652            .buffer_stake_for_protocol_upgrade_bps();
653        if effective_buffer_stake != default_buffer_stake {
654            warn!(
655                ?effective_buffer_stake,
656                ?default_buffer_stake,
657                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
658            );
659        }
660
661        checkpoint_store.insert_genesis_checkpoint(
662            genesis.checkpoint(),
663            genesis.checkpoint_contents().clone(),
664            &epoch_store,
665        );
666
667        // Database has everything from genesis, set corrupted key to 0
668        unmark_db_corruption(db_corrupted_path)?;
669
670        info!("creating state sync store");
671        let state_sync_store = RocksDbStore::new(
672            cache_traits.clone(),
673            committee_store.clone(),
674            checkpoint_store.clone(),
675        );
676
677        let index_store = if is_full_node && config.enable_index_processing {
678            info!("creating index store");
679            Some(Arc::new(IndexStore::new(
680                config.db_path().join("indexes"),
681                &prometheus_registry,
682                epoch_store
683                    .protocol_config()
684                    .max_move_identifier_len_as_option(),
685            )))
686        } else {
687            None
688        };
689
690        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
691            // Migrate legacy directory names before opening the DB.
692            GrpcIndexesStore::migrate_legacy_dirs(&config.db_path());
693            Some(Arc::new(
694                GrpcIndexesStore::new(
695                    config.db_path().join(GRPC_INDEXES_DIR),
696                    Arc::clone(&store),
697                    &checkpoint_store,
698                )
699                .await,
700            ))
701        } else {
702            None
703        };
704
705        info!("creating archive reader");
706        // Create network
707        // TODO only configure validators as seed/preferred peers for validators and not
708        // for fullnodes once we've had a chance to re-work fullnode
709        // configuration generation.
710        let archive_readers =
711            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
712        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
713        let (randomness_tx, randomness_rx) = mpsc::channel(
714            config
715                .p2p_config
716                .randomness
717                .clone()
718                .unwrap_or_default()
719                .mailbox_capacity(),
720        );
721        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
722            Self::create_p2p_network(
723                &config,
724                state_sync_store.clone(),
725                chain_identifier,
726                trusted_peer_change_rx,
727                archive_readers.clone(),
728                randomness_tx,
729                &prometheus_registry,
730            )?;
731
732        // We must explicitly send this instead of relying on the initial value to
733        // trigger watch value change, so that state-sync is able to process it.
734        send_trusted_peer_change(
735            &config,
736            &trusted_peer_change_tx,
737            epoch_store.epoch_start_state(),
738        );
739
740        info!("start state archival");
741        // Start archiving local state to remote store
742        let state_archive_handle =
743            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
744                .await?;
745
746        info!("start snapshot upload");
747        // Start uploading state snapshot to remote store
748        let state_snapshot_handle =
749            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
750
751        // Start uploading db checkpoints to remote store
752        info!("start db checkpoint");
753        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
754            &config,
755            &prometheus_registry,
756            state_snapshot_handle.is_some(),
757        )?;
758
759        let mut genesis_objects = genesis.objects().to_vec();
760        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
761            genesis_objects.extend(migration_tx_data.get_objects());
762        }
763
764        let authority_name = config.authority_public_key();
765        let validator_tx_finalizer =
766            config
767                .enable_validator_tx_finalizer
768                .then_some(Arc::new(ValidatorTxFinalizer::new(
769                    auth_agg.clone(),
770                    authority_name,
771                    &prometheus_registry,
772                )));
773
774        info!("create authority state");
775        let state = AuthorityState::new(
776            authority_name,
777            secret,
778            config.supported_protocol_versions.unwrap(),
779            store.clone(),
780            cache_traits.clone(),
781            epoch_store.clone(),
782            committee_store.clone(),
783            index_store.clone(),
784            grpc_indexes_store,
785            checkpoint_store.clone(),
786            &prometheus_registry,
787            &genesis_objects,
788            &db_checkpoint_config,
789            config.clone(),
790            archive_readers,
791            validator_tx_finalizer,
792            chain_identifier,
793            pruner_db,
794        )
795        .await;
796
797        // ensure genesis and migration txs were executed
798        if epoch_store.epoch() == 0 {
799            let genesis_tx = &genesis.transaction();
800            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
801            // Execute genesis transaction
802            Self::execute_transaction_immediately_at_zero_epoch(
803                &state,
804                &epoch_store,
805                genesis_tx,
806                span,
807            )
808            .await;
809
810            // Execute migration transactions if present
811            if let Some(migration_tx_data) = migration_tx_data {
812                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
813                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
814                    Self::execute_transaction_immediately_at_zero_epoch(
815                        &state,
816                        &epoch_store,
817                        tx,
818                        span,
819                    )
820                    .await;
821                }
822            }
823        }
824
825        // Start the loop that receives new randomness and generates transactions for
826        // it.
827        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
828
829        if config
830            .expensive_safety_check_config
831            .enable_secondary_index_checks()
832        {
833            if let Some(indexes) = state.indexes.clone() {
834                iota_core::verify_indexes::verify_indexes(
835                    state.get_accumulator_store().as_ref(),
836                    indexes,
837                )
838                .expect("secondary indexes are inconsistent");
839            }
840        }
841
842        let (end_of_epoch_channel, end_of_epoch_receiver) =
843            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
844
845        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
846            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
847                auth_agg.load_full(),
848                state.clone(),
849                end_of_epoch_receiver,
850                &config.db_path(),
851                &prometheus_registry,
852            )))
853        } else {
854            None
855        };
856
857        let http_server = build_http_server(
858            state.clone(),
859            &transaction_orchestrator.clone(),
860            &config,
861            &prometheus_registry,
862        )
863        .await?;
864
865        let accumulator = Arc::new(StateAccumulator::new(
866            cache_traits.accumulator_store.clone(),
867            StateAccumulatorMetrics::new(&prometheus_registry),
868        ));
869
870        let authority_names_to_peer_ids = epoch_store
871            .epoch_start_state()
872            .get_authority_names_to_peer_ids();
873
874        let network_connection_metrics =
875            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
876
877        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
878
879        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
880            p2p_network.downgrade(),
881            network_connection_metrics,
882            HashMap::new(),
883            None,
884        );
885
886        let connection_monitor_status = ConnectionMonitorStatus {
887            connection_statuses,
888            authority_names_to_peer_ids,
889        };
890
891        let connection_monitor_status = Arc::new(connection_monitor_status);
892        let iota_node_metrics =
893            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
894
895        iota_node_metrics
896            .binary_max_protocol_version
897            .set(ProtocolVersion::MAX.as_u64() as i64);
898        iota_node_metrics
899            .configured_max_protocol_version
900            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
901
902        // Convert transaction orchestrator to executor trait object for gRPC server
903        // Note that the transaction_orchestrator (so as executor) will be None if it is
904        // a validator node or run_with_range is set
905        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
906            transaction_orchestrator
907                .clone()
908                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
909
910        let grpc_server_handle = build_grpc_server(
911            &config,
912            state.clone(),
913            state_sync_store.clone(),
914            executor,
915            &prometheus_registry,
916            server_version,
917        )
918        .await?;
919
920        let validator_components = if state.is_committee_validator(&epoch_store) {
921            let (components, _) = futures::join!(
922                Self::construct_validator_components(
923                    config.clone(),
924                    state.clone(),
925                    committee,
926                    epoch_store.clone(),
927                    checkpoint_store.clone(),
928                    state_sync_handle.clone(),
929                    randomness_handle.clone(),
930                    Arc::downgrade(&accumulator),
931                    backpressure_manager.clone(),
932                    connection_monitor_status.clone(),
933                    &registry_service,
934                    iota_node_metrics.clone(),
935                ),
936                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
937            );
938            let mut components = components?;
939
940            components.consensus_adapter.submit_recovered(&epoch_store);
941
942            // Start the gRPC server
943            components.validator_server_handle = components.validator_server_handle.start().await;
944
945            Some(components)
946        } else {
947            None
948        };
949
950        // setup shutdown channel
951        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
952
953        let node = Self {
954            config,
955            validator_components: Mutex::new(validator_components),
956            _http_server: http_server,
957            state,
958            transaction_orchestrator,
959            registry_service,
960            metrics: iota_node_metrics,
961
962            _discovery: discovery_handle,
963            state_sync_handle,
964            randomness_handle,
965            checkpoint_store,
966            accumulator: Mutex::new(Some(accumulator)),
967            end_of_epoch_channel,
968            connection_monitor_status,
969            trusted_peer_change_tx,
970            backpressure_manager,
971
972            _db_checkpoint_handle: db_checkpoint_handle,
973
974            #[cfg(msim)]
975            sim_state: Default::default(),
976
977            _state_archive_handle: state_archive_handle,
978            _state_snapshot_uploader_handle: state_snapshot_handle,
979            shutdown_channel_tx: shutdown_channel,
980
981            grpc_server_handle: Mutex::new(grpc_server_handle),
982
983            auth_agg,
984        };
985
986        info!("IotaNode started!");
987        let node = Arc::new(node);
988        let node_copy = node.clone();
989        spawn_monitored_task!(async move {
990            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
991            if let Err(error) = result {
992                warn!("Reconfiguration finished with error {:?}", error);
993            }
994        });
995
996        Ok(node)
997    }
998
999    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
1000        self.end_of_epoch_channel.subscribe()
1001    }
1002
1003    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1004        self.shutdown_channel_tx.subscribe()
1005    }
1006
1007    pub fn current_epoch_for_testing(&self) -> EpochId {
1008        self.state.current_epoch_for_testing()
1009    }
1010
1011    pub fn db_checkpoint_path(&self) -> PathBuf {
1012        self.config.db_checkpoint_path()
1013    }
1014
1015    // Init reconfig process by starting to reject user certs
1016    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
1017        info!("close_epoch (current epoch = {})", epoch_store.epoch());
1018        self.validator_components
1019            .lock()
1020            .await
1021            .as_ref()
1022            .ok_or_else(|| IotaError::from("Node is not a validator"))?
1023            .consensus_adapter
1024            .close_epoch(epoch_store);
1025        Ok(())
1026    }
1027
1028    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
1029        self.state
1030            .clear_override_protocol_upgrade_buffer_stake(epoch)
1031    }
1032
1033    pub fn set_override_protocol_upgrade_buffer_stake(
1034        &self,
1035        epoch: EpochId,
1036        buffer_stake_bps: u64,
1037    ) -> IotaResult {
1038        self.state
1039            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1040    }
1041
1042    // Testing-only API to start epoch close process.
1043    // For production code, please use the non-testing version.
1044    pub async fn close_epoch_for_testing(&self) -> IotaResult {
1045        let epoch_store = self.state.epoch_store_for_testing();
1046        self.close_epoch(&epoch_store).await
1047    }
1048
1049    async fn start_state_archival(
1050        config: &NodeConfig,
1051        prometheus_registry: &Registry,
1052        state_sync_store: RocksDbStore,
1053    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1054        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1055            let local_store_config = ObjectStoreConfig {
1056                object_store: Some(ObjectStoreType::File),
1057                directory: Some(config.archive_path()),
1058                ..Default::default()
1059            };
1060            let archive_writer = ArchiveWriter::new(
1061                local_store_config,
1062                remote_store_config.clone(),
1063                FileCompression::Zstd,
1064                StorageFormat::Blob,
1065                Duration::from_secs(600),
1066                256 * 1024 * 1024,
1067                prometheus_registry,
1068            )
1069            .await?;
1070            Ok(Some(archive_writer.start(state_sync_store).await?))
1071        } else {
1072            Ok(None)
1073        }
1074    }
1075
1076    /// Creates an StateSnapshotUploader and start it if the StateSnapshotConfig
1077    /// is set.
1078    fn start_state_snapshot(
1079        config: &NodeConfig,
1080        prometheus_registry: &Registry,
1081        checkpoint_store: Arc<CheckpointStore>,
1082    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1083        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1084            let snapshot_uploader = StateSnapshotUploader::new(
1085                &config.db_checkpoint_path(),
1086                &config.snapshot_path(),
1087                remote_store_config.clone(),
1088                60,
1089                prometheus_registry,
1090                checkpoint_store,
1091            )?;
1092            Ok(Some(snapshot_uploader.start()))
1093        } else {
1094            Ok(None)
1095        }
1096    }
1097
1098    fn start_db_checkpoint(
1099        config: &NodeConfig,
1100        prometheus_registry: &Registry,
1101        state_snapshot_enabled: bool,
1102    ) -> Result<(
1103        DBCheckpointConfig,
1104        Option<tokio::sync::broadcast::Sender<()>>,
1105    )> {
1106        let checkpoint_path = Some(
1107            config
1108                .db_checkpoint_config
1109                .checkpoint_path
1110                .clone()
1111                .unwrap_or_else(|| config.db_checkpoint_path()),
1112        );
1113        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1114            DBCheckpointConfig {
1115                checkpoint_path,
1116                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1117                    true
1118                } else {
1119                    config
1120                        .db_checkpoint_config
1121                        .perform_db_checkpoints_at_epoch_end
1122                },
1123                ..config.db_checkpoint_config.clone()
1124            }
1125        } else {
1126            config.db_checkpoint_config.clone()
1127        };
1128
1129        match (
1130            db_checkpoint_config.object_store_config.as_ref(),
1131            state_snapshot_enabled,
1132        ) {
1133            // If db checkpoint config object store not specified but
1134            // state snapshot object store is specified, create handler
1135            // anyway for marking db checkpoints as completed so that they
1136            // can be uploaded as state snapshots.
1137            (None, false) => Ok((db_checkpoint_config, None)),
1138            (_, _) => {
1139                let handler = DBCheckpointHandler::new(
1140                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1141                    db_checkpoint_config.object_store_config.as_ref(),
1142                    60,
1143                    db_checkpoint_config
1144                        .prune_and_compact_before_upload
1145                        .unwrap_or(true),
1146                    config.authority_store_pruning_config.clone(),
1147                    prometheus_registry,
1148                    state_snapshot_enabled,
1149                )?;
1150                Ok((
1151                    db_checkpoint_config,
1152                    Some(DBCheckpointHandler::start(handler)),
1153                ))
1154            }
1155        }
1156    }
1157
    /// Builds the anemo p2p network together with the state-sync, discovery,
    /// and randomness services, and starts all of them.
    ///
    /// Returns the bound [`Network`] plus the three service handles so the
    /// caller can keep them alive and interact with the services.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        // State-sync is built over the node's store and can also read from
        // archives through the provided readers.
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        // Discovery receives trusted-peer updates through the watch channel
        // (fed at every reconfiguration by the caller).
        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        // The randomness service forwards its output through `randomness_tx`.
        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three RPC services share one router/network.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: trace server errors and record request metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack: analogous tracing/metrics layers for requests
            // this node initiates.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too
            // many staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name embeds the chain identifier, scoping the p2p
            // identity to this chain.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // Start each service on the shared network.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1296
    /// Asynchronously constructs and initializes the components necessary for
    /// the validator node.
    ///
    /// Builds the consensus adapter and manager, the consensus store pruner,
    /// validator-scoped metrics, the validator gRPC service, and (optionally)
    /// the overload monitor, then delegates to
    /// `start_epoch_specific_validator_components` for the per-epoch pieces.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Validator metrics get their own registry; its id is kept in
        // `ValidatorComponents` so it can be managed as a unit later.
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // The consensus client is updatable so the underlying implementation
        // can be swapped without rebuilding the adapter.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // This only gets started up once, not on every epoch. (Make call to remove
        // every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // The monitor holds only a weak reference so it does not keep the
            // authority state alive.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1398
    /// Initializes and starts components specific to the current
    /// epoch for the validator node.
    ///
    /// Wires the checkpoint service, randomness manager, and consensus
    /// handler together, starts consensus, spawns the checkpoint tasks, and
    /// (when the authenticator state is enabled) the JWK updater. Returns the
    /// assembled `ValidatorComponents`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        // create a new map that gets injected into both the consensus handler and the
        // consensus adapter the consensus handler will write values forwarded
        // from consensus, and the consensus adapter will read the values to
        // make decisions about which validator submits a transaction to consensus
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // Try to set up the randomness manager; when it is available, register
        // it with the per-epoch store.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        // Consensus validates incoming transactions with IotaTxValidator
        // before accepting them.
        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        // The JWK updater is only needed when the authenticator state
        // (zklogin support) is enabled for this epoch.
        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1501
1502    /// Starts the checkpoint service for the validator node, initializing
1503    /// necessary components and settings.
1504    /// The function ensures proper initialization of the checkpoint service,
1505    /// preparing it to handle checkpoint creation and submission to consensus,
1506    /// while also setting up the necessary monitoring and synchronization
1507    /// mechanisms.
1508    fn build_checkpoint_service(
1509        config: &NodeConfig,
1510        consensus_adapter: Arc<ConsensusAdapter>,
1511        checkpoint_store: Arc<CheckpointStore>,
1512        epoch_store: Arc<AuthorityPerEpochStore>,
1513        state: Arc<AuthorityState>,
1514        state_sync_handle: state_sync::Handle,
1515        accumulator: Weak<StateAccumulator>,
1516        checkpoint_metrics: Arc<CheckpointMetrics>,
1517    ) -> Arc<CheckpointService> {
1518        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1519        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1520
1521        debug!(
1522            "Starting checkpoint service with epoch start timestamp {}
1523            and epoch duration {}",
1524            epoch_start_timestamp_ms, epoch_duration_ms
1525        );
1526
1527        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1528            sender: consensus_adapter,
1529            signer: state.secret.clone(),
1530            authority: config.authority_public_key(),
1531            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1532                .checked_add(epoch_duration_ms)
1533                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1534            metrics: checkpoint_metrics.clone(),
1535        });
1536
1537        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1538        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1539        let max_checkpoint_size_bytes =
1540            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1541
1542        CheckpointService::build(
1543            state.clone(),
1544            checkpoint_store,
1545            epoch_store,
1546            state.get_transaction_cache_reader().clone(),
1547            accumulator,
1548            checkpoint_output,
1549            Box::new(certified_checkpoint_output),
1550            checkpoint_metrics,
1551            max_tx_per_checkpoint,
1552            max_checkpoint_size_bytes,
1553        )
1554    }
1555
1556    fn construct_consensus_adapter(
1557        committee: &Committee,
1558        consensus_config: &ConsensusConfig,
1559        authority: AuthorityName,
1560        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1561        prometheus_registry: &Registry,
1562        consensus_client: Arc<dyn ConsensusClient>,
1563        checkpoint_store: Arc<CheckpointStore>,
1564    ) -> ConsensusAdapter {
1565        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1566        // The consensus adapter allows the authority to send user certificates through
1567        // consensus.
1568
1569        ConsensusAdapter::new(
1570            consensus_client,
1571            checkpoint_store,
1572            authority,
1573            connection_monitor_status,
1574            consensus_config.max_pending_transactions(),
1575            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1576            consensus_config.max_submit_position,
1577            consensus_config.submit_delay_step_override(),
1578            ca_metrics,
1579        )
1580    }
1581
1582    async fn start_grpc_validator_service(
1583        config: &NodeConfig,
1584        state: Arc<AuthorityState>,
1585        consensus_adapter: Arc<ConsensusAdapter>,
1586        prometheus_registry: &Registry,
1587    ) -> Result<SpawnOnce> {
1588        let validator_service = ValidatorService::new(
1589            state,
1590            consensus_adapter,
1591            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1592            TrafficControllerMetrics::new(prometheus_registry),
1593            config.policy_config.clone(),
1594            config.firewall_config.clone(),
1595        );
1596
1597        let mut server_conf = iota_network_stack::config::Config::new();
1598        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1599        server_conf.load_shed = config.grpc_load_shed;
1600        let server_builder =
1601            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1602                .add_service(ValidatorServer::new(validator_service));
1603
1604        let tls_config = iota_tls::create_rustls_server_config(
1605            config.network_key_pair().copy().private(),
1606            IOTA_TLS_SERVER_NAME.to_string(),
1607        );
1608
1609        let network_address = config.network_address().clone();
1610
1611        let bind_future = async move {
1612            let server = server_builder
1613                .bind(&network_address, Some(tls_config))
1614                .await
1615                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1616
1617            let local_addr = server.local_addr();
1618            info!("Listening to traffic on {local_addr}");
1619
1620            Ok(server)
1621        };
1622
1623        Ok(SpawnOnce::new(bind_future))
1624    }
1625
1626    /// Re-executes pending consensus certificates, which may not have been
1627    /// committed to disk before the node restarted. This is necessary for
1628    /// the following reasons:
1629    ///
1630    /// 1. For any transaction for which we returned signed effects to a client,
1631    ///    we must ensure that we have re-executed the transaction before we
1632    ///    begin accepting grpc requests. Otherwise we would appear to have
1633    ///    forgotten about the transaction.
1634    /// 2. While this is running, we are concurrently waiting for all previously
1635    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1636    ///    either direction (from checkpointed consensus transactions to pending
1637    ///    consensus transactions, or vice versa), we must re-execute pending
1638    ///    consensus transactions to ensure that both processes can complete.
1639    /// 3. Also note that for any pending consensus transactions for which we
1640    ///    wrote a signed effects digest to disk, we must re-execute using that
1641    ///    digest as the expected effects digest, to ensure that we cannot
1642    ///    arrive at different effects than what we previously signed.
1643    async fn reexecute_pending_consensus_certs(
1644        epoch_store: &Arc<AuthorityPerEpochStore>,
1645        state: &Arc<AuthorityState>,
1646    ) {
1647        let mut pending_consensus_certificates = Vec::new();
1648        let mut additional_certs = Vec::new();
1649
1650        for tx in epoch_store.get_all_pending_consensus_transactions() {
1651            match tx.kind {
1652                // Shared object txns cannot be re-executed at this point, because we must wait for
1653                // consensus replay to assign shared object versions.
1654                ConsensusTransactionKind::CertifiedTransaction(tx)
1655                    if !tx.contains_shared_object() =>
1656                {
1657                    let tx = *tx;
1658                    // new_unchecked is safe because we never submit a transaction to consensus
1659                    // without verifying it
1660                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1661                        VerifiedCertificate::new_unchecked(tx),
1662                    );
1663                    // we only need to re-execute if we previously signed the effects (which
1664                    // indicates we returned the effects to a client).
1665                    if let Some(fx_digest) = epoch_store
1666                        .get_signed_effects_digest(tx.digest())
1667                        .expect("db error")
1668                    {
1669                        pending_consensus_certificates.push((tx, fx_digest));
1670                    } else {
1671                        additional_certs.push(tx);
1672                    }
1673                }
1674                _ => (),
1675            }
1676        }
1677
1678        let digests = pending_consensus_certificates
1679            .iter()
1680            .map(|(tx, _)| *tx.digest())
1681            .collect::<Vec<_>>();
1682
1683        info!(
1684            "reexecuting {} pending consensus certificates: {:?}",
1685            digests.len(),
1686            digests
1687        );
1688
1689        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1690        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1691
1692        // If this times out, the validator will still almost certainly start up fine.
1693        // But, it is possible that it may temporarily "forget" about
1694        // transactions that it had previously executed. This could confuse
1695        // clients in some circumstances. However, the transactions are still in
1696        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1697        let timeout = if cfg!(msim) { 120 } else { 60 };
1698        if tokio::time::timeout(
1699            std::time::Duration::from_secs(timeout),
1700            state
1701                .get_transaction_cache_reader()
1702                .try_notify_read_executed_effects_digests(&digests),
1703        )
1704        .await
1705        .is_err()
1706        {
1707            // Log all the digests that were not executed to help debugging.
1708            if let Ok(executed_effects_digests) = state
1709                .get_transaction_cache_reader()
1710                .try_multi_get_executed_effects_digests(&digests)
1711            {
1712                let pending_digests = digests
1713                    .iter()
1714                    .zip(executed_effects_digests.iter())
1715                    .filter_map(|(digest, executed_effects_digest)| {
1716                        if executed_effects_digest.is_none() {
1717                            Some(digest)
1718                        } else {
1719                            None
1720                        }
1721                    })
1722                    .collect::<Vec<_>>();
1723                debug_fatal!(
1724                    "Timed out waiting for effects digests to be executed: {:?}",
1725                    pending_digests
1726                );
1727            } else {
1728                debug_fatal!(
1729                    "Timed out waiting for effects digests to be executed, digests not found"
1730                );
1731            }
1732        }
1733    }
1734
1735    pub fn state(&self) -> Arc<AuthorityState> {
1736        self.state.clone()
1737    }
1738
1739    // Only used for testing because of how epoch store is loaded.
1740    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1741        self.state.reference_gas_price_for_testing()
1742    }
1743
1744    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1745        self.state.committee_store().clone()
1746    }
1747
1748    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1749    // self.state.db()
1750    // }
1751
1752    /// Clone an AuthorityAggregator currently used in this node's
1753    /// QuorumDriver, if the node is a fullnode. After reconfig,
1754    /// QuorumDriver builds a new AuthorityAggregator. The caller
1755    /// of this function will mostly likely want to call this again
1756    /// to get a fresh one.
1757    pub fn clone_authority_aggregator(
1758        &self,
1759    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1760        self.transaction_orchestrator
1761            .as_ref()
1762            .map(|to| to.clone_authority_aggregator())
1763    }
1764
1765    pub fn transaction_orchestrator(
1766        &self,
1767    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1768        self.transaction_orchestrator.clone()
1769    }
1770
1771    pub fn subscribe_to_transaction_orchestrator_effects(
1772        &self,
1773    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1774        self.transaction_orchestrator
1775            .as_ref()
1776            .map(|to| to.subscribe_to_effects_queue())
1777            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1778    }
1779
    /// This function awaits the completion of checkpoint execution of the
    /// current epoch, after which it initiates reconfiguration of the
    /// entire system. This function also handles role changes for the node when
    /// epoch changes and advertises capabilities to the committee if the node
    /// is a validator.
    ///
    /// Runs forever (one loop iteration per epoch); it only returns when a
    /// configured `RunWithRange` stop condition is met.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Take exclusive ownership of the state accumulator for this epoch;
            // a fresh one is put back into the guard before the next iteration.
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Create closures that handle gRPC type conversion
            // NOTE(review): `try_lock` means that if the handle lock happens to
            // be contended here, checkpoint data broadcasting is silently
            // disabled for this epoch — confirm this is intended.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            if let Some(components) = &*self.validator_components.lock().await {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            // no need to send digests of versions less than the current version
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Send signed capabilities to committee validators if we are a non-committee
                // validator in a separate task to not block the caller. Sending is done only if
                // the feature flag supporting it is enabled.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Blocks until every checkpoint of the current epoch has been
            // executed (or the run-with-range condition is hit).
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Best-effort broadcast; only worth a warning on fullnodes, where
            // subscribers (e.g. RPC consumers) are expected.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator against the new committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // We save the connection monitor status map regardless of validator / fullnode
            // status so that we don't need to restart the connection monitor
            // every epoch. Update the mappings that will be used by the
            // consensus adapter if it exists or is about to be created.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;

            // Case A/B: the node WAS a validator in the closing epoch.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Cancel the old checkpoint service tasks.
                // Waiting for checkpoint builder to finish gracefully is not possible, because
                // it may wait on transactions while consensus on peers have
                // already shut down.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        // Propagate panics from the aborted tasks; mere
                        // cancellation errors are only logged.
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state accumulator
                // at this point. Confirm here before we swap in the new accumulator.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Only restart consensus if this node is still a validator in the new epoch.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Validator -> fullnode demotion: tear down
                    // validator-specific metrics and the grpc server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Case C/D: the node was a fullnode in the closing epoch.
                // No other components should be holding a strong reference to state accumulator
                // at this point. Confirm here before we swap in the new accumulator.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // In simulation, exercise the checkpoint pruner when a retention
            // policy is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2116
2117    async fn shutdown(&self) {
2118        if let Some(validator_components) = &*self.validator_components.lock().await {
2119            validator_components.consensus_manager.shutdown().await;
2120        }
2121
2122        // Shutdown the gRPC server if it's running
2123        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2124            info!("Shutting down gRPC server");
2125            if let Err(e) = grpc_handle.shutdown().await {
2126                warn!("Failed to gracefully shutdown gRPC server: {e}");
2127            }
2128        }
2129    }
2130
    /// Asynchronously reconfigures the state of the authority node for the next
    /// epoch.
    ///
    /// Returns the `AuthorityPerEpochStore` of the new epoch. Panics on store
    /// read or reconfiguration failures, since the node cannot make progress
    /// in those cases.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        // The last checkpoint of the closing epoch carries the end-of-epoch
        // data (including the supply change) needed below.
        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration must only start after every checkpoint of the
        // closing epoch has been executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Refresh the epoch-flag metrics to reflect the flags of the new epoch.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2196
    /// Returns a reference to this node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2200
2201    async fn execute_transaction_immediately_at_zero_epoch(
2202        state: &Arc<AuthorityState>,
2203        epoch_store: &Arc<AuthorityPerEpochStore>,
2204        tx: &Transaction,
2205        span: tracing::Span,
2206    ) {
2207        let _guard = span.enter();
2208        let transaction =
2209            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2210                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2211                    tx.data().clone(),
2212                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2213                ),
2214            );
2215        state
2216            .try_execute_immediately(&transaction, None, epoch_store)
2217            .unwrap();
2218    }
2219
    /// Returns a clone of the handle to the node's randomness subsystem.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2223
2224    /// Sends signed capability notification to committee validators for
2225    /// non-committee validators. This method implements retry logic to handle
2226    /// failed attempts to send the notification. It will retry sending the
2227    /// notification with an increasing interval until it receives a successful
2228    /// response from a f+1 committee members or 2f+1 non-retryable errors.
2229    async fn send_signed_capability_notification_to_committee_with_retry(
2230        &self,
2231        epoch_store: &Arc<AuthorityPerEpochStore>,
2232    ) {
2233        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2234        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2235        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
2236
2237        // Create the capability notification once
2238        let config = epoch_store.protocol_config();
2239        let binary_config = to_binary_config(config);
2240
2241        // Create the capability notification
2242        let capabilities = AuthorityCapabilitiesV1::new(
2243            self.state.name,
2244            epoch_store.get_chain_identifier().chain(),
2245            self.config
2246                .supported_protocol_versions
2247                .expect("Supported versions should be populated")
2248                .truncate_below(config.version),
2249            self.state
2250                .get_available_system_packages(&binary_config)
2251                .await,
2252        );
2253
2254        // Sign the capabilities using the authority key pair from config
2255        let signature = AuthoritySignature::new_secure(
2256            &IntentMessage::new(
2257                Intent::iota_app(IntentScope::AuthorityCapabilities),
2258                &capabilities,
2259            ),
2260            &epoch_store.epoch(),
2261            self.config.authority_key_pair(),
2262        );
2263
2264        let request = HandleCapabilityNotificationRequestV1 {
2265            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2266        };
2267
2268        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2269
2270        loop {
2271            let auth_agg = self.auth_agg.load();
2272            match auth_agg
2273                .send_capability_notification_to_quorum(request.clone())
2274                .await
2275            {
2276                Ok(_) => {
2277                    info!("Successfully sent capability notification to committee");
2278                    break;
2279                }
2280                Err(err) => {
2281                    match &err {
2282                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2283                            errors,
2284                        } => {
2285                            warn!(
2286                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2287                                retry_interval, errors
2288                            );
2289                        }
2290                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2291                            errors,
2292                        } => {
2293                            error!(
2294                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2295                                errors
2296                            );
2297                            break;
2298                        }
2299                    };
2300
2301                    // Wait before retrying
2302                    tokio::time::sleep(retry_interval).await;
2303
2304                    // Increase retry interval for the next attempt, capped at max
2305                    retry_interval = std::cmp::min(
2306                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2307                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2308                    );
2309                }
2310            }
2311        }
2312    }
2313}
2314
2315#[cfg(not(msim))]
2316impl IotaNode {
2317    async fn fetch_jwks(
2318        _authority: AuthorityName,
2319        provider: &OIDCProvider,
2320    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2321        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2322        let client = reqwest::Client::new();
2323        fetch_jwks(provider, &client)
2324            .await
2325            .map_err(|_| IotaError::JWKRetrieval)
2326    }
2327}
2328
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id this node runs on (msim builds only).
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether the simulation expects this node to enter safe mode.
    // The flag is stored with relaxed ordering; it is a simple test-state
    // marker with no synchronization requirements.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    /// Simulator override of JWK fetching: delegates to the test-injected
    /// JWK provider instead of performing network I/O.
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2349
/// A server that is started at most once: begins as an unstarted future and
/// transitions to a running server handle on `start`.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Sync
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2356
2357impl SpawnOnce {
2358    pub fn new(
2359        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2360    ) -> Self {
2361        Self::Unstarted(Mutex::new(Box::pin(future)))
2362    }
2363
2364    pub async fn start(self) -> Self {
2365        match self {
2366            Self::Unstarted(future) => {
2367                let server = future
2368                    .into_inner()
2369                    .await
2370                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2371                let handle = server.handle().clone();
2372                tokio::spawn(async move {
2373                    if let Err(err) = server.serve().await {
2374                        info!("Server stopped: {err}");
2375                    }
2376                    info!("Server stopped");
2377                });
2378                Self::Started(handle)
2379            }
2380            Self::Started(_) => self,
2381        }
2382    }
2383
2384    pub fn shutdown(self) {
2385        if let SpawnOnce::Started(handle) = self {
2386            handle.trigger_shutdown();
2387        }
2388    }
2389}
2390
2391/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers are now
2392/// available.
2393fn send_trusted_peer_change(
2394    config: &NodeConfig,
2395    sender: &watch::Sender<TrustedPeerChangeEvent>,
2396    new_epoch_start_state: &EpochStartSystemState,
2397) {
2398    let new_committee =
2399        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2400
2401    sender.send_modify(|event| {
2402        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2403        event.new_committee = new_committee;
2404    })
2405}
2406
2407fn build_kv_store(
2408    state: &Arc<AuthorityState>,
2409    config: &NodeConfig,
2410    registry: &Registry,
2411) -> Result<Arc<TransactionKeyValueStore>> {
2412    let metrics = KeyValueStoreMetrics::new(registry);
2413    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2414
2415    let base_url = &config.transaction_kv_store_read_config.base_url;
2416
2417    if base_url.is_empty() {
2418        info!("no http kv store url provided, using local db only");
2419        return Ok(Arc::new(db_store));
2420    }
2421
2422    base_url.parse::<url::Url>().tap_err(|e| {
2423        error!(
2424            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2425            base_url, e
2426        )
2427    })?;
2428
2429    let http_store = HttpKVStore::new_kv(
2430        base_url,
2431        config.transaction_kv_store_read_config.cache_size,
2432        metrics.clone(),
2433    )?;
2434    info!("using local key-value store with fallback to http key-value store");
2435    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2436        db_store,
2437        http_store,
2438        metrics,
2439        "json_rpc_fallback",
2440    )))
2441}
2442
2443/// Builds and starts the gRPC server for the IOTA node based on the node's
2444/// configuration.
2445///
2446/// This function performs the following tasks:
2447/// 1. Checks if the node is a validator by inspecting the consensus
2448///    configuration; if so, it returns early as validators do not expose gRPC
2449///    APIs.
2450/// 2. Checks if gRPC is enabled in the configuration.
2451/// 3. Creates broadcast channels for checkpoint streaming.
2452/// 4. Initializes the gRPC checkpoint service.
2453/// 5. Spawns the gRPC server to listen for incoming connections.
2454///
2455/// Returns a tuple of optional broadcast channels for checkpoint summary and
2456/// data.
2457async fn build_grpc_server(
2458    config: &NodeConfig,
2459    state: Arc<AuthorityState>,
2460    state_sync_store: RocksDbStore,
2461    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2462    prometheus_registry: &Registry,
2463    server_version: ServerVersion,
2464) -> Result<Option<GrpcServerHandle>> {
2465    // Validators do not expose gRPC APIs
2466    if config.consensus_config().is_some() || !config.enable_grpc_api {
2467        return Ok(None);
2468    }
2469
2470    let Some(grpc_config) = &config.grpc_api_config else {
2471        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2472    };
2473
2474    // Get chain identifier from state directly
2475    let chain_id = state.get_chain_identifier();
2476
2477    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2478
2479    // Create cancellation token for proper shutdown hierarchy
2480    let shutdown_token = CancellationToken::new();
2481
2482    // Create GrpcReader
2483    let grpc_reader = Arc::new(GrpcReader::new(
2484        grpc_read_store,
2485        Some(server_version.to_string()),
2486    ));
2487
2488    // Create gRPC server metrics
2489    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2490
2491    let handle = start_grpc_server(
2492        grpc_reader,
2493        executor,
2494        grpc_config.clone(),
2495        shutdown_token,
2496        chain_id,
2497        Some(grpc_server_metrics),
2498    )
2499    .await?;
2500
2501    Ok(Some(handle))
2502}
2503
/// Builds and starts the HTTP server for the IOTA node, exposing the JSON-RPC
/// API based on the node's configuration.
///
/// This function performs the following tasks:
/// 1. Checks if the node is a validator by inspecting the consensus
///    configuration; if so, it returns early as validators do not expose these
///    APIs.
/// 2. Creates an Axum router to handle HTTP requests.
/// 3. Initializes the JSON-RPC server and registers various RPC modules based
///    on the node's state and configuration, including CoinApi,
///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
///    IndexerApi.
/// 4. Binds the server to the specified JSON-RPC address and starts listening
///    for incoming connections.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not expose these APIs
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        // Local RocksDB-backed KV store, optionally with an HTTP fallback
        // (see `build_kv_store`).
        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // if run_with_range is enabled we want to prevent any transactions
        // run_with_range = None is normal operating conditions
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only exposed when an orchestrator exists to
        // forward transactions to validators.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the chain-specific default IOTA-Names config when none
        // is provided explicitly.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // /health lives outside the JSON-RPC router; the authority state is made
    // available to its handler via an axum extension.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        // Propagate iota_http's connection info into axum's ConnectInfo so
        // extractors that rely on it keep working.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2613
/// Query parameters for the `/health` endpoint: an optional maximum allowed
/// age (in seconds) of the latest executed checkpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}
2618
2619async fn health_check_handler(
2620    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2621    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2622) -> impl axum::response::IntoResponse {
2623    if let Some(threshold_seconds) = threshold_seconds {
2624        // Attempt to get the latest checkpoint
2625        let summary = match state
2626            .get_checkpoint_store()
2627            .get_highest_executed_checkpoint()
2628        {
2629            Ok(Some(summary)) => summary,
2630            Ok(None) => {
2631                warn!("Highest executed checkpoint not found");
2632                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2633            }
2634            Err(err) => {
2635                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2636                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2637            }
2638        };
2639
2640        // Calculate the threshold time based on the provided threshold_seconds
2641        let latest_chain_time = summary.timestamp();
2642        let threshold =
2643            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2644
2645        // Check if the latest checkpoint is within the threshold
2646        if latest_chain_time < threshold {
2647            warn!(
2648                ?latest_chain_time,
2649                ?threshold,
2650                "failing health check due to checkpoint lag"
2651            );
2652            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2653        }
2654    }
2655    // if health endpoint is responding and no threshold is given, respond success
2656    (axum::http::StatusCode::OK, "up")
2657}
2658
/// Maximum number of transactions allowed in a checkpoint, as dictated by the
/// protocol config (production builds).
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2663
/// Test-only override: a tiny checkpoint size — presumably so tests hit
/// checkpoint boundaries without building large checkpoints (verify against
/// test expectations before changing).
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}