Skip to main content

iota_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8    collections::HashMap,
9    fmt,
10    future::Future,
11    path::PathBuf,
12    sync::{Arc, Weak},
13    time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18    callback::CallbackLayer,
19    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28    ConsensusConfig, NodeConfig,
29    node::{DBCheckpointConfig, RunWithRange},
30    node_config_metrics::NodeConfigMetrics,
31    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34    authority::{
35        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36        authority_per_epoch_store::AuthorityPerEpochStore,
37        authority_store_pruner::ObjectsCompactionFilter,
38        authority_store_tables::{
39            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40        },
41        backpressure::BackpressureManager,
42        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43    },
44    authority_aggregator::{
45        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46    },
47    authority_client::NetworkAuthorityClient,
48    authority_server::{ValidatorService, ValidatorServiceMetrics},
49    checkpoint_progress_tracker::CheckpointProgressTracker,
50    checkpoints::{
51        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
52        SubmitCheckpointToConsensus,
53        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
54    },
55    connection_monitor::ConnectionMonitor,
56    consensus_adapter::{
57        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
58        ConsensusClient,
59    },
60    consensus_handler::ConsensusHandlerInitializer,
61    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
62    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
63    db_checkpoint_handler::DBCheckpointHandler,
64    epoch::{
65        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
66        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
67        reconfiguration::ReconfigurationInitiator,
68    },
69    execution_cache::build_execution_cache,
70    global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
71    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
72    jsonrpc_index::IndexStore,
73    module_cache_metrics::ResolverMetrics,
74    overload_monitor::overload_monitor,
75    safe_client::SafeClientMetricsBase,
76    signature_verifier::SignatureVerifierMetrics,
77    storage::{GrpcReadStore, RocksDbStore},
78    transaction_orchestrator::TransactionOrchestrator,
79    validator_tx_finalizer::ValidatorTxFinalizer,
80};
81use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
82use iota_json_rpc::{
83    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
84    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
85    transaction_builder_api::TransactionBuilderApi,
86    transaction_execution_api::TransactionExecutionApi,
87};
88use iota_json_rpc_api::JsonRpcMetrics;
89use iota_macros::{fail_point, fail_point_async, replay_log};
90use iota_metrics::{
91    RegistryID, RegistryService,
92    hardware_metrics::register_hardware_metrics,
93    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
94    server_timing_middleware, spawn_monitored_task,
95};
96use iota_names::config::IotaNamesConfig;
97use iota_network::{
98    api::{ValidatorPeerServer, ValidatorServer, ValidatorV2Server},
99    discovery,
100    discovery::TrustedPeerChangeEvent,
101    randomness, state_sync,
102};
103use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
104use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
105use iota_sdk_types::{
106    RandomnessRound,
107    crypto::{Intent, IntentMessage, IntentScope},
108};
109use iota_snapshot::uploader::StateSnapshotUploader;
110use iota_storage::{
111    FileCompression, StorageFormat,
112    http_key_value_store::HttpKVStore,
113    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
114    key_value_store_metrics::KeyValueStoreMetrics,
115};
116use iota_types::{
117    base_types::{AuthorityName, ConciseableName, EpochId},
118    committee::Committee,
119    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits},
120    digests::ChainIdentifier,
121    error::{IotaError, IotaResult},
122    executable_transaction::VerifiedExecutableTransaction,
123    execution_config_utils::to_binary_config,
124    full_checkpoint_content::CheckpointData,
125    iota_system_state::{
126        IotaSystemState, IotaSystemStateTrait,
127        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
128    },
129    messages_consensus::{
130        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
131        SignedAuthorityCapabilitiesV1,
132    },
133    messages_grpc::HandleCapabilityNotificationRequestV1,
134    quorum_driver_types::QuorumDriverEffectsQueueResult,
135    supported_protocol_versions::SupportedProtocolVersions,
136    transaction::{Transaction, VerifiedCertificate},
137};
138use prometheus::Registry;
139#[cfg(msim)]
140use simulator::*;
141use tap::tap::TapFallible;
142use tokio::{
143    sync::{Mutex, broadcast, mpsc, watch},
144    task::{JoinHandle, JoinSet},
145};
146use tokio_util::sync::CancellationToken;
147use tower::ServiceBuilder;
148use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
149use typed_store::{
150    DBMetrics,
151    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
152};
153
154use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
155
156pub mod admin;
157mod handle;
158pub mod metrics;
159
/// Components that only exist while the node acts as a validator.
///
/// `IotaNode::start_async` obtains a value of this type from
/// `construct_validator_components` when `state.is_committee_validator(..)`
/// holds for the starting epoch, and stores it in
/// `IotaNode::validator_components`.
pub struct ValidatorComponents {
    /// Validator server wrapped in `SpawnOnce`; `start_async` replaces it with
    /// the result of `.start().await` once the components are constructed.
    validator_server_handle: SpawnOnce,
    /// Join handle of the validator overload monitor task, if one is running.
    // NOTE(review): presumably spawned from `overload_monitor` (imported at
    // the top of the file) — confirm in `construct_validator_components`.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    /// Shared consensus adapter; `start_async` calls `submit_recovered` on it
    /// right after construction.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    /// Identifier of the metrics registry associated with these components.
    // NOTE(review): presumably used to remove the registry from
    // `RegistryService` when the components are torn down — confirm.
    validator_registry_id: RegistryID,
}
172
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    /// Per-node bookkeeping that only exists in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulator node this `IotaNode` runs on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Flag recording whether safe mode is expected; starts out `false`.
        pub sim_safe_mode_expected: AtomicBool,
        /// Held only for its lifetime; never read.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node, clears the safe-mode flag and
        /// creates a fresh leak detector, in that order.
        fn default() -> Self {
            let sim_node = iota_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = iota_simulator::NodeLeakDetector::new();

            SimState {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }
}
193
/// Name and version string of the binary that hosts the node.
///
/// Renders as `<bin>/<version>` through its `Display` implementation.
#[derive(Clone)]
pub struct ServerVersion {
    /// Binary name, e.g. `"iota-node"`.
    pub bin: &'static str,
    /// Version string reported for that binary.
    pub version: &'static str,
}

impl ServerVersion {
    /// Pairs a binary name with its version string.
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        ServerVersion { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    /// Writes `bin`, a `/` separator and `version`, ignoring any width or
    /// fill requested by the caller's format spec.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { bin, version } = self;
        write!(f, "{bin}/{version}")
    }
}
213
/// A running IOTA node (validator or full node).
///
/// Instances are created by `IotaNode::start` / `IotaNode::start_async`, which
/// return the node wrapped in an `Arc`.
pub struct IotaNode {
    /// Node configuration. `start_async` fills in
    /// `supported_protocol_versions` with the system default when it is unset
    /// before storing the config here.
    config: NodeConfig,
    /// Validator-only components. `start_async` sets this to `Some` only when
    /// the node is a committee validator for the epoch it starts in.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// The http server responsible for serving JSON-RPC
    _http_server: Option<iota_http::ServerHandle>,
    /// Shared authority state created in `start_async`.
    state: Arc<AuthorityState>,
    /// Transaction orchestrator; `start_async` only creates it on full nodes
    /// when `run_with_range` is not configured.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    /// Registry service passed in by the caller of `start_async`.
    registry_service: RegistryService,
    /// Node-level metrics registered on the default registry.
    metrics: Arc<IotaNodeMetrics>,

    /// Discovery handle returned by `create_p2p_network`.
    _discovery: discovery::Handle,
    /// State-sync handle returned by `create_p2p_network`.
    state_sync_handle: state_sync::Handle,
    /// Randomness handle returned by `create_p2p_network`.
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    /// Global state hasher; `start_async` stores `Some(..)` and hands a `Weak`
    /// reference to the validator components.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    /// Connection statuses from `ConnectionMonitor::spawn` together with the
    /// authority-name-to-peer-id mapping of the starting epoch.
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Broadcast channel to notify [`DiscoveryEventLoop`] for new validator
    /// peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    /// Backpressure manager built from the checkpoint store in `start_async`.
    backpressure_manager: Arc<BackpressureManager>,

    /// Checkpoint progress tracker; `start_async` starts its logging task
    /// right before returning the node.
    checkpoint_progress_tracker: Arc<CheckpointProgressTracker>,

    /// Sender returned by `start_db_checkpoint`, if any.
    // NOTE(review): presumably held only to keep the db checkpoint task alive
    // for the lifetime of the node — confirm in `start_db_checkpoint`.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    /// Simulator-only state (present in `msim` builds).
    #[cfg(msim)]
    sim_state: SimState,

