1use std::{cmp::Ordering, sync::Arc};
6
7use tokio::time::Instant;
8
9use crate::{
10 block::{BlockRef, Round},
11 context::Context,
12 stake_aggregator::{QuorumThreshold, StakeAggregator},
13};
14
15pub(crate) struct ThresholdClock {
16 aggregator: StakeAggregator<QuorumThreshold>,
17 round: Round,
18 quorum_ts: Instant,
19 context: Arc<Context>,
20}
21
22impl ThresholdClock {
23 pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
24 Self {
25 aggregator: StakeAggregator::new(),
26 round,
27 quorum_ts: Instant::now(),
28 context,
29 }
30 }
31
32 fn try_advance_round(&mut self, new_round: Round) -> bool {
35 if self.aggregator.reached_threshold(&self.context.committee) {
36 self.aggregator.clear();
37 self.round = new_round;
38
39 let now = Instant::now();
40 self.context
41 .metrics
42 .node_metrics
43 .quorum_receive_latency
44 .observe(now.duration_since(self.quorum_ts).as_secs_f64());
45 self.quorum_ts = now;
46 true
47 } else {
48 false
49 }
50 }
51
52 pub(crate) fn add_block(&mut self, block: BlockRef) {
63 match block.round.cmp(&self.round) {
64 Ordering::Less => {}
65 Ordering::Greater => {
66 self.aggregator.clear();
68 self.aggregator.add(block.author, &self.context.committee);
69 self.round = block.round;
70 }
71 Ordering::Equal => {
72 self.aggregator.add(block.author, &self.context.committee);
73 }
74 }
75 self.try_advance_round(block.round + 1);
76 }
77
78 #[cfg(test)]
82 fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
83 let previous_round = self.round;
84 for block_ref in blocks {
85 self.add_block(block_ref);
86 }
87 (self.round > previous_round).then_some(self.round)
88 }
89
90 pub(crate) fn get_round(&self) -> Round {
91 self.round
92 }
93
94 pub(crate) fn get_quorum_ts(&self) -> Instant {
95 self.quorum_ts
96 }
97}
98
#[cfg(test)]
mod tests {
    use consensus_config::AuthorityIndex;

    use super::*;
    use crate::block::BlockDigest;

    /// Builds a block reference for `round` authored by test authority `author`,
    /// using the default digest.
    fn block_ref(round: Round, author: u32) -> BlockRef {
        BlockRef::new(
            round,
            AuthorityIndex::new_for_test(author),
            BlockDigest::default(),
        )
    }

    /// Adds blocks one at a time and checks the clock's round after each step.
    #[tokio::test]
    async fn test_threshold_clock_add_block() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // Round 0: the third distinct author moves the clock to round 1.
        clock.add_block(block_ref(0, 0));
        assert_eq!(clock.get_round(), 0);
        clock.add_block(block_ref(0, 1));
        assert_eq!(clock.get_round(), 0);
        clock.add_block(block_ref(0, 2));
        assert_eq!(clock.get_round(), 1);

        // Round 1: two authors are not enough to advance.
        clock.add_block(block_ref(1, 0));
        assert_eq!(clock.get_round(), 1);
        clock.add_block(block_ref(1, 3));
        assert_eq!(clock.get_round(), 1);

        // A block from a higher round moves the clock straight to that round.
        clock.add_block(block_ref(2, 1));
        assert_eq!(clock.get_round(), 2);

        // A block from a lower round leaves the clock where it is.
        clock.add_block(block_ref(1, 1));
        assert_eq!(clock.get_round(), 2);

        clock.add_block(block_ref(5, 2));
        assert_eq!(clock.get_round(), 5);
    }

    /// Feeds the same sequence as `test_threshold_clock_add_block` in a single
    /// batch and checks the final round reported by `add_blocks`.
    #[tokio::test]
    async fn test_threshold_clock_add_blocks() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        let block_refs = vec![
            block_ref(0, 0),
            block_ref(0, 1),
            block_ref(0, 2),
            block_ref(1, 0),
            block_ref(1, 3),
            block_ref(2, 1),
            block_ref(1, 1),
            block_ref(5, 2),
        ];

        let result = clock.add_blocks(block_refs);
        assert_eq!(Some(5), result);
    }

    /// Jumps from round 0 to round 5 on a single block, then gathers enough
    /// authors at round 5 to advance to round 6.
    #[tokio::test]
    async fn test_threshold_clock_round_skip_then_quorum() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // The first block from a higher round moves the clock to that round.
        clock.add_block(block_ref(5, 0));
        assert_eq!(clock.get_round(), 5);

        clock.add_block(block_ref(5, 1));
        assert_eq!(clock.get_round(), 5);

        // The third distinct author at round 5 advances the clock.
        clock.add_block(block_ref(5, 2));
        assert_eq!(clock.get_round(), 6);
    }

    /// With a committee built from `vec![5, 1, 1]` (presumably per-authority
    /// stakes - confirm against `local_committee_and_keys`), a single block from
    /// authority 0 at a higher round both skips to that round and advances past
    /// it.
    #[tokio::test]
    async fn test_threshold_clock_super_majority_round_skip() {
        use consensus_config::Parameters;
        use tempfile::TempDir;

        use crate::metrics::test_metrics;

        let (committee, _) = consensus_config::local_committee_and_keys(0, vec![5, 1, 1]);
        let metrics = test_metrics(3);
        let temp_dir = TempDir::new().unwrap();

        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };
        let context = Arc::new(crate::context::Context::new(
            0,
            AuthorityIndex::new_for_test(0),
            committee,
            parameters,
            iota_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE(),
            metrics,
            Arc::new(crate::context::Clock::default()),
        ));

        let mut clock = ThresholdClock::new(0, context);

        // Authority 0 alone takes the clock from round 0 to round 6.
        clock.add_block(block_ref(5, 0));
        assert_eq!(clock.get_round(), 6);

        // Blocks for round 5 arriving afterwards are behind the clock and do
        // not change its round.
        clock.add_block(block_ref(5, 1));
        assert_eq!(clock.get_round(), 6);
        clock.add_block(block_ref(5, 2));
        assert_eq!(clock.get_round(), 6);
    }
}