iota_core/consensus_manager/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    path::PathBuf,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use enum_dispatch::enum_dispatch;
14use fastcrypto::traits::KeyPair as _;
15use iota_config::{ConsensusConfig, NodeConfig, node::ConsensusProtocol};
16use iota_metrics::RegistryService;
17use iota_protocol_config::{ConsensusChoice, ProtocolVersion};
18use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tokio::{
21    sync::{Mutex, MutexGuard},
22    time::{sleep, timeout},
23};
24use tracing::info;
25
26use crate::{
27    authority::authority_per_epoch_store::AuthorityPerEpochStore,
28    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
29    consensus_handler::ConsensusHandlerInitializer,
30    consensus_manager::{mysticeti_manager::MysticetiManager, starfish_manager::StarfishManager},
31    consensus_validator::IotaTxValidator,
32    mysticeti_adapter::LazyMysticetiClient,
33    starfish_adapter::LazyStarfishClient,
34};
35
36pub mod mysticeti_manager;
37pub mod starfish_manager;
38
39#[derive(PartialEq)]
40pub(crate) enum Running {
41    True(EpochId, ProtocolVersion),
42    False,
43}
44
45#[async_trait]
46#[enum_dispatch(ProtocolManager)]
47pub trait ConsensusManagerTrait {
48    async fn start(
49        &self,
50        node_config: &NodeConfig,
51        epoch_store: Arc<AuthorityPerEpochStore>,
52        consensus_handler_initializer: ConsensusHandlerInitializer,
53        tx_validator: IotaTxValidator,
54    );
55
56    async fn shutdown(&self);
57
58    async fn is_running(&self) -> bool;
59}
60
61// Wraps the underlying consensus protocol managers to make calling
62// the ConsensusManagerTrait easier.
63#[enum_dispatch]
64enum ProtocolManager {
65    Mysticeti(MysticetiManager),
66    Starfish(StarfishManager),
67}
68
69impl ProtocolManager {
70    /// Creates a new mysticeti manager.
71    pub fn new_mysticeti(
72        config: &NodeConfig,
73        consensus_config: &ConsensusConfig,
74        registry_service: &RegistryService,
75        metrics: Arc<ConsensusManagerMetrics>,
76        client: Arc<LazyMysticetiClient>,
77    ) -> Self {
78        Self::Mysticeti(MysticetiManager::new(
79            config.protocol_key_pair().copy(),
80            config.network_key_pair().copy(),
81            consensus_config.db_path().to_path_buf(),
82            registry_service.clone(),
83            metrics,
84            client,
85        ))
86    }
87
88    /// Creates a new starfish manager.
89    pub fn new_starfish(
90        config: &NodeConfig,
91        consensus_config: &ConsensusConfig,
92        registry_service: &RegistryService,
93        metrics: Arc<ConsensusManagerMetrics>,
94        client: Arc<LazyStarfishClient>,
95    ) -> Self {
96        Self::Starfish(StarfishManager::new(
97            config.protocol_key_pair().copy(),
98            config.network_key_pair().copy(),
99            consensus_config.db_path().to_path_buf(),
100            registry_service.clone(),
101            metrics,
102            client,
103        ))
104    }
105}
106
107/// Used by IOTA validator to start consensus protocol for each epoch.
108pub struct ConsensusManager {
109    consensus_config: ConsensusConfig,
110    mysticeti_manager: ProtocolManager,
111    starfish_manager: ProtocolManager,
112    mysticeti_client: Arc<LazyMysticetiClient>,
113    starfish_client: Arc<LazyStarfishClient>,
114    active: parking_lot::Mutex<Vec<bool>>,
115    consensus_client: Arc<UpdatableConsensusClient>,
116}
117
118impl ConsensusManager {
119    pub fn new(
120        node_config: &NodeConfig,
121        consensus_config: &ConsensusConfig,
122        registry_service: &RegistryService,
123        metrics_registry: &Registry,
124        consensus_client: Arc<UpdatableConsensusClient>,
125    ) -> Self {
126        let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
127        let mysticeti_client = Arc::new(LazyMysticetiClient::new());
128        let mysticeti_manager = ProtocolManager::new_mysticeti(
129            node_config,
130            consensus_config,
131            registry_service,
132            metrics.clone(),
133            mysticeti_client.clone(),
134        );
135        let starfish_client = Arc::new(LazyStarfishClient::new());
136        let starfish_manager = ProtocolManager::new_starfish(
137            node_config,
138            consensus_config,
139            registry_service,
140            metrics,
141            starfish_client.clone(),
142        );
143        Self {
144            consensus_config: consensus_config.clone(),
145            mysticeti_manager,
146            starfish_manager,
147            mysticeti_client,
148            starfish_client,
149            active: parking_lot::Mutex::new(vec![false; 2]),
150            consensus_client,
151        }
152    }
153
154    pub fn get_storage_base_path(&self) -> PathBuf {
155        self.consensus_config.db_path().to_path_buf()
156    }
157
158    // Picks the consensus protocol based on the protocol config and the epoch.
159    fn pick_protocol(&self, epoch_store: &AuthorityPerEpochStore) -> ConsensusProtocol {
160        let protocol_config = epoch_store.protocol_config();
161        if let Ok(consensus_choice) = std::env::var("CONSENSUS_PROTOCOL") {
162            match consensus_choice.to_lowercase().as_str() {
163                "mysticeti" => return ConsensusProtocol::Mysticeti,
164                "starfish" => return ConsensusProtocol::Starfish,
165                "swap_each_epoch" => {
166                    let protocol = if epoch_store.epoch() % 2 == 0 {
167                        ConsensusProtocol::Starfish
168                    } else {
169                        ConsensusProtocol::Mysticeti
170                    };
171                    return protocol;
172                }
173                _ => {
174                    info!(
175                        "Invalid consensus choice {} in env var. Continue to pick consensus with protocol config",
176                        consensus_choice
177                    );
178                }
179            };
180        }
181
182        match protocol_config.consensus_choice() {
183            ConsensusChoice::Mysticeti => ConsensusProtocol::Mysticeti,
184        }
185    }
186}
187
188#[async_trait]
189impl ConsensusManagerTrait for ConsensusManager {
190    async fn start(
191        &self,
192        node_config: &NodeConfig,
193        epoch_store: Arc<AuthorityPerEpochStore>,
194        consensus_handler_initializer: ConsensusHandlerInitializer,
195        tx_validator: IotaTxValidator,
196    ) {
197        let protocol_manager = {
198            let mut active = self.active.lock();
199            active.iter().enumerate().for_each(|(index, active)| {
200                assert!(
201                    !*active,
202                    "Cannot start consensus. ConsensusManager protocol {index} is already running"
203                );
204            });
205            let protocol = self.pick_protocol(&epoch_store);
206            info!("Starting consensus protocol {protocol:?} ...");
207            self.consensus_client.set(self.mysticeti_client.clone());
208            match protocol {
209                ConsensusProtocol::Mysticeti => {
210                    active[0] = true;
211                    self.consensus_client.set(self.mysticeti_client.clone());
212                    &self.mysticeti_manager
213                }
214                ConsensusProtocol::Starfish => {
215                    active[1] = true;
216                    self.consensus_client.set(self.starfish_client.clone());
217                    &self.starfish_manager
218                }
219            }
220        };
221
222        protocol_manager
223            .start(
224                node_config,
225                epoch_store,
226                consensus_handler_initializer,
227                tx_validator,
228            )
229            .await
230    }
231
232    async fn shutdown(&self) {
233        info!("Shutting down consensus ...");
234        let prev_active = {
235            let mut active = self.active.lock();
236            std::mem::replace(&mut *active, vec![false; 2])
237        };
238        if prev_active[0] {
239            self.mysticeti_manager.shutdown().await;
240        }
241
242        if prev_active[1] {
243            self.starfish_manager.shutdown().await;
244        }
245
246        self.consensus_client.clear();
247    }
248
249    async fn is_running(&self) -> bool {
250        let active = self.active.lock();
251        active.iter().any(|i| *i)
252    }
253}
254
255/// A ConsensusClient that can be updated internally at any time. This usually
256/// happening during epoch change where a client is set after the new consensus
257/// is started for the new epoch.
258#[derive(Default)]
259pub struct UpdatableConsensusClient {
260    // An extra layer of Arc<> is needed as required by ArcSwapAny.
261    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
262}
263
264impl UpdatableConsensusClient {
265    pub fn new() -> Self {
266        Self {
267            client: ArcSwapOption::empty(),
268        }
269    }
270
271    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
272        const START_TIMEOUT: Duration = Duration::from_secs(30);
273        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
274        if let Ok(client) = timeout(START_TIMEOUT, async {
275            loop {
276                let Some(client) = self.client.load_full() else {
277                    sleep(RETRY_INTERVAL).await;
278                    continue;
279                };
280                return client;
281            }
282        })
283        .await
284        {
285            return client;
286        }
287
288        panic!("Timed out after {START_TIMEOUT:?} waiting for Consensus to start!",);
289    }
290
291    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
292        self.client.store(Some(Arc::new(client)));
293    }
294
295    pub fn clear(&self) {
296        self.client.store(None);
297    }
298}
299
300#[async_trait]
301impl ConsensusClient for UpdatableConsensusClient {
302    async fn submit(
303        &self,
304        transactions: &[ConsensusTransaction],
305        epoch_store: &Arc<AuthorityPerEpochStore>,
306    ) -> IotaResult<BlockStatusReceiver> {
307        let client = self.get().await;
308        client.submit(transactions, epoch_store).await
309    }
310}
311
312pub struct ConsensusManagerMetrics {
313    start_latency: IntGauge,
314    shutdown_latency: IntGauge,
315}
316
317impl ConsensusManagerMetrics {
318    pub fn new(registry: &Registry) -> Self {
319        Self {
320            start_latency: register_int_gauge_with_registry!(
321                "consensus_manager_start_latency",
322                "The latency of starting up consensus nodes",
323                registry,
324            )
325            .unwrap(),
326            shutdown_latency: register_int_gauge_with_registry!(
327                "consensus_manager_shutdown_latency",
328                "The latency of shutting down consensus nodes",
329                registry,
330            )
331            .unwrap(),
332        }
333    }
334}
335
336pub(crate) struct RunningLockGuard<'a> {
337    state_guard: MutexGuard<'a, Running>,
338    metrics: &'a ConsensusManagerMetrics,
339    epoch: Option<EpochId>,
340    protocol_version: Option<ProtocolVersion>,
341    start: Instant,
342}
343
344impl<'a> RunningLockGuard<'a> {
345    pub(crate) async fn acquire_start(
346        metrics: &'a ConsensusManagerMetrics,
347        running_mutex: &'a Mutex<Running>,
348        epoch: EpochId,
349        version: ProtocolVersion,
350    ) -> Option<RunningLockGuard<'a>> {
351        let running = running_mutex.lock().await;
352        if let Running::True(epoch, version) = *running {
353            tracing::warn!(
354                "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
355            );
356            return None;
357        }
358
359        tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
360
361        Some(RunningLockGuard {
362            state_guard: running,
363            metrics,
364            start: Instant::now(),
365            epoch: Some(epoch),
366            protocol_version: Some(version),
367        })
368    }
369
370    pub(crate) async fn acquire_shutdown(
371        metrics: &'a ConsensusManagerMetrics,
372        running_mutex: &'a Mutex<Running>,
373    ) -> Option<RunningLockGuard<'a>> {
374        let running = running_mutex.lock().await;
375        if let Running::True(epoch, version) = *running {
376            tracing::info!(
377                "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
378            );
379        } else {
380            tracing::warn!("Consensus shutdown was called but consensus is not running");
381            return None;
382        }
383
384        Some(RunningLockGuard {
385            state_guard: running,
386            metrics,
387            start: Instant::now(),
388            epoch: None,
389            protocol_version: None,
390        })
391    }
392}
393
394impl Drop for RunningLockGuard<'_> {
395    fn drop(&mut self) {
396        match *self.state_guard {
397            // consensus was running and now will have to be marked as shutdown
398            Running::True(epoch, version) => {
399                tracing::info!(
400                    "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
401                    self.start.elapsed().as_secs_f64()
402                );
403
404                self.metrics
405                    .shutdown_latency
406                    .set(self.start.elapsed().as_secs_f64() as i64);
407
408                *self.state_guard = Running::False;
409            }
410            // consensus was not running and now will be marked as started
411            Running::False => {
412                tracing::info!(
413                    "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
414                    self.epoch.unwrap(),
415                    self.protocol_version.unwrap(),
416                    self.start.elapsed().as_secs_f64()
417                );
418
419                self.metrics
420                    .start_latency
421                    .set(self.start.elapsed().as_secs_f64() as i64);
422
423                *self.state_guard =
424                    Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
425            }
426        }
427    }
428}