1use std::{
6 cmp::Ordering,
7 fmt::{self, Debug, Display, Formatter},
8 hash::{Hash, Hasher},
9 ops::{Deref, Range, RangeInclusive},
10 sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{AuthorityIndex, DIGEST_LENGTH, DefaultHashFunction};
15use enum_dispatch::enum_dispatch;
16use fastcrypto::hash::{Digest, HashFunction as _};
17use serde::{Deserialize, Serialize};
18
19use crate::{
20 block::{BlockAPI, BlockRef, BlockTimestampMs, Round, Slot, VerifiedBlock},
21 leader_scoring::ReputationScores,
22 storage::Store,
23};
24
/// Index identifying a commit; an alias for `u32`.
pub type CommitIndex = u32;

/// Commit index `0`, named as the index of the genesis commit.
pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;

/// Wave length used by default; currently identical to [`MINIMUM_WAVE_LENGTH`].
pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;

/// Smallest wave length, expressed in rounds.
// NOTE(review): presumably 3 = one leader, one voting and one decision round — confirm.
pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;

/// Number identifying a wave; an alias for `u32`.
pub(crate) type WaveNumber = u32;
44
/// Versioned commit container.
///
/// `V1` is the only variant at present. The `enum_dispatch(CommitAPI)`
/// attribute ties this enum to the [`CommitAPI`] trait so its accessors can be
/// called on a `Commit` directly.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[enum_dispatch(CommitAPI)]
pub(crate) enum Commit {
    /// First commit format, backed by [`CommitV1`].
    V1(CommitV1),
}
64
65impl Commit {
66 pub(crate) fn new(
68 index: CommitIndex,
69 previous_digest: CommitDigest,
70 timestamp_ms: BlockTimestampMs,
71 leader: BlockRef,
72 blocks: Vec<BlockRef>,
73 ) -> Self {
74 Commit::V1(CommitV1 {
75 index,
76 previous_digest,
77 timestamp_ms,
78 leader,
79 blocks,
80 })
81 }
82
83 pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
84 let bytes = bcs::to_bytes(self)?;
85 Ok(bytes.into())
86 }
87}
88
/// Read-only accessors shared by every commit version.
///
/// Marked `#[enum_dispatch]` so that it can be paired with the `Commit` enum.
#[enum_dispatch]
pub(crate) trait CommitAPI {
    /// Round of the commit (for `CommitV1` this is the leader's round).
    fn round(&self) -> Round;
    /// Index of the commit.
    fn index(&self) -> CommitIndex;
    /// Digest of the previous commit.
    fn previous_digest(&self) -> CommitDigest;
    /// Timestamp of the commit, in milliseconds.
    fn timestamp_ms(&self) -> BlockTimestampMs;
    /// Reference to the leader block of the commit.
    fn leader(&self) -> BlockRef;
    /// References to the blocks listed in the commit.
    fn blocks(&self) -> &[BlockRef];
}
99
/// Field layout of the first commit version; wrapped by `Commit::V1`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub(crate) struct CommitV1 {
    /// Index of this commit.
    index: CommitIndex,
    /// Digest of the commit preceding this one.
    previous_digest: CommitDigest,
    /// Timestamp of the commit, in milliseconds.
    timestamp_ms: BlockTimestampMs,
    /// Reference to the leader block; its round is reported as the commit round.
    leader: BlockRef,
    /// References to the blocks listed in this commit.
    blocks: Vec<BlockRef>,
}
120
121impl CommitAPI for CommitV1 {
122 fn round(&self) -> Round {
123 self.leader.round
124 }
125
126 fn index(&self) -> CommitIndex {
127 self.index
128 }
129
130 fn previous_digest(&self) -> CommitDigest {
131 self.previous_digest
132 }
133
134 fn timestamp_ms(&self) -> BlockTimestampMs {
135 self.timestamp_ms
136 }
137
138 fn leader(&self) -> BlockRef {
139 self.leader
140 }
141
142 fn blocks(&self) -> &[BlockRef] {
143 &self.blocks
144 }
145}
146
/// A commit bundled with its serialized bytes and the digest of those bytes.
///
/// Cloning is cheap for the commit itself because it sits behind an `Arc`.
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct TrustedCommit {
    /// The shared commit value.
    inner: Arc<Commit>,

    /// Digest computed from `serialized` at construction time.
    digest: CommitDigest,
    /// Serialized form of the commit, as supplied to the constructor.
    serialized: Bytes,
}
160
161impl TrustedCommit {
162 pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
163 let digest = Self::compute_digest(&serialized);
164 Self {
165 inner: Arc::new(commit),
166 digest,
167 serialized,
168 }
169 }
170
171 #[cfg(test)]
172 pub(crate) fn new_for_test(
173 index: CommitIndex,
174 previous_digest: CommitDigest,
175 timestamp_ms: BlockTimestampMs,
176 leader: BlockRef,
177 blocks: Vec<BlockRef>,
178 ) -> Self {
179 let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
180 let serialized = commit.serialize().unwrap();
181 Self::new_trusted(commit, serialized)
182 }
183
184 pub(crate) fn reference(&self) -> CommitRef {
185 CommitRef {
186 index: self.index(),
187 digest: self.digest(),
188 }
189 }
190
191 pub(crate) fn digest(&self) -> CommitDigest {
192 self.digest
193 }
194
195 pub(crate) fn serialized(&self) -> &Bytes {
196 &self.serialized
197 }
198
199 pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
200 let mut hasher = DefaultHashFunction::new();
201 hasher.update(serialized);
202 CommitDigest(hasher.finalize().into())
203 }
204}
205
206impl Deref for TrustedCommit {
208 type Target = Commit;
209
210 fn deref(&self) -> &Self::Target {
211 &self.inner
212 }
213}
214
/// A batch of certified commits together with a list of vote blocks.
// NOTE(review): presumably `votes` are the blocks certifying the commits — confirm.
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommits {
    /// The certified commits in this batch.
    commits: Vec<CertifiedCommit>,
    /// Verified blocks carried alongside the commits.
    votes: Vec<VerifiedBlock>,
}
224
225impl CertifiedCommits {
226 pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
227 Self { commits, votes }
228 }
229
230 pub(crate) fn commits(&self) -> &[CertifiedCommit] {
231 &self.commits
232 }
233
234 pub(crate) fn votes(&self) -> &[VerifiedBlock] {
235 &self.votes
236 }
237}
238
/// A trusted commit paired with a list of verified blocks.
// NOTE(review): presumably `blocks` are the blocks referenced by the commit — confirm.
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommit {
    /// The commit, shared behind an `Arc`.
    commit: Arc<TrustedCommit>,
    /// Verified blocks supplied with the commit.
    blocks: Vec<VerifiedBlock>,
}
245
246impl CertifiedCommit {
247 pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
248 Self {
249 commit: Arc::new(commit),
250 blocks,
251 }
252 }
253
254 pub fn blocks(&self) -> &[VerifiedBlock] {
255 &self.blocks
256 }
257}
258
259impl Deref for CertifiedCommit {
260 type Target = TrustedCommit;
261
262 fn deref(&self) -> &Self::Target {
263 &self.commit
264 }
265}
266
/// Digest of a commit: a fixed-size array of `DIGEST_LENGTH` bytes.
///
/// Ordering and equality are derived, i.e. byte-wise on the array.
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
270
271impl CommitDigest {
272 pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
274 pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
275
276 pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
277 self.0
278 }
279}
280
281impl Hash for CommitDigest {
282 fn hash<H: Hasher>(&self, state: &mut H) {
283 state.write(&self.0[..8]);
284 }
285}
286
287impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
288 fn from(hd: CommitDigest) -> Self {
289 Digest::new(hd.0)
290 }
291}
292
293impl fmt::Display for CommitDigest {
294 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
295 write!(
296 f,
297 "{}",
298 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
299 .get(0..4)
300 .ok_or(fmt::Error)?
301 )
302 }
303}
304
305impl fmt::Debug for CommitDigest {
306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
307 write!(
308 f,
309 "{}",
310 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
311 )
312 }
313}
314
/// Identifies a commit by its index and digest.
///
/// The derived ordering compares `index` first and `digest` second, following
/// field declaration order.
#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitRef {
    /// Index of the referenced commit.
    pub index: CommitIndex,
    /// Digest of the referenced commit.
    pub digest: CommitDigest,
}
321
322impl CommitRef {
323 pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
324 Self { index, digest }
325 }
326}
327
328impl fmt::Display for CommitRef {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
330 write!(f, "C{}({})", self.index, self.digest)
331 }
332}
333
334impl fmt::Debug for CommitRef {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
336 write!(f, "C{}({:?})", self.index, self.digest)
337 }
338}
339
/// A commit vote has the same representation as a [`CommitRef`].
pub type CommitVote = CommitRef;
342
/// The output of a commit: the leader, the committed blocks and related metadata.
#[derive(Clone, PartialEq)]
pub struct CommittedSubDag {
    /// Reference to the leader block of the commit.
    pub leader: BlockRef,
    /// The blocks making up this sub-dag.
    pub blocks: Vec<VerifiedBlock>,
    /// Timestamp of the commit, in milliseconds.
    pub timestamp_ms: BlockTimestampMs,
    /// Index and digest of the commit this sub-dag belongs to.
    pub commit_ref: CommitRef,
    /// Per-authority reputation scores.
    // NOTE(review): the name suggests descending order by score — confirm with producers.
    pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
}
367
368impl CommittedSubDag {
369 pub fn new(
371 leader: BlockRef,
372 blocks: Vec<VerifiedBlock>,
373 timestamp_ms: BlockTimestampMs,
374 commit_ref: CommitRef,
375 reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
376 ) -> Self {
377 Self {
378 leader,
379 blocks,
380 timestamp_ms,
381 commit_ref,
382 reputation_scores_desc,
383 }
384 }
385}
386
387pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
390 blocks.sort_by(|a, b| {
391 a.round()
392 .cmp(&b.round())
393 .then_with(|| a.author().cmp(&b.author()))
394 })
395}
396
397impl Display for CommittedSubDag {
398 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
399 write!(
400 f,
401 "CommittedSubDag(leader={}, ref={}, blocks=[",
402 self.leader, self.commit_ref
403 )?;
404 for (idx, block) in self.blocks.iter().enumerate() {
405 if idx > 0 {
406 write!(f, ", ")?;
407 }
408 write!(f, "{}", block.digest())?;
409 }
410 write!(f, "])")
411 }
412}
413
414impl fmt::Debug for CommittedSubDag {
415 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
416 write!(f, "{}@{} ([", self.leader, self.commit_ref)?;
417 for block in &self.blocks {
418 write!(f, "{}, ", block.reference())?;
419 }
420 write!(
421 f,
422 "];{}ms;rs{:?})",
423 self.timestamp_ms, self.reputation_scores_desc
424 )
425 }
426}
427
428pub fn load_committed_subdag_from_store(
430 store: &dyn Store,
431 commit: TrustedCommit,
432 reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
433) -> CommittedSubDag {
434 let mut leader_block_idx = None;
435 let commit_blocks = store
436 .read_blocks(commit.blocks())
437 .expect("We should have the block referenced in the commit data");
438 let blocks = commit_blocks
439 .into_iter()
440 .enumerate()
441 .map(|(idx, commit_block_opt)| {
442 let commit_block =
443 commit_block_opt.expect("We should have the block referenced in the commit data");
444 if commit_block.reference() == commit.leader() {
445 leader_block_idx = Some(idx);
446 }
447 commit_block
448 })
449 .collect::<Vec<_>>();
450 let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
451 let leader_block_ref = blocks[leader_block_idx].reference();
452 CommittedSubDag::new(
453 leader_block_ref,
454 blocks,
455 commit.timestamp_ms(),
456 commit.reference(),
457 reputation_scores_desc,
458 )
459}
460
/// Kind of decision reached for a leader.
// NOTE(review): the exact meaning of each variant is defined by the committer
// code, which is not visible here — confirm before relying on these names.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum Decision {
    Direct,
    Indirect,
    Certified,
}
467
/// Status of a leader slot, which may or may not be decided yet.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum LeaderStatus {
    /// Decided: the leader block is committed.
    Commit(VerifiedBlock),
    /// Decided: the leader slot is skipped.
    Skip(Slot),
    /// No decision has been reached for the slot.
    Undecided(Slot),
}
475
476impl LeaderStatus {
477 pub(crate) fn round(&self) -> Round {
478 match self {
479 Self::Commit(block) => block.round(),
480 Self::Skip(leader) => leader.round,
481 Self::Undecided(leader) => leader.round,
482 }
483 }
484
485 pub(crate) fn is_decided(&self) -> bool {
486 match self {
487 Self::Commit(_) => true,
488 Self::Skip(_) => true,
489 Self::Undecided(_) => false,
490 }
491 }
492
493 pub(crate) fn into_decided_leader(self) -> Option<DecidedLeader> {
494 match self {
495 Self::Commit(block) => Some(DecidedLeader::Commit(block)),
496 Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
497 Self::Undecided(..) => None,
498 }
499 }
500}
501
502impl Display for LeaderStatus {
503 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504 match self {
505 Self::Commit(block) => write!(f, "Commit({})", block.reference()),
506 Self::Skip(slot) => write!(f, "Skip({slot})"),
507 Self::Undecided(slot) => write!(f, "Undecided({slot})"),
508 }
509 }
510}
511
/// A leader slot for which a decision has been reached.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum DecidedLeader {
    /// The leader block is committed.
    Commit(VerifiedBlock),
    /// The leader slot is skipped.
    Skip(Slot),
}
518
519impl DecidedLeader {
520 pub(crate) fn slot(&self) -> Slot {
522 match self {
523 Self::Commit(block) => block.reference().into(),
524 Self::Skip(slot) => *slot,
525 }
526 }
527
528 pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
531 match self {
532 Self::Commit(block) => Some(block),
533 Self::Skip(_) => None,
534 }
535 }
536
537 #[cfg(test)]
538 pub(crate) fn round(&self) -> Round {
539 match self {
540 Self::Commit(block) => block.round(),
541 Self::Skip(leader) => leader.round,
542 }
543 }
544
545 #[cfg(test)]
546 pub(crate) fn authority(&self) -> AuthorityIndex {
547 match self {
548 Self::Commit(block) => block.author(),
549 Self::Skip(leader) => leader.authority,
550 }
551 }
552}
553
554impl Display for DecidedLeader {
555 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
556 match self {
557 Self::Commit(block) => write!(f, "Commit({})", block.reference()),
558 Self::Skip(slot) => write!(f, "Skip({slot})"),
559 }
560 }
561}
562
/// Serializable record kept alongside commits.
// NOTE(review): presumably persisted per commit so state can be recovered — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct CommitInfo {
    /// A list of committed rounds.
    // NOTE(review): likely indexed by authority — confirm against the writer.
    pub(crate) committed_rounds: Vec<Round>,
    /// Reputation scores recorded with this info.
    pub(crate) reputation_scores: ReputationScores,
}
573
/// A range of commit indices, stored internally as a half-open
/// `Range<CommitIndex>` (`start..end`, with `end` exclusive).
///
/// The derived `Default` is the empty range `0..0`.
#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CommitRange(Range<CommitIndex>);
583
584impl CommitRange {
585 pub(crate) fn new(range: RangeInclusive<CommitIndex>) -> Self {
586 Self(*range.start()..(*range.end()).saturating_add(1))
589 }
590
591 pub(crate) fn start(&self) -> CommitIndex {
593 self.0.start
594 }
595
596 pub(crate) fn end(&self) -> CommitIndex {
598 self.0.end.saturating_sub(1)
599 }
600
601 pub(crate) fn extend_to(&mut self, other: CommitIndex) {
602 let new_end = other.saturating_add(1);
603 assert!(self.0.end <= new_end);
604 self.0 = self.0.start..new_end;
605 }
606
607 pub(crate) fn size(&self) -> usize {
608 self.0
609 .end
610 .checked_sub(self.0.start)
611 .expect("Range should never have end < start") as usize
612 }
613
614 pub(crate) fn is_equal_size(&self, other: &Self) -> bool {
616 self.size() == other.size()
617 }
618
619 pub(crate) fn is_next_range(&self, other: &Self) -> bool {
621 self.0.end == other.0.start
622 }
623}
624
625impl Ord for CommitRange {
626 fn cmp(&self, other: &Self) -> Ordering {
627 self.start()
628 .cmp(&other.start())
629 .then_with(|| self.end().cmp(&other.end()))
630 }
631}
632
/// Delegates to the `Ord` implementation, so the partial and total orderings
/// always agree and `partial_cmp` never returns `None`.
impl PartialOrd for CommitRange {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
638
639impl From<RangeInclusive<CommitIndex>> for CommitRange {
640 fn from(range: RangeInclusive<CommitIndex>) -> Self {
641 Self::new(range)
642 }
643}
644
645impl Debug for CommitRange {
647 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
648 write!(f, "CommitRange({}..={})", self.start(), self.end())
649 }
650}
651
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        block::TestBlock,
        context::Context,
        storage::{WriteBatch, mem_store::MemStore},
    };

    /// Builds one wave of blocks in an in-memory store, commits it, and checks
    /// that `load_committed_subdag_from_store` reconstructs the sub-dag.
    #[tokio::test]
    async fn test_new_subdag_from_commit() {
        let store = Arc::new(MemStore::new());
        let context = Arc::new(Context::new_for_test(4).0);
        let wave_length = DEFAULT_WAVE_LENGTH;

        let first_wave_rounds: u32 = wave_length;
        let num_authorities: u32 = 4;

        // References of every block that will be listed in the commit.
        let mut blocks = Vec::new();
        // One round-0 block per authority in the committee.
        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|index| {
                let author_idx = index.0.value() as u32;
                let block = TestBlock::new(0, author_idx).build();
                VerifiedBlock::new_for_test(block)
            })
            .map(|block| (block.reference(), block))
            .unzip();
        store.write(WriteBatch::default().blocks(genesis)).unwrap();
        blocks.append(&mut genesis_references.clone());

        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=first_wave_rounds {
            let mut new_ancestors = vec![];
            for author in 0..num_authorities {
                let base_ts = round as BlockTimestampMs * 1000;
                // Every block references all blocks created for the previous round.
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(base_ts + (author + round) as u64)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                store
                    .write(WriteBatch::default().blocks(vec![block.clone()]))
                    .unwrap();
                new_ancestors.push(block.reference());
                blocks.push(block.reference());

                // In the last round only the first block is created; it is the leader.
                if round == first_wave_rounds {
                    leader = Some(block.clone());
                    break;
                }
            }
            ancestors = new_ancestors;
        }

        let leader_block = leader.unwrap();
        let leader_ref = leader_block.reference();
        let commit_index = 1;
        let commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            leader_block.timestamp_ms(),
            leader_ref,
            blocks.clone(),
        );
        let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
        assert_eq!(subdag.leader, leader_ref);
        assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
        // Genesis plus the full rounds before the last one, plus the single leader block.
        assert_eq!(
            subdag.blocks.len(),
            (num_authorities * wave_length) as usize + 1
        );
        assert_eq!(subdag.commit_ref, commit.reference());
        assert_eq!(subdag.reputation_scores_desc, vec![]);
    }

    /// Exercises the accessors, size/adjacency predicates, ordering and
    /// `extend_to` of `CommitRange`.
    #[tokio::test]
    async fn test_commit_range() {
        telemetry_subscribers::init_for_testing();
        let mut range1 = CommitRange::new(1..=5);
        let range2 = CommitRange::new(2..=6);
        let range3 = CommitRange::new(5..=10);
        let range4 = CommitRange::new(6..=10);
        let range5 = CommitRange::new(6..=9);
        let range6 = CommitRange::new(1..=1);

        // Both bounds are reported as inclusive.
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 5);

        // Size counts both endpoints.
        assert_eq!(range1.size(), 5);
        assert_eq!(range3.size(), 6);
        assert_eq!(range6.size(), 1);

        // A range is "next" only if it starts right after range1's inclusive end.
        assert!(!range1.is_next_range(&range2));
        assert!(!range1.is_next_range(&range3));
        assert!(range1.is_next_range(&range4));
        assert!(range1.is_next_range(&range5));

        // Equal size is independent of where the ranges are positioned.
        assert!(range1.is_equal_size(&range2));
        assert!(!range1.is_equal_size(&range3));
        assert!(range1.is_equal_size(&range4));
        assert!(!range1.is_equal_size(&range5));

        // Ranges are ordered by start first, then by end.
        assert!(range1 < range2);
        assert!(range2 < range3);
        assert!(range3 < range4);
        assert!(range5 < range4);

        // Extending moves only the end of the range.
        range1.extend_to(10);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 10);
        assert_eq!(range1.size(), 10);
        range1.extend_to(20);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 20);
        assert_eq!(range1.size(), 20);
    }
}