Skip to main content

iota_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::HashMap,
9    fmt,
10    future::Future,
11    path::PathBuf,
12    sync::{Arc, Weak},
13    time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18    callback::CallbackLayer,
19    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28    ConsensusConfig, NodeConfig,
29    node::{DBCheckpointConfig, RunWithRange},
30    node_config_metrics::NodeConfigMetrics,
31    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34    authority::{
35        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36        authority_per_epoch_store::AuthorityPerEpochStore,
37        authority_store_pruner::ObjectsCompactionFilter,
38        authority_store_tables::{
39            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40        },
41        backpressure::BackpressureManager,
42        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43    },
44    authority_aggregator::{
45        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46    },
47    authority_client::NetworkAuthorityClient,
48    authority_server::{ValidatorService, ValidatorServiceMetrics},
49    checkpoint_progress_tracker::CheckpointProgressTracker,
50    checkpoints::{
51        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
52        SubmitCheckpointToConsensus,
53        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
54    },
55    connection_monitor::ConnectionMonitor,
56    consensus_adapter::{
57        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
58        ConsensusClient,
59    },
60    consensus_handler::ConsensusHandlerInitializer,
61    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
62    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
63    db_checkpoint_handler::DBCheckpointHandler,
64    epoch::{
65        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
66        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
67        reconfiguration::ReconfigurationInitiator,
68    },
69    execution_cache::build_execution_cache,
70    global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
71    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
72    jsonrpc_index::IndexStore,
73    module_cache_metrics::ResolverMetrics,
74    overload_monitor::overload_monitor,
75    safe_client::SafeClientMetricsBase,
76    signature_verifier::SignatureVerifierMetrics,
77    storage::{GrpcReadStore, RocksDbStore},
78    transaction_orchestrator::TransactionOrchestrator,
79    validator_tx_finalizer::ValidatorTxFinalizer,
80};
81use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
82use iota_json_rpc::{
83    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
84    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
85    transaction_builder_api::TransactionBuilderApi,
86    transaction_execution_api::TransactionExecutionApi,
87};
88use iota_json_rpc_api::JsonRpcMetrics;
89use iota_macros::{fail_point, fail_point_async, replay_log};
90use iota_metrics::{
91    RegistryID, RegistryService,
92    hardware_metrics::register_hardware_metrics,
93    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
94    server_timing_middleware, spawn_monitored_task,
95};
96use iota_names::config::IotaNamesConfig;
97use iota_network::{
98    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
99};
100use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
101use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
102use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
103use iota_snapshot::uploader::StateSnapshotUploader;
104use iota_storage::{
105    FileCompression, StorageFormat,
106    http_key_value_store::HttpKVStore,
107    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
108    key_value_store_metrics::KeyValueStoreMetrics,
109};
110use iota_types::{
111    base_types::{AuthorityName, ConciseableName, EpochId},
112    committee::Committee,
113    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
114    digests::ChainIdentifier,
115    error::{IotaError, IotaResult},
116    executable_transaction::VerifiedExecutableTransaction,
117    execution_config_utils::to_binary_config,
118    full_checkpoint_content::CheckpointData,
119    iota_system_state::{
120        IotaSystemState, IotaSystemStateTrait,
121        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
122    },
123    messages_consensus::{
124        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
125        SignedAuthorityCapabilitiesV1,
126    },
127    messages_grpc::HandleCapabilityNotificationRequestV1,
128    quorum_driver_types::QuorumDriverEffectsQueueResult,
129    supported_protocol_versions::SupportedProtocolVersions,
130    transaction::{Transaction, VerifiedCertificate},
131};
132use prometheus::Registry;
133#[cfg(msim)]
134use simulator::*;
135use tap::tap::TapFallible;
136use tokio::{
137    sync::{Mutex, broadcast, mpsc, watch},
138    task::{JoinHandle, JoinSet},
139};
140use tokio_util::sync::CancellationToken;
141use tower::ServiceBuilder;
142use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
143use typed_store::{
144    DBMetrics,
145    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
146};
147
148use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
149
150pub mod admin;
151mod handle;
152pub mod metrics;
153
/// State and task handles owned by a node that is a committee validator.
///
/// Stored in the `validator_components` slot of [`IotaNode`]; `start_async`
/// only builds it when `state.is_committee_validator(&epoch_store)` is true.
///
/// NOTE: fields are dropped in declaration order, so keep the order stable.
pub struct ValidatorComponents {
    /// Handle for the validator gRPC server. `start_async` replaces it with the
    /// result of `validator_server_handle.start().await` right after the
    /// components are constructed.
    validator_server_handle: SpawnOnce,
    /// Join handle of the overload monitor task, if one was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter for submitting to consensus; `start_async` calls
    /// `submit_recovered(&epoch_store)` on it once the components exist.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    /// Presumably identifies a registry added to the `RegistryService` for
    /// validator-only metrics, so it can be removed later — confirm.
    validator_registry_id: RegistryID,
}
166
/// Simulator-only (`msim`) helpers attached to every [`IotaNode`].
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Per-node state that only exists in simulator builds.
    pub(super) struct SimState {
        /// Handle of the simulator node that was current when this state was
        /// created.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Starts as `false`; presumably set by tests that expect the node to
        /// enter safe mode — confirm against callers.
        pub sim_safe_mode_expected: AtomicBool,
        // Held for the node's whole lifetime and never read directly.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node and creates a fresh leak
        /// detector; the safe-mode expectation flag starts out `false`.
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }
}
189
/// Name and version of the running binary, rendered as `bin/version`.
#[derive(Clone)]
pub struct ServerVersion {
    /// Binary name, e.g. `"iota-node"`.
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}

impl ServerVersion {
    /// Builds a `ServerVersion` from a binary name and a version string.
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        ServerVersion { bin, version }
    }
}

impl fmt::Display for ServerVersion {
    /// Writes `bin/version`; width/padding flags of the outer formatter are
    /// not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.bin, self.version)
    }
}
209
/// A running IOTA node, either a validator or a full node, together with the
/// handles of the services started for it by `IotaNode::start_async`.
///
/// NOTE: fields are dropped in declaration order, so keep the order stable.
pub struct IotaNode {
    /// Node configuration; `start_async` fills in `supported_protocol_versions`
    /// with the system default when it was unset.
    config: NodeConfig,
    /// Validator-only components. `start_async` sets this to `Some` only when
    /// the node is a committee validator for its starting epoch.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC
    _http_server: Option<iota_http::ServerHandle>,
    /// Authority state, shared (via `Arc`) with the servers and background
    /// tasks started in `start_async`.
    state: Arc<AuthorityState>,
    /// `Some` only on full nodes started without `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    /// Wrapped in `Mutex<Option<_>>` so it can be taken or replaced through a
    /// shared reference; `start_async` initialises it to `Some`.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Watch channel to notify [`DiscoveryEventLoop`] for new validator
    /// peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    /// Checkpoint progress tracker; its logging task is spawned at the end of
    /// `start_async`.
    checkpoint_progress_tracker: Arc<CheckpointProgressTracker>,

