// iota_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::{BTreeSet, HashMap, HashSet},
9    fmt,
10    future::Future,
11    path::PathBuf,
12    str::FromStr,
13    sync::{Arc, Weak},
14    time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19    callback::CallbackLayer,
20    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::future::BoxFuture;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30    ConsensusConfig, NodeConfig,
31    node::{DBCheckpointConfig, RunWithRange},
32    node_config_metrics::NodeConfigMetrics,
33    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36    authority::{
37        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38        authority_per_epoch_store::AuthorityPerEpochStore,
39        authority_store_pruner::ObjectsCompactionFilter,
40        authority_store_tables::{
41            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42        },
43        backpressure::BackpressureManager,
44        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45    },
46    authority_aggregator::{
47        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
48    },
49    authority_client::NetworkAuthorityClient,
50    authority_server::{ValidatorService, ValidatorServiceMetrics},
51    checkpoints::{
52        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
53        SubmitCheckpointToConsensus,
54        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
55    },
56    connection_monitor::ConnectionMonitor,
57    consensus_adapter::{
58        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
59        ConsensusClient,
60    },
61    consensus_handler::ConsensusHandlerInitializer,
62    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
63    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
64    db_checkpoint_handler::DBCheckpointHandler,
65    epoch::{
66        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
67        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
68        reconfiguration::ReconfigurationInitiator,
69    },
70    execution_cache::build_execution_cache,
71    jsonrpc_index::IndexStore,
72    module_cache_metrics::ResolverMetrics,
73    overload_monitor::overload_monitor,
74    rest_index::RestIndexStore,
75    safe_client::SafeClientMetricsBase,
76    signature_verifier::SignatureVerifierMetrics,
77    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
78    storage::{RestReadStore, RocksDbStore},
79    traffic_controller::metrics::TrafficControllerMetrics,
80    transaction_orchestrator::TransactionOrchestrator,
81    validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87    transaction_builder_api::TransactionBuilderApi,
88    transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93    RegistryID, RegistryService,
94    hardware_metrics::register_hardware_metrics,
95    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96    server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
101};
102use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
103use iota_protocol_config::ProtocolConfig;
104use iota_rest_api::RestMetrics;
105use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
106use iota_snapshot::uploader::StateSnapshotUploader;
107use iota_storage::{
108    FileCompression, StorageFormat,
109    http_key_value_store::HttpKVStore,
110    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
111    key_value_store_metrics::KeyValueStoreMetrics,
112};
113use iota_types::{
114    base_types::{AuthorityName, ConciseableName, EpochId},
115    committee::Committee,
116    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
117    digests::ChainIdentifier,
118    error::{IotaError, IotaResult},
119    executable_transaction::VerifiedExecutableTransaction,
120    execution_config_utils::to_binary_config,
121    full_checkpoint_content::CheckpointData,
122    iota_system_state::{
123        IotaSystemState, IotaSystemStateTrait,
124        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
125    },
126    messages_consensus::{
127        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
128        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
129    },
130    messages_grpc::HandleCapabilityNotificationRequestV1,
131    quorum_driver_types::QuorumDriverEffectsQueueResult,
132    supported_protocol_versions::SupportedProtocolVersions,
133    transaction::{Transaction, VerifiedCertificate},
134};
135use prometheus::Registry;
136#[cfg(msim)]
137pub use simulator::set_jwk_injector;
138#[cfg(msim)]
139use simulator::*;
140use tap::tap::TapFallible;
141use tokio::{
142    runtime::Handle,
143    sync::{Mutex, broadcast, mpsc, watch},
144    task::{JoinHandle, JoinSet},
145};
146use tokio_util::sync::CancellationToken;
147use tower::ServiceBuilder;
148use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
149use typed_store::{
150    DBMetrics,
151    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
152};
153
154use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
155
// Crate submodules. `handle` is re-exported above as `IotaNodeHandle`;
// `metrics` provides `GrpcMetrics` and `IotaNodeMetrics` (imported above).
pub mod admin;
mod handle;
pub mod metrics;
159
/// Components that exist only while this node participates as a validator.
/// Held behind `IotaNode::validator_components` and rebuilt across epochs
/// (see the note on `checkpoint_service_tasks`).
pub struct ValidatorComponents {
    // Handle to the validator gRPC server; `SpawnOnce` — presumably a
    // deferred-start wrapper, confirm against its definition.
    validator_server_handle: SpawnOnce,
    // Background task watching for validator overload, when enabled.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    // Shared adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // Registry ID of the validator's metrics registry, needed to deregister
    // it from the `RegistryService` when these components are torn down.
    validator_registry_id: RegistryID,
}
172
/// Simulator-only (`msim`) support: per-node simulator bookkeeping and an
/// injectable JWK-fetch hook so simtests can control which JWKs a node
/// "fetches" (presumably consumed by `IotaNode::fetch_jwks` — confirm).
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Simulator state attached to each `IotaNode` (see `IotaNode::sim_state`).
    pub(super) struct SimState {
        /// Handle to the simulator node this `IotaNode` runs on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Flag simtests can set when safe mode is the expected outcome.
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its Drop side effect: participates in the simulator's
        // node-leak detection for the lifetime of this node.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    /// Shape of the injectable fetcher: maps (authority, provider) to the
    /// JWKs that authority "observes" for that provider.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector used when a test installs nothing.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    // Thread-local so each simulated node (one per thread in msim) can have
    // its own injector without cross-node interference.
    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the injector currently installed for this thread/node.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    /// Installs a custom injector for this thread/node (simtest entry point).
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
225
/// The top-level node object: owns the [`AuthorityState`] plus every
/// long-running service (p2p, state sync, randomness, checkpointing, RPC).
/// Fields prefixed with `_` are kept only so their background tasks/servers
/// stay alive for the node's lifetime.
pub struct IotaNode {
    config: NodeConfig,
    /// Validator-only components; presumably `None` while running as a
    /// fullnode — confirm. Behind a `Mutex<Option<_>>` so they can be taken
    /// and rebuilt during reconfiguration.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC as well as the
    /// experimental rest service
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    // Handles to the p2p services; `_discovery` is never read, only kept
    // alive.
    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    // `Option` because the accumulator is swapped out across epochs —
    // NOTE(review): inferred from the Mutex<Option<_>> shape, confirm.
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Broadcast channel to notify [`DiscoveryEventLoop`] for new validator
    /// peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    // Dropping/sending on this sender signals the db checkpoint handler.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    // Simulator-only bookkeeping (see `mod simulator`).
    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown iota-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and beginning of
    /// each epoch. Use ArcSwap so that we could mutate it without taking
    /// mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
274
275impl fmt::Debug for IotaNode {
276    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
277        f.debug_struct("IotaNode")
278            .field("name", &self.state.name.concise())
279            .finish()
280    }
281}
282
/// Upper bound on the number of JWKs accepted from one provider fetch; excess
/// keys are truncated in `start_jwk_updater` to guard against misbehaving
/// OAuth providers. A compile-time constant, so `const` rather than `static`.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
284
285impl IotaNode {
    /// Starts an `IotaNode` with the software version reported as
    /// `"unknown"`.
    ///
    /// Thin convenience wrapper over [`Self::start_async`]; callers that know
    /// their build version should call `start_async` directly.
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
    }
293
    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
    /// configuration.
    /// This function ensures continuous fetching, validation, and submission of
    /// JWKs, maintaining up-to-date keys for the specified providers. One
    /// long-running task is spawned per configured provider; each task is
    /// bounded to the current epoch via `within_alive_epoch`.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers are configured per chain; a chain with no entry yields an
        // empty set. A malformed provider string is a configuration bug and
        // panics here at startup.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        // Rejects a fetched key (bumping `invalid_jwks`, labelled by provider)
        // when its `iss` is not a known provider, when it was served by a
        // different provider than it claims, or when it exceeds the size limit.
        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // IotaNodeMetrics fields used below, all labelled by provider:
        //   jwk_requests / jwk_request_errors — fetch attempts and failures,
        //   total_jwks — keys returned by a fetch,
        //   unique_jwks — keys kept after validation/de-duplication,
        //   invalid_jwks — keys rejected by `validate_jwk` above.

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            // The task loops forever; `within_alive_epoch` cancels it when the
            // epoch ends, so a fresh set of tasks is started each epoch.
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only keys that validate, are not already
                                // active this epoch, and were not already
                                // submitted by this task.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                // Submission failures are logged and dropped;
                                // the next fetch cycle will retry naturally.
                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
432
433    pub async fn start_async(
434        config: NodeConfig,
435        registry_service: RegistryService,
436        custom_rpc_runtime: Option<Handle>,
437        software_version: &'static str,
438    ) -> Result<Arc<IotaNode>> {
439        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
440        let mut config = config.clone();
441        if config.supported_protocol_versions.is_none() {
442            info!(
443                "populating config.supported_protocol_versions with default {:?}",
444                SupportedProtocolVersions::SYSTEM_DEFAULT
445            );
446            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
447        }
448
449        let run_with_range = config.run_with_range;
450        let is_validator = config.consensus_config().is_some();
451        let is_full_node = !is_validator;
452        let prometheus_registry = registry_service.default_registry();
453
454        info!(node =? config.authority_public_key(),
455            "Initializing iota-node listening on {}", config.network_address
456        );
457
458        let genesis = config.genesis()?.clone();
459
460        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
461        info!("IOTA chain identifier: {chain_identifier}");
462
463        // Check and set the db_corrupted flag
464        let db_corrupted_path = &config.db_path().join("status");
465        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
466            panic!("Failed to check database corruption: {err}");
467        }
468
469        // Initialize metrics to track db usage before creating any stores
470        DBMetrics::init(&prometheus_registry);
471
472        // Initialize IOTA metrics.
473        iota_metrics::init_metrics(&prometheus_registry);
474        // Unsupported (because of the use of static variable) and unnecessary in
475        // simtests.
476        #[cfg(not(msim))]
477        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
478
479        // Register hardware metrics.
480        register_hardware_metrics(&registry_service, &config.db_path)
481            .expect("Failed registering hardware metrics");
482        // Register uptime metric
483        prometheus_registry
484            .register(iota_metrics::uptime_metric(
485                if is_validator {
486                    "validator"
487                } else {
488                    "fullnode"
489                },
490                software_version,
491                &chain_identifier.to_string(),
492            ))
493            .expect("Failed registering uptime metric");
494
495        // If genesis come with some migration data then load them into memory from the
496        // file path specified in config.
497        let migration_tx_data = if genesis.contains_migrations() {
498            // Here the load already verifies that the content of the migration blob is
499            // valid in respect to the content found in genesis
500            Some(config.load_migration_tx_data()?)
501        } else {
502            None
503        };
504
505        let secret = Arc::pin(config.authority_key_pair().copy());
506        let genesis_committee = genesis.committee()?;
507        let committee_store = Arc::new(CommitteeStore::new(
508            config.db_path().join("epochs"),
509            &genesis_committee,
510            None,
511        ));
512
513        let mut pruner_db = None;
514        if config
515            .authority_store_pruning_config
516            .enable_compaction_filter
517        {
518            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
519                &config.db_path().join("store"),
520            )));
521        }
522        let compaction_filter = pruner_db
523            .clone()
524            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
525
526        // By default, only enable write stall on validators for perpetual db.
527        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
528        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
529            enable_write_stall,
530            compaction_filter,
531        };
532        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
533            &config.db_path().join("store"),
534            Some(perpetual_tables_options),
535        ));
536        let is_genesis = perpetual_tables
537            .database_is_empty()
538            .expect("Database read should not fail at init.");
539        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
540        let backpressure_manager =
541            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
542
543        let store = AuthorityStore::open(
544            perpetual_tables,
545            &genesis,
546            &config,
547            &prometheus_registry,
548            migration_tx_data.as_ref(),
549        )
550        .await?;
551
552        let cur_epoch = store.get_recovery_epoch_at_restart()?;
553        let committee = committee_store
554            .get_committee(&cur_epoch)?
555            .expect("Committee of the current epoch must exist");
556        let epoch_start_configuration = store
557            .get_epoch_start_configuration()?
558            .expect("EpochStartConfiguration of the current epoch must exist");
559        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
560        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
561
562        let cache_traits = build_execution_cache(
563            &config.execution_cache_config,
564            &epoch_start_configuration,
565            &prometheus_registry,
566            &store,
567            backpressure_manager.clone(),
568        );
569
570        let auth_agg = {
571            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
572            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
573            Arc::new(ArcSwap::new(Arc::new(
574                AuthorityAggregator::new_from_epoch_start_state(
575                    epoch_start_configuration.epoch_start_state(),
576                    &committee_store,
577                    safe_client_metrics_base,
578                    auth_agg_metrics,
579                ),
580            )))
581        };
582
583        let chain = match config.chain_override_for_testing {
584            Some(chain) => chain,
585            None => chain_identifier.chain(),
586        };
587
588        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
589        let epoch_store = AuthorityPerEpochStore::new(
590            config.authority_public_key(),
591            committee.clone(),
592            &config.db_path().join("store"),
593            Some(epoch_options.options),
594            EpochMetrics::new(&registry_service.default_registry()),
595            epoch_start_configuration,
596            cache_traits.backing_package_store.clone(),
597            cache_traits.object_store.clone(),
598            cache_metrics,
599            signature_verifier_metrics,
600            &config.expensive_safety_check_config,
601            (chain_identifier, chain),
602            checkpoint_store
603                .get_highest_executed_checkpoint_seq_number()
604                .expect("checkpoint store read cannot fail")
605                .unwrap_or(0),
606        )?;
607
608        info!("created epoch store");
609
610        replay_log!(
611            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
612            epoch_store.epoch(),
613            epoch_store.protocol_config()
614        );
615
616        // the database is empty at genesis time
617        if is_genesis {
618            info!("checking IOTA conservation at genesis");
619            // When we are opening the db table, the only time when it's safe to
620            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
621            // an epoch and the IOTA conservation check will fail. This also initialize
622            // the expected_network_iota_amount table.
623            cache_traits
624                .reconfig_api
625                .try_expensive_check_iota_conservation(&epoch_store, None)
626                .expect("IOTA conservation check cannot fail at genesis");
627        }
628
629        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
630        let default_buffer_stake = epoch_store
631            .protocol_config()
632            .buffer_stake_for_protocol_upgrade_bps();
633        if effective_buffer_stake != default_buffer_stake {
634            warn!(
635                ?effective_buffer_stake,
636                ?default_buffer_stake,
637                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
638            );
639        }
640
641        checkpoint_store.insert_genesis_checkpoint(
642            genesis.checkpoint(),
643            genesis.checkpoint_contents().clone(),
644            &epoch_store,
645        );
646
647        // Database has everything from genesis, set corrupted key to 0
648        unmark_db_corruption(db_corrupted_path)?;
649
650        info!("creating state sync store");
651        let state_sync_store = RocksDbStore::new(
652            cache_traits.clone(),
653            committee_store.clone(),
654            checkpoint_store.clone(),
655        );
656
657        let index_store = if is_full_node && config.enable_index_processing {
658            info!("creating index store");
659            Some(Arc::new(IndexStore::new(
660                config.db_path().join("indexes"),
661                &prometheus_registry,
662                epoch_store
663                    .protocol_config()
664                    .max_move_identifier_len_as_option(),
665            )))
666        } else {
667            None
668        };
669
670        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
671        {
672            Some(Arc::new(
673                RestIndexStore::new(
674                    config.db_path().join("rest_index"),
675                    &store,
676                    &checkpoint_store,
677                    &epoch_store,
678                    &cache_traits.backing_package_store,
679                )
680                .await,
681            ))
682        } else {
683            None
684        };
685
686        info!("creating archive reader");
687        // Create network
688        // TODO only configure validators as seed/preferred peers for validators and not
689        // for fullnodes once we've had a chance to re-work fullnode
690        // configuration generation.
691        let archive_readers =
692            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
693        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
694        let (randomness_tx, randomness_rx) = mpsc::channel(
695            config
696                .p2p_config
697                .randomness
698                .clone()
699                .unwrap_or_default()
700                .mailbox_capacity(),
701        );
702        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
703            Self::create_p2p_network(
704                &config,
705                state_sync_store.clone(),
706                chain_identifier,
707                trusted_peer_change_rx,
708                archive_readers.clone(),
709                randomness_tx,
710                &prometheus_registry,
711            )?;
712
713        // We must explicitly send this instead of relying on the initial value to
714        // trigger watch value change, so that state-sync is able to process it.
715        send_trusted_peer_change(
716            &config,
717            &trusted_peer_change_tx,
718            epoch_store.epoch_start_state(),
719        );
720
721        info!("start state archival");
722        // Start archiving local state to remote store
723        let state_archive_handle =
724            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
725                .await?;
726
727        info!("start snapshot upload");
728        // Start uploading state snapshot to remote store
729        let state_snapshot_handle =
730            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
731
732        // Start uploading db checkpoints to remote store
733        info!("start db checkpoint");
734        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
735            &config,
736            &prometheus_registry,
737            state_snapshot_handle.is_some(),
738        )?;
739
740        let mut genesis_objects = genesis.objects().to_vec();
741        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
742            genesis_objects.extend(migration_tx_data.get_objects());
743        }
744
745        let authority_name = config.authority_public_key();
746        let validator_tx_finalizer =
747            config
748                .enable_validator_tx_finalizer
749                .then_some(Arc::new(ValidatorTxFinalizer::new(
750                    auth_agg.clone(),
751                    authority_name,
752                    &prometheus_registry,
753                )));
754
755        info!("create authority state");
756        let state = AuthorityState::new(
757            authority_name,
758            secret,
759            config.supported_protocol_versions.unwrap(),
760            store.clone(),
761            cache_traits.clone(),
762            epoch_store.clone(),
763            committee_store.clone(),
764            index_store.clone(),
765            rest_index,
766            checkpoint_store.clone(),
767            &prometheus_registry,
768            &genesis_objects,
769            &db_checkpoint_config,
770            config.clone(),
771            archive_readers,
772            validator_tx_finalizer,
773            chain_identifier,
774            pruner_db,
775        )
776        .await;
777
778        // ensure genesis and migration txs were executed
779        if epoch_store.epoch() == 0 {
780            let genesis_tx = &genesis.transaction();
781            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
782            // Execute genesis transaction
783            Self::execute_transaction_immediately_at_zero_epoch(
784                &state,
785                &epoch_store,
786                genesis_tx,
787                span,
788            )
789            .await;
790
791            // Execute migration transactions if present
792            if let Some(migration_tx_data) = migration_tx_data {
793                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
794                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
795                    Self::execute_transaction_immediately_at_zero_epoch(
796                        &state,
797                        &epoch_store,
798                        tx,
799                        span,
800                    )
801                    .await;
802                }
803            }
804        }
805
806        // Start the loop that receives new randomness and generates transactions for
807        // it.
808        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
809
810        if config
811            .expensive_safety_check_config
812            .enable_secondary_index_checks()
813        {
814            if let Some(indexes) = state.indexes.clone() {
815                iota_core::verify_indexes::verify_indexes(
816                    state.get_accumulator_store().as_ref(),
817                    indexes,
818                )
819                .expect("secondary indexes are inconsistent");
820            }
821        }
822
823        let (end_of_epoch_channel, end_of_epoch_receiver) =
824            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
825
826        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
827            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
828                auth_agg.load_full(),
829                state.clone(),
830                end_of_epoch_receiver,
831                &config.db_path(),
832                &prometheus_registry,
833            )))
834        } else {
835            None
836        };
837
838        let http_server = build_http_server(
839            state.clone(),
840            state_sync_store.clone(),
841            &transaction_orchestrator.clone(),
842            &config,
843            &prometheus_registry,
844            custom_rpc_runtime,
845            software_version,
846        )
847        .await?;
848
849        let accumulator = Arc::new(StateAccumulator::new(
850            cache_traits.accumulator_store.clone(),
851            StateAccumulatorMetrics::new(&prometheus_registry),
852        ));
853
854        let authority_names_to_peer_ids = epoch_store
855            .epoch_start_state()
856            .get_authority_names_to_peer_ids();
857
858        let network_connection_metrics =
859            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
860
861        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
862
863        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
864            p2p_network.downgrade(),
865            network_connection_metrics,
866            HashMap::new(),
867            None,
868        );
869
870        let connection_monitor_status = ConnectionMonitorStatus {
871            connection_statuses,
872            authority_names_to_peer_ids,
873        };
874
875        let connection_monitor_status = Arc::new(connection_monitor_status);
876        let iota_node_metrics =
877            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
878
879        // Convert transaction orchestrator to executor trait object for gRPC server
880        // Note that the transaction_orchestrator (so as executor) will be None if it is
881        // a validator node or run_with_range is set
882        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
883            transaction_orchestrator
884                .clone()
885                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
886
887        let grpc_server_handle = build_grpc_server(
888            &config,
889            state.clone(),
890            state_sync_store.clone(),
891            executor,
892            &registry_service.default_registry(),
893        )
894        .await?;
895
896        let validator_components = if state.is_committee_validator(&epoch_store) {
897            let (components, _) = futures::join!(
898                Self::construct_validator_components(
899                    config.clone(),
900                    state.clone(),
901                    committee,
902                    epoch_store.clone(),
903                    checkpoint_store.clone(),
904                    state_sync_handle.clone(),
905                    randomness_handle.clone(),
906                    Arc::downgrade(&accumulator),
907                    backpressure_manager.clone(),
908                    connection_monitor_status.clone(),
909                    &registry_service,
910                    iota_node_metrics.clone(),
911                ),
912                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
913            );
914            let mut components = components?;
915
916            components.consensus_adapter.submit_recovered(&epoch_store);
917
918            // Start the gRPC server
919            components.validator_server_handle = components.validator_server_handle.start().await;
920
921            Some(components)
922        } else {
923            None
924        };
925
926        // setup shutdown channel
927        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
928
929        let node = Self {
930            config,
931            validator_components: Mutex::new(validator_components),
932            _http_server: http_server,
933            state,
934            transaction_orchestrator,
935            registry_service,
936            metrics: iota_node_metrics,
937
938            _discovery: discovery_handle,
939            state_sync_handle,
940            randomness_handle,
941            checkpoint_store,
942            accumulator: Mutex::new(Some(accumulator)),
943            end_of_epoch_channel,
944            connection_monitor_status,
945            trusted_peer_change_tx,
946            backpressure_manager,
947
948            _db_checkpoint_handle: db_checkpoint_handle,
949
950            #[cfg(msim)]
951            sim_state: Default::default(),
952
953            _state_archive_handle: state_archive_handle,
954            _state_snapshot_uploader_handle: state_snapshot_handle,
955            shutdown_channel_tx: shutdown_channel,
956
957            grpc_server_handle: Mutex::new(grpc_server_handle),
958
959            auth_agg,
960        };
961
962        info!("IotaNode started!");
963        let node = Arc::new(node);
964        let node_copy = node.clone();
965        spawn_monitored_task!(async move {
966            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
967            if let Err(error) = result {
968                warn!("Reconfiguration finished with error {:?}", error);
969            }
970        });
971
972        Ok(node)
973    }
974
975    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
976        self.end_of_epoch_channel.subscribe()
977    }
978
979    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
980        self.shutdown_channel_tx.subscribe()
981    }
982
983    pub fn current_epoch_for_testing(&self) -> EpochId {
984        self.state.current_epoch_for_testing()
985    }
986
987    pub fn db_checkpoint_path(&self) -> PathBuf {
988        self.config.db_checkpoint_path()
989    }
990
991    // Init reconfig process by starting to reject user certs
992    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
993        info!("close_epoch (current epoch = {})", epoch_store.epoch());
994        self.validator_components
995            .lock()
996            .await
997            .as_ref()
998            .ok_or_else(|| IotaError::from("Node is not a validator"))?
999            .consensus_adapter
1000            .close_epoch(epoch_store);
1001        Ok(())
1002    }
1003
1004    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
1005        self.state
1006            .clear_override_protocol_upgrade_buffer_stake(epoch)
1007    }
1008
1009    pub fn set_override_protocol_upgrade_buffer_stake(
1010        &self,
1011        epoch: EpochId,
1012        buffer_stake_bps: u64,
1013    ) -> IotaResult {
1014        self.state
1015            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1016    }
1017
1018    // Testing-only API to start epoch close process.
1019    // For production code, please use the non-testing version.
1020    pub async fn close_epoch_for_testing(&self) -> IotaResult {
1021        let epoch_store = self.state.epoch_store_for_testing();
1022        self.close_epoch(&epoch_store).await
1023    }
1024
1025    async fn start_state_archival(
1026        config: &NodeConfig,
1027        prometheus_registry: &Registry,
1028        state_sync_store: RocksDbStore,
1029    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1030        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1031            let local_store_config = ObjectStoreConfig {
1032                object_store: Some(ObjectStoreType::File),
1033                directory: Some(config.archive_path()),
1034                ..Default::default()
1035            };
1036            let archive_writer = ArchiveWriter::new(
1037                local_store_config,
1038                remote_store_config.clone(),
1039                FileCompression::Zstd,
1040                StorageFormat::Blob,
1041                Duration::from_secs(600),
1042                256 * 1024 * 1024,
1043                prometheus_registry,
1044            )
1045            .await?;
1046            Ok(Some(archive_writer.start(state_sync_store).await?))
1047        } else {
1048            Ok(None)
1049        }
1050    }
1051
1052    /// Creates an StateSnapshotUploader and start it if the StateSnapshotConfig
1053    /// is set.
1054    fn start_state_snapshot(
1055        config: &NodeConfig,
1056        prometheus_registry: &Registry,
1057        checkpoint_store: Arc<CheckpointStore>,
1058    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1059        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1060            let snapshot_uploader = StateSnapshotUploader::new(
1061                &config.db_checkpoint_path(),
1062                &config.snapshot_path(),
1063                remote_store_config.clone(),
1064                60,
1065                prometheus_registry,
1066                checkpoint_store,
1067            )?;
1068            Ok(Some(snapshot_uploader.start()))
1069        } else {
1070            Ok(None)
1071        }
1072    }
1073
1074    fn start_db_checkpoint(
1075        config: &NodeConfig,
1076        prometheus_registry: &Registry,
1077        state_snapshot_enabled: bool,
1078    ) -> Result<(
1079        DBCheckpointConfig,
1080        Option<tokio::sync::broadcast::Sender<()>>,
1081    )> {
1082        let checkpoint_path = Some(
1083            config
1084                .db_checkpoint_config
1085                .checkpoint_path
1086                .clone()
1087                .unwrap_or_else(|| config.db_checkpoint_path()),
1088        );
1089        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1090            DBCheckpointConfig {
1091                checkpoint_path,
1092                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1093                    true
1094                } else {
1095                    config
1096                        .db_checkpoint_config
1097                        .perform_db_checkpoints_at_epoch_end
1098                },
1099                ..config.db_checkpoint_config.clone()
1100            }
1101        } else {
1102            config.db_checkpoint_config.clone()
1103        };
1104
1105        match (
1106            db_checkpoint_config.object_store_config.as_ref(),
1107            state_snapshot_enabled,
1108        ) {
1109            // If db checkpoint config object store not specified but
1110            // state snapshot object store is specified, create handler
1111            // anyway for marking db checkpoints as completed so that they
1112            // can be uploaded as state snapshots.
1113            (None, false) => Ok((db_checkpoint_config, None)),
1114            (_, _) => {
1115                let handler = DBCheckpointHandler::new(
1116                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1117                    db_checkpoint_config.object_store_config.as_ref(),
1118                    60,
1119                    db_checkpoint_config
1120                        .prune_and_compact_before_upload
1121                        .unwrap_or(true),
1122                    config.authority_store_pruning_config.clone(),
1123                    prometheus_registry,
1124                    state_snapshot_enabled,
1125                )?;
1126                Ok((
1127                    db_checkpoint_config,
1128                    Some(DBCheckpointHandler::start(handler)),
1129                ))
1130            }
1131        }
1132    }
1133
    /// Creates the anemo-based p2p network and wires up the discovery,
    /// state-sync and randomness subsystems on top of it.
    ///
    /// Returns the bound [`Network`] together with the started handles for
    /// discovery, state-sync and randomness.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        // State-sync is backed by the local RocksDB store and the configured
        // archive readers.
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        // Discovery observes trusted-peer changes pushed through the watch
        // channel.
        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        // Randomness messages for this authority are forwarded out through
        // `randomness_tx`.
        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three RPC services share a single anemo router.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: tracing for server errors plus per-message
            // metrics (with an excessive-size threshold from config).
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack mirrors the inbound one but also traces client
            // errors.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too
            // many staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name embeds the chain identifier — presumably so
            // that peers on other chains are kept out at the handshake;
            // confirm against anemo's server-name semantics.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // Start each subsystem on the freshly bound network.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1272
    /// Asynchronously constructs and initializes the components necessary for
    /// the validator node.
    ///
    /// Builds the consensus adapter/manager, store pruner, validator gRPC
    /// service and (optionally) the overload monitor, then delegates to
    /// [`Self::start_epoch_specific_validator_components`] which assembles
    /// the returned [`ValidatorComponents`].
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        // A consensus config is mandatory for validators; fail fast if it is
        // missing.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Validator metrics get their own registry; the returned id is
        // carried in ValidatorComponents — presumably so the registry can be
        // removed from the service later (TODO confirm at the teardown site).
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // The consensus client is shared: the adapter submits through it and
        // the manager updates it.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // This only gets started up once, not on every epoch. (Make call to remove
        // every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        // The validator gRPC service is constructed here but actual serving
        // may start later (the caller invokes .start() on the handle).
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // Weak reference so the monitor does not keep the state alive.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        // Epoch-specific startup (checkpoint service, consensus start,
        // randomness manager, JWK updater) assembles the final components.
        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1374
    /// Initializes and starts components specific to the current
    /// epoch for the validator node.
    ///
    /// Builds the checkpoint service, wires the low-scoring-authorities map
    /// into the consensus adapter, optionally registers a randomness manager
    /// with the epoch store, starts consensus and the checkpoint service
    /// tasks, and finally bundles everything into [`ValidatorComponents`].
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        // create a new map that gets injected into both the consensus handler and the
        // consensus adapter the consensus handler will write values forwarded
        // from consensus, and the consensus adapter will read the values to
        // make decisions about which validator submits a transaction to consensus
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // The randomness manager is optional; when construction yields one it
        // is registered with the epoch store. Note it holds only a weak
        // reference to the epoch store.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        // Consensus validates incoming transactions via IotaTxValidator
        // before accepting them.
        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        // The JWK updater only runs when authenticator state is enabled for
        // this epoch.
        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        // Hand all handles and metrics back to the caller for later
        // management (e.g. reconfiguration/teardown).
        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1477
