iota_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    future::Future,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::future::BoxFuture;
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_pruner::ObjectsCompactionFilter,
        authority_store_tables::{
            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
        },
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{
        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
    },
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    rest_index::RestIndexStore,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{RestReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_names::config::IotaNamesConfig;
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::ProtocolConfig;
use iota_rest_api::{RestMetrics, ServerVersion};
use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    full_checkpoint_content::CheckpointData,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
    },
    messages_grpc::HandleCapabilityNotificationRequestV1,
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

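/// Components that exist only while the node runs as a committee validator;
/// see `IotaNode::start_async` and `construct_validator_components`.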
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The HTTP server responsible for serving JSON-RPC as well as the
    /// experimental REST service.
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Broadcast channel to notify [`DiscoveryEventLoop`] of new validator
    /// peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down the iota-node.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown.
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and at the
    /// beginning of each epoch. Wrapped in an ArcSwap so that it can be
    /// mutated without taking a mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

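// Upper bound on the number of JWKs accepted from a single provider fetch in
// `start_jwk_updater`; anything beyond this is dropped.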
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
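    /// Starts an [`IotaNode`] with the default [`ServerVersion`] of
    /// `("iota-node", "unknown")`; see [`Self::start_async`] for the full
    /// start-up sequence.
    ///
    /// A minimal usage sketch (assuming a loaded `NodeConfig` named `config`
    /// and a `RegistryService` named `registry_service` already exist):
    ///
    /// ```ignore
    /// let node = IotaNode::start(config, registry_service).await?;
    /// ```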
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("iota-node", "unknown"),
        )
        .await
    }

    /// Starts the JWK (JSON Web Key) updater tasks for the specified node
    /// configuration.
    /// This function ensures continuous fetching, validation, and submission of
    /// JWKs, maintaining up-to-date keys for the specified providers.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // metrics is:
        //  pub struct IotaNodeMetrics {
        //      pub jwk_requests: IntCounterVec,
        //      pub jwk_request_errors: IntCounterVec,
        //      pub total_jwks: IntCounterVec,
        //      pub unique_jwks: IntCounterVec,
        //  }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

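    /// Starts the node: opens the local stores, builds the
    /// [`AuthorityState`], wires up p2p (discovery, state-sync, randomness),
    /// the JSON-RPC/REST and gRPC servers and, if the node is a committee
    /// validator, the consensus components, then spawns the reconfiguration
    /// monitor task.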
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        // Check and set the db_corrupted flag
        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(&prometheus_registry);

        // Initialize IOTA metrics.
        iota_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of a static variable) and unnecessary in
        // simtests.
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        // Register hardware metrics.
        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        // Register uptime metric
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                server_version.version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        // If genesis comes with migration data, load it into memory from the file
        // path specified in the config.
        let migration_tx_data = if genesis.contains_migrations() {
            // Here the load already verifies that the content of the migration blob is
            // valid with respect to the content found in genesis.
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // By default, only enable write stall on validators for perpetual db.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => chain_identifier.chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_identifier, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // the database is empty at genesis time
        if is_genesis {
            info!("checking IOTA conservation at genesis");
            // When we are opening the db table, the only time when it's safe to
            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the IOTA conservation check will fail. This also initializes
            // the expected_network_iota_amount table.
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        // Database has everything from genesis, set corrupted key to 0
        unmark_db_corruption(db_corrupted_path)?;

652        info!("creating state sync store");
653        let state_sync_store = RocksDbStore::new(
654            cache_traits.clone(),
655            committee_store.clone(),
656            checkpoint_store.clone(),
657        );
658
659        let index_store = if is_full_node && config.enable_index_processing {
660            info!("creating index store");
661            Some(Arc::new(IndexStore::new(
662                config.db_path().join("indexes"),
663                &prometheus_registry,
664                epoch_store
665                    .protocol_config()
666                    .max_move_identifier_len_as_option(),
667            )))
668        } else {
669            None
670        };
671
672        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
673        {
674            Some(Arc::new(
675                RestIndexStore::new(
676                    config.db_path().join("rest_index"),
677                    &store,
678                    &checkpoint_store,
679                    &epoch_store,
680                    &cache_traits.backing_package_store,
681                )
682                .await,
683            ))
684        } else {
685            None
686        };
687
688        info!("creating archive reader");
689        // Create network
690        // TODO only configure validators as seed/preferred peers for validators and not
691        // for fullnodes once we've had a chance to re-work fullnode
692        // configuration generation.
693        let archive_readers =
694            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
695        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
696        let (randomness_tx, randomness_rx) = mpsc::channel(
697            config
698                .p2p_config
699                .randomness
700                .clone()
701                .unwrap_or_default()
702                .mailbox_capacity(),
703        );
704        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
705            Self::create_p2p_network(
706                &config,
707                state_sync_store.clone(),
708                chain_identifier,
709                trusted_peer_change_rx,
710                archive_readers.clone(),
711                randomness_tx,
712                &prometheus_registry,
713            )?;
714
715        // We must explicitly send this instead of relying on the initial value to
716        // trigger watch value change, so that state-sync is able to process it.
717        send_trusted_peer_change(
718            &config,
719            &trusted_peer_change_tx,
720            epoch_store.epoch_start_state(),
721        );
722
723        info!("start state archival");
724        // Start archiving local state to remote store
725        let state_archive_handle =
726            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
727                .await?;
728
729        info!("start snapshot upload");
730        // Start uploading state snapshot to remote store
731        let state_snapshot_handle =
732            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
733
734        // Start uploading db checkpoints to remote store
735        info!("start db checkpoint");
736        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
737            &config,
738            &prometheus_registry,
739            state_snapshot_handle.is_some(),
740        )?;
741
        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rest_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        // ensure genesis and migration txs were executed
        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            // Execute genesis transaction
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            // Execute migration transactions if present
            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        // Start the loop that receives new randomness and generates transactions for
        // it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            state_sync_store.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version.clone(),
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        // Convert the transaction orchestrator into an executor trait object for the
        // gRPC server. Note that the transaction_orchestrator (and thus the executor)
        // will be None if this is a validator node or run_with_range is set.
        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
            transaction_orchestrator
                .clone()
                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);

        let grpc_server_handle = build_grpc_server(
            &config,
            state.clone(),
            state_sync_store.clone(),
            executor,
            &registry_service.default_registry(),
            server_version,
        )
        .await?;

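        // Consensus, checkpointing and the validator gRPC service are only set up when
        // this node is part of the current committee; pending consensus certificates
        // are re-executed concurrently with component construction.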
        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&accumulator),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    iota_node_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the gRPC server
            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        // setup shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
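        // Spawn the long-running task that monitors and drives reconfiguration for
        // this node.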
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

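    /// Returns a receiver for the broadcast channel on which the starting
    /// [`IotaSystemState`] of each new epoch is published.
    ///
    /// A minimal usage sketch (assuming an `Arc<IotaNode>` named `node`):
    ///
    /// ```ignore
    /// let mut epoch_rx = node.subscribe_to_epoch_change();
    /// while let Ok(system_state) = epoch_rx.recv().await {
    ///     tracing::info!("entering epoch {}", system_state.epoch());
    /// }
    /// ```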
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

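    /// Returns a receiver for the shutdown broadcast channel; the payload is
    /// the optional [`RunWithRange`] provided when shutdown is signaled.
    ///
    /// A minimal usage sketch (assuming an `Arc<IotaNode>` named `node`):
    ///
    /// ```ignore
    /// let mut shutdown_rx = node.subscribe_to_shutdown_channel();
    /// let run_with_range = shutdown_rx.recv().await.ok().flatten();
    /// ```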
    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    // Init reconfig process by starting to reject user certs
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    // Testing-only API to start epoch close process.
    // For production code, please use the non-testing version.
    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

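    /// If an archive object store is configured, starts the [`ArchiveWriter`]
    /// that uploads local state to the remote store, returning the broadcast
    /// sender that is held as the task handle.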
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

    /// Creates a [`StateSnapshotUploader`] and starts it if the
    /// StateSnapshotConfig is set.
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

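    /// Resolves the effective [`DBCheckpointConfig`] (forcing end-of-epoch db
    /// checkpoints when state snapshots are enabled) and, unless neither an
    /// object store nor snapshotting is configured, starts the
    /// [`DBCheckpointHandler`].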
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // If the db checkpoint object store is not specified but the state snapshot
            // object store is, create the handler anyway so that db checkpoints are
            // marked as completed and can be uploaded as state snapshots.
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

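    /// Builds the anemo p2p network together with the discovery, state-sync
    /// and randomness services, applies the QUIC/anemo tuning below, and
    /// returns the started handles.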
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too
            // many staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

    /// Asynchronously constructs and initializes the components necessary for
    /// the validator node.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // This only gets started up once, not on every epoch. (Make call to remove
        // every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

1377    /// Initializes and starts components specific to the current
1378    /// epoch for the validator node.
1379    async fn start_epoch_specific_validator_components(
1380        config: &NodeConfig,
1381        state: Arc<AuthorityState>,
1382        consensus_adapter: Arc<ConsensusAdapter>,
1383        checkpoint_store: Arc<CheckpointStore>,
1384        epoch_store: Arc<AuthorityPerEpochStore>,
1385        state_sync_handle: state_sync::Handle,
1386        randomness_handle: randomness::Handle,
1387        consensus_manager: ConsensusManager,
1388        consensus_store_pruner: ConsensusStorePruner,
1389        accumulator: Weak<StateAccumulator>,
1390        backpressure_manager: Arc<BackpressureManager>,
1391        validator_server_handle: SpawnOnce,
1392        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1393        checkpoint_metrics: Arc<CheckpointMetrics>,
1394        iota_node_metrics: Arc<IotaNodeMetrics>,
1395        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1396        validator_registry_id: RegistryID,
1397    ) -> Result<ValidatorComponents> {
1398        let checkpoint_service = Self::build_checkpoint_service(
1399            config,
1400            consensus_adapter.clone(),
1401            checkpoint_store.clone(),
1402            epoch_store.clone(),
1403            state.clone(),
1404            state_sync_handle,
1405            accumulator,
1406            checkpoint_metrics.clone(),
1407        );
1408
1409        // Create a new map that gets injected into both the consensus handler and the
1410        // consensus adapter. The consensus handler will write values forwarded
1411        // from consensus, and the consensus adapter will read the values to
1412        // make decisions about which validator submits a transaction to consensus.
1413        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1414
1415        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1416
1417        let randomness_manager = RandomnessManager::try_new(
1418            Arc::downgrade(&epoch_store),
1419            Box::new(consensus_adapter.clone()),
1420            randomness_handle,
1421            config.authority_key_pair(),
1422        )
1423        .await;
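        // The randomness manager is optional; it is installed on the epoch store
        // only when it could be created for this epoch.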
1424        if let Some(randomness_manager) = randomness_manager {
1425            epoch_store
1426                .set_randomness_manager(randomness_manager)
1427                .await?;
1428        }
1429
1430        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1431            state.clone(),
1432            checkpoint_service.clone(),
1433            epoch_store.clone(),
1434            low_scoring_authorities,
1435            backpressure_manager,
1436        );
1437
1438        info!("Starting consensus manager");
1439
1440        consensus_manager
1441            .start(
1442                config,
1443                epoch_store.clone(),
1444                consensus_handler_initializer,
1445                IotaTxValidator::new(
1446                    epoch_store.clone(),
1447                    checkpoint_service.clone(),
1448                    state.transaction_manager().clone(),
1449                    iota_tx_validator_metrics.clone(),
1450                ),
1451            )
1452            .await;
1453
1454        info!("Spawning checkpoint service");
1455        let checkpoint_service_tasks = checkpoint_service.spawn().await;
1456
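        // The JWK updater is only needed when the authenticator state is enabled,
        // since its job is to keep the on-chain JWKs (used for zkLogin) up to date.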
1457        if epoch_store.authenticator_state_enabled() {
1458            Self::start_jwk_updater(
1459                config,
1460                iota_node_metrics,
1461                state.name,
1462                epoch_store.clone(),
1463                consensus_adapter.clone(),
1464            );
1465        }
1466
1467        Ok(ValidatorComponents {
1468            validator_server_handle,
1469            validator_overload_monitor_handle,
1470            consensus_manager,
1471            consensus_store_pruner,
1472            consensus_adapter,
1473            checkpoint_service_tasks,
1474            checkpoint_metrics,
1475            iota_tx_validator_metrics,
1476            validator_registry_id,
1477        })
1478    }
1479
1480    /// Builds the checkpoint service for the validator node, wiring together
1481    /// the components and settings it needs.
1482    /// The returned service is prepared to handle checkpoint creation and
1483    /// submission to consensus, and to forward certified checkpoints to state
1484    /// sync; it is spawned later in
1485    /// `start_epoch_specific_validator_components`.
1486    fn build_checkpoint_service(
1487        config: &NodeConfig,
1488        consensus_adapter: Arc<ConsensusAdapter>,
1489        checkpoint_store: Arc<CheckpointStore>,
1490        epoch_store: Arc<AuthorityPerEpochStore>,
1491        state: Arc<AuthorityState>,
1492        state_sync_handle: state_sync::Handle,
1493        accumulator: Weak<StateAccumulator>,
1494        checkpoint_metrics: Arc<CheckpointMetrics>,
1495    ) -> Arc<CheckpointService> {
1496        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1497        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1498
1499        debug!(
1500            "Starting checkpoint service with epoch start timestamp {} \
1501             and epoch duration {}",
1502            epoch_start_timestamp_ms, epoch_duration_ms
1503        );
1504
1505        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1506            sender: consensus_adapter,
1507            signer: state.secret.clone(),
1508            authority: config.authority_public_key(),
1509            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1510                .checked_add(epoch_duration_ms)
1511                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1512            metrics: checkpoint_metrics.clone(),
1513        });
1514
1515        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
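        // Note: max_tx_per_checkpoint is read from the protocol config in production
        // builds and overridden to a small constant under #[cfg(test)] (see the
        // helpers at the bottom of this file).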
1516        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1517        let max_checkpoint_size_bytes =
1518            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1519
1520        CheckpointService::build(
1521            state.clone(),
1522            checkpoint_store,
1523            epoch_store,
1524            state.get_transaction_cache_reader().clone(),
1525            accumulator,
1526            checkpoint_output,
1527            Box::new(certified_checkpoint_output),
1528            checkpoint_metrics,
1529            max_tx_per_checkpoint,
1530            max_checkpoint_size_bytes,
1531        )
1532    }
1533
1534    fn construct_consensus_adapter(
1535        committee: &Committee,
1536        consensus_config: &ConsensusConfig,
1537        authority: AuthorityName,
1538        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1539        prometheus_registry: &Registry,
1540        consensus_client: Arc<dyn ConsensusClient>,
1541        checkpoint_store: Arc<CheckpointStore>,
1542    ) -> ConsensusAdapter {
1543        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1544        // The consensus adapter allows the authority to send user certificates through
1545        // consensus.
1546
1547        ConsensusAdapter::new(
1548            consensus_client,
1549            checkpoint_store,
1550            authority,
1551            connection_monitor_status,
1552            consensus_config.max_pending_transactions(),
1553            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1554            consensus_config.max_submit_position,
1555            consensus_config.submit_delay_step_override(),
1556            ca_metrics,
1557        )
1558    }
1559
1560    async fn start_grpc_validator_service(
1561        config: &NodeConfig,
1562        state: Arc<AuthorityState>,
1563        consensus_adapter: Arc<ConsensusAdapter>,
1564        prometheus_registry: &Registry,
1565    ) -> Result<SpawnOnce> {
1566        let validator_service = ValidatorService::new(
1567            state,
1568            consensus_adapter,
1569            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1570            TrafficControllerMetrics::new(prometheus_registry),
1571            config.policy_config.clone(),
1572            config.firewall_config.clone(),
1573        );
1574
1575        let mut server_conf = iota_network_stack::config::Config::new();
1576        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1577        server_conf.load_shed = config.grpc_load_shed;
1578        let server_builder =
1579            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1580                .add_service(ValidatorServer::new(validator_service));
1581
1582        let tls_config = iota_tls::create_rustls_server_config(
1583            config.network_key_pair().copy().private(),
1584            IOTA_TLS_SERVER_NAME.to_string(),
1585        );
1586
1587        let network_address = config.network_address().clone();
1588
1589        let bind_future = async move {
1590            let server = server_builder
1591                .bind(&network_address, Some(tls_config))
1592                .await
1593                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1594
1595            let local_addr = server.local_addr();
1596            info!("Listening to traffic on {local_addr}");
1597
1598            Ok(server)
1599        };
1600
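        // The bind future is not awaited here; SpawnOnce stores it and only binds and
        // serves once `start()` is called (see the SpawnOnce impl below).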
1601        Ok(SpawnOnce::new(bind_future))
1602    }
1603
1604    /// Re-executes pending consensus certificates, which may not have been
1605    /// committed to disk before the node restarted. This is necessary for
1606    /// the following reasons:
1607    ///
1608    /// 1. For any transaction for which we returned signed effects to a client,
1609    ///    we must ensure that we have re-executed the transaction before we
1610    ///    begin accepting grpc requests. Otherwise we would appear to have
1611    ///    forgotten about the transaction.
1612    /// 2. While this is running, we are concurrently waiting for all previously
1613    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1614    ///    either direction (from checkpointed consensus transactions to pending
1615    ///    consensus transactions, or vice versa), we must re-execute pending
1616    ///    consensus transactions to ensure that both processes can complete.
1617    /// 3. Also note that for any pending consensus transactions for which we
1618    ///    wrote a signed effects digest to disk, we must re-execute using that
1619    ///    digest as the expected effects digest, to ensure that we cannot
1620    ///    arrive at different effects than what we previously signed.
1621    async fn reexecute_pending_consensus_certs(
1622        epoch_store: &Arc<AuthorityPerEpochStore>,
1623        state: &Arc<AuthorityState>,
1624    ) {
1625        let mut pending_consensus_certificates = Vec::new();
1626        let mut additional_certs = Vec::new();
1627
1628        for tx in epoch_store.get_all_pending_consensus_transactions() {
1629            match tx.kind {
1630                // Shared object txns cannot be re-executed at this point, because we must wait for
1631                // consensus replay to assign shared object versions.
1632                ConsensusTransactionKind::CertifiedTransaction(tx)
1633                    if !tx.contains_shared_object() =>
1634                {
1635                    let tx = *tx;
1636                    // new_unchecked is safe because we never submit a transaction to consensus
1637                    // without verifying it
1638                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1639                        VerifiedCertificate::new_unchecked(tx),
1640                    );
1641                    // we only need to re-execute if we previously signed the effects (which
1642                    // indicates we returned the effects to a client).
1643                    if let Some(fx_digest) = epoch_store
1644                        .get_signed_effects_digest(tx.digest())
1645                        .expect("db error")
1646                    {
1647                        pending_consensus_certificates.push((tx, fx_digest));
1648                    } else {
1649                        additional_certs.push(tx);
1650                    }
1651                }
1652                _ => (),
1653            }
1654        }
1655
1656        let digests = pending_consensus_certificates
1657            .iter()
1658            .map(|(tx, _)| *tx.digest())
1659            .collect::<Vec<_>>();
1660
1661        info!(
1662            "reexecuting {} pending consensus certificates: {:?}",
1663            digests.len(),
1664            digests
1665        );
1666
1667        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1668        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1669
1670        // If this times out, the validator will still almost certainly start up fine.
1671        // But, it is possible that it may temporarily "forget" about
1672        // transactions that it had previously executed. This could confuse
1673        // clients in some circumstances. However, the transactions are still in
1674        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1675        let timeout = if cfg!(msim) { 120 } else { 60 };
1676        if tokio::time::timeout(
1677            std::time::Duration::from_secs(timeout),
1678            state
1679                .get_transaction_cache_reader()
1680                .try_notify_read_executed_effects_digests(&digests),
1681        )
1682        .await
1683        .is_err()
1684        {
1685            // Log all the digests that were not executed to help debugging.
1686            if let Ok(executed_effects_digests) = state
1687                .get_transaction_cache_reader()
1688                .try_multi_get_executed_effects_digests(&digests)
1689            {
1690                let pending_digests = digests
1691                    .iter()
1692                    .zip(executed_effects_digests.iter())
1693                    .filter_map(|(digest, executed_effects_digest)| {
1694                        if executed_effects_digest.is_none() {
1695                            Some(digest)
1696                        } else {
1697                            None
1698                        }
1699                    })
1700                    .collect::<Vec<_>>();
1701                debug_fatal!(
1702                    "Timed out waiting for effects digests to be executed: {:?}",
1703                    pending_digests
1704                );
1705            } else {
1706                debug_fatal!(
1707                    "Timed out waiting for effects digests to be executed, digests not found"
1708                );
1709            }
1710        }
1711    }
1712
1713    pub fn state(&self) -> Arc<AuthorityState> {
1714        self.state.clone()
1715    }
1716
1717    // Only used for testing because of how epoch store is loaded.
1718    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1719        self.state.reference_gas_price_for_testing()
1720    }
1721
1722    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1723        self.state.committee_store().clone()
1724    }
1725
1726    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1727    // self.state.db()
1728    // }
1729
1730    /// Clone an AuthorityAggregator currently used in this node's
1731    /// QuorumDriver, if the node is a fullnode. After reconfig,
1732    /// QuorumDriver builds a new AuthorityAggregator. The caller
1733    /// of this function will most likely want to call this again
1734    /// to get a fresh one.
1735    pub fn clone_authority_aggregator(
1736        &self,
1737    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1738        self.transaction_orchestrator
1739            .as_ref()
1740            .map(|to| to.clone_authority_aggregator())
1741    }
1742
1743    pub fn transaction_orchestrator(
1744        &self,
1745    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1746        self.transaction_orchestrator.clone()
1747    }
1748
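    /// Subscribes to the transaction orchestrator's effects queue, if the node
    /// runs one (i.e. it is a fullnode with the orchestrator enabled).
    ///
    /// A minimal usage sketch (assuming a `node: Arc<IotaNode>` is in scope):
    /// ```ignore
    /// let mut rx = node.subscribe_to_transaction_orchestrator_effects()?;
    /// while let Ok(effects) = rx.recv().await {
    ///     // handle the QuorumDriverEffectsQueueResult
    /// }
    /// ```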
1749    pub fn subscribe_to_transaction_orchestrator_effects(
1750        &self,
1751    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1752        self.transaction_orchestrator
1753            .as_ref()
1754            .map(|to| to.subscribe_to_effects_queue())
1755            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1756    }
1757
1758    /// This function awaits the completion of checkpoint execution of the
1759    /// current epoch, after which it initiates reconfiguration of the
1760    /// entire system. This function also handles role changes for the node when
1761    /// the epoch changes and advertises capabilities to the committee if the node
1762    /// is a validator.
1763    pub async fn monitor_reconfiguration(
1764        self: Arc<Self>,
1765        mut epoch_store: Arc<AuthorityPerEpochStore>,
1766    ) -> Result<()> {
1767        let checkpoint_executor_metrics =
1768            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1769
1770        loop {
1771            let mut accumulator_guard = self.accumulator.lock().await;
1772            let accumulator = accumulator_guard.take().unwrap();
1773            info!(
1774                "Creating checkpoint executor for epoch {}",
1775                epoch_store.epoch()
1776            );
1777
1778            // Create a closure that forwards executed checkpoint data to the gRPC server, if it is running
1779            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1780                guard.as_ref().map(|handle| {
1781                    let tx = handle.checkpoint_data_broadcaster().clone();
1782                    Box::new(move |data: &CheckpointData| {
1783                        tx.send_traced(data);
1784                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
1785                })
1786            } else {
1787                None
1788            };
1789
1790            let checkpoint_executor = CheckpointExecutor::new(
1791                epoch_store.clone(),
1792                self.checkpoint_store.clone(),
1793                self.state.clone(),
1794                accumulator.clone(),
1795                self.backpressure_manager.clone(),
1796                self.config.checkpoint_executor_config.clone(),
1797                checkpoint_executor_metrics.clone(),
1798                data_sender,
1799            );
1800
1801            let run_with_range = self.config.run_with_range;
1802
1803            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1804
1805            // Advertise capabilities to committee, if we are a validator.
1806            if let Some(components) = &*self.validator_components.lock().await {
1807                // TODO: without this sleep, the consensus message is not delivered reliably.
1808                tokio::time::sleep(Duration::from_millis(1)).await;
1809
1810                let config = cur_epoch_store.protocol_config();
1811                let binary_config = to_binary_config(config);
1812                let transaction = ConsensusTransaction::new_capability_notification_v1(
1813                    AuthorityCapabilitiesV1::new(
1814                        self.state.name,
1815                        cur_epoch_store.get_chain_identifier().chain(),
1816                        self.config
1817                            .supported_protocol_versions
1818                            .expect("Supported versions should be populated")
1819                            // no need to send digests of versions less than the current version
1820                            .truncate_below(config.version),
1821                        self.state
1822                            .get_available_system_packages(&binary_config)
1823                            .await,
1824                    ),
1825                );
1826                info!(?transaction, "submitting capabilities to consensus");
1827                components
1828                    .consensus_adapter
1829                    .submit(transaction, None, &cur_epoch_store)?;
1830            } else if self.state.is_active_validator(&cur_epoch_store)
1831                && cur_epoch_store
1832                    .protocol_config()
1833                    .track_non_committee_eligible_validators()
1834            {
1835                // If we are a non-committee validator, send signed capabilities to the
1836                // committee validators in a separate task so we don't block the caller.
1837                // Sending is done only if the feature flag supporting it is enabled.
1838                let epoch_store = cur_epoch_store.clone();
1839                let node_clone = self.clone();
1840                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1841                    node_clone
1842                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1843                        .instrument(trace_span!(
1844                            "send_signed_capability_notification_to_committee_with_retry"
1845                        ))
1846                        .await;
1847                }));
1848            }
1849
1850            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1851
1852            if stop_condition == StopReason::RunWithRangeCondition {
1853                IotaNode::shutdown(&self).await;
1854                self.shutdown_channel_tx
1855                    .send(run_with_range)
1856                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1857                return Ok(());
1858            }
1859
1860            // Safe to call because we are in the middle of reconfiguration.
1861            let latest_system_state = self
1862                .state
1863                .get_object_cache_reader()
1864                .try_get_iota_system_state_object_unsafe()
1865                .expect("Read IOTA System State object cannot fail");
1866
1867            #[cfg(msim)]
1868            if !self
1869                .sim_state
1870                .sim_safe_mode_expected
1871                .load(Ordering::Relaxed)
1872            {
1873                debug_assert!(!latest_system_state.safe_mode());
1874            }
1875
1876            #[cfg(not(msim))]
1877            debug_assert!(!latest_system_state.safe_mode());
1878
1879            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1880                if self.state.is_fullnode(&cur_epoch_store) {
1881                    warn!(
1882                        "Failed to send end of epoch notification to subscriber: {:?}",
1883                        err
1884                    );
1885                }
1886            }
1887
1888            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1889            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1890
1891            self.auth_agg.store(Arc::new(
1892                self.auth_agg
1893                    .load()
1894                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1895            ));
1896
1897            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1898            let next_epoch = next_epoch_committee.epoch();
1899            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1900
1901            info!(
1902                next_epoch,
1903                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1904            );
1905
1906            fail_point_async!("reconfig_delay");
1907
1908            // We save the connection monitor status map regardless of validator / fullnode
1909            // status so that we don't need to restart the connection monitor
1910            // every epoch. Update the mappings that will be used by the
1911            // consensus adapter if it exists or is about to be created.
1912            let authority_names_to_peer_ids =
1913                new_epoch_start_state.get_authority_names_to_peer_ids();
1914            self.connection_monitor_status
1915                .update_mapping_for_epoch(authority_names_to_peer_ids);
1916
1917            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1918
1919            send_trusted_peer_change(
1920                &self.config,
1921                &self.trusted_peer_change_tx,
1922                &new_epoch_start_state,
1923            );
1924
1925            let mut validator_components_lock_guard = self.validator_components.lock().await;
1926
1927            // The following code handles 4 different cases, depending on whether the node
1928            // was a validator in the previous epoch, and whether the node is a validator
1929            // in the new epoch.
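            // 1. Validator in both epochs: restart the epoch-specific components.
            // 2. Validator -> fullnode: tear the validator components down.
            // 3. Fullnode -> validator: construct the validator components from scratch.
            // 4. Fullnode in both epochs: nothing extra to do.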
1930            let new_epoch_store = self
1931                .reconfigure_state(
1932                    &self.state,
1933                    &cur_epoch_store,
1934                    next_epoch_committee.clone(),
1935                    new_epoch_start_state,
1936                    accumulator.clone(),
1937                )
1938                .await?;
1939
1940            let new_validator_components = if let Some(ValidatorComponents {
1941                validator_server_handle,
1942                validator_overload_monitor_handle,
1943                consensus_manager,
1944                consensus_store_pruner,
1945                consensus_adapter,
1946                mut checkpoint_service_tasks,
1947                checkpoint_metrics,
1948                iota_tx_validator_metrics,
1949                validator_registry_id,
1950            }) = validator_components_lock_guard.take()
1951            {
1952                info!("Reconfiguring the validator.");
1953                // Cancel the old checkpoint service tasks.
1954                // Waiting for checkpoint builder to finish gracefully is not possible, because
1955                // it may wait on transactions while consensus on peers has
1956                // already shut down.
1957                checkpoint_service_tasks.abort_all();
1958                while let Some(result) = checkpoint_service_tasks.join_next().await {
1959                    if let Err(err) = result {
1960                        if err.is_panic() {
1961                            std::panic::resume_unwind(err.into_panic());
1962                        }
1963                        warn!("Error in checkpoint service task: {:?}", err);
1964                    }
1965                }
1966                info!("Checkpoint service has shut down.");
1967
1968                consensus_manager.shutdown().await;
1969                info!("Consensus has shut down.");
1970
1971                info!("Epoch store finished reconfiguration.");
1972
1973                // No other components should be holding a strong reference to state accumulator
1974                // at this point. Confirm here before we swap in the new accumulator.
1975                let accumulator_metrics = Arc::into_inner(accumulator)
1976                    .expect("Accumulator should have no other references at this point")
1977                    .metrics();
1978                let new_accumulator = Arc::new(StateAccumulator::new(
1979                    self.state.get_accumulator_store().clone(),
1980                    accumulator_metrics,
1981                ));
1982                let weak_accumulator = Arc::downgrade(&new_accumulator);
1983                *accumulator_guard = Some(new_accumulator);
1984
1985                consensus_store_pruner.prune(next_epoch).await;
1986
1987                if self.state.is_committee_validator(&new_epoch_store) {
1988                    // Only restart consensus if this node is still a validator in the new epoch.
1989                    Some(
1990                        Self::start_epoch_specific_validator_components(
1991                            &self.config,
1992                            self.state.clone(),
1993                            consensus_adapter,
1994                            self.checkpoint_store.clone(),
1995                            new_epoch_store.clone(),
1996                            self.state_sync_handle.clone(),
1997                            self.randomness_handle.clone(),
1998                            consensus_manager,
1999                            consensus_store_pruner,
2000                            weak_accumulator,
2001                            self.backpressure_manager.clone(),
2002                            validator_server_handle,
2003                            validator_overload_monitor_handle,
2004                            checkpoint_metrics,
2005                            self.metrics.clone(),
2006                            iota_tx_validator_metrics,
2007                            validator_registry_id,
2008                        )
2009                        .await?,
2010                    )
2011                } else {
2012                    info!("This node is no longer a validator after reconfiguration");
2013                    if self.registry_service.remove(validator_registry_id) {
2014                        debug!("Removed validator metrics registry");
2015                    } else {
2016                        warn!("Failed to remove validator metrics registry");
2017                    }
2018                    validator_server_handle.shutdown();
2019                    debug!("Validator grpc server shutdown triggered");
2020
2021                    None
2022                }
2023            } else {
2024                // No other components should be holding a strong reference to state accumulator
2025                // at this point. Confirm here before we swap in the new accumulator.
2026                let accumulator_metrics = Arc::into_inner(accumulator)
2027                    .expect("Accumulator should have no other references at this point")
2028                    .metrics();
2029                let new_accumulator = Arc::new(StateAccumulator::new(
2030                    self.state.get_accumulator_store().clone(),
2031                    accumulator_metrics,
2032                ));
2033                let weak_accumulator = Arc::downgrade(&new_accumulator);
2034                *accumulator_guard = Some(new_accumulator);
2035
2036                if self.state.is_committee_validator(&new_epoch_store) {
2037                    info!("Promoting the node from fullnode to validator, starting grpc server");
2038
2039                    let mut components = Self::construct_validator_components(
2040                        self.config.clone(),
2041                        self.state.clone(),
2042                        Arc::new(next_epoch_committee.clone()),
2043                        new_epoch_store.clone(),
2044                        self.checkpoint_store.clone(),
2045                        self.state_sync_handle.clone(),
2046                        self.randomness_handle.clone(),
2047                        weak_accumulator,
2048                        self.backpressure_manager.clone(),
2049                        self.connection_monitor_status.clone(),
2050                        &self.registry_service,
2051                        self.metrics.clone(),
2052                    )
2053                    .await?;
2054
2055                    components.validator_server_handle =
2056                        components.validator_server_handle.start().await;
2057
2058                    Some(components)
2059                } else {
2060                    None
2061                }
2062            };
2063            *validator_components_lock_guard = new_validator_components;
2064
2065            // Force releasing current epoch store DB handle, because the
2066            // Arc<AuthorityPerEpochStore> may linger.
2067            cur_epoch_store.release_db_handles();
2068
2069            if cfg!(msim)
2070                && !matches!(
2071                    self.config
2072                        .authority_store_pruning_config
2073                        .num_epochs_to_retain_for_checkpoints(),
2074                    None | Some(u64::MAX) | Some(0)
2075                )
2076            {
2077                self.state
2078                    .prune_checkpoints_for_eligible_epochs_for_testing(
2079                        self.config.clone(),
2080                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2081                    )
2082                    .await?;
2083            }
2084
2085            epoch_store = new_epoch_store;
2086            info!("Reconfiguration finished");
2087        }
2088    }
2089
2090    async fn shutdown(&self) {
2091        if let Some(validator_components) = &*self.validator_components.lock().await {
2092            validator_components.consensus_manager.shutdown().await;
2093        }
2094
2095        // Shutdown the gRPC server if it's running
2096        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2097            info!("Shutting down gRPC server");
2098            if let Err(e) = grpc_handle.shutdown().await {
2099                warn!("Failed to gracefully shutdown gRPC server: {e}");
2100            }
2101        }
2102    }
2103
2104    /// Asynchronously reconfigures the state of the authority node for the next
2105    /// epoch.
2106    async fn reconfigure_state(
2107        &self,
2108        state: &Arc<AuthorityState>,
2109        cur_epoch_store: &AuthorityPerEpochStore,
2110        next_epoch_committee: Committee,
2111        next_epoch_start_system_state: EpochStartSystemState,
2112        accumulator: Arc<StateAccumulator>,
2113    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2114        let next_epoch = next_epoch_committee.epoch();
2115
2116        let last_checkpoint = self
2117            .checkpoint_store
2118            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2119            .expect("Error loading last checkpoint for current epoch")
2120            .expect("Could not load last checkpoint for current epoch");
2121        let epoch_supply_change = last_checkpoint
2122            .end_of_epoch_data
2123            .as_ref()
2124            .ok_or_else(|| {
2125                IotaError::from("last checkpoint in epoch should contain end of epoch data")
2126            })?
2127            .epoch_supply_change;
2128
2129        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2130
2131        assert_eq!(
2132            Some(last_checkpoint_seq),
2133            self.checkpoint_store
2134                .get_highest_executed_checkpoint_seq_number()
2135                .expect("Error loading highest executed checkpoint sequence number")
2136        );
2137
2138        let epoch_start_configuration = EpochStartConfiguration::new(
2139            next_epoch_start_system_state,
2140            *last_checkpoint.digest(),
2141            state.get_object_store().as_ref(),
2142            EpochFlag::default_flags_for_new_epoch(&state.config),
2143        )
2144        .expect("EpochStartConfiguration construction cannot fail");
2145
2146        let new_epoch_store = self
2147            .state
2148            .reconfigure(
2149                cur_epoch_store,
2150                self.config.supported_protocol_versions.unwrap(),
2151                next_epoch_committee,
2152                epoch_start_configuration,
2153                accumulator,
2154                &self.config.expensive_safety_check_config,
2155                epoch_supply_change,
2156                last_checkpoint_seq,
2157            )
2158            .await
2159            .expect("Reconfigure authority state cannot fail");
2160        info!(next_epoch, "Node State has been reconfigured");
2161        assert_eq!(next_epoch, new_epoch_store.epoch());
2162        self.state.get_reconfig_api().update_epoch_flags_metrics(
2163            cur_epoch_store.epoch_start_config().flags(),
2164            new_epoch_store.epoch_start_config().flags(),
2165        );
2166
2167        Ok(new_epoch_store)
2168    }
2169
2170    pub fn get_config(&self) -> &NodeConfig {
2171        &self.config
2172    }
2173
2174    async fn execute_transaction_immediately_at_zero_epoch(
2175        state: &Arc<AuthorityState>,
2176        epoch_store: &Arc<AuthorityPerEpochStore>,
2177        tx: &Transaction,
2178        span: tracing::Span,
2179    ) {
2180        let _guard = span.enter();
2181        let transaction =
2182            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2183                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2184                    tx.data().clone(),
2185                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2186                ),
2187            );
2188        state
2189            .try_execute_immediately(&transaction, None, epoch_store)
2190            .unwrap();
2191    }
2192
2193    pub fn randomness_handle(&self) -> randomness::Handle {
2194        self.randomness_handle.clone()
2195    }
2196
2197    /// Sends signed capability notification to committee validators for
2198    /// non-committee validators. This method implements retry logic to handle
2199    /// failed attempts to send the notification. It will retry sending the
2200    /// notification with an increasing interval until it receives a successful
2201    /// response from f+1 committee members or 2f+1 non-retryable errors.
2202    async fn send_signed_capability_notification_to_committee_with_retry(
2203        &self,
2204        epoch_store: &Arc<AuthorityPerEpochStore>,
2205    ) {
2206        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2207        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2208        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
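        // With these constants the retry delay grows as 5s, 10s, 15s, ... and is
        // capped at 300s (5 minutes).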
2209
2210        // Load the protocol config used to build the capability notification
2211        let config = epoch_store.protocol_config();
2212        let binary_config = to_binary_config(config);
2213
2214        // Build the capability notification once; it is reused across retries
2215        let capabilities = AuthorityCapabilitiesV1::new(
2216            self.state.name,
2217            epoch_store.get_chain_identifier().chain(),
2218            self.config
2219                .supported_protocol_versions
2220                .expect("Supported versions should be populated")
2221                .truncate_below(config.version),
2222            self.state
2223                .get_available_system_packages(&binary_config)
2224                .await,
2225        );
2226
2227        // Sign the capabilities using the authority key pair from config
2228        let signature = AuthoritySignature::new_secure(
2229            &IntentMessage::new(
2230                Intent::iota_app(IntentScope::AuthorityCapabilities),
2231                &capabilities,
2232            ),
2233            &epoch_store.epoch(),
2234            self.config.authority_key_pair(),
2235        );
2236
2237        let request = HandleCapabilityNotificationRequestV1 {
2238            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2239        };
2240
2241        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2242
2243        loop {
2244            let auth_agg = self.auth_agg.load();
2245            match auth_agg
2246                .send_capability_notification_to_quorum(request.clone())
2247                .await
2248            {
2249                Ok(_) => {
2250                    info!("Successfully sent capability notification to committee");
2251                    break;
2252                }
2253                Err(err) => {
2254                    match &err {
2255                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2256                            errors,
2257                        } => {
2258                            warn!(
2259                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2260                                retry_interval, errors
2261                            );
2262                        }
2263                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2264                            errors,
2265                        } => {
2266                            error!(
2267                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2268                                errors
2269                            );
2270                            break;
2271                        }
2272                    };
2273
2274                    // Wait before retrying
2275                    tokio::time::sleep(retry_interval).await;
2276
2277                    // Increase retry interval for the next attempt, capped at max
2278                    retry_interval = std::cmp::min(
2279                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2280                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2281                    );
2282                }
2283            }
2284        }
2285    }
2286}
2287
2288#[cfg(not(msim))]
2289impl IotaNode {
2290    async fn fetch_jwks(
2291        _authority: AuthorityName,
2292        provider: &OIDCProvider,
2293    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2294        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2295        let client = reqwest::Client::new();
2296        fetch_jwks(provider, &client)
2297            .await
2298            .map_err(|_| IotaError::JWKRetrieval)
2299    }
2300}
2301
2302#[cfg(msim)]
2303impl IotaNode {
2304    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
2305        self.sim_state.sim_node.id()
2306    }
2307
2308    pub fn set_safe_mode_expected(&self, new_value: bool) {
2309        info!("Setting safe mode expected to {}", new_value);
2310        self.sim_state
2311            .sim_safe_mode_expected
2312            .store(new_value, Ordering::Relaxed);
2313    }
2314
2315    async fn fetch_jwks(
2316        authority: AuthorityName,
2317        provider: &OIDCProvider,
2318    ) -> IotaResult<Vec<(JwkId, JWK)>> {
2319        get_jwk_injector()(authority, provider)
2320    }
2321}
2322
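/// Wraps the lifecycle of the validator gRPC server: created `Unstarted` with the
/// bind future, transitioned to `Started` via `start()`, and torn down via
/// `shutdown()` when the node is no longer a committee validator.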
2323enum SpawnOnce {
2324    // Mutex is only needed to make SpawnOnce Sync
2325    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
2326    #[allow(unused)]
2327    Started(iota_http::ServerHandle),
2328}
2329
2330impl SpawnOnce {
2331    pub fn new(
2332        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2333    ) -> Self {
2334        Self::Unstarted(Mutex::new(Box::pin(future)))
2335    }
2336
2337    pub async fn start(self) -> Self {
2338        match self {
2339            Self::Unstarted(future) => {
2340                let server = future
2341                    .into_inner()
2342                    .await
2343                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2344                let handle = server.handle().clone();
2345                tokio::spawn(async move {
2346                    match server.serve().await {
2347                        Ok(_) => info!("Server stopped"),
2348                        Err(err) => info!("Server stopped: {err}"),
2349                    }
2350                });
2351                Self::Started(handle)
2352            }
2353            Self::Started(_) => self,
2354        }
2355    }
2356
2357    pub fn shutdown(self) {
2358        if let SpawnOnce::Started(handle) = self {
2359            handle.trigger_shutdown();
2360        }
2361    }
2362}
2363
2364/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
2365/// available.
2366fn send_trusted_peer_change(
2367    config: &NodeConfig,
2368    sender: &watch::Sender<TrustedPeerChangeEvent>,
2369    new_epoch_start_state: &EpochStartSystemState,
2370) {
2371    let new_committee =
2372        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2373
2374    sender.send_modify(|event| {
2375        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2376        event.new_committee = new_committee;
2377    })
2378}
2379
2380fn build_kv_store(
2381    state: &Arc<AuthorityState>,
2382    config: &NodeConfig,
2383    registry: &Registry,
2384) -> Result<Arc<TransactionKeyValueStore>> {
2385    let metrics = KeyValueStoreMetrics::new(registry);
2386    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2387
2388    let base_url = &config.transaction_kv_store_read_config.base_url;
2389
2390    if base_url.is_empty() {
2391        info!("no http kv store url provided, using local db only");
2392        return Ok(Arc::new(db_store));
2393    }
2394
2395    base_url.parse::<url::Url>().tap_err(|e| {
2396        error!(
2397            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
2398            base_url, e
2399        )
2400    })?;
2401
2402    let http_store = HttpKVStore::new_kv(
2403        base_url,
2404        config.transaction_kv_store_read_config.cache_size,
2405        metrics.clone(),
2406    )?;
2407    info!("using local key-value store with fallback to http key-value store");
2408    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2409        db_store,
2410        http_store,
2411        metrics,
2412        "json_rpc_fallback",
2413    )))
2414}
2415
2416/// Builds and starts the gRPC server for the IOTA node based on the node's
2417/// configuration.
2418///
2419/// This function performs the following tasks:
2420/// 1. Checks if the node is a validator by inspecting the consensus
2421///    configuration; if so, it returns early as validators do not expose gRPC
2422///    APIs.
2423/// 2. Checks if gRPC is enabled in the configuration.
2424/// 3. Creates the read store and the cancellation token used for shutdown.
2425/// 4. Initializes the gRPC reader and server metrics.
2426/// 5. Spawns the gRPC server to listen for incoming connections.
2427///
2428/// Returns a handle to the running gRPC server, or `None` if the node does not
2429/// expose the gRPC API.
2430async fn build_grpc_server(
2431    config: &NodeConfig,
2432    state: Arc<AuthorityState>,
2433    state_sync_store: RocksDbStore,
2434    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2435    prometheus_registry: &Registry,
2436    server_version: ServerVersion,
2437) -> Result<Option<GrpcServerHandle>> {
2438    // Validators do not expose gRPC APIs
2439    if config.consensus_config().is_some() || !config.enable_grpc_api {
2440        return Ok(None);
2441    }
2442
2443    let Some(grpc_config) = &config.grpc_api_config else {
2444        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2445    };
2446
2447    // Get chain identifier from state directly
2448    let chain_id = state.get_chain_identifier();
2449
2450    let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));
2451
2452    // Create cancellation token for proper shutdown hierarchy
2453    let shutdown_token = CancellationToken::new();
2454
2455    // Create GrpcReader
2456    let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(
2457        rest_read_store,
2458        Some(server_version.to_string()),
2459    ));
2460
2461    // Create gRPC server metrics
2462    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2463
2464    // Pass the cancellation token to start_grpc_server so the server can be shut
2465    // down cleanly via the token
2466    let handle = start_grpc_server(
2467        grpc_reader,
2468        executor,
2469        grpc_config.clone(),
2470        shutdown_token,
2471        chain_id,
2472        Some(grpc_server_metrics),
2473    )
2474    .await?;
2475
2476    Ok(Some(handle))
2477}
2478
2479/// Builds and starts the HTTP server for the IOTA node, exposing JSON-RPC and
2480/// REST APIs based on the node's configuration.
2481///
2482/// This function performs the following tasks:
2483/// 1. Checks if the node is a validator by inspecting the consensus
2484///    configuration; if so, it returns early as validators do not expose these
2485///    APIs.
2486/// 2. Creates an Axum router to handle HTTP requests.
2487/// 3. Initializes the JSON-RPC server and registers various RPC modules based
2488///    on the node's state and configuration, including CoinApi,
2489///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
2490///    IndexerApi.
2491/// 4. Optionally, if the REST API is enabled, nests the REST API router under
2492///    the `/api/v1` path.
2493/// 5. Binds the server to the specified JSON-RPC address and starts listening
2494///    for incoming connections.
2495pub async fn build_http_server(
2496    state: Arc<AuthorityState>,
2497    store: RocksDbStore,
2498    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2499    config: &NodeConfig,
2500    prometheus_registry: &Registry,
2501    server_version: ServerVersion,
2502) -> Result<Option<iota_http::ServerHandle>> {
2503    // Validators do not expose these APIs
2504    if config.consensus_config().is_some() {
2505        return Ok(None);
2506    }
2507
2508    let mut router = axum::Router::new();
2509
2510    let json_rpc_router = {
2511        let mut server = JsonRpcServerBuilder::new(
2512            env!("CARGO_PKG_VERSION"),
2513            prometheus_registry,
2514            config.policy_config.clone(),
2515            config.firewall_config.clone(),
2516        );
2517
2518        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2519
2520        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2521        server.register_module(ReadApi::new(
2522            state.clone(),
2523            kv_store.clone(),
2524            metrics.clone(),
2525        ))?;
2526        server.register_module(CoinReadApi::new(
2527            state.clone(),
2528            kv_store.clone(),
2529            metrics.clone(),
2530        )?)?;
2531
2532        // If run_with_range is enabled, we want to prevent any transaction submission;
2533        // run_with_range = None is the normal operating condition.
2534        if config.run_with_range.is_none() {
2535            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2536        }
2537        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2538
2539        if let Some(transaction_orchestrator) = transaction_orchestrator {
2540            server.register_module(TransactionExecutionApi::new(
2541                state.clone(),
2542                transaction_orchestrator.clone(),
2543                metrics.clone(),
2544            ))?;
2545        }
2546
2547        let iota_names_config = config
2548            .iota_names_config
2549            .clone()
2550            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));
2551
2552        server.register_module(IndexerApi::new(
2553            state.clone(),
2554            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2555            kv_store,
2556            metrics,
2557            iota_names_config,
2558            config.indexer_max_subscriptions,
2559        ))?;
2560        server.register_module(MoveUtils::new(state.clone()))?;
2561
2562        let server_type = config.jsonrpc_server_type();
2563
2564        server.to_router(server_type).await?
2565    };
2566
2567    router = router.merge(json_rpc_router);
2568
2569    if config.enable_rest_api {
2570        let mut rest_service =
2571            iota_rest_api::RestService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2572        rest_service.with_server_version(server_version);
2573
2574        if let Some(config) = config.rest.clone() {
2575            rest_service.with_config(config);
2576        }
2577
2578        rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2579
2580        if let Some(transaction_orchestrator) = transaction_orchestrator {
2581            rest_service.with_executor(transaction_orchestrator.clone())
2582        }
2583
2584        router = router.merge(rest_service.into_router());
2585    }
2586
2587    // TODO: Remove this health check when experimental REST API becomes default
2588    // This is a copy of the health check in crates/iota-rest-api/src/health.rs
2589    router = router
2590        .route("/health", axum::routing::get(health_check_handler))
2591        .route_layer(axum::Extension(state));
2592
2593    let layers = ServiceBuilder::new()
2594        .map_request(|mut request: axum::http::Request<_>| {
2595            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
2596                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2597                request.extensions_mut().insert(axum_connect_info);
2598            }
2599            request
2600        })
2601        .layer(axum::middleware::from_fn(server_timing_middleware));
2602
2603    router = router.layer(layers);
2604
2605    let handle = iota_http::Builder::new()
2606        .serve(&config.json_rpc_address, router)
2607        .map_err(|e| anyhow::anyhow!("{e}"))?;
2608    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());
2609
2610    Ok(Some(handle))
2611}
2612
2613#[derive(Debug, serde::Serialize, serde::Deserialize)]
2614pub struct Threshold {
2615    pub threshold_seconds: Option<u32>,
2616}
2617
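/// Handles the `/health` endpoint. With no query parameter it only confirms the
/// endpoint is reachable; with `threshold_seconds` it also checks checkpoint
/// freshness. A hypothetical probe against a locally running fullnode:
///
/// ```text
/// curl "http://127.0.0.1:9000/health?threshold_seconds=60"
/// ```
///
/// returns `200 "up"` when the highest executed checkpoint is newer than the
/// threshold, and `503 "down"` otherwise.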
2618async fn health_check_handler(
2619    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2620    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2621) -> impl axum::response::IntoResponse {
2622    if let Some(threshold_seconds) = threshold_seconds {
2623        // Attempt to get the latest checkpoint
2624        let summary = match state
2625            .get_checkpoint_store()
2626            .get_highest_executed_checkpoint()
2627        {
2628            Ok(Some(summary)) => summary,
2629            Ok(None) => {
2630                warn!("Highest executed checkpoint not found");
2631                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2632            }
2633            Err(err) => {
2634                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2635                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2636            }
2637        };
2638
2639        // Calculate the threshold time based on the provided threshold_seconds
2640        let latest_chain_time = summary.timestamp();
2641        let threshold =
2642            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2643
2644        // Check if the latest checkpoint is within the threshold
2645        if latest_chain_time < threshold {
2646            warn!(
2647                ?latest_chain_time,
2648                ?threshold,
2649                "failing health check due to checkpoint lag"
2650            );
2651            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2652        }
2653    }
2654    // If the health endpoint is responding and no threshold is given, respond with success
2655    (axum::http::StatusCode::OK, "up")
2656}
2657
2658#[cfg(not(test))]
2659fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2660    protocol_config.max_transactions_per_checkpoint() as usize
2661}
2662
2663#[cfg(test)]
2664fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2665    2
2666}