    // The `_…_handle` broadcast senders in this struct are only held to keep
    // them alive for the node's lifetime; presumably dropping one signals the
    // corresponding background task to stop — confirm.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Channel to allow signaling upstream to shutdown iota-node.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and beginning of
    /// each epoch. Use ArcSwap so that we could mutate it without taking
    /// mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
259
260impl fmt::Debug for IotaNode {
261    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
262        f.debug_struct("IotaNode")
263            .field("name", &self.state.name.concise())
264            .finish()
265    }
266}
267
268impl IotaNode {
269    pub async fn start(
270        config: NodeConfig,
271        registry_service: RegistryService,
272    ) -> Result<Arc<IotaNode>> {
273        Self::start_async(
274            config,
275            registry_service,
276            ServerVersion::new("iota-node", "unknown"),
277        )
278        .await
279    }
280
281    pub async fn start_async(
282        config: NodeConfig,
283        registry_service: RegistryService,
284        server_version: ServerVersion,
285    ) -> Result<Arc<IotaNode>> {
286        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
287        let mut config = config.clone();
288        if config.supported_protocol_versions.is_none() {
289            info!(
290                "populating config.supported_protocol_versions with default {:?}",
291                SupportedProtocolVersions::SYSTEM_DEFAULT
292            );
293            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
294        }
295
296        let run_with_range = config.run_with_range;
297        let is_validator = config.consensus_config().is_some();
298        let is_full_node = !is_validator;
299        let prometheus_registry = registry_service.default_registry();
300
301        info!(node =? config.authority_public_key(),
302            "Initializing iota-node listening on {}", config.network_address
303        );
304
305        let genesis = config.genesis()?.clone();
306
307        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
308        info!("IOTA chain identifier: {chain_identifier}");
309
310        // Check and set the db_corrupted flag
311        let db_corrupted_path = &config.db_path().join("status");
312        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
313            panic!("Failed to check database corruption: {err}");
314        }
315
316        // Initialize metrics to track db usage before creating any stores
317        DBMetrics::init(&prometheus_registry);
318
319        // Initialize IOTA metrics.
320        iota_metrics::init_metrics(&prometheus_registry);
321        // Unsupported (because of the use of static variable) and unnecessary in
322        // simtests.
323        #[cfg(not(msim))]
324        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
325
326        // Register hardware metrics.
327        register_hardware_metrics(&registry_service, &config.db_path)
328            .expect("Failed registering hardware metrics");
329        // Register uptime metric
330        prometheus_registry
331            .register(iota_metrics::uptime_metric(
332                if is_validator {
333                    "validator"
334                } else {
335                    "fullnode"
336                },
337                server_version.version,
338                &chain_identifier.to_string(),
339            ))
340            .expect("Failed registering uptime metric");
341
342        // If genesis come with some migration data then load them into memory from the
343        // file path specified in config.
344        let migration_tx_data = if genesis.contains_migrations() {
345            // Here the load already verifies that the content of the migration blob is
346            // valid in respect to the content found in genesis
347            Some(config.load_migration_tx_data()?)
348        } else {
349            None
350        };
351
352        let secret = Arc::pin(config.authority_key_pair().copy());
353        let genesis_committee = genesis.committee()?;
354        let committee_store = Arc::new(CommitteeStore::new(
355            config.db_path().join("epochs"),
356            &genesis_committee,
357            None,
358        ));
359
360        let mut pruner_db = None;
361        if config
362            .authority_store_pruning_config
363            .enable_compaction_filter
364        {
365            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
366                &config.db_path().join("store"),
367            )));
368        }
369        let compaction_filter = pruner_db
370            .clone()
371            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
372
373        // By default, only enable write stall on validators for perpetual db.
374        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
375        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
376            enable_write_stall,
377            compaction_filter,
378        };
379        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
380            &config.db_path().join("store"),
381            Some(perpetual_tables_options),
382        ));
383        let is_genesis = perpetual_tables
384            .database_is_empty()
385            .expect("Database read should not fail at init.");
386        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
387        let backpressure_manager =
388            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
389
390        let perpetual_tables_for_progress = perpetual_tables.clone();
391        let store = AuthorityStore::open(
392            perpetual_tables,
393            &genesis,
394            &config,
395            &prometheus_registry,
396            migration_tx_data.as_ref(),
397        )
398        .await?;
399
400        let cur_epoch = store.get_recovery_epoch_at_restart()?;
401        let committee = committee_store
402            .get_committee(&cur_epoch)?
403            .expect("Committee of the current epoch must exist");
404        let epoch_start_configuration = store
405            .get_epoch_start_configuration()?
406            .expect("EpochStartConfiguration of the current epoch must exist");
407        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
408        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
409
410        let cache_traits = build_execution_cache(
411            &config.execution_cache_config,
412            &prometheus_registry,
413            &store,
414            backpressure_manager.clone(),
415        );
416
417        let auth_agg = {
418            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
419            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
420            Arc::new(ArcSwap::new(Arc::new(
421                AuthorityAggregator::new_from_epoch_start_state(
422                    epoch_start_configuration.epoch_start_state(),
423                    &committee_store,
424                    safe_client_metrics_base,
425                    auth_agg_metrics,
426                ),
427            )))
428        };
429
430        let chain = match config.chain_override_for_testing {
431            Some(chain) => chain,
432            None => chain_identifier.chain(),
433        };
434
435        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
436        let epoch_store = AuthorityPerEpochStore::new(
437            config.authority_public_key(),
438            committee.clone(),
439            &config.db_path().join("store"),
440            Some(epoch_options.options),
441            EpochMetrics::new(&registry_service.default_registry()),
442            epoch_start_configuration,
443            cache_traits.backing_package_store.clone(),
444            cache_metrics,
445            signature_verifier_metrics,
446            &config.expensive_safety_check_config,
447            (chain_identifier, chain),
448            checkpoint_store
449                .get_highest_executed_checkpoint_seq_number()
450                .expect("checkpoint store read cannot fail")
451                .unwrap_or(0),
452        )?;
453
454        info!("created epoch store");
455
456        replay_log!(
457            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
458            epoch_store.epoch(),
459            epoch_store.protocol_config()
460        );
461
462        // the database is empty at genesis time
463        if is_genesis {
464            info!("checking IOTA conservation at genesis");
465            // When we are opening the db table, the only time when it's safe to
466            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
467            // an epoch and the IOTA conservation check will fail. This also initialize
468            // the expected_network_iota_amount table.
469            cache_traits
470                .reconfig_api
471                .try_expensive_check_iota_conservation(&epoch_store, None)
472                .expect("IOTA conservation check cannot fail at genesis");
473        }
474
475        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
476        let default_buffer_stake = epoch_store
477            .protocol_config()
478            .buffer_stake_for_protocol_upgrade_bps();
479        if effective_buffer_stake != default_buffer_stake {
480            warn!(
481                ?effective_buffer_stake,
482                ?default_buffer_stake,
483                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
484            );
485        }
486
487        checkpoint_store.insert_genesis_checkpoint(
488            genesis.checkpoint(),
489            genesis.checkpoint_contents().clone(),
490            &epoch_store,
491        );
492
493        // Database has everything from genesis, set corrupted key to 0
494        unmark_db_corruption(db_corrupted_path)?;
495
496        info!("creating state sync store");
497        let state_sync_store = RocksDbStore::new(
498            cache_traits.clone(),
499            committee_store.clone(),
500            checkpoint_store.clone(),
501        );
502
503        let index_store = if is_full_node && config.enable_index_processing {
504            info!("creating index store");
505            Some(Arc::new(IndexStore::new(
506                config.db_path().join("indexes"),
507                &prometheus_registry,
508                epoch_store
509                    .protocol_config()
510                    .max_move_identifier_len_as_option(),
511            )))
512        } else {
513            None
514        };
515
516        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
517            Some(Arc::new(
518                GrpcIndexesStore::new(
519                    config.db_path().join(GRPC_INDEXES_DIR),
520                    Arc::clone(&store),
521                    &checkpoint_store,
522                )
523                .await,
524            ))
525        } else {
526            None
527        };
528
529        info!("creating archive reader");
530        // Create network
531        // TODO only configure validators as seed/preferred peers for validators and not
532        // for fullnodes once we've had a chance to re-work fullnode
533        // configuration generation.
534        let archive_readers =
535            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
536        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
537        let (randomness_tx, randomness_rx) = mpsc::channel(
538            config
539                .p2p_config
540                .randomness
541                .clone()
542                .unwrap_or_default()
543                .mailbox_capacity(),
544        );
545        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
546            Self::create_p2p_network(
547                &config,
548                state_sync_store.clone(),
549                chain_identifier,
550                trusted_peer_change_rx,
551                archive_readers.clone(),
552                randomness_tx,
553                &prometheus_registry,
554            )?;
555
556        // We must explicitly send this instead of relying on the initial value to
557        // trigger watch value change, so that state-sync is able to process it.
558        send_trusted_peer_change(
559            &config,
560            &trusted_peer_change_tx,
561            epoch_store.epoch_start_state(),
562        );
563
564        info!("start state archival");
565        // Start archiving local state to remote store
566        let state_archive_handle =
567            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
568                .await?;
569
570        info!("start snapshot upload");
571        // Start uploading state snapshot to remote store
572        let state_snapshot_handle =
573            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
574
575        let checkpoint_progress_tracker = Arc::new(CheckpointProgressTracker::new());
576
577        // Start uploading db checkpoints to remote store
578        info!("start db checkpoint");
579        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
580            &config,
581            &prometheus_registry,
582            state_snapshot_handle.is_some(),
583            Some(checkpoint_progress_tracker.clone()),
584        )?;
585
586        let mut genesis_objects = genesis.objects().to_vec();
587        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
588            genesis_objects.extend(migration_tx_data.get_objects());
589        }
590
591        let authority_name = config.authority_public_key();
592        let validator_tx_finalizer =
593            config
594                .enable_validator_tx_finalizer
595                .then_some(Arc::new(ValidatorTxFinalizer::new(
596                    auth_agg.clone(),
597                    authority_name,
598                    &prometheus_registry,
599                )));
600
601        info!("create authority state");
602        let state = AuthorityState::new(
603            authority_name,
604            secret,
605            config.supported_protocol_versions.unwrap(),
606            store.clone(),
607            cache_traits.clone(),
608            epoch_store.clone(),
609            committee_store.clone(),
610            index_store.clone(),
611            grpc_indexes_store,
612            checkpoint_store.clone(),
613            &prometheus_registry,
614            &genesis_objects,
615            &db_checkpoint_config,
616            config.clone(),
617            archive_readers,
618            validator_tx_finalizer,
619            chain_identifier,
620            pruner_db,
621            Some(checkpoint_progress_tracker.clone()),
622            config.policy_config.clone(),
623            config.firewall_config.clone(),
624        )
625        .await;
626
627        // ensure genesis and migration txs were executed
628        if epoch_store.epoch() == 0 {
629            let genesis_tx = &genesis.transaction();
630            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
631            // Execute genesis transaction
632            Self::execute_transaction_immediately_at_zero_epoch(
633                &state,
634                &epoch_store,
635                genesis_tx,
636                span,
637            )
638            .await;
639
640            // Execute migration transactions if present
641            if let Some(migration_tx_data) = migration_tx_data {
642                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
643                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
644                    Self::execute_transaction_immediately_at_zero_epoch(
645                        &state,
646                        &epoch_store,
647                        tx,
648                        span,
649                    )
650                    .await;
651                }
652            }
653        }
654
655        // Start the loop that receives new randomness and generates transactions for
656        // it.
657        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
658
659        if config
660            .expensive_safety_check_config
661            .enable_secondary_index_checks()
662        {
663            if let Some(indexes) = state.indexes.clone() {
664                iota_core::verify_indexes::verify_indexes(
665                    state.get_global_state_hash_store().as_ref(),
666                    indexes,
667                )
668                .expect("secondary indexes are inconsistent");
669            }
670        }
671
672        let (end_of_epoch_channel, end_of_epoch_receiver) =
673            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
674
675        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
676            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
677                auth_agg.load_full(),
678                state.clone(),
679                end_of_epoch_receiver,
680                &config.db_path(),
681                &prometheus_registry,
682            )))
683        } else {
684            None
685        };
686
687        let http_server = build_http_server(
688            state.clone(),
689            &transaction_orchestrator.clone(),
690            &config,
691            &prometheus_registry,
692        )
693        .await?;
694
695        let global_state_hasher = Arc::new(GlobalStateHasher::new(
696            cache_traits.global_state_hash_store.clone(),
697            GlobalStateHashMetrics::new(&prometheus_registry),
698        ));
699
700        let authority_names_to_peer_ids = epoch_store
701            .epoch_start_state()
702            .get_authority_names_to_peer_ids();
703
704        let network_connection_metrics =
705            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
706
707        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
708
709        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
710            p2p_network.downgrade(),
711            network_connection_metrics,
712            HashMap::new(),
713            None,
714        );
715
716        let connection_monitor_status = ConnectionMonitorStatus {
717            connection_statuses,
718            authority_names_to_peer_ids,
719        };
720
721        let connection_monitor_status = Arc::new(connection_monitor_status);
722        let iota_node_metrics =
723            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
724
725        iota_node_metrics
726            .binary_max_protocol_version
727            .set(ProtocolVersion::MAX.as_u64() as i64);
728        iota_node_metrics
729            .configured_max_protocol_version
730            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
731
732        // Convert transaction orchestrator to executor trait object for gRPC server
733        // Note that the transaction_orchestrator (so as executor) will be None if it is
734        // a validator node or run_with_range is set
735        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
736            transaction_orchestrator
737                .clone()
738                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
739
740        let grpc_server_handle = build_grpc_server(
741            &config,
742            state.clone(),
743            state_sync_store.clone(),
744            executor,
745            &prometheus_registry,
746            server_version,
747        )
748        .await?;
749
750        let validator_components = if state.is_committee_validator(&epoch_store) {
751            let (components, _) = futures::join!(
752                Self::construct_validator_components(
753                    config.clone(),
754                    state.clone(),
755                    committee,
756                    epoch_store.clone(),
757                    checkpoint_store.clone(),
758                    state_sync_handle.clone(),
759                    randomness_handle.clone(),
760                    Arc::downgrade(&global_state_hasher),
761                    backpressure_manager.clone(),
762                    connection_monitor_status.clone(),
763                    &registry_service,
764                ),
765                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
766            );
767            let mut components = components?;
768
769            components.consensus_adapter.submit_recovered(&epoch_store);
770
771            // Start the gRPC server
772            components.validator_server_handle = components.validator_server_handle.start().await;
773
774            Some(components)
775        } else {
776            None
777        };
778
779        // setup shutdown channel
780        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
781
782        let node = Self {
783            config,
784            validator_components: Mutex::new(validator_components),
785            _http_server: http_server,
786            state,
787            transaction_orchestrator,
788            registry_service,
789            metrics: iota_node_metrics,
790
791            _discovery: discovery_handle,
792            state_sync_handle,
793            randomness_handle,
794            checkpoint_store,
795            global_state_hasher: Mutex::new(Some(global_state_hasher)),
796            end_of_epoch_channel,
797            connection_monitor_status,
798            trusted_peer_change_tx,
799            backpressure_manager,
800            checkpoint_progress_tracker: checkpoint_progress_tracker.clone(),
801
802            _db_checkpoint_handle: db_checkpoint_handle,
803
804            #[cfg(msim)]
805            sim_state: Default::default(),
806
807            _state_archive_handle: state_archive_handle,
808            _state_snapshot_uploader_handle: state_snapshot_handle,
809            shutdown_channel_tx: shutdown_channel,
810
811            grpc_server_handle: Mutex::new(grpc_server_handle),
812
813            auth_agg,
814        };
815
816        info!("IotaNode started!");
817        let node = Arc::new(node);
818        let node_copy = node.clone();
819        spawn_monitored_task!(async move {
820            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
821            if let Err(error) = result {
822                warn!("Reconfiguration finished with error {:?}", error);
823            }
824        });
825
826        node.checkpoint_progress_tracker
827            .spawn_logging_task(node.checkpoint_store.clone(), perpetual_tables_for_progress);
828
829        Ok(node)
830    }
831
832    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
833        self.end_of_epoch_channel.subscribe()
834    }
835
836    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
837        self.shutdown_channel_tx.subscribe()
838    }
839
840    pub fn current_epoch_for_testing(&self) -> EpochId {
841        self.state.current_epoch_for_testing()
842    }
843
844    pub fn db_checkpoint_path(&self) -> PathBuf {
845        self.config.db_checkpoint_path()
846    }
847
848    // Init reconfig process by starting to reject user certs
849    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
850        info!("close_epoch (current epoch = {})", epoch_store.epoch());
851        self.validator_components
852            .lock()
853            .await
854            .as_ref()
855            .ok_or_else(|| IotaError::from("Node is not a validator"))?
856            .consensus_adapter
857            .close_epoch(epoch_store);
858        Ok(())
859    }
860
861    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
862        self.state
863            .clear_override_protocol_upgrade_buffer_stake(epoch)
864    }
865
866    pub fn set_override_protocol_upgrade_buffer_stake(
867        &self,
868        epoch: EpochId,
869        buffer_stake_bps: u64,
870    ) -> IotaResult {
871        self.state
872            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
873    }
874
875    // Testing-only API to start epoch close process.
876    // For production code, please use the non-testing version.
877    pub async fn close_epoch_for_testing(&self) -> IotaResult {
878        let epoch_store = self.state.epoch_store_for_testing();
879        self.close_epoch(&epoch_store).await
880    }
881
882    async fn start_state_archival(
883        config: &NodeConfig,
884        prometheus_registry: &Registry,
885        state_sync_store: RocksDbStore,
886    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
887        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
888            let local_store_config = ObjectStoreConfig {
889                object_store: Some(ObjectStoreType::File),
890                directory: Some(config.archive_path()),
891                ..Default::default()
892            };
893            let archive_writer = ArchiveWriter::new(
894                local_store_config,
895                remote_store_config.clone(),
896                FileCompression::Zstd,
897                StorageFormat::Blob,
898                Duration::from_secs(600),
899                256 * 1024 * 1024,
900                prometheus_registry,
901            )
902            .await?;
903            Ok(Some(archive_writer.start(state_sync_store).await?))
904        } else {
905            Ok(None)
906        }
907    }
908
909    /// Creates an StateSnapshotUploader and start it if the StateSnapshotConfig
910    /// is set.
911    fn start_state_snapshot(
912        config: &NodeConfig,
913        prometheus_registry: &Registry,
914        checkpoint_store: Arc<CheckpointStore>,
915    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
916        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
917            let snapshot_uploader = StateSnapshotUploader::new(
918                &config.db_checkpoint_path(),
919                &config.snapshot_path(),
920                remote_store_config.clone(),
921                60,
922                prometheus_registry,
923                checkpoint_store,
924            )?;
925            Ok(Some(snapshot_uploader.start()))
926        } else {
927            Ok(None)
928        }
929    }
930
931    fn start_db_checkpoint(
932        config: &NodeConfig,
933        prometheus_registry: &Registry,
934        state_snapshot_enabled: bool,
935        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
936    ) -> Result<(
937        DBCheckpointConfig,
938        Option<tokio::sync::broadcast::Sender<()>>,
939    )> {
940        let checkpoint_path = Some(
941            config
942                .db_checkpoint_config
943                .checkpoint_path
944                .clone()
945                .unwrap_or_else(|| config.db_checkpoint_path()),
946        );
947        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
948            DBCheckpointConfig {
949                checkpoint_path,
950                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
951                    true
952                } else {
953                    config
954                        .db_checkpoint_config
955                        .perform_db_checkpoints_at_epoch_end
956                },
957                ..config.db_checkpoint_config.clone()
958            }
959        } else {
960            config.db_checkpoint_config.clone()
961        };
962
963        match (
964            db_checkpoint_config.object_store_config.as_ref(),
965            state_snapshot_enabled,
966        ) {
967            // If db checkpoint config object store not specified but
968            // state snapshot object store is specified, create handler
969            // anyway for marking db checkpoints as completed so that they
970            // can be uploaded as state snapshots.
971            (None, false) => Ok((db_checkpoint_config, None)),
972            (_, _) => {
973                let handler = DBCheckpointHandler::new(
974                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
975                    db_checkpoint_config.object_store_config.as_ref(),
976                    60,
977                    db_checkpoint_config
978                        .prune_and_compact_before_upload
979                        .unwrap_or(true),
980                    config.authority_store_pruning_config.clone(),
981                    prometheus_registry,
982                    state_snapshot_enabled,
983                    checkpoint_progress_tracker,
984                )?;
985                Ok((
986                    db_checkpoint_config,
987                    Some(DBCheckpointHandler::start(handler)),
988                ))
989            }
990        }
991    }
992
993    fn create_p2p_network(
994        config: &NodeConfig,
995        state_sync_store: RocksDbStore,
996        chain_identifier: ChainIdentifier,
997        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
998        archive_readers: ArchiveReaderBalancer,
999        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1000        prometheus_registry: &Registry,
1001    ) -> Result<(
1002        Network,
1003        discovery::Handle,
1004        state_sync::Handle,
1005        randomness::Handle,
1006    )> {
1007        let (state_sync, state_sync_server) = state_sync::Builder::new()
1008            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1009            .store(state_sync_store)
1010            .archive_readers(archive_readers)
1011            .with_metrics(prometheus_registry)
1012            .build();
1013
1014        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1015            .config(config.p2p_config.clone())
1016            .build();
1017
1018        let (randomness, randomness_router) =
1019            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1020                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1021                .with_metrics(prometheus_registry)
1022                .build();
1023
1024        let p2p_network = {
1025            let routes = anemo::Router::new()
1026                .add_rpc_service(discovery_server)
1027                .add_rpc_service(state_sync_server);
1028            let routes = routes.merge(randomness_router);
1029
1030            let inbound_network_metrics =
1031                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1032            let outbound_network_metrics =
1033                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1034
1035            let service = ServiceBuilder::new()
1036                .layer(
1037                    TraceLayer::new_for_server_errors()
1038                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1039                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1040                )
1041                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1042                    Arc::new(inbound_network_metrics),
1043                    config.p2p_config.excessive_message_size(),
1044                )))
1045                .service(routes);
1046
1047            let outbound_layer = ServiceBuilder::new()
1048                .layer(
1049                    TraceLayer::new_for_client_and_server_errors()
1050                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1051                        .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
1052                )
1053                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1054                    Arc::new(outbound_network_metrics),
1055                    config.p2p_config.excessive_message_size(),
1056                )))
1057                .into_inner();
1058
1059            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1060            // Set the max_frame_size to be 1 GB to work around the issue of there being too
1061            // many staking events in the epoch change txn.
1062            anemo_config.max_frame_size = Some(1 << 30);
1063
1064            // Set a higher default value for socket send/receive buffers if not already
1065            // configured.
1066            let mut quic_config = anemo_config.quic.unwrap_or_default();
1067            if quic_config.socket_send_buffer_size.is_none() {
1068                quic_config.socket_send_buffer_size = Some(20 << 20);
1069            }
1070            if quic_config.socket_receive_buffer_size.is_none() {
1071                quic_config.socket_receive_buffer_size = Some(20 << 20);
1072            }
1073            quic_config.allow_failed_socket_buffer_size_setting = true;
1074
1075            // Set high-performance defaults for quinn transport.
1076            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1077            if quic_config.max_concurrent_bidi_streams.is_none() {
1078                quic_config.max_concurrent_bidi_streams = Some(500);
1079            }
1080            if quic_config.max_concurrent_uni_streams.is_none() {
1081                quic_config.max_concurrent_uni_streams = Some(500);
1082            }
1083            if quic_config.stream_receive_window.is_none() {
1084                quic_config.stream_receive_window = Some(100 << 20);
1085            }
1086            if quic_config.receive_window.is_none() {
1087                quic_config.receive_window = Some(200 << 20);
1088            }
1089            if quic_config.send_window.is_none() {
1090                quic_config.send_window = Some(200 << 20);
1091            }
1092            if quic_config.crypto_buffer_size.is_none() {
1093                quic_config.crypto_buffer_size = Some(1 << 20);
1094            }
1095            if quic_config.max_idle_timeout_ms.is_none() {
1096                quic_config.max_idle_timeout_ms = Some(30_000);
1097            }
1098            if quic_config.keep_alive_interval_ms.is_none() {
1099                quic_config.keep_alive_interval_ms = Some(5_000);
1100            }
1101            anemo_config.quic = Some(quic_config);
1102
1103            let server_name = format!("iota-{chain_identifier}");
1104            let network = Network::bind(config.p2p_config.listen_address)
1105                .server_name(&server_name)
1106                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1107                .config(anemo_config)
1108                .outbound_request_layer(outbound_layer)
1109                .start(service)?;
1110            info!(
1111                server_name = server_name,
1112                "P2p network started on {}",
1113                network.local_addr()
1114            );
1115
1116            network
1117        };
1118
1119        let discovery_handle =
1120            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1121        let state_sync_handle = state_sync.start(p2p_network.clone());
1122        let randomness_handle = randomness.start(p2p_network.clone());
1123
1124        Ok((
1125            p2p_network,
1126            discovery_handle,
1127            state_sync_handle,
1128            randomness_handle,
1129        ))
1130    }
1131
    /// Asynchronously constructs and initializes the components necessary for
    /// the validator node.
    ///
    /// In order, this:
    /// 1. Creates a dedicated validator metrics `Registry` and registers it
    ///    with `registry_service`. The returned id is carried into the
    ///    resulting `ValidatorComponents`.
    /// 2. Builds the consensus adapter and the consensus manager around one
    ///    shared `UpdatableConsensusClient`.
    /// 3. Creates the consensus store pruner and the checkpoint / tx-validator
    ///    metrics.
    /// 4. Prepares (but does not start) the gRPC validator service. The
    ///    returned `SpawnOnce` is stored in the components and started by the
    ///    caller.
    /// 5. Spawns the overload monitor when load shedding is enabled.
    /// 6. Delegates to `start_epoch_specific_validator_components` for the
    ///    per-epoch parts.
    ///
    /// # Errors
    /// - Fails if `config.consensus_config` is `None`.
    /// - Fails if preparing the gRPC validator service fails.
    /// - Fails if starting the epoch-specific components fails.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
    ) -> Result<ValidatorComponents> {
        // NOTE(review): the consensus config is borrowed (mutably) from a
        // private clone of `config`; only shared uses are visible here — confirm
        // whether any callee actually needs `&mut ConsensusConfig`.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Validator-only metrics live in their own registry; its id is kept in
        // the returned components (presumably so it can be removed later).
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // One client instance is shared by the adapter and the manager.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // This only gets started up once, not on every epoch. (Make call to remove
        // every epoch.)
        // NOTE(review): the parenthetical presumably means old consensus DBs
        // are to be removed via a per-epoch call on the pruner — confirm.
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        // Returns a not-yet-started `SpawnOnce`; binding happens when it is started.
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // The monitor holds only a weak reference to the authority state.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1231
    /// Initializes and starts components specific to the current
    /// epoch for the validator node.
    ///
    /// The steps run in this order:
    /// 1. Build the checkpoint service.
    /// 2. Install a fresh low-scoring-authorities map on the consensus adapter.
    /// 3. Install a randomness manager on the epoch store, if
    ///    `RandomnessManager::try_new` yields one.
    /// 4. Start the consensus manager with a consensus handler initializer and
    ///    an `IotaTxValidator`.
    /// 5. Spawn the checkpoint service tasks.
    ///
    /// All inputs that are not consumed along the way are moved into the
    /// returned `ValidatorComponents`.
    ///
    /// # Errors
    /// Propagates an error from `set_randomness_manager`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        // Built here, spawned only after consensus has been started (below).
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            global_state_hasher,
            checkpoint_metrics.clone(),
        );

        // Create a new map that gets injected into both the consensus handler and the
        // consensus adapter. The consensus handler writes values forwarded from
        // consensus, and the consensus adapter reads them to decide which
        // validator submits a transaction to consensus.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // The manager holds only a weak reference to the epoch store.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        // `try_new` may yield `None`; in that case no manager is installed.
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1323
1324    /// Starts the checkpoint service for the validator node, initializing
1325    /// necessary components and settings.
1326    /// The function ensures proper initialization of the checkpoint service,
1327    /// preparing it to handle checkpoint creation and submission to consensus,
1328    /// while also setting up the necessary monitoring and synchronization
1329    /// mechanisms.
1330    fn build_checkpoint_service(
1331        config: &NodeConfig,
1332        consensus_adapter: Arc<ConsensusAdapter>,
1333        checkpoint_store: Arc<CheckpointStore>,
1334        epoch_store: Arc<AuthorityPerEpochStore>,
1335        state: Arc<AuthorityState>,
1336        state_sync_handle: state_sync::Handle,
1337        global_state_hasher: Weak<GlobalStateHasher>,
1338        checkpoint_metrics: Arc<CheckpointMetrics>,
1339    ) -> Arc<CheckpointService> {
1340        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1341        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1342
1343        debug!(
1344            "Starting checkpoint service with epoch start timestamp {}
1345            and epoch duration {}",
1346            epoch_start_timestamp_ms, epoch_duration_ms
1347        );
1348
1349        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1350            sender: consensus_adapter,
1351            signer: state.secret.clone(),
1352            authority: config.authority_public_key(),
1353            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1354                .checked_add(epoch_duration_ms)
1355                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1356            metrics: checkpoint_metrics.clone(),
1357        });
1358
1359        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1360        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1361        let max_checkpoint_size_bytes =
1362            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1363
1364        CheckpointService::build(
1365            state.clone(),
1366            checkpoint_store,
1367            epoch_store,
1368            state.get_transaction_cache_reader().clone(),
1369            global_state_hasher,
1370            checkpoint_output,
1371            Box::new(certified_checkpoint_output),
1372            checkpoint_metrics,
1373            max_tx_per_checkpoint,
1374            max_checkpoint_size_bytes,
1375        )
1376    }
1377
1378    fn construct_consensus_adapter(
1379        committee: &Committee,
1380        consensus_config: &ConsensusConfig,
1381        authority: AuthorityName,
1382        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1383        prometheus_registry: &Registry,
1384        consensus_client: Arc<dyn ConsensusClient>,
1385        checkpoint_store: Arc<CheckpointStore>,
1386    ) -> ConsensusAdapter {
1387        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1388        // The consensus adapter allows the authority to send user certificates through
1389        // consensus.
1390
1391        ConsensusAdapter::new(
1392            consensus_client,
1393            checkpoint_store,
1394            authority,
1395            connection_monitor_status,
1396            consensus_config.max_pending_transactions(),
1397            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1398            consensus_config.max_submit_position,
1399            consensus_config.submit_delay_step_override(),
1400            ca_metrics,
1401        )
1402    }
1403
1404    async fn start_grpc_validator_service(
1405        config: &NodeConfig,
1406        state: Arc<AuthorityState>,
1407        consensus_adapter: Arc<ConsensusAdapter>,
1408        prometheus_registry: &Registry,
1409    ) -> Result<SpawnOnce> {
1410        let validator_service = ValidatorService::new(
1411            state,
1412            consensus_adapter,
1413            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1414            config.policy_config.clone().map(|p| p.client_id_source),
1415        );
1416
1417        let mut server_conf = iota_network_stack::config::Config::new();
1418        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1419        server_conf.load_shed = config.grpc_load_shed;
1420        let server_builder =
1421            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1422                .add_service(ValidatorServer::new(validator_service));
1423
1424        let tls_config = iota_tls::create_rustls_server_config(
1425            config.network_key_pair().copy().private(),
1426            IOTA_TLS_SERVER_NAME.to_string(),
1427        );
1428
1429        let network_address = config.network_address().clone();
1430
1431        let bind_future = async move {
1432            let server = server_builder
1433                .bind(&network_address, Some(tls_config))
1434                .await
1435                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1436
1437            let local_addr = server.local_addr();
1438            info!("Listening to traffic on {local_addr}");
1439
1440            Ok(server)
1441        };
1442
1443        Ok(SpawnOnce::new(bind_future))
1444    }
1445
1446    /// Re-executes pending consensus certificates, which may not have been
1447    /// committed to disk before the node restarted. This is necessary for
1448    /// the following reasons:
1449    ///
1450    /// 1. For any transaction for which we returned signed effects to a client,
1451    ///    we must ensure that we have re-executed the transaction before we
1452    ///    begin accepting grpc requests. Otherwise we would appear to have
1453    ///    forgotten about the transaction.
1454    /// 2. While this is running, we are concurrently waiting for all previously
1455    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1456    ///    either direction (from checkpointed consensus transactions to pending
1457    ///    consensus transactions, or vice versa), we must re-execute pending
1458    ///    consensus transactions to ensure that both processes can complete.
1459    /// 3. Also note that for any pending consensus transactions for which we
1460    ///    wrote a signed effects digest to disk, we must re-execute using that
1461    ///    digest as the expected effects digest, to ensure that we cannot
1462    ///    arrive at different effects than what we previously signed.
1463    async fn reexecute_pending_consensus_certs(
1464        epoch_store: &Arc<AuthorityPerEpochStore>,
1465        state: &Arc<AuthorityState>,
1466    ) {
1467        let mut pending_consensus_certificates = Vec::new();
1468        let mut additional_certs = Vec::new();
1469
1470        for tx in epoch_store.get_all_pending_consensus_transactions() {
1471            match tx.kind {
1472                // Shared object txns cannot be re-executed at this point, because we must wait for
1473                // consensus replay to assign shared object versions.
1474                ConsensusTransactionKind::CertifiedTransaction(tx)
1475                    if !tx.contains_shared_object() =>
1476                {
1477                    let tx = *tx;
1478                    // new_unchecked is safe because we never submit a transaction to consensus
1479                    // without verifying it
1480                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1481                        VerifiedCertificate::new_unchecked(tx),
1482                    );
1483                    // we only need to re-execute if we previously signed the effects (which
1484                    // indicates we returned the effects to a client).
1485                    if let Some(fx_digest) = epoch_store
1486                        .get_signed_effects_digest(tx.digest())
1487                        .expect("db error")
1488                    {
1489                        pending_consensus_certificates.push((tx, fx_digest));
1490                    } else {
1491                        additional_certs.push(tx);
1492                    }
1493                }
1494                _ => (),
1495            }
1496        }
1497
1498        let digests = pending_consensus_certificates
1499            .iter()
1500            .map(|(tx, _)| *tx.digest())
1501            .collect::<Vec<_>>();
1502
1503        info!(
1504            "reexecuting {} pending consensus certificates: {:?}",
1505            digests.len(),
1506            digests
1507        );
1508
1509        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1510        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1511
1512        // If this times out, the validator will still almost certainly start up fine.
1513        // But, it is possible that it may temporarily "forget" about
1514        // transactions that it had previously executed. This could confuse
1515        // clients in some circumstances. However, the transactions are still in
1516        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1517        let timeout = if cfg!(msim) { 120 } else { 60 };
1518        if tokio::time::timeout(
1519            std::time::Duration::from_secs(timeout),
1520            state
1521                .get_transaction_cache_reader()
1522                .try_notify_read_executed_effects_digests(&digests),
1523        )
1524        .await
1525        .is_err()
1526        {
1527            // Log all the digests that were not executed to help debugging.
1528            if let Ok(executed_effects_digests) = state
1529                .get_transaction_cache_reader()
1530                .try_multi_get_executed_effects_digests(&digests)
1531            {
1532                let pending_digests = digests
1533                    .iter()
1534                    .zip(executed_effects_digests.iter())
1535                    .filter_map(|(digest, executed_effects_digest)| {
1536                        if executed_effects_digest.is_none() {
1537                            Some(digest)
1538                        } else {
1539                            None
1540                        }
1541                    })
1542                    .collect::<Vec<_>>();
1543                debug_fatal!(
1544                    "Timed out waiting for effects digests to be executed: {:?}",
1545                    pending_digests
1546                );
1547            } else {
1548                debug_fatal!(
1549                    "Timed out waiting for effects digests to be executed, digests not found"
1550                );
1551            }
1552        }
1553    }
1554
1555    pub fn state(&self) -> Arc<AuthorityState> {
1556        self.state.clone()
1557    }
1558
1559    // Only used for testing because of how epoch store is loaded.
1560    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1561        self.state.reference_gas_price_for_testing()
1562    }
1563
1564    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1565        self.state.committee_store().clone()
1566    }
1567
1568    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1569    // self.state.db()
1570    // }
1571
1572    /// Clone an AuthorityAggregator currently used in this node's
1573    /// QuorumDriver, if the node is a fullnode. After reconfig,
1574    /// QuorumDriver builds a new AuthorityAggregator. The caller
1575    /// of this function will mostly likely want to call this again
1576    /// to get a fresh one.
1577    pub fn clone_authority_aggregator(
1578        &self,
1579    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1580        self.transaction_orchestrator
1581            .as_ref()
1582            .map(|to| to.clone_authority_aggregator())
1583    }
1584
1585    pub fn transaction_orchestrator(
1586        &self,
1587    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1588        self.transaction_orchestrator.clone()
1589    }
1590
1591    pub fn subscribe_to_transaction_orchestrator_effects(
1592        &self,
1593    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1594        self.transaction_orchestrator
1595            .as_ref()
1596            .map(|to| to.subscribe_to_effects_queue())
1597            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1598    }
1599
    /// Drives the node through successive epochs. For each epoch it runs
    /// checkpoint execution to completion, then reconfigures the node for the
    /// next epoch.
    ///
    /// Each loop iteration does the following:
    /// - Builds a [`CheckpointExecutor`] for `epoch_store`. It is wired to the
    ///   gRPC checkpoint-data broadcaster only when a gRPC server handle is
    ///   present and its lock can be taken without waiting.
    /// - Advertises this node's capabilities:
    ///   - through consensus, when validator components are running;
    ///   - otherwise, if the node is an active validator and the protocol
    ///     config enables `track_non_committee_eligible_validators`, by sending
    ///     a signed notification to the committee from a background task.
    /// - Runs the executor for the epoch. If it stopped because the
    ///   `run_with_range` condition was met, shuts the node down (consensus and
    ///   gRPC server), sends `run_with_range` on `shutdown_channel_tx`, and
    ///   returns `Ok(())`.
    /// - Otherwise, reads the end-of-epoch IOTA system state and notifies
    ///   end-of-epoch subscribers.
    /// - Refreshes the authority aggregator, the connection-monitor peer
    ///   mapping and the trusted peers.
    /// - Reconfigures the authority state.
    /// - Restarts, tears down, or creates validator components depending on
    ///   committee membership before and after the epoch change.
    /// - Releases the old epoch store and prunes old epoch databases.
    ///
    /// # Errors
    /// Returns an error if submitting the capability notification to
    /// consensus, reconfiguring the state, (re)starting validator components,
    /// or the msim-only checkpoint pruning fails.
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// - no global state hasher is available at the start of an iteration;
    /// - the shutdown message cannot be sent;
    /// - the IOTA system state object cannot be read;
    /// - the next committee's epoch is not the current epoch + 1;
    /// - the old hasher still has other strong references when it is replaced.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The hasher is taken out of the shared slot and the guard stays
            // alive for the rest of this iteration; a fresh hasher is put back
            // into the slot during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Create closures that handle gRPC type conversion
            // `try_lock` does not wait: if the gRPC handle is currently locked, or
            // no gRPC server is running, this epoch's executor gets no sender.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
                Some(self.checkpoint_progress_tracker.clone()),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            if let Some(components) = &*self.validator_components.lock().await {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            // no need to send digests of versions less than the current version
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Send signed capabilities to committee validators if we are a non-committee
                // validator in a separate task to not block the caller. Sending is done only if
                // the feature flag supporting it is enabled.
                // The task is wrapped in `within_alive_epoch`, which bounds how long
                // its retry loop runs to the lifetime of this epoch.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            // Early exit requested by configuration. The hasher taken above is
            // not put back into its slot on this path.
            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // Under msim, tests can declare safe mode as expected (see
            // `set_safe_mode_expected`); otherwise safe mode is never expected here.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A send failure is only logged for fullnodes; for validators it is
            // ignored silently.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Swap in an aggregator rebuilt for the next epoch's start state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // We save the connection monitor status map regardless of validator / fullnode
            // status so that we don't need to restart the connection monitor
            // every epoch. Update the mappings that will be used by the
            // consensus adapter if it exists or is about to be created.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            // Held until the new validator components (if any) are stored below.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await?;

            // Cases 1 and 2: validator components were running in the previous epoch.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Cancel the old checkpoint service tasks.
                // Waiting for checkpoint builder to finish gracefully is not possible, because
                // it may wait on transactions while consensus on peers have
                // already shut down.
                // Panics in those tasks are re-raised here; other join errors
                // (e.g. cancellation from `abort_all`) are only logged.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                // Components only get a weak reference; the slot keeps the strong one.
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Only restart consensus if this node is still a validator in the new epoch.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Demotion: drop the validator metrics registry and stop the
                    // validator gRPC server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Cases 3 and 4: no validator components were running in the
                // previous epoch.
                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // Drop the old epoch store to free its in-memory structures
            // (ConsensusOutputCache, ConsensusQuarantine, DashMaps, etc.).
            // The DB tables were already released above.
            drop(cur_epoch_store);

            // Prune old epoch databases after each epoch transition to prevent
            // accumulation of RocksDB instances during fast catch-up sync
            // (e.g. syncing from genesis).
            self.state.epoch_db_pruner().prune_old_epoch_dbs().await;

            // Simulation-only: prune checkpoints eagerly when a finite, non-zero
            // checkpoint retention is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
1945
1946    async fn shutdown(&self) {
1947        if let Some(validator_components) = &*self.validator_components.lock().await {
1948            validator_components.consensus_manager.shutdown().await;
1949        }
1950
1951        // Shutdown the gRPC server if it's running
1952        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
1953            info!("Shutting down gRPC server");
1954            if let Err(e) = grpc_handle.shutdown().await {
1955                warn!("Failed to gracefully shutdown gRPC server: {e}");
1956            }
1957        }
1958    }
1959
1960    /// Asynchronously reconfigures the state of the authority node for the next
1961    /// epoch.
1962    async fn reconfigure_state(
1963        &self,
1964        state: &Arc<AuthorityState>,
1965        cur_epoch_store: &AuthorityPerEpochStore,
1966        next_epoch_committee: Committee,
1967        next_epoch_start_system_state: EpochStartSystemState,
1968        global_state_hasher: Arc<GlobalStateHasher>,
1969    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
1970        let next_epoch = next_epoch_committee.epoch();
1971
1972        let last_checkpoint = self
1973            .checkpoint_store
1974            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1975            .expect("Error loading last checkpoint for current epoch")
1976            .expect("Could not load last checkpoint for current epoch");
1977        let epoch_supply_change = last_checkpoint
1978            .end_of_epoch_data
1979            .as_ref()
1980            .ok_or_else(|| {
1981                IotaError::from("last checkpoint in epoch should contain end of epoch data")
1982            })?
1983            .epoch_supply_change;
1984
1985        let last_checkpoint_seq = *last_checkpoint.sequence_number();
1986
1987        assert_eq!(
1988            Some(last_checkpoint_seq),
1989            self.checkpoint_store
1990                .get_highest_executed_checkpoint_seq_number()
1991                .expect("Error loading highest executed checkpoint sequence number")
1992        );
1993
1994        let epoch_start_configuration = EpochStartConfiguration::new(
1995            next_epoch_start_system_state,
1996            *last_checkpoint.digest(),
1997            state.get_object_store().as_ref(),
1998            EpochFlag::default_flags_for_new_epoch(&state.config),
1999        )
2000        .expect("EpochStartConfiguration construction cannot fail");
2001
2002        let new_epoch_store = self
2003            .state
2004            .reconfigure(
2005                cur_epoch_store,
2006                self.config.supported_protocol_versions.unwrap(),
2007                next_epoch_committee,
2008                epoch_start_configuration,
2009                global_state_hasher,
2010                &self.config.expensive_safety_check_config,
2011                epoch_supply_change,
2012                last_checkpoint_seq,
2013            )
2014            .await
2015            .expect("Reconfigure authority state cannot fail");
2016        info!(next_epoch, "Node State has been reconfigured");
2017        assert_eq!(next_epoch, new_epoch_store.epoch());
2018        self.state.get_reconfig_api().update_epoch_flags_metrics(
2019            cur_epoch_store.epoch_start_config().flags(),
2020            new_epoch_store.epoch_start_config().flags(),
2021        );
2022
2023        Ok(new_epoch_store)
2024    }
2025
2026    pub fn get_config(&self) -> &NodeConfig {
2027        &self.config
2028    }
2029
2030    async fn execute_transaction_immediately_at_zero_epoch(
2031        state: &Arc<AuthorityState>,
2032        epoch_store: &Arc<AuthorityPerEpochStore>,
2033        tx: &Transaction,
2034        span: tracing::Span,
2035    ) {
2036        let _guard = span.enter();
2037        let transaction =
2038            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2039                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2040                    tx.data().clone(),
2041                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2042                ),
2043            );
2044        state
2045            .try_execute_immediately(&transaction, None, epoch_store)
2046            .unwrap();
2047    }
2048
2049    pub fn randomness_handle(&self) -> randomness::Handle {
2050        self.randomness_handle.clone()
2051    }
2052
2053    /// Sends signed capability notification to committee validators for
2054    /// non-committee validators. This method implements retry logic to handle
2055    /// failed attempts to send the notification. It will retry sending the
2056    /// notification with an increasing interval until it receives a successful
2057    /// response from a f+1 committee members or 2f+1 non-retryable errors.
2058    async fn send_signed_capability_notification_to_committee_with_retry(
2059        &self,
2060        epoch_store: &Arc<AuthorityPerEpochStore>,
2061    ) {
2062        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2063        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2064        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
2065
2066        // Create the capability notification once
2067        let config = epoch_store.protocol_config();
2068        let binary_config = to_binary_config(config);
2069
2070        // Create the capability notification
2071        let capabilities = AuthorityCapabilitiesV1::new(
2072            self.state.name,
2073            epoch_store.get_chain(),
2074            self.config
2075                .supported_protocol_versions
2076                .expect("Supported versions should be populated")
2077                .truncate_below(config.version),
2078            self.state
2079                .get_available_system_packages(&binary_config)
2080                .await,
2081        );
2082
2083        // Sign the capabilities using the authority key pair from config
2084        let signature = AuthoritySignature::new_secure(
2085            &IntentMessage::new(
2086                Intent::iota_app(IntentScope::AuthorityCapabilities),
2087                &capabilities,
2088            ),
2089            &epoch_store.epoch(),
2090            self.config.authority_key_pair(),
2091        );
2092
2093        let request = HandleCapabilityNotificationRequestV1 {
2094            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2095        };
2096
2097        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2098
2099        loop {
2100            let auth_agg = self.auth_agg.load();
2101            match auth_agg
2102                .send_capability_notification_to_quorum(request.clone())
2103                .await
2104            {
2105                Ok(_) => {
2106                    info!("Successfully sent capability notification to committee");
2107                    break;
2108                }
2109                Err(err) => {
2110                    match &err {
2111                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2112                            errors,
2113                        } => {
2114                            warn!(
2115                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2116                                retry_interval, errors
2117                            );
2118                        }
2119                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2120                            errors,
2121                        } => {
2122                            error!(
2123                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2124                                errors
2125                            );
2126                            break;
2127                        }
2128                    };
2129
2130                    // Wait before retrying
2131                    tokio::time::sleep(retry_interval).await;
2132
2133                    // Increase retry interval for the next attempt, capped at max
2134                    retry_interval = std::cmp::min(
2135                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2136                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2137                    );
2138                }
2139            }
2140        }
2141    }
2142}
2143
#[cfg(msim)]
impl IotaNode {
    /// Returns the id of the simulator node this `IotaNode` runs on.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Sets whether the simulation expects the system to enter safe mode.
    /// When set, the end-of-epoch safe-mode debug assertion is skipped.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }
}
2157
/// A server that is spawned at most once.
///
/// It starts as `Unstarted`, holding the future that builds the server.
/// `SpawnOnce::start` awaits that future, spawns the server with
/// `tokio::spawn`, and moves to `Started`. `Started` keeps the handle that is
/// later used to trigger shutdown.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Sync
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    // NOTE(review): the handle is read by `SpawnOnce::shutdown`, so this
    // `allow` looks unnecessary — confirm before removing it.
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2164
2165impl SpawnOnce {
2166    pub fn new(
2167        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2168    ) -> Self {
2169        Self::Unstarted(Mutex::new(Box::pin(future)))
2170    }
2171
2172    pub async fn start(self) -> Self {
2173        match self {
2174            Self::Unstarted(future) => {
2175                let server = future
2176                    .into_inner()
2177                    .await
2178                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2179                let handle = server.handle().clone();
2180                tokio::spawn(async move {
2181                    if let Err(err) = server.serve().await {
2182                        info!("Server stopped: {err}");
2183                    }
2184                    info!("Server stopped");
2185                });
2186                Self::Started(handle)
2187            }
2188            Self::Started(_) => self,
2189        }
2190    }
2191
2192    pub fn shutdown(self) {
2193        if let SpawnOnce::Started(handle) = self {
2194            handle.trigger_shutdown();
2195        }
2196    }
2197}
2198
2199/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers are now
2200/// available.
2201fn send_trusted_peer_change(
2202    config: &NodeConfig,
2203    sender: &watch::Sender<TrustedPeerChangeEvent>,
2204    new_epoch_start_state: &EpochStartSystemState,
2205) {
2206    let new_committee =
2207        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2208
2209    sender.send_modify(|event| {
2210        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2211        event.new_committee = new_committee;
2212    })
2213}
2214
2215fn build_kv_store(
2216    state: &Arc<AuthorityState>,
2217    config: &NodeConfig,
2218    registry: &Registry,
2219) -> Result<Arc<TransactionKeyValueStore>> {
2220    let metrics = KeyValueStoreMetrics::new(registry);
2221    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2222
2223    let base_url = &config.transaction_kv_store_read_config.base_url;
2224
2225    if base_url.is_empty() {
2226        info!("no http kv store url provided, using local db only");
2227        return Ok(Arc::new(db_store));
2228    }
2229
2230    base_url.parse::<url::Url>().tap_err(|e| {
2231        error!(
2232            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2233            base_url, e
2234        )
2235    })?;
2236
2237    let http_store = HttpKVStore::new_kv(
2238        base_url,
2239        config.transaction_kv_store_read_config.cache_size,
2240        metrics.clone(),
2241    )?;
2242    info!("using local key-value store with fallback to http key-value store");
2243    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2244        db_store,
2245        http_store,
2246        metrics,
2247        "json_rpc_fallback",
2248    )))
2249}
2250
2251/// Builds and starts the gRPC server for the IOTA node based on the node's
2252/// configuration.
2253///
2254/// This function performs the following tasks:
2255/// 1. Checks if the node is a validator by inspecting the consensus
2256///    configuration; if so, it returns early as validators do not expose gRPC
2257///    APIs.
2258/// 2. Checks if gRPC is enabled in the configuration.
2259/// 3. Creates broadcast channels for checkpoint streaming.
2260/// 4. Initializes the gRPC checkpoint service.
2261/// 5. Spawns the gRPC server to listen for incoming connections.
2262///
2263/// Returns a tuple of optional broadcast channels for checkpoint summary and
2264/// data.
2265async fn build_grpc_server(
2266    config: &NodeConfig,
2267    state: Arc<AuthorityState>,
2268    state_sync_store: RocksDbStore,
2269    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2270    prometheus_registry: &Registry,
2271    server_version: ServerVersion,
2272) -> Result<Option<GrpcServerHandle>> {
2273    // Validators do not expose gRPC APIs
2274    if config.consensus_config().is_some() || !config.enable_grpc_api {
2275        return Ok(None);
2276    }
2277
2278    let Some(grpc_config) = &config.grpc_api_config else {
2279        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2280    };
2281
2282    // Get chain identifier from state directly
2283    let chain_id = state.get_chain_identifier();
2284
2285    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2286
2287    // Create cancellation token for proper shutdown hierarchy
2288    let shutdown_token = CancellationToken::new();
2289
2290    // Create GrpcReader
2291    let grpc_reader = Arc::new(GrpcReader::new(
2292        grpc_read_store,
2293        Some(server_version.to_string()),
2294    ));
2295
2296    // Create gRPC server metrics
2297    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2298
2299    let handle = start_grpc_server(
2300        grpc_reader,
2301        executor,
2302        grpc_config.clone(),
2303        shutdown_token,
2304        chain_id,
2305        Some(grpc_server_metrics),
2306    )
2307    .await?;
2308
2309    Ok(Some(handle))
2310}
2311
/// Builds and starts the HTTP server for the IOTA node, exposing the JSON-RPC
/// API and a `/health` endpoint.
///
/// This function performs the following tasks:
/// 1. Returns `Ok(None)` if the node is a validator (it has a consensus
///    configuration), since validators do not expose these APIs.
/// 2. Builds a JSON-RPC server, configured with the state's traffic
///    controller and `config.policy_config`. The following modules are
///    registered:
///    - `ReadApi`, `CoinReadApi`, `GovernanceReadApi`, `IndexerApi` and
///      `MoveUtils`, always;
///    - `TransactionBuilderApi`, only when `config.run_with_range` is `None`;
///    - `TransactionExecutionApi`, only when a `transaction_orchestrator` is
///      provided.
///
///    The read-side modules share one key-value store from [`build_kv_store`].
/// 3. Adds a `/health` route served by `health_check_handler` and attaches
///    the `AuthorityState` as an axum extension.
/// 4. Wraps the router in two layers:
///    - a layer that copies the `iota_http::ConnectInfo` remote address into
///      axum's `ConnectInfo` extension;
///    - the server-timing middleware.
/// 5. Binds to `config.json_rpc_address` and starts serving.
///
/// Returns the handle of the running server, or `None` for validators.
///
/// # Errors
///
/// Returns an error if any of the following fails:
/// - building the key-value store;
/// - registering a JSON-RPC module or building the router;
/// - binding or serving on the configured address.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not expose these APIs
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // When run_with_range is set, the transaction-builder API is not
        // exposed. run_with_range = None is the normal operating condition.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only available when an orchestrator was
        // supplied by the caller.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Use the configured IOTA-names settings, or derive defaults from the
        // chain this node is running on.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        // IndexerApi gets its own ReadApi instance (sharing the same kv store
        // and metrics) in addition to the one registered above.
        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // `route_layer` only wraps routes already present on the router, so the
    // state extension is attached after `/health` (and the merged JSON-RPC
    // routes) have been added.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    // Expose the peer address recorded by iota_http as axum's `ConnectInfo`,
    // so axum extractors and middleware can read it; then add server timing.
    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    // NOTE(review): formatting the error with `{e}` keeps only its Display
    // text and drops any source chain.
    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2422
