iota_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::HashMap,
    fmt,
    future::Future,
    path::PathBuf,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use futures::future::BoxFuture;
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_pruner::ObjectsCompactionFilter,
        authority_store_tables::{
            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
        },
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{
        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
    },
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoint_progress_tracker::CheckpointProgressTracker,
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    storage::{GrpcReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_names::config::IotaNamesConfig;
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    full_checkpoint_content::CheckpointData,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        SignedAuthorityCapabilitiesV1,
    },
    messages_grpc::HandleCapabilityNotificationRequestV1,
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

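/// Components that run only on validator nodes; they are torn down and
/// rebuilt during each epoch change (see `monitor_reconfiguration`).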
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }
}

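/// The server binary name and version, rendered as `bin/version` by the
/// [`Display`](std::fmt::Display) impl below. A minimal doctest sketch
/// (assuming this crate is importable as `iota_node`):
///
/// ```
/// use iota_node::ServerVersion;
///
/// let v = ServerVersion::new("iota-node", "unknown");
/// assert_eq!(v.to_string(), "iota-node/unknown");
/// ```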
#[derive(Clone)]
pub struct ServerVersion {
    pub bin: &'static str,
    pub version: &'static str,
}

impl ServerVersion {
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        Self { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.bin)?;
        f.write_str("/")?;
        f.write_str(self.version)
    }
}

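/// The top-level IOTA node, running either as a validator or as a full node
/// (a node is a validator when a consensus config is present).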
pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Broadcast channel to notify the [`DiscoveryEventLoop`] of new
    /// validator peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    checkpoint_progress_tracker: Arc<CheckpointProgressTracker>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down the iota-node.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at startup and at the
    /// beginning of each epoch. Wrapped in an ArcSwap so that we can mutate
    /// it without taking a mutable reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

impl IotaNode {
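    /// Starts the node with a default [`ServerVersion`] of
    /// `iota-node/unknown`; see [`Self::start_async`] for the full path.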
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("iota-node", "unknown"),
        )
        .await
    }

    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        // Check and set the db_corrupted flag
        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(&prometheus_registry);

        // Initialize IOTA metrics.
        iota_metrics::init_metrics(&prometheus_registry);
        // Unsupported in simtests (because of the use of a static variable)
        // and unnecessary there anyway.
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        // Register hardware metrics.
        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        // Register uptime metric
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                server_version.version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        // If the genesis comes with migration data, load it into memory from
        // the file path specified in the config.
        let migration_tx_data = if genesis.contains_migrations() {
            // The load step already verifies that the content of the migration
            // blob is valid with respect to the content found in genesis.
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // By default, only enable write stall on validators for perpetual db.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let perpetual_tables_for_progress = perpetual_tables.clone();
        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => chain_identifier.chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_identifier, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // The database is empty only at genesis time.
        if is_genesis {
            info!("checking IOTA conservation at genesis");
            // When opening the db tables, the only time it's safe to check
            // IOTA conservation is at genesis. Otherwise we may be in the
            // middle of an epoch and the conservation check would fail. This
            // also initializes the expected_network_iota_amount table.
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        // The database now has everything from genesis, so clear the
        // corruption marker (set the corrupted key to 0).
        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
            )))
        } else {
            None
        };

        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
            Some(Arc::new(
                GrpcIndexesStore::new(
                    config.db_path().join(GRPC_INDEXES_DIR),
                    Arc::clone(&store),
                    &checkpoint_store,
                )
                .await,
            ))
        } else {
            None
        };

        info!("creating archive reader");
        // Create network
        // TODO only configure validators as seed/preferred peers for validators and not
        // for fullnodes once we've had a chance to re-work fullnode
        // configuration generation.
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        // We must send this explicitly rather than relying on the initial
        // value to trigger the watch value change, so that state-sync is able
        // to process it.
        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        // Start archiving local state to remote store
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        // Start uploading state snapshot to remote store
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        let checkpoint_progress_tracker = Arc::new(CheckpointProgressTracker::new());

        // Start uploading db checkpoints to remote store
        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
            Some(checkpoint_progress_tracker.clone()),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
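        // When enabled, the tx finalizer uses the authority aggregator to
        // help drive transactions signed by this validator to finality.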
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            grpc_indexes_store,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
            Some(checkpoint_progress_tracker.clone()),
        )
        .await;

        // ensure genesis and migration txs were executed
        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            // Execute genesis transaction
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            // Execute migration transactions if present
            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        // Start the loop that receives new randomness and generates transactions for
        // it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_global_state_hash_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        iota_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        iota_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        // Convert the transaction orchestrator into an executor trait object
        // for the gRPC server. Note that the transaction_orchestrator (and
        // thus the executor) will be None on a validator node or when
        // run_with_range is set.
        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
            transaction_orchestrator
                .clone()
                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);

        let grpc_server_handle = build_grpc_server(
            &config,
            state.clone(),
            state_sync_store.clone(),
            executor,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the gRPC server
            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        // setup shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,
            checkpoint_progress_tracker: checkpoint_progress_tracker.clone(),

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
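        // Spawn the task that drives reconfiguration across epochs for the
        // lifetime of the node.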
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        node.checkpoint_progress_tracker
            .spawn_logging_task(node.checkpoint_store.clone(), perpetual_tables_for_progress);

        Ok(node)
    }

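    /// Subscribes to the broadcast channel that announces the starting system
    /// state for each new epoch.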
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

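    /// Subscribes to the channel used to signal upstream that the node should
    /// shut down; the message carries an optional `RunWithRange`.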
    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    // Init reconfig process by starting to reject user certs
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    // Testing-only API to start epoch close process.
    // For production code, please use the non-testing version.
    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

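    /// Starts archiving local state to the configured remote store, returning
    /// a shutdown sender if archival is configured and `None` otherwise.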
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

    /// Creates a `StateSnapshotUploader` and starts it if the state snapshot
    /// config is set.
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

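    /// Resolves the effective db checkpoint config and, when needed (an
    /// object store is configured or state snapshots are enabled), starts the
    /// handler that uploads and marks completed db checkpoints.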
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // If neither a db checkpoint object store nor state snapshots are
            // configured, no handler is needed.
            (None, false) => Ok((db_checkpoint_config, None)),
            // Otherwise create the handler, even when the db checkpoint object
            // store is not specified: with state snapshots enabled it still
            // marks db checkpoints as completed so that they can be uploaded
            // as state snapshots.
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                    checkpoint_progress_tracker,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

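    /// Builds the anemo p2p network and starts the discovery, state-sync, and
    /// randomness services on top of it.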
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too
            // many staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

    /// Asynchronously constructs and initializes the components necessary for
    /// the validator node.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // The pruner is started only once, not on every epoch; it
        // periodically removes consensus store data from old epochs.
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

    /// Initializes and starts components specific to the current
    /// epoch for the validator node.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            global_state_hasher,
            checkpoint_metrics.clone(),
        );

        // Create a new map that gets injected into both the consensus handler
        // and the consensus adapter. The consensus handler will write values
        // forwarded from consensus, and the consensus adapter will read them
        // to decide which validator submits a transaction to consensus.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }

    /// Builds the checkpoint service for the validator node, initializing the
    /// necessary components and settings. The returned service is prepared to
    /// handle checkpoint creation and submission to consensus, with the
    /// necessary monitoring and synchronization mechanisms in place; it is
    /// spawned later via `CheckpointService::spawn`.
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );
1348
1349        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1350            sender: consensus_adapter,
1351            signer: state.secret.clone(),
1352            authority: config.authority_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            global_state_hasher,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);

        // The consensus adapter allows the authority to send user certificates
        // through consensus.
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
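            // 2 * max_pending / committee size: twice the even per-validator
            // share of the total pending-transaction budget.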
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
        )
    }

    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state,
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            TrafficControllerMetrics::new(prometheus_registry),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let mut server_conf = iota_network_stack::config::Config::new();
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
                .add_service(ValidatorServer::new(validator_service));

        let tls_config = iota_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            IOTA_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let bind_future = async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;

            let local_addr = server.local_addr();
            info!("Listening for traffic on {local_addr}");

            Ok(server)
        };

        Ok(SpawnOnce::new(bind_future))
    }

    /// Re-executes pending consensus certificates whose execution results may
    /// not have been committed to disk before the node restarted. This is
    /// necessary for the following reasons:
    ///
    /// 1. For any transaction for which we returned signed effects to a client,
    ///    we must ensure that we have re-executed the transaction before we
    ///    begin accepting gRPC requests. Otherwise we would appear to have
    ///    forgotten about the transaction.
    /// 2. While this is running, we are concurrently waiting for all previously
    ///    built checkpoints to be rebuilt. Since there may be dependencies in
    ///    either direction (from checkpointed consensus transactions to pending
    ///    consensus transactions, or vice versa), we must re-execute pending
    ///    consensus transactions to ensure that both processes can complete.
    /// 3. Also note that for any pending consensus transactions for which we
    ///    wrote a signed effects digest to disk, we must re-execute using that
    ///    digest as the expected effects digest, to ensure that we cannot
    ///    arrive at different effects than what we previously signed.
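    ///
    /// A sketch of the restart-time call order (hypothetical and `ignore`d, so
    /// it is not compiled):
    /// ```ignore
    /// // After the authority state and epoch store are restored from disk:
    /// IotaNode::reexecute_pending_consensus_certs(&epoch_store, &state).await;
    /// // Only afterwards does the node begin serving gRPC requests.
    /// ```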
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Shared object txns cannot be re-executed at this point, because we must wait for
                // consensus replay to assign shared object versions.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    // new_unchecked is safe because we never submit a transaction to consensus
                    // without verifying it
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // we only need to re-execute if we previously signed the effects (which
                    // indicates we returned the effects to a client).
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // If this times out, the validator will still almost certainly start up
        // fine. However, it may temporarily "forget" about transactions that it
        // had previously executed, which could confuse clients in some
        // circumstances. The transactions remain in
        // pending_consensus_certificates, so no finality guarantees can be lost.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // Log all the digests that were not executed to help debugging.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    // Only used for testing because of how epoch store is loaded.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    /// Clone an AuthorityAggregator currently used in this node's
    /// QuorumDriver, if the node is a fullnode. After reconfig,
    /// QuorumDriver builds a new AuthorityAggregator. The caller
    /// of this function will most likely want to call this again
    /// to get a fresh one.
    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

    pub fn subscribe_to_transaction_orchestrator_effects(
        &self,
    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.subscribe_to_effects_queue())
            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
    }

    /// This function awaits the completion of checkpoint execution of the
    /// current epoch, after which it initiates reconfiguration of the
    /// entire system. It also handles node role changes across epoch
    /// boundaries and advertises capabilities to the committee if the node
    /// is a validator.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Create a closure that forwards executed checkpoint data to the
            // gRPC broadcast channel, if the gRPC server is running; if the
            // handle is momentarily locked, no sender is wired up this epoch.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
                Some(self.checkpoint_progress_tracker.clone()),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            if let Some(components) = &*self.validator_components.lock().await {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            // no need to send digests of versions less than the current version
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // We are a non-committee validator: send signed capabilities to
                // the committee validators from a separate task so the caller is
                // not blocked. This is done only when the supporting feature
                // flag is enabled.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // We save the connection monitor status map regardless of validator / fullnode
            // status so that we don't need to restart the connection monitor
            // every epoch. Update the mappings that will be used by the
            // consensus adapter if it exists or is about to be created.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles four cases, depending on whether the
            // node was a committee validator in the previous epoch and whether
            // it is one in the new epoch:
            //   validator -> validator: restart consensus and checkpoint service
            //   validator -> fullnode:  tear down the validator components
            //   fullnode  -> validator: construct and start validator components
            //   fullnode  -> fullnode:  nothing to do
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await?;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Cancel the old checkpoint service tasks.
                // Waiting for the checkpoint builder to finish gracefully is not
                // possible, because it may wait on transactions while consensus
                // on peers has already shut down.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Only restart consensus if this node is still a validator in the new epoch.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // Drop the old epoch store to free its in-memory structures
            // (ConsensusOutputCache, ConsensusQuarantine, DashMaps, etc.).
            // The DB tables were already released above.
            drop(cur_epoch_store);

            // Prune old epoch databases after each epoch transition to prevent
            // accumulation of RocksDB instances during fast catch-up sync
            // (e.g. syncing from genesis).
            self.state.epoch_db_pruner().prune_old_epoch_dbs().await;

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }

        // Shutdown the gRPC server if it's running
        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
            info!("Shutting down gRPC server");
            if let Err(e) = grpc_handle.shutdown().await {
                warn!("Failed to gracefully shutdown gRPC server: {e}");
            }
        }
    }

    /// Asynchronously reconfigures the state of the authority node for the next
    /// epoch.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

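        // Reconfiguration starts only after every checkpoint of the current
        // epoch has been executed, so the epoch's last checkpoint must also be
        // the highest executed one.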
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let _guard = span.enter();
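        // Wrap the transaction in a placeholder checkpoint proof (epoch 0,
        // checkpoint 0) so it can be executed directly at genesis, without a
        // certificate.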
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

    /// Sends a signed capability notification to the committee validators on
    /// behalf of a non-committee validator. This method implements retry logic
    /// to handle failed attempts: it retries with an increasing interval until
    /// it receives successful responses from f+1 committee members or
    /// accumulates 2f+1 non-retryable errors.
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
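        // Linear backoff: 5s, 10s, 15s, ..., capped at 300s.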

        // Build the capability notification once, outside the retry loop
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain(),
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        // Sign the capabilities using the authority key pair from config
        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            break;
                        }
                    };

                    // Wait before retrying
                    tokio::time::sleep(retry_interval).await;

                    // Increase retry interval for the next attempt, capped at max
                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
}

#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }
}

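/// A gRPC server in one of two phases: constructed but not yet started, or
/// started. The bind future is only driven when `start` is called;
/// `shutdown` then triggers a graceful stop of a started server.
///
/// A sketch of the life cycle (hypothetical and `ignore`d, so it is not
/// compiled):
/// ```ignore
/// let server = SpawnOnce::new(bind_future); // future stored, nothing bound yet
/// let server = server.start().await;        // binds and begins serving
/// server.shutdown();                        // triggers graceful shutdown
/// ```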
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Sync
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}

impl SpawnOnce {
    pub fn new(
        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
    ) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let server = future
                    .into_inner()
                    .await
                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
                let handle = server.handle().clone();
                tokio::spawn(async move {
                    if let Err(err) = server.serve().await {
                        info!("Server stopped: {err}");
                    } else {
                        info!("Server stopped");
                    }
                });
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }

    pub fn shutdown(self) {
        if let SpawnOnce::Started(handle) = self {
            handle.trigger_shutdown();
        }
    }
}

/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers is now
/// available.
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
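        // Rotate committees: the previous `new_committee` becomes
        // `old_committee`, then the freshly computed peer set is installed.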
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

/// Builds and starts the gRPC server for the IOTA node based on the node's
/// configuration.
///
/// This function performs the following tasks:
/// 1. Checks if the node is a validator by inspecting the consensus
///    configuration; if so, it returns early as validators do not expose gRPC
///    APIs.
/// 2. Checks if gRPC is enabled in the configuration.
/// 3. Builds the gRPC read store and reader over the authority state and
///    state-sync store.
/// 4. Creates the server metrics and a cancellation token for shutdown.
/// 5. Spawns the gRPC server to listen for incoming connections.
///
/// Returns a handle to the running gRPC server, or `None` if the node is a
/// validator or the gRPC API is disabled.
async fn build_grpc_server(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    state_sync_store: RocksDbStore,
    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<Option<GrpcServerHandle>> {
    // Validators do not expose gRPC APIs
    if config.consensus_config().is_some() || !config.enable_grpc_api {
        return Ok(None);
    }

    let Some(grpc_config) = &config.grpc_api_config else {
        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
    };

    // Get chain identifier from state directly
    let chain_id = state.get_chain_identifier();

    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));

    // Create cancellation token for proper shutdown hierarchy
    let shutdown_token = CancellationToken::new();

    // Create GrpcReader
    let grpc_reader = Arc::new(GrpcReader::new(
        grpc_read_store,
        Some(server_version.to_string()),
    ));

    // Create gRPC server metrics
    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);

    let handle = start_grpc_server(
        grpc_reader,
        executor,
        grpc_config.clone(),
        shutdown_token,
        chain_id,
        Some(grpc_server_metrics),
    )
    .await?;

    Ok(Some(handle))
}

/// Builds and starts the HTTP server for the IOTA node, exposing the JSON-RPC
/// API based on the node's configuration.
///
/// This function performs the following tasks:
/// 1. Checks if the node is a validator by inspecting the consensus
///    configuration; if so, it returns early as validators do not expose these
///    APIs.
/// 2. Creates an Axum router to handle HTTP requests.
/// 3. Initializes the JSON-RPC server and registers various RPC modules based
///    on the node's state and configuration, including ReadApi, CoinReadApi,
///    TransactionBuilderApi, GovernanceReadApi, TransactionExecutionApi,
///    IndexerApi, and MoveUtils.
/// 4. Binds the server to the specified JSON-RPC address and starts listening
///    for incoming connections.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not expose these APIs
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // If run_with_range is set, we prevent submission of new transactions;
        // run_with_range = None is the normal operating condition.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!("IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}

async fn health_check_handler(
    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
) -> impl axum::response::IntoResponse {
    if let Some(threshold_seconds) = threshold_seconds {
        // Attempt to get the latest checkpoint
        let summary = match state
            .get_checkpoint_store()
            .get_highest_executed_checkpoint()
        {
            Ok(Some(summary)) => summary,
            Ok(None) => {
                warn!("Highest executed checkpoint not found");
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
            Err(err) => {
                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
        };

        // Calculate the threshold time based on the provided threshold_seconds
        let latest_chain_time = summary.timestamp();
        let threshold =
            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
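        // Example: with `?threshold_seconds=300`, a node whose latest executed
        // checkpoint is older than five minutes reports SERVICE_UNAVAILABLE.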

        // Check if the latest checkpoint is within the threshold
        if latest_chain_time < threshold {
            warn!(
                ?latest_chain_time,
                ?threshold,
                "failing health check due to checkpoint lag"
            );
            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
        }
    }
    // If the health endpoint is responding and the threshold check (if any)
    // passed, report success.
    (axum::http::StatusCode::OK, "up")
}

#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

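// In tests, checkpoints are capped at two transactions so that
// multi-checkpoint code paths are exercised even by small workloads.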
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}