iota_core/consensus_manager/
mod.rs1use std::{
6 path::PathBuf,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use arc_swap::ArcSwapOption;
12use async_trait::async_trait;
13use fastcrypto::traits::KeyPair as _;
14use iota_config::{ConsensusConfig, NodeConfig};
15use iota_metrics::RegistryService;
16use iota_protocol_config::ProtocolVersion;
17use iota_types::{committee::EpochId, error::IotaResult, messages_consensus::ConsensusTransaction};
18use prometheus_filtered::{IntGauge, Registry, register_int_gauge_with_registry};
19use starfish_core::CommitConsumerMonitor;
20use tokio::{
21 sync::{Mutex, MutexGuard, broadcast},
22 time::{sleep, timeout},
23};
24use tracing::info;
25
26use crate::{
27 authority::authority_per_epoch_store::AuthorityPerEpochStore,
28 consensus_adapter::{BlockStatusReceiver, ConsensusClient},
29 consensus_handler::ConsensusHandlerInitializer,
30 consensus_manager::starfish_manager::StarfishManager,
31 consensus_validator::IotaTxValidator,
32 starfish_adapter::LazyStarfishClient,
33};
34
35pub mod starfish_manager;
36
37#[derive(PartialEq)]
38pub(crate) enum Running {
39 True(EpochId, ProtocolVersion),
40 False,
41}
42
43#[async_trait]
44pub trait ConsensusManagerTrait {
45 async fn start(
46 &self,
47 node_config: &NodeConfig,
48 epoch_store: Arc<AuthorityPerEpochStore>,
49 consensus_handler_initializer: ConsensusHandlerInitializer,
50 tx_validator: IotaTxValidator,
51 );
52
53 async fn shutdown(&self);
54
55 async fn is_running(&self) -> bool;
56
57 fn replay_waiter(&self) -> ReplayWaiter;
58}
59
/// Node-level consensus manager.
///
/// Delegates start/shutdown/status/replay calls to a [`StarfishManager`] and
/// keeps a copy of the [`ConsensusConfig`] it was constructed with.
pub struct ConsensusManager {
    // Retained so `get_storage_base_path` can report the consensus DB path.
    consensus_config: ConsensusConfig,
    // The Starfish implementation every trait call is forwarded to.
    starfish_manager: StarfishManager,
}
65
66impl ConsensusManager {
67 pub fn new(
68 node_config: &NodeConfig,
69 consensus_config: &ConsensusConfig,
70 registry_service: &RegistryService,
71 metrics_registry: &Registry,
72 consensus_client: Arc<UpdatableConsensusClient>,
73 ) -> Self {
74 let metrics = Arc::new(ConsensusManagerMetrics::new(metrics_registry));
75 let starfish_client = Arc::new(LazyStarfishClient::new());
76 consensus_client.set(starfish_client.clone());
77 let starfish_manager = StarfishManager::new(
78 node_config.protocol_key_pair().copy(),
79 node_config.network_key_pair().copy(),
80 consensus_config.db_path().to_path_buf(),
81 registry_service.clone(),
82 metrics,
83 starfish_client,
84 );
85 Self {
86 consensus_config: consensus_config.clone(),
87 starfish_manager,
88 }
89 }
90
91 pub fn get_storage_base_path(&self) -> PathBuf {
92 self.consensus_config.db_path().to_path_buf()
93 }
94}
95
96#[async_trait]
97impl ConsensusManagerTrait for ConsensusManager {
98 async fn start(
99 &self,
100 node_config: &NodeConfig,
101 epoch_store: Arc<AuthorityPerEpochStore>,
102 consensus_handler_initializer: ConsensusHandlerInitializer,
103 tx_validator: IotaTxValidator,
104 ) {
105 info!("Starting consensus protocol Starfish ...");
106 self.starfish_manager
107 .start(
108 node_config,
109 epoch_store,
110 consensus_handler_initializer,
111 tx_validator,
112 )
113 .await
114 }
115
116 async fn shutdown(&self) {
117 info!("Shutting down consensus ...");
118 self.starfish_manager.shutdown().await;
119 }
120
121 async fn is_running(&self) -> bool {
122 self.starfish_manager.is_running().await
123 }
124
125 fn replay_waiter(&self) -> ReplayWaiter {
126 self.starfish_manager.replay_waiter()
127 }
128}
129
/// A [`ConsensusClient`] whose backing client can be installed, replaced or
/// cleared at runtime.
///
/// Submissions made through this wrapper wait, up to a fixed timeout, until a
/// client has been installed with `set`.
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // The currently installed client, if any. The trait object sits inside an
    // extra `Arc` — presumably because `ArcSwapOption` needs a sized payload;
    // confirm before trying to flatten it.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
138
139impl UpdatableConsensusClient {
140 pub fn new() -> Self {
141 Self {
142 client: ArcSwapOption::empty(),
143 }
144 }
145
146 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
147 const START_TIMEOUT: Duration = Duration::from_secs(30);
148 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
149 if let Ok(client) = timeout(START_TIMEOUT, async {
150 loop {
151 let Some(client) = self.client.load_full() else {
152 sleep(RETRY_INTERVAL).await;
153 continue;
154 };
155 return client;
156 }
157 })
158 .await
159 {
160 return client;
161 }
162
163 panic!("Timed out after {START_TIMEOUT:?} waiting for Consensus to start!",);
164 }
165
166 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
167 self.client.store(Some(Arc::new(client)));
168 }
169
170 pub fn clear(&self) {
171 self.client.store(None);
172 }
173}
174
175#[async_trait]
176impl ConsensusClient for UpdatableConsensusClient {
177 async fn submit(
178 &self,
179 transactions: &[ConsensusTransaction],
180 epoch_store: &Arc<AuthorityPerEpochStore>,
181 ) -> IotaResult<BlockStatusReceiver> {
182 let client = self.get().await;
183 client.submit(transactions, epoch_store).await
184 }
185}
186
/// Handle for waiting until consensus has started and its commit consumer has
/// finished replaying; consumed by `wait_for_replay`.
pub struct ReplayWaiter {
    // Delivers the `CommitConsumerMonitor` — presumably published by the
    // consensus manager when consensus starts; confirm against the sender side.
    // `Clone` gives each copy its own receiver via `resubscribe`.
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}
191
192impl ReplayWaiter {
193 pub(crate) fn new(
194 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
195 ) -> Self {
196 Self {
197 consumer_monitor_receiver,
198 }
199 }
200
201 pub(crate) async fn wait_for_replay(mut self) {
202 loop {
203 info!("Waiting for consensus to start replaying ...");
204 let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
205 continue;
206 };
207 info!("Waiting for consensus handler to finish replaying ...");
208 monitor.replay_complete().await;
209 break;
210 }
211 }
212}
213
214impl Clone for ReplayWaiter {
215 fn clone(&self) -> Self {
216 Self {
217 consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
218 }
219 }
220}
221
/// Prometheus gauges recording how long consensus start-up and shutdown take.
pub struct ConsensusManagerMetrics {
    // Whole seconds (fraction truncated) of the most recent start-up; set when
    // a start `RunningLockGuard` is dropped.
    start_latency: IntGauge,
    // Whole seconds (fraction truncated) of the most recent shutdown; set when
    // a shutdown `RunningLockGuard` is dropped.
    shutdown_latency: IntGauge,
}
226
227impl ConsensusManagerMetrics {
228 pub fn new(registry: &Registry) -> Self {
229 Self {
230 start_latency: register_int_gauge_with_registry!(
231 "consensus_manager_start_latency",
232 "The latency of starting up consensus nodes",
233 registry,
234 )
235 .unwrap(),
236 shutdown_latency: register_int_gauge_with_registry!(
237 "consensus_manager_shutdown_latency",
238 "The latency of shutting down consensus nodes",
239 registry,
240 )
241 .unwrap(),
242 }
243 }
244}
245
/// Guard held for the duration of a consensus start-up or shutdown.
///
/// While it exists, the `Running` mutex stays locked, so other acquisitions on
/// the same mutex wait. On drop it records the elapsed time in
/// [`ConsensusManagerMetrics`] and flips the `Running` state.
pub(crate) struct RunningLockGuard<'a> {
    // Exclusive access to the shared running state for the guard's lifetime.
    state_guard: MutexGuard<'a, Running>,
    metrics: &'a ConsensusManagerMetrics,
    // `Some` (together with `protocol_version`) only for start guards; shutdown
    // guards leave both `None`.
    epoch: Option<EpochId>,
    protocol_version: Option<ProtocolVersion>,
    // When the guard was acquired; the basis for the latency recorded on drop.
    start: Instant,
}
253
254impl<'a> RunningLockGuard<'a> {
255 pub(crate) async fn acquire_start(
256 metrics: &'a ConsensusManagerMetrics,
257 running_mutex: &'a Mutex<Running>,
258 epoch: EpochId,
259 version: ProtocolVersion,
260 ) -> Option<RunningLockGuard<'a>> {
261 let running = running_mutex.lock().await;
262 if let Running::True(epoch, version) = *running {
263 tracing::warn!(
264 "Consensus is already Running for epoch {epoch:?} & protocol version {version:?} - shutdown first before starting",
265 );
266 return None;
267 }
268
269 tracing::info!("Starting up consensus for epoch {epoch:?} & protocol version {version:?}");
270
271 Some(RunningLockGuard {
272 state_guard: running,
273 metrics,
274 start: Instant::now(),
275 epoch: Some(epoch),
276 protocol_version: Some(version),
277 })
278 }
279
280 pub(crate) async fn acquire_shutdown(
281 metrics: &'a ConsensusManagerMetrics,
282 running_mutex: &'a Mutex<Running>,
283 ) -> Option<RunningLockGuard<'a>> {
284 let running = running_mutex.lock().await;
285 if let Running::True(epoch, version) = *running {
286 tracing::info!(
287 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
288 );
289 } else {
290 tracing::warn!("Consensus shutdown was called but consensus is not running");
291 return None;
292 }
293
294 Some(RunningLockGuard {
295 state_guard: running,
296 metrics,
297 start: Instant::now(),
298 epoch: None,
299 protocol_version: None,
300 })
301 }
302}
303
304impl Drop for RunningLockGuard<'_> {
305 fn drop(&mut self) {
306 match *self.state_guard {
307 Running::True(epoch, version) => {
309 tracing::info!(
310 "Consensus shutdown for epoch {epoch:?} & protocol version {version:?} is complete - took {} seconds",
311 self.start.elapsed().as_secs_f64()
312 );
313
314 self.metrics
315 .shutdown_latency
316 .set(self.start.elapsed().as_secs_f64() as i64);
317
318 *self.state_guard = Running::False;
319 }
320 Running::False => {
322 tracing::info!(
323 "Starting up consensus for epoch {} & protocol version {:?} is complete - took {} seconds",
324 self.epoch.unwrap(),
325 self.protocol_version.unwrap(),
326 self.start.elapsed().as_secs_f64()
327 );
328
329 self.metrics
330 .start_latency
331 .set(self.start.elapsed().as_secs_f64() as i64);
332
333 *self.state_guard =
334 Running::True(self.epoch.unwrap(), self.protocol_version.unwrap());
335 }
336 }
337 }
338}