Skip to main content

iota_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::HashMap,
9    fmt,
10    future::Future,
11    path::PathBuf,
12    sync::{Arc, Weak},
13    time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18    callback::CallbackLayer,
19    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28    ConsensusConfig, NodeConfig,
29    node::{DBCheckpointConfig, RunWithRange},
30    node_config_metrics::NodeConfigMetrics,
31    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34    authority::{
35        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36        authority_per_epoch_store::AuthorityPerEpochStore,
37        authority_store_pruner::ObjectsCompactionFilter,
38        authority_store_tables::{
39            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40        },
41        backpressure::BackpressureManager,
42        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43    },
44    authority_aggregator::{
45        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46    },
47    authority_client::NetworkAuthorityClient,
48    authority_server::{
49        ValidatorService, ValidatorServiceMetrics, soft_lock::PreConsensusSoftLocks,
50    },
51    checkpoint_progress_tracker::CheckpointProgressTracker,
52    checkpoints::{
53        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
54        SubmitCheckpointToConsensus,
55        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
56    },
57    connection_monitor::ConnectionMonitor,
58    consensus_adapter::{
59        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
60        ConsensusClient,
61    },
62    consensus_handler::ConsensusHandlerInitializer,
63    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
64    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
65    db_checkpoint_handler::DBCheckpointHandler,
66    epoch::{
67        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
68        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
69        reconfiguration::ReconfigurationInitiator,
70    },
71    execution_cache::build_execution_cache,
72    global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
73    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
74    jsonrpc_index::IndexStore,
75    module_cache_metrics::ResolverMetrics,
76    overload_monitor::{consensus_queue_overload_monitor, overload_monitor},
77    safe_client::SafeClientMetricsBase,
78    signature_verifier::SignatureVerifierMetrics,
79    storage::{GrpcReadStore, RocksDbStore},
80    transaction_orchestrator::TransactionOrchestrator,
81    validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87    transaction_builder_api::TransactionBuilderApi,
88    transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93    RegistryID, RegistryService,
94    hardware_metrics::register_hardware_metrics,
95    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96    server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100    api::{ValidatorPeerServer, ValidatorServer, ValidatorV2Server},
101    discovery,
102    discovery::TrustedPeerChangeEvent,
103    randomness, state_sync,
104};
105use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
106use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
107use iota_sdk_types::{
108    RandomnessRound,
109    crypto::{Intent, IntentMessage, IntentScope},
110};
111use iota_snapshot::{
112    reader::{StateSnapshotReaderV1, latest_available_epoch},
113    uploader::StateSnapshotUploader,
114};
115use iota_storage::{
116    FileCompression, StorageFormat,
117    http_key_value_store::HttpKVStore,
118    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
119    key_value_store_metrics::KeyValueStoreMetrics,
120};
121use iota_types::{
122    base_types::{AuthorityName, ConciseableName, EpochId},
123    committee::Committee,
124    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits},
125    digests::{ChainIdentifier, get_devnet_chain_identifier},
126    error::{IotaError, IotaResult},
127    executable_transaction::VerifiedExecutableTransaction,
128    execution_config_utils::to_binary_config,
129    full_checkpoint_content::CheckpointData,
130    iota_system_state::{
131        IotaSystemState, IotaSystemStateTrait,
132        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
133    },
134    messages_consensus::{
135        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
136        SignedAuthorityCapabilitiesV1,
137    },
138    messages_grpc::HandleCapabilityNotificationRequestV1,
139    quorum_driver_types::QuorumDriverEffectsQueueResult,
140    supported_protocol_versions::SupportedProtocolVersions,
141    transaction::{Transaction, VerifiedCertificate},
142};
143use prometheus_filtered::Registry;
144#[cfg(msim)]
145use simulator::*;
146use tap::tap::TapFallible;
147use tokio::{
148    sync::{Mutex, broadcast, mpsc, watch},
149    task::{JoinHandle, JoinSet},
150};
151use tokio_util::sync::CancellationToken;
152use tower::ServiceBuilder;
153use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
154use typed_store::{
155    DBMetrics,
156    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
157};
158
159use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
160
161pub mod admin;
162mod handle;
163pub mod metrics;
164
/// Components that only exist while this node runs as a validator.
///
/// NOTE(review): struct fields are dropped in declaration order, so
/// reordering the fields below changes the order in which these handles and
/// services are torn down.
pub struct ValidatorComponents {
    /// Handle to the validator server (type `SpawnOnce`, defined outside this
    /// view).
    validator_server_handle: SpawnOnce,
    /// Handle for the validator overload monitor task, if one was started.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Handle for the consensus queue overload monitor task, present only
    /// when the certificate-less (P-COOL) flow is enabled. The
    /// task self-terminates via `Weak` references; this handle exists purely
    /// for ownership clarity.
    consensus_queue_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Handle for the soft-lock expiry sweep task. The task self-terminates
    /// via a `Weak` reference; this handle exists purely for ownership clarity.
    soft_lock_sweep_handle: JoinHandle<()>,
    /// Handle for the overload-notification task, or `None` if it was not
    /// started. Presumably populated from `IotaNode::start_overload_notifier`,
    /// which returns `None` unless the P-COOL flow is enabled — confirm at the
    /// call site.
    overload_notifier_handle: Option<JoinHandle<()>>,
    /// Consensus manager for this validator.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for the consensus store.
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Shared pre-consensus soft-lock table.
    soft_locks: Arc<PreConsensusSoftLocks>,
    /// Keeping the handle to the checkpoint service tasks to shut them down
    /// during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    /// Checkpoint metrics; presumably kept here so they can be reused when the
    /// validator components are rebuilt — TODO confirm.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Metrics for the consensus transaction validator; presumably kept for
    /// reuse across rebuilds like `checkpoint_metrics` — TODO confirm.
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    /// ID of the metrics registry holding validator-only metrics; presumably
    /// used to remove that registry when the node stops being a validator —
    /// confirm.
    validator_registry_id: RegistryID,
}
187
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use iota_simulator::{NodeLeakDetector, runtime::NodeHandle};

    /// Per-node bookkeeping that only exists in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulated node that was current when this state was
        /// created.
        pub sim_node: NodeHandle,
        /// Flag for whether the simulation expects safe mode; starts out
        /// `false` (presumably toggled by test code — confirm).
        pub sim_safe_mode_expected: AtomicBool,
        /// Held only for its lifetime; presumably reports leaks when dropped —
        /// confirm.
        _leak_detector: NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulated node, clears the safe-mode flag and
        /// creates a fresh leak detector, in that order.
        fn default() -> Self {
            let sim_node = NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::default();
            let _leak_detector = NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }
}
208
/// Name and version of the running binary, rendered by `Display` as
/// `<bin>/<version>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServerVersion {
    /// Binary name, e.g. `"iota-node"`.
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}

impl ServerVersion {
    /// Creates a version descriptor from a binary name and a version string.
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        Self { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    /// Formats the version as `<bin>/<version>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.bin, self.version)
    }
}
228
/// A running IOTA node (validator or full node) and the handles to the
/// services it started.
///
/// NOTE(review): struct fields are dropped in declaration order; several
/// fields below are task handles or shutdown senders, so reordering them may
/// change teardown order.
pub struct IotaNode {
    /// Node configuration.
    config: NodeConfig,
    /// Validator-only components; presumably `None` while this node is not a
    /// committee validator — confirm.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC
    _http_server: Option<iota_http::ServerHandle>,
    /// Authority state shared with the services started by this node.
    state: Arc<AuthorityState>,
    /// Transaction orchestrator. `start_async` only creates it for full nodes
    /// that were not started with a `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    /// Service owning the metrics registries of this node.
    registry_service: RegistryService,
    /// Node-level metrics.
    metrics: Arc<IotaNodeMetrics>,

    /// Handle of the P2P discovery service; held to keep it alive.
    _discovery: discovery::Handle,
    /// Handle of the P2P state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle of the P2P randomness service.
    randomness_handle: randomness::Handle,
    /// Store of checkpoints known to this node.
    checkpoint_store: Arc<CheckpointStore>,
    /// Global state hasher; wrapped in `Mutex<Option<_>>`, presumably so it can
    /// be replaced or taken out at runtime — confirm.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    /// Connection status of peers together with the authority-name-to-peer-id
    /// mapping.
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Watch channel (not a broadcast channel) used to notify the discovery
    /// event loop (`DiscoveryEventLoop`) about new validator peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    /// Backpressure manager built from the checkpoint store.
    backpressure_manager: Arc<BackpressureManager>,

    /// Tracks checkpoint progress; shared with the authority state and the DB
    /// checkpoint handler.
    checkpoint_progress_tracker: Arc<CheckpointProgressTracker>,

    /// Sender tied to the DB checkpoint task; held for the node's lifetime,
    /// presumably as that task's shutdown signal — confirm.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    /// Simulator-only per-node state.
    #[cfg(msim)]
    sim_state: SimState,

