1use std::{
6 collections::{BTreeMap, HashSet},
7 fmt::Debug,
8 ops::Bound::{Excluded, Included},
9 sync::Arc,
10};
11
12use consensus_config::AuthorityIndex;
13use serde::{Deserialize, Serialize};
14
15use crate::{
16 Round, VerifiedBlock,
17 block::{BlockAPI, BlockDigest, BlockRef, Slot},
18 commit::{CommitRange, CommittedSubDag},
19 context::Context,
20 stake_aggregator::{QuorumThreshold, StakeAggregator},
21};
22
23pub(crate) struct ReputationScoreCalculator {
24 pub(crate) commit_range: CommitRange,
26 pub(crate) scores_per_authority: Vec<u64>,
28
29 unscored_subdag: UnscoredSubdag,
34}
35
36impl ReputationScoreCalculator {
37 pub(crate) fn new(context: Arc<Context>, unscored_subdags: &[CommittedSubDag]) -> Self {
38 let num_authorities = context.committee.size();
39 let scores_per_authority = vec![0_u64; num_authorities];
40
41 assert!(
42 !unscored_subdags.is_empty(),
43 "Attempted to calculate scores with no unscored subdags"
44 );
45
46 let unscored_subdag = UnscoredSubdag::new(context.clone(), unscored_subdags);
47 let commit_range = unscored_subdag.commit_range.clone();
48
49 Self {
50 unscored_subdag,
51 commit_range,
52 scores_per_authority,
53 }
54 }
55
56 pub(crate) fn calculate(&mut self) -> ReputationScores {
57 let leaders = self.unscored_subdag.committed_leaders.clone();
58 for leader in leaders {
59 let leader_slot = Slot::from(leader);
60 tracing::trace!("Calculating score for leader {leader_slot}");
61 self.add_scores(self.calculate_scores_for_leader(&self.unscored_subdag, leader_slot));
62 }
63
64 ReputationScores::new(self.commit_range.clone(), self.scores_per_authority.clone())
65 }
66
67 fn add_scores(&mut self, scores: Vec<u64>) {
68 assert_eq!(scores.len(), self.scores_per_authority.len());
69
70 for (authority_idx, score) in scores.iter().enumerate() {
71 self.scores_per_authority[authority_idx] += *score;
72 }
73 }
74
75 fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec<u64> {
78 let num_authorities = subdag.context.committee.size();
79 let mut scores_per_authority = vec![0_u64; num_authorities];
80 let leader_blocks = subdag.get_blocks_at_slot(leader_slot);
81 if leader_blocks.is_empty() {
82 tracing::trace!(
83 "[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring",
84 subdag.context.own_index
85 );
86 return scores_per_authority;
87 }
88 assert!(leader_blocks.len() == 1);
91 let leader_block = leader_blocks.first().unwrap();
92 let voting_round = leader_slot.round + 1;
93 let voting_blocks = subdag.get_blocks_at_round(voting_round);
94 for potential_vote in voting_blocks {
95 if subdag.is_vote(&potential_vote, leader_block) {
99 let authority = potential_vote.author();
100 tracing::trace!(
101 "Found a vote {} for leader {leader_block} from authority {authority}",
102 potential_vote.reference()
103 );
104 tracing::trace!(
105 "[{}] scores +1 reputation for {authority}!",
106 subdag.context.own_index
107 );
108 scores_per_authority[authority] += 1;
109 }
110 }
111 scores_per_authority
112 }
113}
114
115#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
116pub(crate) struct ReputationScores {
117 pub(crate) scores_per_authority: Vec<u64>,
119 pub(crate) commit_range: CommitRange,
121}
122
123impl ReputationScores {
124 pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
125 Self {
126 scores_per_authority,
127 commit_range,
128 }
129 }
130
131 pub(crate) fn highest_score(&self) -> u64 {
132 *self.scores_per_authority.iter().max().unwrap_or(&0)
133 }
134
135 pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
137 self.scores_per_authority
138 .iter()
139 .enumerate()
140 .map(|(index, score)| {
141 (
142 context
143 .committee
144 .to_authority_index(index)
145 .expect("Should be a valid AuthorityIndex"),
146 *score,
147 )
148 })
149 .collect()
150 }
151
152 pub(crate) fn update_metrics(&self, context: Arc<Context>) {
153 for (index, score) in self.scores_per_authority.iter().enumerate() {
154 let authority_index = context
155 .committee
156 .to_authority_index(index)
157 .expect("Should be a valid AuthorityIndex");
158 let authority = context.committee.authority(authority_index);
159 if !authority.hostname.is_empty() {
160 context
161 .metrics
162 .node_metrics
163 .reputation_scores
164 .with_label_values(&[&authority.hostname])
165 .set(*score as i64);
166 }
167 }
168 }
169}
170
171pub(crate) struct ScoringSubdag {
179 pub(crate) context: Arc<Context>,
180 pub(crate) commit_range: Option<CommitRange>,
181 pub(crate) leaders: HashSet<BlockRef>,
184 pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
188}
189
190impl ScoringSubdag {
191 pub(crate) fn new(context: Arc<Context>) -> Self {
192 Self {
193 context,
194 commit_range: None,
195 leaders: HashSet::new(),
196 votes: BTreeMap::new(),
197 }
198 }
199
200 pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
201 let _s = self
202 .context
203 .metrics
204 .node_metrics
205 .scope_processing_time
206 .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
207 .start_timer();
208 for subdag in committed_subdags {
209 if self.commit_range.is_none() {
212 self.commit_range = Some(CommitRange::new(
213 subdag.commit_ref.index..=subdag.commit_ref.index,
214 ));
215 } else {
216 let commit_range = self.commit_range.as_mut().unwrap();
217 commit_range.extend_to(subdag.commit_ref.index);
218 }
219 tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
221 self.leaders.insert(subdag.leader);
222 for block in subdag.blocks {
225 for ancestor in block.ancestors() {
226 if ancestor.round != block.round().saturating_sub(1) {
229 continue;
230 }
231 if self.leaders.contains(ancestor) {
234 tracing::trace!(
237 "Found a vote {} for leader {ancestor} from authority {}",
238 block.reference(),
239 block.author()
240 );
241 assert!(
242 self.votes
243 .insert(block.reference(), StakeAggregator::new())
244 .is_none(),
245 "Vote {block} already exists. Duplicate vote found for leader {ancestor}"
246 );
247 }
248 if let Some(stake) = self.votes.get_mut(ancestor) {
249 tracing::trace!(
252 "Found a distributed vote {ancestor} from authority {}",
253 ancestor.author
254 );
255 stake.add(block.author(), &self.context.committee);
256 }
257 }
258 }
259 }
260 }
261
262 pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
265 let scores_per_authority = self.distributed_votes_scores();
266
267 ReputationScores::new(
269 self.commit_range
270 .clone()
271 .expect("CommitRange should be set if calculate_scores is called."),
272 scores_per_authority,
273 )
274 }
275
276 fn distributed_votes_scores(&self) -> Vec<u64> {
281 let _s = self
282 .context
283 .metrics
284 .node_metrics
285 .scope_processing_time
286 .with_label_values(&["ScoringSubdag::score_distributed_votes"])
287 .start_timer();
288
289 let num_authorities = self.context.committee.size();
290 let mut scores_per_authority = vec![0_u64; num_authorities];
291
292 for (vote, stake_agg) in self.votes.iter() {
293 let authority = vote.author;
294 let stake = stake_agg.stake();
295 tracing::trace!(
296 "[{}] scores +{stake} reputation for {authority}!",
297 self.context.own_index,
298 );
299 scores_per_authority[authority.value()] += stake;
300 }
301 scores_per_authority
302 }
303
304 pub(crate) fn scored_subdags_count(&self) -> usize {
305 if let Some(commit_range) = &self.commit_range {
306 commit_range.size()
307 } else {
308 0
309 }
310 }
311
312 pub(crate) fn is_empty(&self) -> bool {
313 self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
314 }
315
316 pub(crate) fn clear(&mut self) {
317 self.leaders.clear();
318 self.votes.clear();
319 self.commit_range = None;
320 }
321}
322
323pub(crate) struct UnscoredSubdag {
330 pub(crate) context: Arc<Context>,
331 pub(crate) commit_range: CommitRange,
332 pub(crate) committed_leaders: Vec<BlockRef>,
333 pub(crate) blocks: BTreeMap<BlockRef, VerifiedBlock>,
338}
339
340impl UnscoredSubdag {
341 pub(crate) fn new(context: Arc<Context>, subdags: &[CommittedSubDag]) -> Self {
342 let mut committed_leaders = vec![];
343 let blocks = subdags
344 .iter()
345 .enumerate()
346 .flat_map(|(subdag_index, subdag)| {
347 committed_leaders.push(subdag.leader);
348 if subdag_index == 0 {
349 subdag.blocks.iter()
350 } else {
351 let previous_subdag = &subdags[subdag_index - 1];
352 let expected_next_subdag_index = previous_subdag.commit_ref.index + 1;
353 assert_eq!(
354 subdag.commit_ref.index, expected_next_subdag_index,
355 "Non-contiguous commit index (expected: {}, found: {})",
356 expected_next_subdag_index, subdag.commit_ref.index
357 );
358 subdag.blocks.iter()
359 }
360 })
361 .map(|block| (block.reference(), block.clone()))
362 .collect::<BTreeMap<_, _>>();
363
364 let commit_range = CommitRange::new(
366 subdags.first().unwrap().commit_ref.index..=subdags.last().unwrap().commit_ref.index,
367 );
368
369 assert!(
370 !blocks.is_empty(),
371 "Attempted to create UnscoredSubdag with no blocks"
372 );
373
374 Self {
375 context,
376 commit_range,
377 committed_leaders,
378 blocks,
379 }
380 }
381
382 pub(crate) fn find_supported_leader_block(
383 &self,
384 leader_slot: Slot,
385 from: &VerifiedBlock,
386 ) -> Option<BlockRef> {
387 if from.round() < leader_slot.round {
388 return None;
389 }
390 for ancestor in from.ancestors() {
391 if Slot::from(*ancestor) == leader_slot {
392 return Some(*ancestor);
393 }
394 if ancestor.round <= leader_slot.round {
396 continue;
397 }
398 if let Some(ancestor) = self.get_block(ancestor) {
399 if let Some(support) = self.find_supported_leader_block(leader_slot, &ancestor) {
400 return Some(support);
401 }
402 } else {
403 tracing::trace!(
405 "Potential vote's ancestor block not found in unscored committed subdags: {:?}",
406 ancestor
407 );
408 return None;
409 }
410 }
411 None
412 }
413
414 pub(crate) fn is_vote(
415 &self,
416 potential_vote: &VerifiedBlock,
417 leader_block: &VerifiedBlock,
418 ) -> bool {
419 let reference = leader_block.reference();
420 let leader_slot = Slot::from(reference);
421 self.find_supported_leader_block(leader_slot, potential_vote) == Some(reference)
422 }
423
424 pub(crate) fn get_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
425 let mut blocks = vec![];
426 for (_block_ref, block) in self.blocks.range((
427 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
428 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
429 )) {
430 blocks.push(block.clone())
431 }
432 blocks
433 }
434
435 pub(crate) fn get_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
436 let mut blocks = vec![];
437 for (_block_ref, block) in self.blocks.range((
438 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
439 Excluded(BlockRef::new(
440 round + 1,
441 AuthorityIndex::ZERO,
442 BlockDigest::MIN,
443 )),
444 )) {
445 blocks.push(block.clone())
446 }
447 blocks
448 }
449
450 pub(crate) fn get_block(&self, block_ref: &BlockRef) -> Option<VerifiedBlock> {
451 self.blocks.get(block_ref).cloned()
452 }
453}
454
#[cfg(test)]
mod tests {

