iota_core/consensus_manager/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    path::PathBuf,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use enum_dispatch::enum_dispatch;
14use fastcrypto::traits::KeyPair as _;
15use iota_config::{ConsensusConfig, NodeConfig};
16use iota_metrics::RegistryService;
17use iota_protocol_config::ProtocolVersion;
18use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tokio::{
21    sync::{Mutex, MutexGuard},
22    time::{sleep, timeout},
23};
24use tracing::info;
25
26use crate::{
27    authority::authority_per_epoch_store::AuthorityPerEpochStore,
28    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
29    consensus_handler::ConsensusHandlerInitializer,
30    consensus_manager::mysticeti_manager::MysticetiManager,
31    consensus_validator::IotaTxValidator,
32    mysticeti_adapter::LazyMysticetiClient,
33};
34
35pub mod mysticeti_manager;
36
37#[derive(PartialEq)]
38pub(crate) enum Running {
39    True(EpochId, ProtocolVersion),
40    False,
41}
42
/// Lifecycle operations of a consensus protocol instance.
///
/// `#[enum_dispatch(ProtocolManager)]` makes `ProtocolManager` implement this
/// trait by forwarding each call to the wrapped protocol-specific manager.
#[async_trait]
#[enum_dispatch(ProtocolManager)]
pub trait ConsensusManagerTrait {
    /// Starts consensus with the given node configuration, per-epoch store,
    /// consensus handler initializer and transaction validator.
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    );

    /// Shuts consensus down.
    async fn shutdown(&self);

    /// Returns whether consensus is currently running.
    async fn is_running(&self) -> bool;
}
58
// Wraps the underlying consensus protocol managers so that
// `ConsensusManagerTrait` can be called without matching on the concrete
// protocol: `#[enum_dispatch]` generates the trait impl that forwards each
// call to the active variant. Mysticeti is currently the only variant.
#[enum_dispatch]
enum ProtocolManager {
    Mysticeti(MysticetiManager),
}
65
66impl ProtocolManager {
67    /// Creates a new mysticeti manager.
68    pub fn new_mysticeti(
69        config: &NodeConfig,
70        consensus_config: &ConsensusConfig,
71        registry_service: &RegistryService,
72        metrics: Arc<ConsensusManagerMetrics>,
73        client: Arc<LazyMysticetiClient>,
74    ) -> Self {
75        Self::Mysticeti(MysticetiManager::new(
76            config.protocol_key_pair().copy(),
77            config.network_key_pair().copy(),
78            consensus_config.db_path().to_path_buf(),
79            registry_service.clone(),
80            metrics,
81            client,
82        ))
83    }
84}
85
/// Used by IOTA validator to start consensus protocol for each epoch.
pub struct ConsensusManager {
    /// Consensus configuration; `db_path()` is the consensus storage location.
    consensus_config: ConsensusConfig,
    /// Protocol manager that actually starts and stops consensus (Mysticeti).
    mysticeti_manager: ProtocolManager,
    /// Client shared with `mysticeti_manager`; installed into
    /// `consensus_client` whenever consensus is started.
    mysticeti_client: Arc<LazyMysticetiClient>,
    /// `true` between a `start` call and the next `shutdown` call.
    active: parking_lot::Mutex<bool>,
    /// Updatable client shared (via `Arc`) with whoever constructed this
    /// manager; set on `start` and cleared on `shutdown`.
    consensus_client: Arc<UpdatableConsensusClient>,
}
94
95impl ConsensusManager {
96    pub fn new(
97        node_config: &NodeConfig,
98        consensus_config: &ConsensusConfig,
99        registry_service: &RegistryService,
100        consensus_client: Arc<UpdatableConsensusClient>,
101    ) -> Self {
102        let metrics = Arc::new(ConsensusManagerMetrics::new(
103            &registry_service.default_registry(),
104        ));
105        let mysticeti_client = Arc::new(LazyMysticetiClient::new());
106        let mysticeti_manager = ProtocolManager::new_mysticeti(
107            node_config,
108            consensus_config,
109            registry_service,
110            metrics,
111            mysticeti_client.clone(),
112        );
113        Self {
114            consensus_config: consensus_config.clone(),
115            mysticeti_manager,
116            mysticeti_client,
117            active: parking_lot::Mutex::new(false),
118            consensus_client,
119        }
120    }
121
122    pub fn get_storage_base_path(&self) -> PathBuf {
123        self.consensus_config.db_path().to_path_buf()
124    }
125}
126
127#[async_trait]
128impl ConsensusManagerTrait for ConsensusManager {
129    async fn start(
130        &self,
131        node_config: &NodeConfig,
132        epoch_store: Arc<AuthorityPerEpochStore>,
133        consensus_handler_initializer: ConsensusHandlerInitializer,
134        tx_validator: IotaTxValidator,
135    ) {
136        let protocol_manager = {
137            let mut active = self.active.lock();
138            assert!(!*active, "Cannot start consensus. It is already running!");
139            info!("Starting consensus ...");
140            *active = true;
141            self.consensus_client.set(self.mysticeti_client.clone());
142            &self.mysticeti_manager
143        };
144
145        protocol_manager
146            .start(
147                node_config,
148                epoch_store,
149                consensus_handler_initializer,
150                tx_validator,
151            )
152            .await
153    }
154
155    async fn shutdown(&self) {
156        info!("Shutting down consensus ...");
157        let prev_active = {
158            let mut active = self.active.lock();
159            std::mem::replace(&mut *active, false)
160        };
161        if prev_active {
162            self.mysticeti_manager.shutdown().await;
163        }
164        self.consensus_client.clear();
165    }
166
167    async fn is_running(&self) -> bool {
168        let active = self.active.lock();
169        *active
170    }
171}
172
/// A ConsensusClient whose underlying client can be swapped at any time. This
/// usually happens during epoch change: a client is set when consensus is
/// started for the new epoch and cleared when consensus shuts down.
/// Submissions made while no client is set wait (up to a timeout) for one to
/// appear.
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // An extra layer of Arc<> is needed as required by ArcSwapAny.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
181
182impl UpdatableConsensusClient {
183    pub fn new() -> Self {
184        Self {
185            client: ArcSwapOption::empty(),
186        }
187    }
188
189    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
190        const START_TIMEOUT: Duration = Duration::from_secs(30);
191        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
192        if let Ok(client) = timeout(START_TIMEOUT, async {
193            loop {
194                let Some(client) = self.client.load_full() else {
195                    sleep(RETRY_INTERVAL).await;
196                    continue;
197                };
198                return client;
199            }
200        })
201        .await
202        {
203            return client;
204        }
205
206        panic!(
207            "Timed out after {:?} waiting for Consensus to start!",
208            START_TIMEOUT,
209        );
210    }
211
212    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
213        self.client.store(Some(Arc::new(client)));
214    }
215
216    pub fn clear(&self) {
217        self.client.store(None);
218    }
219}
220
221#[async_trait]
222impl ConsensusClient for UpdatableConsensusClient {
223    async fn submit(
224        &self,
225        transactions: &[ConsensusTransaction],
226        epoch_store: &Arc<AuthorityPerEpochStore>,
227    ) -> IotaResult<BlockStatusReceiver> {
228        let client = self.get().await;
229        client.submit(transactions, epoch_store).await
230    }
231}
232
/// Prometheus gauges recording how long consensus start-up and shutdown took.
pub struct ConsensusManagerMetrics {
    /// Duration of the most recent consensus start-up, in whole seconds.
    start_latency: IntGauge,
    /// Duration of the most recent consensus shutdown, in whole seconds.
    shutdown_latency: IntGauge,
}
237
238impl ConsensusManagerMetrics {
239    pub fn new(registry: &Registry) -> Self {
240        Self {
241            start_latency: register_int_gauge_with_registry!(
242                "consensus_manager_start_latency",
243                "The latency of starting up consensus nodes",
244                registry,
245            )
246            .unwrap(),
247            shutdown_latency: register_int_gauge_with_registry!(
248                "consensus_manager_shutdown_latency",
249                "The latency of shutting down consensus nodes",
250                registry,
251            )
252            .unwrap(),
253        }
254    }
255}
256
/// Holds the `Running` mutex for the duration of a consensus start-up or
/// shutdown. When dropped, it flips the running state and records the elapsed
/// time in the metrics.
///
/// `epoch` and `protocol_version` are `Some` for a guard produced by
/// `acquire_start` and `None` for one produced by `acquire_shutdown`.
pub(crate) struct RunningLockGuard<'a> {
    /// Exclusive access to the running state while the guard is alive.
    state_guard: MutexGuard<'a, Running>,
    /// Metrics that receive the elapsed time when the guard is dropped.
    metrics: &'a ConsensusManagerMetrics,
    /// Epoch to record as running once start-up completes.
    epoch: Option<EpochId>,
    /// Protocol version to record as running once start-up completes.
    protocol_version: Option<ProtocolVersion>,
    /// Moment the guard was acquired; latency is measured from here.
    start: Instant,
}
264
265impl<'a> RunningLockGuard<'a> {
266    pub(crate) async fn acquire_start(
267        metrics: &'a ConsensusManagerMetrics,
268        running_mutex: &'a Mutex<Running>,
269        epoch: EpochId,
270        version: ProtocolVersion,
271    ) -> Option<RunningLockGuard<'a>> {
272        let running = running_mutex.lock().await;
273        if let Running::True(epoch, version) = *running {
274            tracing::warn!(
275                "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
276            );
277            return None;
278        }
279
280        tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
281
282        Some(RunningLockGuard {
283            state_guard: running,
284            metrics,
285            start: Instant::now(),
286            epoch: Some(epoch),
287            protocol_version: Some(version),
288        })
289    }
290
291    pub(crate) async fn acquire_shutdown(
292        metrics: &'a ConsensusManagerMetrics,
293        running_mutex: &'a Mutex<Running>,
294    ) -> Option<RunningLockGuard<'a>> {
295        let running = running_mutex.lock().await;
296        if let Running::True(epoch, version) = *running {
297            tracing::info!(
298                "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
299            );
300        } else {
301            tracing::warn!("Consensus shutdown was called but consensus is not running");
302            return None;
303        }
304
305        Some(RunningLockGuard {
306            state_guard: running,
307            metrics,
308            start: Instant::now(),
309            epoch: None,
310            protocol_version: None,
311        })
312    }
313}
314
315impl Drop for RunningLockGuard<'_> {
316    fn drop(&mut self) {
317        match *self.state_guard {
318            // consensus was running and now will have to be marked as shutdown
319            Running::True(epoch, version) => {
320                tracing::info!(
321                    "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
322                    self.start.elapsed().as_secs_f64()
323                );
324
325                self.metrics
326                    .shutdown_latency
327                    .set(self.start.elapsed().as_secs_f64() as i64);
328
329                *self.state_guard = Running::False;
330            }
331            // consensus was not running and now will be marked as started
332            Running::False => {
333                tracing::info!(
334                    "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
335                    self.epoch.unwrap(),
336                    self.protocol_version.unwrap(),
337                    self.start.elapsed().as_secs_f64()
338                );
339
340                self.metrics
341                    .start_latency
342                    .set(self.start.elapsed().as_secs_f64() as i64);
343
344                *self.state_guard =
345                    Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
346            }
347        }
348    }
349}