consensus_core/
core_thread.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::BTreeSet,
7    fmt::Debug,
8    sync::{
9        Arc,
10        atomic::{AtomicU32, Ordering},
11    },
12};
13
14use async_trait::async_trait;
15use iota_metrics::{
16    monitored_mpsc::{Receiver, Sender, WeakSender, channel},
17    monitored_scope, spawn_logged_monitored_task,
18};
19use parking_lot::RwLock;
20use thiserror::Error;
21use tokio::sync::{oneshot, watch};
22use tracing::warn;
23
24use crate::{
25    BlockAPI as _,
26    block::{BlockRef, Round, VerifiedBlock},
27    commit::CertifiedCommits,
28    context::Context,
29    core::Core,
30    core_thread::CoreError::Shutdown,
31    dag_state::DagState,
32    error::{ConsensusError, ConsensusResult},
33    round_prober::QuorumRound,
34};
35
36const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000;
37
38enum CoreThreadCommand {
39    /// Add blocks to be processed and accepted
40    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
41    /// Checks if block refs exist locally and sync missing ones.
42    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
43    /// Add committed sub dag blocks for processing and acceptance.
44    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
45    /// Called when the min round has passed or the leader timeout occurred and
46    /// a block should be produced. When the command is called with `force =
47    /// true`, then the block will be created for `round` skipping
48    /// any checks (ex leader existence of previous round). More information can
49    /// be found on the `Core` component.
50    NewBlock(Round, oneshot::Sender<()>, bool),
51    /// Request missing blocks that need to be synced.
52    GetMissing(oneshot::Sender<BTreeSet<BlockRef>>),
53}
54
55#[derive(Error, Debug)]
56pub enum CoreError {
57    #[error("Core thread shutdown: {0}")]
58    Shutdown(String),
59}
60
61/// The interface to dispatch commands to CoreThread and Core.
62/// Also this allows the easier mocking during unit tests.
63#[async_trait]
64pub trait CoreThreadDispatcher: Sync + Send + 'static {
65    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
66    -> Result<BTreeSet<BlockRef>, CoreError>;
67
68    async fn check_block_refs(
69        &self,
70        block_refs: Vec<BlockRef>,
71    ) -> Result<BTreeSet<BlockRef>, CoreError>;
72
73    async fn add_certified_commits(
74        &self,
75        commits: CertifiedCommits,
76    ) -> Result<BTreeSet<BlockRef>, CoreError>;
77
78    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;
79
80    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;
81
82    /// Informs the core whether consumer of produced blocks exists.
83    /// This is only used by core to decide if it should propose new blocks.
84    /// It is not a guarantee that produced blocks will be accepted by peers.
85    fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError>;
86
87    /// Sets the estimated delay to propagate a block to a quorum of peers, in
88    /// number of rounds, and the received & accepted quorum rounds for all
89    /// authorities.
90    fn set_propagation_delay_and_quorum_rounds(
91        &self,
92        delay: Round,
93        received_quorum_rounds: Vec<QuorumRound>,
94        accepted_quorum_rounds: Vec<QuorumRound>,
95    ) -> Result<(), CoreError>;
96
97    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;
98
99    /// Returns the highest round received for each authority by Core.
100    fn highest_received_rounds(&self) -> Vec<Round>;
101}
102
103pub(crate) struct CoreThreadHandle {
104    sender: Sender<CoreThreadCommand>,
105    join_handle: tokio::task::JoinHandle<()>,
106}
107
108impl CoreThreadHandle {
109    pub async fn stop(self) {
110        // drop the sender, that will force all the other weak senders to not able to
111        // upgrade.
112        drop(self.sender);
113        self.join_handle.await.ok();
114    }
115}
116
117struct CoreThread {
118    core: Core,
119    receiver: Receiver<CoreThreadCommand>,
120    rx_subscriber_exists: watch::Receiver<bool>,
121    rx_propagation_delay_and_quorum_rounds: watch::Receiver<PropagationDelayAndQuorumRounds>,
122    rx_last_known_proposed_round: watch::Receiver<Round>,
123    context: Arc<Context>,
124}
125
126impl CoreThread {
127    pub async fn run(mut self) -> ConsensusResult<()> {
128        tracing::debug!("Started core thread");
129
130        loop {
131            tokio::select! {
132                command = self.receiver.recv() => {
133                    let Some(command) = command else {
134                        break;
135                    };
136                    self.context.metrics.node_metrics.core_lock_dequeued.inc();
137                    match command {
138                        CoreThreadCommand::AddBlocks(blocks, sender) => {
139                            let _scope = monitored_scope("CoreThread::loop::add_blocks");
140                            let missing_block_refs = self.core.add_blocks(blocks)?;
141                            sender.send(missing_block_refs).ok();
142                        }
143                        CoreThreadCommand::CheckBlockRefs(blocks, sender) => {
144                            let _scope = monitored_scope("CoreThread::loop::find_excluded_blocks");
145                            let missing_block_refs = self.core.check_block_refs(blocks)?;
146                            sender.send(missing_block_refs).ok();
147                        }
148                        CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
149                            let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
150                            let missing_block_refs = self.core.add_certified_commits(commits)?;
151                            sender.send(missing_block_refs).ok();
152                        }
153                        CoreThreadCommand::NewBlock(round, sender, force) => {
154                            let _scope = monitored_scope("CoreThread::loop::new_block");
155                            self.core.new_block(round, force)?;
156                            sender.send(()).ok();
157                        }
158                        CoreThreadCommand::GetMissing(sender) => {
159                            let _scope = monitored_scope("CoreThread::loop::get_missing");
160                            sender.send(self.core.get_missing_blocks()).ok();
161                        }
162                    }
163                }
164                _ = self.rx_last_known_proposed_round.changed() => {
165                    let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
166                    let round = *self.rx_last_known_proposed_round.borrow();
167                    self.core.set_last_known_proposed_round(round);
168                    self.core.new_block(round + 1, true)?;
169                }
170                _ = self.rx_subscriber_exists.changed() => {
171                    let _scope = monitored_scope("CoreThread::loop::set_subscriber_exists");
172                    let should_propose_before = self.core.should_propose();
173                    let exists = *self.rx_subscriber_exists.borrow();
174                    self.core.set_subscriber_exists(exists);
175                    if !should_propose_before && self.core.should_propose() {
176                        // If core cannot propose before but can propose now, try to produce a new block to ensure liveness,
177                        // because block proposal could have been skipped.
178                        self.core.new_block(Round::MAX, true)?;
179                    }
180                }
181                _ = self.rx_propagation_delay_and_quorum_rounds.changed() => {
182                    let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_and_quorum_rounds");
183                    let should_propose_before = self.core.should_propose();
184                    let state = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
185                    self.core.set_propagation_delay_and_quorum_rounds(
186                        state.delay,
187                        state.received_quorum_rounds,
188                        state.accepted_quorum_rounds
189                    );
190                    if !should_propose_before && self.core.should_propose() {
191                        // If core cannot propose before but can propose now, try to produce a new block to ensure liveness,
192                        // because block proposal could have been skipped.
193                        self.core.new_block(Round::MAX, true)?;
194                    }
195                }
196            }
197        }
198
199        Ok(())
200    }
201}
202
203#[derive(Clone)]
204pub(crate) struct ChannelCoreThreadDispatcher {
205    context: Arc<Context>,
206    sender: WeakSender<CoreThreadCommand>,
207    tx_subscriber_exists: Arc<watch::Sender<bool>>,
208    tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<PropagationDelayAndQuorumRounds>>,
209    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
210    highest_received_rounds: Arc<Vec<AtomicU32>>,
211}
212
213impl ChannelCoreThreadDispatcher {
214    /// Starts the core thread for the consensus authority and returns a
215    /// dispatcher and handle for managing the core thread.
216    pub(crate) fn start(
217        context: Arc<Context>,
218        dag_state: &RwLock<DagState>,
219        core: Core,
220    ) -> (Self, CoreThreadHandle) {
221        // Initialize highest received rounds.
222        let highest_received_rounds = {
223            let dag_state = dag_state.read();
224            let highest_received_rounds = context
225                .committee
226                .authorities()
227                .map(|(index, _)| {
228                    AtomicU32::new(dag_state.get_last_block_for_authority(index).round())
229                })
230                .collect();
231
232            highest_received_rounds
233        };
234        let (sender, receiver) =
235            channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
236        let (tx_subscriber_exists, mut rx_subscriber_exists) = watch::channel(false);
237        let (tx_propagation_delay_and_quorum_rounds, mut rx_propagation_delay_and_quorum_rounds) =
238            watch::channel(PropagationDelayAndQuorumRounds {
239                delay: 0,
240                received_quorum_rounds: vec![(0, 0); context.committee.size()],
241                accepted_quorum_rounds: vec![(0, 0); context.committee.size()],
242            });
243        let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
244        rx_subscriber_exists.mark_unchanged();
245        rx_propagation_delay_and_quorum_rounds.mark_unchanged();
246        rx_last_known_proposed_round.mark_unchanged();
247        let core_thread = CoreThread {
248            core,
249            receiver,
250            rx_subscriber_exists,
251            rx_propagation_delay_and_quorum_rounds,
252            rx_last_known_proposed_round,
253            context: context.clone(),
254        };
255
256        let join_handle = spawn_logged_monitored_task!(
257            async move {
258                if let Err(err) = core_thread.run().await {
259                    if !matches!(err, ConsensusError::Shutdown) {
260                        panic!("Fatal error occurred: {err}");
261                    }
262                }
263            },
264            "ConsensusCoreThread"
265        );
266
267        // Explicitly using downgraded sender in order to allow sharing the
268        // CoreThreadDispatcher but able to shutdown the CoreThread by dropping
269        // the original sender.
270        let dispatcher = ChannelCoreThreadDispatcher {
271            context,
272            sender: sender.downgrade(),
273            tx_subscriber_exists: Arc::new(tx_subscriber_exists),
274            tx_propagation_delay_and_quorum_rounds: Arc::new(
275                tx_propagation_delay_and_quorum_rounds,
276            ),
277            tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
278            highest_received_rounds: Arc::new(highest_received_rounds),
279        };
280        let handle = CoreThreadHandle {
281            join_handle,
282            sender,
283        };
284        (dispatcher, handle)
285    }
286
287    async fn send(&self, command: CoreThreadCommand) {
288        self.context.metrics.node_metrics.core_lock_enqueued.inc();
289        if let Some(sender) = self.sender.upgrade() {
290            if let Err(err) = sender.send(command).await {
291                warn!(
292                    "Couldn't send command to core thread, probably is shutting down: {}",
293                    err
294                );
295            }
296        }
297    }
298}
299
300#[async_trait]
301impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
302    async fn add_blocks(
303        &self,
304        blocks: Vec<VerifiedBlock>,
305    ) -> Result<BTreeSet<BlockRef>, CoreError> {
306        for block in &blocks {
307            self.highest_received_rounds[block.author()].fetch_max(block.round(), Ordering::AcqRel);
308        }
309        let (sender, receiver) = oneshot::channel();
310        self.send(CoreThreadCommand::AddBlocks(blocks.clone(), sender))
311            .await;
312        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
313
314        Ok(missing_block_refs)
315    }
316
317    async fn check_block_refs(
318        &self,
319        block_refs: Vec<BlockRef>,
320    ) -> Result<BTreeSet<BlockRef>, CoreError> {
321        let (sender, receiver) = oneshot::channel();
322        self.send(CoreThreadCommand::CheckBlockRefs(
323            block_refs.clone(),
324            sender,
325        ))
326        .await;
327        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
328
329        Ok(missing_block_refs)
330    }
331
332    async fn add_certified_commits(
333        &self,
334        commits: CertifiedCommits,
335    ) -> Result<BTreeSet<BlockRef>, CoreError> {
336        for commit in commits.commits() {
337            for block in commit.blocks() {
338                self.highest_received_rounds[block.author()]
339                    .fetch_max(block.round(), Ordering::AcqRel);
340            }
341        }
342        let (sender, receiver) = oneshot::channel();
343        self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
344            .await;
345        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
346        Ok(missing_block_refs)
347    }
348
349    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
350        let (sender, receiver) = oneshot::channel();
351        self.send(CoreThreadCommand::NewBlock(round, sender, force))
352            .await;
353        receiver.await.map_err(|e| Shutdown(e.to_string()))
354    }
355
356    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
357        let (sender, receiver) = oneshot::channel();
358        self.send(CoreThreadCommand::GetMissing(sender)).await;
359        receiver.await.map_err(|e| Shutdown(e.to_string()))
360    }
361
362    fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError> {
363        self.tx_subscriber_exists
364            .send(exists)
365            .map_err(|e| Shutdown(e.to_string()))
366    }
367
368    fn set_propagation_delay_and_quorum_rounds(
369        &self,
370        delay: Round,
371        received_quorum_rounds: Vec<QuorumRound>,
372        accepted_quorum_rounds: Vec<QuorumRound>,
373    ) -> Result<(), CoreError> {
374        self.tx_propagation_delay_and_quorum_rounds
375            .send(PropagationDelayAndQuorumRounds {
376                delay,
377                received_quorum_rounds,
378                accepted_quorum_rounds,
379            })
380            .map_err(|e| Shutdown(e.to_string()))
381    }
382
383    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
384        self.tx_last_known_proposed_round
385            .send(round)
386            .map_err(|e| Shutdown(e.to_string()))
387    }
388
389    fn highest_received_rounds(&self) -> Vec<Round> {
390        self.highest_received_rounds
391            .iter()
392            .map(|round| round.load(Ordering::Relaxed))
393            .collect()
394    }
395}
396
397#[derive(Clone)]
398struct PropagationDelayAndQuorumRounds {
399    delay: Round,
400    received_quorum_rounds: Vec<QuorumRound>,
401    accepted_quorum_rounds: Vec<QuorumRound>,
402}
403
#[cfg(test)]
pub(crate) mod tests {
    use iota_metrics::monitored_mpsc::unbounded_channel;
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        CommitConsumer,
        block_manager::BlockManager,
        block_verifier::NoopBlockVerifier,
        commit_observer::CommitObserver,
        context::Context,
        core::CoreSignals,
        dag_state::DagState,
        leader_schedule::LeaderSchedule,
        storage::mem_store::MemStore,
        transaction::{TransactionClient, TransactionConsumer},
    };

    /// Test double for `CoreThreadDispatcher` that records calls in memory
    /// instead of driving a real core thread.
    // TODO: complete the Mock for thread dispatcher to be used from several tests
    #[derive(Default)]
    pub(crate) struct MockCoreThreadDispatcher {
        /// Blocks received through `add_blocks` and not yet collected.
        add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
        /// Stubbed refs handed out (and cleared) by `get_missing_blocks`.
        missing_blocks: parking_lot::Mutex<BTreeSet<BlockRef>>,
        /// Every round passed to `set_last_known_proposed_round`, in call order.
        last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
    }

    impl MockCoreThreadDispatcher {
        /// Returns the blocks recorded by `add_blocks` and clears the record.
        pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
            std::mem::take(&mut *self.add_blocks.lock())
        }

        /// Adds `block_refs` to the set returned by the next
        /// `get_missing_blocks` call.
        pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
            self.missing_blocks.lock().extend(block_refs);
        }

        /// Returns every round recorded by `set_last_known_proposed_round`.
        pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
            self.last_known_proposed_round.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for MockCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            self.add_blocks.lock().extend(blocks);
            Ok(BTreeSet::new())
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            // Hand out the stubbed refs once, leaving the set empty.
            Ok(std::mem::take(&mut *self.missing_blocks.lock()))
        }

        fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
            self.last_known_proposed_round.lock().push(round);
            Ok(())
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[tokio::test]
    async fn test_core_thread() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let (sender, _receiver) = unbounded_channel("consensus_output");
        // A single leader schedule shared by the commit observer and the core,
        // rather than two independent instances built from the same store.
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store,
            leader_schedule.clone(),
        );
        let core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // Now create some clones of the dispatcher
        let dispatcher_1 = core_dispatcher.clone();
        let dispatcher_2 = core_dispatcher.clone();

        // Try to send some commands
        assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());

        // Now shutdown the dispatcher
        handle.stop().await;

        // Commands sent after shutdown must fail, even from live clones.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
    }
}