consensus_core/
commit_vote_monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use parking_lot::Mutex;
8
9use crate::{
10    CommitIndex,
11    block::{BlockAPI as _, VerifiedBlock},
12    commit::GENESIS_COMMIT_INDEX,
13    context::Context,
14};
15
16/// Monitors the progress of consensus commits across the network.
17pub(crate) struct CommitVoteMonitor {
18    context: Arc<Context>,
19    // Highest commit index voted by each authority.
20    highest_voted_commits: Mutex<Vec<CommitIndex>>,
21}
22
23impl CommitVoteMonitor {
24    pub(crate) fn new(context: Arc<Context>) -> Self {
25        let highest_voted_commits = Mutex::new(vec![0; context.committee.size()]);
26        Self {
27            context,
28            highest_voted_commits,
29        }
30    }
31
32    /// Keeps track of the highest commit voted by each authority.
33    pub(crate) fn observe_block(&self, block: &VerifiedBlock) {
34        let mut highest_voted_commits = self.highest_voted_commits.lock();
35        for vote in block.commit_votes() {
36            if vote.index > highest_voted_commits[block.author()] {
37                highest_voted_commits[block.author()] = vote.index;
38            }
39        }
40    }
41
42    // Finds the highest commit index certified by a quorum.
43    // When an authority votes for commit index S, it is also voting for all commit
44    // indices 1 <= i < S. So the quorum commit index is the smallest index S
45    // such that the sum of stakes of authorities voting for commit indices >= S
46    // passes the quorum threshold.
47    pub(crate) fn quorum_commit_index(&self) -> CommitIndex {
48        let highest_voted_commits = self.highest_voted_commits.lock();
49        let mut highest_voted_commits = highest_voted_commits
50            .iter()
51            .zip(self.context.committee.authorities())
52            .map(|(commit_index, (_, a))| (*commit_index, a.stake))
53            .collect::<Vec<_>>();
54        // Sort by commit index then stake, in descending order.
55        highest_voted_commits.sort_by(|a, b| a.cmp(b).reverse());
56        let mut total_stake = 0;
57        for (commit_index, stake) in highest_voted_commits {
58            total_stake += stake;
59            if total_stake >= self.context.committee.quorum_threshold() {
60                return commit_index;
61            }
62        }
63        GENESIS_COMMIT_INDEX
64    }
65}
66
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use super::CommitVoteMonitor;
    use crate::{
        block::{TestBlock, VerifiedBlock},
        commit::{CommitDigest, CommitRef},
        context::Context,
    };

    /// Feeds two rounds of blocks with commit votes into the monitor and
    /// checks the quorum commit index after each round.
    #[tokio::test]
    async fn test_commit_vote_monitor() {
        let context = Arc::new(Context::new_for_test(4).0);
        let monitor = CommitVoteMonitor::new(context.clone());

        // Round 10: authorities 0..=3 each cast one vote, for commit
        // indices 5, 6, 7 and 8 respectively.
        for author in 0..4 {
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(10, author)
                    .set_commit_votes(vec![CommitRef::new(5 + author, CommitDigest::MIN)])
                    .build(),
            );
            monitor.observe_block(&block);
        }

        // CommitIndex 6 is the highest index supported by a quorum.
        assert_eq!(monitor.quorum_commit_index(), 6);

        // Round 11: authorities 0 and 1 each cast two votes; authority 0
        // for indices 6 and 7, authority 1 for indices 7 and 8.
        for author in 0..2 {
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(11, author)
                    .set_commit_votes(vec![
                        CommitRef::new(6 + author, CommitDigest::MIN),
                        CommitRef::new(7 + author, CommitDigest::MIN),
                    ])
                    .build(),
            );
            monitor.observe_block(&block);
        }

        // Highest voted commit index per authority is now 7, 8, 7, 8.
        assert_eq!(monitor.quorum_commit_index(), 7);
    }
}