    /// Sender returned by `start_state_archival`, if any.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    /// Sender returned by `start_state_snapshot`, if any.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown iota-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle to the gRPC server for gRPC streaming and graceful shutdown
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// AuthorityAggregator of the network, created at start and beginning of
    /// each epoch. Use ArcSwap so that we could mutate it without taking
    /// mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
263
264impl fmt::Debug for IotaNode {
265    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
266        f.debug_struct("IotaNode")
267            .field("name", &self.state.name.concise())
268            .finish()
269    }
270}
271
272impl IotaNode {
273    pub async fn start(
274        config: NodeConfig,
275        registry_service: RegistryService,
276    ) -> Result<Arc<IotaNode>> {
277        Self::start_async(
278            config,
279            registry_service,
280            ServerVersion::new("iota-node", "unknown"),
281        )
282        .await
283    }
284
285    pub async fn start_async(
286        config: NodeConfig,
287        registry_service: RegistryService,
288        server_version: ServerVersion,
289    ) -> Result<Arc<IotaNode>> {
290        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
291        let mut config = config.clone();
292        if config.supported_protocol_versions.is_none() {
293            info!(
294                "populating config.supported_protocol_versions with default {:?}",
295                SupportedProtocolVersions::SYSTEM_DEFAULT
296            );
297            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
298        }
299
300        let run_with_range = config.run_with_range;
301        let is_validator = config.consensus_config().is_some();
302        let is_full_node = !is_validator;
303        let prometheus_registry = registry_service.default_registry();
304
305        info!(node =? config.authority_public_key(),
306            "Initializing iota-node listening on {}", config.network_address
307        );
308
309        let genesis = config.genesis()?.clone();
310
311        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
312        info!("IOTA chain identifier: {chain_identifier}");
313
314        // Check and set the db_corrupted flag
315        let db_corrupted_path = &config.db_path().join("status");
316        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
317            panic!("Failed to check database corruption: {err}");
318        }
319
320        // Initialize metrics to track db usage before creating any stores
321        DBMetrics::init(&prometheus_registry);
322
323        // Initialize IOTA metrics.
324        iota_metrics::init_metrics(&prometheus_registry);
325        // Unsupported (because of the use of static variable) and unnecessary in
326        // simtests.
327        #[cfg(not(msim))]
328        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
329
330        // Register hardware metrics.
331        register_hardware_metrics(&registry_service, &config.db_path)
332            .expect("Failed registering hardware metrics");
333        // Register uptime metric
334        prometheus_registry
335            .register(iota_metrics::uptime_metric(
336                if is_validator {
337                    "validator"
338                } else {
339                    "fullnode"
340                },
341                server_version.version,
342                &chain_identifier.to_string(),
343            ))
344            .expect("Failed registering uptime metric");
345
346        // If genesis come with some migration data then load them into memory from the
347        // file path specified in config.
348        let migration_tx_data = if genesis.contains_migrations() {
349            // Here the load already verifies that the content of the migration blob is
350            // valid in respect to the content found in genesis
351            Some(config.load_migration_tx_data()?)
352        } else {
353            None
354        };
355
356        let secret = Arc::pin(config.authority_key_pair().copy());
357        let genesis_committee = genesis.committee()?;
358        let committee_store = Arc::new(CommitteeStore::new(
359            config.db_path().join("epochs"),
360            &genesis_committee,
361            None,
362        ));
363
364        let mut pruner_db = None;
365        if config
366            .authority_store_pruning_config
367            .enable_compaction_filter
368        {
369            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
370                &config.db_path().join("store"),
371            )));
372        }
373        let compaction_filter = pruner_db
374            .clone()
375            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
376
377        // By default, only enable write stall on validators for perpetual db.
378        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
379        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
380            enable_write_stall,
381            compaction_filter,
382        };
383        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
384            &config.db_path().join("store"),
385            Some(perpetual_tables_options),
386        ));
387        let is_genesis = perpetual_tables
388            .database_is_empty()
389            .expect("Database read should not fail at init.");
390        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
391        let backpressure_manager =
392            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
393
394        let perpetual_tables_for_progress = perpetual_tables.clone();
395        let store = AuthorityStore::open(
396            perpetual_tables,
397            &genesis,
398            &config,
399            &prometheus_registry,
400            migration_tx_data.as_ref(),
401        )
402        .await?;
403
404        let cur_epoch = store.get_recovery_epoch_at_restart()?;
405        let committee = committee_store
406            .get_committee(&cur_epoch)?
407            .expect("Committee of the current epoch must exist");
408        let epoch_start_configuration = store
409            .get_epoch_start_configuration()?
410            .expect("EpochStartConfiguration of the current epoch must exist");
411        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
412        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
413
414        let cache_traits = build_execution_cache(
415            &config.execution_cache_config,
416            &prometheus_registry,
417            &store,
418            backpressure_manager.clone(),
419        );
420
421        let auth_agg = {
422            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
423            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
424            Arc::new(ArcSwap::new(Arc::new(
425                AuthorityAggregator::new_from_epoch_start_state(
426                    epoch_start_configuration.epoch_start_state(),
427                    &committee_store,
428                    safe_client_metrics_base,
429                    auth_agg_metrics,
430                ),
431            )))
432        };
433
434        let chain = match config.chain_override_for_testing {
435            Some(chain) => chain,
436            None => chain_identifier.chain(),
437        };
438
439        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
440        let epoch_store = AuthorityPerEpochStore::new(
441            config.authority_public_key(),
442            committee.clone(),
443            &config.db_path().join("store"),
444            Some(epoch_options.options),
445            EpochMetrics::new(&registry_service.default_registry()),
446            epoch_start_configuration,
447            cache_traits.backing_package_store.clone(),
448            cache_metrics,
449            signature_verifier_metrics,
450            &config.expensive_safety_check_config,
451            (chain_identifier, chain),
452            checkpoint_store
453                .get_highest_executed_checkpoint_seq_number()
454                .expect("checkpoint store read cannot fail")
455                .unwrap_or(0),
456        )?;
457
458        info!("created epoch store");
459
460        replay_log!(
461            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
462            epoch_store.epoch(),
463            epoch_store.protocol_config()
464        );
465
466        // the database is empty at genesis time
467        if is_genesis {
468            info!("checking IOTA conservation at genesis");
469            // When we are opening the db table, the only time when it's safe to
470            // check IOTA conservation is at genesis. Otherwise we may be in the middle of
471            // an epoch and the IOTA conservation check will fail. This also initialize
472            // the expected_network_iota_amount table.
473            cache_traits
474                .reconfig_api
475                .try_expensive_check_iota_conservation(&epoch_store, None)
476                .expect("IOTA conservation check cannot fail at genesis");
477        }
478
479        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
480        let default_buffer_stake = epoch_store
481            .protocol_config()
482            .buffer_stake_for_protocol_upgrade_bps();
483        if effective_buffer_stake != default_buffer_stake {
484            warn!(
485                ?effective_buffer_stake,
486                ?default_buffer_stake,
487                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
488            );
489        }
490
491        checkpoint_store.insert_genesis_checkpoint(
492            genesis.checkpoint(),
493            genesis.checkpoint_contents().clone(),
494            &epoch_store,
495        );
496
497        // Database has everything from genesis, set corrupted key to 0
498        unmark_db_corruption(db_corrupted_path)?;
499
500        info!("creating state sync store");
501        let state_sync_store = RocksDbStore::new(
502            cache_traits.clone(),
503            committee_store.clone(),
504            checkpoint_store.clone(),
505        );
506
507        let index_store = if is_full_node && config.enable_index_processing {
508            info!("creating index store");
509            Some(Arc::new(IndexStore::new(
510                config.db_path().join("indexes"),
511                &prometheus_registry,
512                epoch_store
513                    .protocol_config()
514                    .max_move_identifier_len_as_option(),
515            )))
516        } else {
517            None
518        };
519
520        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
521            Some(Arc::new(
522                GrpcIndexesStore::new(
523                    config.db_path().join(GRPC_INDEXES_DIR),
524                    Arc::clone(&store),
525                    &checkpoint_store,
526                )
527                .await,
528            ))
529        } else {
530            None
531        };
532
533        info!("creating archive reader");
534        // Create network
535        // TODO only configure validators as seed/preferred peers for validators and not
536        // for fullnodes once we've had a chance to re-work fullnode
537        // configuration generation.
538        let archive_readers =
539            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
540        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
541        let (randomness_tx, randomness_rx) = mpsc::channel(
542            config
543                .p2p_config
544                .randomness
545                .clone()
546                .unwrap_or_default()
547                .mailbox_capacity(),
548        );
549        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
550            Self::create_p2p_network(
551                &config,
552                state_sync_store.clone(),
553                chain_identifier,
554                trusted_peer_change_rx,
555                archive_readers.clone(),
556                randomness_tx,
557                &prometheus_registry,
558            )?;
559
560        // We must explicitly send this instead of relying on the initial value to
561        // trigger watch value change, so that state-sync is able to process it.
562        send_trusted_peer_change(
563            &config,
564            &trusted_peer_change_tx,
565            epoch_store.epoch_start_state(),
566        );
567
568        info!("start state archival");
569        // Start archiving local state to remote store
570        let state_archive_handle =
571            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
572                .await?;
573
574        info!("start snapshot upload");
575        // Start uploading state snapshot to remote store
576        let state_snapshot_handle =
577            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
578
579        let checkpoint_progress_tracker = Arc::new(CheckpointProgressTracker::new());
580
581        // Start uploading db checkpoints to remote store
582        info!("start db checkpoint");
583        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
584            &config,
585            &prometheus_registry,
586            state_snapshot_handle.is_some(),
587            Some(checkpoint_progress_tracker.clone()),
588        )?;
589
590        let mut genesis_objects = genesis.objects().to_vec();
591        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
592            genesis_objects.extend(migration_tx_data.get_objects());
593        }
594
595        let authority_name = config.authority_public_key();
596        let validator_tx_finalizer =
597            config
598                .enable_validator_tx_finalizer
599                .then_some(Arc::new(ValidatorTxFinalizer::new(
600                    auth_agg.clone(),
601                    authority_name,
602                    &prometheus_registry,
603                )));
604
605        info!("create authority state");
606        let state = AuthorityState::new(
607            authority_name,
608            secret,
609            config.supported_protocol_versions.unwrap(),
610            store.clone(),
611            cache_traits.clone(),
612            epoch_store.clone(),
613            committee_store.clone(),
614            index_store.clone(),
615            grpc_indexes_store,
616            checkpoint_store.clone(),
617            &prometheus_registry,
618            &genesis_objects,
619            &db_checkpoint_config,
620            config.clone(),
621            archive_readers,
622            validator_tx_finalizer,
623            chain_identifier,
624            pruner_db,
625            Some(checkpoint_progress_tracker.clone()),
626            config.policy_config.clone(),
627            config.firewall_config.clone(),
628        )
629        .await;
630
631        // ensure genesis and migration txs were executed
632        if epoch_store.epoch() == 0 {
633            let genesis_tx = &genesis.transaction();
634            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
635            // Execute genesis transaction
636            Self::execute_transaction_immediately_at_zero_epoch(
637                &state,
638                &epoch_store,
639                genesis_tx,
640                span,
641            )
642            .await;
643
644            // Execute migration transactions if present
645            if let Some(migration_tx_data) = migration_tx_data {
646                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
647                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
648                    Self::execute_transaction_immediately_at_zero_epoch(
649                        &state,
650                        &epoch_store,
651                        tx,
652                        span,
653                    )
654                    .await;
655                }
656            }
657        }
658
659        // Start the loop that receives new randomness and generates transactions for
660        // it.
661        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
662
663        if config
664            .expensive_safety_check_config
665            .enable_secondary_index_checks()
666        {
667            if let Some(indexes) = state.indexes.clone() {
668                iota_core::verify_indexes::verify_indexes(
669                    state.get_global_state_hash_store().as_ref(),
670                    indexes,
671                )
672                .expect("secondary indexes are inconsistent");
673            }
674        }
675
676        let (end_of_epoch_channel, end_of_epoch_receiver) =
677            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
678
679        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
680            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
681                auth_agg.load_full(),
682                state.clone(),
683                end_of_epoch_receiver,
684                &config.db_path(),
685                &prometheus_registry,
686                Some(&config),
687            )))
688        } else {
689            None
690        };
691
692        let http_server = build_http_server(
693            state.clone(),
694            &transaction_orchestrator.clone(),
695            &config,
696            &prometheus_registry,
697        )
698        .await?;
699
700        let global_state_hasher = Arc::new(GlobalStateHasher::new(
701            cache_traits.global_state_hash_store.clone(),
702            GlobalStateHashMetrics::new(&prometheus_registry),
703        ));
704
705        let authority_names_to_peer_ids = epoch_store
706            .epoch_start_state()
707            .get_authority_names_to_peer_ids();
708
709        let network_connection_metrics =
710            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());
711
712        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
713
714        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
715            p2p_network.downgrade(),
716            network_connection_metrics,
717            HashMap::new(),
718            None,
719        );
720
721        let connection_monitor_status = ConnectionMonitorStatus {
722            connection_statuses,
723            authority_names_to_peer_ids,
724        };
725
726        let connection_monitor_status = Arc::new(connection_monitor_status);
727        let iota_node_metrics =
728            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));
729
730        iota_node_metrics
731            .binary_max_protocol_version
732            .set(ProtocolVersion::MAX.as_u64() as i64);
733        iota_node_metrics
734            .configured_max_protocol_version
735            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
736
737        // Convert transaction orchestrator to executor trait object for gRPC server
738        // Note that the transaction_orchestrator (so as executor) will be None if it is
739        // a validator node or run_with_range is set
740        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
741            transaction_orchestrator
742                .clone()
743                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
744
745        let grpc_server_handle = build_grpc_server(
746            &config,
747            state.clone(),
748            state_sync_store.clone(),
749            executor,
750            &prometheus_registry,
751            server_version,
752        )
753        .await?;
754
755        let validator_components = if state.is_committee_validator(&epoch_store) {
756            let (components, _) = futures::join!(
757                Self::construct_validator_components(
758                    config.clone(),
759                    state.clone(),
760                    committee,
761                    epoch_store.clone(),
762                    checkpoint_store.clone(),
763                    state_sync_handle.clone(),
764                    randomness_handle.clone(),
765                    Arc::downgrade(&global_state_hasher),
766                    backpressure_manager.clone(),
767                    connection_monitor_status.clone(),
768                    &registry_service,
769                ),
770                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
771            );
772            let mut components = components?;
773
774            components.consensus_adapter.submit_recovered(&epoch_store);
775
776            // Start the gRPC server
777            components.validator_server_handle = components.validator_server_handle.start().await;
778
779            Some(components)
780        } else {
781            None
782        };
783
784        // setup shutdown channel
785        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
786
787        let node = Self {
788            config,
789            validator_components: Mutex::new(validator_components),
790            _http_server: http_server,
791            state,
792            transaction_orchestrator,
793            registry_service,
794            metrics: iota_node_metrics,
795
796            _discovery: discovery_handle,
797            state_sync_handle,
798            randomness_handle,
799            checkpoint_store,
800            global_state_hasher: Mutex::new(Some(global_state_hasher)),
801            end_of_epoch_channel,
802            connection_monitor_status,
803            trusted_peer_change_tx,
804            backpressure_manager,
805            checkpoint_progress_tracker: checkpoint_progress_tracker.clone(),
806
807            _db_checkpoint_handle: db_checkpoint_handle,
808
809            #[cfg(msim)]
810            sim_state: Default::default(),
811
812            _state_archive_handle: state_archive_handle,
813            _state_snapshot_uploader_handle: state_snapshot_handle,
814            shutdown_channel_tx: shutdown_channel,
815
816            grpc_server_handle: Mutex::new(grpc_server_handle),
817
818            auth_agg,
819        };
820
821        info!("IotaNode started!");
822        let node = Arc::new(node);
823        let node_copy = node.clone();
824        spawn_monitored_task!(async move {
825            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
826            if let Err(error) = result {
827                warn!("Reconfiguration finished with error {:?}", error);
828            }
829        });
830
831        node.checkpoint_progress_tracker
832            .spawn_logging_task(node.checkpoint_store.clone(), perpetual_tables_for_progress);
833
834        Ok(node)
835    }
836
837    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
838        self.end_of_epoch_channel.subscribe()
839    }
840
841    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
842        self.shutdown_channel_tx.subscribe()
843    }
844
845    pub fn current_epoch_for_testing(&self) -> EpochId {
846        self.state.current_epoch_for_testing()
847    }
848
849    pub fn db_checkpoint_path(&self) -> PathBuf {
850        self.config.db_checkpoint_path()
851    }
852
853    // Init reconfig process by starting to reject user certs
854    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
855        info!("close_epoch (current epoch = {})", epoch_store.epoch());
856        self.validator_components
857            .lock()
858            .await
859            .as_ref()
860            .ok_or_else(|| IotaError::from("Node is not a validator"))?
861            .consensus_adapter
862            .close_epoch(epoch_store);
863        Ok(())
864    }
865
866    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
867        self.state
868            .clear_override_protocol_upgrade_buffer_stake(epoch)
869    }
870
871    pub fn set_override_protocol_upgrade_buffer_stake(
872        &self,
873        epoch: EpochId,
874        buffer_stake_bps: u64,
875    ) -> IotaResult {
876        self.state
877            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
878    }
879
880    // Testing-only API to start epoch close process.
881    // For production code, please use the non-testing version.
882    pub async fn close_epoch_for_testing(&self) -> IotaResult {
883        let epoch_store = self.state.epoch_store_for_testing();
884        self.close_epoch(&epoch_store).await
885    }
886
887    async fn start_state_archival(
888        config: &NodeConfig,
889        prometheus_registry: &Registry,
890        state_sync_store: RocksDbStore,
891    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
892        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
893            let local_store_config = ObjectStoreConfig {
894                object_store: Some(ObjectStoreType::File),
895                directory: Some(config.archive_path()),
896                ..Default::default()
897            };
898            let archive_writer = ArchiveWriter::new(
899                local_store_config,
900                remote_store_config.clone(),
901                FileCompression::Zstd,
902                StorageFormat::Blob,
903                Duration::from_secs(600),
904                256 * 1024 * 1024,
905                prometheus_registry,
906            )
907            .await?;
908            Ok(Some(archive_writer.start(state_sync_store).await?))
909        } else {
910            Ok(None)
911        }
912    }
913
914    /// Creates an StateSnapshotUploader and start it if the StateSnapshotConfig
915    /// is set.
916    fn start_state_snapshot(
917        config: &NodeConfig,
918        prometheus_registry: &Registry,
919        checkpoint_store: Arc<CheckpointStore>,
920    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
921        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
922            let snapshot_uploader = StateSnapshotUploader::new(
923                &config.db_checkpoint_path(),
924                &config.snapshot_path(),
925                remote_store_config.clone(),
926                60,
927                prometheus_registry,
928                checkpoint_store,
929            )?;
930            Ok(Some(snapshot_uploader.start()))
931        } else {
932            Ok(None)
933        }
934    }
935
936    fn start_db_checkpoint(
937        config: &NodeConfig,
938        prometheus_registry: &Registry,
939        state_snapshot_enabled: bool,
940        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
941    ) -> Result<(
942        DBCheckpointConfig,
943        Option<tokio::sync::broadcast::Sender<()>>,
944    )> {
945        let checkpoint_path = Some(
946            config
947                .db_checkpoint_config
948                .checkpoint_path
949                .clone()
950                .unwrap_or_else(|| config.db_checkpoint_path()),
951        );
952        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
953            DBCheckpointConfig {
954                checkpoint_path,
955                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
956                    true
957                } else {
958                    config
959                        .db_checkpoint_config
960                        .perform_db_checkpoints_at_epoch_end
961                },
962                ..config.db_checkpoint_config.clone()
963            }
964        } else {
965            config.db_checkpoint_config.clone()
966        };
967
968        match (
969            db_checkpoint_config.object_store_config.as_ref(),
970            state_snapshot_enabled,
971        ) {
972            // If db checkpoint config object store not specified but
973            // state snapshot object store is specified, create handler
974            // anyway for marking db checkpoints as completed so that they
975            // can be uploaded as state snapshots.
976            (None, false) => Ok((db_checkpoint_config, None)),
977            (_, _) => {
978                let handler = DBCheckpointHandler::new(
979                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
980                    db_checkpoint_config.object_store_config.as_ref(),
981                    60,
982                    db_checkpoint_config
983                        .prune_and_compact_before_upload
984                        .unwrap_or(true),
985                    config.authority_store_pruning_config.clone(),
986                    prometheus_registry,
987                    state_snapshot_enabled,
988                    checkpoint_progress_tracker,
989                )?;
990                Ok((
991                    db_checkpoint_config,
992                    Some(DBCheckpointHandler::start(handler)),
993                ))
994            }
995        }
996    }
997
998    fn create_p2p_network(
999        config: &NodeConfig,
1000        state_sync_store: RocksDbStore,
1001        chain_identifier: ChainIdentifier,
1002        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1003        archive_readers: ArchiveReaderBalancer,
1004        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1005        prometheus_registry: &Registry,
1006    ) -> Result<(
1007        Network,
1008        discovery::Handle,
1009        state_sync::Handle,
1010        randomness::Handle,
1011    )> {
1012        let (state_sync, state_sync_server) = state_sync::Builder::new()
1013            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1014            .store(state_sync_store)
1015            .archive_readers(archive_readers)
1016            .with_metrics(prometheus_registry)
1017            .build();
1018
1019        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1020            .config(config.p2p_config.clone())
1021            .build();
1022
1023        let (randomness, randomness_router) =
1024            randomness::Builder::new(config.authority_public_key(), randomness_tx)
1025                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1026                .with_metrics(prometheus_registry)
1027                .build();
1028
1029        let p2p_network = {
1030            let routes = anemo::Router::new()
1031                .add_rpc_service(discovery_server)
1032                .add_rpc_service(state_sync_server);
1033            let routes = routes.merge(randomness_router);
1034
1035            let inbound_network_metrics =
1036                NetworkMetrics::new("iota", "inbound", prometheus_registry);
1037            let outbound_network_metrics =
1038                NetworkMetrics::new("iota", "outbound", prometheus_registry);
1039
1040            let service = ServiceBuilder::new()
1041                .layer(
1042                    TraceLayer::new_for_server_errors()
1043                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1044                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1045                )
1046                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1047                    Arc::new(inbound_network_metrics),
1048                    config.p2p_config.excessive_message_size(),
1049                )))
1050                .service(routes);
1051
1052            let outbound_layer = ServiceBuilder::new()
1053                .layer(
1054                    TraceLayer::new_for_client_and_server_errors()
1055                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1056                        .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
1057                )
1058                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1059                    Arc::new(outbound_network_metrics),
1060                    config.p2p_config.excessive_message_size(),
1061                )))
1062                .into_inner();
1063
1064            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1065            // Set the max_frame_size to be 1 GB to work around the issue of there being too
1066            // many staking events in the epoch change txn.
1067            anemo_config.max_frame_size = Some(1 << 30);
1068
1069            // Set a higher default value for socket send/receive buffers if not already
1070            // configured.
1071            let mut quic_config = anemo_config.quic.unwrap_or_default();
1072            if quic_config.socket_send_buffer_size.is_none() {
1073                quic_config.socket_send_buffer_size = Some(20 << 20);
1074            }
1075            if quic_config.socket_receive_buffer_size.is_none() {
1076                quic_config.socket_receive_buffer_size = Some(20 << 20);
1077            }
1078            quic_config.allow_failed_socket_buffer_size_setting = true;
1079
1080            // Set high-performance defaults for quinn transport.
1081            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1082            if quic_config.max_concurrent_bidi_streams.is_none() {
1083                quic_config.max_concurrent_bidi_streams = Some(500);
1084            }
1085            if quic_config.max_concurrent_uni_streams.is_none() {
1086                quic_config.max_concurrent_uni_streams = Some(500);
1087            }
1088            if quic_config.stream_receive_window.is_none() {
1089                quic_config.stream_receive_window = Some(100 << 20);
1090            }
1091            if quic_config.receive_window.is_none() {
1092                quic_config.receive_window = Some(200 << 20);
1093            }
1094            if quic_config.send_window.is_none() {
1095                quic_config.send_window = Some(200 << 20);
1096            }
1097            if quic_config.crypto_buffer_size.is_none() {
1098                quic_config.crypto_buffer_size = Some(1 << 20);
1099            }
1100            if quic_config.max_idle_timeout_ms.is_none() {
1101                quic_config.max_idle_timeout_ms = Some(30_000);
1102            }
1103            if quic_config.keep_alive_interval_ms.is_none() {
1104                quic_config.keep_alive_interval_ms = Some(5_000);
1105            }
1106            anemo_config.quic = Some(quic_config);
1107
1108            let server_name = format!("iota-{chain_identifier}");
1109            let network = Network::bind(config.p2p_config.listen_address)
1110                .server_name(&server_name)
1111                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1112                .config(anemo_config)
1113                .outbound_request_layer(outbound_layer)
1114                .start(service)?;
1115            info!(
1116                server_name = server_name,
1117                "P2p network started on {}",
1118                network.local_addr()
1119            );
1120
1121            network
1122        };
1123
1124        let discovery_handle =
1125            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1126        let state_sync_handle = state_sync.start(p2p_network.clone());
1127        let randomness_handle = randomness.start(p2p_network.clone());
1128
1129        Ok((
1130            p2p_network,
1131            discovery_handle,
1132            state_sync_handle,
1133            randomness_handle,
1134        ))
1135    }
1136
1137    /// Asynchronously constructs and initializes the components necessary for
1138    /// the validator node.
1139    async fn construct_validator_components(
1140        config: NodeConfig,
1141        state: Arc<AuthorityState>,
1142        committee: Arc<Committee>,
1143        epoch_store: Arc<AuthorityPerEpochStore>,
1144        checkpoint_store: Arc<CheckpointStore>,
1145        state_sync_handle: state_sync::Handle,
1146        randomness_handle: randomness::Handle,
1147        global_state_hasher: Weak<GlobalStateHasher>,
1148        backpressure_manager: Arc<BackpressureManager>,
1149        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1150        registry_service: &RegistryService,
1151    ) -> Result<ValidatorComponents> {
1152        let mut config_clone = config.clone();
1153        let consensus_config = config_clone
1154            .consensus_config
1155            .as_mut()
1156            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1157        let validator_registry = Registry::new();
1158        let validator_registry_id = registry_service.add(validator_registry.clone());
1159
1160        let client = Arc::new(UpdatableConsensusClient::new());
1161        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1162            &committee,
1163            consensus_config,
1164            state.name,
1165            connection_monitor_status.clone(),
1166            &validator_registry,
1167            client.clone(),
1168            checkpoint_store.clone(),
1169        ));
1170        let consensus_manager = ConsensusManager::new(
1171            &config,
1172            consensus_config,
1173            registry_service,
1174            &validator_registry,
1175            client,
1176        );
1177
1178        // This only gets started up once, not on every epoch. (Make call to remove
1179        // every epoch.)
1180        let consensus_store_pruner = ConsensusStorePruner::new(
1181            consensus_manager.get_storage_base_path(),
1182            consensus_config.db_retention_epochs(),
1183            consensus_config.db_pruner_period(),
1184            &validator_registry,
1185        );
1186
1187        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1188        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1189
1190        let validator_server_handle = Self::start_grpc_validator_service(
1191            &config,
1192            state.clone(),
1193            consensus_adapter.clone(),
1194            &validator_registry,
1195        )
1196        .await?;
1197
1198        // Starts an overload monitor that monitors the execution of the authority.
1199        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1200        let validator_overload_monitor_handle = if config
1201            .authority_overload_config
1202            .max_load_shedding_percentage
1203            > 0
1204        {
1205            let authority_state = Arc::downgrade(&state);
1206            let overload_config = config.authority_overload_config.clone();
1207            fail_point!("starting_overload_monitor");
1208            Some(spawn_monitored_task!(overload_monitor(
1209                authority_state,
1210                overload_config,
1211            )))
1212        } else {
1213            None
1214        };
1215
1216        Self::start_epoch_specific_validator_components(
1217            &config,
1218            state.clone(),
1219            consensus_adapter,
1220            checkpoint_store,
1221            epoch_store,
1222            state_sync_handle,
1223            randomness_handle,
1224            consensus_manager,
1225            consensus_store_pruner,
1226            global_state_hasher,
1227            backpressure_manager,
1228            validator_server_handle,
1229            validator_overload_monitor_handle,
1230            checkpoint_metrics,
1231            iota_tx_validator_metrics,
1232            validator_registry_id,
1233        )
1234        .await
1235    }
1236
1237    /// Initializes and starts components specific to the current
1238    /// epoch for the validator node.
1239    async fn start_epoch_specific_validator_components(
1240        config: &NodeConfig,
1241        state: Arc<AuthorityState>,
1242        consensus_adapter: Arc<ConsensusAdapter>,
1243        checkpoint_store: Arc<CheckpointStore>,
1244        epoch_store: Arc<AuthorityPerEpochStore>,
1245        state_sync_handle: state_sync::Handle,
1246        randomness_handle: randomness::Handle,
1247        consensus_manager: ConsensusManager,
1248        consensus_store_pruner: ConsensusStorePruner,
1249        global_state_hasher: Weak<GlobalStateHasher>,
1250        backpressure_manager: Arc<BackpressureManager>,
1251        validator_server_handle: SpawnOnce,
1252        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1253        checkpoint_metrics: Arc<CheckpointMetrics>,
1254        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1255        validator_registry_id: RegistryID,
1256    ) -> Result<ValidatorComponents> {
1257        let checkpoint_service = Self::build_checkpoint_service(
1258            config,
1259            consensus_adapter.clone(),
1260            checkpoint_store.clone(),
1261            epoch_store.clone(),
1262            state.clone(),
1263            state_sync_handle,
1264            global_state_hasher,
1265            checkpoint_metrics.clone(),
1266        );
1267
1268        // create a new map that gets injected into both the consensus handler and the
1269        // consensus adapter the consensus handler will write values forwarded
1270        // from consensus, and the consensus adapter will read the values to
1271        // make decisions about which validator submits a transaction to consensus
1272        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1273
1274        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1275
1276        let randomness_manager = RandomnessManager::try_new(
1277            Arc::downgrade(&epoch_store),
1278            Box::new(consensus_adapter.clone()),
1279            randomness_handle,
1280            config.authority_key_pair(),
1281        )
1282        .await;
1283        if let Some(randomness_manager) = randomness_manager {
1284            epoch_store
1285                .set_randomness_manager(randomness_manager)
1286                .await?;
1287        }
1288
1289        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1290            state.clone(),
1291            checkpoint_service.clone(),
1292            epoch_store.clone(),
1293            low_scoring_authorities,
1294            backpressure_manager,
1295        );
1296
1297        info!("Starting consensus manager");
1298
1299        consensus_manager
1300            .start(
1301                config,
1302                epoch_store.clone(),
1303                consensus_handler_initializer,
1304                IotaTxValidator::new(
1305                    epoch_store.clone(),
1306                    checkpoint_service.clone(),
1307                    state.transaction_manager().clone(),
1308                    iota_tx_validator_metrics.clone(),
1309                ),
1310            )
1311            .await;
1312        let consensus_replay_waiter = consensus_manager.replay_waiter();
1313
1314        info!("Spawning checkpoint service");
1315        let checkpoint_service_tasks = checkpoint_service.spawn(consensus_replay_waiter).await;
1316
1317        Ok(ValidatorComponents {
1318            validator_server_handle,
1319            validator_overload_monitor_handle,
1320            consensus_manager,
1321            consensus_store_pruner,
1322            consensus_adapter,
1323            checkpoint_service_tasks,
1324            checkpoint_metrics,
1325            iota_tx_validator_metrics,
1326            validator_registry_id,
1327        })
1328    }
1329
1330    /// Starts the checkpoint service for the validator node, initializing
1331    /// necessary components and settings.
1332    /// The function ensures proper initialization of the checkpoint service,
1333    /// preparing it to handle checkpoint creation and submission to consensus,
1334    /// while also setting up the necessary monitoring and synchronization
1335    /// mechanisms.
1336    fn build_checkpoint_service(
1337        config: &NodeConfig,
1338        consensus_adapter: Arc<ConsensusAdapter>,
1339        checkpoint_store: Arc<CheckpointStore>,
1340        epoch_store: Arc<AuthorityPerEpochStore>,
1341        state: Arc<AuthorityState>,
1342        state_sync_handle: state_sync::Handle,
1343        global_state_hasher: Weak<GlobalStateHasher>,
1344        checkpoint_metrics: Arc<CheckpointMetrics>,
1345    ) -> Arc<CheckpointService> {
1346        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1347        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1348
1349        debug!(
1350            "Starting checkpoint service with epoch start timestamp {}
1351            and epoch duration {}",
1352            epoch_start_timestamp_ms, epoch_duration_ms
1353        );
1354
1355        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1356            sender: consensus_adapter,
1357            signer: state.secret.clone(),
1358            authority: config.authority_public_key(),
1359            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1360                .checked_add(epoch_duration_ms)
1361                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1362            metrics: checkpoint_metrics.clone(),
1363        });
1364
1365        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1366        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1367        let max_checkpoint_size_bytes =
1368            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1369
1370        CheckpointService::build(
1371            state.clone(),
1372            checkpoint_store,
1373            epoch_store,
1374            state.get_transaction_cache_reader().clone(),
1375            global_state_hasher,
1376            checkpoint_output,
1377            Box::new(certified_checkpoint_output),
1378            checkpoint_metrics,
1379            max_tx_per_checkpoint,
1380            max_checkpoint_size_bytes,
1381        )
1382    }
1383
1384    fn construct_consensus_adapter(
1385        committee: &Committee,
1386        consensus_config: &ConsensusConfig,
1387        authority: AuthorityName,
1388        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1389        prometheus_registry: &Registry,
1390        consensus_client: Arc<dyn ConsensusClient>,
1391        checkpoint_store: Arc<CheckpointStore>,
1392    ) -> ConsensusAdapter {
1393        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1394        // The consensus adapter allows the authority to send user certificates through
1395        // consensus.
1396
1397        ConsensusAdapter::new(
1398            consensus_client,
1399            checkpoint_store,
1400            authority,
1401            connection_monitor_status,
1402            consensus_config.max_pending_transactions(),
1403            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1404            consensus_config.max_submit_position,
1405            consensus_config.submit_delay_step_override(),
1406            ca_metrics,
1407        )
1408    }
1409
1410    async fn start_grpc_validator_service(
1411        config: &NodeConfig,
1412        state: Arc<AuthorityState>,
1413        consensus_adapter: Arc<ConsensusAdapter>,
1414        prometheus_registry: &Registry,
1415    ) -> Result<SpawnOnce> {
1416        let validator_service = ValidatorService::new(
1417            state,
1418            consensus_adapter,
1419            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1420            config.policy_config.clone().map(|p| p.client_id_source),
1421        );
1422
1423        let mut server_conf = iota_network_stack::config::Config::new();
1424        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1425        server_conf.load_shed = config.grpc_load_shed;
1426        let server_builder =
1427            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1428                .add_service(ValidatorServer::new(validator_service.clone()))
1429                .add_service(ValidatorV2Server::new(validator_service.clone()))
1430                .add_service(ValidatorPeerServer::new(validator_service));
1431
1432        let tls_config = iota_tls::create_rustls_server_config(
1433            config.network_key_pair().copy().private(),
1434            IOTA_TLS_SERVER_NAME.to_string(),
1435        );
1436
1437        let network_address = config.network_address().clone();
1438
1439        let bind_future = async move {
1440            let server = server_builder
1441                .bind(&network_address, Some(tls_config))
1442                .await
1443                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1444
1445            let local_addr = server.local_addr();
1446            info!("Listening to traffic on {local_addr}");
1447
1448            Ok(server)
1449        };
1450
1451        Ok(SpawnOnce::new(bind_future))
1452    }
1453
1454    /// Re-executes pending consensus certificates, which may not have been
1455    /// committed to disk before the node restarted. This is necessary for
1456    /// the following reasons:
1457    ///
1458    /// 1. For any transaction for which we returned signed effects to a client,
1459    ///    we must ensure that we have re-executed the transaction before we
1460    ///    begin accepting grpc requests. Otherwise we would appear to have
1461    ///    forgotten about the transaction.
1462    /// 2. While this is running, we are concurrently waiting for all previously
1463    ///    built checkpoints to be rebuilt. Since there may be dependencies in
1464    ///    either direction (from checkpointed consensus transactions to pending
1465    ///    consensus transactions, or vice versa), we must re-execute pending
1466    ///    consensus transactions to ensure that both processes can complete.
1467    /// 3. Also note that for any pending consensus transactions for which we
1468    ///    wrote a signed effects digest to disk, we must re-execute using that
1469    ///    digest as the expected effects digest, to ensure that we cannot
1470    ///    arrive at different effects than what we previously signed.
1471    async fn reexecute_pending_consensus_certs(
1472        epoch_store: &Arc<AuthorityPerEpochStore>,
1473        state: &Arc<AuthorityState>,
1474    ) {
1475        let mut pending_consensus_certificates = Vec::new();
1476        let mut additional_certs = Vec::new();
1477
1478        for tx in epoch_store.get_all_pending_consensus_transactions() {
1479            match tx.kind {
1480                // TODO: what to do with UserTransactionV1 here? It seems like this only applies to
1481                //  optimistically executed owned-object transactions that possibly didn't go
1482                //  through  consensus before the node restarted. UserTransactionsV1
1483                //  always needs to go  through consensus, so it will be replayed
1484                //  there, just like shared object  transactions.
1485                //
1486                // Shared object txns
1487                // cannot be re-executed at this  point, because we must wait for
1488                // consensus replay to assign shared  object versions.
1489                ConsensusTransactionKind::CertifiedTransaction(tx)
1490                    if !tx.contains_shared_object() =>
1491                {
1492                    let tx = *tx;
1493                    // new_unchecked is safe because we never submit a transaction to consensus
1494                    // without verifying it
1495                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1496                        VerifiedCertificate::new_unchecked(tx),
1497                    );
1498                    // we only need to re-execute if we previously signed the effects (which
1499                    // indicates we returned the effects to a client).
1500                    if let Some(fx_digest) = epoch_store
1501                        .get_signed_effects_digest(tx.digest())
1502                        .expect("db error")
1503                    {
1504                        pending_consensus_certificates.push((tx, fx_digest));
1505                    } else {
1506                        additional_certs.push(tx);
1507                    }
1508                }
1509                _ => (),
1510            }
1511        }
1512
1513        let digests = pending_consensus_certificates
1514            .iter()
1515            .map(|(tx, _)| *tx.digest())
1516            .collect::<Vec<_>>();
1517
1518        info!(
1519            "reexecuting {} pending consensus certificates: {:?}",
1520            digests.len(),
1521            digests
1522        );
1523
1524        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1525        state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1526
1527        // If this times out, the validator will still almost certainly start up fine.
1528        // But, it is possible that it may temporarily "forget" about
1529        // transactions that it had previously executed. This could confuse
1530        // clients in some circumstances. However, the transactions are still in
1531        // pending_consensus_certificates, so we cannot lose any finality guarantees.
1532        let timeout = if cfg!(msim) { 120 } else { 60 };
1533        if tokio::time::timeout(
1534            std::time::Duration::from_secs(timeout),
1535            state
1536                .get_transaction_cache_reader()
1537                .try_notify_read_executed_effects_digests(&digests),
1538        )
1539        .await
1540        .is_err()
1541        {
1542            // Log all the digests that were not executed to help debugging.
1543            if let Ok(executed_effects_digests) = state
1544                .get_transaction_cache_reader()
1545                .try_multi_get_executed_effects_digests(&digests)
1546            {
1547                let pending_digests = digests
1548                    .iter()
1549                    .zip(executed_effects_digests.iter())
1550                    .filter_map(|(digest, executed_effects_digest)| {
1551                        if executed_effects_digest.is_none() {
1552                            Some(digest)
1553                        } else {
1554                            None
1555                        }
1556                    })
1557                    .collect::<Vec<_>>();
1558                debug_fatal!(
1559                    "Timed out waiting for effects digests to be executed: {:?}",
1560                    pending_digests
1561                );
1562            } else {
1563                debug_fatal!(
1564                    "Timed out waiting for effects digests to be executed, digests not found"
1565                );
1566            }
1567        }
1568    }
1569
1570    pub fn state(&self) -> Arc<AuthorityState> {
1571        self.state.clone()
1572    }
1573
1574    // Only used for testing because of how epoch store is loaded.
1575    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1576        self.state.reference_gas_price_for_testing()
1577    }
1578
1579    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1580        self.state.committee_store().clone()
1581    }
1582
1583    // pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1584    // self.state.db()
1585    // }
1586
1587    /// Clone an AuthorityAggregator currently used in this node's
1588    /// QuorumDriver, if the node is a fullnode. After reconfig,
1589    /// QuorumDriver builds a new AuthorityAggregator. The caller
1590    /// of this function will mostly likely want to call this again
1591    /// to get a fresh one.
1592    pub fn clone_authority_aggregator(
1593        &self,
1594    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1595        self.transaction_orchestrator
1596            .as_ref()
1597            .map(|to| to.clone_authority_aggregator())
1598    }
1599
1600    pub fn transaction_orchestrator(
1601        &self,
1602    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1603        self.transaction_orchestrator.clone()
1604    }
1605
1606    pub fn subscribe_to_transaction_orchestrator_effects(
1607        &self,
1608    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1609        self.transaction_orchestrator
1610            .as_ref()
1611            .map(|to| to.subscribe_to_effects_queue())
1612            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1613    }
1614
1615    /// This function awaits the completion of checkpoint execution of the
1616    /// current epoch, after which it initiates reconfiguration of the
1617    /// entire system. This function also handles role changes for the node when
1618    /// epoch changes and advertises capabilities to the committee if the node
1619    /// is a validator.
    ///
    /// The body is an endless loop with one iteration per epoch. It returns
    /// `Ok(())` only when checkpoint execution stops with
    /// `StopReason::RunWithRangeCondition`; in that case the node is shut down
    /// and `run_with_range` is sent on the shutdown channel.
    ///
    /// # Errors
    /// Propagates (via `?`) failures from submitting the capability
    /// notification to consensus, from `reconfigure_state`, from (re)building
    /// the validator components and, under `msim`, from checkpoint pruning.
    ///
    /// # Panics
    /// Panics if the global state hasher slot is empty at the start of an
    /// iteration, if `supported_protocol_versions` is unset on a validator, if
    /// the system state object cannot be read, if the next epoch is not exactly
    /// `current + 1`, or if another strong reference to the hasher is still
    /// alive when it is about to be replaced.