    /// Sender tied to the state archival task; presumably its shutdown signal —
    /// confirm.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    /// Sender tied to the state snapshot uploader task; presumably its shutdown
    /// signal — confirm.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Channel to allow signaling upstream to shutdown iota-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and beginning of
    /// each epoch. Use ArcSwap so that we could mutate it without taking
    /// mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
278
279impl fmt::Debug for IotaNode {
280    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
281        f.debug_struct("IotaNode")
282            .field("name", &self.state.name.concise())
283            .finish()
284    }
285}
286
287impl IotaNode {
288    pub async fn start(
289        config: NodeConfig,
290        registry_service: RegistryService,
291    ) -> Result<Arc<IotaNode>> {
292        Self::start_async(
293            config,
294            registry_service,
295            ServerVersion::new("iota-node", "unknown"),
296        )
297        .await
298    }
299
300    /// Starts a background task that polls the authority's load shedding
301    /// percentage and broadcasts changes to other validators via consensus.
302    /// Returns the task handle if the feature flag is enabled, or `None`
303    /// otherwise.
304    fn start_overload_notifier(
305        config: &NodeConfig,
306        state: Arc<AuthorityState>,
307        epoch_store: Arc<AuthorityPerEpochStore>,
308        consensus_adapter: Arc<ConsensusAdapter>,
309    ) -> Option<JoinHandle<()>> {
310        if !epoch_store.protocol_config().enable_pcool_flow() {
311            return None;
312        }
313
314        let poll_interval = config.authority_overload_config.overload_monitor_interval;
315        let authority_name = state.name;
316
317        Some(spawn_monitored_task!(async move {
318            // Seed from the percentage this authority last broadcasted
319            let mut last_notified_percentage: u32 = epoch_store
320                .load_overload_notification(&authority_name)
321                .unwrap_or(0) as u32;
322            loop {
323                tokio::time::sleep(poll_interval).await;
324                let current = state
325                    .overload_info
326                    .local_load_shedding_percentage
327                    .load(std::sync::atomic::Ordering::Relaxed);
328                if current != last_notified_percentage {
329                    last_notified_percentage = current;
330                    let transaction = ConsensusTransaction::new_overload_notification_v1(
331                        authority_name,
332                        current as u8,
333                    );
334                    if let Err(e) = consensus_adapter.submit(transaction, None, &epoch_store) {
335                        tracing::warn!(
336                            "Failed to submit overload notification to consensus: {:?}",
337                            e
338                        );
339                    }
340                }
341            }
342        }))
343    }
344
345    pub async fn start_async(
346        config: NodeConfig,
347        registry_service: RegistryService,
348        server_version: ServerVersion,
349    ) -> Result<Arc<IotaNode>> {
350        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
351        let mut config = config.clone();
352        if config.supported_protocol_versions.is_none() {
353            info!(
354                "populating config.supported_protocol_versions with default {:?}",
355                SupportedProtocolVersions::SYSTEM_DEFAULT
356            );
357            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
358        }
359
360        let run_with_range = config.run_with_range;
361        let is_validator = config.consensus_config().is_some();
362        let is_full_node = !is_validator;
363        let prometheus_registry = registry_service.default_registry();
364
365        info!(node =? config.authority_public_key(),
366            "Initializing iota-node listening on {}", config.network_address
367        );
368
369        let genesis = config.genesis()?.clone();
370
371        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
372        info!("IOTA chain identifier: {chain_identifier}");
373
374        // Check and set the db_corrupted flag
375        let db_corrupted_path = &config.db_path().join("status");
376        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
377            panic!("Failed to check database corruption: {err}");
378        }
379
380        // Initialize metrics to track db usage before creating any stores
381        DBMetrics::init(&prometheus_registry);
382
383        // Initialize IOTA metrics.
384        iota_metrics::init_metrics(&prometheus_registry);
385        // Unsupported (because of the use of static variable) and unnecessary in
386        // simtests.
387        #[cfg(not(msim))]
388        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
389
390        // Register hardware metrics.
391        register_hardware_metrics(&registry_service, &config.db_path)
392            .expect("Failed registering hardware metrics");
393        // Register uptime metric
394        prometheus_registry
395            .register(iota_metrics::uptime_metric(
396                if is_validator {
397                    "validator"
398                } else {
399                    "fullnode"
400                },
401                server_version.version,
402                &chain_identifier.to_string(),
403            ))
404            .expect("Failed registering uptime metric");
405
406        // If genesis come with some migration data then load them into memory from the
407        // file path specified in config.
408        let migration_tx_data = if genesis.contains_migrations() {
409            // Here the load already verifies that the content of the migration blob is
410            // valid in respect to the content found in genesis
411            Some(config.load_migration_tx_data()?)
412        } else {
413            None
414        };
415
416        let secret = Arc::pin(config.authority_key_pair().copy());
417        let genesis_committee = genesis.committee()?;
418        let committee_store = Arc::new(CommitteeStore::new(
419            config.db_path().join("epochs"),
420            &genesis_committee,
421            None,
422        ));
423
424        let mut pruner_db = None;
425        if config
426            .authority_store_pruning_config
427            .enable_compaction_filter
428        {
429            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
430                &config.db_path().join("store"),
431            )));
432        }
433        let compaction_filter = pruner_db
434            .clone()
435            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
436
437        // By default, only enable write stall on validators for perpetual db.
438        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
439        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
440            enable_write_stall,
441            compaction_filter,
442        };
443        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
444            &config.db_path().join("store"),
445            Some(perpetual_tables_options),
446        ));
447        let is_genesis = perpetual_tables
448            .database_is_empty()
449            .expect("Database read should not fail at init.");
450        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
451        let backpressure_manager =
452            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
453
454        let perpetual_tables_for_progress = perpetual_tables.clone();
455        let store = AuthorityStore::open(
456            perpetual_tables,
457            &genesis,
458            &config,
459            &prometheus_registry,
460            migration_tx_data.as_ref(),
461        )
462        .await?;
463
464        let cur_epoch = store.get_recovery_epoch_at_restart()?;
465        let committee = committee_store
466            .get_committee(&cur_epoch)?
467            .expect("Committee of the current epoch must exist");
468        let epoch_start_configuration = store
469            .get_epoch_start_configuration()?
470            .expect("EpochStartConfiguration of the current epoch must exist");
471        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
472        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
473
474        let cache_traits = build_execution_cache(
475            &config.execution_cache_config,
476            &prometheus_registry,
477            &store,
478            backpressure_manager.clone(),
479        );
480
481        let auth_agg = {
482            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
483            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
484            Arc::new(ArcSwap::new(Arc::new(
485                AuthorityAggregator::new_from_epoch_start_state(
486                    epoch_start_configuration.epoch_start_state(),
487                    &committee_store,
488                    safe_client_metrics_base,
489                    auth_agg_metrics,
490                ),
491            )))
492        };
493
494        let chain = match config.chain_override_for_testing {
495            Some(chain) => chain,
496            None => chain_identifier.chain(),
497        };
498
499        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
500        let epoch_store = AuthorityPerEpochStore::new(
501            config.authority_public_key(),
502            committee.clone(),
503            &config.db_path().join("store"),
504            Some(epoch_options.options),
505            EpochMetrics::new(&registry_service.default_registry()),
506            epoch_start_configuration,
507            cache_traits.backing_package_store.clone(),
508            cache_metrics,
509            signature_verifier_metrics,
510            &config.expensive_safety_check_config,
511            (chain_identifier, chain),
512            checkpoint_store
513                .get_highest_executed_checkpoint_seq_number()
514                .expect("checkpoint store read cannot fail")
515                .unwrap_or(0),
516        )?;
517
518        info!("created epoch store");
519
520        replay_log!(
521            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
522            epoch_store.epoch(),
523            epoch_store.protocol_config()
524        );
525
526        // the database is empty at genesis time
527        if is_genesis {
528            info!("checking IOTA conservation at genesis");
529            // When we are opening the db table, the only time when it's safe to
530            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
531            // an epoch and the IOTA conservation check will fail. This also initialize
532            // the expected_network_iota_amount table.
533            cache_traits
534                .reconfig_api
535                .try_expensive_check_iota_conservation(&epoch_store, None)
536                .expect("IOTA conservation check cannot fail at genesis");
537        }
538
539        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
540        let default_buffer_stake = epoch_store
541            .protocol_config()
542            .buffer_stake_for_protocol_upgrade_bps();
543        if effective_buffer_stake != default_buffer_stake {
544            warn!(
545                ?effective_buffer_stake,
546                ?default_buffer_stake,
547                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
548            );
549        }
550
551        checkpoint_store.insert_genesis_checkpoint(
552            genesis.checkpoint(),
553            genesis.checkpoint_contents().clone(),
554            &epoch_store,
555        );
556
557        // Database has everything from genesis, set corrupted key to 0
558        unmark_db_corruption(db_corrupted_path)?;
559
560        info!("creating state sync store");
561        let state_sync_store = RocksDbStore::new(
562            cache_traits.clone(),
563            committee_store.clone(),
564            checkpoint_store.clone(),
565        );
566
567        let index_store = if is_full_node && config.enable_index_processing {
568            info!("creating index store");
569            Some(Arc::new(IndexStore::new(
570                config.db_path().join("indexes"),
571                &prometheus_registry,
572                epoch_store
573                    .protocol_config()
574                    .max_move_identifier_len_as_option(),
575            )))
576        } else {
577            None
578        };
579
580        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
581            Some(Arc::new(
582                GrpcIndexesStore::new(
583                    config.db_path().join(GRPC_INDEXES_DIR),
584                    Arc::clone(&store),
585                    &checkpoint_store,
586                )
587                .await,
588            ))
589        } else {
590            None
591        };
592
593        // Seed and backfill the `epoch_info` chain before services start. Done
594        // here, not in the store, because the EPOCH_INFO reader lives in
595        // `iota-snapshot`, which depends on `iota-core`.
596        Self::seed_epoch_info(&checkpoint_store, &store, &genesis, chain_identifier)
597            .await
598            .expect("failed to seed the epoch_info chain");
599
600        info!("creating archive reader");
601        // Create network
602        // TODO only configure validators as seed/preferred peers for validators and not
603        // for fullnodes once we've had a chance to re-work fullnode
604        // configuration generation.
605        let archive_readers =
606            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
607        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
608        let (randomness_tx, randomness_rx) = mpsc::channel(
609            config
610                .p2p_config
611                .randomness
612                .clone()
613                .unwrap_or_default()
614                .mailbox_capacity(),
615        );
616        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
617            Self::create_p2p_network(
618                &config,
619                state_sync_store.clone(),
620                chain_identifier,
621                trusted_peer_change_rx,
622                archive_readers.clone(),
623                randomness_tx,
624                &prometheus_registry,
625            )?;
626
627        // We must explicitly send this instead of relying on the initial value to
628        // trigger watch value change, so that state-sync is able to process it.
629        send_trusted_peer_change(
630            &config,
631            &trusted_peer_change_tx,
632            epoch_store.epoch_start_state(),
633        );
634
635        info!("start state archival");
636        // Start archiving local state to remote store
637        let state_archive_handle =
638            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
639                .await?;
640
641        info!("start snapshot upload");
642        // Start uploading state snapshot to remote store
643        let state_snapshot_handle = Self::start_state_snapshot(
644            &config,
645            &prometheus_registry,
646            checkpoint_store.clone(),
647            is_full_node,
648        )?;
649
650        let checkpoint_progress_tracker = Arc::new(CheckpointProgressTracker::new());
651
652        // Start uploading db checkpoints to remote store
653        info!("start db checkpoint");
654        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
655            &config,
656            &prometheus_registry,
657            state_snapshot_handle.is_some(),
658            Some(checkpoint_progress_tracker.clone()),
659        )?;
660
661        let mut genesis_objects = genesis.objects().to_vec();
662        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
663            genesis_objects.extend(migration_tx_data.get_objects());
664        }
665
666        let authority_name = config.authority_public_key();
667        let validator_tx_finalizer =
668            config
669                .enable_validator_tx_finalizer
670                .then_some(Arc::new(ValidatorTxFinalizer::new(
671                    auth_agg.clone(),
672                    authority_name,
673                    &prometheus_registry,
674                )));
675
676        info!("create authority state");
677        let state = AuthorityState::new(
678            authority_name,
679            secret,
680            config.supported_protocol_versions.unwrap(),
681            store.clone(),
682            cache_traits.clone(),
683            epoch_store.clone(),
684            committee_store.clone(),
685            index_store.clone(),
686            grpc_indexes_store,
687            checkpoint_store.clone(),
688            &prometheus_registry,
689            &genesis_objects,
690            &db_checkpoint_config,
691            config.clone(),
692            archive_readers,
693            validator_tx_finalizer,
694            chain_identifier,
695            pruner_db,
696            Some(checkpoint_progress_tracker.clone()),
697            config.policy_config.clone(),
698            config.firewall_config.clone(),
699        )
700        .await;
701
702        // ensure genesis and migration txs were executed
703        if epoch_store.epoch() == 0 {
704            let genesis_tx = &genesis.transaction();
705            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
706            // Execute genesis transaction
707            Self::execute_transaction_immediately_at_zero_epoch(
708                &state,
709                &epoch_store,
710                genesis_tx,
711                span,
712            )
713            .await;
714
715            // Execute migration transactions if present
716            if let Some(migration_tx_data) = migration_tx_data {
717                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
718                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
719                    Self::execute_transaction_immediately_at_zero_epoch(
720                        &state,
721                        &epoch_store,
722                        tx,
723                        span,
724                    )
725                    .await;
726                }
727            }
728        }
729
730        // Start the loop that receives new randomness and generates transactions for
731        // it.
732        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
733
734        if config
735            .expensive_safety_check_config
736            .enable_secondary_index_checks()
737        {
738            if let Some(indexes) = state.indexes.clone() {
739                iota_core::verify_indexes::verify_indexes(
740                    state.get_global_state_hash_store().as_ref(),
741                    indexes,
742                )
743                .expect("secondary indexes are inconsistent");
744            }
745        }
746
747        let (end_of_epoch_channel, end_of_epoch_receiver) =
748            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
749
750        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
751            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
752                auth_agg.load_full(),
753                state.clone(),
754                end_of_epoch_receiver,
755                &config.db_path(),
756                &prometheus_registry,
757                Some(&config),
758            )))
759        } else {
760            None
761        };
762
763        let http_server = build_http_server(
764            state.clone(),
765            &transaction_orchestrator.clone(),
766            &config,
767            &prometheus_registry,
768        )
769        .await?;
770
771        let global_state_hasher = Arc::new(GlobalStateHasher::new(
772            cache_traits.global_state_hash_store.clone(),
773            GlobalStateHashMetrics::new(&prometheus_registry),
774        ));
775
776        let authority_names_to_peer_ids = epoch_store
777            .epoch_start_state()
778            .get_authority_names_to_peer_ids();
779
780        let network_connection_metrics =
781            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
782
783        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
784
785        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
786            p2p_network.downgrade(),
787            network_connection_metrics,
788            HashMap::new(),
789            None,
790        );
791
792        let connection_monitor_status = ConnectionMonitorStatus {
793            connection_statuses,
794            authority_names_to_peer_ids,
795        };
796
797        let connection_monitor_status = Arc::new(connection_monitor_status);
798        let iota_node_metrics =
799            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
800
801        iota_node_metrics
802            .binary_max_protocol_version
803            .set(ProtocolVersion::MAX.as_u64() as i64);
804        iota_node_metrics
805            .configured_max_protocol_version
806            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
807
808        // Convert transaction orchestrator to executor trait object for gRPC server
809        // Note that the transaction_orchestrator (so as executor) will be None if it is
810        // a validator node or run_with_range is set
811        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
812            transaction_orchestrator
813                .clone()
814                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
815
816        let grpc_server_handle = build_grpc_server(
817            &config,
818            state.clone(),
819            state_sync_store.clone(),
820            executor,
821            &prometheus_registry,
822            server_version,
823        )
824        .await?;
825
826        let validator_components = if state.is_committee_validator(&epoch_store) {
827            let (components, _) = futures::join!(
828                Self::construct_validator_components(
829                    config.clone(),
830                    state.clone(),
831                    committee,
832                    epoch_store.clone(),
833                    checkpoint_store.clone(),
834                    state_sync_handle.clone(),
835                    randomness_handle.clone(),
836                    Arc::downgrade(&global_state_hasher),
837                    backpressure_manager.clone(),
838                    connection_monitor_status.clone(),
839                    &registry_service,
840                ),
841                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
842            );
843            let mut components = components?;
844
845            components.consensus_adapter.submit_recovered(&epoch_store);
846
847            // Start the gRPC server
848            components.validator_server_handle = components.validator_server_handle.start().await;
849
850            Some(components)
851        } else {
852            None
853        };
854
855        // setup shutdown channel
856        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
857
858        let node = Self {
859            config,
860            validator_components: Mutex::new(validator_components),
861            _http_server: http_server,
862            state,
863            transaction_orchestrator,
864            registry_service,
865            metrics: iota_node_metrics,
866
867            _discovery: discovery_handle,
868            state_sync_handle,
869            randomness_handle,
870            checkpoint_store,
871            global_state_hasher: Mutex::new(Some(global_state_hasher)),
872            end_of_epoch_channel,
873            connection_monitor_status,
874            trusted_peer_change_tx,
875            backpressure_manager,
876            checkpoint_progress_tracker: checkpoint_progress_tracker.clone(),
877
878            _db_checkpoint_handle: db_checkpoint_handle,
879
880            #[cfg(msim)]
881            sim_state: Default::default(),
882
883            _state_archive_handle: state_archive_handle,
884            _state_snapshot_uploader_handle: state_snapshot_handle,
885            shutdown_channel_tx: shutdown_channel,
886
887            grpc_server_handle: Mutex::new(grpc_server_handle),
888
889            auth_agg,
890        };
891
892        info!("IotaNode started!");
893        let node = Arc::new(node);
894        let node_copy = node.clone();
895        spawn_monitored_task!(async move {
896            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
897            if let Err(error) = result {
898                warn!("Reconfiguration finished with error {:?}", error);
899            }
900        });
901
902        node.checkpoint_progress_tracker
903            .spawn_logging_task(node.checkpoint_store.clone(), perpetual_tables_for_progress);
904
905        Ok(node)
906    }
907
908    /// Seeds the open epoch's `epoch_info` row and backfills any historical gap
909    /// before live execution resumes. A recognized chain (mainnet, testnet, or
910    /// the current devnet) fills from its formal-snapshot `EPOCH_INFO` first —
911    /// cheap and bulk — then replays local checkpoints for whatever the
912    /// snapshot still lags; an unfillable gap there is a fatal startup
913    /// error, since every node must hold the verified chain since genesis.
914    /// An unrecognized network (no published snapshot) rebuilds from local
915    /// state alone; a residual gap there is only a warning and is left
916    /// unfilled — acceptable since such a node is not expected to produce
917    /// snapshots or serve the epoch gRPC API for those epochs.
918    ///
919    /// TODO: <https://github.com/iotaledger/iota/issues/12028> — once every node
920    /// holds the chain this startup backfill is no longer needed; it reduces to
921    /// seeding the open epoch, or is removed entirely.
922    async fn seed_epoch_info(
923        checkpoint_store: &CheckpointStore,
924        authority_store: &AuthorityStore,
925        genesis: &iota_config::genesis::Genesis,
926        expected_chain_id: ChainIdentifier,
927    ) -> anyhow::Result<()> {
928        let chain = expected_chain_id.chain();
929        let recognized_source = formal_snapshot_read_config(expected_chain_id);
930
931        // Fill any historical gap before live execution resumes; skip the work
932        // entirely when the chain is already complete (the common restart).
933        if checkpoint_store
934            .epoch_info_gap()
935            .map_err(|e| anyhow::anyhow!("checking epoch_info completeness: {e}"))?
936            .is_some()
937        {
938            // snapshot pull (recognized chains only)
939            if let Some(remote_store_config) = &recognized_source {
940                if let Err(e) = Self::backfill_epoch_info_from_snapshot(
941                    checkpoint_store,
942                    remote_store_config,
943                    genesis.committee()?,
944                    genesis.iota_system_object(),
945                    expected_chain_id,
946                )
947                .await
948                {
949                    warn!(
950                        "epoch_info snapshot backfill failed ({e:#}); falling back to \
951                         rebuilding from local checkpoint history"
952                    );
953                }
954            }
955
956            // local replay
957            checkpoint_store
958                .backfill_epoch_info_from_local_history(authority_store)
959                .map_err(|e| anyhow::anyhow!("rebuilding epoch_info from local history: {e}"))?;
960        }
961
962        // Seed the open epoch's row (no-op if already present). Without it the
963        // next executed boundary can't finalize it and the watermark wedges.
964        checkpoint_store
965            .ensure_current_epoch_info(authority_store)
966            .map_err(|e| anyhow::anyhow!("seeding the current epoch_info row: {e}"))?;
967
968        // A residual gap is fatal on a recognized chain — every node must hold
969        // the verified chain since genesis — and only a warning elsewhere.
970        if let Some((highest_indexed, last_executed)) = checkpoint_store
971            .epoch_info_gap()
972            .map_err(|e| anyhow::anyhow!("re-checking epoch_info completeness: {e}"))?
973        {
974            let detail = format!(
975                "the epoch_info chain is incomplete after backfilling (finalized through \
976                 {highest_indexed:?}, executed through epoch {last_executed})"
977            );
978            if recognized_source.is_some() {
979                anyhow::bail!(
980                    "{detail}: the latest published snapshot is older than this node's history \
981                     and the missing epochs' checkpoint data is already pruned locally; retry \
982                     once a newer snapshot is available"
983                );
984            }
985            warn!(
986                "{detail} and {chain:?} has no public formal-snapshot source to backfill from; \
987                 the missing epochs are left unfilled (live indexing only extends the chain \
988                 forward), so this node cannot produce snapshots or serve the epoch gRPC API for \
989                 those epochs"
990            );
991        }
992        Ok(())
993    }
994
995    /// Restores the CheckpointStore's `epoch_info` rows from the given
996    /// formal-snapshot bucket's EPOCH_INFO, verifying the chain against this
997    /// node's trust roots. A wrong-network snapshot is rejected before any
998    /// write. Only seeds what the snapshot covers — the published snapshot can
999    /// lag this node's executed history, so the caller replays local
1000    /// checkpoints for the residual tail.
1001    ///
1002    /// TODO: <https://github.com/iotaledger/iota/issues/12028> — one-time
1003    /// migration aid; remove once every node has backfilled the chain.
1004    async fn backfill_epoch_info_from_snapshot(
1005        checkpoint_store: &CheckpointStore,
1006        remote_store_config: &ObjectStoreConfig,
1007        genesis_committee: Committee,
1008        genesis_system_state: IotaSystemState,
1009        expected_chain_id: ChainIdentifier,
1010    ) -> anyhow::Result<u64> {
1011        let epoch = latest_available_epoch(remote_store_config).await?;
1012        info!("restoring epoch_info from snapshot EPOCH_INFO up to epoch {epoch}");
1013        let (snapshot_chain_id, epoch_info) =
1014            StateSnapshotReaderV1::read_epoch_info_only(epoch, remote_store_config).await?;
1015        let verified = iota_snapshot::verify_epoch_info_chain(
1016            epoch_info,
1017            genesis_committee,
1018            genesis_system_state,
1019            snapshot_chain_id,
1020            expected_chain_id,
1021        )?;
1022        verified.restore_epoch_info(checkpoint_store).await?;
1023        info!("restored epoch_info from snapshot up to epoch {epoch}");
1024        Ok(epoch)
1025    }
1026
1027    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
1028        self.end_of_epoch_channel.subscribe()
1029    }
1030
1031    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1032        self.shutdown_channel_tx.subscribe()
1033    }
1034
1035    pub fn current_epoch_for_testing(&self) -> EpochId {
1036        self.state.current_epoch_for_testing()
1037    }
1038
1039    pub fn db_checkpoint_path(&self) -> PathBuf {
1040        self.config.db_checkpoint_path()
1041    }
1042
1043    // Init reconfig process by starting to reject user certs
1044    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
1045        info!("close_epoch (current epoch = {})", epoch_store.epoch());
1046        self.validator_components
1047            .lock()
1048            .await
1049            .as_ref()
1050            .ok_or_else(|| IotaError::from("Node is not a validator"))?
1051            .consensus_adapter
1052            .close_epoch(epoch_store);
1053        Ok(())
1054    }
1055
1056    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
1057        self.state
1058            .clear_override_protocol_upgrade_buffer_stake(epoch)
1059    }
1060
1061    pub fn set_override_protocol_upgrade_buffer_stake(
1062        &self,
1063        epoch: EpochId,
1064        buffer_stake_bps: u64,
1065    ) -> IotaResult {
1066        self.state
1067            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1068    }
1069
1070    // Testing-only API to start epoch close process.
1071    // For production code, please use the non-testing version.
1072    pub async fn close_epoch_for_testing(&self) -> IotaResult {
1073        let epoch_store = self.state.epoch_store_for_testing();
1074        self.close_epoch(&epoch_store).await
1075    }
1076
1077    async fn start_state_archival(
1078        config: &NodeConfig,
1079        prometheus_registry: &Registry,
1080        state_sync_store: RocksDbStore,
1081    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1082        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1083            let local_store_config = ObjectStoreConfig {
1084                object_store: Some(ObjectStoreType::File),
1085                directory: Some(config.archive_path()),
1086                ..Default::default()
1087            };
1088            let archive_writer = ArchiveWriter::new(
1089                local_store_config,
1090                remote_store_config.clone(),
1091                FileCompression::Zstd,
1092                StorageFormat::Blob,
1093                Duration::from_secs(600),
1094                256 * 1024 * 1024,
1095                prometheus_registry,
1096            )
1097            .await?;
1098            Ok(Some(archive_writer.start(state_sync_store).await?))
1099        } else {
1100            Ok(None)
1101        }
1102    }
1103
1104    /// Creates a StateSnapshotUploader and starts it if the StateSnapshotConfig
1105    /// is set.
1106    fn start_state_snapshot(
1107        config: &NodeConfig,
1108        prometheus_registry: &Registry,
1109        checkpoint_store: Arc<CheckpointStore>,
1110        is_full_node: bool,
1111    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1112        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1113            // Snapshot publication is a fullnode-only role.
1114            anyhow::ensure!(
1115                is_full_node,
1116                "Snapshot upload is configured, but this node is a validator. \
1117                 Snapshot publication is only supported on fullnodes. Remove the \
1118                 `state_snapshot_write_config.object_store_config` setting, or move \
1119                 the upload to a fullnode."
1120            );
1121            let snapshot_uploader = StateSnapshotUploader::new(
1122                &config.db_checkpoint_path(),
1123                &config.snapshot_path(),
1124                remote_store_config.clone(),
1125                config.state_snapshot_write_config.concurrency,
1126                60,
1127                prometheus_registry,
1128                checkpoint_store,
1129            )?;
1130            Ok(Some(snapshot_uploader.start()))
1131        } else {
1132            Ok(None)
1133        }
1134    }
1135
1136    fn start_db_checkpoint(
1137        config: &NodeConfig,
1138        prometheus_registry: &Registry,
1139        state_snapshot_enabled: bool,
1140        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
1141    ) -> Result<(
1142        DBCheckpointConfig,
1143        Option<tokio::sync::broadcast::Sender<()>>,
1144    )> {
1145        let checkpoint_path = Some(
1146            config
1147                .db_checkpoint_config
1148                .checkpoint_path
1149                .clone()
1150                .unwrap_or_else(|| config.db_checkpoint_path()),
1151        );
1152        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1153            DBCheckpointConfig {
1154                checkpoint_path,
1155                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1156                    true
1157                } else {
1158                    config
1159                        .db_checkpoint_config
1160                        .perform_db_checkpoints_at_epoch_end
1161                },
1162                ..config.db_checkpoint_config.clone()
1163            }
1164        } else {
1165            config.db_checkpoint_config.clone()
1166        };
1167
1168        match (
1169            db_checkpoint_config.object_store_config.as_ref(),
1170            state_snapshot_enabled,
1171        ) {
1172            // If db checkpoint config object store not specified but
1173            // state snapshot object store is specified, create handler
1174            // anyway for marking db checkpoints as completed so that they
1175            // can be uploaded as state snapshots.
1176            (None, false) => Ok((db_checkpoint_config, None)),
1177            (_, _) => {
1178                let handler = DBCheckpointHandler::new(
1179                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1180                    db_checkpoint_config.object_store_config.as_ref(),
1181                    60,
1182                    db_checkpoint_config
1183                        .prune_and_compact_before_upload
1184                        .unwrap_or(true),
1185                    config.authority_store_pruning_config.clone(),
1186                    prometheus_registry,
1187                    state_snapshot_enabled,
1188                    checkpoint_progress_tracker,
1189                )?;
1190                Ok((
1191                    db_checkpoint_config,
1192                    Some(DBCheckpointHandler::start(handler)),
1193                ))
1194            }
1195        }
1196    }
1197
1198    fn create_p2p_network(
1199        config: &NodeConfig,
1200        state_sync_store: RocksDbStore,
1201        chain_identifier: ChainIdentifier,
1202        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1203        archive_readers: ArchiveReaderBalancer,
1204        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1205        prometheus_registry: &Registry,
1206    ) -> Result<(
1207        Network,
1208        discovery::Handle,
1209        state_sync::Handle,
1210        randomness::Handle,
1211    )> {
1212        let (state_sync, state_sync_server) = state_sync::Builder::new()
1213            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1214            .store(state_sync_store)
1215            .archive_readers(archive_readers)
1216            .with_metrics(prometheus_registry)
1217            .build();
1218
1219        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1220            .config(config.p2p_config.clone())
1221            .build();
1222
1223        let (randomness, randomness_router) =
1224            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1225                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1226                .with_metrics(prometheus_registry)
1227                .build();
1228
1229        let p2p_network = {
1230            let routes = anemo::Router::new()
1231                .add_rpc_service(discovery_server)
1232                .add_rpc_service(state_sync_server);
1233            let routes = routes.merge(randomness_router);
1234
1235            let inbound_network_metrics =
1236                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1237            let outbound_network_metrics =
1238                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1239
1240            let service = ServiceBuilder::new()
1241                .layer(
1242                    TraceLayer::new_for_server_errors()
1243                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1244                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1245                )
1246                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1247                    Arc::new(inbound_network_metrics),
1248                    config.p2p_config.excessive_message_size(),
1249                )))
1250                .service(routes);
1251
1252            let outbound_layer = ServiceBuilder::new()
1253                .layer(
1254                    TraceLayer::new_for_client_and_server_errors()
1255                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1256                        .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
1257                )
1258                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1259                    Arc::new(outbound_network_metrics),
1260                    config.p2p_config.excessive_message_size(),
1261                )))
1262                .into_inner();
1263
1264            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1265            // Set the max_frame_size to be 1 GB to work around the issue of there being too
1266            // many staking events in the epoch change txn.
1267            anemo_config.max_frame_size = Some(1 << 30);
1268
1269            // Set a higher default value for socket send/receive buffers if not already
1270            // configured.
1271            let mut quic_config = anemo_config.quic.unwrap_or_default();
1272            if quic_config.socket_send_buffer_size.is_none() {
1273                quic_config.socket_send_buffer_size = Some(20 << 20);
1274            }
1275            if quic_config.socket_receive_buffer_size.is_none() {
1276                quic_config.socket_receive_buffer_size = Some(20 << 20);
1277            }
1278            quic_config.allow_failed_socket_buffer_size_setting = true;
1279
1280            // Set high-performance defaults for quinn transport.
1281            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1282            if quic_config.max_concurrent_bidi_streams.is_none() {
1283                quic_config.max_concurrent_bidi_streams = Some(500);
1284            }
1285            if quic_config.max_concurrent_uni_streams.is_none() {
1286                quic_config.max_concurrent_uni_streams = Some(500);
1287            }
1288            if quic_config.stream_receive_window.is_none() {
1289                quic_config.stream_receive_window = Some(100 << 20);
1290            }
1291            if quic_config.receive_window.is_none() {
1292                quic_config.receive_window = Some(200 << 20);
1293            }
1294            if quic_config.send_window.is_none() {
1295                quic_config.send_window = Some(200 << 20);
1296            }
1297            if quic_config.crypto_buffer_size.is_none() {
1298                quic_config.crypto_buffer_size = Some(1 << 20);
1299            }
1300            if quic_config.max_idle_timeout_ms.is_none() {
1301                quic_config.max_idle_timeout_ms = Some(30_000);
1302            }
1303            if quic_config.keep_alive_interval_ms.is_none() {
1304                quic_config.keep_alive_interval_ms = Some(5_000);
1305            }
1306            anemo_config.quic = Some(quic_config);
1307
1308            let server_name = format!("iota-{chain_identifier}");
1309            let network = Network::bind(config.p2p_config.listen_address)
1310                .server_name(&server_name)
1311                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1312                .config(anemo_config)
1313                .outbound_request_layer(outbound_layer)
1314                .start(service)?;
1315            info!(
1316                server_name = server_name,
1317                "P2p network started on {}",
1318                network.local_addr()
1319            );
1320
1321            network
1322        };
1323
1324        let discovery_handle =
1325            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1326        let state_sync_handle = state_sync.start(p2p_network.clone());
1327        let randomness_handle = randomness.start(p2p_network.clone());
1328
1329        Ok((
1330            p2p_network,
1331            discovery_handle,
1332            state_sync_handle,
1333            randomness_handle,
1334        ))
1335    }
1336
    /// Asynchronously constructs and initializes the components necessary for
    /// the validator node.
    ///
    /// This function builds the validator pieces that are not epoch-specific:
    /// - a dedicated metrics registry, added to `registry_service`;
    /// - the consensus adapter and consensus manager, which share one
    ///   `UpdatableConsensusClient`;
    /// - the consensus store pruner;
    /// - the pre-consensus soft locks and their sweep task;
    /// - the gRPC validator service, which the caller still has to `start()`;
    /// - the optional overload monitors.
    ///
    /// Everything is then handed to `start_epoch_specific_validator_components`,
    /// which builds the per-epoch parts and assembles the `ValidatorComponents`.
    ///
    /// # Errors
    ///
    /// Fails if `config` has no consensus config, if the gRPC validator service
    /// cannot be set up, or if the epoch-specific setup fails.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
    ) -> Result<ValidatorComponents> {
        // `as_mut` needs a mutable config. Working on a clone leaves `config`
        // itself untouched and borrowable for the calls below.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Validator-only metrics go to their own registry. Its id is passed on
        // to the epoch-specific setup.
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // One client handle is shared by the adapter (below) and the manager.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove
        // every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        // Soft locking can be turned off in the node config. In that case a
        // disabled lock set is used instead of omitting the component.
        let soft_locks = Arc::new(if config.enable_soft_locking {
            PreConsensusSoftLocks::new()
        } else {
            info!("pre-consensus soft-locking disabled via node config");
            PreConsensusSoftLocks::disabled()
        });

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
        let validator_service_metrics = Arc::new(ValidatorServiceMetrics::new(&validator_registry));

        // Spawn the soft-lock sweep once for the lifetime of this validator
        // instance. The task holds only a `Weak<PreConsensusSoftLocks>` so it
        // stops itself automatically: each iteration it tries to upgrade the
        // weak reference, and when all strong `Arc` owners have been dropped
        // (i.e. `ValidatorComponents` is destructured and the old epoch store
        // is released after an epoch transition that removes us from the
        // committee) the upgrade returns `None` and the loop exits. No explicit
        // `abort()` is needed. The same `Arc<PreConsensusSoftLocks>` is reused
        // across epoch transitions (see `start_epoch_specific_validator_components`),
        // so the task keeps running uninterrupted while the node remains a validator.
        let soft_lock_sweep_handle = PreConsensusSoftLocks::spawn_sweep(
            Arc::downgrade(&soft_locks),
            validator_service_metrics.clone(),
        );

        // Set up the gRPC validator service. The handle is handed on, and the
        // caller starts it after construction completes.
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
            soft_locks.clone(),
            validator_service_metrics.clone(),
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // The monitor gets only a weak reference to the authority state.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        // Starts a monitor that periodically refreshes the
        // `consensus_queue_load_shedding_percentage` metric. Without this, the
        // metric goes stale once gRPC traffic stops (the only other update
        // path is `AuthorityState::check_consensus_queue_graduated_limits`, called on
        // each inbound tx). Used in the certificate-less (P-COOL)
        // mode.
        let consensus_queue_overload_monitor_handle =
            if epoch_store.protocol_config().enable_pcool_flow() {
                let consensus_queue_monitor_authority_state = Arc::downgrade(&state);
                let consensus_queue_monitor_consensus_adapter = Arc::downgrade(&consensus_adapter);
                let consensus_queue_monitor_interval =
                    config.authority_overload_config.overload_monitor_interval;
                Some(spawn_monitored_task!(consensus_queue_overload_monitor(
                    consensus_queue_monitor_authority_state,
                    consensus_queue_monitor_consensus_adapter,
                    consensus_queue_monitor_interval,
                )))
            } else {
                None
            };

        // Hand everything to the per-epoch setup, which builds the remaining
        // components and returns the assembled `ValidatorComponents`.
        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            soft_locks,
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_queue_overload_monitor_handle,
            soft_lock_sweep_handle,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1485
    /// Initializes and starts components specific to the current
    /// epoch for the validator node.
    ///
    /// In order, this:
    /// 1. builds the checkpoint service for `epoch_store`,
    /// 2. installs a fresh low-scoring-authorities map on `consensus_adapter`,
    /// 3. clears `soft_locks` and attaches them to `epoch_store`,
    /// 4. installs a randomness manager on `epoch_store` when one can be
    ///    created,
    /// 5. starts the consensus manager on a detached background task,
    /// 6. spawns the checkpoint service tasks, and
    /// 7. starts the overload notifier.
    ///
    /// `validator_server_handle`, `validator_overload_monitor_handle`,
    /// `consensus_queue_overload_monitor_handle`, `soft_lock_sweep_handle`,
    /// `consensus_store_pruner` and `validator_registry_id` are not used here;
    /// they are only carried over into the returned [`ValidatorComponents`].
    ///
    /// # Errors
    ///
    /// Returns an error only if installing the randomness manager on
    /// `epoch_store` fails.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        soft_locks: Arc<PreConsensusSoftLocks>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        consensus_queue_overload_monitor_handle: Option<JoinHandle<()>>,
        soft_lock_sweep_handle: JoinHandle<()>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        // Build (but do not yet spawn) the checkpoint service; it is spawned
        // further below, once consensus startup has been kicked off.
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            global_state_hasher,
            checkpoint_metrics.clone(),
        );

        // Create a new map that is shared by the consensus handler and the
        // consensus adapter: the consensus handler writes the values forwarded
        // from consensus, and the consensus adapter reads them to decide which
        // validator submits a transaction to consensus.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // Wire pre-consensus soft locks to the epoch store so that
        // post-consensus processing can release locks once permanent locks are
        // quarantined. Stale locks from the previous epoch are cleared first.
        // The background sweep task is not spawned here: its handle arrives as
        // `soft_lock_sweep_handle` and is only stored in the returned
        // components.
        soft_locks.clear();
        epoch_store.set_soft_locks(soft_locks.clone());

        // `try_new` yields `None` when no randomness manager can be created; in
        // that case the epoch store is left without one.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager asynchronously");

        // Spawn consensus startup asynchronously to avoid blocking other components.
        // The returned `JoinHandle` is dropped, so the task runs detached and its
        // completion is not awaited here.
        tokio::spawn({
            // The spawned task must own everything it uses, hence the clones.
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let iota_tx_validator = IotaTxValidator::new(
                epoch_store.clone(),
                checkpoint_service.clone(),
                state.transaction_manager().clone(),
                iota_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        iota_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // Setting the `DISABLE_REPLAY_WAITER` environment variable (to any valid
        // UTF-8 value) spawns the checkpoint service without the consensus
        // manager's replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        let checkpoint_service_tasks = checkpoint_service.spawn(replay_waiter).await;

        let overload_notifier_handle = Self::start_overload_notifier(
            config,
            state.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
        );

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_queue_overload_monitor_handle,
            soft_lock_sweep_handle,
            overload_notifier_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            soft_locks,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1613
1614    /// Starts the checkpoint service for the validator node, initializing
1615    /// necessary components and settings.
1616    /// The function ensures proper initialization of the checkpoint service,
1617    /// preparing it to handle checkpoint creation and submission to consensus,
1618    /// while also setting up the necessary monitoring and synchronization
1619    /// mechanisms.
1620    fn build_checkpoint_service(
1621        config: &NodeConfig,
1622        consensus_adapter: Arc<ConsensusAdapter>,
1623        checkpoint_store: Arc<CheckpointStore>,
1624        epoch_store: Arc<AuthorityPerEpochStore>,
1625        state: Arc<AuthorityState>,
1626        state_sync_handle: state_sync::Handle,
1627        global_state_hasher: Weak<GlobalStateHasher>,
1628        checkpoint_metrics: Arc<CheckpointMetrics>,
1629    ) -> Arc<CheckpointService> {
1630        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1631        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1632
1633        debug!(
1634            "Starting checkpoint service with epoch start timestamp {}
1635            and epoch duration {}",
1636            epoch_start_timestamp_ms, epoch_duration_ms
1637        );
1638
1639        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1640            sender: consensus_adapter,
1641            signer: state.secret.clone(),
1642            authority: config.authority_public_key(),
1643            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1644                .checked_add(epoch_duration_ms)
1645                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1646            metrics: checkpoint_metrics.clone(),
1647        });
1648
1649        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1650        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1651        let max_checkpoint_size_bytes =
1652            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1653
1654        CheckpointService::build(
1655            state.clone(),
1656            checkpoint_store,
1657            epoch_store,
1658            state.get_transaction_cache_reader().clone(),
1659            global_state_hasher,
1660            checkpoint_output,
1661            Box::new(certified_checkpoint_output),
1662            checkpoint_metrics,
1663            max_tx_per_checkpoint,
1664            max_checkpoint_size_bytes,
1665        )
1666    }
1667
1668    fn construct_consensus_adapter(
1669        committee: &Committee,
1670        consensus_config: &ConsensusConfig,
1671        authority: AuthorityName,
1672        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1673        prometheus_registry: &Registry,
1674        consensus_client: Arc<dyn ConsensusClient>,
1675        checkpoint_store: Arc<CheckpointStore>,
1676    ) -> ConsensusAdapter {
1677        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1678        // The consensus adapter allows the authority to send user certificates through
1679        // consensus.
1680
1681        ConsensusAdapter::new(
1682            consensus_client,
1683            checkpoint_store,
1684            authority,
1685            connection_monitor_status,
1686            consensus_config.max_pending_transactions(),
1687            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1688            consensus_config.max_submit_position,
1689            consensus_config.submit_delay_step_override(),
1690            ca_metrics,
1691            consensus_config.graduated_load_shedding_soft_limit_pct(),
1692        )
1693    }
1694
1695    async fn start_grpc_validator_service(
1696        config: &NodeConfig,
1697        state: Arc<AuthorityState>,
1698        consensus_adapter: Arc<ConsensusAdapter>,
1699        prometheus_registry: &Registry,
1700        soft_locks: Arc<PreConsensusSoftLocks>,
1701        validator_service_metrics: Arc<ValidatorServiceMetrics>,
1702    ) -> Result<SpawnOnce> {
1703        let validator_service = ValidatorService::new(
1704            state,
1705            consensus_adapter,
1706            validator_service_metrics,
1707            config.policy_config.clone().map(|p| p.client_id_source),
1708            soft_locks,
1709        );
1710
1711        let mut server_conf = iota_network_stack::config::Config::new();
1712        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1713        server_conf.load_shed = config.grpc_load_shed;
1714        let server_builder =
1715            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1716                .add_service(ValidatorServer::new(validator_service.clone()))
1717                .add_service(ValidatorV2Server::new(validator_service.clone()))
1718                .add_service(ValidatorPeerServer::new(validator_service));
1719
1720        let tls_config = iota_tls::create_rustls_server_config(
1721            config.network_key_pair().copy().private(),
1722            IOTA_TLS_SERVER_NAME.to_string(),
1723        );
1724
1725        let network_address = config.network_address().clone();
1726
1727        let bind_future = async move {
1728            let server = server_builder
1729                .bind(&network_address, Some(tls_config))
1730                .await
1731                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1732
1733            let local_addr = server.local_addr();
1734            info!("Listening to traffic on {local_addr}");
1735
1736            Ok(server)
1737        };
1738
1739        Ok(SpawnOnce::new(bind_future))
1740    }
1741
1742    /// Re-executes pending consensus certificates, which may not have been
1743    /// committed to disk before the node restarted. This is necessary for
1744    /// the following reasons:
1745    ///
1746    /// 1. For any transaction for which we returned signed effects to a client,
1747    ///    we must ensure that we have re-executed the transaction before we
1748    ///    begin accepting grpc requests. Otherwise we would appear to have
1749    ///    forgotten about the transaction.
1750    /// 2. While this is running, we are concurrently waiting for all previously
1751    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1752    ///    either direction (from checkpointed consensus transactions to pending
1753    ///    consensus transactions, or vice versa), we must re-execute pending
1754    ///    consensus transactions to ensure that both processes can complete.
1755    /// 3. Also note that for any pending consensus transactions for which we
1756    ///    wrote a signed effects digest to disk, we must re-execute using that
1757    ///    digest as the expected effects digest, to ensure that we cannot
1758    ///    arrive at different effects than what we previously signed.
1759    async fn reexecute_pending_consensus_certs(
1760        epoch_store: &Arc<AuthorityPerEpochStore>,
1761        state: &Arc<AuthorityState>,
1762    ) {
1763        let mut pending_consensus_certificates = Vec::new();
1764        let mut additional_certs = Vec::new();
1765
1766        for tx in epoch_store.get_all_pending_consensus_transactions() {
1767            match tx.kind {
1768                // TODO: what to do with UserTransactionV1 here? It seems like this only applies to
1769                //  optimistically executed owned-object transactions that possibly didn't go
1770                //  through  consensus before the node restarted. UserTransactionsV1
1771                //  always needs to go  through consensus, so it will be replayed
1772                //  there, just like shared object  transactions.
1773                //
1774                // Shared object txns
1775                // cannot be re-executed at this  point, because we must wait for
1776                // consensus replay to assign shared  object versions.
1777                ConsensusTransactionKind::CertifiedTransaction(tx)
1778                    if !tx.contains_shared_object() =>
1779                {
1780                    let tx = *tx;
1781                    // new_unchecked is safe because we never submit a transaction to consensus
1782                    // without verifying it
1783                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1784                        VerifiedCertificate::new_unchecked(tx),
1785                    );
1786                    // we only need to re-execute if we previously signed the effects (which
1787                    // indicates we returned the effects to a client).
1788                    if let Some(fx_digest) = epoch_store
1789                        .get_signed_effects_digest(tx.digest())
1790                        .expect("db error")
1791                    {
1792                        pending_consensus_certificates.push((tx, fx_digest));
1793                    } else {
1794                        additional_certs.push(tx);
1795                    }
1796                }
1797                _ => (),
1798            }
1799        }
1800
1801        let digests = pending_consensus_certificates
1802            .iter()
1803            .map(|(tx, _)| *tx.digest())
1804            .collect::<Vec<_>>();
1805
1806        info!(
1807            "reexecuting {} pending consensus certificates: {:?}",
1808            digests.len(),
1809            digests
1810        );
1811
1812        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1813        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1814
1815        // If this times out, the validator will still almost certainly start up fine.
1816        // But, it is possible that it may temporarily "forget" about
1817        // transactions that it had previously executed. This could confuse
1818        // clients in some circumstances. However, the transactions are still in
1819        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1820        let timeout = if cfg!(msim) { 120 } else { 60 };
1821        if tokio::time::timeout(
1822            std::time::Duration::from_secs(timeout),
1823            state
1824                .get_transaction_cache_reader()
1825                .try_notify_read_executed_effects_digests(&digests),
1826        )
1827        .await
1828        .is_err()
1829        {
1830            // Log all the digests that were not executed to help debugging.
1831            if let Ok(executed_effects_digests) = state
1832                .get_transaction_cache_reader()
1833                .try_multi_get_executed_effects_digests(&digests)
1834            {
1835                let pending_digests = digests
1836                    .iter()
1837                    .zip(executed_effects_digests.iter())
1838                    .filter_map(|(digest, executed_effects_digest)| {
1839                        if executed_effects_digest.is_none() {
1840                            Some(digest)
1841                        } else {
1842                            None
1843                        }
1844                    })
1845                    .collect::<Vec<_>>();
1846                debug_fatal!(
1847                    "Timed out waiting for effects digests to be executed: {:?}",
1848                    pending_digests
1849                );
1850            } else {
1851                debug_fatal!(
1852                    "Timed out waiting for effects digests to be executed, digests not found"
1853                );
1854            }
1855        }
1856    }
1857
1858    pub fn state(&self) -> Arc<AuthorityState> {
1859        self.state.clone()
1860    }
1861
1862    // Only used for testing because of how epoch store is loaded.
1863    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1864        self.state.reference_gas_price_for_testing()
1865    }
1866
1867    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1868        self.state.committee_store().clone()
1869    }
1870
1871    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1872    // self.state.db()
1873    // }
1874
1875    /// Clone an AuthorityAggregator currently used in this node's
1876    /// QuorumDriver, if the node is a fullnode. After reconfig,
1877    /// QuorumDriver builds a new AuthorityAggregator. The caller
1878    /// of this function will mostly likely want to call this again
1879    /// to get a fresh one.
1880    pub fn clone_authority_aggregator(
1881        &self,
1882    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1883        self.transaction_orchestrator
1884            .as_ref()
1885            .map(|to| to.clone_authority_aggregator())
1886    }
1887
1888    pub fn transaction_orchestrator(
1889        &self,
1890    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1891        self.transaction_orchestrator.clone()
1892    }
1893
1894    pub fn subscribe_to_transaction_orchestrator_effects(
1895        &self,
1896    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1897        self.transaction_orchestrator
1898            .as_ref()
1899            .map(|to| to.subscribe_to_effects_queue())
1900            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1901    }
1902
1903    /// This function awaits the completion of checkpoint execution of the
1904    /// current epoch, after which it initiates reconfiguration of the
1905    /// entire system. This function also handles role changes for the node when
1906    /// epoch changes and advertises capabilities to the committee if the node
1907    /// is a validator.
1908    pub async fn monitor_reconfiguration(
1909        self: Arc<Self>,
1910        mut epoch_store: Arc<AuthorityPerEpochStore>,
1911    ) -> Result<()> {
1912        let checkpoint_executor_metrics =
1913            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1914
1915        loop {
1916            let mut hasher_guard = self.global_state_hasher.lock().await;
1917            let hasher = hasher_guard.take().unwrap();
1918            info!(
1919                "Creating checkpoint executor for epoch {}",
1920                epoch_store.epoch()
1921            );
1922
1923            // Create closures that handle gRPC type conversion
1924            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1925                guard.as_ref().map(|handle| {
1926                    let tx = handle.checkpoint_data_broadcaster().clone();
1927                    Box::new(move |data: &CheckpointData| {
1928                        tx.send_traced(data);
1929                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
1930                })
1931            } else {
1932                None
1933            };
1934
1935            let checkpoint_executor = CheckpointExecutor::new(
1936                epoch_store.clone(),
1937                self.checkpoint_store.clone(),
1938                self.state.clone(),
1939                hasher.clone(),
1940                self.backpressure_manager.clone(),
1941                self.config.checkpoint_executor_config.clone(),
1942                checkpoint_executor_metrics.clone(),
1943                data_sender,
1944                Some(self.checkpoint_progress_tracker.clone()),
1945            );
1946
1947            let run_with_range = self.config.run_with_range;
1948
1949            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1950
1951            // Update the current protocol version metric.
1952            self.metrics
1953                .current_protocol_version
1954                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1955
1956            // Advertise capabilities to committee, if we are a validator.
1957            if let Some(components) = &*self.validator_components.lock().await {
1958                // TODO: without this sleep, the consensus message is not delivered reliably.
1959                tokio::time::sleep(Duration::from_millis(1)).await;
1960
1961                let config = cur_epoch_store.protocol_config();
1962                let binary_config = to_binary_config(config);
1963                let transaction = ConsensusTransaction::new_capability_notification_v1(
1964                    AuthorityCapabilitiesV1::new(
1965                        self.state.name,
1966                        cur_epoch_store.get_chain(),
1967                        self.config
1968                            .supported_protocol_versions
1969                            .expect("Supported versions should be populated")
1970                            // no need to send digests of versions less than the current version
1971                            .truncate_below(config.version),
1972                        self.state
1973                            .get_available_system_packages(&binary_config)
1974                            .await,
1975                    ),
1976                );
1977                info!(?transaction, "submitting capabilities to consensus");
1978                components
1979                    .consensus_adapter
1980                    .submit(transaction, None, &cur_epoch_store)?;
1981            } else if self.state.is_active_validator(&cur_epoch_store)
1982                && cur_epoch_store
1983                    .protocol_config()
1984                    .track_non_committee_eligible_validators()
1985            {
1986                // Send signed capabilities to committee validators if we are a non-committee
1987                // validator in a separate task to not block the caller. Sending is done only if
1988                // the feature flag supporting it is enabled.
1989                let epoch_store = cur_epoch_store.clone();
1990                let node_clone = self.clone();
1991                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1992                    node_clone
1993                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1994                        .instrument(trace_span!(
1995                            "send_signed_capability_notification_to_committee_with_retry"
1996                        ))
1997                        .await;
1998                }));
1999            }
2000
2001            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
2002
2003            if stop_condition == StopReason::RunWithRangeCondition {
2004                IotaNode::shutdown(&self).await;
2005                self.shutdown_channel_tx
2006                    .send(run_with_range)
2007                    .expect("RunWithRangeCondition met but failed to send shutdown message");
2008                return Ok(());
2009            }
2010
2011            // Safe to call because we are in the middle of reconfiguration.
2012            let latest_system_state = self
2013                .state
2014                .get_object_cache_reader()
2015                .try_get_iota_system_state_object_unsafe()
2016                .expect("Read IOTA System State object cannot fail");
2017
2018            #[cfg(msim)]
2019            if !self
2020                .sim_state
2021                .sim_safe_mode_expected
2022                .load(Ordering::Relaxed)
2023            {
2024                debug_assert!(!latest_system_state.safe_mode());
2025            }
2026
2027            #[cfg(not(msim))]
2028            debug_assert!(!latest_system_state.safe_mode());
2029
2030            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
2031                if self.state.is_fullnode(&cur_epoch_store) {
2032                    warn!(
2033                        "Failed to send end of epoch notification to subscriber: {:?}",
2034                        err
2035                    );
2036                }
2037            }
2038
2039            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
2040            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
2041
2042            self.auth_agg.store(Arc::new(
2043                self.auth_agg
2044                    .load()
2045                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
2046            ));
2047
2048            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
2049            let next_epoch = next_epoch_committee.epoch();
2050            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
2051
2052            info!(
2053                next_epoch,
2054                "Finished executing all checkpoints in epoch. About to reconfigure the system."
2055            );
2056
2057            fail_point_async!("reconfig_delay");
2058
2059            // We save the connection monitor status map regardless of validator / fullnode
2060            // status so that we don't need to restart the connection monitor
2061            // every epoch. Update the mappings that will be used by the
2062            // consensus adapter if it exists or is about to be created.
2063            let authority_names_to_peer_ids =
2064                new_epoch_start_state.get_authority_names_to_peer_ids();
2065            self.connection_monitor_status
2066                .update_mapping_for_epoch(authority_names_to_peer_ids);
2067
2068            cur_epoch_store.record_epoch_reconfig_start_time_metric();
2069
2070            send_trusted_peer_change(
2071                &self.config,
2072                &self.trusted_peer_change_tx,
2073                &new_epoch_start_state,
2074            );
2075
2076            let mut validator_components_lock_guard = self.validator_components.lock().await;
2077
2078            // The following code handles 4 different cases, depending on whether the node
2079            // was a validator in the previous epoch, and whether the node is a validator
2080            // in the new epoch.
2081            let new_epoch_store = self
2082                .reconfigure_state(
2083                    &self.state,
2084                    &cur_epoch_store,
2085                    next_epoch_committee.clone(),
2086                    new_epoch_start_state,
2087                    hasher.clone(),
2088                )
2089                .await?;
2090
2091            let new_validator_components = if let Some(ValidatorComponents {
2092                validator_server_handle,
2093                validator_overload_monitor_handle,
2094                consensus_queue_overload_monitor_handle,
2095                soft_lock_sweep_handle,
2096                overload_notifier_handle,
2097                consensus_manager,
2098                consensus_store_pruner,
2099                consensus_adapter,
2100                soft_locks,
2101                mut checkpoint_service_tasks,
2102                checkpoint_metrics,
2103                iota_tx_validator_metrics,
2104                validator_registry_id,
2105            }) = validator_components_lock_guard.take()
2106            {
2107                info!("Reconfiguring the validator.");
2108                // Cancel the old overload notifier task so a new one can be
2109                // started for the next epoch.
2110                if let Some(handle) = overload_notifier_handle {
2111                    handle.abort();
2112                }
2113                // Cancel the old checkpoint service tasks.
2114                // Waiting for checkpoint builder to finish gracefully is not possible, because
2115                // it may wait on transactions while consensus on peers have
2116                // already shut down.
2117                checkpoint_service_tasks.abort_all();
2118                while let Some(result) = checkpoint_service_tasks.join_next().await {
2119                    if let Err(err) = result {
2120                        if err.is_panic() {
2121                            std::panic::resume_unwind(err.into_panic());
2122                        }
2123                        warn!("Error in checkpoint service task: {:?}", err);
2124                    }
2125                }
2126                info!("Checkpoint service has shut down.");
2127
2128                consensus_manager.shutdown().await;
2129                info!("Consensus has shut down.");
2130
2131                info!("Epoch store finished reconfiguration.");
2132
2133                // No other components should be holding a strong reference to state hasher
2134                // at this point. Confirm here before we swap in the new hasher.
2135                let global_state_hasher_metrics = Arc::into_inner(hasher)
2136                    .expect("Object state hasher should have no other references at this point")
2137                    .metrics();
2138                let new_hasher = Arc::new(GlobalStateHasher::new(
2139                    self.state.get_global_state_hash_store().clone(),
2140                    global_state_hasher_metrics,
2141                ));
2142                let weak_hasher = Arc::downgrade(&new_hasher);
2143                *hasher_guard = Some(new_hasher);
2144
2145                consensus_store_pruner.prune(next_epoch).await;
2146
2147                if self.state.is_committee_validator(&new_epoch_store) {
2148                    // Only restart consensus if this node is still a validator in the new epoch.
2149                    Some(
2150                        Self::start_epoch_specific_validator_components(
2151                            &self.config,
2152                            self.state.clone(),
2153                            consensus_adapter,
2154                            self.checkpoint_store.clone(),
2155                            new_epoch_store.clone(),
2156                            self.state_sync_handle.clone(),
2157                            self.randomness_handle.clone(),
2158                            consensus_manager,
2159                            consensus_store_pruner,
2160                            weak_hasher,
2161                            self.backpressure_manager.clone(),
2162                            soft_locks,
2163                            validator_server_handle,
2164                            validator_overload_monitor_handle,
2165                            consensus_queue_overload_monitor_handle,
2166                            soft_lock_sweep_handle,
2167                            checkpoint_metrics,
2168                            iota_tx_validator_metrics,
2169                            validator_registry_id,
2170                        )
2171                        .await?,
2172                    )
2173                } else {
2174                    info!("This node is no longer a validator after reconfiguration");
2175                    if self.registry_service.remove(validator_registry_id) {
2176                        debug!("Removed validator metrics registry");
2177                    } else {
2178                        warn!("Failed to remove validator metrics registry");
2179                    }
2180                    validator_server_handle.shutdown();
2181                    debug!("Validator grpc server shutdown triggered");
2182
2183                    None
2184                }
2185            } else {
2186                // No other components should be holding a strong reference to state hasher
2187                // at this point. Confirm here before we swap in the new hasher.
2188                let global_state_hasher_metrics = Arc::into_inner(hasher)
2189                    .expect("Object state hasher should have no other references at this point")
2190                    .metrics();
2191                let new_hasher = Arc::new(GlobalStateHasher::new(
2192                    self.state.get_global_state_hash_store().clone(),
2193                    global_state_hasher_metrics,
2194                ));
2195                let weak_hasher = Arc::downgrade(&new_hasher);
2196                *hasher_guard = Some(new_hasher);
2197
2198                if self.state.is_committee_validator(&new_epoch_store) {
2199                    info!("Promoting the node from fullnode to validator, starting grpc server");
2200
2201                    let mut components = Self::construct_validator_components(
2202                        self.config.clone(),
2203                        self.state.clone(),
2204                        Arc::new(next_epoch_committee.clone()),
2205                        new_epoch_store.clone(),
2206                        self.checkpoint_store.clone(),
2207                        self.state_sync_handle.clone(),
2208                        self.randomness_handle.clone(),
2209                        weak_hasher,
2210                        self.backpressure_manager.clone(),
2211                        self.connection_monitor_status.clone(),
2212                        &self.registry_service,
2213                    )
2214                    .await?;
2215
2216                    components.validator_server_handle =
2217                        components.validator_server_handle.start().await;
2218
2219                    Some(components)
2220                } else {
2221                    None
2222                }
2223            };
2224            *validator_components_lock_guard = new_validator_components;
2225
2226            // Force releasing current epoch store DB handle, because the
2227            // Arc<AuthorityPerEpochStore> may linger.
2228            cur_epoch_store.release_db_handles();
2229
2230            // Drop the old epoch store to free its in-memory structures
2231            // (ConsensusOutputCache, ConsensusQuarantine, DashMaps, etc.).
2232            // The DB tables were already released above.
2233            drop(cur_epoch_store);
2234
2235            // Prune old epoch databases after each epoch transition to prevent
2236            // accumulation of RocksDB instances during fast catch-up sync
2237            // (e.g. syncing from genesis).
2238            self.state.epoch_db_pruner().prune_old_epoch_dbs().await;
2239
2240            if cfg!(msim)
2241                && !matches!(
2242                    self.config
2243                        .authority_store_pruning_config
2244                        .num_epochs_to_retain_for_checkpoints(),
2245                    None | Some(u64::MAX) | Some(0)
2246                )
2247            {
2248                self.state
2249                    .prune_checkpoints_for_eligible_epochs_for_testing(
2250                        self.config.clone(),
2251                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2252                    )
2253                    .await?;
2254            }
2255
2256            epoch_store = new_epoch_store;
2257            info!("Reconfiguration finished");
2258        }
2259    }
2260
2261    async fn shutdown(&self) {
2262        if let Some(validator_components) = &*self.validator_components.lock().await {
2263            validator_components.consensus_manager.shutdown().await;
2264        }
2265
2266        // Shutdown the gRPC server if it's running
2267        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2268            info!("Shutting down gRPC server");
2269            if let Err(e) = grpc_handle.shutdown().await {
2270                warn!("Failed to gracefully shutdown gRPC server: {e}");
2271            }
2272        }
2273    }
2274
    /// Asynchronously reconfigures the state of the authority node for the next
    /// epoch.
    ///
    /// Builds the next epoch's `EpochStartConfiguration` from
    /// `next_epoch_start_system_state` and the digest of the current epoch's last
    /// checkpoint. It then calls `AuthorityState::reconfigure` with:
    /// - the new committee,
    /// - `global_state_hasher`,
    /// - the epoch supply change stored in that checkpoint's end-of-epoch data,
    /// - the last checkpoint's sequence number.
    ///
    /// Returns the epoch store for the next epoch.
    ///
    /// # Errors
    /// Returns an `IotaError` if the current epoch's last checkpoint carries no
    /// end-of-epoch data.
    ///
    /// # Panics
    /// Panics if:
    /// - the current epoch's last checkpoint cannot be loaded,
    /// - that checkpoint is not the highest executed checkpoint,
    /// - the epoch start configuration cannot be built,
    /// - `supported_protocol_versions` is unset in the node config,
    /// - the authority-state reconfiguration fails,
    /// - the returned epoch store is not for `next_epoch`.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        // The final checkpoint of the current epoch anchors the new epoch: its
        // digest seeds the start configuration and its end-of-epoch data carries
        // the supply change.
        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration requires that execution has reached exactly the last
        // checkpoint of the current epoch.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        // NOTE(review): this block reads from the `state` parameter while the
        // reconfigure call below uses `self.state`; presumably they are the same
        // `AuthorityState` — confirm.
        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Report the epoch-flag change between the old and the new epoch store.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2340
