consensus_core/core_thread.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    },
};

use async_trait::async_trait;
use consensus_config::AuthorityIndex;
use iota_metrics::{
    monitored_mpsc::{Receiver, Sender, WeakSender, channel},
    monitored_scope, spawn_logged_monitored_task,
};
use parking_lot::RwLock;
use thiserror::Error;
use tokio::sync::{oneshot, watch};
use tracing::warn;

use crate::{
    BlockAPI as _,
    block::{BlockRef, Round, VerifiedBlock},
    commit::CertifiedCommits,
    context::Context,
    core::Core,
    core_thread::CoreError::Shutdown,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    round_prober::QuorumRound,
};

const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000;

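// Commands sent from the dispatcher to the core thread. Each command carries a
// oneshot sender that the core thread uses to return the result to the caller.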
enum CoreThreadCommand {
    /// Adds blocks to be processed and accepted.
    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Checks whether the block refs exist locally and syncs the missing ones.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Adds committed sub-dag blocks for processing and acceptance.
    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Called when the minimum round has passed or the leader timeout has
    /// occurred and a block should be produced. When the command is called
    /// with `force = true`, the block is created for `round`, skipping any
    /// checks (e.g. existence of the previous round's leader). More
    /// information can be found in the `Core` component.
    NewBlock(Round, oneshot::Sender<()>, bool),
    /// Requests the missing blocks that need to be synced, together with the
    /// authorities that have these blocks.
    GetMissingBlocks(oneshot::Sender<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>),
}

#[derive(Error, Debug)]
pub enum CoreError {
    #[error("Core thread shutdown: {0}")]
    Shutdown(String),
}

/// The interface for dispatching commands to the CoreThread and Core.
/// It also allows for easier mocking in unit tests.
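///
/// A minimal, illustrative sketch of how a dispatcher might be driven
/// (assumes `dispatcher` is some `Arc<dyn CoreThreadDispatcher>` and that
/// `blocks` and `round` are obtained elsewhere; not taken from this crate's
/// actual call sites):
///
/// ```ignore
/// // Submit verified blocks and collect the refs that are still missing.
/// let missing = dispatcher.add_blocks(blocks).await?;
/// // Ask Core to try proposing a block for the given round, forcing it.
/// dispatcher.new_block(round, true).await?;
/// // Query which blocks still need to be fetched and from whom.
/// let to_sync = dispatcher.get_missing_blocks().await?;
/// ```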
#[async_trait]
pub trait CoreThreadDispatcher: Sync + Send + 'static {
    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
    -> Result<BTreeSet<BlockRef>, CoreError>;

    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

    async fn get_missing_blocks(
        &self,
    ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError>;

    /// Informs the core whether a consumer of produced blocks exists.
    /// This is only used by the core to decide whether it should propose new
    /// blocks. It is not a guarantee that produced blocks will be accepted by
    /// peers.
    fn set_quorum_subscribers_exists(&self, exists: bool) -> Result<(), CoreError>;

    /// Sets the estimated delay to propagate a block to a quorum of peers, in
    /// number of rounds, and the received & accepted quorum rounds for all
    /// authorities.
    fn set_propagation_delay_and_quorum_rounds(
        &self,
        delay: Round,
        received_quorum_rounds: Vec<QuorumRound>,
        accepted_quorum_rounds: Vec<QuorumRound>,
    ) -> Result<(), CoreError>;

    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;

    /// Returns the highest round received for each authority by Core.
    fn highest_received_rounds(&self) -> Vec<Round>;
}

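/// Handle for the spawned core thread. Dropping the owned sender via `stop()`
/// closes the command channel, which makes the core thread loop exit.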
pub(crate) struct CoreThreadHandle {
    sender: Sender<CoreThreadCommand>,
    join_handle: tokio::task::JoinHandle<()>,
}

impl CoreThreadHandle {
    pub async fn stop(self) {
        // Drop the sender; this prevents all the weak senders from upgrading,
        // so the core thread loop exits.
        drop(self.sender);
        self.join_handle.await.ok();
    }
}

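/// State owned by the core thread: the `Core` itself plus the channels on
/// which it receives commands and watch updates.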
struct CoreThread {
    core: Core,
    receiver: Receiver<CoreThreadCommand>,
    rx_quorum_subscribers_exists: watch::Receiver<bool>,
    rx_propagation_delay_and_quorum_rounds: watch::Receiver<PropagationDelayAndQuorumRounds>,
    rx_last_known_proposed_round: watch::Receiver<Round>,
    context: Arc<Context>,
}

impl CoreThread {
    pub async fn run(mut self) -> ConsensusResult<()> {
        tracing::debug!("Started core thread");

        loop {
            tokio::select! {
                command = self.receiver.recv() => {
                    let Some(command) = command else {
                        break;
                    };
                    self.context.metrics.node_metrics.core_lock_dequeued.inc();
                    match command {
                        CoreThreadCommand::AddBlocks(blocks, sender) => {
                            let _scope = monitored_scope("CoreThread::loop::add_blocks");
                            let missing_block_refs = self.core.add_blocks(blocks)?;
                            sender.send(missing_block_refs).ok();
                        }
                        CoreThreadCommand::CheckBlockRefs(blocks, sender) => {
                            let _scope = monitored_scope("CoreThread::loop::find_excluded_blocks");
                            let missing_block_refs = self.core.check_block_refs(blocks)?;
                            sender.send(missing_block_refs).ok();
                        }
                        CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
                            let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
                            let missing_block_refs = self.core.add_certified_commits(commits)?;
                            sender.send(missing_block_refs).ok();
                        }
                        CoreThreadCommand::NewBlock(round, sender, force) => {
                            let _scope = monitored_scope("CoreThread::loop::new_block");
                            self.core.new_block(round, force)?;
                            sender.send(()).ok();
                        }
                        CoreThreadCommand::GetMissingBlocks(sender) => {
                            let _scope = monitored_scope("CoreThread::loop::get_missing_blocks");
                            sender.send(self.core.get_missing_blocks()).ok();
                        }
                    }
                }
                _ = self.rx_last_known_proposed_round.changed() => {
                    let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
                    let round = *self.rx_last_known_proposed_round.borrow();
                    self.core.set_last_known_proposed_round(round);
                    self.core.new_block(round + 1, true)?;
                }
                _ = self.rx_quorum_subscribers_exists.changed() => {
                    let _scope = monitored_scope("CoreThread::loop::set_subscriber_exists");
                    let should_propose_before = self.core.should_propose();
                    let exists = *self.rx_quorum_subscribers_exists.borrow();
                    self.core.set_quorum_subscribers_exists(exists);
                    if !should_propose_before && self.core.should_propose() {
                        // If Core could not propose before but can propose now, try to produce a new block to ensure liveness,
                        // because a block proposal could have been skipped.
                        self.core.new_block(Round::MAX, true)?;
                    }
                }
                _ = self.rx_propagation_delay_and_quorum_rounds.changed() => {
                    let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_and_quorum_rounds");
                    let should_propose_before = self.core.should_propose();
                    let state = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
                    self.core.set_propagation_delay_and_quorum_rounds(
                        state.delay,
                        state.received_quorum_rounds,
                        state.accepted_quorum_rounds
                    );
                    if !should_propose_before && self.core.should_propose() {
                        // If Core could not propose before but can propose now, try to produce a new block to ensure liveness,
                        // because a block proposal could have been skipped.
                        self.core.new_block(Round::MAX, true)?;
                    }
                }
            }
        }

        Ok(())
    }
}

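/// Channel-based implementation of `CoreThreadDispatcher`: commands are queued
/// to the core thread over an mpsc channel, while the non-async setters are
/// propagated through watch channels.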
#[derive(Clone)]
pub(crate) struct ChannelCoreThreadDispatcher {
    context: Arc<Context>,
    sender: WeakSender<CoreThreadCommand>,
    tx_quorum_subscribers_exists: Arc<watch::Sender<bool>>,
    tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<PropagationDelayAndQuorumRounds>>,
    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
    highest_received_rounds: Arc<Vec<AtomicU32>>,
}

impl ChannelCoreThreadDispatcher {
    /// Starts the core thread for the consensus authority and returns a
    /// dispatcher and handle for managing the core thread.
    pub(crate) fn start(
        context: Arc<Context>,
        dag_state: &RwLock<DagState>,
        core: Core,
    ) -> (Self, CoreThreadHandle) {
        // Initialize highest received rounds.
        let highest_received_rounds = {
            let dag_state = dag_state.read();
            let highest_received_rounds = context
                .committee
                .authorities()
                .map(|(index, _)| {
                    AtomicU32::new(dag_state.get_last_block_for_authority(index).round())
                })
                .collect();

            highest_received_rounds
        };
        let (sender, receiver) =
            channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
        let (tx_quorum_subscribers_exists, mut rx_quorum_subscriber_exists) = watch::channel(false);
        let (tx_propagation_delay_and_quorum_rounds, mut rx_propagation_delay_and_quorum_rounds) =
            watch::channel(PropagationDelayAndQuorumRounds {
                delay: 0,
                received_quorum_rounds: vec![(0, 0); context.committee.size()],
                accepted_quorum_rounds: vec![(0, 0); context.committee.size()],
            });
        let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
        // Mark the initial values as seen so they do not immediately trigger the
        // corresponding `changed()` branches in the core thread loop.
        rx_quorum_subscriber_exists.mark_unchanged();
        rx_propagation_delay_and_quorum_rounds.mark_unchanged();
        rx_last_known_proposed_round.mark_unchanged();
        let core_thread = CoreThread {
            core,
            receiver,
            rx_quorum_subscribers_exists: rx_quorum_subscriber_exists,
            rx_propagation_delay_and_quorum_rounds,
            rx_last_known_proposed_round,
            context: context.clone(),
        };

        let join_handle = spawn_logged_monitored_task!(
            async move {
                if let Err(err) = core_thread.run().await {
                    if !matches!(err, ConsensusError::Shutdown) {
                        panic!("Fatal error occurred: {err}");
                    }
                }
            },
            "ConsensusCoreThread"
        );

        // Explicitly use the downgraded sender so the CoreThreadDispatcher can be
        // shared, while the CoreThread can still be shut down by dropping the
        // original sender.
        let dispatcher = ChannelCoreThreadDispatcher {
            context,
            sender: sender.downgrade(),
            tx_quorum_subscribers_exists: Arc::new(tx_quorum_subscribers_exists),
            tx_propagation_delay_and_quorum_rounds: Arc::new(
                tx_propagation_delay_and_quorum_rounds,
            ),
            tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
            highest_received_rounds: Arc::new(highest_received_rounds),
        };
        let handle = CoreThreadHandle {
            join_handle,
            sender,
        };
        (dispatcher, handle)
    }

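    /// Sends a command to the core thread. If the thread has already shut
    /// down, the command is dropped; a warning is logged when the channel send
    /// itself fails.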
    async fn send(&self, command: CoreThreadCommand) {
        self.context.metrics.node_metrics.core_lock_enqueued.inc();
        if let Some(sender) = self.sender.upgrade() {
            if let Err(err) = sender.send(command).await {
                warn!(
                    "Couldn't send command to the core thread; it is probably shutting down: {}",
                    err
                );
            }
        }
    }
}

#[async_trait]
impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
    async fn add_blocks(
        &self,
        blocks: Vec<VerifiedBlock>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        for block in &blocks {
            self.highest_received_rounds[block.author()].fetch_max(block.round(), Ordering::AcqRel);
        }
        let (sender, receiver) = oneshot::channel();
        self.send(CoreThreadCommand::AddBlocks(blocks.clone(), sender))
            .await;
        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;

        Ok(missing_block_refs)
    }

    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        let (sender, receiver) = oneshot::channel();
        self.send(CoreThreadCommand::CheckBlockRefs(
            block_refs.clone(),
            sender,
        ))
        .await;
        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;

        Ok(missing_block_refs)
    }

    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        for commit in commits.commits() {
            for block in commit.blocks() {
                self.highest_received_rounds[block.author()]
                    .fetch_max(block.round(), Ordering::AcqRel);
            }
        }
        let (sender, receiver) = oneshot::channel();
        self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
            .await;
        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
        Ok(missing_block_refs)
    }

    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
        let (sender, receiver) = oneshot::channel();
        self.send(CoreThreadCommand::NewBlock(round, sender, force))
            .await;
        receiver.await.map_err(|e| Shutdown(e.to_string()))
    }

    async fn get_missing_blocks(
        &self,
    ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
        let (sender, receiver) = oneshot::channel();
        self.send(CoreThreadCommand::GetMissingBlocks(sender)).await;
        receiver.await.map_err(|e| Shutdown(e.to_string()))
    }

    fn set_quorum_subscribers_exists(&self, exists: bool) -> Result<(), CoreError> {
        self.tx_quorum_subscribers_exists
            .send(exists)
            .map_err(|e| Shutdown(e.to_string()))
    }

    fn set_propagation_delay_and_quorum_rounds(
        &self,
        delay: Round,
        received_quorum_rounds: Vec<QuorumRound>,
        accepted_quorum_rounds: Vec<QuorumRound>,
    ) -> Result<(), CoreError> {
        self.tx_propagation_delay_and_quorum_rounds
            .send(PropagationDelayAndQuorumRounds {
                delay,
                received_quorum_rounds,
                accepted_quorum_rounds,
            })
            .map_err(|e| Shutdown(e.to_string()))
    }

    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
        self.tx_last_known_proposed_round
            .send(round)
            .map_err(|e| Shutdown(e.to_string()))
    }

    fn highest_received_rounds(&self) -> Vec<Round> {
        self.highest_received_rounds
            .iter()
            .map(|round| round.load(Ordering::Relaxed))
            .collect()
    }
}

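/// Snapshot of the propagation delay and quorum rounds sent over the watch
/// channel to the core thread.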
#[derive(Clone)]
struct PropagationDelayAndQuorumRounds {
    delay: Round,
    received_quorum_rounds: Vec<QuorumRound>,
    accepted_quorum_rounds: Vec<QuorumRound>,
}

#[cfg(test)]
pub(crate) mod tests {
    use iota_metrics::monitored_mpsc::unbounded_channel;
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        CommitConsumer,
        block_manager::BlockManager,
        block_verifier::NoopBlockVerifier,
        commit_observer::CommitObserver,
        context::Context,
        core::CoreSignals,
        dag_state::DagState,
        leader_schedule::LeaderSchedule,
        storage::mem_store::MemStore,
        transaction::{TransactionClient, TransactionConsumer},
    };

    // TODO: complete the mock for the thread dispatcher so it can be used from
    // several tests
    #[derive(Default)]
    pub(crate) struct MockCoreThreadDispatcher {
        add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
        missing_blocks: parking_lot::Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
        last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
    }

    impl MockCoreThreadDispatcher {
        pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
            let mut add_blocks = self.add_blocks.lock();
            add_blocks.drain(0..).collect()
        }

        pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
            let mut missing_blocks = self.missing_blocks.lock();
            for block_ref in &block_refs {
                missing_blocks.insert(*block_ref, BTreeSet::from([block_ref.author]));
            }
        }

        pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
            let last_known_proposed_round = self.last_known_proposed_round.lock();
            last_known_proposed_round.clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for MockCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let mut add_blocks = self.add_blocks.lock();
            add_blocks.extend(blocks);
            Ok(BTreeSet::new())
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            let mut missing_blocks = self.missing_blocks.lock();
            let result = missing_blocks.clone();
            missing_blocks.clear();
            Ok(result)
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
            let mut last_known_proposed_round = self.last_known_proposed_round.lock();
            last_known_proposed_round.push(round);
            Ok(())
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[tokio::test]
    async fn test_core_thread() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let (sender, _receiver) = unbounded_channel("consensus_output");
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store,
            leader_schedule.clone(),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // Now create some clones of the dispatcher
        let dispatcher_1 = core_dispatcher.clone();
        let dispatcher_2 = core_dispatcher.clone();

        // Try to send some commands
        assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());

        // Now shutdown the dispatcher
        handle.stop().await;

        // Try to send some commands
        assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
    }
}