1620    pub async fn monitor_reconfiguration(
1621        self: Arc<Self>,
1622        mut epoch_store: Arc<AuthorityPerEpochStore>,
1623    ) -> Result<()> {
1624        let checkpoint_executor_metrics =
1625            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1626
1627        loop {
                // The hasher is moved out of its slot here; the guard stays alive for
                // the rest of the iteration and a replacement hasher is written back
                // through it further down on the normal reconfiguration path.
1628            let mut hasher_guard = self.global_state_hasher.lock().await;
1629            let hasher = hasher_guard.take().unwrap();
1630            info!(
1631                "Creating checkpoint executor for epoch {}",
1632                epoch_store.epoch()
1633            );
1634
1635            // Create closures that handle gRPC type conversion
                // NOTE(review): `try_lock` does not wait. If the gRPC handle lock is
                // held elsewhere at this instant, `data_sender` is `None` and this
                // epoch's executor gets no broadcaster - confirm this is intended.
1636            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1637                guard.as_ref().map(|handle| {
1638                    let tx = handle.checkpoint_data_broadcaster().clone();
1639                    Box::new(move |data: &CheckpointData| {
1640                        tx.send_traced(data);
1641                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
1642                })
1643            } else {
1644                None
1645            };
1646
1647            let checkpoint_executor = CheckpointExecutor::new(
1648                epoch_store.clone(),
1649                self.checkpoint_store.clone(),
1650                self.state.clone(),
1651                hasher.clone(),
1652                self.backpressure_manager.clone(),
1653                self.config.checkpoint_executor_config.clone(),
1654                checkpoint_executor_metrics.clone(),
1655                data_sender,
1656                Some(self.checkpoint_progress_tracker.clone()),
1657            );
1658
1659            let run_with_range = self.config.run_with_range;
1660
1661            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1662
1663            // Update the current protocol version metric.
1664            self.metrics
1665                .current_protocol_version
1666                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1667
1668            // Advertise capabilities to committee, if we are a validator.
1669            if let Some(components) = &*self.validator_components.lock().await {
1670                // TODO: without this sleep, the consensus message is not delivered reliably.
1671                tokio::time::sleep(Duration::from_millis(1)).await;
1672
1673                let config = cur_epoch_store.protocol_config();
1674                let binary_config = to_binary_config(config);
1675                let transaction = ConsensusTransaction::new_capability_notification_v1(
1676                    AuthorityCapabilitiesV1::new(
1677                        self.state.name,
1678                        cur_epoch_store.get_chain(),
1679                        self.config
1680                            .supported_protocol_versions
1681                            .expect("Supported versions should be populated")
1682                            // no need to send digests of versions less than the current version
1683                            .truncate_below(config.version),
1684                        self.state
1685                            .get_available_system_packages(&binary_config)
1686                            .await,
1687                    ),
1688                );
1689                info!(?transaction, "submitting capabilities to consensus");
1690                components
1691                    .consensus_adapter
1692                    .submit(transaction, None, &cur_epoch_store)?;
1693            } else if self.state.is_active_validator(&cur_epoch_store)
1694                && cur_epoch_store
1695                    .protocol_config()
1696                    .track_non_committee_eligible_validators()
1697            {
1698                // Send signed capabilities to committee validators if we are a non-committee
1699                // validator in a separate task to not block the caller. Sending is done only if
1700                // the feature flag supporting it is enabled.
1701                let epoch_store = cur_epoch_store.clone();
1702                let node_clone = self.clone();
1703                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1704                    node_clone
1705                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1706                        .instrument(trace_span!(
1707                            "send_signed_capability_notification_to_committee_with_retry"
1708                        ))
1709                        .await;
1710                }));
1711            }
1712
                // Blocks until the executor reports why it stopped running this epoch.