1478    /// Starts the checkpoint service for the validator node, initializing
1479    /// necessary components and settings.
1480    /// The function ensures proper initialization of the checkpoint service,
1481    /// preparing it to handle checkpoint creation and submission to consensus,
1482    /// while also setting up the necessary monitoring and synchronization
1483    /// mechanisms.
1484    fn build_checkpoint_service(
1485        config: &NodeConfig,
1486        consensus_adapter: Arc<ConsensusAdapter>,
1487        checkpoint_store: Arc<CheckpointStore>,
1488        epoch_store: Arc<AuthorityPerEpochStore>,
1489        state: Arc<AuthorityState>,
1490        state_sync_handle: state_sync::Handle,
1491        accumulator: Weak<StateAccumulator>,
1492        checkpoint_metrics: Arc<CheckpointMetrics>,
1493    ) -> Arc<CheckpointService> {
1494        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1495        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1496
1497        debug!(
1498            "Starting checkpoint service with epoch start timestamp {}
1499            and epoch duration {}",
1500            epoch_start_timestamp_ms, epoch_duration_ms
1501        );
1502
1503        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1504            sender: consensus_adapter,
1505            signer: state.secret.clone(),
1506            authority: config.authority_public_key(),
1507            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1508                .checked_add(epoch_duration_ms)
1509                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1510            metrics: checkpoint_metrics.clone(),
1511        });
1512
1513        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1514        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1515        let max_checkpoint_size_bytes =
1516            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1517
1518        CheckpointService::build(
1519            state.clone(),
1520            checkpoint_store,
1521            epoch_store,
1522            state.get_transaction_cache_reader().clone(),
1523            accumulator,
1524            checkpoint_output,
1525            Box::new(certified_checkpoint_output),
1526            checkpoint_metrics,
1527            max_tx_per_checkpoint,
1528            max_checkpoint_size_bytes,
1529        )
1530    }
1531
1532    fn construct_consensus_adapter(
1533        committee: &Committee,
1534        consensus_config: &ConsensusConfig,
1535        authority: AuthorityName,
1536        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1537        prometheus_registry: &Registry,
1538        consensus_client: Arc<dyn ConsensusClient>,
1539        checkpoint_store: Arc<CheckpointStore>,
1540    ) -> ConsensusAdapter {
1541        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1542        // The consensus adapter allows the authority to send user certificates through
1543        // consensus.
1544
1545        ConsensusAdapter::new(
1546            consensus_client,
1547            checkpoint_store,
1548            authority,
1549            connection_monitor_status,
1550            consensus_config.max_pending_transactions(),
1551            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1552            consensus_config.max_submit_position,
1553            consensus_config.submit_delay_step_override(),
1554            ca_metrics,
1555        )
1556    }
1557
1558    async fn start_grpc_validator_service(
1559        config: &NodeConfig,
1560        state: Arc<AuthorityState>,
1561        consensus_adapter: Arc<ConsensusAdapter>,
1562        prometheus_registry: &Registry,
1563    ) -> Result<SpawnOnce> {
1564        let validator_service = ValidatorService::new(
1565            state.clone(),
1566            consensus_adapter,
1567            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1568            TrafficControllerMetrics::new(prometheus_registry),
1569            config.policy_config.clone(),
1570            config.firewall_config.clone(),
1571        );
1572
1573        let mut server_conf = iota_network_stack::config::Config::new();
1574        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1575        server_conf.load_shed = config.grpc_load_shed;
1576        let server_builder =
1577            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1578                .add_service(ValidatorServer::new(validator_service));
1579
1580        let tls_config = iota_tls::create_rustls_server_config(
1581            config.network_key_pair().copy().private(),
1582            IOTA_TLS_SERVER_NAME.to_string(),
1583        );
1584
1585        let network_address = config.network_address().clone();
1586
1587        let bind_future = async move {
1588            let server = server_builder
1589                .bind(&network_address, Some(tls_config))
1590                .await
1591                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1592
1593            let local_addr = server.local_addr();
1594            info!("Listening to traffic on {local_addr}");
1595
1596            Ok(server)
1597        };
1598
1599        Ok(SpawnOnce::new(bind_future))
1600    }
1601
    /// Re-executes pending consensus certificates, which may not have been
    /// committed to disk before the node restarted. This is necessary for
    /// the following reasons:
    ///
    /// 1. For any transaction for which we returned signed effects to a client,
    ///    we must ensure that we have re-executed the transaction before we
    ///    begin accepting grpc requests. Otherwise we would appear to have
    ///    forgotten about the transaction.
    /// 2. While this is running, we are concurrently waiting for all previously
    ///    built checkpoints to be rebuilt. Since there may be dependencies in
    ///    either direction (from checkpointed consensus transactions to pending
    ///    consensus transactions, or vice versa), we must re-execute pending
    ///    consensus transactions to ensure that both processes can complete.
    /// 3. Also note that for any pending consensus transactions for which we
    ///    wrote a signed effects digest to disk, we must re-execute using that
    ///    digest as the expected effects digest, to ensure that we cannot
    ///    arrive at different effects than what we previously signed.
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        // (tx, signed effects digest) pairs: these must re-execute to exactly
        // the effects we previously signed.
        let mut pending_consensus_certificates = Vec::new();
        // Transactions without signed effects: re-executed without an expected
        // effects digest.
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Shared object txns cannot be re-executed at this point, because we must wait for
                // consensus replay to assign shared object versions.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    // new_unchecked is safe because we never submit a transaction to consensus
                    // without verifying it
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // we only need to re-execute if we previously signed the effects (which
                    // indicates we returned the effects to a client).
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                // Other consensus transaction kinds (and shared-object txns)
                // are handled by consensus replay, not here.
                _ => (),
            }
        }

        // Digests of the signed-effects group; only these are awaited below.
        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        // Enqueue both groups for execution before waiting on any of them.
        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // If this times out, the validator will still almost certainly start up fine.
        // But, it is possible that it may temporarily "forget" about
        // transactions that it had previously executed. This could confuse
        // clients in some circumstances. However, the transactions are still in
        // pending_consensus_certificates, so we cannot lose any finality guarantees.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // Log all the digests that were not executed to help debugging.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }
