1use std::{
6 collections::BTreeSet,
7 fmt::Debug,
8 sync::{
9 Arc,
10 atomic::{AtomicU32, Ordering},
11 },
12};
13
14use async_trait::async_trait;
15use iota_metrics::{
16 monitored_mpsc::{Receiver, Sender, WeakSender, channel},
17 monitored_scope, spawn_logged_monitored_task,
18};
19use parking_lot::RwLock;
20use thiserror::Error;
21use tokio::sync::{oneshot, watch};
22use tracing::warn;
23
24use crate::{
25 BlockAPI as _,
26 block::{BlockRef, Round, VerifiedBlock},
27 commit::CertifiedCommits,
28 context::Context,
29 core::Core,
30 core_thread::CoreError::Shutdown,
31 dag_state::DagState,
32 error::{ConsensusError, ConsensusResult},
33 round_prober::QuorumRound,
34};
35
/// Capacity passed to `channel` when creating the `consensus_core_commands` channel that carries
/// [`CoreThreadCommand`]s to the core thread. `ChannelCoreThreadDispatcher::send` awaits
/// `Sender::send`, so callers presumably wait when this many commands are queued.
const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000;
37
/// Requests sent from `ChannelCoreThreadDispatcher` to the core thread.
///
/// Every variant carries a `oneshot::Sender` through which the core thread replies once the
/// corresponding `Core` call has succeeded. If the call fails, the sender is dropped and the
/// requester observes an error.
enum CoreThreadCommand {
    /// Calls `Core::add_blocks` with the blocks; replies with the block refs it reports as missing.
    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Calls `Core::check_block_refs` with the refs; replies with the refs it reports as missing.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Calls `Core::add_certified_commits`; replies with the block refs it reports as missing.
    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Calls `Core::new_block(round, force)`: the `Round` is the round argument and the `bool` is
    /// the `force` flag. Replies with `()` after the call returns successfully.
    NewBlock(Round, oneshot::Sender<()>, bool),
    /// Replies with the result of `Core::get_missing_blocks`.
    GetMissing(oneshot::Sender<BTreeSet<BlockRef>>),
}
54
/// Error type returned by [`CoreThreadDispatcher`] methods.
#[derive(Error, Debug)]
pub enum CoreError {
    /// The core thread could not be reached, or it did not reply. The payload is the underlying
    /// channel error rendered with `to_string()`.
    #[error("Core thread shutdown: {0}")]
    Shutdown(String),
}
60
/// Interface through which other components hand work and state updates to the core thread.
///
/// The `async` methods submit a request and wait for the core thread's reply. The synchronous
/// `set_*` methods hand over a new value without waiting for it to be applied (in
/// `ChannelCoreThreadDispatcher` they publish on watch channels). Failures are reported as
/// [`CoreError`].
#[async_trait]
pub trait CoreThreadDispatcher: Sync + Send + 'static {
    /// Adds `blocks` to the core and returns the block refs the core reports as missing.
    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
    -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Asks the core to check `block_refs` and returns the refs it reports as missing.
    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Adds certified `commits` to the core and returns the block refs it reports as missing.
    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Invokes the core's `new_block` with `round` and `force` and waits for it to complete.
    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

    /// Returns the core's current set of missing block refs.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Publishes whether a subscriber currently exists.
    fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError>;

    /// Publishes the latest propagation `delay` together with the received and accepted quorum
    /// rounds.
    fn set_propagation_delay_and_quorum_rounds(
        &self,
        delay: Round,
        received_quorum_rounds: Vec<QuorumRound>,
        accepted_quorum_rounds: Vec<QuorumRound>,
    ) -> Result<(), CoreError>;

    /// Publishes the last known proposed round.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;

    /// Returns the highest received block round for each authority.
    fn highest_received_rounds(&self) -> Vec<Round>;
}
102
/// Owning handle of the spawned core thread task.
///
/// It holds the strong `Sender` of the command channel; dispatchers only hold a `WeakSender`
/// obtained from it.
pub(crate) struct CoreThreadHandle {
    /// Strong sender of the command channel. Dropping it prevents weak senders from upgrading.
    sender: Sender<CoreThreadCommand>,
    /// Join handle of the task running `CoreThread::run`.
    join_handle: tokio::task::JoinHandle<()>,
}
107
108impl CoreThreadHandle {
109 pub async fn stop(self) {
110 drop(self.sender);
113 self.join_handle.await.ok();
114 }
115}
116
/// State owned by the core thread task.
///
/// Holds the `Core` together with the receiving ends of the command channel and of the watch
/// channels fed by `ChannelCoreThreadDispatcher`.
struct CoreThread {
    core: Core,
    /// Commands from the dispatcher; `run` exits its loop when this yields `None`.
    receiver: Receiver<CoreThreadCommand>,
    /// Latest "subscriber exists" flag, forwarded to `Core::set_subscriber_exists`.
    rx_subscriber_exists: watch::Receiver<bool>,
    /// Latest propagation delay and quorum rounds, forwarded to
    /// `Core::set_propagation_delay_and_quorum_rounds`.
    rx_propagation_delay_and_quorum_rounds: watch::Receiver<PropagationDelayAndQuorumRounds>,
    /// Last known proposed round; a change also triggers a forced `new_block(round + 1, true)`.
    rx_last_known_proposed_round: watch::Receiver<Round>,
    /// Used here to record the `core_lock_dequeued` metric.
    context: Arc<Context>,
}
125
126impl CoreThread {
127 pub async fn run(mut self) -> ConsensusResult<()> {
128 tracing::debug!("Started core thread");
129
130 loop {
131 tokio::select! {
132 command = self.receiver.recv() => {
133 let Some(command) = command else {
134 break;
135 };
136 self.context.metrics.node_metrics.core_lock_dequeued.inc();
137 match command {
138 CoreThreadCommand::AddBlocks(blocks, sender) => {
139 let _scope = monitored_scope("CoreThread::loop::add_blocks");
140 let missing_block_refs = self.core.add_blocks(blocks)?;
141 sender.send(missing_block_refs).ok();
142 }
143 CoreThreadCommand::CheckBlockRefs(blocks, sender) => {
144 let _scope = monitored_scope("CoreThread::loop::find_excluded_blocks");
145 let missing_block_refs = self.core.check_block_refs(blocks)?;
146 sender.send(missing_block_refs).ok();
147 }
148 CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
149 let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
150 let missing_block_refs = self.core.add_certified_commits(commits)?;
151 sender.send(missing_block_refs).ok();
152 }
153 CoreThreadCommand::NewBlock(round, sender, force) => {
154 let _scope = monitored_scope("CoreThread::loop::new_block");
155 self.core.new_block(round, force)?;
156 sender.send(()).ok();
157 }
158 CoreThreadCommand::GetMissing(sender) => {
159 let _scope = monitored_scope("CoreThread::loop::get_missing");
160 sender.send(self.core.get_missing_blocks()).ok();
161 }
162 }
163 }
164 _ = self.rx_last_known_proposed_round.changed() => {
165 let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
166 let round = *self.rx_last_known_proposed_round.borrow();
167 self.core.set_last_known_proposed_round(round);
168 self.core.new_block(round + 1, true)?;
169 }
170 _ = self.rx_subscriber_exists.changed() => {
171 let _scope = monitored_scope("CoreThread::loop::set_subscriber_exists");
172 let should_propose_before = self.core.should_propose();
173 let exists = *self.rx_subscriber_exists.borrow();
174 self.core.set_subscriber_exists(exists);
175 if !should_propose_before && self.core.should_propose() {
176 self.core.new_block(Round::MAX, true)?;
179 }
180 }
181 _ = self.rx_propagation_delay_and_quorum_rounds.changed() => {
182 let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_and_quorum_rounds");
183 let should_propose_before = self.core.should_propose();
184 let state = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
185 self.core.set_propagation_delay_and_quorum_rounds(
186 state.delay,
187 state.received_quorum_rounds,
188 state.accepted_quorum_rounds
189 );
190 if !should_propose_before && self.core.should_propose() {
191 self.core.new_block(Round::MAX, true)?;
194 }
195 }
196 }
197 }
198
199 Ok(())
200 }
201}
202
/// [`CoreThreadDispatcher`] that talks to the core thread through a command channel and a set of
/// watch channels.
///
/// Clones share the same channels and the same per-authority round counters.
#[derive(Clone)]
pub(crate) struct ChannelCoreThreadDispatcher {
    context: Arc<Context>,
    /// Weak end of the command channel. The strong sender is owned by `CoreThreadHandle`, so
    /// commands can no longer be delivered once the handle is stopped.
    sender: WeakSender<CoreThreadCommand>,
    tx_subscriber_exists: Arc<watch::Sender<bool>>,
    tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<PropagationDelayAndQuorumRounds>>,
    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
    /// Highest block round seen per authority. Seeded from `DagState` in `start` and raised by
    /// `add_blocks` / `add_certified_commits`.
    highest_received_rounds: Arc<Vec<AtomicU32>>,
}
212
213impl ChannelCoreThreadDispatcher {
214 pub(crate) fn start(
217 context: Arc<Context>,
218 dag_state: &RwLock<DagState>,
219 core: Core,
220 ) -> (Self, CoreThreadHandle) {
221 let highest_received_rounds = {
223 let dag_state = dag_state.read();
224 let highest_received_rounds = context
225 .committee
226 .authorities()
227 .map(|(index, _)| {
228 AtomicU32::new(dag_state.get_last_block_for_authority(index).round())
229 })
230 .collect();
231
232 highest_received_rounds
233 };
234 let (sender, receiver) =
235 channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
236 let (tx_subscriber_exists, mut rx_subscriber_exists) = watch::channel(false);
237 let (tx_propagation_delay_and_quorum_rounds, mut rx_propagation_delay_and_quorum_rounds) =
238 watch::channel(PropagationDelayAndQuorumRounds {
239 delay: 0,
240 received_quorum_rounds: vec![(0, 0); context.committee.size()],
241 accepted_quorum_rounds: vec![(0, 0); context.committee.size()],
242 });
243 let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
244 rx_subscriber_exists.mark_unchanged();
245 rx_propagation_delay_and_quorum_rounds.mark_unchanged();
246 rx_last_known_proposed_round.mark_unchanged();
247 let core_thread = CoreThread {
248 core,
249 receiver,
250 rx_subscriber_exists,
251 rx_propagation_delay_and_quorum_rounds,
252 rx_last_known_proposed_round,
253 context: context.clone(),
254 };
255
256 let join_handle = spawn_logged_monitored_task!(
257 async move {
258 if let Err(err) = core_thread.run().await {
259 if !matches!(err, ConsensusError::Shutdown) {
260 panic!("Fatal error occurred: {err}");
261 }
262 }
263 },
264 "ConsensusCoreThread"
265 );
266
267 let dispatcher = ChannelCoreThreadDispatcher {
271 context,
272 sender: sender.downgrade(),
273 tx_subscriber_exists: Arc::new(tx_subscriber_exists),
274 tx_propagation_delay_and_quorum_rounds: Arc::new(
275 tx_propagation_delay_and_quorum_rounds,
276 ),
277 tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
278 highest_received_rounds: Arc::new(highest_received_rounds),
279 };
280 let handle = CoreThreadHandle {
281 join_handle,
282 sender,
283 };
284 (dispatcher, handle)
285 }
286
287 async fn send(&self, command: CoreThreadCommand) {
288 self.context.metrics.node_metrics.core_lock_enqueued.inc();
289 if let Some(sender) = self.sender.upgrade() {
290 if let Err(err) = sender.send(command).await {
291 warn!(
292 "Couldn't send command to core thread, probably is shutting down: {}",
293 err
294 );
295 }
296 }
297 }
298}
299
300#[async_trait]
301impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
302 async fn add_blocks(
303 &self,
304 blocks: Vec<VerifiedBlock>,
305 ) -> Result<BTreeSet<BlockRef>, CoreError> {
306 for block in &blocks {
307 self.highest_received_rounds[block.author()].fetch_max(block.round(), Ordering::AcqRel);
308 }
309 let (sender, receiver) = oneshot::channel();
310 self.send(CoreThreadCommand::AddBlocks(blocks.clone(), sender))
311 .await;
312 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
313
314 Ok(missing_block_refs)
315 }
316
317 async fn check_block_refs(
318 &self,
319 block_refs: Vec<BlockRef>,
320 ) -> Result<BTreeSet<BlockRef>, CoreError> {
321 let (sender, receiver) = oneshot::channel();
322 self.send(CoreThreadCommand::CheckBlockRefs(
323 block_refs.clone(),
324 sender,
325 ))
326 .await;
327 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
328
329 Ok(missing_block_refs)
330 }
331
332 async fn add_certified_commits(
333 &self,
334 commits: CertifiedCommits,
335 ) -> Result<BTreeSet<BlockRef>, CoreError> {
336 for commit in commits.commits() {
337 for block in commit.blocks() {
338 self.highest_received_rounds[block.author()]
339 .fetch_max(block.round(), Ordering::AcqRel);
340 }
341 }
342 let (sender, receiver) = oneshot::channel();
343 self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
344 .await;
345 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
346 Ok(missing_block_refs)
347 }
348
349 async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
350 let (sender, receiver) = oneshot::channel();
351 self.send(CoreThreadCommand::NewBlock(round, sender, force))
352 .await;
353 receiver.await.map_err(|e| Shutdown(e.to_string()))
354 }
355
356 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
357 let (sender, receiver) = oneshot::channel();
358 self.send(CoreThreadCommand::GetMissing(sender)).await;
359 receiver.await.map_err(|e| Shutdown(e.to_string()))
360 }
361
362 fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError> {
363 self.tx_subscriber_exists
364 .send(exists)
365 .map_err(|e| Shutdown(e.to_string()))
366 }
367
368 fn set_propagation_delay_and_quorum_rounds(
369 &self,
370 delay: Round,
371 received_quorum_rounds: Vec<QuorumRound>,
372 accepted_quorum_rounds: Vec<QuorumRound>,
373 ) -> Result<(), CoreError> {
374 self.tx_propagation_delay_and_quorum_rounds
375 .send(PropagationDelayAndQuorumRounds {
376 delay,
377 received_quorum_rounds,
378 accepted_quorum_rounds,
379 })
380 .map_err(|e| Shutdown(e.to_string()))
381 }
382
383 fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
384 self.tx_last_known_proposed_round
385 .send(round)
386 .map_err(|e| Shutdown(e.to_string()))
387 }
388
389 fn highest_received_rounds(&self) -> Vec<Round> {
390 self.highest_received_rounds
391 .iter()
392 .map(|round| round.load(Ordering::Relaxed))
393 .collect()
394 }
395}
396
/// Value carried by the propagation-delay watch channel.
///
/// The core thread passes its fields to `Core::set_propagation_delay_and_quorum_rounds`. The
/// initial value created in `ChannelCoreThreadDispatcher::start` has `delay: 0` and one `(0, 0)`
/// entry per authority in each vector.
#[derive(Clone)]
struct PropagationDelayAndQuorumRounds {
    delay: Round,
    received_quorum_rounds: Vec<QuorumRound>,
    accepted_quorum_rounds: Vec<QuorumRound>,
}
403
#[cfg(test)]
pub(crate) mod tests {
    use iota_metrics::monitored_mpsc::unbounded_channel;
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        CommitConsumer,
        block_manager::BlockManager,
        block_verifier::NoopBlockVerifier,
        commit_observer::CommitObserver,
        context::Context,
        core::CoreSignals,
        dag_state::DagState,
        leader_schedule::LeaderSchedule,
        storage::mem_store::MemStore,
        transaction::{TransactionClient, TransactionConsumer},
    };

    /// Test double for [`CoreThreadDispatcher`].
    ///
    /// It records the blocks passed to `add_blocks` and the rounds passed to
    /// `set_last_known_proposed_round`, and serves stubbed missing blocks from
    /// `get_missing_blocks`. Methods the tests do not exercise are left as `todo!()`.
    #[derive(Default)]
    pub(crate) struct MockCoreThreadDispatcher {
        add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
        missing_blocks: parking_lot::Mutex<BTreeSet<BlockRef>>,
        last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
    }

    impl MockCoreThreadDispatcher {
        /// Returns the blocks recorded by `add_blocks` since the previous call and clears them.
        pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
            std::mem::take(&mut *self.add_blocks.lock())
        }

        /// Adds `block_refs` to the set returned (and cleared) by the next `get_missing_blocks`.
        pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
            self.missing_blocks.lock().extend(block_refs);
        }

        /// Returns every round passed to `set_last_known_proposed_round`, in call order.
        pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
            self.last_known_proposed_round.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for MockCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            self.add_blocks.lock().extend(blocks);
            Ok(BTreeSet::new())
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        /// Returns the stubbed missing blocks and clears them.
        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(std::mem::take(&mut *self.missing_blocks.lock()))
        }

        fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
            self.last_known_proposed_round.lock().push(round);
            Ok(())
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[tokio::test]
    async fn test_core_thread() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let (sender, _receiver) = unbounded_channel("consensus_output");
        // The commit observer and the core share a single leader schedule.
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store,
            leader_schedule.clone(),
        );
        let core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // Clones of the dispatcher all talk to the same core thread.
        let dispatcher_1 = core_dispatcher.clone();
        let dispatcher_2 = core_dispatcher.clone();

        // While the core thread is running, both clones get replies.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());

        handle.stop().await;

        // After the handle is stopped the weak senders cannot upgrade, so requests fail.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
    }
}