iota_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::HashMap,
9    fmt,
10    future::Future,
11    path::PathBuf,
12    sync::{Arc, Weak},
13    time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18    callback::CallbackLayer,
19    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28    ConsensusConfig, NodeConfig,
29    node::{DBCheckpointConfig, RunWithRange},
30    node_config_metrics::NodeConfigMetrics,
31    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34    authority::{
35        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36        authority_per_epoch_store::AuthorityPerEpochStore,
37        authority_store_pruner::ObjectsCompactionFilter,
38        authority_store_tables::{
39            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40        },
41        backpressure::BackpressureManager,
42        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43    },
44    authority_aggregator::{
45        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46    },
47    authority_client::NetworkAuthorityClient,
48    authority_server::{ValidatorService, ValidatorServiceMetrics},
49    checkpoints::{
50        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
51        SubmitCheckpointToConsensus,
52        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
53    },
54    connection_monitor::ConnectionMonitor,
55    consensus_adapter::{
56        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
57        ConsensusClient,
58    },
59    consensus_handler::ConsensusHandlerInitializer,
60    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
61    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
62    db_checkpoint_handler::DBCheckpointHandler,
63    epoch::{
64        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
65        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
66        reconfiguration::ReconfigurationInitiator,
67    },
68    execution_cache::build_execution_cache,
69    global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
70    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
71    jsonrpc_index::IndexStore,
72    module_cache_metrics::ResolverMetrics,
73    overload_monitor::overload_monitor,
74    safe_client::SafeClientMetricsBase,
75    signature_verifier::SignatureVerifierMetrics,
76    storage::{GrpcReadStore, RocksDbStore},
77    traffic_controller::metrics::TrafficControllerMetrics,
78    transaction_orchestrator::TransactionOrchestrator,
79    validator_tx_finalizer::ValidatorTxFinalizer,
80};
81use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
82use iota_json_rpc::{
83    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
84    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
85    transaction_builder_api::TransactionBuilderApi,
86    transaction_execution_api::TransactionExecutionApi,
87};
88use iota_json_rpc_api::JsonRpcMetrics;
89use iota_macros::{fail_point, fail_point_async, replay_log};
90use iota_metrics::{
91    RegistryID, RegistryService,
92    hardware_metrics::register_hardware_metrics,
93    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
94    server_timing_middleware, spawn_monitored_task,
95};
96use iota_names::config::IotaNamesConfig;
97use iota_network::{
98    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
99};
100use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
101use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
102use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
103use iota_snapshot::uploader::StateSnapshotUploader;
104use iota_storage::{
105    FileCompression, StorageFormat,
106    http_key_value_store::HttpKVStore,
107    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
108    key_value_store_metrics::KeyValueStoreMetrics,
109};
110use iota_types::{
111    base_types::{AuthorityName, ConciseableName, EpochId},
112    committee::Committee,
113    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
114    digests::ChainIdentifier,
115    error::{IotaError, IotaResult},
116    executable_transaction::VerifiedExecutableTransaction,
117    execution_config_utils::to_binary_config,
118    full_checkpoint_content::CheckpointData,
119    iota_system_state::{
120        IotaSystemState, IotaSystemStateTrait,
121        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
122    },
123    messages_consensus::{
124        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
125        SignedAuthorityCapabilitiesV1,
126    },
127    messages_grpc::HandleCapabilityNotificationRequestV1,
128    quorum_driver_types::QuorumDriverEffectsQueueResult,
129    supported_protocol_versions::SupportedProtocolVersions,
130    transaction::{Transaction, VerifiedCertificate},
131};
132use prometheus::Registry;
133#[cfg(msim)]
134use simulator::*;
135use tap::tap::TapFallible;
136use tokio::{
137    sync::{Mutex, broadcast, mpsc, watch},
138    task::{JoinHandle, JoinSet},
139};
140use tokio_util::sync::CancellationToken;
141use tower::ServiceBuilder;
142use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
143use typed_store::{
144    DBMetrics,
145    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
146};
147
148use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
149
/// Admin HTTP interface of the node (contents not visible here).
pub mod admin;
// Provides `IotaNodeHandle`, re-exported at the top of this file.
mod handle;
// Provides `IotaNodeMetrics` and `GrpcMetrics` used throughout this module.
pub mod metrics;
153
/// Services that only run while this node acts as a committee validator.
/// Constructed in `construct_validator_components`; parts of it (e.g. the
/// checkpoint service tasks) are shut down during reconfiguration.
pub struct ValidatorComponents {
    // Validator gRPC server; started lazily via `SpawnOnce::start()` after
    // recovered consensus transactions have been resubmitted.
    validator_server_handle: SpawnOnce,
    // Overload monitor task; `None` when the monitor is not running —
    // NOTE(review): exact enabling condition not visible here, confirm.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    // Shared adapter used to submit transactions to consensus (also used by
    // `close_epoch` below to initiate reconfiguration).
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // ID of the validator-specific metrics registry within the
    // RegistryService — presumably used to drop it on teardown; confirm.
    validator_registry_id: RegistryID,
}
166
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Extra per-node state that exists only in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulator node this `IotaNode` is running on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Flag toggled from simtests; the name suggests it marks that safe
        /// mode is expected — NOTE(review): confirm against simtest usage.
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its side effects; presumably detects leaked node
        // instances when simtests finish — confirm.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node handle and initializes the
        /// safe-mode flag to `false`.
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }
}
189
/// Binary name / version pair identifying this server build; used e.g. for
/// the uptime metric and passed to the gRPC server. Rendered by `Display`
/// as `bin/version`.
// `Debug` added: public types should derive the common traits so callers
// can log/inspect them (Rust API guidelines, C-COMMON-TRAITS).
#[derive(Clone, Debug)]
pub struct ServerVersion {
    /// Name of the running binary, e.g. `"iota-node"`.
    pub bin: &'static str,
    /// Version string of the binary (may be `"unknown"`).
    pub version: &'static str,
}