1710
1711    pub fn state(&self) -> Arc<AuthorityState> {
1712        self.state.clone()
1713    }
1714
    // Only used for testing because of how epoch store is loaded.
    /// Returns the reference gas price reported by the authority state.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1719
1720    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1721        self.state.committee_store().clone()
1722    }
1723
1724    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1725    // self.state.db()
1726    // }
1727
1728    /// Clone an AuthorityAggregator currently used in this node's
1729    /// QuorumDriver, if the node is a fullnode. After reconfig,
1730    /// QuorumDriver builds a new AuthorityAggregator. The caller
1731    /// of this function will mostly likely want to call this again
1732    /// to get a fresh one.
1733    pub fn clone_authority_aggregator(
1734        &self,
1735    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1736        self.transaction_orchestrator
1737            .as_ref()
1738            .map(|to| to.clone_authority_aggregator())
1739    }
1740
1741    pub fn transaction_orchestrator(
1742        &self,
1743    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1744        self.transaction_orchestrator.clone()
1745    }
1746
1747    pub fn subscribe_to_transaction_orchestrator_effects(
1748        &self,
1749    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1750        self.transaction_orchestrator
1751            .as_ref()
1752            .map(|to| to.subscribe_to_effects_queue())
1753            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1754    }
1755
    /// This function awaits the completion of checkpoint execution of the
    /// current epoch, after which it initiates reconfiguration of the
    /// entire system. This function also handles role changes for the node when
    /// epoch changes and advertises capabilities to the committee if the node
    /// is a validator.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        // Each loop iteration executes one full epoch, then reconfigures the
        // node for the next epoch.
        loop {
            // Take exclusive ownership of the state accumulator for this
            // epoch; a fresh one is placed back into the guard before the next
            // iteration.
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Create closures that handle gRPC type conversion
            // (try_lock: if the handle is currently locked, no broadcaster is
            // attached for this epoch's executor).
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Advertise capabilities to committee, if we are a validator.
            if let Some(components) = &*self.validator_components.lock().await {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            // no need to send digests of versions less than the current version
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Send signed capabilities to committee validators if we are a non-committee
                // validator in a separate task to not block the caller. Sending is done only if
                // the feature flag supporting it is enabled.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Blocks until every checkpoint of the current epoch has been
            // executed (or until the run_with_range condition is hit).
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Best-effort broadcast: failure only matters (as a warning) on
            // fullnodes, where subscribers are expected.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator against the new epoch's
            // committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // We save the connection monitor status map regardless of validator / fullnode
            // status so that we don't need to restart the connection monitor
            // every epoch. Update the mappings that will be used by the
            // consensus adapter if it exists or is about to be created.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                // Case: was a validator in the previous epoch.
                info!("Reconfiguring the validator.");
                // Cancel the old checkpoint service tasks.
                // Waiting for checkpoint builder to finish gracefully is not possible, because
                // it may wait on transactions while consensus on peers have
                // already shut down.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state accumulator
                // at this point. Confirm here before we swap in the new accumulator.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Only restart consensus if this node is still a validator in the new epoch.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Case: validator -> fullnode demotion.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Case: was a fullnode in the previous epoch.
                // No other components should be holding a strong reference to state accumulator
                // at this point. Confirm here before we swap in the new accumulator.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Case: fullnode -> validator promotion.
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    // Case: fullnode stays a fullnode.
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                .prune_checkpoints_for_eligible_epochs_for_testing(
                    self.config.clone(),
                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                )
                .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2087
2088    async fn shutdown(&self) {
2089        if let Some(validator_components) = &*self.validator_components.lock().await {
2090            validator_components.consensus_manager.shutdown().await;
2091        }
2092
2093        // Shutdown the gRPC server if it's running
2094        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2095            info!("Shutting down gRPC server");
2096            if let Err(e) = grpc_handle.shutdown().await {
2097                warn!("Failed to gracefully shutdown gRPC server: {e}");
2098            }
2099        }
2100    }
2101
2102    /// Asynchronously reconfigures the state of the authority node for the next
2103    /// epoch.
2104    async fn reconfigure_state(
2105        &self,
2106        state: &Arc<AuthorityState>,
2107        cur_epoch_store: &AuthorityPerEpochStore,
2108        next_epoch_committee: Committee,
2109        next_epoch_start_system_state: EpochStartSystemState,
2110        accumulator: Arc<StateAccumulator>,
2111    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2112        let next_epoch = next_epoch_committee.epoch();
2113
2114        let last_checkpoint = self
2115            .checkpoint_store
2116            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2117            .expect("Error loading last checkpoint for current epoch")
2118            .expect("Could not load last checkpoint for current epoch");
2119        let epoch_supply_change = last_checkpoint
2120            .end_of_epoch_data
2121            .as_ref()
2122            .ok_or_else(|| {
2123                IotaError::from("last checkpoint in epoch should contain end of epoch data")
2124            })?
2125            .epoch_supply_change;
2126
2127        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2128
2129        assert_eq!(
2130            Some(last_checkpoint_seq),
2131            self.checkpoint_store
2132                .get_highest_executed_checkpoint_seq_number()
2133                .expect("Error loading highest executed checkpoint sequence number")
2134        );
2135
2136        let epoch_start_configuration = EpochStartConfiguration::new(
2137            next_epoch_start_system_state,
2138            *last_checkpoint.digest(),
2139            state.get_object_store().as_ref(),
2140            EpochFlag::default_flags_for_new_epoch(&state.config),
2141        )
2142        .expect("EpochStartConfiguration construction cannot fail");
2143
2144        let new_epoch_store = self
2145            .state
2146            .reconfigure(
2147                cur_epoch_store,
2148                self.config.supported_protocol_versions.unwrap(),
2149                next_epoch_committee,
2150                epoch_start_configuration,
2151                accumulator,
2152                &self.config.expensive_safety_check_config,
2153                epoch_supply_change,
2154                last_checkpoint_seq,
2155            )
2156            .await
2157            .expect("Reconfigure authority state cannot fail");
2158        info!(next_epoch, "Node State has been reconfigured");
2159        assert_eq!(next_epoch, new_epoch_store.epoch());
2160        self.state.get_reconfig_api().update_epoch_flags_metrics(
2161            cur_epoch_store.epoch_start_config().flags(),
2162            new_epoch_store.epoch_start_config().flags(),
2163        );
2164
2165        Ok(new_epoch_store)
2166    }
2167
    /// Returns a reference to the node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2171
2172    async fn execute_transaction_immediately_at_zero_epoch(
2173        state: &Arc<AuthorityState>,
2174        epoch_store: &Arc<AuthorityPerEpochStore>,
2175        tx: &Transaction,
2176        span: tracing::Span,
2177    ) {
2178        let _guard = span.enter();
2179        let transaction =
2180            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2181                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2182                    tx.data().clone(),
2183                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2184                ),
2185            );
2186        state
2187            .try_execute_immediately(&transaction, None, epoch_store)
2188            .unwrap();
2189    }
2190
    /// Returns a cloned handle to the randomness component.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2194
