1use std::{
6 collections::BTreeMap,
7 fmt::{Debug, Formatter},
8 sync::Arc,
9};
10
11use consensus_config::{AuthorityIndex, Stake};
12use parking_lot::RwLock;
13use rand::{SeedableRng, prelude::SliceRandom, rngs::StdRng};
14
15use crate::{
16 CommitIndex, Round,
17 commit::CommitRange,
18 context::Context,
19 dag_state::DagState,
20 leader_scoring::{ReputationScoreCalculator, ReputationScores},
21};
22
23#[derive(Clone)]
27pub(crate) struct LeaderSchedule {
28 pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
29 context: Arc<Context>,
30 num_commits_per_schedule: u64,
31}
32
33impl LeaderSchedule {
34 #[cfg(not(msim))]
38 const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
39 #[cfg(msim)]
40 const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;
41
42 pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
43 Self {
44 context,
45 num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
46 leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
47 }
48 }
49
50 #[cfg(test)]
51 pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
52 self.num_commits_per_schedule = num_commits_per_schedule;
53 self
54 }
55
56 pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60 let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
61 LeaderSwapTable::default(),
62 |(last_commit_ref, last_commit_info)| {
63 LeaderSwapTable::new(
64 context.clone(),
65 last_commit_ref.index,
66 last_commit_info.reputation_scores,
67 )
68 },
69 );
70
71 if context
72 .protocol_config
73 .consensus_distributed_vote_scoring_strategy()
74 {
75 tracing::info!(
76 "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
77 dag_state.read().scoring_subdags_count(),
78 );
79 } else {
80 tracing::info!(
82 "LeaderSchedule recovered using {leader_swap_table:?}. There are {} pending unscored subdags in DagState.",
83 dag_state.read().unscored_committed_subdags_count(),
84 );
85 }
86
87 Self::new(context, leader_swap_table)
89 }
90
91 pub(crate) fn commits_until_leader_schedule_update(
92 &self,
93 dag_state: Arc<RwLock<DagState>>,
94 ) -> usize {
95 let subdag_count = if self
96 .context
97 .protocol_config
98 .consensus_distributed_vote_scoring_strategy()
99 {
100 dag_state.read().scoring_subdags_count() as u64
101 } else {
102 dag_state.read().unscored_committed_subdags_count()
104 };
105 assert!(
106 subdag_count <= self.num_commits_per_schedule,
107 "Committed subdags count exceeds the number of commits per schedule"
108 );
109 self.num_commits_per_schedule
110 .checked_sub(subdag_count)
111 .unwrap() as usize
112 }
113
114 pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock<DagState>) -> bool {
120 if self
121 .context
122 .protocol_config
123 .consensus_distributed_vote_scoring_strategy()
124 {
125 dag_state.read().is_scoring_subdag_empty()
126 } else {
127 dag_state.read().unscored_committed_subdags_count() == 0
129 }
130 }
131
132 pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
133 let _s = self
134 .context
135 .metrics
136 .node_metrics
137 .scope_processing_time
138 .with_label_values(&["LeaderSchedule::update_leader_schedule"])
139 .start_timer();
140 let (reputation_scores, last_commit_index) = {
141 let dag_state = dag_state.read();
142 let reputation_scores = dag_state.calculate_scoring_subdag_scores();
143 let last_commit_index = dag_state.scoring_subdag_commit_range();
144 (reputation_scores, last_commit_index)
145 };
146 {
147 let mut dag_state = dag_state.write();
148 dag_state.clear_scoring_subdag();
150 dag_state.add_commit_info(reputation_scores.clone());
152 }
153 self.update_leader_swap_table(LeaderSwapTable::new(
154 self.context.clone(),
155 last_commit_index,
156 reputation_scores.clone(),
157 ));
158 reputation_scores.update_metrics(self.context.clone());
159 self.context
160 .metrics
161 .node_metrics
162 .num_of_bad_nodes
163 .set(self.leader_swap_table.read().bad_nodes.len() as i64);
164 }
165
166 pub(crate) fn update_leader_schedule_v1(&self, dag_state: &RwLock<DagState>) {
168 let _s = self
169 .context
170 .metrics
171 .node_metrics
172 .scope_processing_time
173 .with_label_values(&["LeaderSchedule::update_leader_schedule"])
174 .start_timer();
175
176 let mut dag_state = dag_state.write();
177 let unscored_subdags = dag_state.take_unscored_committed_subdags();
178
179 let score_calculation_timer = self
180 .context
181 .metrics
182 .node_metrics
183 .scope_processing_time
184 .with_label_values(&["ReputationScoreCalculator::calculate"])
185 .start_timer();
186 let reputation_scores =
187 ReputationScoreCalculator::new(self.context.clone(), &unscored_subdags).calculate();
188 drop(score_calculation_timer);
189
190 reputation_scores.update_metrics(self.context.clone());
191
192 let last_commit_index = unscored_subdags.last().unwrap().commit_ref.index;
193 self.update_leader_swap_table(LeaderSwapTable::new(
194 self.context.clone(),
195 last_commit_index,
196 reputation_scores.clone(),
197 ));
198
199 self.context
200 .metrics
201 .node_metrics
202 .num_of_bad_nodes
203 .set(self.leader_swap_table.read().bad_nodes.len() as i64);
204
205 dag_state.add_commit_info(reputation_scores);
207 }
208
209 pub(crate) fn elect_leader(&self, round: u32, leader_offset: u32) -> AuthorityIndex {
210 cfg_if::cfg_if! {
211 if #[cfg(test)] {
214 let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
215 let table = self.leader_swap_table.read();
216 table.swap(leader, round, leader_offset).unwrap_or(leader)
217 } else {
218 let leader = self.elect_leader_stake_based(round, leader_offset);
219 let table = self.leader_swap_table.read();
220 table.swap(leader, round, leader_offset).unwrap_or(leader)
221 }
222 }
223 }
224
225 pub(crate) fn elect_leader_stake_based(&self, round: u32, offset: u32) -> AuthorityIndex {
226 assert!((offset as usize) < self.context.committee.size());
227
228 let mut seed_bytes = [0u8; 32];
233 seed_bytes[32 - 4..].copy_from_slice(&(round).to_le_bytes());
234 let mut rng = StdRng::from_seed(seed_bytes);
235
236 let choices = self
237 .context
238 .committee
239 .authorities()
240 .map(|(index, authority)| (index, authority.stake as f32))
241 .collect::<Vec<_>>();
242
243 let leader_index = *choices
244 .choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
245 .expect("Weighted choice error: stake values incorrect!")
246 .skip(offset as usize)
247 .map(|(index, _)| index)
248 .next()
249 .unwrap();
250
251 leader_index
252 }
253
254 fn update_leader_swap_table(&self, table: LeaderSwapTable) {
258 let read = self.leader_swap_table.read();
259 let old_commit_range = &read.reputation_scores.commit_range;
260 let new_commit_range = &table.reputation_scores.commit_range;
261
262 if *old_commit_range != CommitRange::default() {
267 assert!(
268 old_commit_range.is_next_range(new_commit_range)
269 && old_commit_range.is_equal_size(new_commit_range),
270 "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
271 );
272 }
273 drop(read);
274
275 tracing::trace!("Updating {table:?}");
276
277 let mut write = self.leader_swap_table.write();
278 *write = table;
279 }
280}
281
282#[derive(Default, Clone)]
283pub(crate) struct LeaderSwapTable {
284 pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,
290
291 pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,
298
299 pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
303
304 pub(crate) reputation_scores: ReputationScores,
308}
309
310impl LeaderSwapTable {
311 pub(crate) fn new(
316 context: Arc<Context>,
317 commit_index: CommitIndex,
318 reputation_scores: ReputationScores,
319 ) -> Self {
320 let swap_stake_threshold = context
321 .protocol_config
322 .consensus_bad_nodes_stake_threshold();
323 Self::new_inner(
324 context,
325 swap_stake_threshold,
326 commit_index,
327 reputation_scores,
328 )
329 }
330
331 fn new_inner(
332 context: Arc<Context>,
333 #[cfg_attr(msim, expect(unused_variables))] swap_stake_threshold: u64,
336 commit_index: CommitIndex,
337 reputation_scores: ReputationScores,
338 ) -> Self {
339 #[cfg(msim)]
340 let swap_stake_threshold = 33;
341
342 assert!(
343 (0..=33).contains(&swap_stake_threshold),
344 "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
345 );
346
347 if reputation_scores.scores_per_authority.is_empty() {
349 return Self::default();
350 }
351
352 let mut seed_bytes = [0u8; 32];
355 seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
356 let mut rng = StdRng::from_seed(seed_bytes);
357 let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
358 assert_eq!(authorities_by_score.len(), context.committee.size());
359 authorities_by_score.shuffle(&mut rng);
360 authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));
363
364 let good_nodes = Self::retrieve_first_nodes(
366 context.clone(),
367 authorities_by_score.iter(),
368 swap_stake_threshold,
369 )
370 .into_iter()
371 .collect::<Vec<(AuthorityIndex, String, Stake)>>();
372
373 let bad_nodes = Self::retrieve_first_nodes(
377 context.clone(),
378 authorities_by_score.iter().rev(),
379 swap_stake_threshold,
380 )
381 .into_iter()
382 .map(|(idx, hostname, stake)| (idx, (hostname, stake)))
383 .collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();
384
385 good_nodes.iter().for_each(|(idx, hostname, stake)| {
386 tracing::debug!(
387 "Good node {hostname} with stake {stake} has score {} for {:?}",
388 reputation_scores.scores_per_authority[idx.to_owned()],
389 reputation_scores.commit_range,
390 );
391 });
392
393 bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
394 tracing::debug!(
395 "Bad node {hostname} with stake {stake} has score {} for {:?}",
396 reputation_scores.scores_per_authority[idx.to_owned()],
397 reputation_scores.commit_range,
398 );
399 });
400
401 tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");
402
403 Self {
404 good_nodes,
405 bad_nodes,
406 reputation_scores_desc: authorities_by_score,
407 reputation_scores,
408 }
409 }
410
411 pub(crate) fn swap(
423 &self,
424 leader: AuthorityIndex,
425 leader_round: Round,
426 leader_offset: u32,
427 ) -> Option<AuthorityIndex> {
428 if self.bad_nodes.contains_key(&leader) {
429 assert!(
431 leader_offset == 0,
432 "Swap for multi-leader case not implemented yet."
433 );
434 let mut seed_bytes = [0u8; 32];
435 seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
436 seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
437 let mut rng = StdRng::from_seed(seed_bytes);
438
439 let (idx, _hostname, _stake) = self
440 .good_nodes
441 .choose(&mut rng)
442 .expect("There should be at least one good node available");
443
444 tracing::trace!(
445 "Swapping bad leader {} -> {} for round {}",
446 leader,
447 idx,
448 leader_round
449 );
450
451 return Some(*idx);
452 }
453 None
454 }
455
456 fn retrieve_first_nodes<'a>(
462 context: Arc<Context>,
463 authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
464 stake_threshold: u64,
465 ) -> Vec<(AuthorityIndex, String, Stake)> {
466 let mut filtered_authorities = Vec::new();
467
468 let mut stake = 0;
469 for &(authority_idx, _score) in authorities {
470 stake += context.committee.stake(authority_idx);
471
472 if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
477 break;
478 }
479
480 let authority = context.committee.authority(authority_idx);
481 filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
482 }
483
484 filtered_authorities
485 }
486}
487
488impl Debug for LeaderSwapTable {
489 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
490 f.write_str(&format!(
491 "LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
492 self.reputation_scores.commit_range,
493 self.good_nodes
494 .iter()
495 .map(|(idx, _hostname, _stake)| idx.to_owned())
496 .collect::<Vec<AuthorityIndex>>(),
497 self.good_nodes
498 .iter()
499 .map(|(_idx, _hostname, stake)| stake)
500 .sum::<Stake>(),
501 self.bad_nodes.keys().map(|idx| idx.to_owned()),
502 self.bad_nodes
503 .values()
504 .map(|(_hostname, stake)| stake)
505 .sum::<Stake>(),
506 ))
507 }
508}
509
510#[cfg(test)]
511mod tests {
512
513 use super::*;
514 use crate::{
515 block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
516 commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
517 storage::{Store, WriteBatch, mem_store::MemStore},
518 test_dag_builder::DagBuilder,
519 };
520
521 #[tokio::test]
522 async fn test_elect_leader() {
523 let context = Arc::new(Context::new_for_test(4).0);
524 let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
525
526 assert_eq!(
527 leader_schedule.elect_leader(0, 0),
528 AuthorityIndex::new_for_test(0)
529 );
530 assert_eq!(
531 leader_schedule.elect_leader(1, 0),
532 AuthorityIndex::new_for_test(1)
533 );
534 assert_eq!(
535 leader_schedule.elect_leader(5, 0),
536 AuthorityIndex::new_for_test(1)
537 );
538 assert_ne!(
541 leader_schedule.elect_leader_stake_based(1, 1),
542 leader_schedule.elect_leader_stake_based(1, 2)
543 );
544 }
545
546 #[tokio::test]
547 async fn test_elect_leader_stake_based() {
548 let context = Arc::new(Context::new_for_test(4).0);
549 let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
550
551 assert_eq!(
552 leader_schedule.elect_leader_stake_based(0, 0),
553 AuthorityIndex::new_for_test(1)
554 );
555 assert_eq!(
556 leader_schedule.elect_leader_stake_based(1, 0),
557 AuthorityIndex::new_for_test(1)
558 );
559 assert_eq!(
560 leader_schedule.elect_leader_stake_based(5, 0),
561 AuthorityIndex::new_for_test(3)
562 );
563 assert_ne!(
566 leader_schedule.elect_leader_stake_based(1, 1),
567 leader_schedule.elect_leader_stake_based(1, 2)
568 );
569 }
570
571 #[tokio::test]
572 async fn test_leader_schedule_from_store() {
573 telemetry_subscribers::init_for_testing();
574 let mut context = Context::new_for_test(4).0;
575 context
576 .protocol_config
577 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
578 let context = Arc::new(context);
579 let store = Arc::new(MemStore::new());
580
581 let mut dag_builder = DagBuilder::new(context.clone());
583 dag_builder.layers(1..=11).build();
584 let mut subdags = vec![];
585 let mut expected_commits = vec![];
586
587 let mut blocks_to_write = vec![];
588
589 for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
590 for block in sub_dag.blocks.iter() {
591 blocks_to_write.push(block.clone());
592 }
593
594 expected_commits.push(commit);
595 subdags.push(sub_dag);
596 }
597
598 let commit_range = (1..=10).into();
601 let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
602 let committed_rounds = vec![9, 9, 10, 9];
603 let commit_ref = expected_commits[9].reference();
604 let commit_info = CommitInfo {
605 reputation_scores,
606 committed_rounds,
607 };
608
609 store
613 .write(
614 WriteBatch::default()
615 .commit_info(vec![(commit_ref, commit_info)])
616 .blocks(blocks_to_write)
617 .commits(expected_commits),
618 )
619 .unwrap();
620
621 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
622
623 assert_eq!(
625 dag_builder.last_committed_rounds.clone(),
626 dag_state.read().last_committed_rounds()
627 );
628 assert_eq!(1, dag_state.read().scoring_subdags_count());
629 let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
630 let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
631 assert_eq!(recovered_scores, expected_scores);
632
633 let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
634
635 let leader_swap_table = leader_schedule.leader_swap_table.read();
637 assert_eq!(leader_swap_table.good_nodes.len(), 1);
638 assert_eq!(
639 leader_swap_table.good_nodes[0].0,
640 AuthorityIndex::new_for_test(0)
641 );
642 assert_eq!(leader_swap_table.bad_nodes.len(), 1);
643 assert!(
644 leader_swap_table
645 .bad_nodes
646 .contains_key(&AuthorityIndex::new_for_test(2)),
647 "{:?}",
648 leader_swap_table.bad_nodes
649 );
650 }
651
652 #[tokio::test]
653 async fn test_leader_schedule_from_store_no_commits() {
654 telemetry_subscribers::init_for_testing();
655 let mut context = Context::new_for_test(4).0;
656 context
657 .protocol_config
658 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
659 let context = Arc::new(context);
660 let store = Arc::new(MemStore::new());
661
662 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
663
664 let expected_last_committed_rounds = vec![0, 0, 0, 0];
665
666 assert_eq!(
668 expected_last_committed_rounds,
669 dag_state.read().last_committed_rounds()
670 );
671 assert_eq!(0, dag_state.read().scoring_subdags_count());
672
673 let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
674
675 let leader_swap_table = leader_schedule.leader_swap_table.read();
677 assert_eq!(leader_swap_table.good_nodes.len(), 0);
678 assert_eq!(leader_swap_table.bad_nodes.len(), 0);
679 }
680
681 #[tokio::test]
682 async fn test_leader_schedule_from_store_no_commit_info() {
683 telemetry_subscribers::init_for_testing();
684 let mut context = Context::new_for_test(4).0;
685 context
686 .protocol_config
687 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
688 let context = Arc::new(context);
689 let store = Arc::new(MemStore::new());
690
691 let mut dag_builder = DagBuilder::new(context.clone());
693 dag_builder.layers(1..=2).build();
694
695 let mut expected_scored_subdags = vec![];
696 let mut expected_commits = vec![];
697
698 let mut blocks_to_write = vec![];
699
700 for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
701 for block in sub_dag.blocks.iter() {
702 blocks_to_write.push(block.clone());
703 }
704 expected_commits.push(commit);
705 expected_scored_subdags.push(sub_dag);
706 }
707
708 store
714 .write(
715 WriteBatch::default()
716 .blocks(blocks_to_write)
717 .commits(expected_commits),
718 )
719 .unwrap();
720
721 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
722
723 assert_eq!(
725 dag_builder.last_committed_rounds.clone(),
726 dag_state.read().last_committed_rounds()
727 );
728 assert_eq!(
729 expected_scored_subdags.len(),
730 dag_state.read().scoring_subdags_count()
731 );
732 let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
733 let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
734 assert_eq!(recovered_scores, expected_scores);
735
736 let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
737
738 let leader_swap_table = leader_schedule.leader_swap_table.read();
740 assert_eq!(leader_swap_table.good_nodes.len(), 0);
741 assert_eq!(leader_swap_table.bad_nodes.len(), 0);
742 }
743
744 #[tokio::test]
745 async fn test_leader_schedule_commits_until_leader_schedule_update() {
746 telemetry_subscribers::init_for_testing();
747 let context = Arc::new(Context::new_for_test(4).0);
748 let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
749
750 let dag_state = Arc::new(RwLock::new(DagState::new(
751 context.clone(),
752 Arc::new(MemStore::new()),
753 )));
754 let unscored_subdags = vec![CommittedSubDag::new(
755 BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
756 vec![],
757 context.clock.timestamp_utc_ms(),
758 CommitRef::new(1, CommitDigest::MIN),
759 vec![],
760 )];
761 dag_state.write().add_scoring_subdags(unscored_subdags);
762
763 let commits_until_leader_schedule_update =
764 leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
765 assert_eq!(commits_until_leader_schedule_update, 299);
766 }
767
768 #[tokio::test]
769 async fn test_leader_schedule_update_leader_schedule() {
770 telemetry_subscribers::init_for_testing();
771 let mut context = Context::new_for_test(4).0;
772 context
773 .protocol_config
774 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
775 let context = Arc::new(context);
776 let leader_schedule = Arc::new(LeaderSchedule::new(
777 context.clone(),
778 LeaderSwapTable::default(),
779 ));
780 let dag_state = Arc::new(RwLock::new(DagState::new(
781 context.clone(),
782 Arc::new(MemStore::new()),
783 )));
784
785 let max_round: u32 = 4;
787 let num_authorities: u32 = 4;
788
789 let mut blocks = Vec::new();
790 let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
791 .committee
792 .authorities()
793 .map(|index| {
794 let author_idx = index.0.value() as u32;
795 let block = TestBlock::new(0, author_idx).build();
796 VerifiedBlock::new_for_test(block)
797 })
798 .map(|block| (block.reference(), block))
799 .unzip();
800 blocks.extend(genesis);
801
802 let mut ancestors = genesis_references;
803 let mut leader = None;
804 for round in 1..=max_round {
805 let mut new_ancestors = vec![];
806 for author in 0..num_authorities {
807 let base_ts = round as BlockTimestampMs * 1000;
808 let block = VerifiedBlock::new_for_test(
809 TestBlock::new(round, author)
810 .set_timestamp_ms(base_ts + (author + round) as u64)
811 .set_ancestors(ancestors.clone())
812 .build(),
813 );
814 new_ancestors.push(block.reference());
815
816 if round == 3 && author == 0 {
819 tracing::info!("Skipping {block} in committed subdags blocks");
820 continue;
821 }
822
823 blocks.push(block.clone());
824
825 if round == max_round {
828 leader = Some(block.clone());
829 break;
830 }
831 }
832 ancestors = new_ancestors;
833 }
834
835 let leader_block = leader.unwrap();
836 let leader_ref = leader_block.reference();
837 let commit_index = 1;
838
839 let last_commit = TrustedCommit::new_for_test(
840 commit_index,
841 CommitDigest::MIN,
842 context.clock.timestamp_utc_ms(),
843 leader_ref,
844 blocks
845 .iter()
846 .map(|block| block.reference())
847 .collect::<Vec<_>>(),
848 );
849
850 let unscored_subdags = vec![CommittedSubDag::new(
851 leader_ref,
852 blocks,
853 context.clock.timestamp_utc_ms(),
854 last_commit.reference(),
855 vec![],
856 )];
857
858 let mut dag_state_write = dag_state.write();
859 dag_state_write.set_last_commit(last_commit);
860 dag_state_write.add_scoring_subdags(unscored_subdags);
861 drop(dag_state_write);
862
863 assert_eq!(
864 leader_schedule.elect_leader(4, 0),
865 AuthorityIndex::new_for_test(0)
866 );
867
868 leader_schedule.update_leader_schedule_v2(&dag_state);
869
870 let leader_swap_table = leader_schedule.leader_swap_table.read();
871 assert_eq!(leader_swap_table.good_nodes.len(), 1);
872 assert_eq!(
873 leader_swap_table.good_nodes[0].0,
874 AuthorityIndex::new_for_test(2)
875 );
876 assert_eq!(leader_swap_table.bad_nodes.len(), 1);
877 assert!(
878 leader_swap_table
879 .bad_nodes
880 .contains_key(&AuthorityIndex::new_for_test(0))
881 );
882 assert_eq!(
883 leader_schedule.elect_leader(4, 0),
884 AuthorityIndex::new_for_test(2)
885 );
886 }
887
888 #[tokio::test]
889 async fn test_leader_schedule_from_store_with_vote_scoring() {
890 telemetry_subscribers::init_for_testing();
891 let mut context = Context::new_for_test(4).0;
892 context
893 .protocol_config
894 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
895 context
896 .protocol_config
897 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
898 let context = Arc::new(context);
899 let store = Arc::new(MemStore::new());
900 let mut dag_builder = DagBuilder::new(context.clone());
902 dag_builder.layers(1..=11).build();
903 let mut subdags = vec![];
904 let mut expected_commits = vec![];
905 let mut blocks_to_write = vec![];
906
907 for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
908 for block in sub_dag.blocks.iter() {
909 blocks_to_write.push(block.clone());
910 }
911 expected_commits.push(commit);
912 subdags.push(sub_dag);
913 }
914 let commit_range = (1..=10).into();
917 let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
918 let committed_rounds = vec![9, 9, 10, 9];
919 let commit_ref = expected_commits[9].reference();
920 let commit_info = CommitInfo {
921 reputation_scores,
922 committed_rounds,
923 };
924 store
928 .write(
929 WriteBatch::default()
930 .commit_info(vec![(commit_ref, commit_info)])
931 .blocks(blocks_to_write)
932 .commits(expected_commits),
933 )
934 .unwrap();
935 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
936 assert_eq!(
938 dag_builder.last_committed_rounds.clone(),
939 dag_state.read().last_committed_rounds()
940 );
941 let actual_unscored_subdags = dag_state.read().unscored_committed_subdags();
942 assert_eq!(1, dag_state.read().unscored_committed_subdags_count());
943 let actual_subdag = actual_unscored_subdags[0].clone();
944 assert_eq!(*subdags.last().unwrap(), actual_subdag);
945 let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
946 let leader_swap_table = leader_schedule.leader_swap_table.read();
948 assert_eq!(leader_swap_table.good_nodes.len(), 1);
949 assert_eq!(
950 leader_swap_table.good_nodes[0].0,
951 AuthorityIndex::new_for_test(0)
952 );
953 assert_eq!(leader_swap_table.bad_nodes.len(), 1);
954 assert!(
955 leader_swap_table
956 .bad_nodes
957 .contains_key(&AuthorityIndex::new_for_test(2)),
958 "{:?}",
959 leader_swap_table.bad_nodes
960 );
961 }
962
963 #[tokio::test]
964 async fn test_leader_schedule_from_store_no_commits_with_vote_scoring() {
965 telemetry_subscribers::init_for_testing();
966 let mut context = Context::new_for_test(4).0;
967 context
968 .protocol_config
969 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
970 context
971 .protocol_config
972 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
973 let context = Arc::new(context);
974 let store = Arc::new(MemStore::new());
975 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
976 let expected_last_committed_rounds = vec![0, 0, 0, 0];
977 assert_eq!(
979 expected_last_committed_rounds,
980 dag_state.read().last_committed_rounds()
981 );
982 assert_eq!(0, dag_state.read().unscored_committed_subdags_count());
983 let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
984 let leader_swap_table = leader_schedule.leader_swap_table.read();
986 assert_eq!(leader_swap_table.good_nodes.len(), 0);
987 assert_eq!(leader_swap_table.bad_nodes.len(), 0);
988 }
989
990 #[tokio::test]
991 async fn test_leader_schedule_from_store_no_commit_info_with_vote_scoring() {
992 telemetry_subscribers::init_for_testing();
993 let mut context = Context::new_for_test(4).0;
994 context
995 .protocol_config
996 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
997 context
998 .protocol_config
999 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1000 let context = Arc::new(context);
1001 let store = Arc::new(MemStore::new());
1002 let mut dag_builder = DagBuilder::new(context.clone());
1004 dag_builder.layers(1..=2).build();
1005
1006 let mut expected_unscored_subdags = vec![];
1007 let mut expected_commits = vec![];
1008 let mut blocks_to_write = vec![];
1009
1010 for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
1011 for block in sub_dag.blocks.iter() {
1012 blocks_to_write.push(block.clone());
1013 }
1014 expected_commits.push(commit);
1015 expected_unscored_subdags.push(sub_dag);
1016 }
1017 store
1023 .write(
1024 WriteBatch::default()
1025 .blocks(blocks_to_write)
1026 .commits(expected_commits),
1027 )
1028 .unwrap();
1029 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1030 assert_eq!(
1032 dag_builder.last_committed_rounds.clone(),
1033 dag_state.read().last_committed_rounds()
1034 );
1035 let actual_unscored_subdags = dag_state.read().unscored_committed_subdags();
1036 assert_eq!(
1037 expected_unscored_subdags.len() as u64,
1038 dag_state.read().unscored_committed_subdags_count()
1039 );
1040 for (idx, expected_subdag) in expected_unscored_subdags.into_iter().enumerate() {
1041 let actual_subdag = actual_unscored_subdags[idx].clone();
1042 assert_eq!(expected_subdag, actual_subdag);
1043 }
1044 let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
1045 let leader_swap_table = leader_schedule.leader_swap_table.read();
1047 assert_eq!(leader_swap_table.good_nodes.len(), 0);
1048 assert_eq!(leader_swap_table.bad_nodes.len(), 0);
1049 }
1050
1051 #[tokio::test]
1052 async fn test_leader_schedule_commits_until_leader_schedule_update_with_vote_scoring() {
1053 telemetry_subscribers::init_for_testing();
1054 let mut context = Context::new_for_test(4).0;
1055 context
1056 .protocol_config
1057 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1058 let context = Arc::new(context);
1059 let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
1060 let dag_state = Arc::new(RwLock::new(DagState::new(
1061 context.clone(),
1062 Arc::new(MemStore::new()),
1063 )));
1064 let unscored_subdags = vec![CommittedSubDag::new(
1065 BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
1066 vec![],
1067 context.clock.timestamp_utc_ms(),
1068 CommitRef::new(1, CommitDigest::MIN),
1069 vec![],
1070 )];
1071 dag_state
1072 .write()
1073 .add_unscored_committed_subdags(unscored_subdags);
1074 let commits_until_leader_schedule_update =
1075 leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
1076 assert_eq!(commits_until_leader_schedule_update, 299);
1077 }
1078
1079 #[tokio::test]
1081 async fn test_leader_schedule_update_leader_schedule_with_vote_scoring() {
1082 telemetry_subscribers::init_for_testing();
1083 let mut context = Context::new_for_test(4).0;
1084 context
1085 .protocol_config
1086 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1087 context
1088 .protocol_config
1089 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1090 let context = Arc::new(context);
1091 let leader_schedule = Arc::new(LeaderSchedule::new(
1092 context.clone(),
1093 LeaderSwapTable::default(),
1094 ));
1095 let dag_state = Arc::new(RwLock::new(DagState::new(
1096 context.clone(),
1097 Arc::new(MemStore::new()),
1098 )));
1099 let max_round: u32 = 4;
1101 let num_authorities: u32 = 4;
1102 let mut blocks = Vec::new();
1103 let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
1104 .committee
1105 .authorities()
1106 .map(|index| {
1107 let author_idx = index.0.value() as u32;
1108 let block = TestBlock::new(0, author_idx).build();
1109 VerifiedBlock::new_for_test(block)
1110 })
1111 .map(|block| (block.reference(), block))
1112 .unzip();
1113 blocks.extend(genesis);
1114 let mut ancestors = genesis_references;
1115 let mut leader = None;
1116 for round in 1..=max_round {
1117 let mut new_ancestors = vec![];
1118 for author in 0..num_authorities {
1119 let base_ts = round as BlockTimestampMs * 1000;
1120 let block = VerifiedBlock::new_for_test(
1121 TestBlock::new(round, author)
1122 .set_timestamp_ms(base_ts + (author + round) as u64)
1123 .set_ancestors(ancestors.clone())
1124 .build(),
1125 );
1126 new_ancestors.push(block.reference());
1127 if round == 3 && author == 0 {
1130 tracing::info!("Skipping {block} in committed subdags blocks");
1131 continue;
1132 }
1133 blocks.push(block.clone());
1134 if round == max_round {
1137 leader = Some(block.clone());
1138 break;
1139 }
1140 }
1141 ancestors = new_ancestors;
1142 }
1143 let leader_block = leader.unwrap();
1144 let leader_ref = leader_block.reference();
1145 let commit_index = 1;
1146 let last_commit = TrustedCommit::new_for_test(
1147 commit_index,
1148 CommitDigest::MIN,
1149 context.clock.timestamp_utc_ms(),
1150 leader_ref,
1151 blocks
1152 .iter()
1153 .map(|block| block.reference())
1154 .collect::<Vec<_>>(),
1155 );
1156 let unscored_subdags = vec![CommittedSubDag::new(
1157 leader_ref,
1158 blocks,
1159 context.clock.timestamp_utc_ms(),
1160 last_commit.reference(),
1161 vec![],
1162 )];
1163 let mut dag_state_write = dag_state.write();
1164 dag_state_write.set_last_commit(last_commit);
1165 dag_state_write.add_unscored_committed_subdags(unscored_subdags);
1166 drop(dag_state_write);
1167 assert_eq!(
1168 leader_schedule.elect_leader(4, 0),
1169 AuthorityIndex::new_for_test(0)
1170 );
1171 leader_schedule.update_leader_schedule_v1(&dag_state);
1172 let leader_swap_table = leader_schedule.leader_swap_table.read();
1173 assert_eq!(leader_swap_table.good_nodes.len(), 1);
1174 assert_eq!(
1175 leader_swap_table.good_nodes[0].0,
1176 AuthorityIndex::new_for_test(2)
1177 );
1178 assert_eq!(leader_swap_table.bad_nodes.len(), 1);
1179 assert!(
1180 leader_swap_table
1181 .bad_nodes
1182 .contains_key(&AuthorityIndex::new_for_test(0))
1183 );
1184 assert_eq!(
1185 leader_schedule.elect_leader(4, 0),
1186 AuthorityIndex::new_for_test(2)
1187 );
1188 }
1189
1190 #[tokio::test]
1191 async fn test_leader_swap_table() {
1192 telemetry_subscribers::init_for_testing();
1193 let context = Arc::new(Context::new_for_test(4).0);
1194
1195 let swap_stake_threshold = 33;
1196 let reputation_scores = ReputationScores::new(
1197 (0..=10).into(),
1198 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1199 );
1200 let leader_swap_table =
1201 LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
1202
1203 assert_eq!(leader_swap_table.good_nodes.len(), 1);
1204 assert_eq!(
1205 leader_swap_table.good_nodes[0].0,
1206 AuthorityIndex::new_for_test(3)
1207 );
1208 assert_eq!(leader_swap_table.bad_nodes.len(), 1);
1209 assert!(
1210 leader_swap_table
1211 .bad_nodes
1212 .contains_key(&AuthorityIndex::new_for_test(0))
1213 );
1214 }
1215
1216 #[tokio::test]
1217 async fn test_leader_swap_table_swap() {
1218 telemetry_subscribers::init_for_testing();
1219 let context = Arc::new(Context::new_for_test(4).0);
1220
1221 let swap_stake_threshold = 33;
1222 let reputation_scores = ReputationScores::new(
1223 (0..=10).into(),
1224 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1225 );
1226 let leader_swap_table =
1227 LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1228
1229 let leader = AuthorityIndex::new_for_test(0);
1231 let leader_round = 1;
1232 let leader_offset = 0;
1233 let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
1234 assert_eq!(swapped_leader, Some(AuthorityIndex::new_for_test(3)));
1235
1236 let leader = AuthorityIndex::new_for_test(1);
1238 let leader_round = 1;
1239 let leader_offset = 0;
1240 let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
1241 assert_eq!(swapped_leader, None);
1242 }
1243
1244 #[tokio::test]
1245 async fn test_leader_swap_table_retrieve_first_nodes() {
1246 telemetry_subscribers::init_for_testing();
1247 let context = Arc::new(Context::new_for_test(4).0);
1248
1249 let authorities = [
1250 (AuthorityIndex::new_for_test(0), 1),
1251 (AuthorityIndex::new_for_test(1), 2),
1252 (AuthorityIndex::new_for_test(2), 3),
1253 (AuthorityIndex::new_for_test(3), 4),
1254 ];
1255
1256 let stake_threshold = 50;
1257 let filtered_authorities = LeaderSwapTable::retrieve_first_nodes(
1258 context.clone(),
1259 authorities.iter(),
1260 stake_threshold,
1261 );
1262
1263 assert_eq!(filtered_authorities.len(), 2);
1266 let authority_0_idx = AuthorityIndex::new_for_test(0);
1267 let authority_0 = context.committee.authority(authority_0_idx);
1268 assert!(filtered_authorities.contains(&(
1269 authority_0_idx,
1270 authority_0.hostname.clone(),
1271 authority_0.stake
1272 )));
1273 let authority_1_idx = AuthorityIndex::new_for_test(1);
1274 let authority_1 = context.committee.authority(authority_1_idx);
1275 assert!(filtered_authorities.contains(&(
1276 authority_1_idx,
1277 authority_1.hostname.clone(),
1278 authority_1.stake
1279 )));
1280 }
1281
1282 #[tokio::test]
1283 #[should_panic(
1284 expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
1285 )]
1286 async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
1287 telemetry_subscribers::init_for_testing();
1288 let context = Arc::new(Context::new_for_test(4).0);
1289
1290 let swap_stake_threshold = 34;
1291 let reputation_scores = ReputationScores::new(
1292 (0..=10).into(),
1293 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1294 );
1295 LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
1296 }
1297
1298 #[tokio::test]
1299 async fn test_update_leader_swap_table() {
1300 telemetry_subscribers::init_for_testing();
1301 let context = Arc::new(Context::new_for_test(4).0);
1302
1303 let swap_stake_threshold = 33;
1304 let reputation_scores = ReputationScores::new(
1305 (1..=10).into(),
1306 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1307 );
1308 let leader_swap_table =
1309 LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1310
1311 let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
1312
1313 leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1315
1316 let reputation_scores = ReputationScores::new(
1317 (11..=20).into(),
1318 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1319 );
1320 let leader_swap_table =
1321 LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1322
1323 leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1325 }
1326
1327 #[tokio::test]
1328 #[should_panic(
1329 expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
1330 )]
1331 async fn test_update_bad_leader_swap_table() {
1332 telemetry_subscribers::init_for_testing();
1333 let context = Arc::new(Context::new_for_test(4).0);
1334
1335 let swap_stake_threshold = 33;
1336 let reputation_scores = ReputationScores::new(
1337 (1..=10).into(),
1338 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1339 );
1340 let leader_swap_table =
1341 LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1342
1343 let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
1344
1345 leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1347
1348 let reputation_scores = ReputationScores::new(
1349 (11..=20).into(),
1350 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1351 );
1352 let leader_swap_table =
1353 LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1354
1355 leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1357
1358 let reputation_scores = ReputationScores::new(
1359 (21..=25).into(),
1360 (0..4).map(|i| i as u64).collect::<Vec<_>>(),
1361 );
1362 let leader_swap_table =
1363 LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
1364
1365 leader_schedule.update_leader_swap_table(leader_swap_table.clone());
1367 }
1368}