    use super::*;
    use crate::{CommitDigest, CommitRef, test_dag_builder::DagBuilder};

    /// Builds layer 4 with `skip_block()` applied to authorities 1, 2 and 3.
    fn build_layer_4_with_skipped_blocks(dag_builder: &mut DagBuilder) {
        dag_builder
            .layer(4)
            .authorities(vec![
                AuthorityIndex::new_for_test(1),
                AuthorityIndex::new_for_test(2),
                AuthorityIndex::new_for_test(3),
            ])
            .skip_block()
            .build();
    }

    /// Returns the sub-dags the builder yields for rounds 1 through 4,
    /// discarding the accompanying commits.
    fn sub_dags_for_rounds_1_to_4(dag_builder: &mut DagBuilder) -> Vec<CommittedSubDag> {
        dag_builder
            .get_sub_dag_and_commits(1..=4)
            .into_iter()
            .map(|(sub_dag, _commit)| sub_dag)
            .collect()
    }

    #[tokio::test]
    async fn test_reputation_scores_authorities_by_score() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);

        let expected = vec![
            (AuthorityIndex::new_for_test(0), 4),
            (AuthorityIndex::new_for_test(1), 1),
            (AuthorityIndex::new_for_test(2), 1),
            (AuthorityIndex::new_for_test(3), 3),
        ];
        assert_eq!(scores.authorities_by_score(context), expected);
    }

    #[tokio::test]
    async fn test_reputation_scores_update_metrics() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        scores.update_metrics(context.clone());

        let metrics = context.metrics.node_metrics.reputation_scores.clone();
        let expected_by_host = [
            ("test_host_0", 1),
            ("test_host_1", 2),
            ("test_host_2", 4),
            ("test_host_3", 3),
        ];
        for (hostname, expected_score) in expected_by_host {
            assert_eq!(
                metrics
                    .get_metric_with_label_values(&[hostname])
                    .unwrap()
                    .get(),
                expected_score
            );
        }
    }

    #[tokio::test]
    async fn test_scoring_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        build_layer_4_with_skipped_blocks(&mut dag_builder);

        let mut scoring_subdag = ScoringSubdag::new(context.clone());
        // Feed the sub-dags one at a time, as separate calls.
        for sub_dag in sub_dags_for_rounds_1_to_4(&mut dag_builder) {
            scoring_subdag.add_subdags(vec![sub_dag]);
        }

        let scores = scoring_subdag.calculate_distributed_vote_scores();
        assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }

    #[tokio::test]
    async fn test_reputation_score_calculator() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        build_layer_4_with_skipped_blocks(&mut dag_builder);

        let unscored_subdags = sub_dags_for_rounds_1_to_4(&mut dag_builder);
        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        let scores = calculator.calculate();
        assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }

    #[tokio::test]
    #[should_panic(expected = "Attempted to calculate scores with no unscored subdags")]
    async fn test_reputation_score_calculator_no_subdags() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let unscored_subdags: Vec<CommittedSubDag> = Vec::new();
        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        calculator.calculate();
    }

    #[tokio::test]
    #[should_panic(expected = "Attempted to create UnscoredSubdag with no blocks")]
    async fn test_reputation_score_calculator_no_subdag_blocks() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        // A single sub-dag that carries a leader reference but no blocks.
        let leader = BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN);
        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
        let unscored_subdags = vec![CommittedSubDag::new(
            leader,
            vec![],
            context.clock.timestamp_utc_ms(),
            commit_ref,
            vec![],
        )];

        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        calculator.calculate();
    }

    #[tokio::test]
    async fn test_scoring_with_missing_block_in_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let mut dag_builder = DagBuilder::new(context.clone());
        // Layer 1 is built with `skip_block()` applied to authority 0.
        dag_builder
            .layer(1)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder.layers(2..=3).build();
        build_layer_4_with_skipped_blocks(&mut dag_builder);

        let unscored_subdags = sub_dags_for_rounds_1_to_4(&mut dag_builder);

        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        let scores = calculator.calculate();
        assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }
}