1use std::{
6 collections::{BTreeMap, HashSet},
7 fmt::Debug,
8 ops::Bound::{Excluded, Included},
9 sync::Arc,
10};
11
12use consensus_config::AuthorityIndex;
13use serde::{Deserialize, Serialize};
14use tracing::instrument;
15
16use crate::{
17 Round, VerifiedBlock,
18 block::{BlockAPI, BlockDigest, BlockRef, Slot},
19 commit::{CommitRange, CommittedSubDag},
20 context::Context,
21 stake_aggregator::{QuorumThreshold, StakeAggregator},
22};
23
24pub(crate) struct ReputationScoreCalculator {
25 pub(crate) commit_range: CommitRange,
27 pub(crate) scores_per_authority: Vec<u64>,
29
30 unscored_subdag: UnscoredSubdag,
35}
36
37impl ReputationScoreCalculator {
38 pub(crate) fn new(context: Arc<Context>, unscored_subdags: &[CommittedSubDag]) -> Self {
39 let num_authorities = context.committee.size();
40 let scores_per_authority = vec![0_u64; num_authorities];
41
42 assert!(
43 !unscored_subdags.is_empty(),
44 "Attempted to calculate scores with no unscored subdags"
45 );
46
47 let unscored_subdag = UnscoredSubdag::new(context.clone(), unscored_subdags);
48 let commit_range = unscored_subdag.commit_range.clone();
49
50 Self {
51 unscored_subdag,
52 commit_range,
53 scores_per_authority,
54 }
55 }
56
57 pub(crate) fn calculate(&mut self) -> ReputationScores {
58 let leaders = self.unscored_subdag.committed_leaders.clone();
59 for leader in leaders {
60 let leader_slot = Slot::from(leader);
61 tracing::trace!("Calculating score for leader {leader_slot}");
62 self.add_scores(self.calculate_scores_for_leader(&self.unscored_subdag, leader_slot));
63 }
64
65 ReputationScores::new(self.commit_range.clone(), self.scores_per_authority.clone())
66 }
67
68 fn add_scores(&mut self, scores: Vec<u64>) {
69 assert_eq!(scores.len(), self.scores_per_authority.len());
70
71 for (authority_idx, score) in scores.iter().enumerate() {
72 self.scores_per_authority[authority_idx] += *score;
73 }
74 }
75
76 fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec<u64> {
79 let num_authorities = subdag.context.committee.size();
80 let mut scores_per_authority = vec![0_u64; num_authorities];
81 let leader_blocks = subdag.get_blocks_at_slot(leader_slot);
82 if leader_blocks.is_empty() {
83 tracing::trace!(
84 "[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring",
85 subdag.context.own_index
86 );
87 return scores_per_authority;
88 }
89 assert!(leader_blocks.len() == 1);
92 let leader_block = leader_blocks.first().unwrap();
93 let voting_round = leader_slot.round + 1;
94 let voting_blocks = subdag.get_blocks_at_round(voting_round);
95 for potential_vote in voting_blocks {
96 if subdag.is_vote(&potential_vote, leader_block) {
100 let authority = potential_vote.author();
101 tracing::trace!(
102 "Found a vote {} for leader {leader_block} from authority {authority}",
103 potential_vote.reference()
104 );
105 tracing::trace!(
106 "[{}] scores +1 reputation for {authority}!",
107 subdag.context.own_index
108 );
109 scores_per_authority[authority] += 1;
110 }
111 }
112 scores_per_authority
113 }
114}
115
116#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
117pub(crate) struct ReputationScores {
118 pub(crate) scores_per_authority: Vec<u64>,
120 pub(crate) commit_range: CommitRange,
122}
123
124impl ReputationScores {
125 pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
126 Self {
127 scores_per_authority,
128 commit_range,
129 }
130 }
131
132 pub(crate) fn highest_score(&self) -> u64 {
133 *self.scores_per_authority.iter().max().unwrap_or(&0)
134 }
135
136 pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
138 self.scores_per_authority
139 .iter()
140 .enumerate()
141 .map(|(index, score)| {
142 (
143 context
144 .committee
145 .to_authority_index(index)
146 .expect("Should be a valid AuthorityIndex"),
147 *score,
148 )
149 })
150 .collect()
151 }
152
153 pub(crate) fn update_metrics(&self, context: Arc<Context>) {
154 for (index, score) in self.scores_per_authority.iter().enumerate() {
155 let authority_index = context
156 .committee
157 .to_authority_index(index)
158 .expect("Should be a valid AuthorityIndex");
159 let authority = context.committee.authority(authority_index);
160 if !authority.hostname.is_empty() {
161 context
162 .metrics
163 .node_metrics
164 .reputation_scores
165 .with_label_values(&[&authority.hostname])
166 .set(*score as i64);
167 }
168 }
169 }
170}
171
172pub(crate) struct ScoringSubdag {
180 pub(crate) context: Arc<Context>,
181 pub(crate) commit_range: Option<CommitRange>,
182 pub(crate) leaders: HashSet<BlockRef>,
185 pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
189}
190
191impl ScoringSubdag {
192 pub(crate) fn new(context: Arc<Context>) -> Self {
193 Self {
194 context,
195 commit_range: None,
196 leaders: HashSet::new(),
197 votes: BTreeMap::new(),
198 }
199 }
200
201 #[instrument(level = "trace", skip_all)]
202 pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
203 let _s = self
204 .context
205 .metrics
206 .node_metrics
207 .scope_processing_time
208 .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
209 .start_timer();
210 for subdag in committed_subdags {
211 if self.commit_range.is_none() {
214 self.commit_range = Some(CommitRange::new(
215 subdag.commit_ref.index..=subdag.commit_ref.index,
216 ));
217 } else {
218 let commit_range = self.commit_range.as_mut().unwrap();
219 commit_range.extend_to(subdag.commit_ref.index);
220 }
221 tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
223 self.leaders.insert(subdag.leader);
224 for block in subdag.blocks {
227 for ancestor in block.ancestors() {
228 if ancestor.round != block.round().saturating_sub(1) {
231 continue;
232 }
233 if self.leaders.contains(ancestor) {
236 tracing::trace!(
239 "Found a vote {} for leader {ancestor} from authority {}",
240 block.reference(),
241 block.author()
242 );
243 assert!(
244 self.votes
245 .insert(block.reference(), StakeAggregator::new())
246 .is_none(),
247 "Vote {block} already exists. Duplicate vote found for leader {ancestor}"
248 );
249 }
250 if let Some(stake) = self.votes.get_mut(ancestor) {
251 tracing::trace!(
254 "Found a distributed vote {ancestor} from authority {}",
255 ancestor.author
256 );
257 stake.add(block.author(), &self.context.committee);
258 }
259 }
260 }
261 }
262 }
263
264 pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
267 let scores_per_authority = self.distributed_votes_scores();
268
269 ReputationScores::new(
271 self.commit_range
272 .clone()
273 .expect("CommitRange should be set if calculate_scores is called."),
274 scores_per_authority,
275 )
276 }
277
278 fn distributed_votes_scores(&self) -> Vec<u64> {
283 let _s = self
284 .context
285 .metrics
286 .node_metrics
287 .scope_processing_time
288 .with_label_values(&["ScoringSubdag::score_distributed_votes"])
289 .start_timer();
290
291 let num_authorities = self.context.committee.size();
292 let mut scores_per_authority = vec![0_u64; num_authorities];
293
294 for (vote, stake_agg) in self.votes.iter() {
295 let authority = vote.author;
296 let stake = stake_agg.stake();
297 tracing::trace!(
298 "[{}] scores +{stake} reputation for {authority}!",
299 self.context.own_index,
300 );
301 scores_per_authority[authority.value()] += stake;
302 }
303 scores_per_authority
304 }
305
306 pub(crate) fn scored_subdags_count(&self) -> usize {
307 if let Some(commit_range) = &self.commit_range {
308 commit_range.size()
309 } else {
310 0
311 }
312 }
313
314 pub(crate) fn is_empty(&self) -> bool {
315 self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
316 }
317
318 pub(crate) fn clear(&mut self) {
319 self.leaders.clear();
320 self.votes.clear();
321 self.commit_range = None;
322 }
323}
324
325pub(crate) struct UnscoredSubdag {
332 pub(crate) context: Arc<Context>,
333 pub(crate) commit_range: CommitRange,
334 pub(crate) committed_leaders: Vec<BlockRef>,
335 pub(crate) blocks: BTreeMap<BlockRef, VerifiedBlock>,
340}
341
342impl UnscoredSubdag {
343 pub(crate) fn new(context: Arc<Context>, subdags: &[CommittedSubDag]) -> Self {
344 let mut committed_leaders = vec![];
345 let blocks = subdags
346 .iter()
347 .enumerate()
348 .flat_map(|(subdag_index, subdag)| {
349 committed_leaders.push(subdag.leader);
350 if subdag_index == 0 {
351 subdag.blocks.iter()
352 } else {
353 let previous_subdag = &subdags[subdag_index - 1];
354 let expected_next_subdag_index = previous_subdag.commit_ref.index + 1;
355 assert_eq!(
356 subdag.commit_ref.index, expected_next_subdag_index,
357 "Non-contiguous commit index (expected: {}, found: {})",
358 expected_next_subdag_index, subdag.commit_ref.index
359 );
360 subdag.blocks.iter()
361 }
362 })
363 .map(|block| (block.reference(), block.clone()))
364 .collect::<BTreeMap<_, _>>();
365
366 let commit_range = CommitRange::new(
368 subdags.first().unwrap().commit_ref.index..=subdags.last().unwrap().commit_ref.index,
369 );
370
371 assert!(
372 !blocks.is_empty(),
373 "Attempted to create UnscoredSubdag with no blocks"
374 );
375
376 Self {
377 context,
378 commit_range,
379 committed_leaders,
380 blocks,
381 }
382 }
383
384 pub(crate) fn find_supported_leader_block(
385 &self,
386 leader_slot: Slot,
387 from: &VerifiedBlock,
388 ) -> Option<BlockRef> {
389 if from.round() < leader_slot.round {
390 return None;
391 }
392 for ancestor in from.ancestors() {
393 if Slot::from(*ancestor) == leader_slot {
394 return Some(*ancestor);
395 }
396 if ancestor.round <= leader_slot.round {
398 continue;
399 }
400 if let Some(ancestor) = self.get_block(ancestor) {
401 if let Some(support) = self.find_supported_leader_block(leader_slot, &ancestor) {
402 return Some(support);
403 }
404 } else {
405 tracing::trace!(
407 "Potential vote's ancestor block not found in unscored committed subdags: {:?}",
408 ancestor
409 );
410 return None;
411 }
412 }
413 None
414 }
415
416 pub(crate) fn is_vote(
417 &self,
418 potential_vote: &VerifiedBlock,
419 leader_block: &VerifiedBlock,
420 ) -> bool {
421 let reference = leader_block.reference();
422 let leader_slot = Slot::from(reference);
423 self.find_supported_leader_block(leader_slot, potential_vote) == Some(reference)
424 }
425
426 pub(crate) fn get_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
427 let mut blocks = vec![];
428 for (_block_ref, block) in self.blocks.range((
429 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
430 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
431 )) {
432 blocks.push(block.clone())
433 }
434 blocks
435 }
436
437 pub(crate) fn get_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
438 let mut blocks = vec![];
439 for (_block_ref, block) in self.blocks.range((
440 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
441 Excluded(BlockRef::new(
442 round + 1,
443 AuthorityIndex::ZERO,
444 BlockDigest::MIN,
445 )),
446 )) {
447 blocks.push(block.clone())
448 }
449 blocks
450 }
451
452 pub(crate) fn get_block(&self, block_ref: &BlockRef) -> Option<VerifiedBlock> {
453 self.blocks.get(block_ref).cloned()
454 }
455}
456
457#[cfg(test)]
458mod tests {
459
460 use super::*;
461 use crate::{CommitDigest, CommitRef, test_dag_builder::DagBuilder};
462
463 #[tokio::test]
464 async fn test_reputation_scores_authorities_by_score() {
465 let context = Arc::new(Context::new_for_test(4).0);
466 let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);
467 let authorities = scores.authorities_by_score(context);
468 assert_eq!(
469 authorities,
470 vec![
471 (AuthorityIndex::new_for_test(0), 4),
472 (AuthorityIndex::new_for_test(1), 1),
473 (AuthorityIndex::new_for_test(2), 1),
474 (AuthorityIndex::new_for_test(3), 3),
475 ]
476 );
477 }
478
479 #[tokio::test]
480 async fn test_reputation_scores_update_metrics() {
481 let context = Arc::new(Context::new_for_test(4).0);
482 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
483 scores.update_metrics(context.clone());
484 let metrics = context.metrics.node_metrics.reputation_scores.clone();
485 assert_eq!(
486 metrics
487 .get_metric_with_label_values(&["test_host_0"])
488 .unwrap()
489 .get(),
490 1
491 );
492 assert_eq!(
493 metrics
494 .get_metric_with_label_values(&["test_host_1"])
495 .unwrap()
496 .get(),
497 2
498 );
499 assert_eq!(
500 metrics
501 .get_metric_with_label_values(&["test_host_2"])
502 .unwrap()
503 .get(),
504 4
505 );
506 assert_eq!(
507 metrics
508 .get_metric_with_label_values(&["test_host_3"])
509 .unwrap()
510 .get(),
511 3
512 );
513 }
514
515 #[tokio::test]
516 async fn test_scoring_subdag() {
517 telemetry_subscribers::init_for_testing();
518 let context = Arc::new(Context::new_for_test(4).0);
519 let mut dag_builder = DagBuilder::new(context.clone());
521 dag_builder.layers(1..=3).build();
522 dag_builder
524 .layer(4)
525 .authorities(vec![
526 AuthorityIndex::new_for_test(1),
527 AuthorityIndex::new_for_test(2),
528 AuthorityIndex::new_for_test(3),
529 ])
530 .skip_block()
531 .build();
532
533 let mut scoring_subdag = ScoringSubdag::new(context.clone());
534
535 for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
536 scoring_subdag.add_subdags(vec![sub_dag]);
537 }
538
539 let scores = scoring_subdag.calculate_distributed_vote_scores();
540 assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]);
541 assert_eq!(scores.commit_range, (1..=4).into());
542 }
543
544 #[tokio::test]
546 async fn test_reputation_score_calculator() {
547 telemetry_subscribers::init_for_testing();
548 let context = Arc::new(Context::new_for_test(4).0);
549
550 let mut dag_builder = DagBuilder::new(context.clone());
552 dag_builder.layers(1..=3).build();
553 dag_builder
555 .layer(4)
556 .authorities(vec![
557 AuthorityIndex::new_for_test(1),
558 AuthorityIndex::new_for_test(2),
559 AuthorityIndex::new_for_test(3),
560 ])
561 .skip_block()
562 .build();
563
564 let mut unscored_subdags = vec![];
565 for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
566 unscored_subdags.push(sub_dag);
567 }
568 let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
569 let scores = calculator.calculate();
570 assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
571 assert_eq!(scores.commit_range, (1..=4).into());
572 }
573
574 #[tokio::test]
575 #[should_panic(expected = "Attempted to calculate scores with no unscored subdags")]
576 async fn test_reputation_score_calculator_no_subdags() {
577 telemetry_subscribers::init_for_testing();
578 let context = Arc::new(Context::new_for_test(4).0);
579
580 let unscored_subdags = vec![];
581 let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
582 calculator.calculate();
583 }
584
585 #[tokio::test]
586 #[should_panic(expected = "Attempted to create UnscoredSubdag with no blocks")]
587 async fn test_reputation_score_calculator_no_subdag_blocks() {
588 telemetry_subscribers::init_for_testing();
589 let context = Arc::new(Context::new_for_test(4).0);
590
591 let blocks = vec![];
592 let unscored_subdags = vec![CommittedSubDag::new(
593 BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
594 blocks,
595 context.clock.timestamp_utc_ms(),
596 CommitRef::new(1, CommitDigest::MIN),
597 vec![],
598 )];
599 let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
600 calculator.calculate();
601 }
602
603 #[tokio::test]
604 async fn test_scoring_with_missing_block_in_subdag() {
605 telemetry_subscribers::init_for_testing();
606 let context = Arc::new(Context::new_for_test(4).0);
607
608 let mut dag_builder = DagBuilder::new(context.clone());
609 dag_builder
612 .layer(1)
613 .authorities(vec![AuthorityIndex::new_for_test(0)])
614 .skip_block()
615 .build();
616 dag_builder.layers(2..=3).build();
618 dag_builder
620 .layer(4)
621 .authorities(vec![
622 AuthorityIndex::new_for_test(1),
623 AuthorityIndex::new_for_test(2),
624 AuthorityIndex::new_for_test(3),
625 ])
626 .skip_block()
627 .build();
628
629 let mut unscored_subdags = vec![];
630 for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
631 unscored_subdags.push(sub_dag);
632 }
633
634 let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
635 let scores = calculator.calculate();
636 assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
637 assert_eq!(scores.commit_range, (1..=4).into());
638 }
639}