iota_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    future::Future,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::future::BoxFuture;
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_pruner::ObjectsCompactionFilter,
        authority_store_tables::{
            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
        },
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{
        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
    },
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    rest_index::RestIndexStore,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{RestReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_names::config::IotaNamesConfig;
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::ProtocolConfig;
use iota_rest_api::RestMetrics;
use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    full_checkpoint_content::CheckpointData,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_checkpoint::CertifiedCheckpointSummary,
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
    },
    messages_grpc::HandleCapabilityNotificationRequestV1,
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

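/// Long-lived state of a running IOTA node: the authority state, optional
/// validator components, p2p/state-sync/randomness handles, RPC servers, and
/// the channels used to coordinate reconfiguration and shutdown.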
pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC as well as the
    /// experimental rest service.
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Broadcast channel to notify [`DiscoveryEventLoop`] for new validator
    /// peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down iota-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and at the
    /// beginning of each epoch. Wrapped in ArcSwap so that it can be mutated
    /// without taking a mutable reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

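// Cap on how many JWKs are accepted from a single provider fetch; anything
// beyond this limit is dropped before submission to consensus.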
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
    }

    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
    /// configuration.
    /// This function ensures continuous fetching, validation, and submission of
    /// JWKs, maintaining up-to-date keys for the specified providers.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

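        // Rejects a fetched JWK if its `iss` does not map to a known provider, if it
        // maps to a different provider than the one it was fetched from, or if the
        // key exceeds the size limit enforced by `check_total_jwk_size`.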
        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // metrics is:
        //  pub struct IotaNodeMetrics {
        //      pub jwk_requests: IntCounterVec,
        //      pub jwk_request_errors: IntCounterVec,
        //      pub total_jwks: IntCounterVec,
        //      pub unique_jwks: IntCounterVec,
        //  }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
        software_version: &'static str,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        // Check and set the db_corrupted flag
        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(&prometheus_registry);

        // Initialize IOTA metrics.
        iota_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of a static variable) and unnecessary in
        // simtests.
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        // Register hardware metrics.
        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        // Register uptime metric
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                software_version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        // If genesis comes with migration data, load it into memory from the file
        // path specified in the config.
        let migration_tx_data = if genesis.contains_migrations() {
            // The load already verifies that the content of the migration blob is
            // valid with respect to the content found in genesis.
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // By default, only enable write stall on validators for perpetual db.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // the database is empty at genesis time
        if is_genesis {
            info!("checking IOTA conservation at genesis");
            // When we are opening the db table, the only time when it's safe to
            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the IOTA conservation check will fail. This also initializes
            // the expected_network_iota_amount table.
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        // Database has everything from genesis, set corrupted key to 0
        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
        {
            Some(Arc::new(RestIndexStore::new(
                config.db_path().join("rest_index"),
                &store,
                &checkpoint_store,
                &epoch_store,
                &cache_traits.backing_package_store,
            )))
        } else {
            None
        };

        info!("creating archive reader");
        // Create network
        // TODO only configure validators as seed/preferred peers for validators and not
        // for fullnodes once we've had a chance to re-work fullnode
        // configuration generation.
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        // We must explicitly send this instead of relying on the initial value to
        // trigger watch value change, so that state-sync is able to process it.
        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        // Start archiving local state to remote store
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        // Start uploading state snapshot to remote store
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        // Start uploading db checkpoints to remote store
        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rest_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        // ensure genesis and migration txs were executed
        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            // Execute genesis transaction
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            // Execute migration transactions if present
            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        // Start the loop that receives new randomness and generates transactions for
        // it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            state_sync_store.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            custom_rpc_runtime,
            software_version,
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        let grpc_server_handle =
            build_grpc_server(&config, state.clone(), state_sync_store.clone()).await?;

        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&accumulator),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    iota_node_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the gRPC server
            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        // setup shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

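    /// Returns a receiver that is notified with the starting [`IotaSystemState`]
    /// for the next epoch at each epoch boundary.
    ///
    /// Illustrative sketch (assumes a started node held as `node: Arc<IotaNode>`):
    /// ```ignore
    /// let mut rx = node.subscribe_to_epoch_change();
    /// while let Ok(system_state) = rx.recv().await {
    ///     tracing::info!("entering epoch {}", system_state.epoch());
    /// }
    /// ```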
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    // Init reconfig process by starting to reject user certs
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    // Testing-only API to start epoch close process.
    // For production code, please use the non-testing version.
    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

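    /// Starts archiving checkpoint data to the remote store if an object store
    /// is configured in `state_archive_write_config`, returning the sender used
    /// to signal the spawned writer to stop.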
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

    /// Creates a StateSnapshotUploader and starts it if the StateSnapshotConfig
    /// is set.
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

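    /// Resolves the effective `DBCheckpointConfig` and, when an object store is
    /// configured or state snapshots are enabled, starts the
    /// `DBCheckpointHandler` that processes db checkpoints.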
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // If the db checkpoint object store is not specified but the
            // state snapshot object store is specified, create the handler
            // anyway for marking db checkpoints as completed so that they
            // can be uploaded as state snapshots.
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

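    /// Builds the anemo p2p network and wires up the discovery, state-sync, and
    /// randomness services, returning the network together with their handles.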
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too
            // many staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1258
1259    /// Asynchronously constructs and initializes the components necessary for
1260    /// the validator node.
1261    async fn construct_validator_components(
1262        config: NodeConfig,
1263        state: Arc<AuthorityState>,
1264        committee: Arc<Committee>,
1265        epoch_store: Arc<AuthorityPerEpochStore>,
1266        checkpoint_store: Arc<CheckpointStore>,
1267        state_sync_handle: state_sync::Handle,
1268        randomness_handle: randomness::Handle,
1269        accumulator: Weak<StateAccumulator>,
1270        backpressure_manager: Arc<BackpressureManager>,
1271        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1272        registry_service: &RegistryService,
1273        iota_node_metrics: Arc<IotaNodeMetrics>,
1274    ) -> Result<ValidatorComponents> {
1275        let mut config_clone = config.clone();
1276        let consensus_config = config_clone
1277            .consensus_config
1278            .as_mut()
1279            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1280        let validator_registry = Registry::new();
1281        let validator_registry_id = registry_service.add(validator_registry.clone());
1282
1283        let client = Arc::new(UpdatableConsensusClient::new());
1284        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1285            &committee,
1286            consensus_config,
1287            state.name,
1288            connection_monitor_status.clone(),
1289            &validator_registry,
1290            client.clone(),
1291            checkpoint_store.clone(),
1292        ));
1293        let consensus_manager = ConsensusManager::new(
1294            &config,
1295            consensus_config,
1296            registry_service,
1297            &validator_registry,
1298            client,
1299        );
1300
1301        // This only gets started up once, not on every epoch. (Make call to remove
1302        // every epoch.)
1303        let consensus_store_pruner = ConsensusStorePruner::new(
1304            consensus_manager.get_storage_base_path(),
1305            consensus_config.db_retention_epochs(),
1306            consensus_config.db_pruner_period(),
1307            &validator_registry,
1308        );
1309
1310        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1311        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1312
1313        let validator_server_handle = Self::start_grpc_validator_service(
1314            &config,
1315            state.clone(),
1316            consensus_adapter.clone(),
1317            &validator_registry,
1318        )
1319        .await?;
1320
1321        // Starts an overload monitor that monitors the execution of the authority.
1322        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1323        let validator_overload_monitor_handle = if config
1324            .authority_overload_config
1325            .max_load_shedding_percentage
1326            > 0
1327        {
1328            let authority_state = Arc::downgrade(&state);
1329            let overload_config = config.authority_overload_config.clone();
1330            fail_point!("starting_overload_monitor");
1331            Some(spawn_monitored_task!(overload_monitor(
1332                authority_state,
1333                overload_config,
1334            )))
1335        } else {
1336            None
1337        };
1338
1339        Self::start_epoch_specific_validator_components(
1340            &config,
1341            state.clone(),
1342            consensus_adapter,
1343            checkpoint_store,
1344            epoch_store,
1345            state_sync_handle,
1346            randomness_handle,
1347            consensus_manager,
1348            consensus_store_pruner,
1349            accumulator,
1350            backpressure_manager,
1351            validator_server_handle,
1352            validator_overload_monitor_handle,
1353            checkpoint_metrics,
1354            iota_node_metrics,
1355            iota_tx_validator_metrics,
1356            validator_registry_id,
1357        )
1358        .await
1359    }
1360
1361    /// Initializes and starts components specific to the current
1362    /// epoch for the validator node.
1363    async fn start_epoch_specific_validator_components(
1364        config: &NodeConfig,
1365        state: Arc<AuthorityState>,
1366        consensus_adapter: Arc<ConsensusAdapter>,
1367        checkpoint_store: Arc<CheckpointStore>,
1368        epoch_store: Arc<AuthorityPerEpochStore>,
1369        state_sync_handle: state_sync::Handle,
1370        randomness_handle: randomness::Handle,
1371        consensus_manager: ConsensusManager,
1372        consensus_store_pruner: ConsensusStorePruner,
1373        accumulator: Weak<StateAccumulator>,
1374        backpressure_manager: Arc<BackpressureManager>,
1375        validator_server_handle: SpawnOnce,
1376        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1377        checkpoint_metrics: Arc<CheckpointMetrics>,
1378        iota_node_metrics: Arc<IotaNodeMetrics>,
1379        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1380        validator_registry_id: RegistryID,
1381    ) -> Result<ValidatorComponents> {
1382        let checkpoint_service = Self::build_checkpoint_service(
1383            config,
1384            consensus_adapter.clone(),
1385            checkpoint_store.clone(),
1386            epoch_store.clone(),
1387            state.clone(),
1388            state_sync_handle,
1389            accumulator,
1390            checkpoint_metrics.clone(),
1391        );
1392
1393        // Create a new map that gets injected into both the consensus handler and
1394        // the consensus adapter. The consensus handler will write values forwarded
1395        // from consensus, and the consensus adapter will read the values to
1396        // make decisions about which validator submits a transaction to consensus.
1397        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1398
1399        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1400
1401        let randomness_manager = RandomnessManager::try_new(
1402            Arc::downgrade(&epoch_store),
1403            Box::new(consensus_adapter.clone()),
1404            randomness_handle,
1405            config.authority_key_pair(),
1406        )
1407        .await;
1408        if let Some(randomness_manager) = randomness_manager {
1409            epoch_store
1410                .set_randomness_manager(randomness_manager)
1411                .await?;
1412        }
1413
1414        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1415            state.clone(),
1416            checkpoint_service.clone(),
1417            epoch_store.clone(),
1418            low_scoring_authorities,
1419            backpressure_manager,
1420        );
1421
1422        info!("Starting consensus manager");
1423
1424        consensus_manager
1425            .start(
1426                config,
1427                epoch_store.clone(),
1428                consensus_handler_initializer,
1429                IotaTxValidator::new(
1430                    epoch_store.clone(),
1431                    checkpoint_service.clone(),
1432                    state.transaction_manager().clone(),
1433                    iota_tx_validator_metrics.clone(),
1434                ),
1435            )
1436            .await;
1437
1438        info!("Spawning checkpoint service");
1439        let checkpoint_service_tasks = checkpoint_service.spawn().await;
1440
1441        if epoch_store.authenticator_state_enabled() {
1442            Self::start_jwk_updater(
1443                config,
1444                iota_node_metrics,
1445                state.name,
1446                epoch_store.clone(),
1447                consensus_adapter.clone(),
1448            );
1449        }
1450
1451        Ok(ValidatorComponents {
1452            validator_server_handle,
1453            validator_overload_monitor_handle,
1454            consensus_manager,
1455            consensus_store_pruner,
1456            consensus_adapter,
1457            checkpoint_service_tasks,
1458            checkpoint_metrics,
1459            iota_tx_validator_metrics,
1460            validator_registry_id,
1461        })
1462    }
1463
1464    /// Builds the checkpoint service for the validator node, wiring up the
1465    /// outputs that submit checkpoints to consensus and forward certified
1466    /// checkpoints to state sync, along with the transaction and size limits
1467    /// taken from the protocol config. The returned service is spawned later
1468    /// by the caller.
1470    fn build_checkpoint_service(
1471        config: &NodeConfig,
1472        consensus_adapter: Arc<ConsensusAdapter>,
1473        checkpoint_store: Arc<CheckpointStore>,
1474        epoch_store: Arc<AuthorityPerEpochStore>,
1475        state: Arc<AuthorityState>,
1476        state_sync_handle: state_sync::Handle,
1477        accumulator: Weak<StateAccumulator>,
1478        checkpoint_metrics: Arc<CheckpointMetrics>,
1479    ) -> Arc<CheckpointService> {
1480        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1481        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1482
1483        debug!(
1484            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
1486            epoch_start_timestamp_ms, epoch_duration_ms
1487        );
1488
1489        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1490            sender: consensus_adapter,
1491            signer: state.secret.clone(),
1492            authority: config.authority_public_key(),
1493            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1494                .checked_add(epoch_duration_ms)
1495                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1496            metrics: checkpoint_metrics.clone(),
1497        });
1498
1499        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1500        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1501        let max_checkpoint_size_bytes =
1502            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1503
1504        CheckpointService::build(
1505            state.clone(),
1506            checkpoint_store,
1507            epoch_store,
1508            state.get_transaction_cache_reader().clone(),
1509            accumulator,
1510            checkpoint_output,
1511            Box::new(certified_checkpoint_output),
1512            checkpoint_metrics,
1513            max_tx_per_checkpoint,
1514            max_checkpoint_size_bytes,
1515        )
1516    }
1517
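    /// Constructs the [`ConsensusAdapter`] used by this authority to submit
    /// transactions to consensus, with its submission limits derived from the
    /// consensus config and committee size, and its metrics registered in the
    /// provided Prometheus registry.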
1518    fn construct_consensus_adapter(
1519        committee: &Committee,
1520        consensus_config: &ConsensusConfig,
1521        authority: AuthorityName,
1522        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1523        prometheus_registry: &Registry,
1524        consensus_client: Arc<dyn ConsensusClient>,
1525        checkpoint_store: Arc<CheckpointStore>,
1526    ) -> ConsensusAdapter {
1527        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1528        // The consensus adapter allows the authority to send user certificates through
1529        // consensus.
1530
1531        ConsensusAdapter::new(
1532            consensus_client,
1533            checkpoint_store,
1534            authority,
1535            connection_monitor_status,
1536            consensus_config.max_pending_transactions(),
1537            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1538            consensus_config.max_submit_position,
1539            consensus_config.submit_delay_step_override(),
1540            ca_metrics,
1541        )
1542    }
1543
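    /// Builds the validator gRPC service and returns a [`SpawnOnce`] whose
    /// future binds a TLS-enabled server to the configured network address.
    /// The server only begins serving once the returned handle is started.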
1544    async fn start_grpc_validator_service(
1545        config: &NodeConfig,
1546        state: Arc<AuthorityState>,
1547        consensus_adapter: Arc<ConsensusAdapter>,
1548        prometheus_registry: &Registry,
1549    ) -> Result<SpawnOnce> {
1550        let validator_service = ValidatorService::new(
1551            state.clone(),
1552            consensus_adapter,
1553            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1554            TrafficControllerMetrics::new(prometheus_registry),
1555            config.policy_config.clone(),
1556            config.firewall_config.clone(),
1557        );
1558
1559        let mut server_conf = iota_network_stack::config::Config::new();
1560        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1561        server_conf.load_shed = config.grpc_load_shed;
1562        let server_builder =
1563            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1564                .add_service(ValidatorServer::new(validator_service));
1565
1566        let tls_config = iota_tls::create_rustls_server_config(
1567            config.network_key_pair().copy().private(),
1568            IOTA_TLS_SERVER_NAME.to_string(),
1569        );
1570
1571        let network_address = config.network_address().clone();
1572
1573        let bind_future = async move {
1574            let server = server_builder
1575                .bind(&network_address, Some(tls_config))
1576                .await
1577                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1578
1579            let local_addr = server.local_addr();
1580            info!("Listening to traffic on {local_addr}");
1581
1582            Ok(server)
1583        };
1584
1585        Ok(SpawnOnce::new(bind_future))
1586    }
1587
1588    /// Re-executes pending consensus certificates, which may not have been
1589    /// committed to disk before the node restarted. This is necessary for
1590    /// the following reasons:
1591    ///
1592    /// 1. For any transaction for which we returned signed effects to a client,
1593    ///    we must ensure that we have re-executed the transaction before we
1594    ///    begin accepting grpc requests. Otherwise we would appear to have
1595    ///    forgotten about the transaction.
1596    /// 2. While this is running, we are concurrently waiting for all previously
1597    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1598    ///    either direction (from checkpointed consensus transactions to pending
1599    ///    consensus transactions, or vice versa), we must re-execute pending
1600    ///    consensus transactions to ensure that both processes can complete.
1601    /// 3. Also note that for any pending consensus transactions for which we
1602    ///    wrote a signed effects digest to disk, we must re-execute using that
1603    ///    digest as the expected effects digest, to ensure that we cannot
1604    ///    arrive at different effects than what we previously signed.
1605    async fn reexecute_pending_consensus_certs(
1606        epoch_store: &Arc<AuthorityPerEpochStore>,
1607        state: &Arc<AuthorityState>,
1608    ) {
1609        let mut pending_consensus_certificates = Vec::new();
1610        let mut additional_certs = Vec::new();
1611
1612        for tx in epoch_store.get_all_pending_consensus_transactions() {
1613            match tx.kind {
1614                // Shared object txns cannot be re-executed at this point, because we must wait for
1615                // consensus replay to assign shared object versions.
1616                ConsensusTransactionKind::CertifiedTransaction(tx)
1617                    if !tx.contains_shared_object() =>
1618                {
1619                    let tx = *tx;
1620                    // new_unchecked is safe because we never submit a transaction to consensus
1621                    // without verifying it
1622                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1623                        VerifiedCertificate::new_unchecked(tx),
1624                    );
1625                    // we only need to re-execute if we previously signed the effects (which
1626                    // indicates we returned the effects to a client).
1627                    if let Some(fx_digest) = epoch_store
1628                        .get_signed_effects_digest(tx.digest())
1629                        .expect("db error")
1630                    {
1631                        pending_consensus_certificates.push((tx, fx_digest));
1632                    } else {
1633                        additional_certs.push(tx);
1634                    }
1635                }
1636                _ => (),
1637            }
1638        }
1639
1640        let digests = pending_consensus_certificates
1641            .iter()
1642            .map(|(tx, _)| *tx.digest())
1643            .collect::<Vec<_>>();
1644
1645        info!(
1646            "reexecuting {} pending consensus certificates: {:?}",
1647            digests.len(),
1648            digests
1649        );
1650
1651        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1652        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1653
1654        // If this times out, the validator will still almost certainly start up fine.
1655        // But, it is possible that it may temporarily "forget" about
1656        // transactions that it had previously executed. This could confuse
1657        // clients in some circumstances. However, the transactions are still in
1658        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1659        let timeout = if cfg!(msim) { 120 } else { 60 };
1660        if tokio::time::timeout(
1661            std::time::Duration::from_secs(timeout),
1662            state
1663                .get_transaction_cache_reader()
1664                .try_notify_read_executed_effects_digests(&digests),
1665        )
1666        .await
1667        .is_err()
1668        {
1669            // Log all the digests that were not executed to help debugging.
1670            if let Ok(executed_effects_digests) = state
1671                .get_transaction_cache_reader()
1672                .try_multi_get_executed_effects_digests(&digests)
1673            {
1674                let pending_digests = digests
1675                    .iter()
1676                    .zip(executed_effects_digests.iter())
1677                    .filter_map(|(digest, executed_effects_digest)| {
1678                        if executed_effects_digest.is_none() {
1679                            Some(digest)
1680                        } else {
1681                            None
1682                        }
1683                    })
1684                    .collect::<Vec<_>>();
1685                debug_fatal!(
1686                    "Timed out waiting for effects digests to be executed: {:?}",
1687                    pending_digests
1688                );
1689            } else {
1690                debug_fatal!(
1691                    "Timed out waiting for effects digests to be executed, digests not found"
1692                );
1693            }
1694        }
1695    }
1696
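    /// Returns a handle to the node's [`AuthorityState`].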
1697    pub fn state(&self) -> Arc<AuthorityState> {
1698        self.state.clone()
1699    }
1700
1701    // Only used for testing because of how epoch store is loaded.
1702    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1703        self.state.reference_gas_price_for_testing()
1704    }
1705
1706    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1707        self.state.committee_store().clone()
1708    }
1709
1710    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1711    // self.state.db()
1712    // }
1713
1714    /// Clone an AuthorityAggregator currently used in this node's
1715    /// QuorumDriver, if the node is a fullnode. After reconfig,
1716    /// QuorumDriver builds a new AuthorityAggregator. The caller
1717    /// of this function will most likely want to call this again
1718    /// to get a fresh one.
1719    pub fn clone_authority_aggregator(
1720        &self,
1721    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1722        self.transaction_orchestrator
1723            .as_ref()
1724            .map(|to| to.clone_authority_aggregator())
1725    }
1726
1727    pub fn transaction_orchestrator(
1728        &self,
1729    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1730        self.transaction_orchestrator.clone()
1731    }
1732
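    /// Subscribes to the effects queue of the transaction orchestrator.
    /// Returns an error if the transaction orchestrator is not enabled on
    /// this node.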
1733    pub fn subscribe_to_transaction_orchestrator_effects(
1734        &self,
1735    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1736        self.transaction_orchestrator
1737            .as_ref()
1738            .map(|to| to.subscribe_to_effects_queue())
1739            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1740    }
1741
1742    /// This function awaits the completion of checkpoint execution of the
1743    /// current epoch, after which it initiates reconfiguration of the
1744    /// entire system. This function also handles role changes for the node when
1745    /// the epoch changes and advertises capabilities to the committee if the node
1746    /// is a validator.
1747    pub async fn monitor_reconfiguration(
1748        self: Arc<Self>,
1749        mut epoch_store: Arc<AuthorityPerEpochStore>,
1750    ) -> Result<()> {
1751        let checkpoint_executor_metrics =
1752            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1753
1754        loop {
1755            let mut accumulator_guard = self.accumulator.lock().await;
1756            let accumulator = accumulator_guard.take().unwrap();
1757            info!(
1758                "Creating checkpoint executor for epoch {}",
1759                epoch_store.epoch()
1760            );
1761
1762            // Create closures that handle gRPC type conversion
1763            let summary_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1764                guard.as_ref().map(|handle| {
1765                    let tx = handle.checkpoint_summary_broadcaster().clone();
1766                    Box::new(move |summary: &CertifiedCheckpointSummary| {
1767                        tx.send_traced(summary);
1768                    }) as Box<dyn Fn(&CertifiedCheckpointSummary) + Send + Sync>
1769                })
1770            } else {
1771                None
1772            };
1773            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1774                guard.as_ref().map(|handle| {
1775                    let tx = handle.checkpoint_data_broadcaster().clone();
1776                    Box::new(move |data: &CheckpointData| {
1777                        tx.send_traced(data);
1778                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
1779                })
1780            } else {
1781                None
1782            };
1783
1784            let checkpoint_executor = CheckpointExecutor::new(
1785                epoch_store.clone(),
1786                self.checkpoint_store.clone(),
1787                self.state.clone(),
1788                accumulator.clone(),
1789                self.backpressure_manager.clone(),
1790                self.config.checkpoint_executor_config.clone(),
1791                checkpoint_executor_metrics.clone(),
1792                summary_sender,
1793                data_sender,
1794            );
1795
1796            let run_with_range = self.config.run_with_range;
1797
1798            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1799
1800            // Advertise capabilities to committee, if we are a validator.
1801            if let Some(components) = &*self.validator_components.lock().await {
1802                // TODO: without this sleep, the consensus message is not delivered reliably.
1803                tokio::time::sleep(Duration::from_millis(1)).await;
1804
1805                let config = cur_epoch_store.protocol_config();
1806                let binary_config = to_binary_config(config);
1807                let transaction = ConsensusTransaction::new_capability_notification_v1(
1808                    AuthorityCapabilitiesV1::new(
1809                        self.state.name,
1810                        cur_epoch_store.get_chain_identifier().chain(),
1811                        self.config
1812                            .supported_protocol_versions
1813                            .expect("Supported versions should be populated")
1814                            // no need to send digests of versions less than the current version
1815                            .truncate_below(config.version),
1816                        self.state
1817                            .get_available_system_packages(&binary_config)
1818                            .await,
1819                    ),
1820                );
1821                info!(?transaction, "submitting capabilities to consensus");
1822                components
1823                    .consensus_adapter
1824                    .submit(transaction, None, &cur_epoch_store)?;
1825            } else if self.state.is_active_validator(&cur_epoch_store)
1826                && cur_epoch_store
1827                    .protocol_config()
1828                    .track_non_committee_eligible_validators()
1829            {
1830                // If we are a non-committee validator, send signed capabilities to the
1831                // committee validators in a separate task so we don't block the caller.
1832                // This is done only if the supporting feature flag is enabled.
1833                let epoch_store = cur_epoch_store.clone();
1834                let node_clone = self.clone();
1835                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1836                    node_clone
1837                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1838                        .await;
1839                }));
1840            }
1841
1842            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1843
1844            if stop_condition == StopReason::RunWithRangeCondition {
1845                IotaNode::shutdown(&self).await;
1846                self.shutdown_channel_tx
1847                    .send(run_with_range)
1848                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1849                return Ok(());
1850            }
1851
1852            // Safe to call because we are in the middle of reconfiguration.
1853            let latest_system_state = self
1854                .state
1855                .get_object_cache_reader()
1856                .try_get_iota_system_state_object_unsafe()
1857                .expect("Read IOTA System State object cannot fail");
1858
1859            #[cfg(msim)]
1860            if !self
1861                .sim_state
1862                .sim_safe_mode_expected
1863                .load(Ordering::Relaxed)
1864            {
1865                debug_assert!(!latest_system_state.safe_mode());
1866            }
1867
1868            #[cfg(not(msim))]
1869            debug_assert!(!latest_system_state.safe_mode());
1870
1871            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1872                if self.state.is_fullnode(&cur_epoch_store) {
1873                    warn!(
1874                        "Failed to send end of epoch notification to subscriber: {:?}",
1875                        err
1876                    );
1877                }
1878            }
1879
1880            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1881            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1882
1883            self.auth_agg.store(Arc::new(
1884                self.auth_agg
1885                    .load()
1886                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1887            ));
1888
1889            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1890            let next_epoch = next_epoch_committee.epoch();
1891            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1892
1893            info!(
1894                next_epoch,
1895                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1896            );
1897
1898            fail_point_async!("reconfig_delay");
1899
1900            // We save the connection monitor status map regardless of validator / fullnode
1901            // status so that we don't need to restart the connection monitor
1902            // every epoch. Update the mappings that will be used by the
1903            // consensus adapter if it exists or is about to be created.
1904            let authority_names_to_peer_ids =
1905                new_epoch_start_state.get_authority_names_to_peer_ids();
1906            self.connection_monitor_status
1907                .update_mapping_for_epoch(authority_names_to_peer_ids);
1908
1909            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1910
1911            send_trusted_peer_change(
1912                &self.config,
1913                &self.trusted_peer_change_tx,
1914                &new_epoch_start_state,
1915            );
1916
1917            let mut validator_components_lock_guard = self.validator_components.lock().await;
1918
1919            // The following code handles 4 different cases, depending on whether the node
1920            // was a validator in the previous epoch, and whether the node is a validator
1921            // in the new epoch.
1922            let new_epoch_store = self
1923                .reconfigure_state(
1924                    &self.state,
1925                    &cur_epoch_store,
1926                    next_epoch_committee.clone(),
1927                    new_epoch_start_state,
1928                    accumulator.clone(),
1929                )
1930                .await?;
1931
1932            let new_validator_components = if let Some(ValidatorComponents {
1933                validator_server_handle,
1934                validator_overload_monitor_handle,
1935                consensus_manager,
1936                consensus_store_pruner,
1937                consensus_adapter,
1938                mut checkpoint_service_tasks,
1939                checkpoint_metrics,
1940                iota_tx_validator_metrics,
1941                validator_registry_id,
1942            }) = validator_components_lock_guard.take()
1943            {
1944                info!("Reconfiguring the validator.");
1945                // Cancel the old checkpoint service tasks.
1946                // Waiting for checkpoint builder to finish gracefully is not possible, because
1947                // it may wait on transactions while consensus on peers has
1948                // already shut down.
1949                checkpoint_service_tasks.abort_all();
1950                while let Some(result) = checkpoint_service_tasks.join_next().await {
1951                    if let Err(err) = result {
1952                        if err.is_panic() {
1953                            std::panic::resume_unwind(err.into_panic());
1954                        }
1955                        warn!("Error in checkpoint service task: {:?}", err);
1956                    }
1957                }
1958                info!("Checkpoint service has shut down.");
1959
1960                consensus_manager.shutdown().await;
1961                info!("Consensus has shut down.");
1962
1963                info!("Epoch store finished reconfiguration.");
1964
1965                // No other components should be holding a strong reference to state accumulator
1966                // at this point. Confirm here before we swap in the new accumulator.
1967                let accumulator_metrics = Arc::into_inner(accumulator)
1968                    .expect("Accumulator should have no other references at this point")
1969                    .metrics();
1970                let new_accumulator = Arc::new(StateAccumulator::new(
1971                    self.state.get_accumulator_store().clone(),
1972                    accumulator_metrics,
1973                ));
1974                let weak_accumulator = Arc::downgrade(&new_accumulator);
1975                *accumulator_guard = Some(new_accumulator);
1976
1977                consensus_store_pruner.prune(next_epoch).await;
1978
1979                if self.state.is_committee_validator(&new_epoch_store) {
1980                    // Only restart consensus if this node is still a validator in the new epoch.
1981                    Some(
1982                        Self::start_epoch_specific_validator_components(
1983                            &self.config,
1984                            self.state.clone(),
1985                            consensus_adapter,
1986                            self.checkpoint_store.clone(),
1987                            new_epoch_store.clone(),
1988                            self.state_sync_handle.clone(),
1989                            self.randomness_handle.clone(),
1990                            consensus_manager,
1991                            consensus_store_pruner,
1992                            weak_accumulator,
1993                            self.backpressure_manager.clone(),
1994                            validator_server_handle,
1995                            validator_overload_monitor_handle,
1996                            checkpoint_metrics,
1997                            self.metrics.clone(),
1998                            iota_tx_validator_metrics,
1999                            validator_registry_id,
2000                        )
2001                        .await?,
2002                    )
2003                } else {
2004                    info!("This node is no longer a validator after reconfiguration");
2005                    if self.registry_service.remove(validator_registry_id) {
2006                        debug!("Removed validator metrics registry");
2007                    } else {
2008                        warn!("Failed to remove validator metrics registry");
2009                    }
2010                    validator_server_handle.shutdown();
2011                    debug!("Validator grpc server shutdown triggered");
2012
2013                    None
2014                }
2015            } else {
2016                // No other components should be holding a strong reference to state accumulator
2017                // at this point. Confirm here before we swap in the new accumulator.
2018                let accumulator_metrics = Arc::into_inner(accumulator)
2019                    .expect("Accumulator should have no other references at this point")
2020                    .metrics();
2021                let new_accumulator = Arc::new(StateAccumulator::new(
2022                    self.state.get_accumulator_store().clone(),
2023                    accumulator_metrics,
2024                ));
2025                let weak_accumulator = Arc::downgrade(&new_accumulator);
2026                *accumulator_guard = Some(new_accumulator);
2027
2028                if self.state.is_committee_validator(&new_epoch_store) {
2029                    info!("Promoting the node from fullnode to validator, starting grpc server");
2030
2031                    let mut components = Self::construct_validator_components(
2032                        self.config.clone(),
2033                        self.state.clone(),
2034                        Arc::new(next_epoch_committee.clone()),
2035                        new_epoch_store.clone(),
2036                        self.checkpoint_store.clone(),
2037                        self.state_sync_handle.clone(),
2038                        self.randomness_handle.clone(),
2039                        weak_accumulator,
2040                        self.backpressure_manager.clone(),
2041                        self.connection_monitor_status.clone(),
2042                        &self.registry_service,
2043                        self.metrics.clone(),
2044                    )
2045                    .await?;
2046
2047                    components.validator_server_handle =
2048                        components.validator_server_handle.start().await;
2049
2050                    Some(components)
2051                } else {
2052                    None
2053                }
2054            };
2055            *validator_components_lock_guard = new_validator_components;
2056
2057            // Force releasing current epoch store DB handle, because the
2058            // Arc<AuthorityPerEpochStore> may linger.
2059            cur_epoch_store.release_db_handles();
2060
2061            if cfg!(msim)
2062                && !matches!(
2063                    self.config
2064                        .authority_store_pruning_config
2065                        .num_epochs_to_retain_for_checkpoints(),
2066                    None | Some(u64::MAX) | Some(0)
2067                )
2068            {
2069                self.state
2070                .prune_checkpoints_for_eligible_epochs_for_testing(
2071                    self.config.clone(),
2072                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2073                )
2074                .await?;
2075            }
2076
2077            epoch_store = new_epoch_store;
2078            info!("Reconfiguration finished");
2079        }
2080    }
2081
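    /// Shuts down the node's validator components (consensus) and the gRPC
    /// server, if they are running.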
2082    async fn shutdown(&self) {
2083        if let Some(validator_components) = &*self.validator_components.lock().await {
2084            validator_components.consensus_manager.shutdown().await;
2085        }
2086
2087        // Shutdown the gRPC server if it's running
2088        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2089            info!("Shutting down gRPC server");
2090            if let Err(e) = grpc_handle.shutdown().await {
2091                warn!("Failed to gracefully shutdown gRPC server: {e}");
2092            }
2093        }
2094    }
2095
2096    /// Asynchronously reconfigures the state of the authority node for the next
2097    /// epoch.
2098    async fn reconfigure_state(
2099        &self,
2100        state: &Arc<AuthorityState>,
2101        cur_epoch_store: &AuthorityPerEpochStore,
2102        next_epoch_committee: Committee,
2103        next_epoch_start_system_state: EpochStartSystemState,
2104        accumulator: Arc<StateAccumulator>,
2105    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2106        let next_epoch = next_epoch_committee.epoch();
2107
2108        let last_checkpoint = self
2109            .checkpoint_store
2110            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2111            .expect("Error loading last checkpoint for current epoch")
2112            .expect("Could not load last checkpoint for current epoch");
2113        let epoch_supply_change = last_checkpoint
2114            .end_of_epoch_data
2115            .as_ref()
2116            .ok_or_else(|| {
2117                IotaError::from("last checkpoint in epoch should contain end of epoch data")
2118            })?
2119            .epoch_supply_change;
2120
2121        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2122
2123        assert_eq!(
2124            Some(last_checkpoint_seq),
2125            self.checkpoint_store
2126                .get_highest_executed_checkpoint_seq_number()
2127                .expect("Error loading highest executed checkpoint sequence number")
2128        );
2129
2130        let epoch_start_configuration = EpochStartConfiguration::new(
2131            next_epoch_start_system_state,
2132            *last_checkpoint.digest(),
2133            state.get_object_store().as_ref(),
2134            EpochFlag::default_flags_for_new_epoch(&state.config),
2135        )
2136        .expect("EpochStartConfiguration construction cannot fail");
2137
2138        let new_epoch_store = self
2139            .state
2140            .reconfigure(
2141                cur_epoch_store,
2142                self.config.supported_protocol_versions.unwrap(),
2143                next_epoch_committee,
2144                epoch_start_configuration,
2145                accumulator,
2146                &self.config.expensive_safety_check_config,
2147                epoch_supply_change,
2148                last_checkpoint_seq,
2149            )
2150            .await
2151            .expect("Reconfigure authority state cannot fail");
2152        info!(next_epoch, "Node State has been reconfigured");
2153        assert_eq!(next_epoch, new_epoch_store.epoch());
2154        self.state.get_reconfig_api().update_epoch_flags_metrics(
2155            cur_epoch_store.epoch_start_config().flags(),
2156            new_epoch_store.epoch_start_config().flags(),
2157        );
2158
2159        Ok(new_epoch_store)
2160    }
2161
2162    pub fn get_config(&self) -> &NodeConfig {
2163        &self.config
2164    }
2165
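    /// Executes a transaction immediately at epoch zero by wrapping it in an
    /// unchecked executable transaction carrying a checkpoint-based
    /// certificate proof, bypassing certificate verification.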
2166    async fn execute_transaction_immediately_at_zero_epoch(
2167        state: &Arc<AuthorityState>,
2168        epoch_store: &Arc<AuthorityPerEpochStore>,
2169        tx: &Transaction,
2170        span: tracing::Span,
2171    ) {
2172        let _guard = span.enter();
2173        let transaction =
2174            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2175                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2176                    tx.data().clone(),
2177                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2178                ),
2179            );
2180        state
2181            .try_execute_immediately(&transaction, None, epoch_store)
2182            .unwrap();
2183    }
2184
2185    pub fn randomness_handle(&self) -> randomness::Handle {
2186        self.randomness_handle.clone()
2187    }
2188
2189    /// Sends signed capability notification to committee validators for
2190    /// non-committee validators. This method implements retry logic to handle
2191    /// failed attempts to send the notification. It will retry sending the
2192    /// notification with an increasing interval until it receives a successful
2193    /// response from f+1 committee members or collects 2f+1 non-retryable errors.
2194    async fn send_signed_capability_notification_to_committee_with_retry(
2195        &self,
2196        epoch_store: &Arc<AuthorityPerEpochStore>,
2197    ) {
2198        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2199        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2200        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
2201
2202        // Resolve the protocol and binary configs for the capability notification
2203        let config = epoch_store.protocol_config();
2204        let binary_config = to_binary_config(config);
2205
2206        // Create the capability notification once; it is reused for each retry
2207        let capabilities = AuthorityCapabilitiesV1::new(
2208            self.state.name,
2209            epoch_store.get_chain_identifier().chain(),
2210            self.config
2211                .supported_protocol_versions
2212                .expect("Supported versions should be populated")
2213                .truncate_below(config.version),
2214            self.state
2215                .get_available_system_packages(&binary_config)
2216                .await,
2217        );
2218
2219        // Sign the capabilities using the authority key pair from config
2220        let signature = AuthoritySignature::new_secure(
2221            &IntentMessage::new(
2222                Intent::iota_app(IntentScope::AuthorityCapabilities),
2223                &capabilities,
2224            ),
2225            &epoch_store.epoch(),
2226            self.config.authority_key_pair(),
2227        );
2228
2229        let request = HandleCapabilityNotificationRequestV1 {
2230            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2231        };
2232
2233        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2234
2235        loop {
2236            let auth_agg = self.auth_agg.load();
2237            match auth_agg
2238                .send_capability_notification_to_quorum(request.clone())
2239                .await
2240            {
2241                Ok(_) => {
2242                    info!("Successfully sent capability notification to committee");
2243                    break;
2244                }
2245                Err(err) => {
2246                    match &err {
2247                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2248                            errors,
2249                        } => {
2250                            warn!(
2251                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2252                                retry_interval, errors
2253                            );
2254                        }
2255                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2256                            errors,
2257                        } => {
2258                            error!(
2259                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2260                                errors
2261                            );
2262                            break;
2263                        }
2264                    };
2265
2266                    // Wait before retrying
2267                    tokio::time::sleep(retry_interval).await;
2268
2269                    // Increase retry interval for the next attempt, capped at max
2270                    retry_interval = std::cmp::min(
2271                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2272                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2273                    );
2274                }
2275            }
2276        }
2277    }
2278}
2279
2280#[cfg(not(msim))]
2281impl IotaNode {
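    /// Fetches the JWKs for the given OIDC provider over HTTP.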
2282    async fn fetch_jwks(
2283        _authority: AuthorityName,
2284        provider: &OIDCProvider,
2285    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2286        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2287        let client = reqwest::Client::new();
2288        fetch_jwks(provider, &client)
2289            .await
2290            .map_err(|_| IotaError::JWKRetrieval)
2291    }
2292}
2293
2294#[cfg(msim)]
2295impl IotaNode {
2296    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
2297        self.sim_state.sim_node.id()
2298    }
2299
2300    pub fn set_safe_mode_expected(&self, new_value: bool) {
2301        info!("Setting safe mode expected to {}", new_value);
2302        self.sim_state
2303            .sim_safe_mode_expected
2304            .store(new_value, Ordering::Relaxed);
2305    }
2306
2307    async fn fetch_jwks(
2308        authority: AuthorityName,
2309        provider: &OIDCProvider,
2310    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2311        get_jwk_injector()(authority, provider)
2312    }
2313}
2314
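/// A server that can be started at most once: it holds the bind future until
/// `start` is called, and the running server's handle afterwards.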
2315enum SpawnOnce {
2316    // Mutex is only needed to make SpawnOnce Sync
2317    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
2318    #[allow(unused)]
2319    Started(iota_http::ServerHandle),
2320}
2321
2322impl SpawnOnce {
2323    pub fn new(
2324        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2325    ) -> Self {
2326        Self::Unstarted(Mutex::new(Box::pin(future)))
2327    }
2328
2329    pub async fn start(self) -> Self {
2330        match self {
2331            Self::Unstarted(future) => {
2332                let server = future
2333                    .into_inner()
2334                    .await
2335                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2336                let handle = server.handle().clone();
2337                tokio::spawn(async move {
2338                    match server.serve().await {
2339                        Ok(_) => info!("Server stopped"),
2340                        Err(err) => info!("Server stopped: {err}"),
2341                    }
2342                });
2343                Self::Started(handle)
2344            }
2345            Self::Started(_) => self,
2346        }
2347    }
2348
2349    pub fn shutdown(self) {
2350        if let SpawnOnce::Started(handle) = self {
2351            handle.trigger_shutdown();
2352        }
2353    }
2354}
2355
2356/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
2357/// available.
2358fn send_trusted_peer_change(
2359    config: &NodeConfig,
2360    sender: &watch::Sender<TrustedPeerChangeEvent>,
2361    new_epoch_start_state: &EpochStartSystemState,
2362) {
2363    let new_committee =
2364        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2365
2366    sender.send_modify(|event| {
2367        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2368        event.new_committee = new_committee;
2369    })
2370}
2371
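/// Builds the [`TransactionKeyValueStore`] used by the JSON-RPC APIs: a store
/// backed by the node's local database, with a fallback to an HTTP key-value
/// store when `transaction_kv_store_read_config.base_url` is configured.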
2372fn build_kv_store(
2373    state: &Arc<AuthorityState>,
2374    config: &NodeConfig,
2375    registry: &Registry,
2376) -> Result<Arc<TransactionKeyValueStore>> {
2377    let metrics = KeyValueStoreMetrics::new(registry);
2378    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2379
2380    let base_url = &config.transaction_kv_store_read_config.base_url;
2381
2382    if base_url.is_empty() {
2383        info!("no http kv store url provided, using local db only");
2384        return Ok(Arc::new(db_store));
2385    }
2386
2387    base_url.parse::<url::Url>().tap_err(|e| {
2388        error!(
2389            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2390            base_url, e
2391        )
2392    })?;
2393
2394    let http_store = HttpKVStore::new_kv(
2395        base_url,
2396        config.transaction_kv_store_read_config.cache_size,
2397        metrics.clone(),
2398    )?;
2399    info!("using local key-value store with fallback to http key-value store");
2400    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2401        db_store,
2402        http_store,
2403        metrics,
2404        "json_rpc_fallback",
2405    )))
2406}
2407
2408/// Builds and starts the gRPC server for the IOTA node based on the node's
2409/// configuration.
2410///
2411/// This function performs the following tasks:
2412/// 1. Checks if the node is a validator by inspecting the consensus
2413///    configuration; if so, it returns early as validators do not expose gRPC
2414///    APIs.
2415/// 2. Checks if gRPC is enabled in the configuration.
2416/// 3. Creates a cancellation token, the gRPC reader, and the event subscriber.
2417/// 4. Spawns the gRPC server to listen for incoming connections.
2418///
2419/// Returns a handle to the running gRPC server, or `None` if the node is a
2420/// validator or the gRPC API is disabled.
2422async fn build_grpc_server(
2423    config: &NodeConfig,
2424    state: Arc<AuthorityState>,
2425    state_sync_store: RocksDbStore,
2426) -> Result<Option<GrpcServerHandle>> {
2427    // Validators do not expose gRPC APIs; the API can also be disabled via config
2428    if config.consensus_config().is_some() || !config.enable_grpc_api {
2429        return Ok(None);
2430    }
2431
2432    let Some(grpc_config) = &config.grpc_api_config else {
2433        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2434    };
2435
2436    let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));
2437
2438    // Create cancellation token for proper shutdown hierarchy
2439    let shutdown_token = CancellationToken::new();
2440
2441    // Create GrpcReader
2442    let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(rest_read_store));
2443
2444    // Get the subscription handler from the state for event streaming
2445    let event_subscriber =
2446        state.subscription_handler.clone() as Arc<dyn iota_grpc_server::EventSubscriber>;
2447
2448    // Start the gRPC server with the shutdown token so it can be cancelled as
2449    // part of the node's shutdown sequence
2450    let handle = start_grpc_server(
2451        grpc_reader,
2452        event_subscriber,
2453        grpc_config.clone(),
2454        shutdown_token,
2455    )
2456    .await?;
2457
2458    Ok(Some(handle))
2459}
2460
2461/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
2462/// REST APIs based on the node's configuration.
2463///
2464/// This function performs the following tasks:
2465/// 1. Checks if the node is a validator by inspecting the consensus
2466///    configuration; if so, it returns early as validators do not expose these
2467///    APIs.
2468/// 2. Creates an Axum router to handle HTTP requests.
2469/// 3. Initializes the JSON-RPC server and registers various RPC modules based
2470///    on the node's state and configuration, including CoinApi,
2471///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
2472///    IndexerApi.
2473/// 4. Optionally, if the REST API is enabled, nests the REST API router under
2474///    the `/api/v1` path.
2475/// 5. Binds the server to the specified JSON-RPC address and starts listening
2476///    for incoming connections.
2477pub async fn build_http_server(
2478    state: Arc<AuthorityState>,
2479    store: RocksDbStore,
2480    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2481    config: &NodeConfig,
2482    prometheus_registry: &Registry,
2483    _custom_runtime: Option<Handle>,
2484    software_version: &'static str,
2485) -> Result<Option<iota_http::ServerHandle>> {
2486    // Validators do not expose these APIs
2487    if config.consensus_config().is_some() {
2488        return Ok(None);
2489    }
2490
2491    let mut router = axum::Router::new();
2492
2493    let json_rpc_router = {
2494        let mut server = JsonRpcServerBuilder::new(
2495            env!("CARGO_PKG_VERSION"),
2496            prometheus_registry,
2497            config.policy_config.clone(),
2498            config.firewall_config.clone(),
2499        );
2500
2501        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2502
2503        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2504        server.register_module(ReadApi::new(
2505            state.clone(),
2506            kv_store.clone(),
2507            metrics.clone(),
2508        ))?;
2509        server.register_module(CoinReadApi::new(
2510            state.clone(),
2511            kv_store.clone(),
2512            metrics.clone(),
2513        )?)?;
2514
2515        // If run_with_range is enabled we want to prevent any transactions from
2516        // being submitted; run_with_range = None is the normal operating condition
2517        if config.run_with_range.is_none() {
2518            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2519        }
2520        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2521
2522        if let Some(transaction_orchestrator) = transaction_orchestrator {
2523            server.register_module(TransactionExecutionApi::new(
2524                state.clone(),
2525                transaction_orchestrator.clone(),
2526                metrics.clone(),
2527            ))?;
2528        }
2529
2530        let iota_names_config = config
2531            .iota_names_config
2532            .clone()
2533            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));
2534
2535        server.register_module(IndexerApi::new(
2536            state.clone(),
2537            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2538            kv_store,
2539            metrics,
2540            iota_names_config,
2541            config.indexer_max_subscriptions,
2542        ))?;
2543        server.register_module(MoveUtils::new(state.clone()))?;
2544
2545        let server_type = config.jsonrpc_server_type();
2546
2547        server.to_router(server_type).await?
2548    };
2549
2550    router = router.merge(json_rpc_router);
2551
2552    if config.enable_rest_api {
2553        let mut rest_service = iota_rest_api::RestService::new(
2554            Arc::new(RestReadStore::new(state.clone(), store)),
2555            software_version,
2556        );
2557
2558        if let Some(config) = config.rest.clone() {
2559            rest_service.with_config(config);
2560        }
2561
2562        rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2563
2564        if let Some(transaction_orchestrator) = transaction_orchestrator {
2565            rest_service.with_executor(transaction_orchestrator.clone())
2566        }
2567
2568        router = router.merge(rest_service.into_router());
2569    }
2570
2571    // TODO: Remove this health check when experimental REST API becomes default
2572    // This is a copy of the health check in crates/iota-rest-api/src/health.rs
2573    router = router
2574        .route("/health", axum::routing::get(health_check_handler))
2575        .route_layer(axum::Extension(state));
2576
2577    let layers = ServiceBuilder::new()
2578        .map_request(|mut request: axum::http::Request<_>| {
2579            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
2580                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2581                request.extensions_mut().insert(axum_connect_info);
2582            }
2583            request
2584        })
2585        .layer(axum::middleware::from_fn(server_timing_middleware));
2586
2587    router = router.layer(layers);
2588
2589    let handle = iota_http::Builder::new()
2590        .serve(&config.json_rpc_address, router)
2591        .map_err(|e| anyhow::anyhow!("{e}"))?;
2592    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());
2593
2594    Ok(Some(handle))
2595}
2596
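/// Query parameters accepted by the `/health` endpoint.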
2597#[derive(Debug, serde::Serialize, serde::Deserialize)]
2598pub struct Threshold {
2599    pub threshold_seconds: Option<u32>,
2600}
2601
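/// Handler for the `/health` endpoint. When `threshold_seconds` is provided,
/// the node reports "up" only if its highest executed checkpoint is no older
/// than the given threshold.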
2602async fn health_check_handler(
2603    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2604    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2605) -> impl axum::response::IntoResponse {
2606    if let Some(threshold_seconds) = threshold_seconds {
2607        // Attempt to get the latest checkpoint
2608        let summary = match state
2609            .get_checkpoint_store()
2610            .get_highest_executed_checkpoint()
2611        {
2612            Ok(Some(summary)) => summary,
2613            Ok(None) => {
2614                warn!("Highest executed checkpoint not found");
2615                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2616            }
2617            Err(err) => {
2618                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2619                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2620            }
2621        };
2622
2623        // Calculate the threshold time based on the provided threshold_seconds
2624        let latest_chain_time = summary.timestamp();
2625        let threshold =
2626            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2627
2628        // Check if the latest checkpoint is within the threshold
2629        if latest_chain_time < threshold {
2630            warn!(
2631                ?latest_chain_time,
2632                ?threshold,
2633                "failing health check due to checkpoint lag"
2634            );
2635            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2636        }
2637    }
2638    // if health endpoint is responding and no threshold is given, respond success
2639    (axum::http::StatusCode::OK, "up")
2640}
2641
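/// Maximum number of transactions per checkpoint: read from the protocol
/// config in production builds and fixed to a small constant in tests.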
2642#[cfg(not(test))]
2643fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2644    protocol_config.max_transactions_per_checkpoint() as usize
2645}
2646
2647#[cfg(test)]
2648fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2649    2
2650}