impl ServerVersion {
    /// Creates a new `ServerVersion` from a binary name and version string.
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        Self { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    /// Formats the version as `"{bin}/{version}"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.bin, self.version)
    }
}
209
/// A running IOTA node: owns the authority state plus every long-lived
/// service started in [`IotaNode::start_async`] (p2p, state-sync, RPC/gRPC
/// servers, archival and snapshot uploaders, and — when in the committee —
/// the validator components).
pub struct IotaNode {
    // Node configuration, with `supported_protocol_versions` populated.
    config: NodeConfig,
    // Validator-only components; `None` on fullnodes and when this node is
    // not in the committee.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC
    _http_server: Option<iota_http::ServerHandle>,
    // Core authority state shared across services.
    state: Arc<AuthorityState>,
    // Present only on fullnodes without `run_with_range`; also backs the
    // gRPC transaction executor.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    // Prometheus registry service used for all node metrics.
    registry_service: RegistryService,
    // Node-level metrics (protocol version gauges, etc.).
    metrics: Arc<IotaNodeMetrics>,

    // Handle to the p2p discovery service; held for the node's lifetime.
    _discovery: discovery::Handle,
    // Handle to the state-sync p2p service.
    state_sync_handle: state_sync::Handle,
    // Handle to the randomness p2p service.
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    // Wrapped in Mutex<Option<..>> so ownership can be handed off later —
    // NOTE(review): presumably taken during reconfiguration; confirm.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    // Connection statuses plus authority-name -> peer-id mapping used by the
    // connection monitor.
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Broadcast channel to notify [`DiscoveryEventLoop`] for new validator
    /// peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    // Sender side of the db-checkpoint handler's channel; held so the task
    // stays alive — presumably dropping it signals shutdown; confirm.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    // Simulator-only state (see `mod simulator`).
    #[cfg(msim)]
    sim_state: SimState,