1713            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1714
                // The only non-error exit from this function.
1715            if stop_condition == StopReason::RunWithRangeCondition {
1716                IotaNode::shutdown(&self).await;
1717                self.shutdown_channel_tx
1718                    .send(run_with_range)
1719                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1720                return Ok(());
1721            }
1722
1723            // Safe to call because we are in the middle of reconfiguration.
1724            let latest_system_state = self
1725                .state
1726                .get_object_cache_reader()
1727                .try_get_iota_system_state_object_unsafe()
1728                .expect("Read IOTA System State object cannot fail");
1729
                // Under `msim` the safe-mode assertion is skipped when the test has
                // flagged safe mode as expected.
1730            #[cfg(msim)]
1731            if !self
1732                .sim_state
1733                .sim_safe_mode_expected
1734                .load(Ordering::Relaxed)
1735            {
1736                debug_assert!(!latest_system_state.safe_mode());
1737            }
1738
1739            #[cfg(not(msim))]
1740            debug_assert!(!latest_system_state.safe_mode());
1741
                // A failed send is only logged, and only when running as a fullnode.
1742            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1743                if self.state.is_fullnode(&cur_epoch_store) {
1744                    warn!(
1745                        "Failed to send end of epoch notification to subscriber: {:?}",
1746                        err
1747                    );
1748                }
1749            }
1750
1751            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1752            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1753
                // Swap in an aggregator rebuilt from the new epoch start state.
