iota_core/consensus_manager/
mod.rs1use std::{
6 path::PathBuf,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use enum_dispatch::enum_dispatch;
14use fastcrypto::traits::KeyPair as _;
15use iota_config::{ConsensusConfig, NodeConfig};
16use iota_metrics::RegistryService;
17use iota_protocol_config::ProtocolVersion;
18use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
19use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
20use tokio::{
21 sync::{Mutex, MutexGuard},
22 time::{sleep, timeout},
23};
24use tracing::info;
25
26use crate::{
27 authority::authority_per_epoch_store::AuthorityPerEpochStore,
28 consensus_adapter::{BlockStatusReceiver, ConsensusClient},
29 consensus_handler::ConsensusHandlerInitializer,
30 consensus_manager::mysticeti_manager::MysticetiManager,
31 consensus_validator::IotaTxValidator,
32 mysticeti_adapter::LazyMysticetiClient,
33};
34
35pub mod mysticeti_manager;
36
37#[derive(PartialEq)]
38pub(crate) enum Running {
39 True(EpochId, ProtocolVersion),
40 False,
41}
42
/// Lifecycle operations implemented by consensus managers.
///
/// `#[enum_dispatch(ProtocolManager)]` generates an implementation of this trait for
/// [`ProtocolManager`] that forwards each call to the wrapped variant.
#[async_trait]
#[enum_dispatch(ProtocolManager)]
pub trait ConsensusManagerTrait {
    /// Starts consensus using the given node configuration, per-epoch store, consensus handler
    /// initializer and transaction validator.
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    );

    /// Shuts consensus down.
    async fn shutdown(&self);

    /// Reports whether consensus is currently considered running.
    async fn is_running(&self) -> bool;
}
58
/// The consensus protocol implementations a [`ConsensusManager`] can delegate to.
///
/// `#[enum_dispatch]` lets `ConsensusManagerTrait` calls on this enum forward to the wrapped
/// manager; `Mysticeti` (wrapping a `MysticetiManager`) is currently the only variant.
#[enum_dispatch]
enum ProtocolManager {
    Mysticeti(MysticetiManager),
}
65
66impl ProtocolManager {
67 pub fn new_mysticeti(
69 config: &NodeConfig,
70 consensus_config: &ConsensusConfig,
71 registry_service: &RegistryService,
72 metrics: Arc<ConsensusManagerMetrics>,
73 client: Arc<LazyMysticetiClient>,
74 ) -> Self {
75 Self::Mysticeti(MysticetiManager::new(
76 config.protocol_key_pair().copy(),
77 config.network_key_pair().copy(),
78 consensus_config.db_path().to_path_buf(),
79 registry_service.clone(),
80 metrics,
81 client,
82 ))
83 }
84}
85
/// Owns the consensus protocol manager and tracks whether consensus has been started.
pub struct ConsensusManager {
    // Consensus configuration; `get_storage_base_path` returns its `db_path`.
    consensus_config: ConsensusConfig,
    // Protocol manager that `start` / `shutdown` delegate to.
    mysticeti_manager: ProtocolManager,
    // Mysticeti client that `start` installs into `consensus_client`.
    mysticeti_client: Arc<LazyMysticetiClient>,
    // Set to `true` by `start` and back to `false` by `shutdown`.
    active: parking_lot::Mutex<bool>,
    // Shared client handle: pointed at `mysticeti_client` on start, cleared on shutdown.
    consensus_client: Arc<UpdatableConsensusClient>,
}
94
95impl ConsensusManager {
96 pub fn new(
97 node_config: &NodeConfig,
98 consensus_config: &ConsensusConfig,
99 registry_service: &RegistryService,
100 metrics_registry: &Registry,
101 consensus_client: Arc<UpdatableConsensusClient>,
102 ) -> Self {
103 let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
104 let mysticeti_client = Arc::new(LazyMysticetiClient::new());
105 let mysticeti_manager = ProtocolManager::new_mysticeti(
106 node_config,
107 consensus_config,
108 registry_service,
109 metrics,
110 mysticeti_client.clone(),
111 );
112 Self {
113 consensus_config: consensus_config.clone(),
114 mysticeti_manager,
115 mysticeti_client,
116 active: parking_lot::Mutex::new(false),
117 consensus_client,
118 }
119 }
120
121 pub fn get_storage_base_path(&self) -> PathBuf {
122 self.consensus_config.db_path().to_path_buf()
123 }
124}
125
126#[async_trait]
127impl ConsensusManagerTrait for ConsensusManager {
128 async fn start(
129 &self,
130 node_config: &NodeConfig,
131 epoch_store: Arc<AuthorityPerEpochStore>,
132 consensus_handler_initializer: ConsensusHandlerInitializer,
133 tx_validator: IotaTxValidator,
134 ) {
135 let protocol_manager = {
136 let mut active = self.active.lock();
137 assert!(!*active, "Cannot start consensus. It is already running!");
138 info!("Starting consensus ...");
139 *active = true;
140 self.consensus_client.set(self.mysticeti_client.clone());
141 &self.mysticeti_manager
142 };
143
144 protocol_manager
145 .start(
146 node_config,
147 epoch_store,
148 consensus_handler_initializer,
149 tx_validator,
150 )
151 .await
152 }
153
154 async fn shutdown(&self) {
155 info!("Shutting down consensus ...");
156 let prev_active = {
157 let mut active = self.active.lock();
158 std::mem::replace(&mut *active, false)
159 };
160 if prev_active {
161 self.mysticeti_manager.shutdown().await;
162 }
163 self.consensus_client.clear();
164 }
165
166 async fn is_running(&self) -> bool {
167 let active = self.active.lock();
168 *active
169 }
170}
171
172#[derive(Default)]
176pub struct UpdatableConsensusClient {
177 client: ArcSwapOption<Arc<dyn ConsensusClient>>,
179}
180
181impl UpdatableConsensusClient {
182 pub fn new() -> Self {
183 Self {
184 client: ArcSwapOption::empty(),
185 }
186 }
187
188 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
189 const START_TIMEOUT: Duration = Duration::from_secs(30);
190 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
191 if let Ok(client) = timeout(START_TIMEOUT, async {
192 loop {
193 let Some(client) = self.client.load_full() else {
194 sleep(RETRY_INTERVAL).await;
195 continue;
196 };
197 return client;
198 }
199 })
200 .await
201 {
202 return client;
203 }
204
205 panic!(
206 "Timed out after {:?} waiting for Consensus to start!",
207 START_TIMEOUT,
208 );
209 }
210
211 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
212 self.client.store(Some(Arc::new(client)));
213 }
214
215 pub fn clear(&self) {
216 self.client.store(None);
217 }
218}
219
220#[async_trait]
221impl ConsensusClient for UpdatableConsensusClient {
222 async fn submit(
223 &self,
224 transactions: &[ConsensusTransaction],
225 epoch_store: &Arc<AuthorityPerEpochStore>,
226 ) -> IotaResult<BlockStatusReceiver> {
227 let client = self.get().await;
228 client.submit(transactions, epoch_store).await
229 }
230}
231
232pub struct ConsensusManagerMetrics {
233 start_latency: IntGauge,
234 shutdown_latency: IntGauge,
235}
236
237impl ConsensusManagerMetrics {
238 pub fn new(registry: &Registry) -> Self {
239 Self {
240 start_latency: register_int_gauge_with_registry!(
241 "consensus_manager_start_latency",
242 "The latency of starting up consensus nodes",
243 registry,
244 )
245 .unwrap(),
246 shutdown_latency: register_int_gauge_with_registry!(
247 "consensus_manager_shutdown_latency",
248 "The latency of shutting down consensus nodes",
249 registry,
250 )
251 .unwrap(),
252 }
253 }
254}
255
/// Holds the lock on the shared `Running` state for the duration of a consensus start-up or
/// shut-down.
///
/// Created by `acquire_start` / `acquire_shutdown`; the state transition and the latency metric
/// are applied when the guard is dropped.
pub(crate) struct RunningLockGuard<'a> {
    // Lock on the shared running state; released when this guard (and thus the field) drops.
    state_guard: MutexGuard<'a, Running>,
    // Receives the start-up or shut-down latency on drop.
    metrics: &'a ConsensusManagerMetrics,
    // Target epoch for a start-up guard; `None` for a shut-down guard.
    epoch: Option<EpochId>,
    // Target protocol version for a start-up guard; `None` for a shut-down guard.
    protocol_version: Option<ProtocolVersion>,
    // When the guard was acquired; the reported latency is measured from here.
    start: Instant,
}
263
264impl<'a> RunningLockGuard<'a> {
265 pub(crate) async fn acquire_start(
266 metrics: &'a ConsensusManagerMetrics,
267 running_mutex: &'a Mutex<Running>,
268 epoch: EpochId,
269 version: ProtocolVersion,
270 ) -> Option<RunningLockGuard<'a>> {
271 let running = running_mutex.lock().await;
272 if let Running::True(epoch, version) = *running {
273 tracing::warn!(
274 "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
275 );
276 return None;
277 }
278
279 tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
280
281 Some(RunningLockGuard {
282 state_guard: running,
283 metrics,
284 start: Instant::now(),
285 epoch: Some(epoch),
286 protocol_version: Some(version),
287 })
288 }
289
290 pub(crate) async fn acquire_shutdown(
291 metrics: &'a ConsensusManagerMetrics,
292 running_mutex: &'a Mutex<Running>,
293 ) -> Option<RunningLockGuard<'a>> {
294 let running = running_mutex.lock().await;
295 if let Running::True(epoch, version) = *running {
296 tracing::info!(
297 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
298 );
299 } else {
300 tracing::warn!("Consensus shutdown was called but consensus is not running");
301 return None;
302 }
303
304 Some(RunningLockGuard {
305 state_guard: running,
306 metrics,
307 start: Instant::now(),
308 epoch: None,
309 protocol_version: None,
310 })
311 }
312}
313
314impl Drop for RunningLockGuard<'_> {
315 fn drop(&mut self) {
316 match *self.state_guard {
317 Running::True(epoch, version) => {
319 tracing::info!(
320 "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
321 self.start.elapsed().as_secs_f64()
322 );
323
324 self.metrics
325 .shutdown_latency
326 .set(self.start.elapsed().as_secs_f64() as i64);
327
328 *self.state_guard = Running::False;
329 }
330 Running::False => {
332 tracing::info!(
333 "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
334 self.epoch.unwrap(),
335 self.protocol_version.unwrap(),
336 self.start.elapsed().as_secs_f64()
337 );
338
339 self.metrics
340 .start_latency
341 .set(self.start.elapsed().as_secs_f64() as i64);
342
343 *self.state_guard =
344 Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
345 }
346 }
347 }
348}