iota_core/consensus_manager/
mysticeti_manager.rs1use std::{path::PathBuf, sync::Arc};
6
7use arc_swap::ArcSwapOption;
8use async_trait::async_trait;
9use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
10use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority};
11use fastcrypto::ed25519;
12use iota_config::NodeConfig;
13use iota_metrics::{RegistryID, RegistryService, monitored_mpsc::unbounded_channel};
14use iota_protocol_config::ConsensusNetwork;
15use iota_types::{
16 committee::EpochId,
17 iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
18};
19use prometheus::Registry;
20use tokio::sync::Mutex;
21use tracing::info;
22
23use crate::{
24 authority::authority_per_epoch_store::AuthorityPerEpochStore,
25 consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler},
26 consensus_manager::{
27 ConsensusManagerMetrics, ConsensusManagerTrait, Running, RunningLockGuard,
28 },
29 consensus_validator::IotaTxValidator,
30 mysticeti_adapter::LazyMysticetiClient,
31};
32
33#[cfg(test)]
34#[path = "../unit_tests/mysticeti_manager_tests.rs"]
35pub mod mysticeti_manager_tests;
36
37pub struct MysticetiManager {
38 protocol_keypair: ProtocolKeyPair,
39 network_keypair: NetworkKeyPair,
40 storage_base_path: PathBuf,
41 running: Mutex<Running>,
43 metrics: Arc<ConsensusManagerMetrics>,
44 registry_service: RegistryService,
45 authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,
46 boot_counter: Mutex<u64>,
47 client: Arc<LazyMysticetiClient>,
50 consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,
52 consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
53}
54
55impl MysticetiManager {
56 pub fn new(
60 protocol_keypair: ed25519::Ed25519KeyPair,
61 network_keypair: ed25519::Ed25519KeyPair,
62 storage_base_path: PathBuf,
63 registry_service: RegistryService,
64 metrics: Arc<ConsensusManagerMetrics>,
65 client: Arc<LazyMysticetiClient>,
66 ) -> Self {
67 Self {
68 protocol_keypair: ProtocolKeyPair::new(protocol_keypair),
69 network_keypair: NetworkKeyPair::new(network_keypair),
70 storage_base_path,
71 running: Mutex::new(Running::False),
72 metrics,
73 registry_service,
74 authority: ArcSwapOption::empty(),
75 client,
76 consensus_handler: Mutex::new(None),
77 boot_counter: Mutex::new(0),
78 consumer_monitor: ArcSwapOption::empty(),
79 }
80 }
81
82 fn get_store_path(&self, epoch: EpochId) -> PathBuf {
83 let mut store_path = self.storage_base_path.clone();
84 store_path.push(format!("{}", epoch));
85 store_path
86 }
87
88 fn pick_network(&self, epoch_store: &AuthorityPerEpochStore) -> ConsensusNetwork {
89 if let Ok(type_str) = std::env::var("CONSENSUS_NETWORK") {
90 match type_str.to_lowercase().as_str() {
91 "tonic" => return ConsensusNetwork::Tonic,
92 _ => {
93 info!(
94 "Invalid consensus network type {} in env var. Continue to use the value from protocol config.",
95 type_str
96 );
97 }
98 }
99 }
100 epoch_store.protocol_config().consensus_network()
101 }
102}
103
104#[async_trait]
105
106impl ConsensusManagerTrait for MysticetiManager {
107 async fn start(
109 &self,
110 config: &NodeConfig,
111 epoch_store: Arc<AuthorityPerEpochStore>,
112 consensus_handler_initializer: ConsensusHandlerInitializer,
113 tx_validator: IotaTxValidator,
114 ) {
115 let system_state = epoch_store.epoch_start_state();
116 let committee: Committee = system_state.get_consensus_committee();
117 let epoch = epoch_store.epoch();
118 let protocol_config = epoch_store.protocol_config();
119 let network_type = self.pick_network(&epoch_store);
120
121 let Some(_guard) = RunningLockGuard::acquire_start(
122 &self.metrics,
123 &self.running,
124 epoch,
125 protocol_config.version,
126 )
127 .await
128 else {
129 return;
130 };
131
132 let consensus_config = config
133 .consensus_config()
134 .expect("consensus_config should exist");
135
136 let parameters = Parameters {
137 db_path: self.get_store_path(epoch),
138 ..consensus_config.parameters.clone().unwrap_or_default()
139 };
140
141 let own_protocol_key = self.protocol_keypair.public();
142 let (own_index, _) = committee
143 .authorities()
144 .find(|(_, a)| a.protocol_key == own_protocol_key)
145 .expect("Own authority should be among the consensus authorities!");
146
147 let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
148
149 let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
150
151 let consensus_handler = consensus_handler_initializer.new_consensus_handler();
152 let consumer = CommitConsumer::new(
153 commit_sender,
154 consensus_handler.last_processed_subdag_index() as CommitIndex,
155 );
156 let monitor = consumer.monitor();
157
158 let participated_on_previous_run =
165 if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
166 previous_monitor.highest_handled_commit() > 0
167 } else {
168 false
169 };
170
171 let mut boot_counter = self.boot_counter.lock().await;
179 if participated_on_previous_run {
180 *boot_counter += 1;
181 } else {
182 info!(
183 "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
184 *boot_counter
185 );
186 }
187
188 let authority = ConsensusAuthority::start(
189 network_type,
190 own_index,
191 committee.clone(),
192 parameters.clone(),
193 protocol_config.clone(),
194 self.protocol_keypair.clone(),
195 self.network_keypair.clone(),
196 Arc::new(tx_validator.clone()),
197 consumer,
198 registry.clone(),
199 *boot_counter,
200 )
201 .await;
202 let client = authority.transaction_client();
203
204 let registry_id = self.registry_service.add(registry.clone());
205
206 let registered_authority = Arc::new((authority, registry_id));
207 self.authority.swap(Some(registered_authority.clone()));
208
209 self.client.set(client);
211
212 let handler = MysticetiConsensusHandler::new(consensus_handler, commit_receiver, monitor);
214
215 let mut consensus_handler = self.consensus_handler.lock().await;
216 *consensus_handler = Some(handler);
217
218 registered_authority.0.replay_complete().await;
220 }
221
222 async fn shutdown(&self) {
223 let Some(_guard) = RunningLockGuard::acquire_shutdown(&self.metrics, &self.running).await
224 else {
225 return;
226 };
227
228 self.client.clear();
230
231 let r = self.authority.swap(None).unwrap();
234 let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
235 panic!("Failed to retrieve the mysticeti authority");
236 };
237
238 authority.stop().await;
240
241 let mut consensus_handler = self.consensus_handler.lock().await;
243 if let Some(mut handler) = consensus_handler.take() {
244 handler.abort().await;
245 }
246
247 self.registry_service.remove(registry_id);
249 }
250
251 async fn is_running(&self) -> bool {
252 Running::False != *self.running.lock().await
253 }
254}