use std::{collections::HashSet, sync::Arc};

use consensus_config::{AuthorityIndex, Stake};
use itertools::Itertools;
use parking_lot::RwLock;
use tracing::instrument;

use crate::{
    Round,
    block::{BlockAPI, BlockRef, BlockTimestampMs, VerifiedBlock},
    commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
    context::Context,
    dag_state::DagState,
    leader_schedule::LeaderSchedule,
};

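/// Narrow view over `DagState` used by the linearizer. It is implemented for the `DagState`
/// write guard below, so `linearize_sub_dag` and `calculate_commit_timestamp` can be written
/// against this trait instead of the concrete type.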
pub(crate) trait BlockStoreAPI {
    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;

    fn gc_round(&self) -> Round;

    fn gc_enabled(&self) -> bool;

    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;

    fn is_committed(&self, block_ref: &BlockRef) -> bool;
}

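/// Every method simply delegates to the corresponding `DagState` method while the write lock
/// is held.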
impl BlockStoreAPI
    for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
{
    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
        DagState::get_blocks(self, refs)
    }

    fn gc_round(&self) -> Round {
        DagState::gc_round(self)
    }

    fn gc_enabled(&self) -> bool {
        DagState::gc_enabled(self)
    }

    fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
        DagState::set_committed(self, block_ref)
    }

    fn is_committed(&self, block_ref: &BlockRef) -> bool {
        DagState::is_committed(self, block_ref)
    }
}

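/// Expands a committed sequence of leaders into a sequence of `CommittedSubDag`s, forming and
/// persisting the corresponding commits along the way.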
#[derive(Clone)]
pub(crate) struct Linearizer {
    context: Arc<Context>,
    dag_state: Arc<RwLock<DagState>>,
    leader_schedule: Arc<LeaderSchedule>,
}

impl Linearizer {
    pub(crate) fn new(
        context: Arc<Context>,
        dag_state: Arc<RwLock<DagState>>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        Self {
            dag_state,
            leader_schedule,
            context,
        }
    }

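    /// Collects the sub-dag rooted at `leader_block`, computes the commit timestamp and builds
    /// the corresponding `CommittedSubDag` / `TrustedCommit` pair, chaining the new commit off
    /// the last commit recorded in `DagState`.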
    fn collect_sub_dag_and_commit(
        &mut self,
        leader_block: VerifiedBlock,
        reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
    ) -> (CommittedSubDag, TrustedCommit) {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
            .start_timer();
        let mut dag_state = self.dag_state.write();
        let last_commit_index = dag_state.last_commit_index();
        let last_commit_digest = dag_state.last_commit_digest();
        let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
        let last_committed_rounds = dag_state.last_committed_rounds();

        let to_commit = Self::linearize_sub_dag(
            &self.context,
            leader_block.clone(),
            last_committed_rounds,
            &mut dag_state,
        );

        let timestamp_ms = Self::calculate_commit_timestamp(
            &self.context,
            &mut dag_state,
            &leader_block,
            last_commit_timestamp_ms,
        );

        drop(dag_state);

        let commit = Commit::new(
            last_commit_index + 1,
            last_commit_digest,
            timestamp_ms,
            leader_block.reference(),
            to_commit
                .iter()
                .map(|block| block.reference())
                .collect::<Vec<_>>(),
        );
        let serialized = commit
            .serialize()
            .unwrap_or_else(|e| panic!("Failed to serialize commit: {e}"));
        let commit = TrustedCommit::new_trusted(commit, serialized);

        let sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            to_commit,
            timestamp_ms,
            commit.reference(),
            reputation_scores_desc,
        );

        (sub_dag, commit)
    }

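    /// Calculates the commit timestamp. When median timestamp enforcement is enabled in the
    /// protocol config, this is the stake-weighted median timestamp of the leader's ancestors
    /// from the previous round; otherwise it is the leader block's own timestamp. In both cases
    /// the result is clamped to be no smaller than the last commit timestamp, so commit
    /// timestamps never decrease.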
    pub(crate) fn calculate_commit_timestamp(
        context: &Context,
        dag_state: &mut impl BlockStoreAPI,
        leader_block: &VerifiedBlock,
        last_commit_timestamp_ms: BlockTimestampMs,
    ) -> BlockTimestampMs {
        let timestamp_ms = if context
            .protocol_config
            .consensus_median_timestamp_with_checkpoint_enforcement()
        {
            let block_refs = leader_block
                .ancestors()
                .iter()
                .filter(|block_ref| block_ref.round == leader_block.round() - 1)
                .cloned()
                .collect::<Vec<_>>();
            let blocks = dag_state
                .get_blocks(&block_refs)
                .into_iter()
                .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
            median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
                panic!(
                    "Cannot compute median timestamp for leader block {leader_block:?} ancestors: {e}"
                )
            })
        } else {
            leader_block.timestamp_ms()
        };

        timestamp_ms.max(last_commit_timestamp_ms)
    }

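    /// Traverses the causal history of `leader_block`, collecting every ancestor that is above
    /// the GC round and has not been committed yet, and returns the blocks sorted via
    /// `sort_sub_dag_blocks`. With `consensus_linearize_subdag_v2`, committed state is tracked
    /// in `DagState` (`set_committed` / `is_committed`); otherwise a local set plus the last
    /// committed round per authority is used to skip already committed blocks.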
    pub(crate) fn linearize_sub_dag(
        context: &Context,
        leader_block: VerifiedBlock,
        last_committed_rounds: Vec<u32>,
        dag_state: &mut impl BlockStoreAPI,
    ) -> Vec<VerifiedBlock> {
        let gc_enabled = dag_state.gc_enabled();
        let gc_round: Round = dag_state.gc_round();
        let leader_block_ref = leader_block.reference();
        let mut buffer = vec![leader_block];

        let mut to_commit = Vec::new();

        if context.protocol_config.consensus_linearize_subdag_v2() {
            assert!(
                dag_state.set_committed(&leader_block_ref),
                "Leader block with reference {leader_block_ref:?} attempted to be committed twice"
            );

            while let Some(x) = buffer.pop() {
                to_commit.push(x.clone());

                let ancestors: Vec<VerifiedBlock> = dag_state
                    .get_blocks(
                        &x.ancestors()
                            .iter()
                            .copied()
                            .filter(|ancestor| {
                                ancestor.round > gc_round && !dag_state.is_committed(ancestor)
                            })
                            .collect::<Vec<_>>(),
                    )
                    .into_iter()
                    .map(|ancestor_opt| {
                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
                    })
                    .collect();

                for ancestor in ancestors {
                    buffer.push(ancestor.clone());
                    assert!(
                        dag_state.set_committed(&ancestor.reference()),
                        "Block with reference {:?} attempted to be committed twice",
                        ancestor.reference()
                    );
                }
            }
        } else {
            let mut committed = HashSet::new();
            assert!(committed.insert(leader_block_ref));

            while let Some(x) = buffer.pop() {
                to_commit.push(x.clone());

                let ancestors: Vec<VerifiedBlock> = dag_state
                    .get_blocks(
                        &x.ancestors()
                            .iter()
                            .copied()
                            .filter(|ancestor| {
                                !committed.contains(ancestor)
                                    && last_committed_rounds[ancestor.author] < ancestor.round
                            })
                            .filter(|ancestor| !gc_enabled || ancestor.round > gc_round)
                            .collect::<Vec<_>>(),
                    )
                    .into_iter()
                    .map(|ancestor_opt| {
                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
                    })
                    .collect();

                for ancestor in ancestors {
                    buffer.push(ancestor.clone());
                    assert!(committed.insert(ancestor.reference()));
                }
            }
        }

        if gc_enabled {
            assert!(
                to_commit.iter().all(|block| block.round() > gc_round),
                "No blocks <= {gc_round} should be committed. Leader round {leader_block_ref}, blocks {to_commit:?}."
            );
        }

        sort_sub_dag_blocks(&mut to_commit);

        to_commit
    }

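    /// Turns the committed leaders into `CommittedSubDag`s, one per leader, persisting the
    /// corresponding commits to `DagState` and flushing at the end. Reputation scores are
    /// attached only to the first sub-dag of the batch, and only when the leader schedule has
    /// just been updated.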
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> Vec<CommittedSubDag> {
        if committed_leaders.is_empty() {
            return vec![];
        }

        let schedule_updated = self
            .leader_schedule
            .leader_schedule_updated(&self.dag_state);

        let mut committed_sub_dags = vec![];
        for (i, leader_block) in committed_leaders.into_iter().enumerate() {
            let reputation_scores_desc = if schedule_updated && i == 0 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            let (sub_dag, commit) =
                self.collect_sub_dag_and_commit(leader_block, reputation_scores_desc);

            self.update_blocks_pruned_metric(&sub_dag);

            self.dag_state.write().add_commit(commit.clone());

            committed_sub_dags.push(sub_dag);
        }

        self.dag_state.write().flush();

        committed_sub_dags
    }

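    /// Bumps the `blocks_pruned_on_commit` metric for ancestors referenced by the sub-dag that
    /// were skipped because they are at or below the GC round: labelled "uncommitted" when the
    /// authority's last committed round is below the block's round, and "higher_committed" when
    /// a higher round from that authority has already been committed.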
    fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
        let (last_committed_rounds, gc_round) = {
            let dag_state = self.dag_state.read();
            (dag_state.last_committed_rounds(), dag_state.gc_round())
        };

        for block_ref in sub_dag
            .blocks
            .iter()
            .flat_map(|block| block.ancestors())
            .filter(|ancestor_ref| {
                ancestor_ref.round <= gc_round
                    && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
            })
            .unique()
        {
            let hostname = &self.context.committee.authority(block_ref.author).hostname;

            let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
                &[hostname, "uncommitted"]
            } else {
                &[hostname, "higher_committed"]
            };

            self.context
                .metrics
                .node_metrics
                .blocks_pruned_on_commit
                .with_label_values(label_values)
                .inc();
        }
    }
}

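/// Returns the stake-weighted median timestamp of `blocks`. Fails if no blocks are provided or
/// if the total stake of the block authors does not reach the quorum threshold.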
pub(crate) fn median_timestamp_by_stake(
    context: &Context,
    blocks: impl Iterator<Item = VerifiedBlock>,
) -> Result<BlockTimestampMs, String> {
    let mut total_stake = 0;
    let mut timestamps = vec![];
    for block in blocks {
        let stake = context.committee.authority(block.author()).stake;
        timestamps.push((block.timestamp_ms(), stake));
        total_stake += stake;
    }

    if timestamps.is_empty() {
        return Err("No blocks provided".to_string());
    }
    if total_stake < context.committee.quorum_threshold() {
        return Err(format!(
            "Total stake {} < quorum threshold {}",
            total_stake,
            context.committee.quorum_threshold()
        ));
    }

    Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
}

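/// Sorts the `(timestamp, stake)` pairs by timestamp and returns the first timestamp at which
/// the cumulative stake exceeds `total_stake / 2`.
///
/// For example, `[(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)]` with a total stake of 4
/// yields `3_000`, since the cumulative stake first exceeds 2 at the third entry.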
fn median_timestamps_by_stake_inner(
    mut timestamps: Vec<(BlockTimestampMs, Stake)>,
    total_stake: Stake,
) -> BlockTimestampMs {
    timestamps.sort_by_key(|(ts, _)| *ts);

    let mut cumulative_stake = 0;
    for (ts, stake) in &timestamps {
        cumulative_stake += stake;
        if cumulative_stake > total_stake / 2 {
            return *ts;
        }
    }

    timestamps.last().unwrap().0
}

#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{
        CommitIndex, TestBlock,
        commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
        context::Context,
        leader_schedule::{LeaderSchedule, LeaderSwapTable},
        storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
        test_dag_parser::parse_dag,
    };

    #[rstest]
    #[tokio::test]
    async fn test_handle_commit(#[values(true, false)] consensus_median_timestamp: bool) {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let (mut context, _keys) = Context::new_for_test(num_authorities);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );

        let context = Arc::new(context);

        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);

        // Populate fully connected test blocks for rounds 1..=10.
        let num_rounds: u32 = 10;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = dag_builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = linearizer.handle_commit(leaders.clone());
        for (idx, subdag) in commits.into_iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            let expected_ts = if consensus_median_timestamp {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));

                median_timestamp_by_stake(&context, blocks).unwrap()
            } else {
                leaders[idx].timestamp_ms()
            };
            assert_eq!(subdag.timestamp_ms, expected_ts);

            if idx == 0 {
                // The first subdag contains only the leader block.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subsequent subdag contains the leader plus the blocks of the previous
                // round that were not committed by an earlier subdag.
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }
    }

    #[tokio::test]
    async fn test_handle_commit_with_schedule_update() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
        let leader_schedule = Arc::new(
            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
        );
        let mut linearizer =
            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());

        let num_rounds: u32 = 20;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = dag_builder
            .leader_blocks(1..=10)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = linearizer.handle_commit(leaders.clone());

        dag_state.write().add_scoring_subdags(commits);
        leader_schedule.update_leader_schedule_v2(&dag_state);
        assert!(
            leader_schedule.leader_schedule_updated(&dag_state),
            "Leader schedule should have been updated"
        );

        let leaders = dag_builder
            .leader_blocks(11..=20)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = linearizer.handle_commit(leaders.clone());
        assert_eq!(commits.len(), 10);
        let scores = vec![
            (AuthorityIndex::new_for_test(1), 29),
            (AuthorityIndex::new_for_test(0), 29),
            (AuthorityIndex::new_for_test(3), 29),
            (AuthorityIndex::new_for_test(2), 29),
        ];
        assert_eq!(commits[0].reputation_scores_desc, scores);
        for commit in commits.into_iter().skip(1) {
            assert_eq!(commit.reputation_scores_desc, vec![]);
        }
    }

    #[tokio::test]
    async fn test_handle_commit_with_schedule_update_with_unscored_subdags() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
        let leader_schedule = Arc::new(
            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
        );
        let mut linearizer =
            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());

        let num_rounds: u32 = 20;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = dag_builder
            .leader_blocks(1..=10)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = linearizer.handle_commit(leaders.clone());

        dag_state.write().add_unscored_committed_subdags(commits);

        leader_schedule.update_leader_schedule_v1(&dag_state);

        assert!(
            leader_schedule.leader_schedule_updated(&dag_state),
            "Leader schedule should have been updated"
        );

        let leaders = dag_builder
            .leader_blocks(11..=20)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = linearizer.handle_commit(leaders.clone());
        assert_eq!(commits.len(), 10);
        let scores = vec![
            (AuthorityIndex::new_for_test(2), 9),
            (AuthorityIndex::new_for_test(1), 8),
            (AuthorityIndex::new_for_test(0), 8),
            (AuthorityIndex::new_for_test(3), 8),
        ];
        assert_eq!(commits[0].reputation_scores_desc, scores);

        for commit in commits.into_iter().skip(1) {
            assert_eq!(commit.reputation_scores_desc, vec![]);
        }
    }

    #[rstest]
    #[tokio::test]
    async fn test_handle_already_committed(
        #[values(true, false)] consensus_median_timestamp: bool,
    ) {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let (mut context, _) = Context::new_for_test(num_authorities);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );

        let context = Arc::new(context);

        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let mut linearizer =
            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
        let wave_length = DEFAULT_WAVE_LENGTH;

        let leader_round_wave_1 = 3;
        let leader_round_wave_2 = leader_round_wave_1 + wave_length;

        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=leader_round_wave_2).build();

        // Accept all blocks up to round leader_round_wave_1 - 1, plus the leader block of wave 1.
        let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
        blocks.push(
            dag_builder
                .leader_block(leader_round_wave_1)
                .expect("Leader block should have been found"),
        );
        dag_state.write().accept_blocks(blocks.clone());

        let first_leader = dag_builder
            .leader_block(leader_round_wave_1)
            .expect("Wave 1 leader round block should exist");
        let mut last_commit_index = 1;
        let first_commit_data = TrustedCommit::new_for_test(
            last_commit_index,
            CommitDigest::MIN,
            0,
            first_leader.reference(),
            blocks.iter().map(|block| block.reference()).collect(),
        );
        dag_state.write().add_commit(first_commit_data);

        // Mark the blocks of the first commit as committed.
        for block in blocks.iter() {
            dag_state.write().set_committed(&block.reference());
        }

        // Accept the remaining blocks up to the wave 2 leader round, excluding the wave 1 leader
        // block which has already been committed above.
        let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
        blocks.retain(|block| {
            !(block.round() == leader_round_wave_1
                && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
        });
        blocks.push(
            dag_builder
                .leader_block(leader_round_wave_2)
                .expect("Leader block should have been found"),
        );
        dag_state.write().accept_blocks(blocks.clone());

        let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();

        let leader = dag_builder
            .leader_block(leader_round_wave_2)
            .expect("Leader block should exist");

        last_commit_index += 1;
        let expected_second_commit = TrustedCommit::new_for_test(
            last_commit_index,
            CommitDigest::MIN,
            0,
            leader.reference(),
            blocks.clone(),
        );

        let commit = linearizer.handle_commit(vec![leader.clone()]);
        assert_eq!(commit.len(), 1);

        let subdag = &commit[0];
        tracing::info!("{subdag:?}");
        assert_eq!(subdag.leader, leader.reference());
        assert_eq!(subdag.commit_ref.index, expected_second_commit.index());

        let expected_ts = if consensus_median_timestamp {
            median_timestamp_by_stake(
                &context,
                subdag.blocks.iter().filter_map(|block| {
                    if block.round() == subdag.leader.round - 1 {
                        Some(block.clone())
                    } else {
                        None
                    }
                }),
            )
            .unwrap()
        } else {
            leader.timestamp_ms()
        };
        assert_eq!(subdag.timestamp_ms, expected_ts);

        // Sort the expected block refs the same way the sub-dag blocks are sorted.
        blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
        assert_eq!(
            subdag
                .blocks
                .clone()
                .into_iter()
                .map(|b| b.reference())
                .collect::<Vec<_>>(),
            blocks
        );
        for block in subdag.blocks.iter() {
            assert!(block.round() <= expected_second_commit.leader().round);
        }
    }

    #[rstest]
    #[case(0, false)]
    #[case(3, false)]
    #[case(3, true)]
    #[tokio::test]
    async fn test_handle_commit_with_gc_simple(
        #[case] gc_depth: u32,
        #[case] consensus_median_timestamp: bool,
    ) {
        telemetry_subscribers::init_for_testing();

        let num_authorities = 4;
        let (mut context, _keys) = Context::new_for_test(num_authorities);
        context.protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );
        if gc_depth == 0 {
            context
                .protocol_config
                .set_consensus_linearize_subdag_v2_for_testing(false);
        }

        let context = Arc::new(context);
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);

        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : { * },
            Round 2 : {
                A -> [-D1],
                B -> [-D1],
                C -> [-D1],
                D -> [*],
            },
            Round 3 : {
                A -> [-D2],
                B -> [-D2],
                C -> [-D2],
            },
            Round 4 : {
                A -> [-D3],
                B -> [-D3],
                C -> [-D3],
                D -> [A3, B3, C3, D2],
            },
            Round 5 : { * },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
        dag_builder.print();
        dag_builder.persist_all_blocks(dag_state.clone());

        let leaders = dag_builder
            .leader_blocks(1..=6)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let commits = linearizer.handle_commit(leaders.clone());
        for (idx, subdag) in commits.into_iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            let expected_ts = if consensus_median_timestamp {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));

                median_timestamp_by_stake(&context, blocks).unwrap()
            } else {
                leaders[idx].timestamp_ms()
            };
            assert_eq!(subdag.timestamp_ms, expected_ts);

            if idx == 0 {
                assert_eq!(subdag.blocks.len(), 1);
            } else if idx == 1 {
                assert_eq!(subdag.blocks.len(), 3);
            } else if idx == 2 {
                assert_eq!(subdag.blocks.len(), 6);
            } else if gc_depth > 0 {
                assert_eq!(subdag.blocks.len(), 5);

                assert!(
                    subdag.blocks.iter().all(|block| block.round() >= 2),
                    "Found blocks that are of round < 2."
                );

                assert_eq!(dag_state.read().gc_round(), subdag.leader.round - gc_depth);
            } else {
                assert_eq!(subdag.blocks.len(), 6);
                assert!(
                    subdag.blocks.iter().all(|block| block.round() >= 1),
                    "Found blocks that are of round < 1."
                );

                assert_eq!(dag_state.read().gc_round(), 0);
            }
            for block in subdag.blocks.iter() {
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }
    }

    #[rstest]
    #[case(3, false)]
    #[case(3, true)]
    #[tokio::test]
    async fn test_handle_commit_below_highest_committed_round(
        #[case] gc_depth: u32,
        #[case] consensus_median_timestamp: bool,
    ) {
        telemetry_subscribers::init_for_testing();

        let num_authorities = 4;
        let (mut context, _keys) = Context::new_for_test(num_authorities);
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(gc_depth);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );
        context
            .protocol_config
            .set_consensus_linearize_subdag_v2_for_testing(true);

        let context = Arc::new(context);
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);

        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : { * },
            Round 2 : {
                A -> [-D1],
                B -> [-D1],
                C -> [-D1],
                D -> [-D1],
            },
            Round 3 : {
                A -> [A2, B2, C2, D1],
                B -> [A2, B2, C2, D1],
                C -> [A2, B2, C2, D1],
                D -> [A2, B2, C2, D2]
            },
            Round 4 : { * },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
        dag_builder.print();
        dag_builder.persist_all_blocks(dag_state.clone());

        let leaders = dag_builder
            .leader_blocks(1..=4)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let commits = linearizer.handle_commit(leaders.clone());
        for (idx, subdag) in commits.into_iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            let expected_ts = if consensus_median_timestamp {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));

                median_timestamp_by_stake(&context, blocks).unwrap()
            } else {
                leaders[idx].timestamp_ms()
            };
            assert_eq!(subdag.timestamp_ms, expected_ts);

            if idx == 0 {
                assert_eq!(subdag.blocks.len(), 1);
            } else if idx == 1 {
                assert_eq!(subdag.blocks.len(), 3);
            } else if idx == 2 {
                assert_eq!(subdag.blocks.len(), 4);

                assert!(
                    subdag.blocks.iter().any(|block| block.round() == 2
                        && block.author() == AuthorityIndex::new_for_test(3)),
                    "Block D2 should have been committed."
                );
            } else if idx == 3 {
                assert_eq!(subdag.blocks.len(), 5);
                assert!(
                    subdag.blocks.iter().any(|block| block.round() == 1
                        && block.author() == AuthorityIndex::new_for_test(3)),
                    "Block D1 should have been committed."
                );
            } else {
                panic!("Unexpected subdag with index {idx:?}");
            }

            for block in subdag.blocks.iter() {
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }
    }

    #[rstest]
    #[case(false, 5_000, 5_000, 6_000)]
    #[case(true, 3_000, 3_000, 6_000)]
    #[tokio::test]
    async fn test_calculate_commit_timestamp(
        #[case] consensus_median_timestamp: bool,
        #[case] timestamp_1: u64,
        #[case] timestamp_2: u64,
        #[case] timestamp_3: u64,
    ) {
        telemetry_subscribers::init_for_testing();

        let num_authorities = 4;
        let (mut context, _keys) = Context::new_for_test(num_authorities);

        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let mut dag_state = dag_state.write();

        let ancestors = vec![
            VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
            VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
            VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
            VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
        ];

        let leader_block = VerifiedBlock::new_for_test(
            TestBlock::new(5, 0)
                .set_timestamp_ms(5_000)
                .set_ancestors(
                    ancestors
                        .iter()
                        .map(|block| block.reference())
                        .collect::<Vec<_>>(),
                )
                .build(),
        );

        for block in &ancestors {
            dag_state.accept_block(block.clone());
        }

        let last_commit_timestamp_ms = 0;

        let timestamp = Linearizer::calculate_commit_timestamp(
            &context,
            &mut dag_state,
            &leader_block,
            last_commit_timestamp_ms,
        );
        assert_eq!(timestamp, timestamp_1);

        // A leader that links to only three of the four ancestors still reaches a quorum of stake.
        let leader_block = VerifiedBlock::new_for_test(
            TestBlock::new(5, 0)
                .set_timestamp_ms(5_000)
                .set_ancestors(
                    ancestors
                        .iter()
                        .skip(1)
                        .map(|block| block.reference())
                        .collect::<Vec<_>>(),
                )
                .build(),
        );

        let timestamp = Linearizer::calculate_commit_timestamp(
            &context,
            &mut dag_state,
            &leader_block,
            last_commit_timestamp_ms,
        );
        assert_eq!(timestamp, timestamp_2);

        // The commit timestamp never goes below the last commit timestamp.
        let last_commit_timestamp_ms = 6_000;
        let timestamp = Linearizer::calculate_commit_timestamp(
            &context,
            &mut dag_state,
            &leader_block,
            last_commit_timestamp_ms,
        );
        assert_eq!(timestamp, timestamp_3);

        // With a committee of a single authority, one ancestor is enough to reach the quorum
        // threshold.
        let (mut context, _) = Context::new_for_test(1);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );
        let leader_block = VerifiedBlock::new_for_test(
            TestBlock::new(5, 0)
                .set_timestamp_ms(5_000)
                .set_ancestors(
                    ancestors
                        .iter()
                        .take(1)
                        .map(|block| block.reference())
                        .collect::<Vec<_>>(),
                )
                .build(),
        );
        let last_commit_timestamp_ms = 0;
        let timestamp = Linearizer::calculate_commit_timestamp(
            &context,
            &mut dag_state,
            &leader_block,
            last_commit_timestamp_ms,
        );
        if consensus_median_timestamp {
            assert_eq!(timestamp, 1_000);
        } else {
            assert_eq!(timestamp, leader_block.timestamp_ms());
        }
    }

    #[test]
    fn test_median_timestamps_by_stake() {
        // Single block.
        let timestamps = vec![(1_000, 1)];
        assert_eq!(median_timestamps_by_stake_inner(timestamps, 1), 1_000);

        // Odd number of equally staked blocks.
        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1)];
        assert_eq!(median_timestamps_by_stake_inner(timestamps, 3), 2_000);

        // Even number of equally staked blocks.
        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)];
        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);

        // Unsorted input.
        let timestamps = vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)];
        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);

        // Unequal stakes.
        let timestamps = vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)];
        assert_eq!(median_timestamps_by_stake_inner(timestamps, 10), 3_000);

        let timestamps = vec![
            (500, 2),
            (4_000, 2),
            (2_500, 3),
            (1_000, 5),
            (3_000, 3),
            (2_000, 4),
        ];
        assert_eq!(median_timestamps_by_stake_inner(timestamps, 19), 2_000);

        // A single authority holding the majority of stake determines the median.
        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)];
        assert_eq!(median_timestamps_by_stake_inner(timestamps, 14), 5_000);
    }

    #[tokio::test]
    async fn test_median_timestamps_by_stake_errors() {
        let num_authorities = 4;
        let (mut context, _keys) = Context::new_for_test(num_authorities);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(true);

        let context = Arc::new(context);

        // No blocks provided.
        let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
        assert_eq!(err, "No blocks provided");

        // A single block does not reach the quorum threshold.
        let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
        let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
        assert_eq!(err, "Total stake 1 < quorum threshold 3");
    }
}