iota_core/consensus_manager/
mod.rs1use std::{
6 path::PathBuf,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use fastcrypto::traits::KeyPair as _;
14use iota_config::{ConsensusConfig, NodeConfig};
15use iota_metrics::RegistryService;
16use iota_protocol_config::ProtocolVersion;
17use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
18use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
19use tokio::{
20 sync::{Mutex, MutexGuard},
21 time::{sleep, timeout},
22};
23use tracing::info;
24
25use crate::{
26 authority::authority_per_epoch_store::AuthorityPerEpochStore,
27 consensus_adapter::{BlockStatusReceiver, ConsensusClient},
28 consensus_handler::ConsensusHandlerInitializer,
29 consensus_manager::starfish_manager::StarfishManager,
30 consensus_validator::IotaTxValidator,
31 starfish_adapter::LazyStarfishClient,
32};
33
34pub mod starfish_manager;
35
/// Tracks whether a consensus protocol instance is currently running.
///
/// `True` carries the epoch and protocol version the running instance was
/// started for, so start/shutdown transitions can be logged and validated.
#[derive(PartialEq)]
pub(crate) enum Running {
    // Consensus is running for this epoch at this protocol version.
    True(EpochId, ProtocolVersion),
    // Consensus is not running.
    False,
}
41
/// Interface for starting, stopping, and querying a consensus protocol
/// implementation.
#[async_trait]
pub trait ConsensusManagerTrait {
    /// Starts consensus for the epoch described by `epoch_store`, using the
    /// given handler initializer and transaction validator.
    async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: IotaTxValidator,
    );

    /// Stops the running consensus instance, if any.
    async fn shutdown(&self);

    /// Returns whether consensus is currently running.
    async fn is_running(&self) -> bool;
}
56
/// Manages the lifecycle of the node's consensus protocol (Starfish),
/// holding the consensus configuration and the protocol-specific manager.
pub struct ConsensusManager {
    // Cloned from the node's consensus configuration; used e.g. for the
    // storage base path.
    consensus_config: ConsensusConfig,
    // The Starfish-specific manager that actually starts/stops consensus.
    starfish_manager: StarfishManager,
}
62
63impl ConsensusManager {
64 pub fn new(
65 node_config: &NodeConfig,
66 consensus_config: &ConsensusConfig,
67 registry_service: &RegistryService,
68 metrics_registry: &Registry,
69 consensus_client: Arc<UpdatableConsensusClient>,
70 ) -> Self {
71 let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
72 let starfish_client = Arc::new(LazyStarfishClient::new());
73 consensus_client.set(starfish_client.clone());
74 let starfish_manager = StarfishManager::new(
75 node_config.protocol_key_pair().copy(),
76 node_config.network_key_pair().copy(),
77 consensus_config.db_path().to_path_buf(),
78 registry_service.clone(),
79 metrics,
80 starfish_client,
81 );
82 Self {
83 consensus_config: consensus_config.clone(),
84 starfish_manager,
85 }
86 }
87
88 pub fn get_storage_base_path(&self) -> PathBuf {
89 self.consensus_config.db_path().to_path_buf()
90 }
91}
92
93#[async_trait]
94impl ConsensusManagerTrait for ConsensusManager {
95 async fn start(
96 &self,
97 node_config: &NodeConfig,
98 epoch_store: Arc<AuthorityPerEpochStore>,
99 consensus_handler_initializer: ConsensusHandlerInitializer,
100 tx_validator: IotaTxValidator,
101 ) {
102 info!("Starting consensus protocol Starfish ...");
103 self.starfish_manager
104 .start(
105 node_config,
106 epoch_store,
107 consensus_handler_initializer,
108 tx_validator,
109 )
110 .await
111 }
112
113 async fn shutdown(&self) {
114 info!("Shutting down consensus ...");
115 self.starfish_manager.shutdown().await;
116 }
117
118 async fn is_running(&self) -> bool {
119 self.starfish_manager.is_running().await
120 }
121}
122
/// A `ConsensusClient` whose inner client can be swapped at runtime
/// (e.g. across epoch restarts) without callers holding a stale reference.
///
/// NOTE(review): `ArcSwapOption<Arc<dyn ConsensusClient>>` stores an
/// `Arc<Arc<dyn ConsensusClient>>`, so `load_full()` yields a double `Arc` —
/// presumably accepted to keep `set()` taking `Arc<dyn ConsensusClient>`;
/// confirm before flattening.
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // `None` while consensus has not (re)started; set via `set()`.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
131
132impl UpdatableConsensusClient {
133 pub fn new() -> Self {
134 Self {
135 client: ArcSwapOption::empty(),
136 }
137 }
138
139 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
140 const START_TIMEOUT: Duration = Duration::from_secs(30);
141 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
142 if let Ok(client) = timeout(START_TIMEOUT, async {
143 loop {
144 let Some(client) = self.client.load_full() else {
145 sleep(RETRY_INTERVAL).await;
146 continue;
147 };
148 return client;
149 }
150 })
151 .await
152 {
153 return client;
154 }
155
156 panic!("Timed out after {START_TIMEOUT:?} waiting for Consensus to start!",);
157 }
158
159 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
160 self.client.store(Some(Arc::new(client)));
161 }
162
163 pub fn clear(&self) {
164 self.client.store(None);
165 }
166}
167
168#[async_trait]
169impl ConsensusClient for UpdatableConsensusClient {
170 async fn submit(
171 &self,
172 transactions: &[ConsensusTransaction],
173 epoch_store: &Arc<AuthorityPerEpochStore>,
174 ) -> IotaResult<BlockStatusReceiver> {
175 let client = self.get().await;
176 client.submit(transactions, epoch_store).await
177 }
178}
179
/// Prometheus gauges recording consensus start/shutdown durations.
pub struct ConsensusManagerMetrics {
    // Seconds taken to start consensus (set on guard drop, truncated to i64).
    start_latency: IntGauge,
    // Seconds taken to shut consensus down (set on guard drop, truncated to i64).
    shutdown_latency: IntGauge,
}
184
185impl ConsensusManagerMetrics {
186 pub fn new(registry: &Registry) -> Self {
187 Self {
188 start_latency: register_int_gauge_with_registry!(
189 "consensus_manager_start_latency",
190 "The latency of starting up consensus nodes",
191 registry,
192 )
193 .unwrap(),
194 shutdown_latency: register_int_gauge_with_registry!(
195 "consensus_manager_shutdown_latency",
196 "The latency of shutting down consensus nodes",
197 registry,
198 )
199 .unwrap(),
200 }
201 }
202}
203
/// Holds the `Running` state lock for the duration of a start or shutdown
/// operation; on drop it flips the state and records the elapsed time in
/// the corresponding latency metric.
pub(crate) struct RunningLockGuard<'a> {
    // Lock over the shared running state, held until the guard drops.
    state_guard: MutexGuard<'a, Running>,
    metrics: &'a ConsensusManagerMetrics,
    // Some(..) when acquired for startup (target epoch); None for shutdown.
    epoch: Option<EpochId>,
    // Some(..) when acquired for startup (target version); None for shutdown.
    protocol_version: Option<ProtocolVersion>,
    // When the operation began; used to compute the latency on drop.
    start: Instant,
}
211
212impl<'a> RunningLockGuard<'a> {
213 pub(crate) async fn acquire_start(
214 metrics: &'a ConsensusManagerMetrics,
215 running_mutex: &'a Mutex<Running>,
216 epoch: EpochId,
217 version: ProtocolVersion,
218 ) -> Option<RunningLockGuard<'a>> {
219 let running = running_mutex.lock().await;
220 if let Running::True(epoch, version) = *running {
221 tracing::warn!(
222 "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
223 );
224 return None;
225 }
226
227 tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
228
229 Some(RunningLockGuard {
230 state_guard: running,
231 metrics,
232 start: Instant::now(),
233 epoch: Some(epoch),
234 protocol_version: Some(version),
235 })
236 }
237
238 pub(crate) async fn acquire_shutdown(
239 metrics: &'a ConsensusManagerMetrics,
240 running_mutex: &'a Mutex<Running>,
241 ) -> Option<RunningLockGuard<'a>> {
242 let running = running_mutex.lock().await;
243 if let Running::True(epoch, version) = *running {
244 tracing::info!(
245 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
246 );
247 } else {
248 tracing::warn!("Consensus shutdown was called but consensus is not running");
249 return None;
250 }
251
252 Some(RunningLockGuard {
253 state_guard: running,
254 metrics,
255 start: Instant::now(),
256 epoch: None,
257 protocol_version: None,
258 })
259 }
260}
261
262impl Drop for RunningLockGuard<'_> {
263 fn drop(&mut self) {
264 match *self.state_guard {
265 Running::True(epoch, version) => {
267 tracing::info!(
268 "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
269 self.start.elapsed().as_secs_f64()
270 );
271
272 self.metrics
273 .shutdown_latency
274 .set(self.start.elapsed().as_secs_f64() as i64);
275
276 *self.state_guard = Running::False;
277 }
278 Running::False => {
280 tracing::info!(
281 "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
282 self.epoch.unwrap(),
283 self.protocol_version.unwrap(),
284 self.start.elapsed().as_secs_f64()
285 );
286
287 self.metrics
288 .start_latency
289 .set(self.start.elapsed().as_secs_f64() as i64);
290
291 *self.state_guard =
292 Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
293 }
294 }
295 }
296}