    // Kept alive for the state-archival task; see `_db_checkpoint_handle`.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    // Kept alive for the state-snapshot uploader task.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown iota-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and beginning of
    /// each epoch. Use ArcSwap so that we could mutate it without taking
    /// mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
257
impl fmt::Debug for IotaNode {
    // Compact debug output: renders only the concise authority name, i.e.
    // `IotaNode { name: .. }`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}
265
266impl IotaNode {
267    pub async fn start(
268        config: NodeConfig,
269        registry_service: RegistryService,
270    ) -> Result<Arc<IotaNode>> {
271        Self::start_async(
272            config,
273            registry_service,
274            ServerVersion::new("iota-node", "unknown"),
275        )
276        .await
277    }
278
    /// Initializes and starts all node subsystems — stores and caches, the
    /// p2p network (discovery, state-sync, randomness), archival and
    /// snapshot uploaders, the authority state, the JSON-RPC and gRPC
    /// servers, and (when this node is in the committee) the validator
    /// components — then spawns the reconfiguration monitor task and
    /// returns the shared [`IotaNode`] handle.
    ///
    /// The statement order below is load-bearing: e.g. the db-corruption
    /// flag is only cleared after the genesis checkpoint is inserted, and
    /// the validator gRPC server is only started after recovered consensus
    /// transactions have been resubmitted.
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        // A node is a validator iff it carries a consensus config; every
        // other node runs as a fullnode.
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        // The chain identifier is derived from the genesis checkpoint digest.
        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        // Check and set the db_corrupted flag
        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(&prometheus_registry);

        // Initialize IOTA metrics.
        iota_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of static variable) and unnecessary in
        // simtests.
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        // Register hardware metrics.
        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        // Register uptime metric
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                server_version.version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        // If genesis come with some migration data then load them into memory from the
        // file path specified in config.
        let migration_tx_data = if genesis.contains_migrations() {
            // Here the load already verifies that the content of the migration blob is
            // valid in respect to the content found in genesis
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        // The pruner DB (and the derived compaction filter) only exist when
        // the compaction-filter based pruning is enabled in config.
        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // By default, only enable write stall on validators for perpetual db.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        // Recover the epoch/committee/epoch-start config persisted before the
        // last shutdown; all three must exist for a consistent restart.
        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        // Tests may override the chain; otherwise it is derived from the
        // chain identifier.
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => chain_identifier.chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_identifier, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // the database is empty at genesis time
        if is_genesis {
            info!("checking IOTA conservation at genesis");
            // When we are opening the db table, the only time when it's safe to
            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the IOTA conservation check will fail. This also initialize
            // the expected_network_iota_amount table.
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        // Warn when the buffer stake for protocol upgrades has been
        // overridden away from the protocol-config default.
        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        // Database has everything from genesis, set corrupted key to 0
        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        // JSON-RPC index store: fullnodes only, and only when index
        // processing is enabled.
        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
            )))
        } else {
            None
        };

        // gRPC index store: fullnodes only, and only when the gRPC API is
        // enabled.
        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
            Some(Arc::new(
                GrpcIndexesStore::new(
                    config.db_path().join(GRPC_INDEXES_DIR),
                    Arc::clone(&store),
                    &checkpoint_store,
                )
                .await,
            ))
        } else {
            None
        };

        info!("creating archive reader");
        // Create network
        // TODO only configure validators as seed/preferred peers for validators and not
        // for fullnodes once we've had a chance to re-work fullnode
        // configuration generation.
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        // We must explicitly send this instead of relying on the initial value to
        // trigger watch value change, so that state-sync is able to process it.
        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        // Start archiving local state to remote store
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        // Start uploading state snapshot to remote store
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        // Start uploading db checkpoints to remote store
        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        // Genesis objects include any objects carried in the migration blob.
        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            grpc_indexes_store,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        // ensure genesis and migration txs were executed
        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            // Execute genesis transaction
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            // Execute migration transactions if present
            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        // Start the loop that receives new randomness and generates transactions for
        // it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_global_state_hash_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        // The orchestrator (and hence the gRPC executor below) only exists on
        // fullnodes that are not running with a fixed range.
        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        iota_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        iota_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        // Convert transaction orchestrator to executor trait object for gRPC server
        // Note that the transaction_orchestrator (so as executor) will be None if it is
        // a validator node or run_with_range is set
        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
            transaction_orchestrator
                .clone()
                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);

        let grpc_server_handle = build_grpc_server(
            &config,
            state.clone(),
            state_sync_store.clone(),
            executor,
            &prometheus_registry,
            server_version,
        )
        .await?;

        // Only committee validators construct the consensus/checkpoint
        // components; pending consensus certs are re-executed concurrently.
        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the gRPC server
            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        // setup shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        // Run the reconfiguration monitor in the background for the node's
        // lifetime; errors are logged, not propagated.
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }
819
820    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
821        self.end_of_epoch_channel.subscribe()
822    }
823
824    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
825        self.shutdown_channel_tx.subscribe()
826    }
827
828    pub fn current_epoch_for_testing(&self) -> EpochId {
829        self.state.current_epoch_for_testing()
830    }
831
832    pub fn db_checkpoint_path(&self) -> PathBuf {
833        self.config.db_checkpoint_path()
834    }
835
836    // Init reconfig process by starting to reject user certs
837    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
838        info!("close_epoch (current epoch = {})", epoch_store.epoch());
839        self.validator_components
840            .lock()
841            .await
842            .as_ref()
843            .ok_or_else(|| IotaError::from("Node is not a validator"))?
844            .consensus_adapter
845            .close_epoch(epoch_store);
846        Ok(())
847    }
848
849    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
850        self.state
851            .clear_override_protocol_upgrade_buffer_stake(epoch)
852    }
853
854    pub fn set_override_protocol_upgrade_buffer_stake(
855        &self,
856        epoch: EpochId,
857        buffer_stake_bps: u64,
858    ) -> IotaResult {
859        self.state
860            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
861    }
862
863    // Testing-only API to start epoch close process.
864    // For production code, please use the non-testing version.
865    pub async fn close_epoch_for_testing(&self) -> IotaResult {
866        let epoch_store = self.state.epoch_store_for_testing();
867        self.close_epoch(&epoch_store).await
868    }
869
870    async fn start_state_archival(
871        config: &NodeConfig,
872        prometheus_registry: &Registry,
873        state_sync_store: RocksDbStore,
874    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
875        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
876            let local_store_config = ObjectStoreConfig {
877                object_store: Some(ObjectStoreType::File),
878                directory: Some(config.archive_path()),
879                ..Default::default()
880            };
881            let archive_writer = ArchiveWriter::new(
882                local_store_config,
883                remote_store_config.clone(),
884                FileCompression::Zstd,
885                StorageFormat::Blob,
886                Duration::from_secs(600),
887                256 * 1024 * 1024,
888                prometheus_registry,
889            )
890            .await?;
891            Ok(Some(archive_writer.start(state_sync_store).await?))
892        } else {
893            Ok(None)
894        }
895    }
896
897    /// Creates an StateSnapshotUploader and start it if the StateSnapshotConfig
898    /// is set.
899    fn start_state_snapshot(
900        config: &NodeConfig,
901        prometheus_registry: &Registry,
902        checkpoint_store: Arc<CheckpointStore>,
903    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
904        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
905            let snapshot_uploader = StateSnapshotUploader::new(
906                &config.db_checkpoint_path(),
907                &config.snapshot_path(),
908                remote_store_config.clone(),
909                60,
910                prometheus_registry,
911                checkpoint_store,
912            )?;
913            Ok(Some(snapshot_uploader.start()))
914        } else {
915            Ok(None)
916        }
917    }
918
919    fn start_db_checkpoint(
920        config: &NodeConfig,
921        prometheus_registry: &Registry,
922        state_snapshot_enabled: bool,
923    ) -> Result<(
924        DBCheckpointConfig,
925        Option<tokio::sync::broadcast::Sender<()>>,
926    )> {
927        let checkpoint_path = Some(
928            config
929                .db_checkpoint_config
930                .checkpoint_path
931                .clone()
932                .unwrap_or_else(|| config.db_checkpoint_path()),
933        );
934        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
935            DBCheckpointConfig {
936                checkpoint_path,
937                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
938                    true
939                } else {
940                    config
941                        .db_checkpoint_config
942                        .perform_db_checkpoints_at_epoch_end
943                },
944                ..config.db_checkpoint_config.clone()
945            }
946        } else {
947            config.db_checkpoint_config.clone()
948        };
949
950        match (
951            db_checkpoint_config.object_store_config.as_ref(),
952            state_snapshot_enabled,
953        ) {
954            // If db checkpoint config object store not specified but
955            // state snapshot object store is specified, create handler
956            // anyway for marking db checkpoints as completed so that they
957            // can be uploaded as state snapshots.
958            (None, false) => Ok((db_checkpoint_config, None)),
959            (_, _) => {
960                let handler = DBCheckpointHandler::new(
961                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
962                    db_checkpoint_config.object_store_config.as_ref(),
963                    60,
964                    db_checkpoint_config
965                        .prune_and_compact_before_upload
966                        .unwrap_or(true),
967                    config.authority_store_pruning_config.clone(),
968                    prometheus_registry,
969                    state_snapshot_enabled,
970                )?;
971                Ok((
972                    db_checkpoint_config,
973                    Some(DBCheckpointHandler::start(handler)),
974                ))
975            }
976        }
977    }
978
    /// Builds the anemo p2p [`Network`] and starts the discovery, state-sync
    /// and randomness subsystems on top of it.
    ///
    /// All three RPC services are multiplexed over a single endpoint bound to
    /// `config.p2p_config.listen_address`; the returned handles drive the
    /// corresponding background components.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        // Each builder produces the local component plus the RPC service that
        // remote peers call into.
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        // Randomness messages received from peers are forwarded out through
        // `randomness_tx`.
        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            // Separate metric families for traffic received from vs. sent to
            // peers.
            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: trace server errors and record per-message
            // metrics for every incoming request.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack mirrors the inbound one for requests we send.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too
            // many staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name ties the endpoint to this chain so peers on
            // other networks cannot connect.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // Start the background drivers now that the network is bound.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1117
    /// Asynchronously constructs and initializes the components necessary for
    /// the validator node.
    ///
    /// Wires up the consensus adapter and manager, the consensus store
    /// pruner, the validator gRPC server and (when load shedding is enabled)
    /// the overload monitor, then delegates the per-epoch wiring to
    /// `start_epoch_specific_validator_components`.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
    ) -> Result<ValidatorComponents> {
        // NOTE(review): the clone exists only to obtain a mutable borrow of
        // consensus_config below — confirm whether a shared borrow of
        // `config.consensus_config` would suffice here.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Validator-specific metrics get a dedicated registry; the returned
        // id is kept in ValidatorComponents (passed through below).
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // Consensus client shared by the adapter and the manager; the name
        // suggests it is later pointed at the live consensus instance — the
        // update itself happens outside this function.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // This only gets started up once, not on every epoch. (Make call to remove
        // every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        // Builds the gRPC service; the returned SpawnOnce defers actual
        // binding until started by the caller.
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // Weak reference so the monitor does not keep the state alive.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1217
    /// Initializes and starts components specific to the current
    /// epoch for the validator node: the checkpoint service, the low-scoring
    /// authorities map, the randomness manager (when available), and the
    /// consensus machinery itself. Returns the assembled
    /// `ValidatorComponents`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            global_state_hasher,
            checkpoint_metrics.clone(),
        );

        // create a new map that gets injected into both the consensus handler and the
        // consensus adapter the consensus handler will write values forwarded
        // from consensus, and the consensus adapter will read the values to
        // make decisions about which validator submits a transaction to consensus
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // NOTE(review): a `None` from try_new is treated as "no randomness
        // manager for this epoch" and silently skipped — confirm intended.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        // Consensus must be running before the checkpoint service is spawned
        // below; transactions are validated by IotaTxValidator on ingestion.
        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1309
