// iota_core/consensus_manager/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use fastcrypto::traits::KeyPair as _;
use iota_config::{ConsensusConfig, NodeConfig};
use iota_metrics::RegistryService;
use iota_protocol_config::ProtocolVersion;
use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tokio::{
    sync::{Mutex, MutexGuard},
    time::{sleep, timeout},
};
use tracing::info;

use crate::{
    authority::authority_per_epoch_store::AuthorityPerEpochStore,
    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::starfish_manager::StarfishManager,
    consensus_validator::IotaTxValidator,
    starfish_adapter::LazyStarfishClient,
};

pub mod starfish_manager;

36#[derive(PartialEq)]
37pub(crate) enum Running {
38    True(EpochId, ProtocolVersion),
39    False,
40}
41
/// Lifecycle interface for a consensus manager: start consensus for an epoch,
/// shut it down, and query whether it is running.
#[async_trait]
pub trait ConsensusManagerTrait {
    /// Starts consensus using the node configuration, the per-epoch store,
    /// the handler initializer and the transaction validator.
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    );

    /// Shuts down the running consensus instance.
    async fn shutdown(&self);

    /// Returns whether consensus is currently running.
    async fn is_running(&self) -> bool;
}

/// Used by IOTA validator to start consensus protocol for each epoch.
pub struct ConsensusManager {
    // Retained for config lookups such as the consensus DB path.
    consensus_config: ConsensusConfig,
    // The Starfish protocol manager that actually runs consensus.
    starfish_manager: StarfishManager,
}

63impl ConsensusManager {
64    pub fn new(
65        node_config: &NodeConfig,
66        consensus_config: &ConsensusConfig,
67        registry_service: &RegistryService,
68        metrics_registry: &Registry,
69        consensus_client: Arc<UpdatableConsensusClient>,
70    ) -> Self {
71        let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
72        let starfish_client = Arc::new(LazyStarfishClient::new());
73        consensus_client.set(starfish_client.clone());
74        let starfish_manager = StarfishManager::new(
75            node_config.protocol_key_pair().copy(),
76            node_config.network_key_pair().copy(),
77            consensus_config.db_path().to_path_buf(),
78            registry_service.clone(),
79            metrics,
80            starfish_client,
81        );
82        Self {
83            consensus_config: consensus_config.clone(),
84            starfish_manager,
85        }
86    }
87
88    pub fn get_storage_base_path(&self) -> PathBuf {
89        self.consensus_config.db_path().to_path_buf()
90    }
91}
92
93#[async_trait]
94impl ConsensusManagerTrait for ConsensusManager {
95    async fn start(
96        &self,
97        node_config: &NodeConfig,
98        epoch_store: Arc<AuthorityPerEpochStore>,
99        consensus_handler_initializer: ConsensusHandlerInitializer,
100        tx_validator: IotaTxValidator,
101    ) {
102        info!("Starting consensus protocol Starfish ...");
103        self.starfish_manager
104            .start(
105                node_config,
106                epoch_store,
107                consensus_handler_initializer,
108                tx_validator,
109            )
110            .await
111    }
112
113    async fn shutdown(&self) {
114        info!("Shutting down consensus ...");
115        self.starfish_manager.shutdown().await;
116    }
117
118    async fn is_running(&self) -> bool {
119        self.starfish_manager.is_running().await
120    }
121}
122
/// A ConsensusClient that can be updated internally at any time. This usually
/// happens during epoch change, where a client is set after the new consensus
/// is started for the new epoch.
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // An extra layer of Arc<> is needed as required by ArcSwapAny.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}

