consensus_core/
base_committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, fmt::Display, sync::Arc};
6
7use consensus_config::{AuthorityIndex, Stake};
8use parking_lot::RwLock;
9use tracing::warn;
10
11use crate::{
12    block::{BlockAPI, BlockRef, Round, Slot, VerifiedBlock},
13    commit::{DEFAULT_WAVE_LENGTH, LeaderStatus, MINIMUM_WAVE_LENGTH, WaveNumber},
14    context::Context,
15    dag_state::DagState,
16    leader_schedule::LeaderSchedule,
17    stake_aggregator::{QuorumThreshold, StakeAggregator},
18};
19
20#[cfg(test)]
21#[path = "tests/base_committer_tests.rs"]
22mod base_committer_tests;
23
24#[cfg(test)]
25#[path = "tests/base_committer_declarative_tests.rs"]
26mod base_committer_declarative_tests;
27
/// Tunable parameters of a [`BaseCommitter`].
///
/// The type is plain data (three `u32` values), so it derives `Debug` and
/// `Clone` to make it easy to log and to reuse across committer instances.
#[derive(Debug, Clone)]
pub(crate) struct BaseCommitterOptions {
    /// The length of a wave (minimum 3).
    ///
    /// TODO: Re-evaluate if we want this to be configurable after running
    /// experiments.
    pub wave_length: u32,
    /// The offset used in the leader-election protocol. This is used by the
    /// multi-committer to ensure that each [`BaseCommitter`] instance elects
    /// a different leader.
    pub leader_offset: u32,
    /// The offset of the first wave. This is used by the pipelined committer
    /// to ensure that each [`BaseCommitter`] instance operates on a different
    /// view of the dag.
    pub round_offset: u32,
}
41
42impl Default for BaseCommitterOptions {
43    fn default() -> Self {
44        Self {
45            wave_length: DEFAULT_WAVE_LENGTH,
46            leader_offset: 0,
47            round_offset: 0,
48        }
49    }
50}
51
52/// The [`BaseCommitter`] contains the bare bone commit logic. Once
53/// instantiated, the method `try_direct_decide` and `try_indirect_decide` can
54/// be called at any time and any number of times (it is idempotent) to
55/// determine whether a leader can be committed or skipped.
56pub(crate) struct BaseCommitter {
57    /// The per-epoch configuration of this authority.
58    context: Arc<Context>,
59    /// The consensus leader schedule to be used to resolve the leader for a
60    /// given round.
61    leader_schedule: Arc<LeaderSchedule>,
62    /// In memory block store representing the dag state
63    dag_state: Arc<RwLock<DagState>>,
64    /// The options used by this committer
65    options: BaseCommitterOptions,
66}
67
68impl BaseCommitter {
69    pub fn new(
70        context: Arc<Context>,
71        leader_schedule: Arc<LeaderSchedule>,
72        dag_state: Arc<RwLock<DagState>>,
73        options: BaseCommitterOptions,
74    ) -> Self {
75        assert!(options.wave_length >= MINIMUM_WAVE_LENGTH);
76        Self {
77            context,
78            leader_schedule,
79            dag_state,
80            options,
81        }
82    }
83
84    /// Apply the direct decision rule to the specified leader to see whether we
85    /// can direct-commit or direct-skip it.
86    #[tracing::instrument(skip_all, fields(leader = %leader))]
87    pub fn try_direct_decide(&self, leader: Slot) -> LeaderStatus {
88        // Check whether the leader has enough blame. That is, whether there are 2f+1
89        // non-votes for that leader (which ensure there will never be a
90        // certificate for that leader).
91        let voting_round = leader.round + 1;
92        if self.enough_leader_blame(voting_round, leader.authority) {
93            return LeaderStatus::Skip(leader);
94        }
95
96        // Check whether the leader(s) has enough support. That is, whether there are
97        // 2f+1 certificates over the leader. Note that there could be more than
98        // one leader block (created by Byzantine leaders).
99        let wave = self.wave_number(leader.round);
100        let decision_round = self.decision_round(wave);
101        let leader_blocks = self.dag_state.read().get_uncommitted_blocks_at_slot(leader);
102        let mut leaders_with_enough_support: Vec<_> = leader_blocks
103            .into_iter()
104            .filter(|l| self.enough_leader_support(decision_round, l))
105            .map(LeaderStatus::Commit)
106            .collect();
107
108        // There can be at most one leader with enough support for each round, otherwise
109        // it means the BFT assumption is broken.
110        if leaders_with_enough_support.len() > 1 {
111            panic!("[{self}] More than one certified block for {leader}")
112        }
113
114        leaders_with_enough_support
115            .pop()
116            .unwrap_or(LeaderStatus::Undecided(leader))
117    }
118
119    /// Apply the indirect decision rule to the specified leader to see whether
120    /// we can indirect-commit or indirect-skip it.
121    #[tracing::instrument(skip_all, fields(leader = %leader_slot))]
122    pub fn try_indirect_decide<'a>(
123        &self,
124        leader_slot: Slot,
125        leaders: impl Iterator<Item = &'a LeaderStatus>,
126    ) -> LeaderStatus {
127        // The anchor is the first committed leader with round higher than the decision
128        // round of the target leader. We must stop the iteration upon
129        // encountering an undecided leader.
130        let anchors = leaders.filter(|x| leader_slot.round + self.options.wave_length <= x.round());
131
132        for anchor in anchors {
133            tracing::trace!(
134                "[{self}] Trying to indirect-decide {leader_slot} using anchor {anchor}",
135            );
136            match anchor {
137                LeaderStatus::Commit(anchor) => {
138                    return self.decide_leader_from_anchor(anchor, leader_slot);
139                }
140                LeaderStatus::Skip(..) => (),
141                LeaderStatus::Undecided(..) => break,
142            }
143        }
144
145        LeaderStatus::Undecided(leader_slot)
146    }
147
148    pub fn elect_leader(&self, round: Round) -> Option<Slot> {
149        let wave = self.wave_number(round);
150        tracing::trace!(
151            "elect_leader: round={}, wave={}, leader_round={}, leader_offset={}",
152            round,
153            wave,
154            self.leader_round(wave),
155            self.options.leader_offset
156        );
157        if self.leader_round(wave) != round {
158            return None;
159        }
160
161        Some(Slot::new(
162            round,
163            self.leader_schedule
164                .elect_leader(round, self.options.leader_offset),
165        ))
166    }
167
168    /// Return the leader round of the specified wave. The leader round is
169    /// always the first round of the wave. This takes into account round
170    /// offset for when pipelining is enabled.
171    pub(crate) fn leader_round(&self, wave: WaveNumber) -> Round {
172        (wave * self.options.wave_length) + self.options.round_offset
173    }
174
175    /// Return the decision round of the specified wave. The decision round is
176    /// always the last round of the wave. This takes into account round offset
177    /// for when pipelining is enabled.
178    pub(crate) fn decision_round(&self, wave: WaveNumber) -> Round {
179        let wave_length = self.options.wave_length;
180        (wave * wave_length) + wave_length - 1 + self.options.round_offset
181    }
182
183    /// Return the wave in which the specified round belongs. This takes into
184    /// account the round offset for when pipelining is enabled.
185    pub(crate) fn wave_number(&self, round: Round) -> WaveNumber {
186        round.saturating_sub(self.options.round_offset) / self.options.wave_length
187    }
188
189    /// Find which block is supported at a slot (author, round) by the given
190    /// block. Blocks can indirectly reference multiple other blocks at a
191    /// slot, but only one block at a slot will be supported by the given
192    /// block. If block A supports B at a slot, it is guaranteed that any
193    /// processed block by the same author that directly or indirectly
194    /// includes A will also support B at that slot.
195    fn find_supported_block(&self, leader_slot: Slot, from: &VerifiedBlock) -> Option<BlockRef> {
196        if from.round() < leader_slot.round {
197            return None;
198        }
199        for ancestor in from.ancestors() {
200            if Slot::from(*ancestor) == leader_slot {
201                return Some(*ancestor);
202            }
203            // Weak links may point to blocks with lower round numbers than strong links.
204            if ancestor.round <= leader_slot.round {
205                continue;
206            }
207            let ancestor = self
208                .dag_state
209                .read()
210                .get_block(ancestor)
211                .unwrap_or_else(|| panic!("Block not found in storage: {:?}", ancestor));
212            if let Some(support) = self.find_supported_block(leader_slot, &ancestor) {
213                return Some(support);
214            }
215        }
216        None
217    }
218
219    /// Check whether the specified block (`potential_vote`) is a vote for
220    /// the specified leader (`leader_block`).
221    fn is_vote(&self, potential_vote: &VerifiedBlock, leader_block: &VerifiedBlock) -> bool {
222        let reference = leader_block.reference();
223        let leader_slot = Slot::from(reference);
224        self.find_supported_block(leader_slot, potential_vote) == Some(reference)
225    }
226
227    /// Check whether the specified block (`potential_certificate`) is a
228    /// certificate for the specified leader (`leader_block`). An
229    /// `all_votes` map can be provided as a cache to quickly skip checking
230    /// against the block store on whether a reference is a vote. This is
231    /// done for efficiency. Bear in mind that the `all_votes` should refer
232    /// to votes considered to the same `leader_block` and it can't be
233    /// reused for different leaders.
234    fn is_certificate(
235        &self,
236        potential_certificate: &VerifiedBlock,
237        leader_block: &VerifiedBlock,
238        all_votes: &mut HashMap<BlockRef, bool>,
239    ) -> bool {
240        let (gc_enabled, gc_round) = {
241            let dag_state = self.dag_state.read();
242            (dag_state.gc_enabled(), dag_state.gc_round())
243        };
244
245        let mut votes_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
246        for reference in potential_certificate.ancestors() {
247            let is_vote = if let Some(is_vote) = all_votes.get(reference) {
248                *is_vote
249            } else {
250                let potential_vote = self.dag_state.read().get_block(reference);
251
252                let is_vote = if gc_enabled {
253                    if let Some(potential_vote) = potential_vote {
254                        self.is_vote(&potential_vote, leader_block)
255                    } else {
256                        assert!(
257                            reference.round <= gc_round,
258                            "Block not found in storage: {:?} , and is not below gc_round: {gc_round}",
259                            reference
260                        );
261                        false
262                    }
263                } else {
264                    let potential_vote = potential_vote
265                        .unwrap_or_else(|| panic!("Block not found in storage: {:?}", reference));
266                    self.is_vote(&potential_vote, leader_block)
267                };
268
269                all_votes.insert(*reference, is_vote);
270                is_vote
271            };
272
273            if is_vote {
274                tracing::trace!("[{self}] {reference} is a vote for {leader_block}");
275                if votes_stake_aggregator.add(reference.author, &self.context.committee) {
276                    tracing::trace!(
277                        "[{self}] {potential_certificate} is a certificate for leader {leader_block}"
278                    );
279                    return true;
280                }
281            } else {
282                tracing::trace!("[{self}] {reference} is not a vote for {leader_block}",);
283            }
284        }
285        tracing::trace!(
286            "[{self}] {potential_certificate} is not a certificate for leader {leader_block}"
287        );
288        false
289    }
290
291    /// Decide the status of a target leader from the specified anchor. We
292    /// commit the target leader if it has a certified link to the anchor.
293    /// Otherwise, we skip the target leader.
294    fn decide_leader_from_anchor(&self, anchor: &VerifiedBlock, leader_slot: Slot) -> LeaderStatus {
295        // Get the block(s) proposed by the leader. There could be more than one leader
296        // block in the slot from a Byzantine authority.
297        let leader_blocks = self
298            .dag_state
299            .read()
300            .get_uncommitted_blocks_at_slot(leader_slot);
301
302        // TODO: Re-evaluate this check once we have a better way to handle/track
303        // byzantine authorities.
304        if leader_blocks.len() > 1 {
305            tracing::warn!(
306                "Multiple blocks found for leader slot {leader_slot}: {:?}",
307                leader_blocks
308            );
309        }
310
311        // Get all blocks that could be potential certificates for the target leader.
312        // These blocks are in the decision round of the target leader and are
313        // linked to the anchor.
314        let wave = self.wave_number(leader_slot.round);
315        let decision_round = self.decision_round(wave);
316        let potential_certificates = self
317            .dag_state
318            .read()
319            .ancestors_at_round(anchor, decision_round);
320
321        // Use those potential certificates to determine which (if any) of the target
322        // leader blocks can be committed.
323        let mut certified_leader_blocks: Vec<_> = leader_blocks
324            .into_iter()
325            .filter(|leader_block| {
326                let mut all_votes = HashMap::new();
327                potential_certificates.iter().any(|potential_certificate| {
328                    self.is_certificate(potential_certificate, leader_block, &mut all_votes)
329                })
330            })
331            .collect();
332
333        // There can be at most one certified leader, otherwise it means the BFT
334        // assumption is broken.
335        if certified_leader_blocks.len() > 1 {
336            panic!("More than one certified block at wave {wave} from leader {leader_slot}")
337        }
338
339        // We commit the target leader if it has a certificate that is an ancestor of
340        // the anchor. Otherwise skip it.
341        match certified_leader_blocks.pop() {
342            Some(certified_leader_block) => LeaderStatus::Commit(certified_leader_block),
343            None => LeaderStatus::Skip(leader_slot),
344        }
345    }
346
347    /// Check whether the specified leader has 2f+1 non-votes (blames) to be
348    /// directly skipped.
349    fn enough_leader_blame(&self, voting_round: Round, leader: AuthorityIndex) -> bool {
350        let voting_blocks = self
351            .dag_state
352            .read()
353            .get_uncommitted_blocks_at_round(voting_round);
354
355        let mut blame_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
356        for voting_block in &voting_blocks {
357            let voter = voting_block.reference().author;
358            if voting_block
359                .ancestors()
360                .iter()
361                .all(|ancestor| ancestor.author != leader)
362            {
363                tracing::trace!(
364                    "[{self}] {voting_block} is a blame for leader {}",
365                    Slot::new(voting_round - 1, leader)
366                );
367                if blame_stake_aggregator.add(voter, &self.context.committee) {
368                    return true;
369                }
370            } else {
371                tracing::trace!(
372                    "[{self}] {voting_block} is not a blame for leader {}",
373                    Slot::new(voting_round - 1, leader)
374                );
375            }
376        }
377        false
378    }
379
380    /// Check whether the specified leader has 2f+1 certificates to be directly
381    /// committed.
382    fn enough_leader_support(&self, decision_round: Round, leader_block: &VerifiedBlock) -> bool {
383        let decision_blocks = self
384            .dag_state
385            .read()
386            .get_uncommitted_blocks_at_round(decision_round);
387
388        // Quickly reject if there isn't enough stake to support the leader from
389        // the potential certificates.
390        let total_stake: Stake = decision_blocks
391            .iter()
392            .map(|b| self.context.committee.stake(b.author()))
393            .sum();
394        if !self.context.committee.reached_quorum(total_stake) {
395            tracing::debug!(
396                "Not enough support for {leader_block}. Stake not enough: {total_stake} < {}",
397                self.context.committee.quorum_threshold()
398            );
399            return false;
400        }
401
402        let mut certificate_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
403        let mut all_votes = HashMap::new();
404        for decision_block in &decision_blocks {
405            let authority = decision_block.reference().author;
406            if self.is_certificate(decision_block, leader_block, &mut all_votes)
407                && certificate_stake_aggregator.add(authority, &self.context.committee)
408            {
409                return true;
410            }
411        }
412        false
413    }
414}
415
416impl Display for BaseCommitter {
417    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418        write!(
419            f,
420            "Committer-L{}-R{}",
421            self.options.leader_offset, self.options.round_offset
422        )
423    }
424}
425
/// Test-only builder for [`BaseCommitter`]. Unless overridden, the builder
/// uses the default wave length with no leader offset and no round offset,
/// which corresponds to a single leader with pipelining disabled.
#[cfg(test)]
mod base_committer_builder {
    use super::*;
    use crate::leader_schedule::LeaderSwapTable;