1310    /// Starts the checkpoint service for the validator node, initializing
1311    /// necessary components and settings.
1312    /// The function ensures proper initialization of the checkpoint service,
1313    /// preparing it to handle checkpoint creation and submission to consensus,
1314    /// while also setting up the necessary monitoring and synchronization
1315    /// mechanisms.
1316    fn build_checkpoint_service(
1317        config: &NodeConfig,
1318        consensus_adapter: Arc<ConsensusAdapter>,
1319        checkpoint_store: Arc<CheckpointStore>,
1320        epoch_store: Arc<AuthorityPerEpochStore>,
1321        state: Arc<AuthorityState>,
1322        state_sync_handle: state_sync::Handle,
1323        global_state_hasher: Weak<GlobalStateHasher>,
1324        checkpoint_metrics: Arc<CheckpointMetrics>,
1325    ) -> Arc<CheckpointService> {
1326        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1327        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1328
1329        debug!(
1330            "Starting checkpoint service with epoch start timestamp {}
1331            and epoch duration {}",
1332            epoch_start_timestamp_ms, epoch_duration_ms
1333        );
1334
1335        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1336            sender: consensus_adapter,
1337            signer: state.secret.clone(),
1338            authority: config.authority_public_key(),
1339            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1340                .checked_add(epoch_duration_ms)
1341                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1342            metrics: checkpoint_metrics.clone(),
1343        });
1344
1345        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1346        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1347        let max_checkpoint_size_bytes =
1348            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1349
1350        CheckpointService::build(
1351            state.clone(),
1352            checkpoint_store,
1353            epoch_store,
1354            state.get_transaction_cache_reader().clone(),
1355            global_state_hasher,
1356            checkpoint_output,
1357            Box::new(certified_checkpoint_output),
1358            checkpoint_metrics,
1359            max_tx_per_checkpoint,
1360            max_checkpoint_size_bytes,
1361        )
1362    }
1363
1364    fn construct_consensus_adapter(
1365        committee: &Committee,
1366        consensus_config: &ConsensusConfig,
1367        authority: AuthorityName,
1368        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1369        prometheus_registry: &Registry,
1370        consensus_client: Arc<dyn ConsensusClient>,
1371        checkpoint_store: Arc<CheckpointStore>,
1372    ) -> ConsensusAdapter {
1373        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1374        // The consensus adapter allows the authority to send user certificates through
1375        // consensus.
1376
1377        ConsensusAdapter::new(
1378            consensus_client,
1379            checkpoint_store,
1380            authority,
1381            connection_monitor_status,
1382            consensus_config.max_pending_transactions(),
1383            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1384            consensus_config.max_submit_position,
1385            consensus_config.submit_delay_step_override(),
1386            ca_metrics,
1387        )
1388    }
1389
1390    async fn start_grpc_validator_service(
1391        config: &NodeConfig,
1392        state: Arc<AuthorityState>,
1393        consensus_adapter: Arc<ConsensusAdapter>,
1394        prometheus_registry: &Registry,
1395    ) -> Result<SpawnOnce> {
1396        let validator_service = ValidatorService::new(
1397            state,
1398            consensus_adapter,
1399            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1400            TrafficControllerMetrics::new(prometheus_registry),
1401            config.policy_config.clone(),
1402            config.firewall_config.clone(),
1403        );
1404
1405        let mut server_conf = iota_network_stack::config::Config::new();
1406        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1407        server_conf.load_shed = config.grpc_load_shed;
1408        let server_builder =
1409            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1410                .add_service(ValidatorServer::new(validator_service));
1411
1412        let tls_config = iota_tls::create_rustls_server_config(
1413            config.network_key_pair().copy().private(),
1414            IOTA_TLS_SERVER_NAME.to_string(),
1415        );
1416
1417        let network_address = config.network_address().clone();
1418
1419        let bind_future = async move {
1420            let server = server_builder
1421                .bind(&network_address, Some(tls_config))
1422                .await
1423                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1424
1425            let local_addr = server.local_addr();
1426            info!("Listening to traffic on {local_addr}");
1427
1428            Ok(server)
1429        };
1430
1431        Ok(SpawnOnce::new(bind_future))
1432    }
1433
    /// Re-executes pending consensus certificates, which may not have been
    /// committed to disk before the node restarted. This is necessary for
    /// the following reasons:
    ///
    /// 1. For any transaction for which we returned signed effects to a client,
    ///    we must ensure that we have re-executed the transaction before we
    ///    begin accepting grpc requests. Otherwise we would appear to have
    ///    forgotten about the transaction.
    /// 2. While this is running, we are concurrently waiting for all previously
    ///    built checkpoints to be rebuilt. Since there may be dependencies in
    ///    either direction (from checkpointed consensus transactions to pending
    ///    consensus transactions, or vice versa), we must re-execute pending
    ///    consensus transactions to ensure that both processes can complete.
    /// 3. Also note that for any pending consensus transactions for which we
    ///    wrote a signed effects digest to disk, we must re-execute using that
    ///    digest as the expected effects digest, to ensure that we cannot
    ///    arrive at different effects than what we previously signed.
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        // Split pending txns into those with a previously-signed effects
        // digest (must re-execute against that exact digest) and the rest.
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Shared object txns cannot be re-executed at this point, because we must wait for
                // consensus replay to assign shared object versions.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    // new_unchecked is safe because we never submit a transaction to consensus
                    // without verifying it
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // we only need to re-execute if we previously signed the effects (which
                    // indicates we returned the effects to a client).
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                // Non-certificate consensus transactions need no re-execution.
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        // Enqueue both groups for execution; only the first group carries an
        // expected effects digest that execution must reproduce.
        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // If this times out, the validator will still almost certainly start up fine.
        // But, it is possible that it may temporarily "forget" about
        // transactions that it had previously executed. This could confuse
        // clients in some circumstances. However, the transactions are still in
        // pending_consensus_certificates, so we cannot lose any finality guarantees.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // Log all the digests that were not executed to help debugging.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }
1542
1543    pub fn state(&self) -> Arc<AuthorityState> {
1544        self.state.clone()
1545    }
1546
    /// Returns the reference gas price of the current epoch.
    ///
    /// Only used for testing because of how epoch store is loaded.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1551
    /// Returns a cloned handle to the node's `CommitteeStore`.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1555
1556    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1557    // self.state.db()
1558    // }
1559
1560    /// Clone an AuthorityAggregator currently used in this node's
1561    /// QuorumDriver, if the node is a fullnode. After reconfig,
1562    /// QuorumDriver builds a new AuthorityAggregator. The caller
1563    /// of this function will mostly likely want to call this again
1564    /// to get a fresh one.
1565    pub fn clone_authority_aggregator(
1566        &self,
1567    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1568        self.transaction_orchestrator
1569            .as_ref()
1570            .map(|to| to.clone_authority_aggregator())
1571    }
1572
1573    pub fn transaction_orchestrator(
1574        &self,
1575    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1576        self.transaction_orchestrator.clone()
1577    }
1578
1579    pub fn subscribe_to_transaction_orchestrator_effects(
1580        &self,
1581    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1582        self.transaction_orchestrator
1583            .as_ref()
1584            .map(|to| to.subscribe_to_effects_queue())
1585            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1586    }
1587
    /// This function awaits the completion of checkpoint execution of the
    /// current epoch, after which it initiates reconfiguration of the
    /// entire system. This function also handles role changes for the node when
    /// epoch changes and advertises capabilities to the committee if the node
    /// is a validator.
    ///
    /// Runs one loop iteration per epoch and only returns (with `Ok`) after a
    /// `RunWithRange` stop condition is met and the node has been shut down.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Take exclusive ownership of the global state hasher for this
            // epoch; a fresh hasher is swapped back into the guard during
            // reconfiguration further below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Create closures that handle gRPC type conversion.
            // NOTE(review): if `try_lock` fails here, `data_sender` ends up
            // `None` and checkpoint data is not forwarded to the gRPC
            // broadcaster for this epoch — confirm this best-effort behavior
            // is intended.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            if let Some(components) = &*self.validator_components.lock().await {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            // no need to send digests of versions less than the current version
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Send signed capabilities to committee validators if we are a non-committee
                // validator in a separate task to not block the caller. Sending is done only if
                // the feature flag supporting it is enabled.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Blocks until every checkpoint of the current epoch has been
            // executed, or until the run-with-range condition is reached.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // In simulation tests safe mode may be induced deliberately; only
            // assert "not in safe mode" when it is not expected.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A send error means there are no subscribers; that is expected on
            // validators, so only warn on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator so quorum operations target the
            // next epoch's committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // We save the connection monitor status map regardless of validator / fullnode
            // status so that we don't need to restart the connection monitor
            // every epoch. Update the mappings that will be used by the
            // consensus adapter if it exists or is about to be created.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await?;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Cancel the old checkpoint service tasks.
                // Waiting for checkpoint builder to finish gracefully is not possible, because
                // it may wait on transactions while consensus on peers have
                // already shut down.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Only restart consensus if this node is still a validator in the new epoch.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Validator -> fullnode transition: tear down
                    // validator-only metrics and the validator gRPC server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // In simulation only: exercise checkpoint pruning across epochs
            // when a meaningful retention period is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