1754            self.auth_agg.store(Arc::new(
1755                self.auth_agg
1756                    .load()
1757                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1758            ));
1759
1760            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1761            let next_epoch = next_epoch_committee.epoch();
1762            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1763
1764            info!(
1765                next_epoch,
1766                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1767            );
1768
1769            fail_point_async!("reconfig_delay");
1770
1771            // We save the connection monitor status map regardless of validator / fullnode
1772            // status so that we don't need to restart the connection monitor
1773            // every epoch. Update the mappings that will be used by the
1774            // consensus adapter if it exists or is about to be created.
1775            let authority_names_to_peer_ids =
1776                new_epoch_start_state.get_authority_names_to_peer_ids();
1777            self.connection_monitor_status
1778                .update_mapping_for_epoch(authority_names_to_peer_ids);
1779
1780            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1781
1782            send_trusted_peer_change(
1783                &self.config,
1784                &self.trusted_peer_change_tx,
1785                &new_epoch_start_state,
1786            );
1787
                // Held until the end of the iteration; the new components (or `None`)
                // are written back through this guard below.
1788            let mut validator_components_lock_guard = self.validator_components.lock().await;
1789
1790            // The following code handles 4 different cases, depending on whether the node
1791            // was a validator in the previous epoch, and whether the node is a validator
1792            // in the new epoch.
1793            let new_epoch_store = self
1794                .reconfigure_state(
1795                    &self.state,
1796                    &cur_epoch_store,
1797                    next_epoch_committee.clone(),
1798                    new_epoch_start_state,
1799                    hasher.clone(),
1800                )
1801                .await?;
1802
                // Cases 1 and 2: validator components existed in the previous epoch.
