1use std::{collections::HashMap, fmt::Display, sync::Arc};
6
7use consensus_config::{AuthorityIndex, Stake};
8use parking_lot::RwLock;
9use tracing::warn;
10
11use crate::{
12 block::{BlockAPI, BlockRef, Round, Slot, VerifiedBlock},
13 commit::{DEFAULT_WAVE_LENGTH, LeaderStatus, MINIMUM_WAVE_LENGTH, WaveNumber},
14 context::Context,
15 dag_state::DagState,
16 leader_schedule::LeaderSchedule,
17 stake_aggregator::{QuorumThreshold, StakeAggregator},
18};
19
20#[cfg(test)]
21#[path = "tests/base_committer_tests.rs"]
22mod base_committer_tests;
23
24#[cfg(test)]
25#[path = "tests/base_committer_declarative_tests.rs"]
26mod base_committer_declarative_tests;
27
28pub(crate) struct BaseCommitterOptions {
29 pub wave_length: u32,
32 pub leader_offset: u32,
36 pub round_offset: u32,
40}
41
42impl Default for BaseCommitterOptions {
43 fn default() -> Self {
44 Self {
45 wave_length: DEFAULT_WAVE_LENGTH,
46 leader_offset: 0,
47 round_offset: 0,
48 }
49 }
50}
51
52pub(crate) struct BaseCommitter {
57 context: Arc<Context>,
59 leader_schedule: Arc<LeaderSchedule>,
62 dag_state: Arc<RwLock<DagState>>,
64 options: BaseCommitterOptions,
66}
67
68impl BaseCommitter {
69 pub fn new(
70 context: Arc<Context>,
71 leader_schedule: Arc<LeaderSchedule>,
72 dag_state: Arc<RwLock<DagState>>,
73 options: BaseCommitterOptions,
74 ) -> Self {
75 assert!(options.wave_length >= MINIMUM_WAVE_LENGTH);
76 Self {
77 context,
78 leader_schedule,
79 dag_state,
80 options,
81 }
82 }
83
84 #[tracing::instrument(skip_all, fields(leader = %leader))]
87 pub fn try_direct_decide(&self, leader: Slot) -> LeaderStatus {
88 let voting_round = leader.round + 1;
92 if self.enough_leader_blame(voting_round, leader.authority) {
93 return LeaderStatus::Skip(leader);
94 }
95
96 let wave = self.wave_number(leader.round);
100 let decision_round = self.decision_round(wave);
101 let leader_blocks = self.dag_state.read().get_uncommitted_blocks_at_slot(leader);
102 let mut leaders_with_enough_support: Vec<_> = leader_blocks
103 .into_iter()
104 .filter(|l| self.enough_leader_support(decision_round, l))
105 .map(LeaderStatus::Commit)
106 .collect();
107
108 if leaders_with_enough_support.len() > 1 {
111 panic!("[{self}] More than one certified block for {leader}")
112 }
113
114 leaders_with_enough_support
115 .pop()
116 .unwrap_or(LeaderStatus::Undecided(leader))
117 }
118
119 #[tracing::instrument(skip_all, fields(leader = %leader_slot))]
122 pub fn try_indirect_decide<'a>(
123 &self,
124 leader_slot: Slot,
125 leaders: impl Iterator<Item = &'a LeaderStatus>,
126 ) -> LeaderStatus {
127 let anchors = leaders.filter(|x| leader_slot.round + self.options.wave_length <= x.round());
131
132 for anchor in anchors {
133 tracing::trace!(
134 "[{self}] Trying to indirect-decide {leader_slot} using anchor {anchor}",
135 );
136 match anchor {
137 LeaderStatus::Commit(anchor) => {
138 return self.decide_leader_from_anchor(anchor, leader_slot);
139 }
140 LeaderStatus::Skip(..) => (),
141 LeaderStatus::Undecided(..) => break,
142 }
143 }
144
145 LeaderStatus::Undecided(leader_slot)
146 }
147
148 pub fn elect_leader(&self, round: Round) -> Option<Slot> {
149 let wave = self.wave_number(round);
150 tracing::trace!(
151 "elect_leader: round={}, wave={}, leader_round={}, leader_offset={}",
152 round,
153 wave,
154 self.leader_round(wave),
155 self.options.leader_offset
156 );
157 if self.leader_round(wave) != round {
158 return None;
159 }
160
161 Some(Slot::new(
162 round,
163 self.leader_schedule
164 .elect_leader(round, self.options.leader_offset),
165 ))
166 }
167
168 pub(crate) fn leader_round(&self, wave: WaveNumber) -> Round {
172 (wave * self.options.wave_length) + self.options.round_offset
173 }
174
175 pub(crate) fn decision_round(&self, wave: WaveNumber) -> Round {
179 let wave_length = self.options.wave_length;
180 (wave * wave_length) + wave_length - 1 + self.options.round_offset
181 }
182
183 pub(crate) fn wave_number(&self, round: Round) -> WaveNumber {
186 round.saturating_sub(self.options.round_offset) / self.options.wave_length
187 }
188
189 fn find_supported_block(&self, leader_slot: Slot, from: &VerifiedBlock) -> Option<BlockRef> {
196 if from.round() < leader_slot.round {
197 return None;
198 }
199 for ancestor in from.ancestors() {
200 if Slot::from(*ancestor) == leader_slot {
201 return Some(*ancestor);
202 }
203 if ancestor.round <= leader_slot.round {
205 continue;
206 }
207 let ancestor = self
208 .dag_state
209 .read()
210 .get_block(ancestor)
211 .unwrap_or_else(|| panic!("Block not found in storage: {:?}", ancestor));
212 if let Some(support) = self.find_supported_block(leader_slot, &ancestor) {
213 return Some(support);
214 }
215 }
216 None
217 }
218
219 fn is_vote(&self, potential_vote: &VerifiedBlock, leader_block: &VerifiedBlock) -> bool {
222 let reference = leader_block.reference();
223 let leader_slot = Slot::from(reference);
224 self.find_supported_block(leader_slot, potential_vote) == Some(reference)
225 }
226
227 fn is_certificate(
235 &self,
236 potential_certificate: &VerifiedBlock,
237 leader_block: &VerifiedBlock,
238 all_votes: &mut HashMap<BlockRef, bool>,
239 ) -> bool {
240 let (gc_enabled, gc_round) = {
241 let dag_state = self.dag_state.read();
242 (dag_state.gc_enabled(), dag_state.gc_round())
243 };
244
245 let mut votes_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
246 for reference in potential_certificate.ancestors() {
247 let is_vote = if let Some(is_vote) = all_votes.get(reference) {
248 *is_vote
249 } else {
250 let potential_vote = self.dag_state.read().get_block(reference);
251
252 let is_vote = if gc_enabled {
253 if let Some(potential_vote) = potential_vote {
254 self.is_vote(&potential_vote, leader_block)
255 } else {
256 assert!(
257 reference.round <= gc_round,
258 "Block not found in storage: {:?} , and is not below gc_round: {gc_round}",
259 reference
260 );
261 false
262 }
263 } else {
264 let potential_vote = potential_vote
265 .unwrap_or_else(|| panic!("Block not found in storage: {:?}", reference));
266 self.is_vote(&potential_vote, leader_block)
267 };
268
269 all_votes.insert(*reference, is_vote);
270 is_vote
271 };
272
273 if is_vote {
274 tracing::trace!("[{self}] {reference} is a vote for {leader_block}");
275 if votes_stake_aggregator.add(reference.author, &self.context.committee) {
276 tracing::trace!(
277 "[{self}] {potential_certificate} is a certificate for leader {leader_block}"
278 );
279 return true;
280 }
281 } else {
282 tracing::trace!("[{self}] {reference} is not a vote for {leader_block}",);
283 }
284 }
285 tracing::trace!(
286 "[{self}] {potential_certificate} is not a certificate for leader {leader_block}"
287 );
288 false
289 }
290
291 fn decide_leader_from_anchor(&self, anchor: &VerifiedBlock, leader_slot: Slot) -> LeaderStatus {
295 let leader_blocks = self
298 .dag_state
299 .read()
300 .get_uncommitted_blocks_at_slot(leader_slot);
301
302 if leader_blocks.len() > 1 {
305 tracing::warn!(
306 "Multiple blocks found for leader slot {leader_slot}: {:?}",
307 leader_blocks
308 );
309 }
310
311 let wave = self.wave_number(leader_slot.round);
315 let decision_round = self.decision_round(wave);
316 let potential_certificates = self
317 .dag_state
318 .read()
319 .ancestors_at_round(anchor, decision_round);
320
321 let mut certified_leader_blocks: Vec<_> = leader_blocks
324 .into_iter()
325 .filter(|leader_block| {
326 let mut all_votes = HashMap::new();
327 potential_certificates.iter().any(|potential_certificate| {
328 self.is_certificate(potential_certificate, leader_block, &mut all_votes)
329 })
330 })
331 .collect();
332
333 if certified_leader_blocks.len() > 1 {
336 panic!("More than one certified block at wave {wave} from leader {leader_slot}")
337 }
338
339 match certified_leader_blocks.pop() {
342 Some(certified_leader_block) => LeaderStatus::Commit(certified_leader_block),
343 None => LeaderStatus::Skip(leader_slot),
344 }
345 }
346
347 fn enough_leader_blame(&self, voting_round: Round, leader: AuthorityIndex) -> bool {
350 let voting_blocks = self
351 .dag_state
352 .read()
353 .get_uncommitted_blocks_at_round(voting_round);
354
355 let mut blame_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
356 for voting_block in &voting_blocks {
357 let voter = voting_block.reference().author;
358 if voting_block
359 .ancestors()
360 .iter()
361 .all(|ancestor| ancestor.author != leader)
362 {
363 tracing::trace!(
364 "[{self}] {voting_block} is a blame for leader {}",
365 Slot::new(voting_round - 1, leader)
366 );
367 if blame_stake_aggregator.add(voter, &self.context.committee) {
368 return true;
369 }
370 } else {
371 tracing::trace!(
372 "[{self}] {voting_block} is not a blame for leader {}",
373 Slot::new(voting_round - 1, leader)
374 );
375 }
376 }
377 false
378 }
379
380 fn enough_leader_support(&self, decision_round: Round, leader_block: &VerifiedBlock) -> bool {
383 let decision_blocks = self
384 .dag_state
385 .read()
386 .get_uncommitted_blocks_at_round(decision_round);
387
388 let total_stake: Stake = decision_blocks
391 .iter()
392 .map(|b| self.context.committee.stake(b.author()))
393 .sum();
394 if !self.context.committee.reached_quorum(total_stake) {
395 tracing::debug!(
396 "Not enough support for {leader_block}. Stake not enough: {total_stake} < {}",
397 self.context.committee.quorum_threshold()
398 );
399 return false;
400 }
401
402 let mut certificate_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
403 let mut all_votes = HashMap::new();
404 for decision_block in &decision_blocks {
405 let authority = decision_block.reference().author;
406 if self.is_certificate(decision_block, leader_block, &mut all_votes)
407 && certificate_stake_aggregator.add(authority, &self.context.committee)
408 {
409 return true;
410 }
411 }
412 false
413 }
414}
415
416impl Display for BaseCommitter {
417 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418 write!(
419 f,
420 "Committer-L{}-R{}",
421 self.options.leader_offset, self.options.round_offset
422 )
423 }
424}
425
426#[cfg(test)]
430mod base_committer_builder {
431 use super::*;
432 use crate::leader_schedule::LeaderSwapTable;
433
434 pub(crate) struct BaseCommitterBuilder {
435 context: Arc<Context>,
436 dag_state: Arc<RwLock<DagState>>,
437 wave_length: u32,
438 leader_offset: u32,
439 round_offset: u32,
440 }
441
442 impl BaseCommitterBuilder {
443 pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
444 Self {
445 context,
446 dag_state,
447 wave_length: DEFAULT_WAVE_LENGTH,
448 leader_offset: 0,
449 round_offset: 0,
450 }
451 }
452
453 #[expect(unused)]
454 pub(crate) fn with_wave_length(mut self, wave_length: u32) -> Self {
455 self.wave_length = wave_length;
456 self
457 }
458
459 #[expect(unused)]
460 pub(crate) fn with_leader_offset(mut self, leader_offset: u32) -> Self {
461 self.leader_offset = leader_offset;
462 self
463 }
464
465 #[expect(unused)]
466 pub(crate) fn with_round_offset(mut self, round_offset: u32) -> Self {
467 self.round_offset = round_offset;
468 self
469 }
470
471 pub(crate) fn build(self) -> BaseCommitter {
472 let options = BaseCommitterOptions {
473 wave_length: self.wave_length,
474 leader_offset: self.leader_offset,
475 round_offset: self.round_offset,
476 };
477 BaseCommitter::new(
478 self.context.clone(),
479 Arc::new(LeaderSchedule::new(
480 self.context,
481 LeaderSwapTable::default(),
482 )),
483 self.dag_state,
484 options,
485 )
486 }
487 }
488}