iota_core/consensus_manager/mod.rs

use std::{
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use fastcrypto::traits::KeyPair as _;
use iota_config::{ConsensusConfig, NodeConfig, node::ConsensusProtocol};
use iota_metrics::RegistryService;
use iota_protocol_config::{ConsensusChoice, ProtocolVersion};
use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tokio::{
    sync::{Mutex, MutexGuard},
    time::{sleep, timeout},
};
use tracing::info;

use crate::{
    authority::authority_per_epoch_store::AuthorityPerEpochStore,
    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{mysticeti_manager::MysticetiManager, starfish_manager::StarfishManager},
    consensus_validator::IotaTxValidator,
    mysticeti_adapter::LazyMysticetiClient,
    starfish_adapter::LazyStarfishClient,
};

pub mod mysticeti_manager;
pub mod starfish_manager;

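/// Tracks whether a consensus instance is currently running, and if so, for
/// which epoch and protocol version.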
#[derive(PartialEq)]
pub(crate) enum Running {
    True(EpochId, ProtocolVersion),
    False,
}

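/// Lifecycle interface implemented by each protocol-specific manager, and by
/// the top-level `ConsensusManager` that delegates to them.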
#[async_trait]
#[enum_dispatch(ProtocolManager)]
pub trait ConsensusManagerTrait {
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    );

    async fn shutdown(&self);

    async fn is_running(&self) -> bool;
}

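/// Wraps the protocol-specific managers behind a single type so calls can be
/// dispatched statically via `enum_dispatch`.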
#[enum_dispatch]
enum ProtocolManager {
    Mysticeti(MysticetiManager),
    Starfish(StarfishManager),
}

impl ProtocolManager {
    pub fn new_mysticeti(
        config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        metrics: Arc<ConsensusManagerMetrics>,
        client: Arc<LazyMysticetiClient>,
    ) -> Self {
        Self::Mysticeti(MysticetiManager::new(
            config.protocol_key_pair().copy(),
            config.network_key_pair().copy(),
            consensus_config.db_path().to_path_buf(),
            registry_service.clone(),
            metrics,
            client,
        ))
    }

    pub fn new_starfish(
        config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        metrics: Arc<ConsensusManagerMetrics>,
        client: Arc<LazyStarfishClient>,
    ) -> Self {
        Self::Starfish(StarfishManager::new(
            config.protocol_key_pair().copy(),
            config.network_key_pair().copy(),
            consensus_config.db_path().to_path_buf(),
            registry_service.clone(),
            metrics,
            client,
        ))
    }
}

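/// Used by the validator to start and stop consensus each epoch. Owns one
/// manager and one lazy client per supported protocol, and activates the pair
/// for whichever protocol is picked when an epoch starts.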
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    mysticeti_manager: ProtocolManager,
    starfish_manager: ProtocolManager,
    mysticeti_client: Arc<LazyMysticetiClient>,
    starfish_client: Arc<LazyStarfishClient>,
    active: parking_lot::Mutex<Vec<bool>>,
    consensus_client: Arc<UpdatableConsensusClient>,
}

impl ConsensusManager {
    pub fn new(
        node_config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        metrics_registry: &Registry,
        consensus_client: Arc<UpdatableConsensusClient>,
    ) -> Self {
        let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
        let mysticeti_client = Arc::new(LazyMysticetiClient::new());
        let mysticeti_manager = ProtocolManager::new_mysticeti(
            node_config,
            consensus_config,
            registry_service,
            metrics.clone(),
            mysticeti_client.clone(),
        );
        let starfish_client = Arc::new(LazyStarfishClient::new());
        let starfish_manager = ProtocolManager::new_starfish(
            node_config,
            consensus_config,
            registry_service,
            metrics,
            starfish_client.clone(),
        );
        Self {
            consensus_config: consensus_config.clone(),
            mysticeti_manager,
            starfish_manager,
            mysticeti_client,
            starfish_client,
            active: parking_lot::Mutex::new(vec![false; 2]),
            consensus_client,
        }
    }

    pub fn get_storage_base_path(&self) -> PathBuf {
        self.consensus_config.db_path().to_path_buf()
    }

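    /// Picks the consensus protocol to run for the given epoch. The
    /// `CONSENSUS_PROTOCOL` env var ("mysticeti", "starfish", or
    /// "swap_each_epoch") overrides the protocol config, which is mainly
    /// useful for testing.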
    fn pick_protocol(&self, epoch_store: &AuthorityPerEpochStore) -> ConsensusProtocol {
        let protocol_config = epoch_store.protocol_config();
        if let Ok(consensus_choice) = std::env::var("CONSENSUS_PROTOCOL") {
            match consensus_choice.to_lowercase().as_str() {
                "mysticeti" => return ConsensusProtocol::Mysticeti,
                "starfish" => return ConsensusProtocol::Starfish,
                "swap_each_epoch" => {
                    let protocol = if epoch_store.epoch() % 2 == 0 {
                        ConsensusProtocol::Starfish
                    } else {
                        ConsensusProtocol::Mysticeti
                    };
                    return protocol;
                }
                _ => {
                    info!(
                        "Invalid consensus choice {} in env var; falling back to the protocol config",
                        consensus_choice
                    );
                }
            };
        }

        match protocol_config.consensus_choice() {
            ConsensusChoice::Mysticeti => ConsensusProtocol::Mysticeti,
        }
    }
}

#[async_trait]
impl ConsensusManagerTrait for ConsensusManager {
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    ) {
        let protocol_manager = {
            let mut active = self.active.lock();
            active.iter().enumerate().for_each(|(index, active)| {
                assert!(
                    !*active,
                    "Cannot start consensus. ConsensusManager protocol {index} is already running"
                );
            });
            let protocol = self.pick_protocol(&epoch_store);
            info!("Starting consensus protocol {protocol:?} ...");
            match protocol {
                ConsensusProtocol::Mysticeti => {
                    active[0] = true;
                    self.consensus_client.set(self.mysticeti_client.clone());
                    &self.mysticeti_manager
                }
                ConsensusProtocol::Starfish => {
                    active[1] = true;
                    self.consensus_client.set(self.starfish_client.clone());
                    &self.starfish_manager
                }
            }
        };

        protocol_manager
            .start(
                node_config,
                epoch_store,
                consensus_handler_initializer,
                tx_validator,
            )
            .await
    }

    async fn shutdown(&self) {
        info!("Shutting down consensus ...");
        let prev_active = {
            let mut active = self.active.lock();
            std::mem::replace(&mut *active, vec![false; 2])
        };
        if prev_active[0] {
            self.mysticeti_manager.shutdown().await;
        }

        if prev_active[1] {
            self.starfish_manager.shutdown().await;
        }

        self.consensus_client.clear();
    }

    async fn is_running(&self) -> bool {
        let active = self.active.lock();
        active.iter().any(|i| *i)
    }
}

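/// A `ConsensusClient` that can be swapped at runtime: holders keep a single
/// reference to this wrapper while the underlying client is replaced across
/// consensus restarts and protocol switches.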
#[derive(Default)]
pub struct UpdatableConsensusClient {
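    // ArcSwapOption stores its value behind an Arc, so the trait-object Arc
    // ends up double-wrapped here.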
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}

impl UpdatableConsensusClient {
    pub fn new() -> Self {
        Self {
            client: ArcSwapOption::empty(),
        }
    }

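    /// Returns the active client, polling every `RETRY_INTERVAL` until one
    /// has been set. Panics if no client appears within `START_TIMEOUT`.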
    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
        const START_TIMEOUT: Duration = Duration::from_secs(30);
        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
        if let Ok(client) = timeout(START_TIMEOUT, async {
            loop {
                let Some(client) = self.client.load_full() else {
                    sleep(RETRY_INTERVAL).await;
                    continue;
                };
                return client;
            }
        })
        .await
        {
            return client;
        }

        panic!("Timed out after {START_TIMEOUT:?} waiting for consensus to start!");
    }

    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
        self.client.store(Some(Arc::new(client)));
    }

    pub fn clear(&self) {
        self.client.store(None);
    }
}

#[async_trait]
impl ConsensusClient for UpdatableConsensusClient {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver> {
        let client = self.get().await;
        client.submit(transactions, epoch_store).await
    }
}

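/// Gauges recording how long the most recent consensus start and shutdown
/// took, in whole seconds.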
pub struct ConsensusManagerMetrics {
    start_latency: IntGauge,
    shutdown_latency: IntGauge,
}

impl ConsensusManagerMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            start_latency: register_int_gauge_with_registry!(
                "consensus_manager_start_latency",
                "The latency of starting up consensus nodes",
                registry,
            )
            .unwrap(),
            shutdown_latency: register_int_gauge_with_registry!(
                "consensus_manager_shutdown_latency",
                "The latency of shutting down consensus nodes",
                registry,
            )
            .unwrap(),
        }
    }
}

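/// RAII guard over the `Running` state: it holds the lock while consensus
/// starts or shuts down, then records the elapsed time to metrics and flips
/// the state on drop.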
pub(crate) struct RunningLockGuard<'a> {
    state_guard: MutexGuard<'a, Running>,
    metrics: &'a ConsensusManagerMetrics,
    epoch: Option<EpochId>,
    protocol_version: Option<ProtocolVersion>,
    start: Instant,
}

impl<'a> RunningLockGuard<'a> {
    pub(crate) async fn acquire_start(
        metrics: &'a ConsensusManagerMetrics,
        running_mutex: &'a Mutex<Running>,
        epoch: EpochId,
        version: ProtocolVersion,
    ) -> Option<RunningLockGuard<'a>> {
        let running = running_mutex.lock().await;
        if let Running::True(epoch, version) = *running {
            tracing::warn!(
                "Consensus is already running for epoch {epoch:?} & protocol version {version:?} - shut down first before starting"
            );
            return None;
        }

        tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");

        Some(RunningLockGuard {
            state_guard: running,
            metrics,
            start: Instant::now(),
            epoch: Some(epoch),
            protocol_version: Some(version),
        })
    }

    pub(crate) async fn acquire_shutdown(
        metrics: &'a ConsensusManagerMetrics,
        running_mutex: &'a Mutex<Running>,
    ) -> Option<RunningLockGuard<'a>> {
        let running = running_mutex.lock().await;
        if let Running::True(epoch, version) = *running {
            tracing::info!(
                "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
            );
        } else {
            tracing::warn!("Consensus shutdown was called but consensus is not running");
            return None;
        }

        Some(RunningLockGuard {
            state_guard: running,
            metrics,
            start: Instant::now(),
            epoch: None,
            protocol_version: None,
        })
    }
}

impl Drop for RunningLockGuard<'_> {
    fn drop(&mut self) {
        match *self.state_guard {
            Running::True(epoch, version) => {
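                // The guard was acquired by `acquire_shutdown()`; by the time
                // it drops, consensus has stopped running.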
                tracing::info!(
                    "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
                    self.start.elapsed().as_secs_f64()
                );

                self.metrics
                    .shutdown_latency
                    .set(self.start.elapsed().as_secs_f64() as i64);

                *self.state_guard = Running::False;
            }
            Running::False => {
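                // The guard was acquired by `acquire_start()`; by the time it
                // drops, consensus has started for the recorded epoch.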
                tracing::info!(
                    "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
                    self.epoch.unwrap(),
                    self.protocol_version.unwrap(),
                    self.start.elapsed().as_secs_f64()
                );

                self.metrics
                    .start_latency
                    .set(self.start.elapsed().as_secs_f64() as i64);

                *self.state_guard =
                    Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
            }
        }
    }
}
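
// A minimal illustrative test sketch (not part of the original module): it
// exercises UpdatableConsensusClient's set/get/clear swap behavior with a
// hypothetical no-op client. `NoopClient`, the module name, and the test body
// are assumptions for illustration, not the crate's real tests.
#[cfg(test)]
mod updatable_client_tests {
    use super::*;

    // Hypothetical stub client; submit() is never expected to be called here.
    struct NoopClient;

    #[async_trait]
    impl ConsensusClient for NoopClient {
        async fn submit(
            &self,
            _transactions: &[ConsensusTransaction],
            _epoch_store: &Arc<AuthorityPerEpochStore>,
        ) -> IotaResult<BlockStatusReceiver> {
            unimplemented!("illustrative stub only")
        }
    }

    #[tokio::test]
    async fn get_resolves_once_client_is_set() {
        let client = UpdatableConsensusClient::new();
        client.set(Arc::new(NoopClient));
        // get() returns immediately because a client is already set.
        let _active = client.get().await;
        // After clear(), a subsequent get() would block until set() is
        // called again (and panic after START_TIMEOUT).
        client.clear();
    }
}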