consensus_core/
leader_scoring.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashSet},
7    fmt::Debug,
8    ops::Bound::{Excluded, Included},
9    sync::Arc,
10};
11
12use consensus_config::AuthorityIndex;
13use serde::{Deserialize, Serialize};
14
15use crate::{
16    Round, VerifiedBlock,
17    block::{BlockAPI, BlockDigest, BlockRef, Slot},
18    commit::{CommitRange, CommittedSubDag},
19    context::Context,
20    stake_aggregator::{QuorumThreshold, StakeAggregator},
21};
22
23pub(crate) struct ReputationScoreCalculator {
24    // The range of commits that these scores are calculated from.
25    pub(crate) commit_range: CommitRange,
26    // The scores per authority. Vec index is the `AuthorityIndex`.
27    pub(crate) scores_per_authority: Vec<u64>,
28
29    // As leaders are sequenced the subdags are collected and cached in `DagState`.
30    // Then when there are enough commits to trigger a `LeaderSchedule` change,
31    // the subdags are then combined into one `UnscoredSubdag` so that we can
32    // calculate the scores for the leaders in this subdag.
33    unscored_subdag: UnscoredSubdag,
34}
35
36impl ReputationScoreCalculator {
37    pub(crate) fn new(context: Arc<Context>, unscored_subdags: &[CommittedSubDag]) -> Self {
38        let num_authorities = context.committee.size();
39        let scores_per_authority = vec![0_u64; num_authorities];
40
41        assert!(
42            !unscored_subdags.is_empty(),
43            "Attempted to calculate scores with no unscored subdags"
44        );
45
46        let unscored_subdag = UnscoredSubdag::new(context.clone(), unscored_subdags);
47        let commit_range = unscored_subdag.commit_range.clone();
48
49        Self {
50            unscored_subdag,
51            commit_range,
52            scores_per_authority,
53        }
54    }
55
56    pub(crate) fn calculate(&mut self) -> ReputationScores {
57        let leaders = self.unscored_subdag.committed_leaders.clone();
58        for leader in leaders {
59            let leader_slot = Slot::from(leader);
60            tracing::trace!("Calculating score for leader {leader_slot}");
61            self.add_scores(self.calculate_scores_for_leader(&self.unscored_subdag, leader_slot));
62        }
63
64        ReputationScores::new(self.commit_range.clone(), self.scores_per_authority.clone())
65    }
66
67    fn add_scores(&mut self, scores: Vec<u64>) {
68        assert_eq!(scores.len(), self.scores_per_authority.len());
69
70        for (authority_idx, score) in scores.iter().enumerate() {
71            self.scores_per_authority[authority_idx] += *score;
72        }
73    }
74
75    // VoteScoringStrategy
76    // This scoring strategy will give one point to any votes for the leader.
77    fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec<u64> {
78        let num_authorities = subdag.context.committee.size();
79        let mut scores_per_authority = vec![0_u64; num_authorities];
80        let leader_blocks = subdag.get_blocks_at_slot(leader_slot);
81        if leader_blocks.is_empty() {
82            tracing::trace!(
83                "[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring",
84                subdag.context.own_index
85            );
86            return scores_per_authority;
87        }
88        // At this point we are guaranteed that there is only one leader per slot
89        // because we are operating on committed subdags.
90        assert!(leader_blocks.len() == 1);
91        let leader_block = leader_blocks.first().unwrap();
92        let voting_round = leader_slot.round + 1;
93        let voting_blocks = subdag.get_blocks_at_round(voting_round);
94        for potential_vote in voting_blocks {
95            // TODO: use the decided leader as input instead of leader slot. If the leader
96            // was skipped, votes to skip should be included in the score as
97            // well.
98            if subdag.is_vote(&potential_vote, leader_block) {
99                let authority = potential_vote.author();
100                tracing::trace!(
101                    "Found a vote {} for leader {leader_block} from authority {authority}",
102                    potential_vote.reference()
103                );
104                tracing::trace!(
105                    "[{}] scores +1 reputation for {authority}!",
106                    subdag.context.own_index
107                );
108                scores_per_authority[authority] += 1;
109            }
110        }
111        scores_per_authority
112    }
113}
114
115#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
116pub(crate) struct ReputationScores {
117    /// Score per authority. Vec index is the `AuthorityIndex`.
118    pub(crate) scores_per_authority: Vec<u64>,
119    // The range of commits these scores were calculated from.
120    pub(crate) commit_range: CommitRange,
121}
122
123impl ReputationScores {
124    pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
125        Self {
126            scores_per_authority,
127            commit_range,
128        }
129    }
130
131    pub(crate) fn highest_score(&self) -> u64 {
132        *self.scores_per_authority.iter().max().unwrap_or(&0)
133    }
134
135    // Returns the authorities index with score tuples.
136    pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
137        self.scores_per_authority
138            .iter()
139            .enumerate()
140            .map(|(index, score)| {
141                (
142                    context
143                        .committee
144                        .to_authority_index(index)
145                        .expect("Should be a valid AuthorityIndex"),
146                    *score,
147                )
148            })
149            .collect()
150    }
151
152    pub(crate) fn update_metrics(&self, context: Arc<Context>) {
153        for (index, score) in self.scores_per_authority.iter().enumerate() {
154            let authority_index = context
155                .committee
156                .to_authority_index(index)
157                .expect("Should be a valid AuthorityIndex");
158            let authority = context.committee.authority(authority_index);
159            if !authority.hostname.is_empty() {
160                context
161                    .metrics
162                    .node_metrics
163                    .reputation_scores
164                    .with_label_values(&[&authority.hostname])
165                    .set(*score as i64);
166            }
167        }
168    }
169}
170
171/// ScoringSubdag represents the scoring votes in a collection of subdags across
172/// multiple commits.
173/// These subdags are "scoring" for the purposes of leader schedule change. As
174/// new subdags are added, the DAG is traversed and votes for leaders are
175/// recorded and scored along with stake. On a leader schedule change, finalized
176/// reputation scores will be calculated based on the votes & stake collected in
177/// this struct.
178pub(crate) struct ScoringSubdag {
179    pub(crate) context: Arc<Context>,
180    pub(crate) commit_range: Option<CommitRange>,
181    // Only includes committed leaders for now.
182    // TODO: Include skipped leaders as well
183    pub(crate) leaders: HashSet<BlockRef>,
184    // A map of votes to the stake of strongly linked blocks that include that vote
185    // Note: Including stake aggregator so that we can quickly check if it exceeds
186    // quourum threshold and only include those scores for certain scoring strategies.
187    pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
188}
189
190impl ScoringSubdag {
191    pub(crate) fn new(context: Arc<Context>) -> Self {
192        Self {
193            context,
194            commit_range: None,
195            leaders: HashSet::new(),
196            votes: BTreeMap::new(),
197        }
198    }
199
200    pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
201        let _s = self
202            .context
203            .metrics
204            .node_metrics
205            .scope_processing_time
206            .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
207            .start_timer();
208        for subdag in committed_subdags {
209            // If the commit range is not set, then set it to the range of the first
210            // committed subdag index.
211            if self.commit_range.is_none() {
212                self.commit_range = Some(CommitRange::new(
213                    subdag.commit_ref.index..=subdag.commit_ref.index,
214                ));
215            } else {
216                let commit_range = self.commit_range.as_mut().unwrap();
217                commit_range.extend_to(subdag.commit_ref.index);
218            }
219            // Add the committed leader to the list of leaders we will be scoring.
220            tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
221            self.leaders.insert(subdag.leader);
222            // Check each block in subdag. Blocks are in order so we should traverse the
223            // oldest blocks first
224            for block in subdag.blocks {
225                for ancestor in block.ancestors() {
226                    // Weak links may point to blocks with lower round numbers
227                    // than strong links.
228                    if ancestor.round != block.round().saturating_sub(1) {
229                        continue;
230                    }
231                    // If a blocks strong linked ancestor is in leaders, then
232                    // it's a vote for leader.
233                    if self.leaders.contains(ancestor) {
234                        // There should never be duplicate references to blocks
235                        // with strong linked ancestors to leader.
236                        tracing::trace!(
237                            "Found a vote {} for leader {ancestor} from authority {}",
238                            block.reference(),
239                            block.author()
240                        );
241                        assert!(
242                            self.votes
243                                .insert(block.reference(), StakeAggregator::new())
244                                .is_none(),
245                            "Vote {block} already exists. Duplicate vote found for leader {ancestor}"
246                        );
247                    }
248                    if let Some(stake) = self.votes.get_mut(ancestor) {
249                        // Vote is strongly linked to a future block, so we
250                        // consider this a distributed vote.
251                        tracing::trace!(
252                            "Found a distributed vote {ancestor} from authority {}",
253                            ancestor.author
254                        );
255                        stake.add(block.author(), &self.context.committee);
256                    }
257                }
258            }
259        }
260    }
261
262    // Iterate through votes and calculate scores for each authority based on
263    // distributed vote scoring strategy.
264    pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
265        let scores_per_authority = self.distributed_votes_scores();
266
267        // TODO: Normalize scores
268        ReputationScores::new(
269            self.commit_range
270                .clone()
271                .expect("CommitRange should be set if calculate_scores is called."),
272            scores_per_authority,
273        )
274    }
275
276    /// This scoring strategy aims to give scores based on overall vote
277    /// distribution. Instead of only giving one point for each vote that is
278    /// included in 2f+1 blocks. We give a score equal to the amount of
279    /// stake of all blocks that included the vote.
280    fn distributed_votes_scores(&self) -> Vec<u64> {
281        let _s = self
282            .context
283            .metrics
284            .node_metrics
285            .scope_processing_time
286            .with_label_values(&["ScoringSubdag::score_distributed_votes"])
287            .start_timer();
288
289        let num_authorities = self.context.committee.size();
290        let mut scores_per_authority = vec![0_u64; num_authorities];
291
292        for (vote, stake_agg) in self.votes.iter() {
293            let authority = vote.author;
294            let stake = stake_agg.stake();
295            tracing::trace!(
296                "[{}] scores +{stake} reputation for {authority}!",
297                self.context.own_index,
298            );
299            scores_per_authority[authority.value()] += stake;
300        }
301        scores_per_authority
302    }
303
304    pub(crate) fn scored_subdags_count(&self) -> usize {
305        if let Some(commit_range) = &self.commit_range {
306            commit_range.size()
307        } else {
308            0
309        }
310    }
311
312    pub(crate) fn is_empty(&self) -> bool {
313        self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
314    }
315
316    pub(crate) fn clear(&mut self) {
317        self.leaders.clear();
318        self.votes.clear();
319        self.commit_range = None;
320    }
321}
322
323/// UnscoredSubdag represents a collection of subdags across multiple commits.
324/// These subdags are considered unscored for the purposes of leader schedule
325/// change. On a leader schedule change, reputation scores will be calculated
326/// based on the dags collected in this struct. Similar graph traversal methods
327/// that are provided in DagState are also added here to help calculate the
328/// scores.
329pub(crate) struct UnscoredSubdag {
330    pub(crate) context: Arc<Context>,
331    pub(crate) commit_range: CommitRange,
332    pub(crate) committed_leaders: Vec<BlockRef>,
333    // When the blocks are collected form the list of provided subdags we ensure
334    // that the CommittedSubDag instances are contiguous in commit index order.
335    // Therefore we can guarantee the blocks of UnscoredSubdag are also sorted
336    // via the commit index.
337    pub(crate) blocks: BTreeMap<BlockRef, VerifiedBlock>,
338}
339
340impl UnscoredSubdag {
341    pub(crate) fn new(context: Arc<Context>, subdags: &[CommittedSubDag]) -> Self {
342        let mut committed_leaders = vec![];
343        let blocks = subdags
344            .iter()
345            .enumerate()
346            .flat_map(|(subdag_index, subdag)| {
347                committed_leaders.push(subdag.leader);
348                if subdag_index == 0 {
349                    subdag.blocks.iter()
350                } else {
351                    let previous_subdag = &subdags[subdag_index - 1];
352                    let expected_next_subdag_index = previous_subdag.commit_ref.index + 1;
353                    assert_eq!(
354                        subdag.commit_ref.index, expected_next_subdag_index,
355                        "Non-contiguous commit index (expected: {}, found: {})",
356                        expected_next_subdag_index, subdag.commit_ref.index
357                    );
358                    subdag.blocks.iter()
359                }
360            })
361            .map(|block| (block.reference(), block.clone()))
362            .collect::<BTreeMap<_, _>>();
363
364        // Guaranteed to have a contiguous list of commit indices
365        let commit_range = CommitRange::new(
366            subdags.first().unwrap().commit_ref.index..=subdags.last().unwrap().commit_ref.index,
367        );
368
369        assert!(
370            !blocks.is_empty(),
371            "Attempted to create UnscoredSubdag with no blocks"
372        );
373
374        Self {
375            context,
376            commit_range,
377            committed_leaders,
378            blocks,
379        }
380    }
381
382    pub(crate) fn find_supported_leader_block(
383        &self,
384        leader_slot: Slot,
385        from: &VerifiedBlock,
386    ) -> Option<BlockRef> {
387        if from.round() < leader_slot.round {
388            return None;
389        }
390        for ancestor in from.ancestors() {
391            if Slot::from(*ancestor) == leader_slot {
392                return Some(*ancestor);
393            }
394            // Weak links may point to blocks with lower round numbers than strong links.
395            if ancestor.round <= leader_slot.round {
396                continue;
397            }
398            if let Some(ancestor) = self.get_block(ancestor) {
399                if let Some(support) = self.find_supported_leader_block(leader_slot, &ancestor) {
400                    return Some(support);
401                }
402            } else {
403                // TODO: Add unit test for this case once dagbuilder is ready.
404                tracing::trace!(
405                    "Potential vote's ancestor block not found in unscored committed subdags: {:?}",
406                    ancestor
407                );
408                return None;
409            }
410        }
411        None
412    }
413
414    pub(crate) fn is_vote(
415        &self,
416        potential_vote: &VerifiedBlock,
417        leader_block: &VerifiedBlock,
418    ) -> bool {
419        let reference = leader_block.reference();
420        let leader_slot = Slot::from(reference);
421        self.find_supported_leader_block(leader_slot, potential_vote) == Some(reference)
422    }
423
424    pub(crate) fn get_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
425        let mut blocks = vec![];
426        for (_block_ref, block) in self.blocks.range((
427            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
428            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
429        )) {
430            blocks.push(block.clone())
431        }
432        blocks
433    }
434
435    pub(crate) fn get_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
436        let mut blocks = vec![];
437        for (_block_ref, block) in self.blocks.range((
438            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
439            Excluded(BlockRef::new(
440                round + 1,
441                AuthorityIndex::ZERO,
442                BlockDigest::MIN,
443            )),
444        )) {
445            blocks.push(block.clone())
446        }
447        blocks
448    }
449
450    pub(crate) fn get_block(&self, block_ref: &BlockRef) -> Option<VerifiedBlock> {
451        self.blocks.get(block_ref).cloned()
452    }
453}
454
#[cfg(test)]
mod tests {