2423#[derive(Debug, serde::Serialize, serde::Deserialize)]
2424pub struct Threshold {
2425    pub threshold_seconds: Option<u32>,
2426}
2427
2428async fn health_check_handler(
2429    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2430    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2431) -> impl axum::response::IntoResponse {
2432    if let Some(threshold_seconds) = threshold_seconds {
2433        // Attempt to get the latest checkpoint
2434        let summary = match state
2435            .get_checkpoint_store()
2436            .get_highest_executed_checkpoint()
2437        {
2438            Ok(Some(summary)) => summary,
2439            Ok(None) => {
2440                warn!("Highest executed checkpoint not found");
2441                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2442            }
2443            Err(err) => {
2444                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2445                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2446            }
2447        };
2448
2449        // Calculate the threshold time based on the provided threshold_seconds
2450        let latest_chain_time = summary.timestamp();
2451        let threshold =
2452            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2453
2454        // Check if the latest checkpoint is within the threshold
2455        if latest_chain_time < threshold {
2456            warn!(
2457                ?latest_chain_time,
2458                ?threshold,
2459                "failing health check due to checkpoint lag"
2460            );
2461            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2462        }
2463    }
2464    // if health endpoint is responding and no threshold is given, respond success
2465    (axum::http::StatusCode::OK, "up")
2466}
2467
2468#[cfg(not(test))]
2469fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2470    protocol_config.max_transactions_per_checkpoint() as usize
2471}
2472
2473#[cfg(test)]
2474fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2475    2
2476}