1803            let new_validator_components = if let Some(ValidatorComponents {
1804                validator_server_handle,
1805                validator_overload_monitor_handle,
1806                consensus_manager,
1807                consensus_store_pruner,
1808                consensus_adapter,
1809                mut checkpoint_service_tasks,
1810                checkpoint_metrics,
1811                iota_tx_validator_metrics,
1812                validator_registry_id,
1813            }) = validator_components_lock_guard.take()
1814            {
1815                info!("Reconfiguring the validator.");
1816                // Cancel the old checkpoint service tasks.
1817                // Waiting for checkpoint builder to finish gracefully is not possible, because
1818                // it may wait on transactions while consensus on peers have
1819                // already shut down.
1820                checkpoint_service_tasks.abort_all();
                    // Drain every task: a panic inside a task is re-raised here, any
                    // other join error is only logged.
1821                while let Some(result) = checkpoint_service_tasks.join_next().await {
1822                    if let Err(err) = result {
1823                        if err.is_panic() {
1824                            std::panic::resume_unwind(err.into_panic());
1825                        }
1826                        warn!("Error in checkpoint service task: {:?}", err);
1827                    }
1828                }
1829                info!("Checkpoint service has shut down.");
1830
1831                consensus_manager.shutdown().await;
1832                info!("Consensus has shut down.");
1833
1834                info!("Epoch store finished reconfiguration.");
1835
1836                // No other components should be holding a strong reference to state hasher
1837                // at this point. Confirm here before we swap in the new hasher.
1838                let global_state_hasher_metrics = Arc::into_inner(hasher)
1839                    .expect("Object state hasher should have no other references at this point")
1840                    .metrics();
1841                let new_hasher = Arc::new(GlobalStateHasher::new(
1842                    self.state.get_global_state_hash_store().clone(),
1843                    global_state_hasher_metrics,
1844                ));
1845                let weak_hasher = Arc::downgrade(&new_hasher);
1846                *hasher_guard = Some(new_hasher);
1847
1848                consensus_store_pruner.prune(next_epoch).await;
1849
1850                if self.state.is_committee_validator(&new_epoch_store) {
1851                    // Only restart consensus if this node is still a validator in the new epoch.
1852                    Some(
1853                        Self::start_epoch_specific_validator_components(
1854                            &self.config,
1855                            self.state.clone(),
1856                            consensus_adapter,
1857                            self.checkpoint_store.clone(),
1858                            new_epoch_store.clone(),
1859                            self.state_sync_handle.clone(),
1860                            self.randomness_handle.clone(),
1861                            consensus_manager,
1862                            consensus_store_pruner,
1863                            weak_hasher,
1864                            self.backpressure_manager.clone(),
1865                            validator_server_handle,
1866                            validator_overload_monitor_handle,
1867                            checkpoint_metrics,
1868                            iota_tx_validator_metrics,
1869                            validator_registry_id,
1870                        )
1871                        .await?,
1872                    )
1873                } else {
1874                    info!("This node is no longer a validator after reconfiguration");
1875                    if self.registry_service.remove(validator_registry_id) {
1876                        debug!("Removed validator metrics registry");
1877                    } else {
1878                        warn!("Failed to remove validator metrics registry");
1879                    }
1880                    validator_server_handle.shutdown();
1881                    debug!("Validator grpc server shutdown triggered");
1882
1883                    None
1884                }
1885            } else {
                    // Cases 3 and 4: no validator components existed in the previous
                    // epoch.
1886                // No other components should be holding a strong reference to state hasher
1887                // at this point. Confirm here before we swap in the new hasher.
1888                let global_state_hasher_metrics = Arc::into_inner(hasher)
1889                    .expect("Object state hasher should have no other references at this point")
1890                    .metrics();
1891                let new_hasher = Arc::new(GlobalStateHasher::new(
1892                    self.state.get_global_state_hash_store().clone(),
1893                    global_state_hasher_metrics,
1894                ));
1895                let weak_hasher = Arc::downgrade(&new_hasher);
1896                *hasher_guard = Some(new_hasher);
1897
1898                if self.state.is_committee_validator(&new_epoch_store) {
1899                    info!("Promoting the node from fullnode to validator, starting grpc server");
1900
1901                    let mut components = Self::construct_validator_components(
1902                        self.config.clone(),
1903                        self.state.clone(),
1904                        Arc::new(next_epoch_committee.clone()),
1905                        new_epoch_store.clone(),
1906                        self.checkpoint_store.clone(),
1907                        self.state_sync_handle.clone(),
1908                        self.randomness_handle.clone(),
1909                        weak_hasher,
1910                        self.backpressure_manager.clone(),
1911                        self.connection_monitor_status.clone(),
1912                        &self.registry_service,
1913                    )
1914                    .await?;
1915
                        // `start` consumes the handle and returns the handle to store
                        // back.
1916                    components.validator_server_handle =
1917                        components.validator_server_handle.start().await;
1918
1919                    Some(components)
1920                } else {
1921                    None
1922                }
1923            };
1924            *validator_components_lock_guard = new_validator_components;
1925
1926            // Force releasing current epoch store DB handle, because the
1927            // Arc<AuthorityPerEpochStore> may linger.
1928            cur_epoch_store.release_db_handles();
1929
1930            // Drop the old epoch store to free its in-memory structures
1931            // (ConsensusOutputCache, ConsensusQuarantine, DashMaps, etc.).
1932            // The DB tables were already released above.
1933            drop(cur_epoch_store);
1934
1935            // Prune old epoch databases after each epoch transition to prevent
1936            // accumulation of RocksDB instances during fast catch-up sync
1937            // (e.g. syncing from genesis).
1938            self.state.epoch_db_pruner().prune_old_epoch_dbs().await;
1939
                // Simulator builds only: prune checkpoints when a finite, non-zero
                // retention is configured.