1922
    /// Shuts down the node's long-running components: the consensus manager
    /// (when validator components are present) and the gRPC server (when one
    /// is running). Invoked from `monitor_reconfiguration` when a
    /// `RunWithRange` stop condition is reached.
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }

        // Shutdown the gRPC server if it's running. `take()` drops the handle
        // so subsequent calls are no-ops.
        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
            info!("Shutting down gRPC server");
            if let Err(e) = grpc_handle.shutdown().await {
                warn!("Failed to gracefully shutdown gRPC server: {e}");
            }
        }
    }
1936
    /// Asynchronously reconfigures the state of the authority node for the
    /// next epoch.
    ///
    /// Loads the last checkpoint of the current epoch (which must carry
    /// end-of-epoch data, including the epoch supply change), builds the
    /// `EpochStartConfiguration` for the next epoch, and hands both to
    /// `AuthorityState::reconfigure`. Returns the per-epoch store of the new
    /// epoch.
    ///
    /// # Panics
    /// Panics if the last checkpoint of the current epoch cannot be loaded,
    /// if that checkpoint has not been fully executed yet, or if
    /// reconfiguring the authority state fails.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        // The final checkpoint of an epoch always carries end-of-epoch data;
        // its absence indicates an inconsistent checkpoint store.
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration must not begin until every checkpoint of the
        // current epoch has been executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2002
    /// Returns a reference to the node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2006
2007    async fn execute_transaction_immediately_at_zero_epoch(
2008        state: &Arc<AuthorityState>,
2009        epoch_store: &Arc<AuthorityPerEpochStore>,
2010        tx: &Transaction,
2011        span: tracing::Span,
2012    ) {
2013        let _guard = span.enter();
2014        let transaction =
2015            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2016                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2017                    tx.data().clone(),
2018                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2019                ),
2020            );
2021        state
2022            .try_execute_immediately(&transaction, None, epoch_store)
2023            .unwrap();
2024    }
2025
    /// Returns a cloned handle to the randomness subsystem.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2029
    /// Sends signed capability notification to committee validators for
    /// non-committee validators. This method implements retry logic to handle
    /// failed attempts to send the notification. It will retry sending the
    /// notification with an increasing interval until it receives a successful
    /// response from a f+1 committee members or 2f+1 non-retryable errors.
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        // Linear backoff: start at 5s, add 5s per attempt, capped at 5 minutes.
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes

        // Create the capability notification once
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        // Create the capability notification
        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain_identifier().chain(),
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                // no need to send digests of versions less than the current version
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        // Sign the capabilities using the authority key pair from config
        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            // Re-load the aggregator on every attempt so retries pick up a
            // refreshed committee after a reconfiguration.
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            // Give up entirely on non-retryable errors.
                            break;
                        }
                    };

                    // Wait before retrying
                    tokio::time::sleep(retry_interval).await;

                    // Increase retry interval for the next attempt, capped at max
                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