    /// Collects the inputs needed to construct a [`BaseCommitter`].
    pub(crate) struct BaseCommitterBuilder {
        context: Arc<Context>,
        dag_state: Arc<RwLock<DagState>>,
        wave_length: u32,
        leader_offset: u32,
        round_offset: u32,
    }

    impl BaseCommitterBuilder {
        /// Starts a builder with the default wave length and zero offsets.
        pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
            BaseCommitterBuilder {
                context,
                dag_state,
                wave_length: DEFAULT_WAVE_LENGTH,
                leader_offset: 0,
                round_offset: 0,
            }
        }

        /// Overrides the wave length.
        #[expect(unused)]
        pub(crate) fn with_wave_length(self, wave_length: u32) -> Self {
            Self {
                wave_length,
                ..self
            }
        }

        /// Overrides the leader offset.
        #[expect(unused)]
        pub(crate) fn with_leader_offset(self, leader_offset: u32) -> Self {
            Self {
                leader_offset,
                ..self
            }
        }

        /// Overrides the round offset.
        #[expect(unused)]
        pub(crate) fn with_round_offset(self, round_offset: u32) -> Self {
            Self {
                round_offset,
                ..self
            }
        }

        /// Consumes the builder and creates the committer, backed by a leader
        /// schedule that uses the default swap table.
        pub(crate) fn build(self) -> BaseCommitter {
            let Self {
                context,
                dag_state,
                wave_length,
                leader_offset,
                round_offset,
            } = self;

            let options = BaseCommitterOptions {
                wave_length,
                leader_offset,
                round_offset,
            };
            let leader_schedule = Arc::new(LeaderSchedule::new(
                Arc::clone(&context),
                LeaderSwapTable::default(),
            ));

            BaseCommitter::new(context, leader_schedule, dag_state, options)
        }
    }
}