consensus_core/
threshold_clock.rs1use std::{cmp::Ordering, sync::Arc};
6
7use tokio::time::Instant;
8
9use crate::{
10 block::{BlockRef, Round},
11 context::Context,
12 stake_aggregator::{QuorumThreshold, StakeAggregator},
13};
14
15pub(crate) struct ThresholdClock {
16 aggregator: StakeAggregator<QuorumThreshold>,
17 round: Round,
18 quorum_ts: Instant,
19 context: Arc<Context>,
20}
21
22impl ThresholdClock {
23 pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
24 Self {
25 aggregator: StakeAggregator::new(),
26 round,
27 quorum_ts: Instant::now(),
28 context,
29 }
30 }
31
32 pub(crate) fn add_block(&mut self, block: BlockRef) {
35 match block.round.cmp(&self.round) {
36 Ordering::Less => {}
38 Ordering::Greater => {
40 self.aggregator.clear();
41 self.aggregator.add(block.author, &self.context.committee);
42 self.round = block.round;
43 }
44 Ordering::Equal => {
45 if self.aggregator.add(block.author, &self.context.committee) {
46 self.aggregator.clear();
47 self.round = block.round + 1;
49
50 let now = Instant::now();
52 self.context
53 .metrics
54 .node_metrics
55 .quorum_receive_latency
56 .observe(now.duration_since(self.quorum_ts).as_secs_f64());
57 self.quorum_ts = now;
58 }
59 }
60 }
61 }
62
63 #[cfg(test)]
67 fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
68 let previous_round = self.round;
69 for block_ref in blocks {
70 self.add_block(block_ref);
71 }
72 (self.round > previous_round).then_some(self.round)
73 }
74
75 pub(crate) fn get_round(&self) -> Round {
76 self.round
77 }
78
79 pub(crate) fn get_quorum_ts(&self) -> Instant {
80 self.quorum_ts
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use consensus_config::AuthorityIndex;
87
88 use super::*;
89 use crate::block::BlockDigest;
90
91 #[tokio::test]
92 async fn test_threshold_clock_add_block() {
93 let context = Arc::new(Context::new_for_test(4).0);
94 let mut aggregator = ThresholdClock::new(0, context);
95
96 aggregator.add_block(BlockRef::new(
97 0,
98 AuthorityIndex::new_for_test(0),
99 BlockDigest::default(),
100 ));
101 assert_eq!(aggregator.get_round(), 0);
102 aggregator.add_block(BlockRef::new(
103 0,
104 AuthorityIndex::new_for_test(1),
105 BlockDigest::default(),
106 ));
107 assert_eq!(aggregator.get_round(), 0);
108 aggregator.add_block(BlockRef::new(
109 0,
110 AuthorityIndex::new_for_test(2),
111 BlockDigest::default(),
112 ));
113 assert_eq!(aggregator.get_round(), 1);
114 aggregator.add_block(BlockRef::new(
115 1,
116 AuthorityIndex::new_for_test(0),
117 BlockDigest::default(),
118 ));
119 assert_eq!(aggregator.get_round(), 1);
120 aggregator.add_block(BlockRef::new(
121 1,
122 AuthorityIndex::new_for_test(3),
123 BlockDigest::default(),
124 ));
125 assert_eq!(aggregator.get_round(), 1);
126 aggregator.add_block(BlockRef::new(
127 2,
128 AuthorityIndex::new_for_test(1),
129 BlockDigest::default(),
130 ));
131 assert_eq!(aggregator.get_round(), 2);
132 aggregator.add_block(BlockRef::new(
133 1,
134 AuthorityIndex::new_for_test(1),
135 BlockDigest::default(),
136 ));
137 assert_eq!(aggregator.get_round(), 2);
138 aggregator.add_block(BlockRef::new(
139 5,
140 AuthorityIndex::new_for_test(2),
141 BlockDigest::default(),
142 ));
143 assert_eq!(aggregator.get_round(), 5);
144 }
145
146 #[tokio::test]
147 async fn test_threshold_clock_add_blocks() {
148 let context = Arc::new(Context::new_for_test(4).0);
149 let mut aggregator = ThresholdClock::new(0, context);
150
151 let block_refs = vec![
152 BlockRef::new(0, AuthorityIndex::new_for_test(0), BlockDigest::default()),
153 BlockRef::new(0, AuthorityIndex::new_for_test(1), BlockDigest::default()),
154 BlockRef::new(0, AuthorityIndex::new_for_test(2), BlockDigest::default()),
155 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::default()),
156 BlockRef::new(1, AuthorityIndex::new_for_test(3), BlockDigest::default()),
157 BlockRef::new(2, AuthorityIndex::new_for_test(1), BlockDigest::default()),
158 BlockRef::new(1, AuthorityIndex::new_for_test(1), BlockDigest::default()),
159 BlockRef::new(5, AuthorityIndex::new_for_test(2), BlockDigest::default()),
160 ];
161
162 let result = aggregator.add_blocks(block_refs);
163 assert_eq!(Some(5), result);
164 }
165}