2119}
2120
#[cfg(msim)]
impl IotaNode {
    /// Returns the id of the simulator node this IotaNode runs on.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Records whether safe mode is expected in simulation; consulted during
    /// reconfiguration before asserting the system is not in safe mode.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }
}
2134
/// A server that can be started at most once: it begins life as an
/// unstarted future producing the server and transitions to a handle of the
/// running server once `start` is called.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Sync
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2141
2142impl SpawnOnce {
2143    pub fn new(
2144        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2145    ) -> Self {
2146        Self::Unstarted(Mutex::new(Box::pin(future)))
2147    }
2148
2149    pub async fn start(self) -> Self {
2150        match self {
2151            Self::Unstarted(future) => {
2152                let server = future
2153                    .into_inner()
2154                    .await
2155                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2156                let handle = server.handle().clone();
2157                tokio::spawn(async move {
2158                    if let Err(err) = server.serve().await {
2159                        info!("Server stopped: {err}");
2160                    }
2161                    info!("Server stopped");
2162                });
2163                Self::Started(handle)
2164            }
2165            Self::Started(_) => self,
2166        }
2167    }
2168
2169    pub fn shutdown(self) {
2170        if let SpawnOnce::Started(handle) = self {
2171            handle.trigger_shutdown();
2172        }
2173    }
2174}
2175
2176/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers are now
2177/// available.
2178fn send_trusted_peer_change(
2179    config: &NodeConfig,
2180    sender: &watch::Sender<TrustedPeerChangeEvent>,
2181    new_epoch_start_state: &EpochStartSystemState,
2182) {
2183    let new_committee =
2184        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2185
2186    sender.send_modify(|event| {
2187        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2188        event.new_committee = new_committee;
2189    })
2190}
2191
2192fn build_kv_store(
2193    state: &Arc<AuthorityState>,
2194    config: &NodeConfig,
2195    registry: &Registry,
2196) -> Result<Arc<TransactionKeyValueStore>> {
2197    let metrics = KeyValueStoreMetrics::new(registry);
2198    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2199
2200    let base_url = &config.transaction_kv_store_read_config.base_url;
2201
2202    if base_url.is_empty() {
2203        info!("no http kv store url provided, using local db only");
2204        return Ok(Arc::new(db_store));
2205    }
2206
2207    base_url.parse::<url::Url>().tap_err(|e| {
2208        error!(
2209            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2210            base_url, e
2211        )
2212    })?;
2213
2214    let http_store = HttpKVStore::new_kv(
2215        base_url,
2216        config.transaction_kv_store_read_config.cache_size,
2217        metrics.clone(),
2218    )?;
2219    info!("using local key-value store with fallback to http key-value store");
2220    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2221        db_store,
2222        http_store,
2223        metrics,
2224        "json_rpc_fallback",
2225    )))
2226}
2227
/// Builds and starts the gRPC server for the IOTA node based on the node's
/// configuration.
///
/// This function performs the following tasks:
/// 1. Checks if the node is a validator by inspecting the consensus
///    configuration; if so, it returns early as validators do not expose gRPC
///    APIs.
/// 2. Checks if gRPC is enabled in the configuration; errors if it is enabled
///    without a `grpc_api_config`.
/// 3. Builds the gRPC read path (`GrpcReadStore` wrapped by a `GrpcReader`)
///    over the authority state and the state-sync store.
/// 4. Starts the gRPC server with its own cancellation token and metrics.
///
/// Returns `Ok(None)` when the node is a validator or gRPC is disabled,
/// otherwise a handle to the running gRPC server.
async fn build_grpc_server(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    state_sync_store: RocksDbStore,
    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<Option<GrpcServerHandle>> {
    // Validators do not expose gRPC APIs
    if config.consensus_config().is_some() || !config.enable_grpc_api {
        return Ok(None);
    }

    let Some(grpc_config) = &config.grpc_api_config else {
        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
    };

    // Get chain identifier from state directly
    let chain_id = state.get_chain_identifier();

    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));

    // Create cancellation token for proper shutdown hierarchy
    let shutdown_token = CancellationToken::new();

    // Create GrpcReader
    let grpc_reader = Arc::new(GrpcReader::new(
        grpc_read_store,
        Some(server_version.to_string()),
    ));

    // Create gRPC server metrics
    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);

    let handle = start_grpc_server(
        grpc_reader,
        executor,
        grpc_config.clone(),
        shutdown_token,
        chain_id,
        Some(grpc_server_metrics),
    )
    .await?;

    Ok(Some(handle))
}
2288
2289/// Builds and starts the HTTP server for the IOTA node, exposing the JSON-RPC
2290/// API based on the node's configuration.
2291///
2292/// This function performs the following tasks:
2293/// 1. Checks if the node is a validator by inspecting the consensus
2294///    configuration; if so, it returns early as validators do not expose these
2295///    APIs.
2296/// 2. Creates an Axum router to handle HTTP requests.
2297/// 3. Initializes the JSON-RPC server and registers various RPC modules based
2298///    on the node's state and configuration, including CoinApi,
2299///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
2300///    IndexerApi.
2301/// 4. Binds the server to the specified JSON-RPC address and starts listening
2302///    for incoming connections.
2303pub async fn build_http_server(
2304    state: Arc<AuthorityState>,
2305    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2306    config: &NodeConfig,
2307    prometheus_registry: &Registry,
2308) -> Result<Option<iota_http::ServerHandle>> {
2309    // Validators do not expose these APIs
2310    if config.consensus_config().is_some() {
2311        return Ok(None);
2312    }
2313
2314    let mut router = axum::Router::new();
2315
2316    let json_rpc_router = {
2317        let mut server = JsonRpcServerBuilder::new(
2318            env!("CARGO_PKG_VERSION"),
2319            prometheus_registry,
2320            config.policy_config.clone(),
2321            config.firewall_config.clone(),
2322        );
2323
2324        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2325
2326        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2327        server.register_module(ReadApi::new(
2328            state.clone(),
2329            kv_store.clone(),
2330            metrics.clone(),
2331        ))?;
2332        server.register_module(CoinReadApi::new(
2333            state.clone(),
2334            kv_store.clone(),
2335            metrics.clone(),
2336        )?)?;
2337
2338        // if run_with_range is enabled we want to prevent any transactions
2339        // run_with_range = None is normal operating conditions
2340        if config.run_with_range.is_none() {
2341            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2342        }
2343        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2344
2345        if let Some(transaction_orchestrator) = transaction_orchestrator {
2346            server.register_module(TransactionExecutionApi::new(
2347                state.clone(),
2348                transaction_orchestrator.clone(),
2349                metrics.clone(),
2350            ))?;
2351        }
2352
2353        let iota_names_config = config
2354            .iota_names_config
2355            .clone()
2356            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));
2357
2358        server.register_module(IndexerApi::new(
2359            state.clone(),
2360            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2361            kv_store,
2362            metrics,
2363            iota_names_config,
2364            config.indexer_max_subscriptions,
2365        ))?;
2366        server.register_module(MoveUtils::new(state.clone()))?;
2367
2368        let server_type = config.jsonrpc_server_type();
2369
2370        server.to_router(server_type).await?
2371    };
2372
2373    router = router.merge(json_rpc_router);
2374
2375    router = router
2376        .route("/health", axum::routing::get(health_check_handler))
2377        .route_layer(axum::Extension(state));
2378
2379    let layers = ServiceBuilder::new()
2380        .map_request(|mut request: axum::http::Request<_>| {
2381            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
2382                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2383                request.extensions_mut().insert(axum_connect_info);
2384            }
2385            request
2386        })
2387        .layer(axum::middleware::from_fn(server_timing_middleware));
2388
2389    router = router.layer(layers);
2390
2391    let handle = iota_http::Builder::new()
2392        .serve(&config.json_rpc_address, router)
2393        .map_err(|e| anyhow::anyhow!("{e}"))?;
2394    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());
2395
2396    Ok(Some(handle))
2397}
2398
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    // Maximum allowed age, in seconds, of the highest executed checkpoint
    // before the health check reports the node as down. When absent, the
    // health check only verifies that the endpoint responds.
    pub threshold_seconds: Option<u32>,
}
2403
2404async fn health_check_handler(
2405    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2406    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2407) -> impl axum::response::IntoResponse {
2408    if let Some(threshold_seconds) = threshold_seconds {
2409        // Attempt to get the latest checkpoint
2410        let summary = match state
2411            .get_checkpoint_store()
2412            .get_highest_executed_checkpoint()
2413        {
2414            Ok(Some(summary)) => summary,
2415            Ok(None) => {
2416                warn!("Highest executed checkpoint not found");
2417                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2418            }
2419            Err(err) => {
2420                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2421                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2422            }
2423        };
2424
2425        // Calculate the threshold time based on the provided threshold_seconds
2426        let latest_chain_time = summary.timestamp();
2427        let threshold =
2428            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2429
2430        // Check if the latest checkpoint is within the threshold
2431        if latest_chain_time < threshold {
2432            warn!(
2433                ?latest_chain_time,
2434                ?threshold,
2435                "failing health check due to checkpoint lag"
2436            );
2437            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2438        }
2439    }
2440    // if health endpoint is responding and no threshold is given, respond success
2441    (axum::http::StatusCode::OK, "up")
2442}
2443
/// Maximum number of transactions allowed in a single checkpoint, taken from
/// the protocol configuration. A `#[cfg(test)]` variant below overrides this
/// with a small constant in test builds.
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2448
/// Test-build override of `max_tx_per_checkpoint`: caps checkpoints at 2
/// transactions, presumably so tests hit checkpoint boundaries with tiny
/// workloads — confirm before relying on this in new tests.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}