Skip to main content

iota_core/consensus_manager/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    path::PathBuf,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use fastcrypto::traits::KeyPair as _;
14use iota_config::{ConsensusConfig, NodeConfig};
15use iota_metrics::RegistryService;
16use iota_protocol_config::ProtocolVersion;
17use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
18use prometheus_filtered::{IntGauge, Registry, register_int_gauge_with_registry};
19use starfish_core::CommitConsumerMonitor;
20use tokio::{
21    sync::{Mutex, MutexGuard, broadcast},
22    time::{sleep, timeout},
23};
24use tracing::info;
25
26use crate::{
27    authority::authority_per_epoch_store::AuthorityPerEpochStore,
28    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
29    consensus_handler::ConsensusHandlerInitializer,
30    consensus_manager::starfish_manager::StarfishManager,
31    consensus_validator::IotaTxValidator,
32    starfish_adapter::LazyStarfishClient,
33};
34
35pub mod starfish_manager;
36
/// Run state of consensus, stored behind the mutex that
/// `RunningLockGuard` locks.
#[derive(PartialEq)]
pub(crate) enum Running {
    /// Consensus is running; carries the epoch and protocol version it was
    /// started for.
    True(EpochId, ProtocolVersion),
    /// Consensus is not running.
    False,
}
42
/// Lifecycle interface of a consensus manager: start, shut down, query the
/// run state, and obtain a handle for waiting on replay.
#[async_trait]
pub trait ConsensusManagerTrait {
    /// Starts consensus using the given node configuration, epoch store,
    /// consensus handler initializer and transaction validator.
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    );

    /// Shuts consensus down.
    async fn shutdown(&self);

    /// Reports whether consensus is currently running.
    async fn is_running(&self) -> bool;

    /// Returns a [`ReplayWaiter`] that can be awaited until consensus replay
    /// has finished.
    fn replay_waiter(&self) -> ReplayWaiter;
}
59
60/// Used by IOTA validator to start consensus protocol for each epoch.
61pub struct ConsensusManager {
62    consensus_config: ConsensusConfig,
63    starfish_manager: StarfishManager,
64}
65
66impl ConsensusManager {
67    pub fn new(
68        node_config: &NodeConfig,
69        consensus_config: &ConsensusConfig,
70        registry_service: &RegistryService,
71        metrics_registry: &Registry,
72        consensus_client: Arc<UpdatableConsensusClient>,
73    ) -> Self {
74        let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
75        let starfish_client = Arc::new(LazyStarfishClient::new());
76        consensus_client.set(starfish_client.clone());
77        let starfish_manager = StarfishManager::new(
78            node_config.protocol_key_pair().copy(),
79            node_config.network_key_pair().copy(),
80            consensus_config.db_path().to_path_buf(),
81            registry_service.clone(),
82            metrics,
83            starfish_client,
84        );
85        Self {
86            consensus_config: consensus_config.clone(),
87            starfish_manager,
88        }
89    }
90
91    pub fn get_storage_base_path(&self) -> PathBuf {
92        self.consensus_config.db_path().to_path_buf()
93    }
94}
95
96#[async_trait]
97impl ConsensusManagerTrait for ConsensusManager {
98    async fn start(
99        &self,
100        node_config: &NodeConfig,
101        epoch_store: Arc<AuthorityPerEpochStore>,
102        consensus_handler_initializer: ConsensusHandlerInitializer,
103        tx_validator: IotaTxValidator,
104    ) {
105        info!("Starting consensus protocol Starfish ...");
106        self.starfish_manager
107            .start(
108                node_config,
109                epoch_store,
110                consensus_handler_initializer,
111                tx_validator,
112            )
113            .await
114    }
115
116    async fn shutdown(&self) {
117        info!("Shutting down consensus ...");
118        self.starfish_manager.shutdown().await;
119    }
120
121    async fn is_running(&self) -> bool {
122        self.starfish_manager.is_running().await
123    }
124
125    fn replay_waiter(&self) -> ReplayWaiter {
126        self.starfish_manager.replay_waiter()
127    }
128}
129
130/// A ConsensusClient that can be updated internally at any time. This usually
131/// happening during epoch change where a client is set after the new consensus
132/// is started for the new epoch.
133#[derive(Default)]
134pub struct UpdatableConsensusClient {
135    // An extra layer of Arc<> is needed as required by ArcSwapAny.
136    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
137}
138
139impl UpdatableConsensusClient {
140    pub fn new() -> Self {
141        Self {
142            client: ArcSwapOption::empty(),
143        }
144    }
145
146    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
147        const START_TIMEOUT: Duration = Duration::from_secs(30);
148        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
149        if let Ok(client) = timeout(START_TIMEOUT, async {
150            loop {
151                let Some(client) = self.client.load_full() else {
152                    sleep(RETRY_INTERVAL).await;
153                    continue;
154                };
155                return client;
156            }
157        })
158        .await
159        {
160            return client;
161        }
162
163        panic!("Timed out after {START_TIMEOUT:?} waiting for Consensus to start!",);
164    }
165
166    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
167        self.client.store(Some(Arc::new(client)));
168    }
169
170    pub fn clear(&self) {
171        self.client.store(None);
172    }
173}
174
175#[async_trait]
176impl ConsensusClient for UpdatableConsensusClient {
177    async fn submit(
178        &self,
179        transactions: &[ConsensusTransaction],
180        epoch_store: &Arc<AuthorityPerEpochStore>,
181    ) -> IotaResult<BlockStatusReceiver> {
182        let client = self.get().await;
183        client.submit(transactions, epoch_store).await
184    }
185}
186
187/// Waits for consensus to finish replaying at consensus handler.
188pub struct ReplayWaiter {
189    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
190}
191
192impl ReplayWaiter {
193    pub(crate) fn new(
194        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
195    ) -> Self {
196        Self {
197            consumer_monitor_receiver,
198        }
199    }
200
201    pub(crate) async fn wait_for_replay(mut self) {
202        loop {
203            info!("Waiting for consensus to start replaying ...");
204            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
205                continue;
206            };
207            info!("Waiting for consensus handler to finish replaying ...");
208            monitor.replay_complete().await;
209            break;
210        }
211    }
212}
213
214impl Clone for ReplayWaiter {
215    fn clone(&self) -> Self {
216        Self {
217            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
218        }
219    }
220}
221
222pub struct ConsensusManagerMetrics {
223    start_latency: IntGauge,
224    shutdown_latency: IntGauge,
225}
226
227impl ConsensusManagerMetrics {
228    pub fn new(registry: &Registry) -> Self {
229        Self {
230            start_latency: register_int_gauge_with_registry!(
231                "consensus_manager_start_latency",
232                "The latency of starting up consensus nodes",
233                registry,
234            )
235            .unwrap(),
236            shutdown_latency: register_int_gauge_with_registry!(
237                "consensus_manager_shutdown_latency",
238                "The latency of shutting down consensus nodes",
239                registry,
240            )
241            .unwrap(),
242        }
243    }
244}
245
246pub(crate) struct RunningLockGuard<'a> {
247    state_guard: MutexGuard<'a, Running>,
248    metrics: &'a ConsensusManagerMetrics,
249    epoch: Option<EpochId>,
250    protocol_version: Option<ProtocolVersion>,
251    start: Instant,
252}
253
254impl<'a> RunningLockGuard<'a> {
255    pub(crate) async fn acquire_start(
256        metrics: &'a ConsensusManagerMetrics,
257        running_mutex: &'a Mutex<Running>,
258        epoch: EpochId,
259        version: ProtocolVersion,
260    ) -> Option<RunningLockGuard<'a>> {
261        let running = running_mutex.lock().await;
262        if let Running::True(epoch, version) = *running {
263            tracing::warn!(
264                "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
265            );
266            return None;
267        }
268
269        tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
270
271        Some(RunningLockGuard {
272            state_guard: running,
273            metrics,
274            start: Instant::now(),
275            epoch: Some(epoch),
276            protocol_version: Some(version),
277        })
278    }
279
280    pub(crate) async fn acquire_shutdown(
281        metrics: &'a ConsensusManagerMetrics,
282        running_mutex: &'a Mutex<Running>,
283    ) -> Option<RunningLockGuard<'a>> {
284        let running = running_mutex.lock().await;
285        if let Running::True(epoch, version) = *running {
286            tracing::info!(
287                "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
288            );
289        } else {
290            tracing::warn!("Consensus shutdown was called but consensus is not running");
291            return None;
292        }
293
294        Some(RunningLockGuard {
295            state_guard: running,
296            metrics,
297            start: Instant::now(),
298            epoch: None,
299            protocol_version: None,
300        })
301    }
302}
303
304impl Drop for RunningLockGuard<'_> {
305    fn drop(&mut self) {
306        match *self.state_guard {
307            // consensus was running and now will have to be marked as shutdown
308            Running::True(epoch, version) => {
309                tracing::info!(
310                    "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
311                    self.start.elapsed().as_secs_f64()
312                );
313
314                self.metrics
315                    .shutdown_latency
316                    .set(self.start.elapsed().as_secs_f64() as i64);
317
318                *self.state_guard = Running::False;
319            }
320            // consensus was not running and now will be marked as started
321            Running::False => {
322                tracing::info!(
323                    "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
324                    self.epoch.unwrap(),
325                    self.protocol_version.unwrap(),
326                    self.start.elapsed().as_secs_f64()
327                );
328
329                self.metrics
330                    .start_latency
331                    .set(self.start.elapsed().as_secs_f64() as i64);
332
333                *self.state_guard =
334                    Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
335            }
336        }
337    }
338}