    use super::*;
    use crate::{CommitDigest, CommitRef, test_dag_builder::DagBuilder};

    /// Authorities 1 ~ 3, i.e. every authority of the 4-node test committee
    /// except authority 0.
    fn authorities_one_to_three() -> Vec<AuthorityIndex> {
        (1..=3).map(AuthorityIndex::new_for_test).collect()
    }

    /// Asserts that the gauge labelled `hostname` currently holds `expected`.
    fn assert_reputation_metric(context: &Context, hostname: &str, expected: i64) {
        let gauge = context
            .metrics
            .node_metrics
            .reputation_scores
            .get_metric_with_label_values(&[hostname])
            .unwrap();
        assert_eq!(gauge.get(), expected);
    }

    #[tokio::test]
    async fn test_reputation_scores_authorities_by_score() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);

        let expected = vec![
            (AuthorityIndex::new_for_test(0), 4),
            (AuthorityIndex::new_for_test(1), 1),
            (AuthorityIndex::new_for_test(2), 1),
            (AuthorityIndex::new_for_test(3), 3),
        ];
        assert_eq!(scores.authorities_by_score(context), expected);
    }

    #[tokio::test]
    async fn test_reputation_scores_update_metrics() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        scores.update_metrics(context.clone());

        assert_reputation_metric(&context, "test_host_0", 1);
        assert_reputation_metric(&context, "test_host_1", 2);
        assert_reputation_metric(&context, "test_host_2", 4);
        assert_reputation_metric(&context, "test_host_3", 3);
    }

    #[tokio::test]
    async fn test_scoring_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        // Build round 4 but with just the leader block
        dag_builder
            .layer(4)
            .authorities(authorities_one_to_three())
            .skip_block()
            .build();

        let mut scoring_subdag = ScoringSubdag::new(context.clone());
        // Feed the subdags one at a time, in commit order.
        for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
            scoring_subdag.add_subdags(vec![sub_dag]);
        }

        let scores = scoring_subdag.calculate_distributed_vote_scores();
        assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }

    // TODO: Remove all tests below this when DistributedVoteScoring is enabled.
    #[tokio::test]
    async fn test_reputation_score_calculator() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        // Build round 4 but with just the leader block
        dag_builder
            .layer(4)
            .authorities(authorities_one_to_three())
            .skip_block()
            .build();

        let unscored_subdags: Vec<CommittedSubDag> = dag_builder
            .get_sub_dag_and_commits(1..=4)
            .into_iter()
            .map(|(sub_dag, _commit)| sub_dag)
            .collect();

        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        let scores = calculator.calculate();
        assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }

    #[tokio::test]
    #[should_panic(expected = "Attempted to calculate scores with no unscored subdags")]
    async fn test_reputation_score_calculator_no_subdags() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let unscored_subdags: Vec<CommittedSubDag> = Vec::new();
        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        calculator.calculate();
    }

    #[tokio::test]
    #[should_panic(expected = "Attempted to create UnscoredSubdag with no blocks")]
    async fn test_reputation_score_calculator_no_subdag_blocks() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        // A single committed subdag that carries no blocks at all.
        let blockless_subdag = CommittedSubDag::new(
            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
            vec![],
            context.clock.timestamp_utc_ms(),
            CommitRef::new(1, CommitDigest::MIN),
            vec![],
        );
        let unscored_subdags = vec![blockless_subdag];

        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        calculator.calculate();
    }

    #[tokio::test]
    async fn test_scoring_with_missing_block_in_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let mut dag_builder = DagBuilder::new(context.clone());
        // Build layer 1 with missing leader block, simulating it was committed
        // as part of another committed subdag.
        dag_builder
            .layer(1)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        // Build fully connected layers 2 ~ 3.
        dag_builder.layers(2..=3).build();
        // Build round 4 but with just the leader block
        dag_builder
            .layer(4)
            .authorities(authorities_one_to_three())
            .skip_block()
            .build();

        let unscored_subdags: Vec<CommittedSubDag> = dag_builder
            .get_sub_dag_and_commits(1..=4)
            .into_iter()
            .map(|(sub_dag, _commit)| sub_dag)
            .collect();

        let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags);
        let scores = calculator.calculate();
        assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }
}
637}