1940            if cfg!(msim)
1941                && !matches!(
1942                    self.config
1943                        .authority_store_pruning_config
1944                        .num_epochs_to_retain_for_checkpoints(),
1945                    None | Some(u64::MAX) | Some(0)
1946                )
1947            {
1948                self.state
1949                    .prune_checkpoints_for_eligible_epochs_for_testing(
1950                        self.config.clone(),
1951                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1952                    )
1953                    .await?;
1954            }
1955
                // The next iteration builds its checkpoint executor from the new store.
1956            epoch_store = new_epoch_store;
1957            info!("Reconfiguration finished");
1958        }
1959    }
1960
1961    async fn shutdown(&self) {
1962        if let Some(validator_components) = &*self.validator_components.lock().await {
1963            validator_components.consensus_manager.shutdown().await;
1964        }
1965
1966        // Shutdown the gRPC server if it's running
1967        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
1968            info!("Shutting down gRPC server");
1969            if let Err(e) = grpc_handle.shutdown().await {
1970                warn!("Failed to gracefully shutdown gRPC server: {e}");
1971            }
1972        }
1973    }
1974
1975    /// Asynchronously reconfigures the state of the authority node for the next
1976    /// epoch.
1977    async fn reconfigure_state(
1978        &self,
1979        state: &Arc<AuthorityState>,
1980        cur_epoch_store: &AuthorityPerEpochStore,
1981        next_epoch_committee: Committee,
1982        next_epoch_start_system_state: EpochStartSystemState,
1983        global_state_hasher: Arc<GlobalStateHasher>,
1984    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
1985        let next_epoch = next_epoch_committee.epoch();
1986
1987        let last_checkpoint = self
1988            .checkpoint_store
1989            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1990            .expect("Error loading last checkpoint for current epoch")
1991            .expect("Could not load last checkpoint for current epoch");
1992        let epoch_supply_change = last_checkpoint
1993            .end_of_epoch_data
1994            .as_ref()
1995            .ok_or_else(|| {
1996                IotaError::from("last checkpoint in epoch should contain end of epoch data")
1997            })?
1998            .epoch_supply_change;
1999
2000        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2001
2002        assert_eq!(
2003            Some(last_checkpoint_seq),
2004            self.checkpoint_store
2005                .get_highest_executed_checkpoint_seq_number()
2006                .expect("Error loading highest executed checkpoint sequence number")
2007        );
2008
2009        let epoch_start_configuration = EpochStartConfiguration::new(
2010            next_epoch_start_system_state,
2011            *last_checkpoint.digest(),
2012            state.get_object_store().as_ref(),
2013            EpochFlag::default_flags_for_new_epoch(&state.config),
2014        )
2015        .expect("EpochStartConfiguration construction cannot fail");
2016
2017        let new_epoch_store = self
2018            .state
2019            .reconfigure(
2020                cur_epoch_store,
2021                self.config.supported_protocol_versions.unwrap(),
2022                next_epoch_committee,
2023                epoch_start_configuration,
2024                global_state_hasher,
2025                &self.config.expensive_safety_check_config,
2026                epoch_supply_change,
2027                last_checkpoint_seq,
2028            )
2029            .await
2030            .expect("Reconfigure authority state cannot fail");
2031        info!(next_epoch, "Node State has been reconfigured");
2032        assert_eq!(next_epoch, new_epoch_store.epoch());
2033        self.state.get_reconfig_api().update_epoch_flags_metrics(
2034            cur_epoch_store.epoch_start_config().flags(),
2035            new_epoch_store.epoch_start_config().flags(),
2036        );
2037
2038        Ok(new_epoch_store)
2039    }
2040
2041    pub fn get_config(&self) -> &NodeConfig {
2042        &self.config
2043    }
2044
2045    async fn execute_transaction_immediately_at_zero_epoch(
2046        state: &Arc<AuthorityState>,
2047        epoch_store: &Arc<AuthorityPerEpochStore>,
2048        tx: &Transaction,
2049        span: tracing::Span,
2050    ) {
2051        let _guard = span.enter();
2052        let transaction =
2053            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2054                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2055                    tx.data().clone(),
2056                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2057                ),
2058            );
2059        state
2060            .try_execute_immediately(&transaction, None, epoch_store)
2061            .unwrap();
2062    }
2063
2064    pub fn randomness_handle(&self) -> randomness::Handle {
2065        self.randomness_handle.clone()
2066    }
2067
2068    /// Sends signed capability notification to committee validators for
2069    /// non-committee validators. This method implements retry logic to handle
2070    /// failed attempts to send the notification. It will retry sending the
2071    /// notification with an increasing interval until it receives a successful
2072    /// response from a f+1 committee members or 2f+1 non-retryable errors.
2073    async fn send_signed_capability_notification_to_committee_with_retry(
2074        &self,
2075        epoch_store: &Arc<AuthorityPerEpochStore>,
2076    ) {
2077        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2078        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2079        const MAX_RETRY_INTERVAL_SECS: u64 = 300; // 5 minutes
2080
2081        // Create the capability notification once
2082        let config = epoch_store.protocol_config();
2083        let binary_config = to_binary_config(config);
2084
2085        // Create the capability notification
2086        let capabilities = AuthorityCapabilitiesV1::new(
2087            self.state.name,
2088            epoch_store.get_chain(),
2089            self.config
2090                .supported_protocol_versions
2091                .expect("Supported versions should be populated")
2092                .truncate_below(config.version),
2093            self.state
2094                .get_available_system_packages(&binary_config)
2095                .await,
2096        );
2097
2098        // Sign the capabilities using the authority key pair from config
2099        let signature = AuthoritySignature::new_secure(
2100            &IntentMessage::new(
2101                Intent::iota_app(IntentScope::AuthorityCapabilities),
2102                &capabilities,
2103            ),
2104            &epoch_store.epoch(),
2105            self.config.authority_key_pair(),
2106        );
2107
2108        let request = HandleCapabilityNotificationRequestV1 {
2109            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2110        };
2111
2112        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2113
2114        loop {
2115            let auth_agg = self.auth_agg.load();
2116            match auth_agg
2117                .send_capability_notification_to_quorum(request.clone())
2118                .await
2119            {
2120                Ok(_) => {
2121                    info!("Successfully sent capability notification to committee");
2122                    break;
2123                }
2124                Err(err) => {
2125                    match &err {
2126                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2127                            errors,
2128                        } => {
2129                            warn!(
2130                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2131                                retry_interval, errors
2132                            );
2133                        }
2134                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2135                            errors,
2136                        } => {
2137                            error!(
2138                                "Failed to send capability notification to committee (non-retryable error): {:?}",
2139                                errors
2140                            );
2141                            break;
2142                        }
2143                    };
2144
2145                    // Wait before retrying
2146                    tokio::time::sleep(retry_interval).await;
2147
2148                    // Increase retry interval for the next attempt, capped at max
2149                    retry_interval = std::cmp::min(
2150                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2151                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2152                    );
2153                }
2154            }
2155        }
2156    }
2157}
2158
/// Helpers that exist only in simulator (`msim`) builds.
#[cfg(msim)]
impl IotaNode {
    /// Returns the id of the simulator node this `IotaNode` runs on.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Records whether the simulation expects the system to be in safe mode.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }
}
2172
2173enum SpawnOnce {
2174    // Mutex is only needed to make SpawnOnce Sync
2175    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
2176    #[allow(unused)]
2177    Started(iota_http::ServerHandle),
2178}
2179
2180impl SpawnOnce {
2181    pub fn new(
2182        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2183    ) -> Self {
2184        Self::Unstarted(Mutex::new(Box::pin(future)))
2185    }
2186
2187    pub async fn start(self) -> Self {
2188        match self {
2189            Self::Unstarted(future) => {
2190                let server = future
2191                    .into_inner()
2192                    .await
2193                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2194                let handle = server.handle().clone();
2195                tokio::spawn(async move {
2196                    if let Err(err) = server.serve().await {
2197                        info!("Server stopped: {err}");
2198                    }
2199                    info!("Server stopped");
2200                });
2201                Self::Started(handle)
2202            }
2203            Self::Started(_) => self,
2204        }
2205    }
2206
2207    pub fn shutdown(self) {
2208        if let SpawnOnce::Started(handle) = self {
2209            handle.trigger_shutdown();
2210        }
2211    }
2212}
2213
2214/// Notify [`DiscoveryEventLoop`] that a new list of trusted peers are now
2215/// available.
2216fn send_trusted_peer_change(
2217    config: &NodeConfig,
2218    sender: &watch::Sender<TrustedPeerChangeEvent>,
2219    new_epoch_start_state: &EpochStartSystemState,
2220) {
2221    let new_committee =
2222        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2223
2224    sender.send_modify(|event| {
2225        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2226        event.new_committee = new_committee;
2227    })
2228}
2229
2230fn build_kv_store(
2231    state: &Arc<AuthorityState>,
2232    config: &NodeConfig,
2233    registry: &Registry,
2234) -> Result<Arc<TransactionKeyValueStore>> {
2235    let metrics = KeyValueStoreMetrics::new(registry);
2236    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2237
2238    let base_url = &config.transaction_kv_store_read_config.base_url;
2239
2240    if base_url.is_empty() {
2241        info!("no http kv store url provided, using local db only");
2242        return Ok(Arc::new(db_store));
2243    }
2244
2245    base_url.parse::<url::Url>().tap_err(|e| {
2246        error!(
2247            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2248            base_url, e
2249        )
2250    })?;
2251
2252    let http_store = HttpKVStore::new_kv(
2253        base_url,
2254        config.transaction_kv_store_read_config.cache_size,
2255        metrics.clone(),
2256    )?;
2257    info!("using local key-value store with fallback to http key-value store");
2258    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2259        db_store,
2260        http_store,
2261        metrics,
2262        "json_rpc_fallback",
2263    )))
2264}
2265
2266/// Builds and starts the gRPC server for the IOTA node based on the node's
2267/// configuration.
2268///
2269/// This function performs the following tasks:
2270/// 1. Checks if the node is a validator by inspecting the consensus
2271///    configuration; if so, it returns early as validators do not expose gRPC
2272///    APIs.
2273/// 2. Checks if gRPC is enabled in the configuration.
2274/// 3. Creates broadcast channels for checkpoint streaming.
2275/// 4. Initializes the gRPC checkpoint service.
2276/// 5. Spawns the gRPC server to listen for incoming connections.
2277///
2278/// Returns a tuple of optional broadcast channels for checkpoint summary and
2279/// data.
2280async fn build_grpc_server(
2281    config: &NodeConfig,
2282    state: Arc<AuthorityState>,
2283    state_sync_store: RocksDbStore,
2284    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2285    prometheus_registry: &Registry,
2286    server_version: ServerVersion,
2287) -> Result<Option<GrpcServerHandle>> {
2288    // Validators do not expose gRPC APIs
2289    if config.consensus_config().is_some() || !config.enable_grpc_api {
2290        return Ok(None);
2291    }
2292
2293    let Some(grpc_config) = &config.grpc_api_config else {
2294        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2295    };
2296
2297    // Get chain identifier from state directly
2298    let chain_id = state.get_chain_identifier();
2299
2300    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2301
2302    // Create cancellation token for proper shutdown hierarchy
2303    let shutdown_token = CancellationToken::new();
2304
2305    // Create GrpcReader
2306    let grpc_reader = Arc::new(GrpcReader::new(
2307        grpc_read_store,
2308        Some(server_version.to_string()),
2309    ));
2310
2311    // Create gRPC server metrics
2312    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2313    let client_id_source = config
2314        .policy_config
2315        .as_ref()
2316        .map(|p| p.client_id_source.clone());
2317
2318    let handle = start_grpc_server(
2319        grpc_reader,
2320        executor,
2321        grpc_config.clone(),
2322        shutdown_token,
2323        chain_id,
2324        Some(grpc_server_metrics),
2325        state.traffic_controller.clone(),
2326        client_id_source,
2327    )
2328    .await?;
2329
2330    Ok(Some(handle))
2331}
2332
2333/// Builds and starts the HTTP server for the IOTA node, exposing the JSON-RPC
2334/// API based on the node's configuration.
2335///
2336/// This function performs the following tasks:
2337/// 1. Checks if the node is a validator by inspecting the consensus
2338///    configuration; if so, it returns early as validators do not expose these
2339///    APIs.
2340/// 2. Creates an Axum router to handle HTTP requests.
2341/// 3. Initializes the JSON-RPC server and registers various RPC modules based
2342///    on the node's state and configuration, including CoinApi,
2343///    TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
2344///    IndexerApi.
2345/// 4. Binds the server to the specified JSON-RPC address and starts listening
2346///    for incoming connections.
2347pub async fn build_http_server(
2348    state: Arc<AuthorityState>,
2349    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2350    config: &NodeConfig,
2351    prometheus_registry: &Registry,
2352) -> Result<Option<iota_http::ServerHandle>> {
2353    // Validators do not expose these APIs
2354    if config.consensus_config().is_some() {
2355        return Ok(None);
2356    }
2357
2358    let mut router = axum::Router::new();
2359
2360    let json_rpc_router = {
2361        let traffic_controller = state.traffic_controller.clone();
2362        let mut server = JsonRpcServerBuilder::new(
2363            env!("CARGO_PKG_VERSION"),
2364            prometheus_registry,
2365            traffic_controller,
2366            config.policy_config.clone(),
2367        );
2368
2369        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2370
2371        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2372        server.register_module(ReadApi::new(
2373            state.clone(),
2374            kv_store.clone(),
2375            metrics.clone(),
2376        ))?;
2377        server.register_module(CoinReadApi::new(
2378            state.clone(),
2379            kv_store.clone(),
2380            metrics.clone(),
2381        )?)?;
2382
2383        // if run_with_range is enabled we want to prevent any transactions
2384        // run_with_range = None is normal operating conditions
2385        if config.run_with_range.is_none() {
2386            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2387        }
2388        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2389
2390        if let Some(transaction_orchestrator) = transaction_orchestrator {
2391            server.register_module(TransactionExecutionApi::new(
2392                state.clone(),
2393                transaction_orchestrator.clone(),
2394                metrics.clone(),
2395            ))?;
2396        }
2397
2398        let iota_names_config = config
2399            .iota_names_config
2400            .clone()
2401            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));
2402
2403        server.register_module(IndexerApi::new(
2404            state.clone(),
2405            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2406            kv_store,
2407            metrics,
2408            iota_names_config,
2409            config.indexer_max_subscriptions,
2410        ))?;
2411        server.register_module(MoveUtils::new(state.clone()))?;
2412
2413        let server_type = config.jsonrpc_server_type();
2414
2415        server.to_router(server_type).await?
2416    };
2417
2418    router = router.merge(json_rpc_router);
2419
2420    router = router
2421        .route("/health", axum::routing::get(health_check_handler))
2422        .route_layer(axum::Extension(state));
2423
2424    let layers = ServiceBuilder::new()
2425        .map_request(|mut request: axum::http::Request<_>| {
2426            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
2427                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2428                request.extensions_mut().insert(axum_connect_info);
2429            }
2430            request
2431        })
2432        .layer(axum::middleware::from_fn(server_timing_middleware));
2433
2434    router = router.layer(layers);
2435
2436    let handle = iota_http::Builder::new()
2437        .serve(&config.json_rpc_address, router)
2438        .map_err(|e| anyhow::anyhow!("{e}"))?;
2439    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());
2440
2441    Ok(Some(handle))
2442}
2443
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum age, in seconds, that the highest executed checkpoint may have
    /// for the health check to pass. When absent, checkpoint lag is not
    /// checked.
    pub threshold_seconds: Option<u32>,
}
2448
2449async fn health_check_handler(
2450    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2451    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2452) -> impl axum::response::IntoResponse {
2453    if let Some(threshold_seconds) = threshold_seconds {
2454        // Attempt to get the latest checkpoint
2455        let summary = match state
2456            .get_checkpoint_store()
2457            .get_highest_executed_checkpoint()
2458        {
2459            Ok(Some(summary)) => summary,
2460            Ok(None) => {
2461                warn!("Highest executed checkpoint not found");
2462                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2463            }
2464            Err(err) => {
2465                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2466                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2467            }
2468        };
2469
2470        // Calculate the threshold time based on the provided threshold_seconds
2471        let latest_chain_time = summary.timestamp();
2472        let threshold =
2473            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2474
2475        // Check if the latest checkpoint is within the threshold
2476        if latest_chain_time < threshold {
2477            warn!(
2478                ?latest_chain_time,
2479                ?threshold,
2480                "failing health check due to checkpoint lag"
2481            );
2482            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2483        }
2484    }
2485    // if health endpoint is responding and no threshold is given, respond success
2486    (axum::http::StatusCode::OK, "up")
2487}
2488
2489#[cfg(not(test))]
2490fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2491    protocol_config.max_transactions_per_checkpoint() as usize
2492}
2493
/// Test builds ignore the protocol config and use a fixed limit of two
/// transactions per checkpoint.
#[cfg(test)]
fn max_tx_per_checkpoint(_protocol_config: &ProtocolConfig) -> usize {
    const TEST_LIMIT: usize = 2;
    TEST_LIMIT
}