consensus_core/
threshold_clock.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::Ordering, sync::Arc};
6
7use tokio::time::Instant;
8
9use crate::{
10    block::{BlockRef, Round},
11    context::Context,
12    stake_aggregator::{QuorumThreshold, StakeAggregator},
13};
14
15pub(crate) struct ThresholdClock {
16    aggregator: StakeAggregator<QuorumThreshold>,
17    round: Round,
18    quorum_ts: Instant,
19    context: Arc<Context>,
20}
21
22impl ThresholdClock {
23    pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
24        Self {
25            aggregator: StakeAggregator::new(),
26            round,
27            quorum_ts: Instant::now(),
28            context,
29        }
30    }
31
32    /// If quorum (2f+1) is reached, advance to the next round and record
33    /// latency metrics. Returns true if quorum was reached.
34    fn try_advance_round(&mut self, new_round: Round) -> bool {
35        if self.aggregator.reached_threshold(&self.context.committee) {
36            self.aggregator.clear();
37            self.round = new_round;
38
39            let now = Instant::now();
40            self.context
41                .metrics
42                .node_metrics
43                .quorum_receive_latency
44                .observe(now.duration_since(self.quorum_ts).as_secs_f64());
45            self.quorum_ts = now;
46            true
47        } else {
48            false
49        }
50    }
51
52    /// Add the block reference and advance the round accordingly.
53    ///
54    /// Round advancement rules:
55    /// - block.round < current: ignored (stale block)
56    /// - block.round > current: jump to block.round, start collecting stake
57    ///   there
58    /// - block.round == current: continue accumulating stake until quorum
59    ///   (2f+1) reached
60    ///
61    /// When quorum is reached, advance to round + 1.
62    pub(crate) fn add_block(&mut self, block: BlockRef) {
63        match block.round.cmp(&self.round) {
64            Ordering::Less => {}
65            Ordering::Greater => {
66                // Jump to the new round and start with fresh state
67                self.aggregator.clear();
68                self.aggregator.add(block.author, &self.context.committee);
69                self.round = block.round;
70            }
71            Ordering::Equal => {
72                self.aggregator.add(block.author, &self.context.committee);
73            }
74        }
75        self.try_advance_round(block.round + 1);
76    }
77
78    /// Add the block references that have been successfully processed and
79    /// advance the round accordingly. If the round has indeed advanced then
80    /// the new round is returned, otherwise None is returned.
81    #[cfg(test)]
82    fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
83        let previous_round = self.round;
84        for block_ref in blocks {
85            self.add_block(block_ref);
86        }
87        (self.round > previous_round).then_some(self.round)
88    }
89
90    pub(crate) fn get_round(&self) -> Round {
91        self.round
92    }
93
94    pub(crate) fn get_quorum_ts(&self) -> Instant {
95        self.quorum_ts
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use consensus_config::AuthorityIndex;
102
103    use super::*;
104    use crate::block::BlockDigest;
105
106    #[tokio::test]
107    async fn test_threshold_clock_add_block() {
108        let context = Arc::new(Context::new_for_test(4).0);
109        let mut aggregator = ThresholdClock::new(0, context);
110
111        aggregator.add_block(BlockRef::new(
112            0,
113            AuthorityIndex::new_for_test(0),
114            BlockDigest::default(),
115        ));
116        assert_eq!(aggregator.get_round(), 0);
117        aggregator.add_block(BlockRef::new(
118            0,
119            AuthorityIndex::new_for_test(1),
120            BlockDigest::default(),
121        ));
122        assert_eq!(aggregator.get_round(), 0);
123        aggregator.add_block(BlockRef::new(
124            0,
125            AuthorityIndex::new_for_test(2),
126            BlockDigest::default(),
127        ));
128        assert_eq!(aggregator.get_round(), 1);
129        aggregator.add_block(BlockRef::new(
130            1,
131            AuthorityIndex::new_for_test(0),
132            BlockDigest::default(),
133        ));
134        assert_eq!(aggregator.get_round(), 1);
135        aggregator.add_block(BlockRef::new(
136            1,
137            AuthorityIndex::new_for_test(3),
138            BlockDigest::default(),
139        ));
140        assert_eq!(aggregator.get_round(), 1);
141        aggregator.add_block(BlockRef::new(
142            2,
143            AuthorityIndex::new_for_test(1),
144            BlockDigest::default(),
145        ));
146        assert_eq!(aggregator.get_round(), 2);
147        aggregator.add_block(BlockRef::new(
148            1,
149            AuthorityIndex::new_for_test(1),
150            BlockDigest::default(),
151        ));
152        assert_eq!(aggregator.get_round(), 2);
153        aggregator.add_block(BlockRef::new(
154            5,
155            AuthorityIndex::new_for_test(2),
156            BlockDigest::default(),
157        ));
158        assert_eq!(aggregator.get_round(), 5);
159    }
160
161    #[tokio::test]
162    async fn test_threshold_clock_add_blocks() {
163        let context = Arc::new(Context::new_for_test(4).0);
164        let mut aggregator = ThresholdClock::new(0, context);
165
166        let block_refs = vec![
167            BlockRef::new(0, AuthorityIndex::new_for_test(0), BlockDigest::default()),
168            BlockRef::new(0, AuthorityIndex::new_for_test(1), BlockDigest::default()),
169            BlockRef::new(0, AuthorityIndex::new_for_test(2), BlockDigest::default()),
170            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::default()),
171            BlockRef::new(1, AuthorityIndex::new_for_test(3), BlockDigest::default()),
172            BlockRef::new(2, AuthorityIndex::new_for_test(1), BlockDigest::default()),
173            BlockRef::new(1, AuthorityIndex::new_for_test(1), BlockDigest::default()),
174            BlockRef::new(5, AuthorityIndex::new_for_test(2), BlockDigest::default()),
175        ];
176
177        let result = aggregator.add_blocks(block_refs);
178        assert_eq!(Some(5), result);
179    }
180
181    /// Test that when jumping to a higher round, the first block's author is
182    /// tracked, allowing subsequent blocks to form quorum.
183    #[tokio::test]
184    async fn test_threshold_clock_round_skip_then_quorum() {
185        let context = Arc::new(Context::new_for_test(4).0);
186        let mut clock = ThresholdClock::new(0, context);
187
188        // Jump from round 0 to round 5 - author should be tracked
189        clock.add_block(BlockRef::new(
190            5,
191            AuthorityIndex::new_for_test(0),
192            BlockDigest::default(),
193        ));
194        assert_eq!(clock.get_round(), 5);
195
196        // Add more blocks at round 5 to reach quorum (need 3 of 4)
197        clock.add_block(BlockRef::new(
198            5,
199            AuthorityIndex::new_for_test(1),
200            BlockDigest::default(),
201        ));
202        assert_eq!(clock.get_round(), 5);
203
204        clock.add_block(BlockRef::new(
205            5,
206            AuthorityIndex::new_for_test(2),
207            BlockDigest::default(),
208        ));
209        assert_eq!(clock.get_round(), 6); // Quorum reached
210    }
211
212    /// Test that a super-majority authority (>2/3 stake) immediately advances
213    /// the round when jumping to a higher round.
214    #[tokio::test]
215    async fn test_threshold_clock_super_majority_round_skip() {
216        use consensus_config::Parameters;
217        use tempfile::TempDir;
218
219        use crate::metrics::test_metrics;
220
221        // Authority 0 has 5/7 stake (>2/3 quorum threshold)
222        let (committee, _) = consensus_config::local_committee_and_keys(0, vec![5, 1, 1]);
223        let metrics = test_metrics(3);
224        let temp_dir = TempDir::new().unwrap();
225
226        let context = Arc::new(crate::context::Context::new(
227            0,
228            AuthorityIndex::new_for_test(0),
229            committee,
230            Parameters {
231                db_path: temp_dir.keep(),
232                ..Default::default()
233            },
234            iota_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
235            metrics,
236            Arc::new(crate::context::Clock::default()),
237        ));
238
239        let mut clock = ThresholdClock::new(0, context);
240
241        // Single block from super-majority authority at round 5 reaches quorum
242        // immediately
243        clock.add_block(BlockRef::new(
244            5,
245            AuthorityIndex::new_for_test(0),
246            BlockDigest::default(),
247        ));
248        assert_eq!(clock.get_round(), 6);
249
250        // Stale blocks from round 5 should be ignored
251        clock.add_block(BlockRef::new(
252            5,
253            AuthorityIndex::new_for_test(1),
254            BlockDigest::default(),
255        ));
256        assert_eq!(clock.get_round(), 6);
257        clock.add_block(BlockRef::new(
258            5,
259            AuthorityIndex::new_for_test(2),
260            BlockDigest::default(),
261        ));
262        assert_eq!(clock.get_round(), 6);
263    }
264}