consensus_core/
leader_scoring.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashSet},
7    fmt::Debug,
8    ops::Bound::{Excluded, Included},
9    sync::Arc,
10};
11
12use consensus_config::AuthorityIndex;
13use serde::{Deserialize, Serialize};
14use tracing::instrument;
15
16use crate::{
17    Round, VerifiedBlock,
18    block::{BlockAPI, BlockDigest, BlockRef, Slot},
19    commit::{CommitRange, CommittedSubDag},
20    context::Context,
21    stake_aggregator::{QuorumThreshold, StakeAggregator},
22};
23
24pub(crate) struct ReputationScoreCalculator {
25    // The range of commits that these scores are calculated from.
26    pub(crate) commit_range: CommitRange,
27    // The scores per authority. Vec index is the `AuthorityIndex`.
28    pub(crate) scores_per_authority: Vec<u64>,
29
30    // As leaders are sequenced the subdags are collected and cached in `DagState`.
31    // Then when there are enough commits to trigger a `LeaderSchedule` change,
32    // the subdags are then combined into one `UnscoredSubdag` so that we can
33    // calculate the scores for the leaders in this subdag.
34    unscored_subdag: UnscoredSubdag,
35}
36
37impl ReputationScoreCalculator {
38    pub(crate) fn new(context: Arc<Context>, unscored_subdags: &[CommittedSubDag]) -> Self {
39        let num_authorities = context.committee.size();
40        let scores_per_authority = vec![0_u64; num_authorities];
41
42        assert!(
43            !unscored_subdags.is_empty(),
44            "Attempted to calculate scores with no unscored subdags"
45        );
46
47        let unscored_subdag = UnscoredSubdag::new(context.clone(), unscored_subdags);
48        let commit_range = unscored_subdag.commit_range.clone();
49
50        Self {
51            unscored_subdag,
52            commit_range,
53            scores_per_authority,
54        }
55    }
56
57    pub(crate) fn calculate(&mut self) -> ReputationScores {
58        let leaders = self.unscored_subdag.committed_leaders.clone();
59        for leader in leaders {
60            let leader_slot = Slot::from(leader);
61            tracing::trace!("Calculating score for leader {leader_slot}");
62            self.add_scores(self.calculate_scores_for_leader(&self.unscored_subdag, leader_slot));
63        }
64
65        ReputationScores::new(self.commit_range.clone(), self.scores_per_authority.clone())
66    }
67
68    fn add_scores(&mut self, scores: Vec<u64>) {
69        assert_eq!(scores.len(), self.scores_per_authority.len());
70
71        for (authority_idx, score) in scores.iter().enumerate() {
72            self.scores_per_authority[authority_idx] += *score;
73        }
74    }
75
76    // VoteScoringStrategy
77    // This scoring strategy will give one point to any votes for the leader.
78    fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec<u64> {
79        let num_authorities = subdag.context.committee.size();
80        let mut scores_per_authority = vec![0_u64; num_authorities];
81        let leader_blocks = subdag.get_blocks_at_slot(leader_slot);
82        if leader_blocks.is_empty() {
83            tracing::trace!(
84                "[{}] No block for leader slot {leader_slot} in this set of unscored committed subdags, skip scoring",
85                subdag.context.own_index
86            );
87            return scores_per_authority;
88        }
89        // At this point we are guaranteed that there is only one leader per slot
90        // because we are operating on committed subdags.
91        assert!(leader_blocks.len() == 1);
92        let leader_block = leader_blocks.first().unwrap();
93        let voting_round = leader_slot.round + 1;
94        let voting_blocks = subdag.get_blocks_at_round(voting_round);
95        for potential_vote in voting_blocks {
96            // TODO: use the decided leader as input instead of leader slot. If the leader
97            // was skipped, votes to skip should be included in the score as
98            // well.
99            if subdag.is_vote(&potential_vote, leader_block) {
100                let authority = potential_vote.author();
101                tracing::trace!(
102                    "Found a vote {} for leader {leader_block} from authority {authority}",
103                    potential_vote.reference()
104                );
105                tracing::trace!(
106                    "[{}] scores +1 reputation for {authority}!",
107                    subdag.context.own_index
108                );
109                scores_per_authority[authority] += 1;
110            }
111        }
112        scores_per_authority
113    }
114}
115
116#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
117pub(crate) struct ReputationScores {
118    /// Score per authority. Vec index is the `AuthorityIndex`.
119    pub(crate) scores_per_authority: Vec<u64>,
120    // The range of commits these scores were calculated from.
121    pub(crate) commit_range: CommitRange,
122}
123
124impl ReputationScores {
125    pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
126        Self {
127            scores_per_authority,
128            commit_range,
129        }
130    }
131
132    pub(crate) fn highest_score(&self) -> u64 {
133        *self.scores_per_authority.iter().max().unwrap_or(&0)
134    }
135
136    // Returns the authorities index with score tuples.
137    pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
138        self.scores_per_authority
139            .iter()
140            .enumerate()
141            .map(|(index, score)| {
142                (
143                    context
144                        .committee
145                        .to_authority_index(index)
146                        .expect("Should be a valid AuthorityIndex"),
147                    *score,
148                )
149            })
150            .collect()
151    }
152
153    pub(crate) fn update_metrics(&self, context: Arc<Context>) {
154        for (index, score) in self.scores_per_authority.iter().enumerate() {
155            let authority_index = context
156                .committee
157                .to_authority_index(index)
158                .expect("Should be a valid AuthorityIndex");
159            let authority = context.committee.authority(authority_index);
160            if !authority.hostname.is_empty() {
161                context
162                    .metrics
163                    .node_metrics
164                    .reputation_scores
165                    .with_label_values(&[&authority.hostname])
166                    .set(*score as i64);
167            }
168        }
169    }
170}
171
172/// ScoringSubdag represents the scoring votes in a collection of subdags across
173/// multiple commits.
174/// These subdags are "scoring" for the purposes of leader schedule change. As
175/// new subdags are added, the DAG is traversed and votes for leaders are
176/// recorded and scored along with stake. On a leader schedule change, finalized
177/// reputation scores will be calculated based on the votes & stake collected in
178/// this struct.
179pub(crate) struct ScoringSubdag {
180    pub(crate) context: Arc<Context>,
181    pub(crate) commit_range: Option<CommitRange>,
182    // Only includes committed leaders for now.
183    // TODO: Include skipped leaders as well
184    pub(crate) leaders: HashSet<BlockRef>,
185    // A map of votes to the stake of strongly linked blocks that include that vote
186    // Note: Including stake aggregator so that we can quickly check if it exceeds
187    // quourum threshold and only include those scores for certain scoring strategies.
188    pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
189}
190
191impl ScoringSubdag {
192    pub(crate) fn new(context: Arc<Context>) -> Self {
193        Self {
194            context,
195            commit_range: None,
196            leaders: HashSet::new(),
197            votes: BTreeMap::new(),
198        }
199    }
200
201    #[instrument(level = "trace", skip_all)]
202    pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
203        let _s = self
204            .context
205            .metrics
206            .node_metrics
207            .scope_processing_time
208            .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
209            .start_timer();
210        for subdag in committed_subdags {
211            // If the commit range is not set, then set it to the range of the first
212            // committed subdag index.
213            if self.commit_range.is_none() {
214                self.commit_range = Some(CommitRange::new(
215                    subdag.commit_ref.index..=subdag.commit_ref.index,
216                ));
217            } else {
218                let commit_range = self.commit_range.as_mut().unwrap();
219                commit_range.extend_to(subdag.commit_ref.index);
220            }
221            // Add the committed leader to the list of leaders we will be scoring.
222            tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
223            self.leaders.insert(subdag.leader);
224            // Check each block in subdag. Blocks are in order so we should traverse the
225            // oldest blocks first
226            for block in subdag.blocks {
227                for ancestor in block.ancestors() {
228                    // Weak links may point to blocks with lower round numbers
229                    // than strong links.
230                    if ancestor.round != block.round().saturating_sub(1) {
231                        continue;
232                    }
233                    // If a blocks strong linked ancestor is in leaders, then
234                    // it's a vote for leader.
235                    if self.leaders.contains(ancestor) {
236                        // There should never be duplicate references to blocks
237                        // with strong linked ancestors to leader.
238                        tracing::trace!(
239                            "Found a vote {} for leader {ancestor} from authority {}",
240                            block.reference(),
241                            block.author()
242                        );
243                        assert!(
244                            self.votes
245                                .insert(block.reference(), StakeAggregator::new())
246                                .is_none(),
247                            "Vote {block} already exists. Duplicate vote found for leader {ancestor}"
248                        );
249                    }
250                    if let Some(stake) = self.votes.get_mut(ancestor) {
251                        // Vote is strongly linked to a future block, so we
252                        // consider this a distributed vote.
253                        tracing::trace!(
254                            "Found a distributed vote {ancestor} from authority {}",
255                            ancestor.author
256                        );
257                        stake.add(block.author(), &self.context.committee);
258                    }
259                }
260            }
261        }
262    }
263
264    // Iterate through votes and calculate scores for each authority based on
265    // distributed vote scoring strategy.
266    pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
267        let scores_per_authority = self.distributed_votes_scores();
268
269        // TODO: Normalize scores
270        ReputationScores::new(
271            self.commit_range
272                .clone()
273                .expect("CommitRange should be set if calculate_scores is called."),
274            scores_per_authority,
275        )
276    }
277
278    /// This scoring strategy aims to give scores based on overall vote
279    /// distribution. Instead of only giving one point for each vote that is
280    /// included in 2f+1 blocks. We give a score equal to the amount of
281    /// stake of all blocks that included the vote.
282    fn distributed_votes_scores(&self) -> Vec<u64> {
283        let _s = self
284            .context
285            .metrics
286            .node_metrics
287            .scope_processing_time
288            .with_label_values(&["ScoringSubdag::score_distributed_votes"])
289            .start_timer();
290
291        let num_authorities = self.context.committee.size();
292        let mut scores_per_authority = vec![0_u64; num_authorities];
293
294        for (vote, stake_agg) in self.votes.iter() {
295            let authority = vote.author;
296            let stake = stake_agg.stake();
297            tracing::trace!(
298                "[{}] scores +{stake} reputation for {authority}!",
299                self.context.own_index,
300            );
301            scores_per_authority[authority.value()] += stake;
302        }
303        scores_per_authority
304    }
305
306    pub(crate) fn scored_subdags_count(&self) -> usize {
307        if let Some(commit_range) = &self.commit_range {
308            commit_range.size()
309        } else {
310            0
311        }
312    }
313
314    pub(crate) fn is_empty(&self) -> bool {
315        self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
316    }
317
318    pub(crate) fn clear(&mut self) {
319        self.leaders.clear();
320        self.votes.clear();
321        self.commit_range = None;
322    }
323}
324
325/// UnscoredSubdag represents a collection of subdags across multiple commits.
326/// These subdags are considered unscored for the purposes of leader schedule
327/// change. On a leader schedule change, reputation scores will be calculated
328/// based on the dags collected in this struct. Similar graph traversal methods
329/// that are provided in DagState are also added here to help calculate the
330/// scores.
331pub(crate) struct UnscoredSubdag {
332    pub(crate) context: Arc<Context>,
333    pub(crate) commit_range: CommitRange,
334    pub(crate) committed_leaders: Vec<BlockRef>,
335    // When the blocks are collected form the list of provided subdags we ensure
336    // that the CommittedSubDag instances are contiguous in commit index order.
337    // Therefore we can guarantee the blocks of UnscoredSubdag are also sorted
338    // via the commit index.
339    pub(crate) blocks: BTreeMap<BlockRef, VerifiedBlock>,
340}
341
342impl UnscoredSubdag {
343    pub(crate) fn new(context: Arc<Context>, subdags: &[CommittedSubDag]) -> Self {
344        let mut committed_leaders = vec![];
345        let blocks = subdags
346            .iter()
347            .enumerate()
348            .flat_map(|(subdag_index, subdag)| {
349                committed_leaders.push(subdag.leader);
350                if subdag_index == 0 {
351                    subdag.blocks.iter()
352                } else {
353                    let previous_subdag = &subdags[subdag_index - 1];
354                    let expected_next_subdag_index = previous_subdag.commit_ref.index + 1;
355                    assert_eq!(
356                        subdag.commit_ref.index, expected_next_subdag_index,
357                        "Non-contiguous commit index (expected: {}, found: {})",
358                        expected_next_subdag_index, subdag.commit_ref.index
359                    );
360                    subdag.blocks.iter()
361                }
362            })
363            .map(|block| (block.reference(), block.clone()))
364            .collect::<BTreeMap<_, _>>();
365
366        // Guaranteed to have a contiguous list of commit indices
367        let commit_range = CommitRange::new(
368            subdags.first().unwrap().commit_ref.index..=subdags.last().unwrap().commit_ref.index,
369        );
370
371        assert!(
372            !blocks.is_empty(),
373            "Attempted to create UnscoredSubdag with no blocks"
374        );
375
376        Self {
377            context,
378            commit_range,
379            committed_leaders,
380            blocks,
381        }
382    }
383
384    pub(crate) fn find_supported_leader_block(
385        &self,
386        leader_slot: Slot,
387        from: &VerifiedBlock,
388    ) -> Option<BlockRef> {
389        if from.round() < leader_slot.round {
390            return None;
391        }
392        for ancestor in from.ancestors() {
393            if Slot::from(*ancestor) == leader_slot {
394                return Some(*ancestor);
395            }
396            // Weak links may point to blocks with lower round numbers than strong links.
397            if ancestor.round <= leader_slot.round {
398                continue;
399            }
400            if let Some(ancestor) = self.get_block(ancestor) {
401                if let Some(support) = self.find_supported_leader_block(leader_slot, &ancestor) {
402                    return Some(support);
403                }
404            } else {
405                // TODO: Add unit test for this case once dagbuilder is ready.
406                tracing::trace!(
407                    "Potential vote's ancestor block not found in unscored committed subdags: {:?}",
408                    ancestor
409                );
410                return None;
411            }
412        }
413        None
414    }
415
416    pub(crate) fn is_vote(
417        &self,
418        potential_vote: &VerifiedBlock,
419        leader_block: &VerifiedBlock,
420    ) -> bool {
421        let reference = leader_block.reference();
422        let leader_slot = Slot::from(reference);
423        self.find_supported_leader_block(leader_slot, potential_vote) == Some(reference)
424    }
425
426    pub(crate) fn get_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
427        let mut blocks = vec![];
428        for (_block_ref, block) in self.blocks.range((
429            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
430            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
431        )) {
432            blocks.push(block.clone())
433        }
434        blocks
435    }
436
437    pub(crate) fn get_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
438        let mut blocks = vec![];
439        for (_block_ref, block) in self.blocks.range((
440            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
441            Excluded(BlockRef::new(
442                round + 1,
443                AuthorityIndex::ZERO,
444                BlockDigest::MIN,
445            )),
446        )) {
447            blocks.push(block.clone())
448        }
449        blocks
450    }
451
452    pub(crate) fn get_block(&self, block_ref: &BlockRef) -> Option<VerifiedBlock> {
453        self.blocks.get(block_ref).cloned()
454    }
455}
456
#[cfg(test)]
mod tests {

