1use std::sync::Arc;
6
7use consensus_config::AuthorityIndex;
8use tracing::info;
9
10use crate::{context::Context, leader_scoring::ReputationScores, round_prober::QuorumRound};
11
12#[derive(Debug, Clone, Copy, Eq, PartialEq)]
13pub(crate) enum AncestorState {
14 Include,
15 Exclude(u64),
16}
17
18#[derive(Clone)]
19struct AncestorInfo {
20 state: AncestorState,
21 lock_expiry_count: u32,
25}
26
27impl AncestorInfo {
28 fn new() -> Self {
29 Self {
30 state: AncestorState::Include,
31 lock_expiry_count: 0,
32 }
33 }
34
35 fn is_locked(
36 &self,
37 propagation_score_update_count: u32,
38 quorum_round_update_count: u32,
39 ) -> bool {
40 match self.state {
41 AncestorState::Include => self.lock_expiry_count > propagation_score_update_count,
42 AncestorState::Exclude(_) => self.lock_expiry_count > quorum_round_update_count,
43 }
44 }
45
46 fn set_lock(&mut self, future_count: u32) {
47 self.lock_expiry_count = future_count;
48 }
49}
50
51pub(crate) struct AncestorStateManager {
52 context: Arc<Context>,
53 state_map: Vec<AncestorInfo>,
54 propagation_score_update_count: u32,
55 quorum_round_update_count: u32,
56 pub(crate) received_quorum_round_per_authority: Vec<QuorumRound>,
57 pub(crate) accepted_quorum_round_per_authority: Vec<QuorumRound>,
58 pub(crate) propagation_scores: ReputationScores,
61}
62
63impl AncestorStateManager {
64 #[cfg(not(test))]
68 const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 10;
69 #[cfg(test)]
70 const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 1;
71
72 #[cfg(not(test))]
77 const STATE_LOCK_SCORE_UPDATES: u32 = 2;
78 #[cfg(test)]
79 const STATE_LOCK_SCORE_UPDATES: u32 = 1;
80
81 const EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 10;
83
84 pub(crate) fn new(context: Arc<Context>) -> Self {
85 let state_map = vec![AncestorInfo::new(); context.committee.size()];
86
87 let received_quorum_round_per_authority = vec![(0, 0); context.committee.size()];
88 let accepted_quorum_round_per_authority = vec![(0, 0); context.committee.size()];
89 Self {
90 context,
91 state_map,
92 propagation_score_update_count: 0,
93 quorum_round_update_count: 0,
94 propagation_scores: ReputationScores::default(),
95 received_quorum_round_per_authority,
96 accepted_quorum_round_per_authority,
97 }
98 }
99
100 pub(crate) fn set_quorum_rounds_per_authority(
101 &mut self,
102 received_quorum_rounds: Vec<QuorumRound>,
103 accepted_quorum_rounds: Vec<QuorumRound>,
104 ) {
105 self.received_quorum_round_per_authority = received_quorum_rounds;
106 self.accepted_quorum_round_per_authority = accepted_quorum_rounds;
107 self.quorum_round_update_count += 1;
108 }
109
110 pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) {
111 self.propagation_scores = scores;
112 self.propagation_score_update_count += 1;
113 }
114
115 pub(crate) fn get_ancestor_states(&self) -> Vec<AncestorState> {
116 self.state_map.iter().map(|info| info.state).collect()
117 }
118
119 pub(crate) fn update_all_ancestors_state(&mut self) {
122 let propagation_scores_by_authority = self
123 .propagation_scores
124 .scores_per_authority
125 .clone()
126 .into_iter()
127 .enumerate()
128 .map(|(idx, score)| {
129 (
130 self.context
131 .committee
132 .to_authority_index(idx)
133 .expect("Index should be valid"),
134 score,
135 )
136 })
137 .collect::<Vec<_>>();
138
139 let network_high_quorum_round = self.calculate_network_high_quorum_round();
143
144 for (authority_id, score) in propagation_scores_by_authority {
148 let (_low, authority_high_quorum_round) = if self
149 .context
150 .protocol_config
151 .consensus_round_prober_probe_accepted_rounds()
152 {
153 self.accepted_quorum_round_per_authority[authority_id]
154 } else {
155 self.received_quorum_round_per_authority[authority_id]
156 };
157
158 self.update_state(
159 authority_id,
160 score,
161 authority_high_quorum_round,
162 network_high_quorum_round,
163 );
164 }
165 }
166
167 fn update_state(
170 &mut self,
171 authority_id: AuthorityIndex,
172 propagation_score: u64,
173 authority_high_quorum_round: u32,
174 network_high_quorum_round: u32,
175 ) {
176 let block_hostname = &self.context.committee.authority(authority_id).hostname;
177 let mut ancestor_info = self.state_map[authority_id].clone();
178
179 if ancestor_info.is_locked(
180 self.propagation_score_update_count,
181 self.quorum_round_update_count,
182 ) {
183 return;
185 }
186
187 let low_score_threshold =
188 (self.propagation_scores.highest_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100;
189
190 match ancestor_info.state {
191 AncestorState::Include => {
194 if propagation_score <= low_score_threshold {
195 ancestor_info.state = AncestorState::Exclude(propagation_score);
196 ancestor_info.set_lock(
197 self.quorum_round_update_count + Self::STATE_LOCK_QUORUM_ROUND_UPDATES,
198 );
199 info!(
200 "Authority {authority_id} moved to EXCLUDE state with score {propagation_score} <= threshold of {low_score_threshold} and locked for {:?} quorum round updates",
201 Self::STATE_LOCK_QUORUM_ROUND_UPDATES
202 );
203 self.context
204 .metrics
205 .node_metrics
206 .ancestor_state_change_by_authority
207 .with_label_values(&[block_hostname.as_str(), "exclude"])
208 .inc();
209 }
210 }
211 AncestorState::Exclude(_) => {
213 if propagation_score > low_score_threshold
216 || authority_high_quorum_round >= network_high_quorum_round
217 {
218 ancestor_info.state = AncestorState::Include;
219 ancestor_info.set_lock(
220 self.propagation_score_update_count + Self::STATE_LOCK_SCORE_UPDATES,
221 );
222 info!(
223 "Authority {authority_id} moved to INCLUDE state with {propagation_score} > threshold of {low_score_threshold} or {authority_high_quorum_round} >= {network_high_quorum_round} and locked for {:?} score updates.",
224 Self::STATE_LOCK_SCORE_UPDATES
225 );
226 self.context
227 .metrics
228 .node_metrics
229 .ancestor_state_change_by_authority
230 .with_label_values(&[block_hostname.as_str(), "include"])
231 .inc();
232 }
233 }
234 }
235
236 self.state_map[authority_id] = ancestor_info;
238 }
239
240 fn calculate_network_high_quorum_round(&self) -> u32 {
245 if self
246 .context
247 .protocol_config
248 .consensus_round_prober_probe_accepted_rounds()
249 {
250 self.calculate_network_high_accepted_quorum_round()
251 } else {
252 self.calculate_network_high_received_quorum_round()
253 }
254 }
255
256 fn calculate_network_high_accepted_quorum_round(&self) -> u32 {
257 let committee = &self.context.committee;
258
259 let high_quorum_rounds_with_stake = self
260 .accepted_quorum_round_per_authority
261 .iter()
262 .zip(committee.authorities())
263 .map(|((_low, high), (_, authority))| (*high, authority.stake))
264 .collect::<Vec<_>>();
265
266 self.calculate_network_high_quorum_round_internal(high_quorum_rounds_with_stake)
267 }
268
269 fn calculate_network_high_received_quorum_round(&self) -> u32 {
270 let committee = &self.context.committee;
271
272 let high_quorum_rounds_with_stake = self
273 .received_quorum_round_per_authority
274 .iter()
275 .zip(committee.authorities())
276 .map(|((_low, high), (_, authority))| (*high, authority.stake))
277 .collect::<Vec<_>>();
278
279 self.calculate_network_high_quorum_round_internal(high_quorum_rounds_with_stake)
280 }
281
282 fn calculate_network_high_quorum_round_internal(
289 &self,
290 mut high_quorum_rounds_with_stake: Vec<(u32, u64)>,
291 ) -> u32 {
292 high_quorum_rounds_with_stake.sort();
293
294 let mut total_stake = 0;
295 let mut network_high_quorum_round = 0;
296
297 for (round, stake) in high_quorum_rounds_with_stake.iter() {
298 total_stake += stake;
299 if total_stake >= self.context.committee.quorum_threshold() {
300 network_high_quorum_round = *round;
301 break;
302 }
303 }
304
305 network_high_quorum_round
306 }
307}
308
309#[cfg(test)]
310mod test {
311 use super::*;
312 use crate::leader_scoring::ReputationScores;
313
314 #[tokio::test]
315 async fn test_calculate_network_high_received_quorum_round() {
316 telemetry_subscribers::init_for_testing();
317
318 let (mut context, _key_pairs) = Context::new_for_test(4);
319 context
320 .protocol_config
321 .set_consensus_round_prober_probe_accepted_rounds(false);
322 let context = Arc::new(context);
323
324 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
325 let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
326 ancestor_state_manager.set_propagation_scores(scores);
327
328 let network_high_quorum_round =
331 ancestor_state_manager.calculate_network_high_quorum_round();
332 assert_eq!(network_high_quorum_round, 0);
333
334 let received_quorum_rounds = vec![(100, 229), (225, 229), (229, 300), (229, 300)];
335 let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];
336 ancestor_state_manager.set_quorum_rounds_per_authority(
337 received_quorum_rounds.clone(),
338 accepted_quorum_rounds.clone(),
339 );
340
341 let network_high_quorum_round =
343 ancestor_state_manager.calculate_network_high_quorum_round();
344 assert_eq!(network_high_quorum_round, 300);
345 }
346
347 #[tokio::test]
348 async fn test_calculate_network_high_accepted_quorum_round() {
349 telemetry_subscribers::init_for_testing();
350
351 let (mut context, _key_pairs) = Context::new_for_test(4);
352 context
353 .protocol_config
354 .set_consensus_round_prober_probe_accepted_rounds(true);
355 let context = Arc::new(context);
356
357 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
358 let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
359 ancestor_state_manager.set_propagation_scores(scores);
360
361 let network_high_quorum_round =
364 ancestor_state_manager.calculate_network_high_quorum_round();
365 assert_eq!(network_high_quorum_round, 0);
366
367 let received_quorum_rounds = vec![(100, 229), (225, 300), (229, 300), (229, 300)];
368 let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];
369 ancestor_state_manager.set_quorum_rounds_per_authority(
370 received_quorum_rounds.clone(),
371 accepted_quorum_rounds.clone(),
372 );
373
374 let network_high_quorum_round =
376 ancestor_state_manager.calculate_network_high_quorum_round();
377 assert_eq!(network_high_quorum_round, 229);
378 }
379
380 #[tokio::test]
387 async fn test_update_all_ancestor_state_using_accepted_rounds() {
388 telemetry_subscribers::init_for_testing();
389 let (mut context, _key_pairs) = Context::new_for_test(4);
390 context
391 .protocol_config
392 .set_consensus_round_prober_probe_accepted_rounds(true);
393 let context = Arc::new(context);
394
395 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
396 let mut ancestor_state_manager = AncestorStateManager::new(context);
397 ancestor_state_manager.set_propagation_scores(scores);
398
399 let received_quorum_rounds = vec![(300, 400), (300, 400), (300, 400), (300, 400)];
400 let accepted_quorum_rounds = vec![(225, 229), (225, 229), (229, 300), (229, 300)];
401 ancestor_state_manager
402 .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
403 ancestor_state_manager.update_all_ancestors_state();
404
405 let state_map = ancestor_state_manager.get_ancestor_states();
408 for state in state_map.iter() {
409 assert_eq!(*state, AncestorState::Include);
410 }
411
412 let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]);
413 ancestor_state_manager.set_propagation_scores(scores);
414 ancestor_state_manager.update_all_ancestors_state();
415
416 let state_map = ancestor_state_manager.get_ancestor_states();
419 for (authority, state) in state_map.iter().enumerate() {
420 if (0..=1).contains(&authority) {
421 assert_eq!(*state, AncestorState::Exclude(10));
422 } else {
423 assert_eq!(*state, AncestorState::Include);
424 }
425 }
426
427 ancestor_state_manager.update_all_ancestors_state();
428
429 let state_map = ancestor_state_manager.get_ancestor_states();
432 for (authority, state) in state_map.iter().enumerate() {
433 if (0..=1).contains(&authority) {
434 assert_eq!(*state, AncestorState::Exclude(10));
435 } else {
436 assert_eq!(*state, AncestorState::Include);
437 }
438 }
439
440 let received_quorum_rounds = vec![(400, 500), (400, 500), (400, 500), (400, 500)];
443 let accepted_quorum_rounds = vec![(229, 300), (225, 229), (229, 300), (229, 300)];
444 ancestor_state_manager
445 .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
446 ancestor_state_manager.update_all_ancestors_state();
447
448 let state_map = ancestor_state_manager.get_ancestor_states();
452 for (authority, state) in state_map.iter().enumerate() {
453 if authority == 1 {
454 assert_eq!(*state, AncestorState::Exclude(10));
455 } else {
456 assert_eq!(*state, AncestorState::Include);
457 }
458 }
459
460 let received_quorum_rounds = vec![(500, 600), (500, 600), (500, 600), (500, 600)];
461 let accepted_quorum_rounds = vec![(229, 300), (229, 300), (229, 300), (229, 300)];
462 ancestor_state_manager
463 .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
464 ancestor_state_manager.update_all_ancestors_state();
465
466 let state_map = ancestor_state_manager.get_ancestor_states();
471 for state in state_map.iter() {
472 assert_eq!(*state, AncestorState::Include);
473 }
474
475 let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]);
477 ancestor_state_manager.set_propagation_scores(scores);
478 ancestor_state_manager.update_all_ancestors_state();
479
480 let state_map = ancestor_state_manager.get_ancestor_states();
483 for (authority, state) in state_map.iter().enumerate() {
484 if authority == 1 {
485 assert_eq!(*state, AncestorState::Exclude(10));
486 } else {
487 assert_eq!(*state, AncestorState::Include);
488 }
489 }
490 }
491
492 #[tokio::test]
499 async fn test_update_all_ancestor_state_using_received_rounds() {
500 telemetry_subscribers::init_for_testing();
501 let (mut context, _key_pairs) = Context::new_for_test(4);
502 context
503 .protocol_config
504 .set_consensus_round_prober_probe_accepted_rounds(false);
505 let context = Arc::new(context);
506
507 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
508 let mut ancestor_state_manager = AncestorStateManager::new(context);
509 ancestor_state_manager.set_propagation_scores(scores);
510
511 let received_quorum_rounds = vec![(225, 229), (225, 300), (229, 300), (229, 300)];
512 let accepted_quorum_rounds = vec![(100, 150), (100, 150), (100, 150), (100, 150)];
513 ancestor_state_manager
514 .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
515 ancestor_state_manager.update_all_ancestors_state();
516
517 let state_map = ancestor_state_manager.get_ancestor_states();
520 for state in state_map.iter() {
521 assert_eq!(*state, AncestorState::Include);
522 }
523
524 let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]);
525 ancestor_state_manager.set_propagation_scores(scores);
526 ancestor_state_manager.update_all_ancestors_state();
527
528 let state_map = ancestor_state_manager.get_ancestor_states();
531 for (authority, state) in state_map.iter().enumerate() {
532 if (0..=1).contains(&authority) {
533 assert_eq!(*state, AncestorState::Exclude(10));
534 } else {
535 assert_eq!(*state, AncestorState::Include);
536 }
537 }
538
539 ancestor_state_manager.update_all_ancestors_state();
540
541 let state_map = ancestor_state_manager.get_ancestor_states();
544 for (authority, state) in state_map.iter().enumerate() {
545 if (0..=1).contains(&authority) {
546 assert_eq!(*state, AncestorState::Exclude(10));
547 } else {
548 assert_eq!(*state, AncestorState::Include);
549 }
550 }
551
552 let received_quorum_rounds = vec![(229, 300), (225, 229), (229, 300), (229, 300)];
555 let accepted_quorum_rounds = vec![(100, 150), (100, 150), (100, 150), (100, 150)];
556 ancestor_state_manager
557 .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
558 ancestor_state_manager.update_all_ancestors_state();
559
560 let state_map = ancestor_state_manager.get_ancestor_states();
564 for (authority, state) in state_map.iter().enumerate() {
565 if authority == 1 {
566 assert_eq!(*state, AncestorState::Exclude(10));
567 } else {
568 assert_eq!(*state, AncestorState::Include);
569 }
570 }
571
572 let received_quorum_rounds = vec![(229, 300), (229, 300), (229, 300), (229, 300)];
573 let accepted_quorum_rounds = vec![(100, 150), (100, 150), (100, 150), (100, 150)];
574 ancestor_state_manager
575 .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
576 ancestor_state_manager.update_all_ancestors_state();
577
578 let state_map = ancestor_state_manager.get_ancestor_states();
583 for state in state_map.iter() {
584 assert_eq!(*state, AncestorState::Include);
585 }
586
587 let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]);
589 ancestor_state_manager.set_propagation_scores(scores);
590 ancestor_state_manager.update_all_ancestors_state();
591
592 let state_map = ancestor_state_manager.get_ancestor_states();
595 for (authority, state) in state_map.iter().enumerate() {
596 if authority == 1 {
597 assert_eq!(*state, AncestorState::Exclude(10));
598 } else {
599 assert_eq!(*state, AncestorState::Include);
600 }
601 }
602 }
603}