iota_core/consensus_manager/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    path::PathBuf,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use enum_dispatch::enum_dispatch;
14use fastcrypto::traits::KeyPair as _;
15use iota_config::{ConsensusConfig, NodeConfig};
16use iota_metrics::RegistryService;
17use iota_protocol_config::ProtocolVersion;
18use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tokio::{
21    sync::{Mutex, MutexGuard},
22    time::{sleep, timeout},
23};
24use tracing::info;
25
26use crate::{
27    authority::authority_per_epoch_store::AuthorityPerEpochStore,
28    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
29    consensus_handler::ConsensusHandlerInitializer,
30    consensus_manager::mysticeti_manager::MysticetiManager,
31    consensus_validator::IotaTxValidator,
32    mysticeti_adapter::LazyMysticetiClient,
33};
34
35pub mod mysticeti_manager;
36
37#[derive(PartialEq)]
38pub(crate) enum Running {
39    True(EpochId, ProtocolVersion),
40    False,
41}
42
/// Lifecycle operations for a consensus protocol instance.
///
/// Implemented by [`ConsensusManager`] and, through `enum_dispatch`, by
/// `ProtocolManager`, which forwards each call to the protocol manager it
/// wraps.
// NOTE(review): attribute order is likely significant — outer attribute
// macros expand first, so `enum_dispatch` sees the trait after `async_trait`
// has desugared the `async fn`s. Keep `#[async_trait]` above `#[enum_dispatch]`.
#[async_trait]
#[enum_dispatch(ProtocolManager)]
pub trait ConsensusManagerTrait {
    /// Starts consensus using the given node configuration, per-epoch store,
    /// consensus handler initializer and transaction validator.
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    );

    /// Shuts consensus down.
    async fn shutdown(&self);

    /// Returns `true` if consensus has been started and not yet shut down.
    async fn is_running(&self) -> bool;
}
58
// Wraps the underlying consensus protocol managers so that
// `ConsensusManagerTrait` calls can be dispatched to the wrapped manager via
// `enum_dispatch`. Mysticeti is currently the only protocol.
#[enum_dispatch]
enum ProtocolManager {
    Mysticeti(MysticetiManager),
}
65
66impl ProtocolManager {
67    /// Creates a new mysticeti manager.
68    pub fn new_mysticeti(
69        config: &NodeConfig,
70        consensus_config: &ConsensusConfig,
71        registry_service: &RegistryService,
72        metrics: Arc<ConsensusManagerMetrics>,
73        client: Arc<LazyMysticetiClient>,
74    ) -> Self {
75        Self::Mysticeti(MysticetiManager::new(
76            config.protocol_key_pair().copy(),
77            config.network_key_pair().copy(),
78            consensus_config.db_path().to_path_buf(),
79            registry_service.clone(),
80            metrics,
81            client,
82        ))
83    }
84}
85
/// Used by the IOTA validator to start the consensus protocol for each epoch
/// and to shut it down again.
pub struct ConsensusManager {
    /// Consensus configuration; its `db_path` is exposed through
    /// `get_storage_base_path`.
    consensus_config: ConsensusConfig,
    /// Protocol manager that is started and shut down by this manager.
    mysticeti_manager: ProtocolManager,
    /// Mysticeti client handle shared with `mysticeti_manager`; installed
    /// into `consensus_client` whenever consensus is started.
    mysticeti_client: Arc<LazyMysticetiClient>,
    /// `true` between a `start` and the following `shutdown`. Guarded by a
    /// synchronous `parking_lot` mutex that is never held across an `.await`.
    active: parking_lot::Mutex<bool>,
    /// Client handed out to the rest of the node; set on `start` and cleared
    /// on `shutdown`.
    consensus_client: Arc<UpdatableConsensusClient>,
}
94
95impl ConsensusManager {
96    pub fn new(
97        node_config: &NodeConfig,
98        consensus_config: &ConsensusConfig,
99        registry_service: &RegistryService,
100        metrics_registry: &Registry,
101        consensus_client: Arc<UpdatableConsensusClient>,
102    ) -> Self {
103        let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
104        let mysticeti_client = Arc::new(LazyMysticetiClient::new());
105        let mysticeti_manager = ProtocolManager::new_mysticeti(
106            node_config,
107            consensus_config,
108            registry_service,
109            metrics,
110            mysticeti_client.clone(),
111        );
112        Self {
113            consensus_config: consensus_config.clone(),
114            mysticeti_manager,
115            mysticeti_client,
116            active: parking_lot::Mutex::new(false),
117            consensus_client,
118        }
119    }
120
121    pub fn get_storage_base_path(&self) -> PathBuf {
122        self.consensus_config.db_path().to_path_buf()
123    }
124}
125
126#[async_trait]
127impl ConsensusManagerTrait for ConsensusManager {
128    async fn start(
129        &self,
130        node_config: &NodeConfig,
131        epoch_store: Arc<AuthorityPerEpochStore>,
132        consensus_handler_initializer: ConsensusHandlerInitializer,
133        tx_validator: IotaTxValidator,
134    ) {
135        let protocol_manager = {
136            let mut active = self.active.lock();
137            assert!(!*active, "Cannot start consensus. It is already running!");
138            info!("Starting consensus ...");
139            *active = true;
140            self.consensus_client.set(self.mysticeti_client.clone());
141            &self.mysticeti_manager
142        };
143
144        protocol_manager
145            .start(
146                node_config,
147                epoch_store,
148                consensus_handler_initializer,
149                tx_validator,
150            )
151            .await
152    }
153
154    async fn shutdown(&self) {
155        info!("Shutting down consensus ...");
156        let prev_active = {
157            let mut active = self.active.lock();
158            std::mem::replace(&mut *active, false)
159        };
160        if prev_active {
161            self.mysticeti_manager.shutdown().await;
162        }
163        self.consensus_client.clear();
164    }
165
166    async fn is_running(&self) -> bool {
167        let active = self.active.lock();
168        *active
169    }
170}
171
/// A [`ConsensusClient`] whose underlying client can be replaced at any time.
///
/// This usually happens around epoch changes: [`ConsensusManager`] installs a
/// client when it starts consensus and clears it on shutdown. While no client
/// is installed, `submit` waits for one to appear (and panics if none does
/// within the internal timeout).
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // An extra layer of `Arc<>` is needed because `ArcSwapAny` requires a
    // sized pointee, which `dyn ConsensusClient` is not.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