132impl UpdatableConsensusClient {
133    pub fn new() -> Self {
134        Self {
135            client: ArcSwapOption::empty(),
136        }
137    }
138
139    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
140        const START_TIMEOUT: Duration = Duration::from_secs(30);
141        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
142        if let Ok(client) = timeout(START_TIMEOUT, async {
143            loop {
144                let Some(client) = self.client.load_full() else {
145                    sleep(RETRY_INTERVAL).await;
146                    continue;
147                };
148                return client;
149            }
150        })
151        .await
152        {
153            return client;
154        }
155
156        panic!("Timed out after {START_TIMEOUT:?} waiting for Consensus to start!",);
157    }
158
159    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
160        self.client.store(Some(Arc::new(client)));
161    }
162
163    pub fn clear(&self) {
164        self.client.store(None);
165    }
166}
167
168#[async_trait]
169impl ConsensusClient for UpdatableConsensusClient {
170    async fn submit(
171        &self,
172        transactions: &[ConsensusTransaction],
173        epoch_store: &Arc<AuthorityPerEpochStore>,
174    ) -> IotaResult<BlockStatusReceiver> {
175        let client = self.get().await;
176        client.submit(transactions, epoch_store).await
177    }
178}
179
/// Prometheus gauges recorded by the consensus manager.
pub struct ConsensusManagerMetrics {
    // Whole seconds taken to start up consensus (set when a start completes).
    start_latency: IntGauge,
    // Whole seconds taken to shut down consensus (set when a shutdown completes).
    shutdown_latency: IntGauge,
}

185impl ConsensusManagerMetrics {
186    pub fn new(registry: &Registry) -> Self {
187        Self {
188            start_latency: register_int_gauge_with_registry!(
189                "consensus_manager_start_latency",
190                "The latency of starting up consensus nodes",
191                registry,
192            )
193            .unwrap(),
194            shutdown_latency: register_int_gauge_with_registry!(
195                "consensus_manager_shutdown_latency",
196                "The latency of shutting down consensus nodes",
197                registry,
198            )
199            .unwrap(),
200        }
201    }
202}
203
/// Guard over the `Running` state: holds the state mutex while consensus is
/// starting or shutting down, and flips the state (recording a latency
/// metric) when dropped.
pub(crate) struct RunningLockGuard<'a> {
    // Held mutex guard; the state transition happens in `Drop`.
    state_guard: MutexGuard<'a, Running>,
    metrics: &'a ConsensusManagerMetrics,
    // `Some` for a start guard, `None` for a shutdown guard.
    epoch: Option<EpochId>,
    protocol_version: Option<ProtocolVersion>,
    // When the start/shutdown began, for latency reporting.
    start: Instant,
}

212impl<'a> RunningLockGuard<'a> {
213    pub(crate) async fn acquire_start(
214        metrics: &'a ConsensusManagerMetrics,
215        running_mutex: &'a Mutex<Running>,
216        epoch: EpochId,
217        version: ProtocolVersion,
218    ) -> Option<RunningLockGuard<'a>> {
219        let running = running_mutex.lock().await;
220        if let Running::True(epoch, version) = *running {
221            tracing::warn!(
222                "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
223            );
224            return None;
225        }
226
227        tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
228
229        Some(RunningLockGuard {
230            state_guard: running,
231            metrics,
232            start: Instant::now(),
233            epoch: Some(epoch),
234            protocol_version: Some(version),
235        })
236    }
237
238    pub(crate) async fn acquire_shutdown(
239        metrics: &'a ConsensusManagerMetrics,
240        running_mutex: &'a Mutex<Running>,
241    ) -> Option<RunningLockGuard<'a>> {
242        let running = running_mutex.lock().await;
243        if let Running::True(epoch, version) = *running {
244            tracing::info!(
245                "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
246            );
247        } else {
248            tracing::warn!("Consensus shutdown was called but consensus is not running");
249            return None;
250        }
251
252        Some(RunningLockGuard {
253            state_guard: running,
254            metrics,
255            start: Instant::now(),
256            epoch: None,
257            protocol_version: None,
258        })
259    }
260}
261
262impl Drop for RunningLockGuard<'_> {
263    fn drop(&mut self) {
264        match *self.state_guard {
265            // consensus was running and now will have to be marked as shutdown
266            Running::True(epoch, version) => {
267                tracing::info!(
268                    "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
269                    self.start.elapsed().as_secs_f64()
270                );
271
272                self.metrics
273                    .shutdown_latency
274                    .set(self.start.elapsed().as_secs_f64() as i64);
275
276                *self.state_guard = Running::False;
277            }
278            // consensus was not running and now will be marked as started
279            Running::False => {
280                tracing::info!(
281                    "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
282                    self.epoch.unwrap(),
283                    self.protocol_version.unwrap(),
284                    self.start.elapsed().as_secs_f64()
285                );
286
287                self.metrics
288                    .start_latency
289                    .set(self.start.elapsed().as_secs_f64() as i64);
290
291                *self.state_guard =
292                    Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
293            }
294        }
295    }
296}