1use std::{cmp::Ordering, sync::Arc};
6
7use tokio::time::Instant;
8
9use crate::{
10 block::{BlockRef, Round},
11 context::Context,
12 stake_aggregator::{QuorumThreshold, StakeAggregator},
13};
14
15pub(crate) struct ThresholdClock {
16 aggregator: StakeAggregator<QuorumThreshold>,
17 round: Round,
18 quorum_ts: Instant,
19 context: Arc<Context>,
20}
21
22impl ThresholdClock {
23 pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
24 Self {
25 aggregator: StakeAggregator::new(),
26 round,
27 quorum_ts: Instant::now(),
28 context,
29 }
30 }
31
32 fn try_advance_round(&mut self, new_round: Round) -> bool {
35 if self.aggregator.reached_threshold(&self.context.committee) {
36 self.aggregator.clear();
37 self.round = new_round;
38
39 let now = Instant::now();
40 self.context
41 .metrics
42 .node_metrics
43 .quorum_receive_latency
44 .observe(now.duration_since(self.quorum_ts).as_secs_f64());
45 self.quorum_ts = now;
46 true
47 } else {
48 false
49 }
50 }
51
52 pub(crate) fn add_block(&mut self, block: BlockRef) {
63 match block.round.cmp(&self.round) {
64 Ordering::Less => {}
65 Ordering::Greater => {
66 self.aggregator.clear();
68 self.aggregator.add(block.author, &self.context.committee);
69 self.round = block.round;
70 }
71 Ordering::Equal => {
72 self.aggregator.add(block.author, &self.context.committee);
73 }
74 }
75 self.try_advance_round(block.round + 1);
76 }
77
78 #[cfg(test)]
82 fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
83 let previous_round = self.round;
84 for block_ref in blocks {
85 self.add_block(block_ref);
86 }
87 (self.round > previous_round).then_some(self.round)
88 }
89
90 pub(crate) fn get_round(&self) -> Round {
91 self.round
92 }
93
94 pub(crate) fn get_quorum_ts(&self) -> Instant {
95 self.quorum_ts
96 }
97}
98
#[cfg(test)]
mod tests {
    use consensus_config::AuthorityIndex;
    use iota_common::scoring_metrics::{ScoringMetricsV1, VersionedScoringMetrics};
    use iota_protocol_config::ProtocolConfig;

    use super::*;
    use crate::{block::BlockDigest, scoring_metrics_store::MysticetiScoringMetricsStore};

    /// Builds a block reference for `round` authored by test authority `author`,
    /// using the default digest.
    fn block_ref(round: Round, author: u32) -> BlockRef {
        BlockRef::new(round, AuthorityIndex::new_for_test(author), BlockDigest::default())
    }

    #[tokio::test]
    async fn test_threshold_clock_add_block() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // (block round, block author, clock round expected right after adding it)
        let steps = [
            (0, 0, 0),
            (0, 1, 0),
            (0, 2, 1),
            (1, 0, 1),
            (1, 3, 1),
            (2, 1, 2),
            (1, 1, 2),
            (5, 2, 5),
        ];

        for (round, author, expected_round) in steps {
            clock.add_block(block_ref(round, author));
            assert_eq!(clock.get_round(), expected_round);
        }
    }

    #[tokio::test]
    async fn test_threshold_clock_add_blocks() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // (block round, block author), fed to the clock in this order.
        let block_refs: Vec<BlockRef> = [
            (0, 0),
            (0, 1),
            (0, 2),
            (1, 0),
            (1, 3),
            (2, 1),
            (1, 1),
            (5, 2),
        ]
        .into_iter()
        .map(|(round, author)| block_ref(round, author))
        .collect();

        let result = clock.add_blocks(block_refs);
        assert_eq!(Some(5), result);
    }

    #[tokio::test]
    async fn test_threshold_clock_round_skip_then_quorum() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // A block from a future round makes the clock jump straight to that round.
        clock.add_block(block_ref(5, 0));
        assert_eq!(clock.get_round(), 5);

        // A second author at the same round does not move the clock yet.
        clock.add_block(block_ref(5, 1));
        assert_eq!(clock.get_round(), 5);

        // The third author moves the clock on to the next round.
        clock.add_block(block_ref(5, 2));
        assert_eq!(clock.get_round(), 6);
    }

    #[tokio::test]
    async fn test_threshold_clock_super_majority_round_skip() {
        use consensus_config::Parameters;
        use tempfile::TempDir;

        use crate::metrics::test_metrics;

        // Uneven committee: the assertions below rely on authority 0 being able to
        // reach the threshold on its own.
        let (committee, _) = consensus_config::local_committee_and_keys(0, vec![5, 1, 1]);
        let committee_size = committee.size();
        let metrics = test_metrics();
        let temp_dir = TempDir::new().unwrap();
        let local_scoring_metrics =
            Arc::new(VersionedScoringMetrics::V1(ScoringMetricsV1::new(3)));
        let scoring_store = Arc::new(MysticetiScoringMetricsStore::new(
            committee_size,
            local_scoring_metrics,
            &ProtocolConfig::get_for_max_version_UNSAFE(),
        ));
        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };

        let context = Arc::new(Context::new(
            0,
            AuthorityIndex::new_for_test(0),
            committee,
            parameters,
            ProtocolConfig::get_for_max_version_UNSAFE(),
            metrics,
            scoring_store,
            Arc::new(crate::context::Clock::default()),
        ));

        let mut clock = ThresholdClock::new(0, context);

        // One block from authority 0 both jumps the clock to round 5 and
        // immediately advances it to round 6.
        clock.add_block(block_ref(5, 0));
        assert_eq!(clock.get_round(), 6);

        // Blocks for round 5 arriving afterwards are stale and leave the clock alone.
        for author in [1, 2] {
            clock.add_block(block_ref(5, author));
            assert_eq!(clock.get_round(), 6);
        }
    }
}