180
181impl UpdatableConsensusClient {
182    pub fn new() -> Self {
183        Self {
184            client: ArcSwapOption::empty(),
185        }
186    }
187
188    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
189        const START_TIMEOUT: Duration = Duration::from_secs(30);
190        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
191        if let Ok(client) = timeout(START_TIMEOUT, async {
192            loop {
193                let Some(client) = self.client.load_full() else {
194                    sleep(RETRY_INTERVAL).await;
195                    continue;
196                };
197                return client;
198            }
199        })
200        .await
201        {
202            return client;
203        }
204
205        panic!(
206            "Timed out after {:?} waiting for Consensus to start!",
207            START_TIMEOUT,
208        );
209    }
210
211    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
212        self.client.store(Some(Arc::new(client)));
213    }
214
215    pub fn clear(&self) {
216        self.client.store(None);
217    }
218}
219
220#[async_trait]
221impl ConsensusClient for UpdatableConsensusClient {
222    async fn submit(
223        &self,
224        transactions: &[ConsensusTransaction],
225        epoch_store: &Arc<AuthorityPerEpochStore>,
226    ) -> IotaResult<BlockStatusReceiver> {
227        let client = self.get().await;
228        client.submit(transactions, epoch_store).await
229    }
230}
231
/// Prometheus gauges describing how long consensus start-up and shutdown take.
pub struct ConsensusManagerMetrics {
    /// Duration of the most recent consensus start-up, recorded in whole
    /// seconds (fractional part truncated) when a start guard is dropped.
    start_latency: IntGauge,
    /// Duration of the most recent consensus shutdown, recorded in whole
    /// seconds (fractional part truncated) when a shutdown guard is dropped.
    shutdown_latency: IntGauge,
}
236
237impl ConsensusManagerMetrics {
238    pub fn new(registry: &Registry) -> Self {
239        Self {
240            start_latency: register_int_gauge_with_registry!(
241                "consensus_manager_start_latency",
242                "The latency of starting up consensus nodes",
243                registry,
244            )
245            .unwrap(),
246            shutdown_latency: register_int_gauge_with_registry!(
247                "consensus_manager_shutdown_latency",
248                "The latency of shutting down consensus nodes",
249                registry,
250            )
251            .unwrap(),
252        }
253    }
254}
255
/// Guard over the shared [`Running`] state, held for the duration of a
/// consensus start or shutdown.
///
/// Obtained via [`RunningLockGuard::acquire_start`] or
/// [`RunningLockGuard::acquire_shutdown`]. While it is alive the `Running`
/// mutex stays locked. When dropped, it flips the state (`False` ->
/// `True(epoch, version)` for a start, `True` -> `False` for a shutdown) and
/// records the elapsed time in the matching latency gauge.
pub(crate) struct RunningLockGuard<'a> {
    /// Lock on the shared running state, held until this guard is dropped.
    state_guard: MutexGuard<'a, Running>,
    /// Metrics that receive the start/shutdown latency on drop.
    metrics: &'a ConsensusManagerMetrics,
    /// Epoch being started; `Some` only for guards from `acquire_start`.
    epoch: Option<EpochId>,
    /// Protocol version being started; `Some` only for guards from
    /// `acquire_start`.
    protocol_version: Option<ProtocolVersion>,
    /// When the guard was acquired; used to compute the latency.
    start: Instant,
}
263
264impl<'a> RunningLockGuard<'a> {
265    pub(crate) async fn acquire_start(
266        metrics: &'a ConsensusManagerMetrics,
267        running_mutex: &'a Mutex<Running>,
268        epoch: EpochId,
269        version: ProtocolVersion,
270    ) -> Option<RunningLockGuard<'a>> {
271        let running = running_mutex.lock().await;
272        if let Running::True(epoch, version) = *running {
273            tracing::warn!(
274                "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
275            );
276            return None;
277        }
278
279        tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
280
281        Some(RunningLockGuard {
282            state_guard: running,
283            metrics,
284            start: Instant::now(),
285            epoch: Some(epoch),
286            protocol_version: Some(version),
287        })
288    }
289
290    pub(crate) async fn acquire_shutdown(
291        metrics: &'a ConsensusManagerMetrics,
292        running_mutex: &'a Mutex<Running>,
293    ) -> Option<RunningLockGuard<'a>> {
294        let running = running_mutex.lock().await;
295        if let Running::True(epoch, version) = *running {
296            tracing::info!(
297                "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
298            );
299        } else {
300            tracing::warn!("Consensus shutdown was called but consensus is not running");
301            return None;
302        }
303
304        Some(RunningLockGuard {
305            state_guard: running,
306            metrics,
307            start: Instant::now(),
308            epoch: None,
309            protocol_version: None,
310        })
311    }
312}
313
314impl Drop for RunningLockGuard<'_> {
315    fn drop(&mut self) {
316        match *self.state_guard {
317            // consensus was running and now will have to be marked as shutdown
318            Running::True(epoch, version) => {
319                tracing::info!(
320                    "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
321                    self.start.elapsed().as_secs_f64()
322                );
323
324                self.metrics
325                    .shutdown_latency
326                    .set(self.start.elapsed().as_secs_f64() as i64);
327
328                *self.state_guard = Running::False;
329            }
330            // consensus was not running and now will be marked as started
331            Running::False => {
332                tracing::info!(
333                    "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
334                    self.epoch.unwrap(),
335                    self.protocol_version.unwrap(),
336                    self.start.elapsed().as_secs_f64()
337                );
338
339                self.metrics
340                    .start_latency
341                    .set(self.start.elapsed().as_secs_f64() as i64);
342
343                *self.state_guard =
344                    Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
345            }
346        }
347    }
348}