iota_core/consensus_manager/
mod.rs1use std::{
6 path::PathBuf,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use enum_dispatch::enum_dispatch;
14use fastcrypto::traits::KeyPair as _;
15use iota_config::{ConsensusConfig, NodeConfig};
16use iota_metrics::RegistryService;
17use iota_protocol_config::ProtocolVersion;
18use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tokio::{
21 sync::{Mutex, MutexGuard},
22 time::{sleep, timeout},
23};
24use tracing::info;
25
26use crate::{
27 authority::authority_per_epoch_store::AuthorityPerEpochStore,
28 consensus_adapter::{BlockStatusReceiver, ConsensusClient},
29 consensus_handler::ConsensusHandlerInitializer,
30 consensus_manager::mysticeti_manager::MysticetiManager,
31 consensus_validator::IotaTxValidator,
32 mysticeti_adapter::LazyMysticetiClient,
33};
34
35pub mod mysticeti_manager;
36
37#[derive(PartialEq)]
38pub(crate) enum Running {
39 True(EpochId, ProtocolVersion),
40 False,
41}
42
43#[async_trait]
44#[enum_dispatch(ProtocolManager)]
45pub trait ConsensusManagerTrait {
46 async fn start(
47 &self,
48 node_config: &NodeConfig,
49 epoch_store: Arc<AuthorityPerEpochStore>,
50 consensus_handler_initializer: ConsensusHandlerInitializer,
51 tx_validator: IotaTxValidator,
52 );
53
54 async fn shutdown(&self);
55
56 async fn is_running(&self) -> bool;
57}
58
59#[enum_dispatch]
62enum ProtocolManager {
63 Mysticeti(MysticetiManager),
64}
65
66impl ProtocolManager {
67 pub fn new_mysticeti(
69 config: &NodeConfig,
70 consensus_config: &ConsensusConfig,
71 registry_service: &RegistryService,
72 metrics: Arc<ConsensusManagerMetrics>,
73 client: Arc<LazyMysticetiClient>,
74 ) -> Self {
75 Self::Mysticeti(MysticetiManager::new(
76 config.protocol_key_pair().copy(),
77 config.network_key_pair().copy(),
78 consensus_config.db_path().to_path_buf(),
79 registry_service.clone(),
80 metrics,
81 client,
82 ))
83 }
84}
85
86pub struct ConsensusManager {
88 consensus_config: ConsensusConfig,
89 mysticeti_manager: ProtocolManager,
90 mysticeti_client: Arc<LazyMysticetiClient>,
91 active: parking_lot::Mutex<bool>,
92 consensus_client: Arc<UpdatableConsensusClient>,
93}
94
95impl ConsensusManager {
96 pub fn new(
97 node_config: &NodeConfig,
98 consensus_config: &ConsensusConfig,
99 registry_service: &RegistryService,
100 consensus_client: Arc<UpdatableConsensusClient>,
101 ) -> Self {
102 let metrics = Arc::new(ConsensusManagerMetrics::new(
103 ®istry_service.default_registry(),
104 ));
105 let mysticeti_client = Arc::new(LazyMysticetiClient::new());
106 let mysticeti_manager = ProtocolManager::new_mysticeti(
107 node_config,
108 consensus_config,
109 registry_service,
110 metrics,
111 mysticeti_client.clone(),
112 );
113 Self {
114 consensus_config: consensus_config.clone(),
115 mysticeti_manager,
116 mysticeti_client,
117 active: parking_lot::Mutex::new(false),
118 consensus_client,
119 }
120 }
121
122 pub fn get_storage_base_path(&self) -> PathBuf {
123 self.consensus_config.db_path().to_path_buf()
124 }
125}
126
127#[async_trait]
128impl ConsensusManagerTrait for ConsensusManager {
129 async fn start(
130 &self,
131 node_config: &NodeConfig,
132 epoch_store: Arc<AuthorityPerEpochStore>,
133 consensus_handler_initializer: ConsensusHandlerInitializer,
134 tx_validator: IotaTxValidator,
135 ) {
136 let protocol_manager = {
137 let mut active = self.active.lock();
138 assert!(!*active, "Cannot start consensus. It is already running!");
139 info!("Starting consensus ...");
140 *active = true;
141 self.consensus_client.set(self.mysticeti_client.clone());
142 &self.mysticeti_manager
143 };
144
145 protocol_manager
146 .start(
147 node_config,
148 epoch_store,
149 consensus_handler_initializer,
150 tx_validator,
151 )
152 .await
153 }
154
155 async fn shutdown(&self) {
156 info!("Shutting down consensus ...");
157 let prev_active = {
158 let mut active = self.active.lock();
159 std::mem::replace(&mut *active, false)
160 };
161 if prev_active {
162 self.mysticeti_manager.shutdown().await;
163 }
164 self.consensus_client.clear();
165 }
166
167 async fn is_running(&self) -> bool {
168 let active = self.active.lock();
169 *active
170 }
171}
172
173#[derive(Default)]
177pub struct UpdatableConsensusClient {
178 client: ArcSwapOption<Arc<dyn ConsensusClient>>,
180}
181
182impl UpdatableConsensusClient {
183 pub fn new() -> Self {
184 Self {
185 client: ArcSwapOption::empty(),
186 }
187 }
188
189 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
190 const START_TIMEOUT: Duration = Duration::from_secs(30);
191 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
192 if let Ok(client) = timeout(START_TIMEOUT, async {
193 loop {
194 let Some(client) = self.client.load_full() else {
195 sleep(RETRY_INTERVAL).await;
196 continue;
197 };
198 return client;
199 }
200 })
201 .await
202 {
203 return client;
204 }
205
206 panic!(
207 "Timed out after {:?} waiting for Consensus to start!",
208 START_TIMEOUT,
209 );
210 }
211
212 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
213 self.client.store(Some(Arc::new(client)));
214 }
215
216 pub fn clear(&self) {
217 self.client.store(None);
218 }
219}
220
221#[async_trait]
222impl ConsensusClient for UpdatableConsensusClient {
223 async fn submit(
224 &self,
225 transactions: &[ConsensusTransaction],
226 epoch_store: &Arc<AuthorityPerEpochStore>,
227 ) -> IotaResult<BlockStatusReceiver> {
228 let client = self.get().await;
229 client.submit(transactions, epoch_store).await
230 }
231}
232
233pub struct ConsensusManagerMetrics {
234 start_latency: IntGauge,
235 shutdown_latency: IntGauge,
236}
237
238impl ConsensusManagerMetrics {
239 pub fn new(registry: &Registry) -> Self {
240 Self {
241 start_latency: register_int_gauge_with_registry!(
242 "consensus_manager_start_latency",
243 "The latency of starting up consensus nodes",
244 registry,
245 )
246 .unwrap(),
247 shutdown_latency: register_int_gauge_with_registry!(
248 "consensus_manager_shutdown_latency",
249 "The latency of shutting down consensus nodes",
250 registry,
251 )
252 .unwrap(),
253 }
254 }
255}
256
257pub(crate) struct RunningLockGuard<'a> {
258 state_guard: MutexGuard<'a, Running>,
259 metrics: &'a ConsensusManagerMetrics,
260 epoch: Option<EpochId>,
261 protocol_version: Option<ProtocolVersion>,
262 start: Instant,
263}
264
265impl<'a> RunningLockGuard<'a> {
266 pub(crate) async fn acquire_start(
267 metrics: &'a ConsensusManagerMetrics,
268 running_mutex: &'a Mutex<Running>,
269 epoch: EpochId,
270 version: ProtocolVersion,
271 ) -> Option<RunningLockGuard<'a>> {
272 let running = running_mutex.lock().await;
273 if let Running::True(epoch, version) = *running {
274 tracing::warn!(
275 "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
276 );
277 return None;
278 }
279
280 tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
281
282 Some(RunningLockGuard {
283 state_guard: running,
284 metrics,
285 start: Instant::now(),
286 epoch: Some(epoch),
287 protocol_version: Some(version),
288 })
289 }
290
291 pub(crate) async fn acquire_shutdown(
292 metrics: &'a ConsensusManagerMetrics,
293 running_mutex: &'a Mutex<Running>,
294 ) -> Option<RunningLockGuard<'a>> {
295 let running = running_mutex.lock().await;
296 if let Running::True(epoch, version) = *running {
297 tracing::info!(
298 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
299 );
300 } else {
301 tracing::warn!("Consensus shutdown was called but consensus is not running");
302 return None;
303 }
304
305 Some(RunningLockGuard {
306 state_guard: running,
307 metrics,
308 start: Instant::now(),
309 epoch: None,
310 protocol_version: None,
311 })
312 }
313}
314
315impl Drop for RunningLockGuard<'_> {
316 fn drop(&mut self) {
317 match *self.state_guard {
318 Running::True(epoch, version) => {
320 tracing::info!(
321 "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
322 self.start.elapsed().as_secs_f64()
323 );
324
325 self.metrics
326 .shutdown_latency
327 .set(self.start.elapsed().as_secs_f64() as i64);
328
329 *self.state_guard = Running::False;
330 }
331 Running::False => {
333 tracing::info!(
334 "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
335 self.epoch.unwrap(),
336 self.protocol_version.unwrap(),
337 self.start.elapsed().as_secs_f64()
338 );
339
340 self.metrics
341 .start_latency
342 .set(self.start.elapsed().as_secs_f64() as i64);
343
344 *self.state_guard =
345 Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
346 }
347 }
348 }
349}