1use std::{
6 collections::{BTreeMap, BTreeSet},
7 fmt::Debug,
8 sync::{
9 Arc,
10 atomic::{AtomicU32, Ordering},
11 },
12};
13
14use async_trait::async_trait;
15use consensus_config::AuthorityIndex;
16use iota_metrics::{
17 monitored_mpsc::{Receiver, Sender, WeakSender, channel},
18 monitored_scope, spawn_logged_monitored_task,
19};
20use parking_lot::RwLock;
21use thiserror::Error;
22use tokio::sync::{oneshot, watch};
23use tracing::warn;
24
25use crate::{
26 BlockAPI as _,
27 block::{BlockRef, Round, VerifiedBlock},
28 commit::CertifiedCommits,
29 context::Context,
30 core::Core,
31 core_thread::CoreError::Shutdown,
32 dag_state::DagState,
33 error::{ConsensusError, ConsensusResult},
34 round_prober::QuorumRound,
35};
36
/// Size argument passed to `channel` when `ChannelCoreThreadDispatcher::start`
/// creates the `consensus_core_commands` queue that feeds the core thread.
const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000;
38
/// Requests sent over the command channel to the core thread.
///
/// Every variant carries a `oneshot::Sender` on which the core thread sends
/// its reply once the request has been handled.
enum CoreThreadCommand {
    /// Forwards the blocks to `Core::add_blocks` and replies with the set of
    /// block refs it returns (the core loop calls them missing block refs).
    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Forwards the refs to `Core::check_block_refs` and replies with the set
    /// of block refs it returns.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Forwards the commits to `Core::add_certified_commits` and replies with
    /// the set of block refs it returns.
    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Calls `Core::new_block(round, force)`, where the trailing `bool` is the
    /// `force` flag, and replies with `()` when the call succeeded.
    NewBlock(Round, oneshot::Sender<()>, bool),
    /// Replies with the result of `Core::get_missing_blocks`.
    GetMissingBlocks(oneshot::Sender<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>),
}
56
/// Error type returned by the methods of `CoreThreadDispatcher`.
#[derive(Error, Debug)]
pub enum CoreError {
    /// The core thread could not be reached or did not answer. The payload is
    /// the textual form of the underlying channel error.
    #[error("Core thread shutdown: {0}")]
    Shutdown(String),
}
62
/// Interface for handing work to the consensus core.
///
/// In this file it is implemented by `ChannelCoreThreadDispatcher`, which
/// forwards each call to the task that owns `Core`. Fallible methods report
/// failure as `CoreError`.
#[async_trait]
pub trait CoreThreadDispatcher: Sync + Send + 'static {
    /// Submits verified blocks to the core and returns a set of block refs.
    /// The channel-based implementation returns whatever `Core::add_blocks`
    /// returned (presumably the refs that are still missing; confirm against
    /// `Core`).
    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
    -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Asks the core to check the given block refs and returns a set of block
    /// refs. The channel-based implementation returns the result of
    /// `Core::check_block_refs`.
    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Submits certified commits to the core and returns a set of block refs.
    /// The channel-based implementation returns the result of
    /// `Core::add_certified_commits`.
    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Asks the core to run `new_block` for `round`, passing `force` through.
    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

    /// Returns the core's view of missing blocks: a map from block ref to a
    /// set of authorities (the meaning of that set is defined by `Core`).
    async fn get_missing_blocks(
        &self,
    ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError>;

    /// Tells the core whether quorum subscribers exist.
    fn set_quorum_subscribers_exists(&self, exists: bool) -> Result<(), CoreError>;

    /// Hands the core a new propagation delay together with the received and
    /// accepted quorum rounds.
    fn set_propagation_delay_and_quorum_rounds(
        &self,
        delay: Round,
        received_quorum_rounds: Vec<QuorumRound>,
        accepted_quorum_rounds: Vec<QuorumRound>,
    ) -> Result<(), CoreError>;

    /// Tells the core the last known proposed round.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;

    /// Returns a list of rounds. The channel-based implementation returns one
    /// entry per committee authority, holding the highest block round it has
    /// recorded for that authority.
    fn highest_received_rounds(&self) -> Vec<Round>;
}
106
/// Owner-side handle of the spawned core task, returned by
/// `ChannelCoreThreadDispatcher::start` and consumed by `stop`.
pub(crate) struct CoreThreadHandle {
    /// Command sender held by the handle; dispatchers only keep a downgraded
    /// (weak) copy of it.
    sender: Sender<CoreThreadCommand>,
    /// Join handle of the task running `CoreThread::run`.
    join_handle: tokio::task::JoinHandle<()>,
}
111
112impl CoreThreadHandle {
113 pub async fn stop(self) {
114 drop(self.sender);
117 self.join_handle.await.ok();
118 }
119}
120
/// State owned by the core task: the `Core` itself plus every channel the
/// task listens on in `run`.
struct CoreThread {
    /// The consensus core that all commands and updates are applied to.
    core: Core,
    /// Receiving end of the command channel.
    receiver: Receiver<CoreThreadCommand>,
    /// Latest "quorum subscribers exist" flag published by dispatchers.
    rx_quorum_subscribers_exists: watch::Receiver<bool>,
    /// Latest propagation delay and quorum rounds published by dispatchers.
    rx_propagation_delay_and_quorum_rounds: watch::Receiver<PropagationDelayAndQuorumRounds>,
    /// Latest last-known-proposed round published by dispatchers.
    rx_last_known_proposed_round: watch::Receiver<Round>,
    /// Shared context; used in `run` to update node metrics.
    context: Arc<Context>,
}
129
130impl CoreThread {
131 pub async fn run(mut self) -> ConsensusResult<()> {
132 tracing::debug!("Started core thread");
133
134 loop {
135 tokio::select! {
136 command = self.receiver.recv() => {
137 let Some(command) = command else {
138 break;
139 };
140 self.context.metrics.node_metrics.core_lock_dequeued.inc();
141 match command {
142 CoreThreadCommand::AddBlocks(blocks, sender) => {
143 let _scope = monitored_scope("CoreThread::loop::add_blocks");
144 let missing_block_refs = self.core.add_blocks(blocks)?;
145 sender.send(missing_block_refs).ok();
146 }
147 CoreThreadCommand::CheckBlockRefs(blocks, sender) => {
148 let _scope = monitored_scope("CoreThread::loop::find_excluded_blocks");
149 let missing_block_refs = self.core.check_block_refs(blocks)?;
150 sender.send(missing_block_refs).ok();
151 }
152 CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
153 let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
154 let missing_block_refs = self.core.add_certified_commits(commits)?;
155 sender.send(missing_block_refs).ok();
156 }
157 CoreThreadCommand::NewBlock(round, sender, force) => {
158 let _scope = monitored_scope("CoreThread::loop::new_block");
159 self.core.new_block(round, force)?;
160 sender.send(()).ok();
161 }
162 CoreThreadCommand::GetMissingBlocks(sender) => {
163 let _scope = monitored_scope("CoreThread::loop::get_missing_blocks");
164 sender.send(self.core.get_missing_blocks()).ok();
165 }
166 }
167 }
168 _ = self.rx_last_known_proposed_round.changed() => {
169 let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
170 let round = *self.rx_last_known_proposed_round.borrow();
171 self.core.set_last_known_proposed_round(round);
172 self.core.new_block(round + 1, true)?;
173 }
174 _ = self.rx_quorum_subscribers_exists.changed() => {
175 let _scope = monitored_scope("CoreThread::loop::set_subscriber_exists");
176 let should_propose_before = self.core.should_propose();
177 let exists = *self.rx_quorum_subscribers_exists.borrow();
178 self.core.set_quorum_subscribers_exists(exists);
179 if !should_propose_before && self.core.should_propose() {
180 self.core.new_block(Round::MAX, true)?;
183 }
184 }
185 _ = self.rx_propagation_delay_and_quorum_rounds.changed() => {
186 let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_and_quorum_rounds");
187 let should_propose_before = self.core.should_propose();
188 let state = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
189 self.core.set_propagation_delay_and_quorum_rounds(
190 state.delay,
191 state.received_quorum_rounds,
192 state.accepted_quorum_rounds
193 );
194 if !should_propose_before && self.core.should_propose() {
195 self.core.new_block(Round::MAX, true)?;
198 }
199 }
200 }
201 }
202
203 Ok(())
204 }
205}
206
/// `CoreThreadDispatcher` implementation that talks to the core task through
/// channels. Cloning is cheap: all shared state sits behind `Arc`s or senders.
#[derive(Clone)]
pub(crate) struct ChannelCoreThreadDispatcher {
    /// Shared context; used to update node metrics when a command is queued.
    context: Arc<Context>,
    /// Weak command sender, upgraded for each send. The strong sender is held
    /// by `CoreThreadHandle`.
    sender: WeakSender<CoreThreadCommand>,
    /// Publishes the "quorum subscribers exist" flag to the core task.
    tx_quorum_subscribers_exists: Arc<watch::Sender<bool>>,
    /// Publishes propagation delay and quorum rounds to the core task.
    tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<PropagationDelayAndQuorumRounds>>,
    /// Publishes the last known proposed round to the core task.
    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
    /// Highest block round recorded per authority, one slot per committee
    /// authority; updated in `add_blocks` and `add_certified_commits`.
    highest_received_rounds: Arc<Vec<AtomicU32>>,
}
216
217impl ChannelCoreThreadDispatcher {
218 pub(crate) fn start(
221 context: Arc<Context>,
222 dag_state: &RwLock<DagState>,
223 core: Core,
224 ) -> (Self, CoreThreadHandle) {
225 let highest_received_rounds = {
227 let dag_state = dag_state.read();
228 let highest_received_rounds = context
229 .committee
230 .authorities()
231 .map(|(index, _)| {
232 AtomicU32::new(dag_state.get_last_block_for_authority(index).round())
233 })
234 .collect();
235
236 highest_received_rounds
237 };
238 let (sender, receiver) =
239 channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
240 let (tx_quorum_subscribers_exists, mut rx_quorum_subscriber_exists) = watch::channel(false);
241 let (tx_propagation_delay_and_quorum_rounds, mut rx_propagation_delay_and_quorum_rounds) =
242 watch::channel(PropagationDelayAndQuorumRounds {
243 delay: 0,
244 received_quorum_rounds: vec![(0, 0); context.committee.size()],
245 accepted_quorum_rounds: vec![(0, 0); context.committee.size()],
246 });
247 let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
248 rx_quorum_subscriber_exists.mark_unchanged();
249 rx_propagation_delay_and_quorum_rounds.mark_unchanged();
250 rx_last_known_proposed_round.mark_unchanged();
251 let core_thread = CoreThread {
252 core,
253 receiver,
254 rx_quorum_subscribers_exists: rx_quorum_subscriber_exists,
255 rx_propagation_delay_and_quorum_rounds,
256 rx_last_known_proposed_round,
257 context: context.clone(),
258 };
259
260 let join_handle = spawn_logged_monitored_task!(
261 async move {
262 if let Err(err) = core_thread.run().await {
263 if !matches!(err, ConsensusError::Shutdown) {
264 panic!("Fatal error occurred: {err}");
265 }
266 }
267 },
268 "ConsensusCoreThread"
269 );
270
271 let dispatcher = ChannelCoreThreadDispatcher {
275 context,
276 sender: sender.downgrade(),
277 tx_quorum_subscribers_exists: Arc::new(tx_quorum_subscribers_exists),
278 tx_propagation_delay_and_quorum_rounds: Arc::new(
279 tx_propagation_delay_and_quorum_rounds,
280 ),
281 tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
282 highest_received_rounds: Arc::new(highest_received_rounds),
283 };
284 let handle = CoreThreadHandle {
285 join_handle,
286 sender,
287 };
288 (dispatcher, handle)
289 }
290
291 async fn send(&self, command: CoreThreadCommand) {
292 self.context.metrics.node_metrics.core_lock_enqueued.inc();
293 if let Some(sender) = self.sender.upgrade() {
294 if let Err(err) = sender.send(command).await {
295 warn!(
296 "Couldn't send command to core thread, probably is shutting down: {}",
297 err
298 );
299 }
300 }
301 }
302}
303
304#[async_trait]
305impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
306 async fn add_blocks(
307 &self,
308 blocks: Vec<VerifiedBlock>,
309 ) -> Result<BTreeSet<BlockRef>, CoreError> {
310 for block in &blocks {
311 self.highest_received_rounds[block.author()].fetch_max(block.round(), Ordering::AcqRel);
312 }
313 let (sender, receiver) = oneshot::channel();
314 self.send(CoreThreadCommand::AddBlocks(blocks.clone(), sender))
315 .await;
316 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
317
318 Ok(missing_block_refs)
319 }
320
321 async fn check_block_refs(
322 &self,
323 block_refs: Vec<BlockRef>,
324 ) -> Result<BTreeSet<BlockRef>, CoreError> {
325 let (sender, receiver) = oneshot::channel();
326 self.send(CoreThreadCommand::CheckBlockRefs(
327 block_refs.clone(),
328 sender,
329 ))
330 .await;
331 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
332
333 Ok(missing_block_refs)
334 }
335
336 async fn add_certified_commits(
337 &self,
338 commits: CertifiedCommits,
339 ) -> Result<BTreeSet<BlockRef>, CoreError> {
340 for commit in commits.commits() {
341 for block in commit.blocks() {
342 self.highest_received_rounds[block.author()]
343 .fetch_max(block.round(), Ordering::AcqRel);
344 }
345 }
346 let (sender, receiver) = oneshot::channel();
347 self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
348 .await;
349 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
350 Ok(missing_block_refs)
351 }
352
353 async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
354 let (sender, receiver) = oneshot::channel();
355 self.send(CoreThreadCommand::NewBlock(round, sender, force))
356 .await;
357 receiver.await.map_err(|e| Shutdown(e.to_string()))
358 }
359
360 async fn get_missing_blocks(
361 &self,
362 ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
363 let (sender, receiver) = oneshot::channel();
364 self.send(CoreThreadCommand::GetMissingBlocks(sender)).await;
365 receiver.await.map_err(|e| Shutdown(e.to_string()))
366 }
367
368 fn set_quorum_subscribers_exists(&self, exists: bool) -> Result<(), CoreError> {
369 self.tx_quorum_subscribers_exists
370 .send(exists)
371 .map_err(|e| Shutdown(e.to_string()))
372 }
373
374 fn set_propagation_delay_and_quorum_rounds(
375 &self,
376 delay: Round,
377 received_quorum_rounds: Vec<QuorumRound>,
378 accepted_quorum_rounds: Vec<QuorumRound>,
379 ) -> Result<(), CoreError> {
380 self.tx_propagation_delay_and_quorum_rounds
381 .send(PropagationDelayAndQuorumRounds {
382 delay,
383 received_quorum_rounds,
384 accepted_quorum_rounds,
385 })
386 .map_err(|e| Shutdown(e.to_string()))
387 }
388
389 fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
390 self.tx_last_known_proposed_round
391 .send(round)
392 .map_err(|e| Shutdown(e.to_string()))
393 }
394
395 fn highest_received_rounds(&self) -> Vec<Round> {
396 self.highest_received_rounds
397 .iter()
398 .map(|round| round.load(Ordering::Relaxed))
399 .collect()
400 }
401}
402
/// Value carried by the propagation-delay watch channel: the three arguments
/// of `set_propagation_delay_and_quorum_rounds`, bundled so that they are
/// published and read together.
#[derive(Clone)]
struct PropagationDelayAndQuorumRounds {
    /// Propagation delay, expressed as a `Round`.
    delay: Round,
    /// Received quorum rounds; initialised in `start` with one `(0, 0)` entry
    /// per committee member.
    received_quorum_rounds: Vec<QuorumRound>,
    /// Accepted quorum rounds; initialised in `start` with one `(0, 0)` entry
    /// per committee member.
    accepted_quorum_rounds: Vec<QuorumRound>,
}
409
410#[cfg(test)]
411pub(crate) mod tests {
412 use iota_metrics::monitored_mpsc::unbounded_channel;
413 use parking_lot::RwLock;
414
415 use super::*;
416 use crate::{
417 CommitConsumer,
418 block_manager::BlockManager,
419 block_verifier::NoopBlockVerifier,
420 commit_observer::CommitObserver,
421 context::Context,
422 core::CoreSignals,
423 dag_state::DagState,
424 leader_schedule::LeaderSchedule,
425 storage::mem_store::MemStore,
426 transaction::{TransactionClient, TransactionConsumer},
427 };
428
    /// Test double for `CoreThreadDispatcher` that records its inputs and
    /// serves stubbed outputs instead of talking to a real core.
    #[derive(Default)]
    pub(crate) struct MockCoreThreadDispatcher {
        /// Blocks received through `add_blocks` and not yet taken by
        /// `get_add_blocks`.
        add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
        /// Stubbed answer for the next `get_missing_blocks` call.
        missing_blocks: parking_lot::Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
        /// Every round passed to `set_last_known_proposed_round`, in call order.
        last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
    }
436
437 impl MockCoreThreadDispatcher {
438 pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
439 let mut add_blocks = self.add_blocks.lock();
440 add_blocks.drain(0..).collect()
441 }
442
443 pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
444 let mut missing_blocks = self.missing_blocks.lock();
445 for block_ref in &block_refs {
446 missing_blocks.insert(*block_ref, BTreeSet::from([block_ref.author]));
447 }
448 }
449
450 pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
451 let last_known_proposed_round = self.last_known_proposed_round.lock();
452 last_known_proposed_round.clone()
453 }
454 }
455
456 #[async_trait]
457 impl CoreThreadDispatcher for MockCoreThreadDispatcher {
458 async fn add_blocks(
459 &self,
460 blocks: Vec<VerifiedBlock>,
461 ) -> Result<BTreeSet<BlockRef>, CoreError> {
462 let mut add_blocks = self.add_blocks.lock();
463 add_blocks.extend(blocks);
464 Ok(BTreeSet::new())
465 }
466
467 async fn check_block_refs(
468 &self,
469 _block_refs: Vec<BlockRef>,
470 ) -> Result<BTreeSet<BlockRef>, CoreError> {
471 Ok(BTreeSet::new())
472 }
473
474 async fn add_certified_commits(
475 &self,
476 _commits: CertifiedCommits,
477 ) -> Result<BTreeSet<BlockRef>, CoreError> {
478 todo!()
479 }
480
481 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
482 Ok(())
483 }
484
485 async fn get_missing_blocks(
486 &self,
487 ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
488 let mut missing_blocks = self.missing_blocks.lock();
489 let result = missing_blocks.clone();
490 missing_blocks.clear();
491 Ok(result)
492 }
493
494 fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
495 todo!()
496 }
497
498 fn set_propagation_delay_and_quorum_rounds(
499 &self,
500 _delay: Round,
501 _received_quorum_rounds: Vec<QuorumRound>,
502 _accepted_quorum_rounds: Vec<QuorumRound>,
503 ) -> Result<(), CoreError> {
504 todo!()
505 }
506
507 fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
508 let mut last_known_proposed_round = self.last_known_proposed_round.lock();
509 last_known_proposed_round.push(round);
510 Ok(())
511 }
512
513 fn highest_received_rounds(&self) -> Vec<Round> {
514 todo!()
515 }
516 }
517
518 #[tokio::test]
519 async fn test_core_thread() {
520 telemetry_subscribers::init_for_testing();
521 let (context, mut key_pairs) = Context::new_for_test(4);
522 let context = Arc::new(context);
523 let store = Arc::new(MemStore::new());
524 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
525 let block_manager = BlockManager::new(
526 context.clone(),
527 dag_state.clone(),
528 Arc::new(NoopBlockVerifier),
529 );
530 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
531 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
532 let (signals, signal_receivers) = CoreSignals::new(context.clone());
533 let _block_receiver = signal_receivers.block_broadcast_receiver();
534 let (sender, _receiver) = unbounded_channel("consensus_output");
535 let leader_schedule = Arc::new(LeaderSchedule::from_store(
536 context.clone(),
537 dag_state.clone(),
538 ));
539 let commit_observer = CommitObserver::new(
540 context.clone(),
541 CommitConsumer::new(sender.clone(), 0),
542 dag_state.clone(),
543 store,
544 leader_schedule.clone(),
545 );
546 let leader_schedule = Arc::new(LeaderSchedule::from_store(
547 context.clone(),
548 dag_state.clone(),
549 ));
550 let core = Core::new(
551 context.clone(),
552 leader_schedule,
553 transaction_consumer,
554 block_manager,
555 true,
556 commit_observer,
557 signals,
558 key_pairs.remove(context.own_index.value()).1,
559 dag_state.clone(),
560 false,
561 );
562
563 let (core_dispatcher, handle) =
564 ChannelCoreThreadDispatcher::start(context, &dag_state, core);
565
566 let dispatcher_1 = core_dispatcher.clone();
568 let dispatcher_2 = core_dispatcher.clone();
569
570 assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
572 assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());
573
574 handle.stop().await;
576
577 assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
579 assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
580 }
581}