consensus_core/
threshold_clock.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::Ordering, sync::Arc};
6
7use tokio::time::Instant;
8
9use crate::{
10    block::{BlockRef, Round},
11    context::Context,
12    stake_aggregator::{QuorumThreshold, StakeAggregator},
13};
14
/// Tracks the current consensus round by aggregating the stake of the
/// authorities whose blocks have been accepted for that round.
pub(crate) struct ThresholdClock {
    /// Stake accumulated from block authors of the current round; cleared
    /// every time the round changes.
    aggregator: StakeAggregator<QuorumThreshold>,
    /// The round the clock is currently at.
    round: Round,
    /// When the clock was created or, afterwards, when a quorum was last
    /// observed for the then-current round.
    quorum_ts: Instant,
    /// Shared context; supplies the committee used for stake accounting and
    /// the metrics that receive the quorum latency.
    context: Arc<Context>,
}
21
22impl ThresholdClock {
23    pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
24        Self {
25            aggregator: StakeAggregator::new(),
26            round,
27            quorum_ts: Instant::now(),
28            context,
29        }
30    }
31
32    /// Add the block reference that have been accepted and advance the round
33    /// accordingly.
34    pub(crate) fn add_block(&mut self, block: BlockRef) {
35        match block.round.cmp(&self.round) {
36            // Blocks with round less then what we currently build are irrelevant here
37            Ordering::Less => {}
38            // If we processed block for round r, we also have stored 2f+1 blocks from r-1
39            Ordering::Greater => {
40                self.aggregator.clear();
41                self.aggregator.add(block.author, &self.context.committee);
42                self.round = block.round;
43            }
44            Ordering::Equal => {
45                if self.aggregator.add(block.author, &self.context.committee) {
46                    self.aggregator.clear();
47                    // We have seen 2f+1 blocks for current round, advance
48                    self.round = block.round + 1;
49
50                    // now record the time of receipt from last quorum
51                    let now = Instant::now();
52                    self.context
53                        .metrics
54                        .node_metrics
55                        .quorum_receive_latency
56                        .observe(now.duration_since(self.quorum_ts).as_secs_f64());
57                    self.quorum_ts = now;
58                }
59            }
60        }
61    }
62
63    /// Add the block references that have been successfully processed and
64    /// advance the round accordingly. If the round has indeed advanced then
65    /// the new round is returned, otherwise None is returned.
66    #[cfg(test)]
67    fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
68        let previous_round = self.round;
69        for block_ref in blocks {
70            self.add_block(block_ref);
71        }
72        (self.round > previous_round).then_some(self.round)
73    }
74
75    pub(crate) fn get_round(&self) -> Round {
76        self.round
77    }
78
79    pub(crate) fn get_quorum_ts(&self) -> Instant {
80        self.quorum_ts
81    }
82}
83
#[cfg(test)]
mod tests {
    use consensus_config::AuthorityIndex;

    use super::*;
    use crate::block::BlockDigest;

    /// The block references fed to the clock by both tests, in order.
    /// Each entry is a `(round, author index)` pair; all digests are default.
    fn block_sequence() -> Vec<BlockRef> {
        [
            (0, 0),
            (0, 1),
            (0, 2),
            (1, 0),
            (1, 3),
            (2, 1),
            (1, 1),
            (5, 2),
        ]
        .iter()
        .map(|&(round, author)| {
            BlockRef::new(
                round,
                AuthorityIndex::new_for_test(author),
                BlockDigest::default(),
            )
        })
        .collect()
    }

    #[tokio::test]
    async fn test_threshold_clock_add_block() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // Round the clock must report right after each block of the sequence
        // has been added.
        let expected_rounds = [0, 0, 1, 1, 1, 2, 2, 5];

        for (block, expected) in block_sequence().into_iter().zip(expected_rounds) {
            clock.add_block(block);
            assert_eq!(clock.get_round(), expected);
        }
    }

    #[tokio::test]
    async fn test_threshold_clock_add_blocks() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // Adding the whole sequence at once must report the final round.
        let result = clock.add_blocks(block_sequence());
        assert_eq!(Some(5), result);
    }
}