iota_core/consensus_manager/mysticeti_manager.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{path::PathBuf, sync::Arc};

use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority};
use fastcrypto::ed25519;
use iota_config::NodeConfig;
use iota_metrics::{RegistryID, RegistryService, monitored_mpsc::unbounded_channel};
use iota_protocol_config::ConsensusNetwork;
use iota_types::{
    committee::EpochId,
    iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
};
use prometheus::Registry;
use tokio::sync::Mutex;
use tracing::info;

use crate::{
    authority::authority_per_epoch_store::AuthorityPerEpochStore,
    consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler},
    consensus_manager::{
        ConsensusManagerMetrics, ConsensusManagerTrait, Running, RunningLockGuard,
    },
    consensus_validator::IotaTxValidator,
    mysticeti_adapter::LazyMysticetiClient,
};

#[cfg(test)]
#[path = "../unit_tests/mysticeti_manager_tests.rs"]
pub mod mysticeti_manager_tests;

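/// Manages the lifecycle of the Mysticeti consensus authority: `start` boots a
/// `ConsensusAuthority` for the current epoch and wires its committed sub-dags
/// into a `MysticetiConsensusHandler`, while `shutdown` stops the authority and
/// unregisters the per-epoch metrics registry.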
pub struct MysticetiManager {
    protocol_keypair: ProtocolKeyPair,
    network_keypair: NetworkKeyPair,
    storage_base_path: PathBuf,
    // TODO: switch to parking_lot::Mutex.
    running: Mutex<Running>,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,
    boot_counter: Mutex<u64>,
    // Use a shared lazy mysticeti client so we can update the internal mysticeti
    // client that gets created for every new epoch.
    client: Arc<LazyMysticetiClient>,
    // TODO: switch to parking_lot::Mutex.
    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,
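    // Monitor of the commit consumer handed to the consensus authority; kept
    // across restarts so we can tell whether the previous run handled any
    // commits.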
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
}

impl MysticetiManager {
    /// NOTE: The Mysticeti protocol key uses Ed25519 instead of BLS. For
    /// security, the protocol keypair must still be different from the network
    /// keypair.
    pub fn new(
        protocol_keypair: ed25519::Ed25519KeyPair,
        network_keypair: ed25519::Ed25519KeyPair,
        storage_base_path: PathBuf,
        registry_service: RegistryService,
        metrics: Arc<ConsensusManagerMetrics>,
        client: Arc<LazyMysticetiClient>,
    ) -> Self {
        Self {
            protocol_keypair: ProtocolKeyPair::new(protocol_keypair),
            network_keypair: NetworkKeyPair::new(network_keypair),
            storage_base_path,
            running: Mutex::new(Running::False),
            metrics,
            registry_service,
            authority: ArcSwapOption::empty(),
            client,
            consensus_handler: Mutex::new(None),
            boot_counter: Mutex::new(0),
            consumer_monitor: ArcSwapOption::empty(),
        }
    }

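    /// Returns the consensus DB path for the given epoch: the epoch number is
    /// appended as a subdirectory of the configured storage base path.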
    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
        let mut store_path = self.storage_base_path.clone();
        store_path.push(format!("{}", epoch));
        store_path
    }

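    /// Picks the network implementation for consensus. The `CONSENSUS_NETWORK`
    /// environment variable (currently only `tonic` is recognized) takes
    /// precedence; otherwise the value from the protocol config is used.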
    fn pick_network(&self, epoch_store: &AuthorityPerEpochStore) -> ConsensusNetwork {
        if let Ok(type_str) = std::env::var("CONSENSUS_NETWORK") {
            match type_str.to_lowercase().as_str() {
                "tonic" => return ConsensusNetwork::Tonic,
                _ => {
                    info!(
                        "Invalid consensus network type {} in env var. Continuing to use the value from the protocol config.",
                        type_str
                    );
                }
            }
        }
        epoch_store.protocol_config().consensus_network()
    }
}

#[async_trait]
impl ConsensusManagerTrait for MysticetiManager {
    /// Starts the Mysticeti consensus manager for the current epoch.
    async fn start(
        &self,
        config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    ) {
        let system_state = epoch_store.epoch_start_state();
        let committee: Committee = system_state.get_consensus_committee();
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();
        let network_type = self.pick_network(&epoch_store);

        let Some(_guard) = RunningLockGuard::acquire_start(
            &self.metrics,
            &self.running,
            epoch,
            protocol_config.version,
        )
        .await
        else {
            return;
        };

        let consensus_config = config
            .consensus_config()
            .expect("consensus_config should exist");

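        // Use the consensus parameters from the node config, but point the DB
        // path at the per-epoch store directory.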
        let parameters = Parameters {
            db_path: self.get_store_path(epoch),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

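        // Find our own authority index in the consensus committee by matching
        // the protocol public key.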
        let own_protocol_key = self.protocol_keypair.public();
        let (own_index, _) = committee
            .authorities()
            .find(|(_, a)| a.protocol_key == own_protocol_key)
            .expect("Own authority should be among the consensus authorities!");

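        // Create a per-epoch Prometheus registry under the "consensus" prefix;
        // it is added to the registry service below and removed again on
        // shutdown.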
        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

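        // The commit consumer forwards committed sub-dags from the consensus
        // authority over this channel, resuming after the last sub-dag index
        // the handler has already processed.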
        let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();
        let consumer = CommitConsumer::new(
            commit_sender,
            consensus_handler.last_processed_subdag_index() as CommitIndex,
        );
        let monitor = consumer.monitor();

        // If there is a previous consumer monitor, the consensus engine has
        // been restarted, due to an epoch change. That alone does not tell us
        // whether the node participated in an active epoch or an old one, so we
        // check whether any commits were handled. If they were, we assume the
        // node did participate in the previous run.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        // Increment the boot counter only if the consensus successfully participated in
        // the previous run. This is typical during normal epoch changes, where
        // the node restarts as expected, and the boot counter is incremented to prevent
        // amnesia recovery on the next start. If the node is recovering from a
        // restore process and catching up across multiple epochs, it won't handle any
        // commits until it reaches the last active epoch. In this scenario, we
        // do not increment the boot counter, as we need amnesia recovery to run.
        let mut boot_counter = self.boot_counter.lock().await;
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node did not participate in the previous epoch's consensus. Boot counter ({}) will not be incremented.",
                *boot_counter
            );
        }

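        // Start the consensus authority for this epoch. The boot counter lets
        // it decide whether amnesia recovery needs to run (see above).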
        let authority = ConsensusAuthority::start(
            network_type,
            own_index,
            committee.clone(),
            parameters.clone(),
            protocol_config.clone(),
            self.protocol_keypair.clone(),
            self.network_keypair.clone(),
            Arc::new(tx_validator.clone()),
            consumer,
            registry.clone(),
            *boot_counter,
        )
        .await;
        let client = authority.transaction_client();

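        // Register the per-epoch registry so its metrics are exported; keep the
        // returned id so the registry can be removed on shutdown.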
        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Initialize the client to send transactions to this Mysticeti instance.
        self.client.set(client);

        // Spin up the new Mysticeti consensus handler to listen for committed sub-dags.
        let handler = MysticetiConsensusHandler::new(consensus_handler, commit_receiver, monitor);

        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // Wait until all locally available commits have been processed
        registered_authority.0.replay_complete().await;
    }

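    /// Stops the running consensus authority, aborts the consensus handler and
    /// unregisters the per-epoch metrics registry.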
    async fn shutdown(&self) {
        let Some(_guard) = RunningLockGuard::acquire_shutdown(&self.metrics, &self.running).await
        else {
            return;
        };

        // Stop consensus submissions.
        self.client.clear();

        // Swap with None to ensure there is no other reference to the
        // authority, so the Arc can safely be unwrapped.
        let r = self.authority.swap(None).unwrap();
        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
            panic!("Failed to retrieve the mysticeti authority");
        };

        // Shut down the authority and wait for it to stop.
        authority.stop().await;

        // Drop the old consensus handler to force-stop any underlying running
        // task.
        let mut consensus_handler = self.consensus_handler.lock().await;
        if let Some(mut handler) = consensus_handler.take() {
            handler.abort().await;
        }

        // Unregister the per-epoch metrics registry.
        self.registry_service.remove(registry_id);
    }

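    /// Reports whether a Mysticeti consensus instance is currently running.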
    async fn is_running(&self) -> bool {
        Running::False != *self.running.lock().await
    }
}