2195    /// Sends signed capability notification to committee validators for
2196    /// non-committee validators. This method implements retry logic to handle
2197    /// failed attempts to send the notification. It will retry sending the
2198    /// notification with an increasing interval until it receives a successful
2199    /// response from a f+1 committee members or 2f+1 non-retryable errors.
2200    async fn send_signed_capability_notification_to_committee_with_retry(
2201        &self,
2202        epoch_store: &Arc<AuthorityPerEpochStore>,
2203    ) {
2204        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2205        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2206        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
2207
2208        // Create the capability notification once
2209        let config = epoch_store.protocol_config();
2210        let binary_config = to_binary_config(config);
2211
2212        // Create the capability notification
2213        let capabilities = AuthorityCapabilitiesV1::new(
2214            self.state.name,
2215            epoch_store.get_chain_identifier().chain(),
2216            self.config
2217                .supported_protocol_versions
2218                .expect("Supported versions should be populated")
2219                .truncate_below(config.version),
2220            self.state
2221                .get_available_system_packages(&binary_config)
2222                .await,
2223        );
2224
2225        // Sign the capabilities using the authority key pair from config
2226        let signature = AuthoritySignature::new_secure(
2227            &IntentMessage::new(
2228                Intent::iota_app(IntentScope::AuthorityCapabilities),
2229                &capabilities,
2230            ),
2231            &epoch_store.epoch(),
2232            self.config.authority_key_pair(),
2233        );
2234
2235        let request = HandleCapabilityNotificationRequestV1 {
2236            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2237        };
2238
2239        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2240
2241        loop {
2242            let auth_agg = self.auth_agg.load();
2243            match auth_agg
2244                .send_capability_notification_to_quorum(request.clone())
2245                .await
2246            {
2247                Ok(_) => {
2248                    info!("Successfully sent capability notification to committee");
2249                    break;
2250                }
2251                Err(err) => {
2252                    match &err {
2253                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2254                            errors,
2255                        } => {
2256                            warn!(
2257                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2258                                retry_interval, errors
2259                            );
2260                        }
2261                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2262                            errors,
2263                        } => {
2264                            error!(
2265                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2266                                errors
2267                            );
2268                            break;
2269                        }
2270                    };
2271
2272                    // Wait before retrying
2273                    tokio::time::sleep(retry_interval).await;
2274
2275                    // Increase retry interval for the next attempt, capped at max
2276                    retry_interval = std::cmp::min(
2277                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2278                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2279                    );
2280                }
2281            }
2282        }
2283    }
2284}
2285
2286#[cfg(not(msim))]
2287impl IotaNode {
2288    async fn fetch_jwks(
2289        _authority: AuthorityName,
2290        provider: &OIDCProvider,
2291    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2292        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2293        let client = reqwest::Client::new();
2294        fetch_jwks(provider, &client)
2295            .await
2296            .map_err(|_| IotaError::JWKRetrieval)
2297    }
2298}
2299
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id this node runs on (msim builds only).
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether the simulation expects this node to be in safe mode,
    /// so test checks can assert against it later.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    /// Simulation variant of JWK fetching: delegates to the test-injected
    /// JWK provider instead of performing real network I/O.
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2320
/// A server that can be started at most once: holds the pending server
/// future until [`SpawnOnce::start`] is called, after which only the running
/// server's handle is kept.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Sync
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2327
2328impl SpawnOnce {
2329    pub fn new(
2330        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2331    ) -> Self {
2332        Self::Unstarted(Mutex::new(Box::pin(future)))
2333    }
2334
2335    pub async fn start(self) -> Self {
2336        match self {
2337            Self::Unstarted(future) => {
2338                let server = future
2339                    .into_inner()
2340                    .await
2341                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2342                let handle = server.handle().clone();
2343                tokio::spawn(async move {
2344                    if let Err(err) = server.serve().await {
2345                        info!("Server stopped: {err}");
2346                    }
2347                    info!("Server stopped");
2348                });
2349                Self::Started(handle)
2350            }
2351            Self::Started(_) => self,
2352        }
2353    }
2354
2355    pub fn shutdown(self) {
2356        if let SpawnOnce::Started(handle) = self {
2357            handle.trigger_shutdown();
2358        }
2359    }
2360}
2361
2362/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers are now
2363/// available.
2364fn send_trusted_peer_change(
2365    config: &NodeConfig,
2366    sender: &watch::Sender<TrustedPeerChangeEvent>,
2367    new_epoch_start_state: &EpochStartSystemState,
2368) {
2369    let new_committee =
2370        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2371
2372    sender.send_modify(|event| {
2373        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2374        event.new_committee = new_committee;
2375    })
2376}
2377
2378fn build_kv_store(
2379    state: &Arc<AuthorityState>,
2380    config: &NodeConfig,
2381    registry: &Registry,
2382) -> Result<Arc<TransactionKeyValueStore>> {
2383    let metrics = KeyValueStoreMetrics::new(registry);
2384    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2385
2386    let base_url = &config.transaction_kv_store_read_config.base_url;
2387
2388    if base_url.is_empty() {
2389        info!("no http kv store url provided, using local db only");
2390        return Ok(Arc::new(db_store));
2391    }
2392
2393    base_url.parse::<url::Url>().tap_err(|e| {
2394        error!(
2395            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2396            base_url, e
2397        )
2398    })?;
2399
2400    let http_store = HttpKVStore::new_kv(
2401        base_url,
2402        config.transaction_kv_store_read_config.cache_size,
2403        metrics.clone(),
2404    )?;
2405    info!("using local key-value store with fallback to http key-value store");
2406    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2407        db_store,
2408        http_store,
2409        metrics,
2410        "json_rpc_fallback",
2411    )))
2412}
2413
2414/// Builds and starts the gRPC server for the IOTA node based on the node's
2415/// configuration.
2416///
2417/// This function performs the following tasks:
2418/// 1. Checks if the node is a validator by inspecting the consensus
2419///    configuration; if so, it returns early as validators do not expose gRPC
2420///    APIs.
2421/// 2. Checks if gRPC is enabled in the configuration.
2422/// 3. Creates broadcast channels for checkpoint streaming.
2423/// 4. Initializes the gRPC checkpoint service.
2424/// 5. Spawns the gRPC server to listen for incoming connections.
2425///
2426/// Returns a tuple of optional broadcast channels for checkpoint summary and
2427/// data.
2428async fn build_grpc_server(
2429    config: &NodeConfig,
2430    state: Arc<AuthorityState>,
2431    state_sync_store: RocksDbStore,
2432    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2433    prometheus_registry: &Registry,
2434) -> Result<Option<GrpcServerHandle>> {
2435    // Validators do not expose gRPC APIs
2436    if config.consensus_config().is_some() || !config.enable_grpc_api {
2437        return Ok(None);
2438    }
2439
2440    let Some(grpc_config) = &config.grpc_api_config else {
2441        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2442    };
2443
2444    // Get chain identifier from state directly
2445    let chain_id = state.get_chain_identifier();
2446
2447    let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));
2448
2449    // Create cancellation token for proper shutdown hierarchy
2450    let shutdown_token = CancellationToken::new();
2451
2452    // Create GrpcReader
2453    let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(
2454        rest_read_store,
2455        Some(env!("CARGO_PKG_VERSION").to_string()),
2456    ));
2457
2458    // Create gRPC server metrics
2459    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2460
2461    // Pass the same token to both GrpcReader (already done above) and
2462    // start_grpc_server
2463    let handle = start_grpc_server(
2464        grpc_reader,
2465        executor,
2466        grpc_config.clone(),
2467        shutdown_token,
2468        chain_id,
2469        Some(grpc_server_metrics),
2470    )
2471    .await?;
2472
2473    Ok(Some(handle))
2474}
2475
/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
/// REST APIs based on the node's configuration.
///
/// This function performs the following tasks:
/// 1. Checks if the node is a validator by inspecting the consensus
///    configuration; if so, it returns early as validators do not expose these
///    APIs.
/// 2. Creates an Axum router to handle HTTP requests.
/// 3. Initializes the JSON-RPC server and registers various RPC modules based
///    on the node's state and configuration, including CoinApi,
///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
///    IndexerApi.
/// 4. Optionally, if the REST API is enabled, nests the REST API router under
///    the `/api/v1` path.
/// 5. Binds the server to the specified JSON-RPC address and starts listening
///    for incoming connections.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not expose these APIs
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        // KV store shared by the read-oriented modules: local DB with an
        // optional HTTP fallback (see `build_kv_store`).
        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // if run_with_range is enabled we want to prevent any transactions
        // run_with_range = None is normal operating conditions
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only exposed when an orchestrator exists,
        // i.e. the node can forward transactions to validators.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the chain-specific default IOTA-names config when none
        // is configured explicitly.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state.clone(), store)),
            software_version,
        );

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    // TODO: Remove this health check when experimental REST API becomes default
    // This is a copy of the health check in crates/iota-rest-api/src/health.rs
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    // Propagate low-level connection info into axum's `ConnectInfo` extractor
    // and attach Server-Timing headers to responses.
    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2611
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum allowed age, in seconds, of the highest executed checkpoint
    /// before the health check reports the node as down. When absent, the
    /// endpoint only verifies that the process responds.
    pub threshold_seconds: Option<u32>,
}
2616
2617async fn health_check_handler(
2618    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2619    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2620) -> impl axum::response::IntoResponse {
2621    if let Some(threshold_seconds) = threshold_seconds {
2622        // Attempt to get the latest checkpoint
2623        let summary = match state
2624            .get_checkpoint_store()
2625            .get_highest_executed_checkpoint()
2626        {
2627            Ok(Some(summary)) => summary,
2628            Ok(None) => {
2629                warn!("Highest executed checkpoint not found");
2630                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2631            }
2632            Err(err) => {
2633                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2634                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2635            }
2636        };
2637
2638        // Calculate the threshold time based on the provided threshold_seconds
2639        let latest_chain_time = summary.timestamp();
2640        let threshold =
2641            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2642
2643        // Check if the latest checkpoint is within the threshold
2644        if latest_chain_time < threshold {
2645            warn!(
2646                ?latest_chain_time,
2647                ?threshold,
2648                "failing health check due to checkpoint lag"
2649            );
2650            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2651        }
2652    }
2653    // if health endpoint is responding and no threshold is given, respond success
2654    (axum::http::StatusCode::OK, "up")
2655}
2656
/// Maximum number of transactions allowed in a single checkpoint, as defined
/// by the protocol configuration.
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2661
/// Test-build override: cap checkpoints at two transactions so tests can
/// exercise checkpoint splitting without large fixtures.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}