    use super::*;
    use crate::{CommitDigest, CommitRef, test_dag_builder::DagBuilder};

    /// Authorities 1, 2 and 3 of the 4-member test committee, i.e. everyone except
    /// authority 0.
    fn authorities_other_than_zero() -> Vec<AuthorityIndex> {
        (1..=3).map(AuthorityIndex::new_for_test).collect()
    }

    /// Builds fully connected layers for rounds 1..=3 (authorities 0..=3), then a round 4
    /// that keeps only authority 0's (the leader's) block.
    fn build_dag_with_only_leader_at_round_4(context: &Arc<Context>) -> DagBuilder {
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        dag_builder
            .layer(4)
            .authorities(authorities_other_than_zero())
            .skip_block()
            .build();
        dag_builder
    }

    /// Returns the committed subdags for rounds 1..=4, dropping their commits.
    fn subdags_for_rounds_1_to_4(dag_builder: &mut DagBuilder) -> Vec<CommittedSubDag> {
        dag_builder
            .get_sub_dag_and_commits(1..=4)
            .into_iter()
            .map(|(sub_dag, _commit)| sub_dag)
            .collect()
    }

    #[tokio::test]
    async fn test_reputation_scores_authorities_by_score() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);

        // Pairs come back in authority-index order, not sorted by score.
        let expected: Vec<(AuthorityIndex, u64)> = vec![
            (AuthorityIndex::new_for_test(0), 4),
            (AuthorityIndex::new_for_test(1), 1),
            (AuthorityIndex::new_for_test(2), 1),
            (AuthorityIndex::new_for_test(3), 3),
        ];
        assert_eq!(scores.authorities_by_score(context), expected);
    }

    #[tokio::test]
    async fn test_reputation_scores_update_metrics() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        scores.update_metrics(context.clone());

        let gauges = &context.metrics.node_metrics.reputation_scores;
        for (hostname, expected_score) in [
            ("test_host_0", 1),
            ("test_host_1", 2),
            ("test_host_2", 4),
            ("test_host_3", 3),
        ] {
            let gauge = gauges.get_metric_with_label_values(&[hostname]).unwrap();
            assert_eq!(gauge.get(), expected_score, "score gauge for {hostname}");
        }
    }

    #[tokio::test]
    async fn test_scoring_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let mut dag_builder = build_dag_with_only_leader_at_round_4(&context);

        // Feed the committed subdags one at a time.
        let mut scoring_subdag = ScoringSubdag::new(context.clone());
        for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
            scoring_subdag.add_subdags(vec![sub_dag]);
        }

        let scores = scoring_subdag.calculate_distributed_vote_scores();
        assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }

    // TODO: Remove all tests below this when DistributedVoteScoring is enabled.
    #[tokio::test]
    async fn test_reputation_score_calculator() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let mut dag_builder = build_dag_with_only_leader_at_round_4(&context);
        let unscored_subdags = subdags_for_rounds_1_to_4(&mut dag_builder);

        let scores = ReputationScoreCalculator::new(context.clone(), &unscored_subdags).calculate();
        assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }

    #[tokio::test]
    #[should_panic(expected = "Attempted to calculate scores with no unscored subdags")]
    async fn test_reputation_score_calculator_no_subdags() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let unscored_subdags: Vec<CommittedSubDag> = Vec::new();
        ReputationScoreCalculator::new(context, &unscored_subdags).calculate();
    }

    #[tokio::test]
    #[should_panic(expected = "Attempted to create UnscoredSubdag with no blocks")]
    async fn test_reputation_score_calculator_no_subdag_blocks() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        // A single committed subdag that carries no blocks at all.
        let unscored_subdags = vec![CommittedSubDag::new(
            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
            Vec::new(),
            context.clock.timestamp_utc_ms(),
            CommitRef::new(1, CommitDigest::MIN),
            vec![],
        )];
        ReputationScoreCalculator::new(context.clone(), &unscored_subdags).calculate();
    }

    #[tokio::test]
    async fn test_scoring_with_missing_block_in_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let mut dag_builder = DagBuilder::new(context.clone());
        // Round 1 lacks the leader block (authority 0), simulating that it was committed
        // as part of another committed subdag.
        dag_builder
            .layer(1)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        // Fully connected rounds 2..=3.
        dag_builder.layers(2..=3).build();
        // Round 4 keeps only the leader block.
        dag_builder
            .layer(4)
            .authorities(authorities_other_than_zero())
            .skip_block()
            .build();

        let unscored_subdags = subdags_for_rounds_1_to_4(&mut dag_builder);
        let scores = ReputationScoreCalculator::new(context.clone(), &unscored_subdags).calculate();
        assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }
}