consensus_core/leader_schedule.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::BTreeMap,
7    fmt::{Debug, Formatter},
8    sync::Arc,
9};
10
11use consensus_config::{AuthorityIndex, Stake};
12use parking_lot::RwLock;
13use rand::{SeedableRng, prelude::SliceRandom, rngs::StdRng};
14
15use crate::{
16    CommitIndex, Round,
17    commit::CommitRange,
18    context::Context,
19    dag_state::DagState,
20    leader_scoring::{ReputationScoreCalculator, ReputationScores},
21};
22
23/// The `LeaderSchedule` is responsible for producing the leader schedule across
24/// an epoch. The leader schedule is subject to change periodically based on
25/// calculated `ReputationScores` of the authorities.
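///
/// # Example
///
/// A minimal usage sketch, mirroring `test_elect_leader` below (the
/// `new_for_test` constructors are test-only helpers):
///
/// ```ignore
/// let context = Arc::new(Context::new_for_test(4).0);
/// let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
/// // With the default (empty) swap table, round 0 / offset 0 elects authority 0.
/// assert_eq!(
///     leader_schedule.elect_leader(0, 0),
///     AuthorityIndex::new_for_test(0)
/// );
/// ```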
26#[derive(Clone)]
27pub(crate) struct LeaderSchedule {
28    pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
29    context: Arc<Context>,
30    num_commits_per_schedule: u64,
31}
32
33impl LeaderSchedule {
34    /// The window at which the schedule change takes place in consensus. It
35    /// represents the number of committed sub dags.
36    /// TODO: move this to protocol config
37    #[cfg(not(msim))]
38    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
39    #[cfg(msim)]
40    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;
41
42    pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
43        Self {
44            context,
45            num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
46            leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
47        }
48    }
49
50    #[cfg(test)]
51    pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
52        self.num_commits_per_schedule = num_commits_per_schedule;
53        self
54    }
55
56    /// Restores the `LeaderSchedule` from storage. It will attempt to retrieve
57    /// the last stored `ReputationScores` and use them to build a
58    /// `LeaderSwapTable`.
59    pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60        let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
61            LeaderSwapTable::default(),
62            |(last_commit_ref, last_commit_info)| {
63                LeaderSwapTable::new(
64                    context.clone(),
65                    last_commit_ref.index,
66                    last_commit_info.reputation_scores,
67                )
68            },
69        );
70
71        if context
72            .protocol_config
73            .consensus_distributed_vote_scoring_strategy()
74        {
75            tracing::info!(
76                "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
77                dag_state.read().scoring_subdags_count(),
78            );
79        } else {
80            // TODO: Remove when DistributedVoteScoring is enabled.
81            tracing::info!(
82                "LeaderSchedule recovered using {leader_swap_table:?}. There are {} pending unscored subdags in DagState.",
83                dag_state.read().unscored_committed_subdags_count(),
84            );
85        }
86
87        // create the schedule
88        Self::new(context, leader_swap_table)
89    }
90
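    /// Returns the number of commits remaining until the next leader schedule
    /// update. For example (as exercised in the tests below), with the default
    /// of 300 commits per schedule and a single committed sub dag buffered in
    /// `DagState`, this returns 300 - 1 = 299.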
91    pub(crate) fn commits_until_leader_schedule_update(
92        &self,
93        dag_state: Arc<RwLock<DagState>>,
94    ) -> usize {
95        let subdag_count = if self
96            .context
97            .protocol_config
98            .consensus_distributed_vote_scoring_strategy()
99        {
100            dag_state.read().scoring_subdags_count() as u64
101        } else {
102            // TODO: Remove when DistributedVoteScoring is enabled.
103            dag_state.read().unscored_committed_subdags_count()
104        };
105        assert!(
106            subdag_count <= self.num_commits_per_schedule,
107            "Committed subdags count exceeds the number of commits per schedule"
108        );
109        self.num_commits_per_schedule
110            .checked_sub(subdag_count)
111            .unwrap() as usize
112    }
113
114    /// Checks whether the dag state's list of sub dags is empty. If it is,
115    /// then either (1) the system has just started and there is no unscored
116    /// sub dag available yet, or (2) the schedule has just been updated and
117    /// new scores have been calculated. Both cases are treated as the
118    /// schedule having been updated.
119    pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock<DagState>) -> bool {
120        if self
121            .context
122            .protocol_config
123            .consensus_distributed_vote_scoring_strategy()
124        {
125            dag_state.read().is_scoring_subdag_empty()
126        } else {
127            // TODO: Remove when DistributedVoteScoring is enabled.
128            dag_state.read().unscored_committed_subdags_count() == 0
129        }
130    }
131
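    /// Recalculates the reputation scores from the scoring sub dags buffered in
    /// `DagState`, clears that buffer, records the commit info to be persisted
    /// later, and installs a new `LeaderSwapTable` built from those scores,
    /// updating the related metrics.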
132    pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
133        let _s = self
134            .context
135            .metrics
136            .node_metrics
137            .scope_processing_time
138            .with_label_values(&["LeaderSchedule::update_leader_schedule"])
139            .start_timer();
140        let (reputation_scores, last_commit_index) = {
141            let dag_state = dag_state.read();
142            let reputation_scores = dag_state.calculate_scoring_subdag_scores();
143            let last_commit_index = dag_state.scoring_subdag_commit_range();
144            (reputation_scores, last_commit_index)
145        };
146        {
147            let mut dag_state = dag_state.write();
148            // Clear scoring subdag as we have updated the leader schedule
149            dag_state.clear_scoring_subdag();
150            // Buffer score and last commit rounds in dag state to be persisted later
151            dag_state.add_commit_info(reputation_scores.clone());
152        }
153        self.update_leader_swap_table(LeaderSwapTable::new(
154            self.context.clone(),
155            last_commit_index,
156            reputation_scores.clone(),
157        ));
158        reputation_scores.update_metrics(self.context.clone());
159        self.context
160            .metrics
161            .node_metrics
162            .num_of_bad_nodes
163            .set(self.leader_swap_table.read().bad_nodes.len() as i64);
164    }
165
166    // TODO: Remove when DistributedVoteScoring is enabled.
167    pub(crate) fn update_leader_schedule_v1(&self, dag_state: &RwLock<DagState>) {
168        let _s = self
169            .context
170            .metrics
171            .node_metrics
172            .scope_processing_time
173            .with_label_values(&["LeaderSchedule::update_leader_schedule"])
174            .start_timer();
175
176        let mut dag_state = dag_state.write();
177        let unscored_subdags = dag_state.take_unscored_committed_subdags();
178
179        let score_calculation_timer = self
180            .context
181            .metrics
182            .node_metrics
183            .scope_processing_time
184            .with_label_values(&["ReputationScoreCalculator::calculate"])
185            .start_timer();
186        let reputation_scores =
187            ReputationScoreCalculator::new(self.context.clone(), &unscored_subdags).calculate();
188        drop(score_calculation_timer);
189
190        reputation_scores.update_metrics(self.context.clone());
191
192        let last_commit_index = unscored_subdags.last().unwrap().commit_ref.index;
193        self.update_leader_swap_table(LeaderSwapTable::new(
194            self.context.clone(),
195            last_commit_index,
196            reputation_scores.clone(),
197        ));
198
199        self.context
200            .metrics
201            .node_metrics
202            .num_of_bad_nodes
203            .set(self.leader_swap_table.read().bad_nodes.len() as i64);
204
205        // Buffer score and last commit rounds in dag state to be persisted later
206        dag_state.add_commit_info(reputation_scores);
207    }
208
209    pub(crate) fn elect_leader(&self, round: u32, leader_offset: u32) -> AuthorityIndex {
210        cfg_if::cfg_if! {
211            // TODO: we need to differentiate the leader strategy in tests, so for
212            // some types of testing (e.g. sim tests) we can use the stake-based approach.
213            if #[cfg(test)] {
214                let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
215                let table = self.leader_swap_table.read();
216                table.swap(leader, round, leader_offset).unwrap_or(leader)
217            } else {
218                let leader = self.elect_leader_stake_based(round, leader_offset);
219                let table = self.leader_swap_table.read();
220                table.swap(leader, round, leader_offset).unwrap_or(leader)
221            }
222        }
223    }
224
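    /// Elects a leader for the given round and offset using a stake-weighted
    /// shuffle seeded by the round number, so the result is deterministic for a
    /// given (round, offset) pair. An illustrative sketch, mirroring the unit
    /// tests below (test-only constructors):
    ///
    /// ```ignore
    /// let context = Arc::new(Context::new_for_test(4).0);
    /// let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
    /// // Different offsets within the same round yield different leaders.
    /// assert_ne!(
    ///     leader_schedule.elect_leader_stake_based(1, 1),
    ///     leader_schedule.elect_leader_stake_based(1, 2)
    /// );
    /// ```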
225    pub(crate) fn elect_leader_stake_based(&self, round: u32, offset: u32) -> AuthorityIndex {
226        assert!((offset as usize) < self.context.committee.size());
227
228        // To ensure that we elect different leaders for the same round (using
229        // different offsets), we use the round number as the seed for a weighted
230        // shuffle of the authorities, and then skip based on the offset.
231        // TODO: use a cache in case this proves to be computationally expensive
232        let mut seed_bytes = [0u8; 32];
233        seed_bytes[32 - 4..].copy_from_slice(&(round).to_le_bytes());
234        let mut rng = StdRng::from_seed(seed_bytes);
235
236        let choices = self
237            .context
238            .committee
239            .authorities()
240            .map(|(index, authority)| (index, authority.stake as f32))
241            .collect::<Vec<_>>();
242
243        let leader_index = *choices
244            .choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
245            .expect("Weighted choice error: stake values incorrect!")
246            .skip(offset as usize)
247            .map(|(index, _)| index)
248            .next()
249            .unwrap();
250
251        leader_index
252    }
253
254    /// Atomically updates the `LeaderSwapTable` with the newly provided one. Any
255    /// leader queried from now on will be calculated according to this swap
256    /// table until a new one is provided again.
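    ///
    /// Successive swap tables must cover consecutive commit ranges of equal
    /// length (see the assertion below). For example, as exercised in the unit
    /// tests, a table built from scores over commits `1..=10` may be followed
    /// by one over `11..=20`, while following that with `21..=25` panics
    /// because the ranges are not of equal length.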
257    fn update_leader_swap_table(&self, table: LeaderSwapTable) {
258        let read = self.leader_swap_table.read();
259        let old_commit_range = &read.reputation_scores.commit_range;
260        let new_commit_range = &table.reputation_scores.commit_range;
261
262        // Unless the LeaderSchedule is brand new and using the default commit range
263        // of CommitRange(0..0), all future LeaderSwapTables should be calculated
264        // from a CommitRange of equal length that immediately follows the commit
265        // range of the old swap table.
266        if *old_commit_range != CommitRange::default() {
267            assert!(
268                old_commit_range.is_next_range(new_commit_range)
269                    && old_commit_range.is_equal_size(new_commit_range),
270                "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
271            );
272        }
273        drop(read);
274
275        tracing::trace!("Updating {table:?}");
276
277        let mut write = self.leader_swap_table.write();
278        *write = table;
279    }
280}
281
282#[derive(Default, Clone)]
283pub(crate) struct LeaderSwapTable {
284    /// The list of `f` (by configurable stake) authorities with the best scores,
285    /// as defined by the provided `ReputationScores`. Those authorities will
286    /// be used in place of the `bad_nodes` in the final leader schedule.
287    /// The hostname & stake are stored alongside the authority index for
288    /// debugging.
289    pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,
290
291    /// The set of `f` (by configurable stake) authorities with the worst scores,
292    /// as defined by the provided `ReputationScores`. Every time such an
293    /// authority is elected as leader in the schedule, it will be swapped
294    /// with one of the authorities from `good_nodes`.
295    /// The hostname & stake are stored alongside the authority index for
296    /// debugging.
297    pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,
298
299    /// Scores by authority in descending order, needed by other parts of the
300    /// system for a consistent view of how each validator performs in
301    /// consensus.
302    pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
303
304    // The scores from which the leader swap table was built. This field is
305    // kept for debugging purposes; once `good_nodes` & `bad_nodes` are identified,
306    // the `reputation_scores` are no longer functionally needed for the swap table.
307    pub(crate) reputation_scores: ReputationScores,
308}
309
310impl LeaderSwapTable {
311    // Constructs a new table based on the provided reputation scores. The
312    // `swap_stake_threshold` designates the total stake of the nodes that will be
313    // considered "bad" based on their scores and replaced by good nodes.
314    // The `swap_stake_threshold` should be in the range [0 - 33].
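    // Illustrative sketch (mirroring `test_leader_swap_table` below): with four
    // equally staked authorities and scores [0, 1, 2, 3], a `swap_stake_threshold`
    // of 33 marks the lowest scorer (authority 0) as a bad node and the highest
    // scorer (authority 3) as a good node.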
315    pub(crate) fn new(
316        context: Arc<Context>,
317        commit_index: CommitIndex,
318        reputation_scores: ReputationScores,
319    ) -> Self {
320        let swap_stake_threshold = context
321            .protocol_config
322            .consensus_bad_nodes_stake_threshold();
323        Self::new_inner(
324            context,
325            swap_stake_threshold,
326            commit_index,
327            reputation_scores,
328        )
329    }
330
331    fn new_inner(
332        context: Arc<Context>,
333        // Ignore linter warning in simtests.
334        // TODO: maybe override protocol configs in tests for swap_stake_threshold, and call new().
335        #[cfg_attr(msim, expect(unused_variables))] swap_stake_threshold: u64,
336        commit_index: CommitIndex,
337        reputation_scores: ReputationScores,
338    ) -> Self {
339        #[cfg(msim)]
340        let swap_stake_threshold = 33;
341
342        assert!(
343            (0..=33).contains(&swap_stake_threshold),
344            "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
345        );
346
347        // When reputation scores are disabled or at genesis, use the default value.
348        if reputation_scores.scores_per_authority.is_empty() {
349            return Self::default();
350        }
351
352        // Randomize order of authorities when they have the same score,
353        // to avoid bias in the selection of the good and bad nodes.
354        let mut seed_bytes = [0u8; 32];
355        seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
356        let mut rng = StdRng::from_seed(seed_bytes);
357        let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
358        assert_eq!(authorities_by_score.len(), context.committee.size());
359        authorities_by_score.shuffle(&mut rng);
360        // Stable sort the authorities by score descending. Order of authorities with
361        // the same score is preserved.
362        authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));
363
364        // Calculating the good nodes
365        let good_nodes = Self::retrieve_first_nodes(
366            context.clone(),
367            authorities_by_score.iter(),
368            swap_stake_threshold,
369        )
370        .into_iter()
371        .collect::<Vec<(AuthorityIndex, String, Stake)>>();
372
373        // Calculating the bad nodes
374        // Reverse the sorted authorities so scores are ascending, and take the
375        // lowest scorers up to the provided stake threshold.
376        let bad_nodes = Self::retrieve_first_nodes(
377            context.clone(),
378            authorities_by_score.iter().rev(),
379            swap_stake_threshold,
380        )
381        .into_iter()
382        .map(|(idx, hostname, stake)| (idx, (hostname, stake)))
383        .collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();
384
385        good_nodes.iter().for_each(|(idx, hostname, stake)| {
386            tracing::debug!(
387                "Good node {hostname} with stake {stake} has score {} for {:?}",
388                reputation_scores.scores_per_authority[idx.to_owned()],
389                reputation_scores.commit_range,
390            );
391        });
392
393        bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
394            tracing::debug!(
395                "Bad node {hostname} with stake {stake} has score {} for {:?}",
396                reputation_scores.scores_per_authority[idx.to_owned()],
397                reputation_scores.commit_range,
398            );
399        });
400
401        tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");
402
403        Self {
404            good_nodes,
405            bad_nodes,
406            reputation_scores_desc: authorities_by_score,
407            reputation_scores,
408        }
409    }
410
411    /// Checks whether the provided leader is a bad performer and needs to be
412    /// swapped in the schedule with a good performer. If not, the method
413    /// returns None; otherwise the leader to swap with is returned. The
414    /// `leader_round` & `leader_offset` represent the DAG slot for which the
415    /// provided `AuthorityIndex` is the leader, and are used as a seed for the
416    /// random function that selects the good node to swap in for that round's
417    /// bad node. We intentionally do not use weighted randomness, as we want
418    /// to give all the good nodes an equal opportunity to be swapped in for
419    /// bad nodes, rather than have one node with enough stake end up
420    /// replacing bad nodes more frequently than the others in the final
421    /// schedule.
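    ///
    /// An illustrative sketch, based on `test_leader_swap_table_swap` below
    /// (four equally staked authorities with scores [0, 1, 2, 3] and a
    /// swap_stake_threshold of 33):
    ///
    /// ```ignore
    /// // Authority 0 is a bad node, so it is swapped with the good node (authority 3).
    /// assert_eq!(
    ///     leader_swap_table.swap(AuthorityIndex::new_for_test(0), 1, 0),
    ///     Some(AuthorityIndex::new_for_test(3))
    /// );
    /// // Authority 1 is not a bad node, so no swap takes place.
    /// assert_eq!(
    ///     leader_swap_table.swap(AuthorityIndex::new_for_test(1), 1, 0),
    ///     None
    /// );
    /// ```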
422    pub(crate) fn swap(
423        &self,
424        leader: AuthorityIndex,
425        leader_round: Round,
426        leader_offset: u32,
427    ) -> Option<AuthorityIndex> {
428        if self.bad_nodes.contains_key(&leader) {
429            // TODO: Re-work swap for the multileader case
430            assert!(
431                leader_offset == 0,
432                "Swap for multi-leader case not implemented yet."
433            );
434            let mut seed_bytes = [0u8; 32];
435            seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
436            seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
437            let mut rng = StdRng::from_seed(seed_bytes);
438
439            let (idx, _hostname, _stake) = self
440                .good_nodes
441                .choose(&mut rng)
442                .expect("There should be at least one good node available");
443
444            tracing::trace!(
445                "Swapping bad leader {} -> {} for round {}",
446                leader,
447                idx,
448                leader_round
449            );
450
451            return Some(*idx);
452        }
453        None
454    }
455
456    /// Retrieves the first nodes provided by the iterator `authorities` until
457    /// the `stake_threshold` has been reached. The `stake_threshold` should
458    /// be in the range [0, 100] and expresses the percentage of stake that is
459    /// considered the cutoff. It is the caller's responsibility to ensure
460    /// that the elements of the `authorities` input are already sorted.
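    ///
    /// For example (as exercised in the unit tests below), with a 4-authority
    /// test committee of equal stake and a `stake_threshold` of 50, only the
    /// first two authorities from the iterator are returned, since including a
    /// third would push the accumulated stake past 50% of the total.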
461    fn retrieve_first_nodes<'a>(
462        context: Arc<Context>,
463        authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
464        stake_threshold: u64,
465    ) -> Vec<(AuthorityIndex, String, Stake)> {
466        let mut filtered_authorities = Vec::new();
467
468        let mut stake = 0;
469        for &(authority_idx, _score) in authorities {
470            stake += context.committee.stake(authority_idx);
471
472            // If the total accumulated stake has surpassed the stake threshold,
473            // then we omit this last authority and exit the loop. Note that this
474            // means that if the threshold is too low, no nodes may be returned
475            // at all.
476            if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
477                break;
478            }
479
480            let authority = context.committee.authority(authority_idx);
481            filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
482        }
483
484        filtered_authorities
485    }
486}
487
488impl Debug for LeaderSwapTable {
489    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
490        f.write_str(&format!(
491            "LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
492            self.reputation_scores.commit_range,
493            self.good_nodes
494                .iter()
495                .map(|(idx, _hostname, _stake)| idx.to_owned())
496                .collect::<Vec<AuthorityIndex>>(),
497            self.good_nodes
498                .iter()
499                .map(|(_idx, _hostname, stake)| stake)
500                .sum::<Stake>(),
501            self.bad_nodes.keys().map(|idx| idx.to_owned()).collect::<Vec<AuthorityIndex>>(),
502            self.bad_nodes
503                .values()
504                .map(|(_hostname, stake)| stake)
505                .sum::<Stake>(),
506        ))
507    }
508}
509
510#[cfg(test)]
511mod tests {
512
513    use super::*;
514    use crate::{
515        block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
516        commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
517        storage::{Store, WriteBatch, mem_store::MemStore},
518        test_dag_builder::DagBuilder,
519    };
520
521    #[tokio::test]
522    async fn test_elect_leader() {
523        let context = Arc::new(Context::new_for_test(4).0);
524        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
525
526        assert_eq!(
527            leader_schedule.elect_leader(0, 0),
528            AuthorityIndex::new_for_test(0)
529        );
530        assert_eq!(
531            leader_schedule.elect_leader(1, 0),
532            AuthorityIndex::new_for_test(1)
533        );
534        assert_eq!(
535            leader_schedule.elect_leader(5, 0),
536            AuthorityIndex::new_for_test(1)
537        );
538        // ensure we elect different leaders for the same round for the multi-leader
539        // case
540        assert_ne!(
541            leader_schedule.elect_leader_stake_based(1, 1),
542            leader_schedule.elect_leader_stake_based(1, 2)
543        );
544    }
545
546    #[tokio::test]
547    async fn test_elect_leader_stake_based() {
548        let context = Arc::new(Context::new_for_test(4).0);
549        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
550
551        assert_eq!(
552            leader_schedule.elect_leader_stake_based(0, 0),
553            AuthorityIndex::new_for_test(1)
554        );
555        assert_eq!(
556            leader_schedule.elect_leader_stake_based(1, 0),
557            AuthorityIndex::new_for_test(1)
558        );
559        assert_eq!(
560            leader_schedule.elect_leader_stake_based(5, 0),
561            AuthorityIndex::new_for_test(3)
562        );
563        // ensure we elect different leaders for the same round for the multi-leader
564        // case
565        assert_ne!(
566            leader_schedule.elect_leader_stake_based(1, 1),
567            leader_schedule.elect_leader_stake_based(1, 2)
568        );
569    }
570
571    #[tokio::test]
572    async fn test_leader_schedule_from_store() {
573        telemetry_subscribers::init_for_testing();
574        let mut context = Context::new_for_test(4).0;
575        context
576            .protocol_config
577            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
578        let context = Arc::new(context);
579        let store = Arc::new(MemStore::new());
580
581        // Populate fully connected test blocks for round 0 ~ 11, authorities 0 ~ 3.
582        let mut dag_builder = DagBuilder::new(context.clone());
583        dag_builder.layers(1..=11).build();
584        let mut subdags = vec![];
585        let mut expected_commits = vec![];
586
587        let mut blocks_to_write = vec![];
588
589        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
590            for block in sub_dag.blocks.iter() {
591                blocks_to_write.push(block.clone());
592            }
593
594            expected_commits.push(commit);
595            subdags.push(sub_dag);
596        }
597
598        // The CommitInfo for the first 10 commits is written to the store. This is
599        // the info from which the LeaderSchedule will be recovered.
600        let commit_range = (1..=10).into();
601        let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
602        let committed_rounds = vec![9, 9, 10, 9];
603        let commit_ref = expected_commits[9].reference();
604        let commit_info = CommitInfo {
605            reputation_scores,
606            committed_rounds,
607        };
608
609        // CommitIndex '11' will be written to the store. This should result in the
610        // cached last_committed_rounds & unscored subdags in DagState being updated
611        // with the latest commit information on recovery.
612        store
613            .write(
614                WriteBatch::default()
615                    .commit_info(vec![(commit_ref, commit_info)])
616                    .blocks(blocks_to_write)
617                    .commits(expected_commits),
618            )
619            .unwrap();
620
621        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
622
623        // Check that DagState recovery from stored CommitInfo worked correctly
624        assert_eq!(
625            dag_builder.last_committed_rounds.clone(),
626            dag_state.read().last_committed_rounds()
627        );
628        assert_eq!(1, dag_state.read().scoring_subdags_count());
629        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
630        let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
631        assert_eq!(recovered_scores, expected_scores);
632
633        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
634
635        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
636        let leader_swap_table = leader_schedule.leader_swap_table.read();
637        assert_eq!(leader_swap_table.good_nodes.len(), 1);
638        assert_eq!(
639            leader_swap_table.good_nodes[0].0,
640            AuthorityIndex::new_for_test(0)
641        );
642        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
643        assert!(
644            leader_swap_table
645                .bad_nodes
646                .contains_key(&AuthorityIndex::new_for_test(2)),
647            "{:?}",
648            leader_swap_table.bad_nodes
649        );
650    }
651
652    #[tokio::test]
653    async fn test_leader_schedule_from_store_no_commits() {
654        telemetry_subscribers::init_for_testing();
655        let mut context = Context::new_for_test(4).0;
656        context
657            .protocol_config
658            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
659        let context = Arc::new(context);
660        let store = Arc::new(MemStore::new());
661
662        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
663
664        let expected_last_committed_rounds = vec![0, 0, 0, 0];
665
666        // Check that DagState recovery from stored CommitInfo worked correctly
667        assert_eq!(
668            expected_last_committed_rounds,
669            dag_state.read().last_committed_rounds()
670        );
671        assert_eq!(0, dag_state.read().scoring_subdags_count());
672
673        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
674
675        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
676        let leader_swap_table = leader_schedule.leader_swap_table.read();
677        assert_eq!(leader_swap_table.good_nodes.len(), 0);
678        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
679    }
680
681    #[tokio::test]
682    async fn test_leader_schedule_from_store_no_commit_info() {
683        telemetry_subscribers::init_for_testing();
684        let mut context = Context::new_for_test(4).0;
685        context
686            .protocol_config
687            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
688        let context = Arc::new(context);
689        let store = Arc::new(MemStore::new());
690
691        // Populate fully connected test blocks for round 0 ~ 2, authorities 0 ~ 3.
692        let mut dag_builder = DagBuilder::new(context.clone());
693        dag_builder.layers(1..=2).build();
694
695        let mut expected_scored_subdags = vec![];
696        let mut expected_commits = vec![];
697
698        let mut blocks_to_write = vec![];
699
700        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
701            for block in sub_dag.blocks.iter() {
702                blocks_to_write.push(block.clone());
703            }
704            expected_commits.push(commit);
705            expected_scored_subdags.push(sub_dag);
706        }
707
708        // Only the first 2 commits are written to the store. 10 commits would
709        // have been required for a leader schedule update, so at this point no
710        // commit info should have been persisted and no leader schedule should
711        // be recovered. However, the dag state should have properly recovered the
712        // unscored subdags & last committed rounds.
713        store
714            .write(
715                WriteBatch::default()
716                    .blocks(blocks_to_write)
717                    .commits(expected_commits),
718            )
719            .unwrap();
720
721        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
722
723        // Check that DagState recovery from stored CommitInfo worked correctly
724        assert_eq!(
725            dag_builder.last_committed_rounds.clone(),
726            dag_state.read().last_committed_rounds()
727        );
728        assert_eq!(
729            expected_scored_subdags.len(),
730            dag_state.read().scoring_subdags_count()
731        );
732        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
733        let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
734        assert_eq!(recovered_scores, expected_scores);
735
736        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
737
738        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
739        let leader_swap_table = leader_schedule.leader_swap_table.read();
740        assert_eq!(leader_swap_table.good_nodes.len(), 0);
741        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
742    }
743
744    #[tokio::test]
745    async fn test_leader_schedule_commits_until_leader_schedule_update() {
746        telemetry_subscribers::init_for_testing();
747        let context = Arc::new(Context::new_for_test(4).0);
748        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
749
750        let dag_state = Arc::new(RwLock::new(DagState::new(
751            context.clone(),
752            Arc::new(MemStore::new()),
753        )));
754        let unscored_subdags = vec![CommittedSubDag::new(
755            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
756            vec![],
757            context.clock.timestamp_utc_ms(),
758            CommitRef::new(1, CommitDigest::MIN),
759            vec![],
760        )];
761        dag_state.write().add_scoring_subdags(unscored_subdags);
762
763        let commits_until_leader_schedule_update =
764            leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
765        assert_eq!(commits_until_leader_schedule_update, 299);
766    }
767
768    #[tokio::test]
769    async fn test_leader_schedule_update_leader_schedule() {
770        telemetry_subscribers::init_for_testing();
771        let mut context = Context::new_for_test(4).0;
772        context
773            .protocol_config
774            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
775        let context = Arc::new(context);
776        let leader_schedule = Arc::new(LeaderSchedule::new(
777            context.clone(),
778            LeaderSwapTable::default(),
779        ));
780        let dag_state = Arc::new(RwLock::new(DagState::new(
781            context.clone(),
782            Arc::new(MemStore::new()),
783        )));
784
785        // Populate fully connected test blocks for round 0 ~ 4, authorities 0 ~ 3.
786        let max_round: u32 = 4;
787        let num_authorities: u32 = 4;
788
789        let mut blocks = Vec::new();
790        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
791            .committee
792            .authorities()
793            .map(|index| {
794                let author_idx = index.0.value() as u32;
795                let block = TestBlock::new(0, author_idx).build();
796                VerifiedBlock::new_for_test(block)
797            })
798            .map(|block| (block.reference(), block))
799            .unzip();
800        blocks.extend(genesis);
801
802        let mut ancestors = genesis_references;
803        let mut leader = None;
804        for round in 1..=max_round {
805            let mut new_ancestors = vec![];
806            for author in 0..num_authorities {
807                let base_ts = round as BlockTimestampMs * 1000;
808                let block = VerifiedBlock::new_for_test(
809                    TestBlock::new(round, author)
810                        .set_timestamp_ms(base_ts + (author + round) as u64)
811                        .set_ancestors(ancestors.clone())
812                        .build(),
813                );
814                new_ancestors.push(block.reference());
815
816                // Simulate referenced block which was part of another committed
817                // subdag.
818                if round == 3 && author == 0 {
819                    tracing::info!("Skipping {block} in committed subdags blocks");
820                    continue;
821                }
822
823                blocks.push(block.clone());
824
825                // only write one block for the final round, which is the leader
826                // of the committed subdag.
827                if round == max_round {
828                    leader = Some(block.clone());
829                    break;
830                }
831            }
832            ancestors = new_ancestors;
833        }
834
835        let leader_block = leader.unwrap();
836        let leader_ref = leader_block.reference();
837        let commit_index = 1;
838
839        let last_commit = TrustedCommit::new_for_test(
840            commit_index,
841            CommitDigest::MIN,
842            context.clock.timestamp_utc_ms(),
843            leader_ref,
844            blocks
845                .iter()
846                .map(|block| block.reference())
847                .collect::<Vec<_>>(),
848        );
849
850        let unscored_subdags = vec![CommittedSubDag::new(
851            leader_ref,
852            blocks,
853            context.clock.timestamp_utc_ms(),
854            last_commit.reference(),
855            vec![],
856        )];
857
858        let mut dag_state_write = dag_state.write();
859        dag_state_write.set_last_commit(last_commit);
860        dag_state_write.add_scoring_subdags(unscored_subdags);
861        drop(dag_state_write);
862
863        assert_eq!(
864            leader_schedule.elect_leader(4, 0),
865            AuthorityIndex::new_for_test(0)
866        );
867
868        leader_schedule.update_leader_schedule_v2(&dag_state);
869
870        let leader_swap_table = leader_schedule.leader_swap_table.read();
871        assert_eq!(leader_swap_table.good_nodes.len(), 1);
872        assert_eq!(
873            leader_swap_table.good_nodes[0].0,
874            AuthorityIndex::new_for_test(2)
875        );
876        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
877        assert!(
878            leader_swap_table
879                .bad_nodes
880                .contains_key(&AuthorityIndex::new_for_test(0))
881        );
882        assert_eq!(
883            leader_schedule.elect_leader(4, 0),
884            AuthorityIndex::new_for_test(2)
885        );
886    }
887
888    #[tokio::test]
889    async fn test_leader_schedule_from_store_with_vote_scoring() {
890        telemetry_subscribers::init_for_testing();
891        let mut context = Context::new_for_test(4).0;
892        context
893            .protocol_config
894            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
895        context
896            .protocol_config
897            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
898        let context = Arc::new(context);
899        let store = Arc::new(MemStore::new());
900        // Populate fully connected test blocks for round 0 ~ 11, authorities 0 ~ 3.
901        let mut dag_builder = DagBuilder::new(context.clone());
902        dag_builder.layers(1..=11).build();
903        let mut subdags = vec![];
904        let mut expected_commits = vec![];
905        let mut blocks_to_write = vec![];
906
907        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
908            for block in sub_dag.blocks.iter() {
909                blocks_to_write.push(block.clone());
910            }
911            expected_commits.push(commit);
912            subdags.push(sub_dag);
913        }
914        // The CommitInfo for the first 10 commits is written to the store. This is
915        // the info from which the LeaderSchedule will be recovered.
916        let commit_range = (1..=10).into();
917        let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
918        let committed_rounds = vec![9, 9, 10, 9];
919        let commit_ref = expected_commits[9].reference();
920        let commit_info = CommitInfo {
921            reputation_scores,
922            committed_rounds,
923        };
924        // CommitIndex '11' will be written to the store. This should result in the
925        // cached last_committed_rounds & unscored subdags in DagState being updated
926        // with the latest commit information on recovery.
927        store
928            .write(
929                WriteBatch::default()
930                    .commit_info(vec![(commit_ref, commit_info)])
931                    .blocks(blocks_to_write)
932                    .commits(expected_commits),
933            )
934            .unwrap();
935        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
936        // Check that DagState recovery from stored CommitInfo worked correctly
937        assert_eq!(
938            dag_builder.last_committed_rounds.clone(),
939            dag_state.read().last_committed_rounds()
940        );
941        let actual_unscored_subdags = dag_state.read().unscored_committed_subdags();
942        assert_eq!(1, dag_state.read().unscored_committed_subdags_count());
943        let actual_subdag = actual_unscored_subdags[0].clone();
944        assert_eq!(*subdags.last().unwrap(), actual_subdag);
945        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
946        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
947        let leader_swap_table = leader_schedule.leader_swap_table.read();
948        assert_eq!(leader_swap_table.good_nodes.len(), 1);
949        assert_eq!(
950            leader_swap_table.good_nodes[0].0,
951            AuthorityIndex::new_for_test(0)
952        );
953        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
954        assert!(
955            leader_swap_table
956                .bad_nodes
957                .contains_key(&AuthorityIndex::new_for_test(2)),
958            "{:?}",
959            leader_swap_table.bad_nodes
960        );
961    }
962
963    #[tokio::test]
964    async fn test_leader_schedule_from_store_no_commits_with_vote_scoring() {
965        telemetry_subscribers::init_for_testing();
966        let mut context = Context::new_for_test(4).0;
967        context
968            .protocol_config
969            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
970        context
971            .protocol_config
972            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
973        let context = Arc::new(context);
974        let store = Arc::new(MemStore::new());
975        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
976        let expected_last_committed_rounds = vec![0, 0, 0, 0];
977        // Check that DagState recovery from stored CommitInfo worked correctly
978        assert_eq!(
979            expected_last_committed_rounds,
980            dag_state.read().last_committed_rounds()
981        );
982        assert_eq!(0, dag_state.read().unscored_committed_subdags_count());
983        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
984        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
985        let leader_swap_table = leader_schedule.leader_swap_table.read();
986        assert_eq!(leader_swap_table.good_nodes.len(), 0);
987        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
988    }
989
990    #[tokio::test]
991    async fn test_leader_schedule_from_store_no_commit_info_with_vote_scoring() {
992        telemetry_subscribers::init_for_testing();
993        let mut context = Context::new_for_test(4).0;
994        context
995            .protocol_config
996            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
997        context
998            .protocol_config
999            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1000        let context = Arc::new(context);
1001        let store = Arc::new(MemStore::new());
1002        // Populate fully connected test blocks for round 0 ~ 2, authorities 0 ~ 3.
1003        let mut dag_builder = DagBuilder::new(context.clone());
1004        dag_builder.layers(1..=2).build();
1005
1006        let mut expected_unscored_subdags = vec![];
1007        let mut expected_commits = vec![];
1008        let mut blocks_to_write = vec![];
1009
1010        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
1011            for block in sub_dag.blocks.iter() {
1012                blocks_to_write.push(block.clone());
1013            }
1014            expected_commits.push(commit);
1015            expected_unscored_subdags.push(sub_dag);
1016        }
1017        // Only the first 2 commits are written to the store. 10 commits would
1018        // have been required for a leader schedule update, so at this point no
1019        // commit info should have been persisted and no leader schedule should
1020        // be recovered. However, the dag state should have properly recovered the
1021        // unscored subdags & last committed rounds.
1022        store
1023            .write(
1024                WriteBatch::default()
1025                    .blocks(blocks_to_write)
1026                    .commits(expected_commits),
1027            )
1028            .unwrap();
1029        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1030        // Check that DagState recovery from stored CommitInfo worked correctly
1031        assert_eq!(
1032            dag_builder.last_committed_rounds.clone(),
1033            dag_state.read().last_committed_rounds()
1034        );
1035        let actual_unscored_subdags = dag_state.read().unscored_committed_subdags();
1036        assert_eq!(
1037            expected_unscored_subdags.len() as u64,
1038            dag_state.read().unscored_committed_subdags_count()
1039        );
1040        for (idx, expected_subdag) in expected_unscored_subdags.into_iter().enumerate() {
1041            let actual_subdag = actual_unscored_subdags[idx].clone();
1042            assert_eq!(expected_subdag, actual_subdag);
1043        }
1044        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
1045        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
1046        let leader_swap_table = leader_schedule.leader_swap_table.read();
1047        assert_eq!(leader_swap_table.good_nodes.len(), 0);
1048        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
1049    }
1050
1051    #[tokio::test]
1052    async fn test_leader_schedule_commits_until_leader_schedule_update_with_vote_scoring() {
1053        telemetry_subscribers::init_for_testing();
1054        let mut context = Context::new_for_test(4).0;
1055        context
1056            .protocol_config
1057            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1058        let context = Arc::new(context);
1059        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
1060        let dag_state = Arc::new(RwLock::new(DagState::new(
1061            context.clone(),
1062            Arc::new(MemStore::new()),
1063        )));
1064        let unscored_subdags = vec![CommittedSubDag::new(
1065            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
1066            vec![],
1067            context.clock.timestamp_utc_ms(),
1068            CommitRef::new(1, CommitDigest::MIN),
1069            vec![],
1070        )];
1071        dag_state
1072            .write()
1073            .add_unscored_committed_subdags(unscored_subdags);
1074        let commits_until_leader_schedule_update =
1075            leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
1076        assert_eq!(commits_until_leader_schedule_update, 299);
1077    }
1078
1079    // TODO: Remove when DistributedVoteScoring is enabled.
1080    #[tokio::test]
1081    async fn test_leader_schedule_update_leader_schedule_with_vote_scoring() {
1082        telemetry_subscribers::init_for_testing();
1083        let mut context = Context::new_for_test(4).0;
1084        context
1085            .protocol_config
1086            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1087        context
1088            .protocol_config
1089            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1090        let context = Arc::new(context);
1091        let leader_schedule = Arc::new(LeaderSchedule::new(
1092            context.clone(),
1093            LeaderSwapTable::default(),
1094        ));
1095        let dag_state = Arc::new(RwLock::new(DagState::new(
1096            context.clone(),
1097            Arc::new(MemStore::new()),
1098        )));
1099        // Populate fully connected test blocks for round 0 ~ 4, authorities 0 ~ 3.
1100        let max_round: u32 = 4;
1101        let num_authorities: u32 = 4;
1102        let mut blocks = Vec::new();
1103        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
1104            .committee
1105            .authorities()
1106            .map(|index| {
1107                let author_idx = index.0.value() as u32;
1108                let block = TestBlock::new(0, author_idx).build();
1109                VerifiedBlock::new_for_test(block)
1110            })
1111            .map(|block| (block.reference(), block))
1112            .unzip();
1113        blocks.extend(genesis);
1114        let mut ancestors = genesis_references;
1115        let mut leader = None;
1116        for round in 1..=max_round {
1117            let mut new_ancestors = vec![];
1118            for author in 0..num_authorities {
1119                let base_ts = round as BlockTimestampMs * 1000;
1120                let block = VerifiedBlock::new_for_test(
1121                    TestBlock::new(round, author)
1122                        .set_timestamp_ms(base_ts + (author + round) as u64)
1123                        .set_ancestors(ancestors.clone())
1124                        .build(),
1125                );
1126                new_ancestors.push(block.reference());
1127                // Simulate referenced block which was part of another committed
1128                // subdag.
1129                if round == 3 && author == 0 {
1130                    tracing::info!("Skipping {block} in committed subdags blocks");
1131                    continue;
1132                }
1133                blocks.push(block.clone());
1134                // only write one block for the final round, which is the leader
1135                // of the committed subdag.
1136                if round == max_round {
1137                    leader = Some(block.clone());
1138                    break;
1139                }
1140            }
1141            ancestors = new_ancestors;
1142        }
1143        let leader_block = leader.unwrap();
1144        let leader_ref = leader_block.reference();
1145        let commit_index = 1;
1146        let last_commit = TrustedCommit::new_for_test(
1147            commit_index,
1148            CommitDigest::MIN,
1149            context.clock.timestamp_utc_ms(),
1150            leader_ref,
1151            blocks
1152                .iter()
1153                .map(|block| block.reference())
1154                .collect::<Vec<_>>(),
1155        );
1156        let unscored_subdags = vec![CommittedSubDag::new(
1157            leader_ref,
1158            blocks,
1159            context.clock.timestamp_utc_ms(),
1160            last_commit.reference(),
1161            vec![],
1162        )];
1163        let mut dag_state_write = dag_state.write();
1164        dag_state_write.set_last_commit(last_commit);
1165        dag_state_write.add_unscored_committed_subdags(unscored_subdags);
1166        drop(dag_state_write);
1167        assert_eq!(
1168            leader_schedule.elect_leader(4, 0),
1169            AuthorityIndex::new_for_test(0)
1170        );
1171        leader_schedule.update_leader_schedule_v1(&dag_state);
1172        let leader_swap_table = leader_schedule.leader_swap_table.read();
1173        assert_eq!(leader_swap_table.good_nodes.len(), 1);
1174        assert_eq!(
1175            leader_swap_table.good_nodes[0].0,
1176            AuthorityIndex::new_for_test(2)
1177        );
1178        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
1179        assert!(
1180            leader_swap_table
1181                .bad_nodes
1182                .contains_key(&AuthorityIndex::new_for_test(0))
1183        );
1184        assert_eq!(
1185            leader_schedule.elect_leader(4, 0),
1186            AuthorityIndex::new_for_test(2)
1187        );
1188    }
1189
1190    #[tokio::test]
1191    async fn test_leader_swap_table() {
1192        telemetry_subscribers::init_for_testing();
1193        let context = Arc::new(Context::new_for_test(4).0);
1194
1195        let swap_stake_threshold = 33;
1196        let reputation_scores = ReputationScores::new(
1197            (0..=10).into(),
1198            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1199        );
1200        let leader_swap_table =
1201            LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
1202
1203        assert_eq!(leader_swap_table.good_nodes.len(), 1);
1204        assert_eq!(
1205            leader_swap_table.good_nodes[0].0,
1206            AuthorityIndex::new_for_test(3)
1207        );
1208        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
1209        assert!(
1210            leader_swap_table
1211                .bad_nodes
1212                .contains_key(&AuthorityIndex::new_for_test(0))
1213        );
1214    }
1215
1216    #[tokio::test]
1217    async fn test_leader_swap_table_swap() {
1218        telemetry_subscribers::init_for_testing();
1219        let context = Arc::new(Context::new_for_test(4).0);
1220
1221        let swap_stake_threshold = 33;
1222        let reputation_scores = ReputationScores::new(
1223            (0..=10).into(),
1224            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1225        );
1226        let leader_swap_table =
1227            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1228
1229        // Test swapping a bad leader
1230        let leader = AuthorityIndex::new_for_test(0);
1231        let leader_round = 1;
1232        let leader_offset = 0;
1233        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
1234        assert_eq!(swapped_leader, Some(AuthorityIndex::new_for_test(3)));
1235
1236        // Test not swapping a good leader
1237        let leader = AuthorityIndex::new_for_test(1);
1238        let leader_round = 1;
1239        let leader_offset = 0;
1240        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
1241        assert_eq!(swapped_leader, None);
1242    }
1243
1244    #[tokio::test]
1245    async fn test_leader_swap_table_retrieve_first_nodes() {
1246        telemetry_subscribers::init_for_testing();
1247        let context = Arc::new(Context::new_for_test(4).0);
1248
1249        let authorities = [
1250            (AuthorityIndex::new_for_test(0), 1),
1251            (AuthorityIndex::new_for_test(1), 2),
1252            (AuthorityIndex::new_for_test(2), 3),
1253            (AuthorityIndex::new_for_test(3), 4),
1254        ];
1255
1256        let stake_threshold = 50;
1257        let filtered_authorities = LeaderSwapTable::retrieve_first_nodes(
1258            context.clone(),
1259            authorities.iter(),
1260            stake_threshold,
1261        );
1262
1263        // Test setup includes 4 validators with even stake. Therefore with a
1264        // stake_threshold of 50% we should see 2 validators filtered.
1265        assert_eq!(filtered_authorities.len(), 2);
1266        let authority_0_idx = AuthorityIndex::new_for_test(0);
1267        let authority_0 = context.committee.authority(authority_0_idx);
1268        assert!(filtered_authorities.contains(&(
1269            authority_0_idx,
1270            authority_0.hostname.clone(),
1271            authority_0.stake
1272        )));
1273        let authority_1_idx = AuthorityIndex::new_for_test(1);
1274        let authority_1 = context.committee.authority(authority_1_idx);
1275        assert!(filtered_authorities.contains(&(
1276            authority_1_idx,
1277            authority_1.hostname.clone(),
1278            authority_1.stake
1279        )));
1280    }
1281
1282    #[tokio::test]
1283    #[should_panic(
1284        expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
1285    )]
1286    async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
1287        telemetry_subscribers::init_for_testing();
1288        let context = Arc::new(Context::new_for_test(4).0);
1289
1290        let swap_stake_threshold = 34;
1291        let reputation_scores = ReputationScores::new(
1292            (0..=10).into(),
1293            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1294        );
1295        LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
1296    }
1297
1298    #[tokio::test]
1299    async fn test_update_leader_swap_table() {
1300        telemetry_subscribers::init_for_testing();
1301        let context = Arc::new(Context::new_for_test(4).0);
1302
1303        let swap_stake_threshold = 33;
1304        let reputation_scores = ReputationScores::new(
1305            (1..=10).into(),
1306            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1307        );
1308        let leader_swap_table =
1309            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1310
1311        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
1312
1313        // Update leader from brand new schedule to first real schedule
1314        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1315
1316        let reputation_scores = ReputationScores::new(
1317            (11..=20).into(),
1318            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1319        );
1320        let leader_swap_table =
1321            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1322
1323        // Update leader from old swap table to new valid swap table
1324        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1325    }
1326
1327    #[tokio::test]
1328    #[should_panic(
1329        expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
1330    )]
1331    async fn test_update_bad_leader_swap_table() {
1332        telemetry_subscribers::init_for_testing();
1333        let context = Arc::new(Context::new_for_test(4).0);
1334
1335        let swap_stake_threshold = 33;
1336        let reputation_scores = ReputationScores::new(
1337            (1..=10).into(),
1338            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1339        );
1340        let leader_swap_table =
1341            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1342
1343        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
1344
1345        // Update leader from brand new schedule to first real schedule
1346        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1347
1348        let reputation_scores = ReputationScores::new(
1349            (11..=20).into(),
1350            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1351        );
1352        let leader_swap_table =
1353            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1354
1355        // Update leader from old swap table to new valid swap table
1356        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1357
1358        let reputation_scores = ReputationScores::new(
1359            (21..=25).into(),
1360            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1361        );
1362        let leader_swap_table =
1363            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1364
1365        // Update leader from old swap table to new invalid swap table
1366        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1367    }
1368}