2341    pub fn get_config(&self) -> &NodeConfig {
2342        &self.config
2343    }
2344
2345    async fn execute_transaction_immediately_at_zero_epoch(
2346        state: &Arc<AuthorityState>,
2347        epoch_store: &Arc<AuthorityPerEpochStore>,
2348        tx: &Transaction,
2349        span: tracing::Span,
2350    ) {
2351        let _guard = span.enter();
2352        let transaction =
2353            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2354                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2355                    tx.data().clone(),
2356                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2357                ),
2358            );
2359        state
2360            .try_execute_immediately(&transaction, None, epoch_store)
2361            .unwrap();
2362    }
2363
2364    pub fn randomness_handle(&self) -> randomness::Handle {
2365        self.randomness_handle.clone()
2366    }
2367
2368    /// Sends signed capability notification to committee validators for
2369    /// non-committee validators. This method implements retry logic to handle
2370    /// failed attempts to send the notification. It will retry sending the
2371    /// notification with an increasing interval until it receives a successful
2372    /// response from a f+1 committee members or 2f+1 non-retryable errors.
2373    async fn send_signed_capability_notification_to_committee_with_retry(
2374        &self,
2375        epoch_store: &Arc<AuthorityPerEpochStore>,
2376    ) {
2377        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2378        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2379        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
2380
2381        // Create the capability notification once
2382        let config = epoch_store.protocol_config();
2383        let binary_config = to_binary_config(config);
2384
2385        // Create the capability notification
2386        let capabilities = AuthorityCapabilitiesV1::new(
2387            self.state.name,
2388            epoch_store.get_chain(),
2389            self.config
2390                .supported_protocol_versions
2391                .expect("Supported versions should be populated")
2392                .truncate_below(config.version),
2393            self.state
2394                .get_available_system_packages(&binary_config)
2395                .await,
2396        );
2397
2398        // Sign the capabilities using the authority key pair from config
2399        let signature = AuthoritySignature::new_secure(
2400            &IntentMessage::new(
2401                Intent::iota_app(IntentScope::AuthorityCapabilities),
2402                &capabilities,
2403            ),
2404            &epoch_store.epoch(),
2405            self.config.authority_key_pair(),
2406        );
2407
2408        let request = HandleCapabilityNotificationRequestV1 {
2409            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2410        };
2411
2412        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2413
2414        loop {
2415            let auth_agg = self.auth_agg.load();
2416            match auth_agg
2417                .send_capability_notification_to_quorum(request.clone())
2418                .await
2419            {
2420                Ok(_) => {
2421                    info!("Successfully sent capability notification to committee");
2422                    break;
2423                }
2424                Err(err) => {
2425                    match &err {
2426                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2427                            errors,
2428                        } => {
2429                            warn!(
2430                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2431                                retry_interval, errors
2432                            );
2433                        }
2434                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2435                            errors,
2436                        } => {
2437                            error!(
2438                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2439                                errors
2440                            );
2441                            break;
2442                        }
2443                    };
2444
2445                    // Wait before retrying
2446                    tokio::time::sleep(retry_interval).await;
2447
2448                    // Increase retry interval for the next attempt, capped at max
2449                    retry_interval = std::cmp::min(
2450                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2451                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2452                    );
2453                }
2454            }
2455        }
2456    }
2457}
2458
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator id of the node this `IotaNode` runs on.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Records whether the simulation test expects this node to enter safe
    /// mode.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }
}
2472
/// The validator gRPC server, either still waiting to be built and started or
/// already running.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Sync
    // Holds the not-yet-polled future that builds the server.
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    // The server is running; the handle is used to trigger its shutdown.
    // NOTE(review): the handle is read by `SpawnOnce::shutdown`, so this
    // `allow(unused)` may be stale — confirm before removing it.
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2479
2480impl SpawnOnce {
2481    pub fn new(
2482        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2483    ) -> Self {
2484        Self::Unstarted(Mutex::new(Box::pin(future)))
2485    }
2486
2487    pub async fn start(self) -> Self {
2488        match self {
2489            Self::Unstarted(future) => {
2490                let server = future
2491                    .into_inner()
2492                    .await
2493                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2494                let handle = server.handle().clone();
2495                tokio::spawn(async move {
2496                    if let Err(err) = server.serve().await {
2497                        info!("Server stopped: {err}");
2498                    }
2499                    info!("Server stopped");
2500                });
2501                Self::Started(handle)
2502            }
2503            Self::Started(_) => self,
2504        }
2505    }
2506
2507    pub fn shutdown(self) {
2508        if let SpawnOnce::Started(handle) = self {
2509            handle.trigger_shutdown();
2510        }
2511    }
2512}
2513
2514/// The public formal-snapshot source for a recognized chain, used to backfill a
2515/// pruned node's `epoch_info` chain from the bucket's `EPOCH_INFO`. `None` for
2516/// networks without a published bucket (custom/local), where a residual gap is
2517/// left to extend forward instead.
2518///
2519/// Hardcoded deliberately: this is a one-time migration aid. Remove it together
2520/// with `backfill_epoch_info_from_snapshot` (and `get_devnet_chain_identifier`)
2521/// one release after this ships, once every node holds the chain and new nodes
2522/// get it from genesis sync or a V2 formal-snapshot restore. Anything fetched
2523/// is verified against the genesis committee, so the URL is only a source hint,
2524/// not a trust root.
2525///
2526/// TODO: <https://github.com/iotaledger/iota/issues/12028>
2527fn formal_snapshot_read_config(chain_id: ChainIdentifier) -> Option<ObjectStoreConfig> {
2528    let (bucket, endpoint) = match chain_id.chain() {
2529        Chain::Mainnet => (
2530            "iota-mainnet-formal",
2531            "https://formal-snapshot.mainnet.iota.cafe",
2532        ),
2533        Chain::Testnet => (
2534            "iota-testnet-formal",
2535            "https://formal-snapshot.testnet.iota.cafe",
2536        ),
2537        // Devnet has no stable identity (`Chain::Unknown`), so match the current
2538        // devnet genesis explicitly. After a reset the new genesis no longer
2539        // matches and this falls through to `None`. Devnet backfill is
2540        // best-effort (see `seed_epoch_info`), so a stale id or absent bucket is
2541        // never fatal.
2542        Chain::Unknown if chain_id == get_devnet_chain_identifier() => (
2543            "iota-devnet-formal",
2544            "https://formal-snapshot.devnet.iota.cafe",
2545        ),
2546        Chain::Unknown => return None,
2547    };
2548    Some(ObjectStoreConfig {
2549        object_store: Some(ObjectStoreType::S3),
2550        bucket: Some(bucket.to_owned()),
2551        aws_endpoint: Some(endpoint.to_owned()),
2552        aws_virtual_hosted_style_request: true,
2553        object_store_connection_limit: 200,
2554        no_sign_request: true,
2555        ..Default::default()
2556    })
2557}
2558
2559/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers are now
2560/// available.
2561fn send_trusted_peer_change(
2562    config: &NodeConfig,
2563    sender: &watch::Sender<TrustedPeerChangeEvent>,
2564    new_epoch_start_state: &EpochStartSystemState,
2565) {
2566    let new_committee =
2567        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2568
2569    sender.send_modify(|event| {
2570        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2571        event.new_committee = new_committee;
2572    })
2573}
2574
2575fn build_kv_store(
2576    state: &Arc<AuthorityState>,
2577    config: &NodeConfig,
2578    registry: &Registry,
2579) -> Result<Arc<TransactionKeyValueStore>> {
2580    let metrics = KeyValueStoreMetrics::new(registry);
2581    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2582
2583    let base_url = &config.transaction_kv_store_read_config.base_url;
2584
2585    if base_url.is_empty() {
2586        info!("no http kv store url provided, using local db only");
2587        return Ok(Arc::new(db_store));
2588    }
2589
2590    base_url.parse::<url::Url>().tap_err(|e| {
2591        error!(
2592            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2593            base_url, e
2594        )
2595    })?;
2596
2597    let http_store = HttpKVStore::new_kv(
2598        base_url,
2599        config.transaction_kv_store_read_config.cache_size,
2600        metrics.clone(),
2601    )?;
2602    info!("using local key-value store with fallback to http key-value store");
2603    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2604        db_store,
2605        http_store,
2606        metrics,
2607        "json_rpc_fallback",
2608    )))
2609}
2610
2611/// Builds and starts the gRPC server for the IOTA node based on the node's
2612/// configuration.
2613///
2614/// This function performs the following tasks:
2615/// 1. Checks if the node is a validator by inspecting the consensus
2616///    configuration; if so, it returns early as validators do not expose gRPC
2617///    APIs.
2618/// 2. Checks if gRPC is enabled in the configuration.
2619/// 3. Creates broadcast channels for checkpoint streaming.
2620/// 4. Initializes the gRPC checkpoint service.
2621/// 5. Spawns the gRPC server to listen for incoming connections.
2622///
2623/// Returns a tuple of optional broadcast channels for checkpoint summary and
2624/// data.
2625async fn build_grpc_server(
2626    config: &NodeConfig,
2627    state: Arc<AuthorityState>,
2628    state_sync_store: RocksDbStore,
2629    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2630    prometheus_registry: &Registry,
2631    server_version: ServerVersion,
2632) -> Result<Option<GrpcServerHandle>> {
2633    // Validators do not expose gRPC APIs
2634    if config.consensus_config().is_some() || !config.enable_grpc_api {
2635        return Ok(None);
2636    }
2637
2638    let Some(grpc_config) = &config.grpc_api_config else {
2639        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2640    };
2641
2642    // Get chain identifier from state directly
2643    let chain_id = state.get_chain_identifier();
2644
2645    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2646
2647    // Create cancellation token for proper shutdown hierarchy
2648    let shutdown_token = CancellationToken::new();
2649
2650    // Create GrpcReader
2651    let grpc_reader = Arc::new(GrpcReader::new(
2652        grpc_read_store,
2653        Some(server_version.to_string()),
2654    ));
2655
2656    // Create gRPC server metrics
2657    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2658    let client_id_source = config
2659        .policy_config
2660        .as_ref()
2661        .map(|p| p.client_id_source.clone());
2662
2663    let handle = start_grpc_server(
2664        grpc_reader,
2665        executor,
2666        grpc_config.clone(),
2667        shutdown_token,
2668        chain_id,
2669        Some(grpc_server_metrics),
2670        state.traffic_controller.clone(),
2671        client_id_source,
2672    )
2673    .await?;
2674
2675    Ok(Some(handle))
2676}
2677
/// Builds and starts the HTTP server for the IOTA node, exposing the JSON-RPC
/// API based on the node's configuration.
///
/// This function performs the following tasks:
/// 1. Checks if the node is a validator by inspecting the consensus
///    configuration; if so, it returns `Ok(None)` early, as validators do not
///    expose these APIs.
/// 2. Builds the transaction key-value store and a JSON-RPC server, then
///    registers these modules:
///    - `ReadApi`
///    - `CoinReadApi`
///    - `TransactionBuilderApi` (only when `run_with_range` is unset)
///    - `GovernanceReadApi`
///    - `TransactionExecutionApi` (only when a transaction orchestrator is
///      given)
///    - `IndexerApi`
///    - `MoveUtils`
/// 3. Merges the JSON-RPC routes into an Axum router and adds a `/health`
///    endpoint. It layers in middleware that copies the connection's remote
///    address into Axum's `ConnectInfo` and records server timing.
/// 4. Binds the server to `config.json_rpc_address` and starts listening for
///    incoming connections.
///
/// Returns the handle of the running server, or `None` for validators.
///
/// # Errors
/// Returns an error in any of these cases:
/// - the key-value store cannot be built,
/// - a module fails to be created or registered,
/// - the router cannot be built,
/// - the server cannot be started on the configured address.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not expose these APIs
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // if run_with_range is enabled we want to prevent any transactions
        // run_with_range = None is normal operating conditions
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only served when an orchestrator was provided.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // An explicitly configured IOTA-Names config wins; otherwise derive it
        // from the chain this node is on.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        // `IndexerApi` gets its own `ReadApi` instance built from the same state,
        // key-value store and metrics.
        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // `route_layer` applies to the routes registered so far (JSON-RPC and
    // `/health`); it exposes the authority state that `health_check_handler`
    // extracts.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    // Copy the transport-level connect info into Axum's `ConnectInfo` extension
    // so Axum extractors can see the remote address, then add server timing.
    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2788
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum allowed age, in seconds, of the highest executed checkpoint's
    /// timestamp relative to the current system time. When absent, the health
    /// check does not inspect checkpoints at all.
    pub threshold_seconds: Option<u32>,
}
2793
2794async fn health_check_handler(
2795    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2796    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2797) -> impl axum::response::IntoResponse {
2798    if let Some(threshold_seconds) = threshold_seconds {
2799        // Attempt to get the latest checkpoint
2800        let summary = match state
2801            .get_checkpoint_store()
2802            .get_highest_executed_checkpoint()
2803        {
2804            Ok(Some(summary)) => summary,
2805            Ok(None) => {
2806                warn!("Highest executed checkpoint not found");
2807                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2808            }
2809            Err(err) => {
2810                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2811                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2812            }
2813        };
2814
2815        // Calculate the threshold time based on the provided threshold_seconds
2816        let latest_chain_time = summary.timestamp();
2817        let threshold =
2818            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2819
2820        // Check if the latest checkpoint is within the threshold
2821        if latest_chain_time < threshold {
2822            warn!(
2823                ?latest_chain_time,
2824                ?threshold,
2825                "failing health check due to checkpoint lag"
2826            );
2827            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2828        }
2829    }
2830    // if health endpoint is responding and no threshold is given, respond success
2831    (axum::http::StatusCode::OK, "up")
2832}
2833
2834#[cfg(not(test))]
2835fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2836    protocol_config.max_transactions_per_checkpoint() as usize
2837}
2838
/// Test-build override: always returns 2, ignoring the protocol config.
/// NOTE(review): presumably kept tiny so tests produce many small
/// checkpoints — confirm.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}