1use std::{collections::HashSet, sync::Arc};
6
7use consensus_config::AuthorityIndex;
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12 Round,
13 block::{BlockAPI, BlockRef, VerifiedBlock},
14 commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
15 context::Context,
16 dag_state::DagState,
17 leader_schedule::LeaderSchedule,
18};
19
20pub(crate) trait BlockStoreAPI {
24 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;
25
26 fn gc_round(&self) -> Round;
27
28 fn gc_enabled(&self) -> bool;
29
30 fn set_committed(&mut self, block_ref: &BlockRef) -> bool;
31
32 fn is_committed(&self, block_ref: &BlockRef) -> bool;
33}
34
35impl BlockStoreAPI
36 for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
37{
38 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
39 DagState::get_blocks(self, refs)
40 }
41
42 fn gc_round(&self) -> Round {
43 DagState::gc_round(self)
44 }
45
46 fn gc_enabled(&self) -> bool {
47 DagState::gc_enabled(self)
48 }
49
50 fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
51 DagState::set_committed(self, block_ref)
52 }
53
54 fn is_committed(&self, block_ref: &BlockRef) -> bool {
55 DagState::is_committed(self, block_ref)
56 }
57}
58
59#[derive(Clone)]
61pub(crate) struct Linearizer {
62 context: Arc<Context>,
64 dag_state: Arc<RwLock<DagState>>,
65 leader_schedule: Arc<LeaderSchedule>,
66}
67
68impl Linearizer {
69 pub(crate) fn new(
70 context: Arc<Context>,
71 dag_state: Arc<RwLock<DagState>>,
72 leader_schedule: Arc<LeaderSchedule>,
73 ) -> Self {
74 Self {
75 dag_state,
76 leader_schedule,
77 context,
78 }
79 }
80
81 fn collect_sub_dag_and_commit(
85 &mut self,
86 leader_block: VerifiedBlock,
87 reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
88 ) -> (CommittedSubDag, TrustedCommit) {
89 let _s = self
90 .context
91 .metrics
92 .node_metrics
93 .scope_processing_time
94 .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
95 .start_timer();
96 let mut dag_state = self.dag_state.write();
98 let last_commit_index = dag_state.last_commit_index();
99 let last_commit_digest = dag_state.last_commit_digest();
100 let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
101 let last_committed_rounds = dag_state.last_committed_rounds();
102 let timestamp_ms = leader_block.timestamp_ms().max(last_commit_timestamp_ms);
103
104 let to_commit = Self::linearize_sub_dag(
106 &self.context,
107 leader_block.clone(),
108 last_committed_rounds,
109 &mut dag_state,
110 );
111
112 drop(dag_state);
113
114 let commit = Commit::new(
116 last_commit_index + 1,
117 last_commit_digest,
118 timestamp_ms,
119 leader_block.reference(),
120 to_commit
121 .iter()
122 .map(|block| block.reference())
123 .collect::<Vec<_>>(),
124 );
125 let serialized = commit
126 .serialize()
127 .unwrap_or_else(|e| panic!("Failed to serialize commit: {e}"));
128 let commit = TrustedCommit::new_trusted(commit, serialized);
129
130 let sub_dag = CommittedSubDag::new(
132 leader_block.reference(),
133 to_commit,
134 timestamp_ms,
135 commit.reference(),
136 reputation_scores_desc,
137 );
138
139 (sub_dag, commit)
140 }
141
142 pub(crate) fn linearize_sub_dag(
143 context: &Context,
144 leader_block: VerifiedBlock,
145 last_committed_rounds: Vec<u32>,
146 dag_state: &mut impl BlockStoreAPI,
147 ) -> Vec<VerifiedBlock> {
148 let gc_enabled = dag_state.gc_enabled();
149 let gc_round: Round = dag_state.gc_round();
156 let leader_block_ref = leader_block.reference();
157 let mut buffer = vec![leader_block];
158
159 let mut to_commit = Vec::new();
160
161 if context.protocol_config.consensus_linearize_subdag_v2() {
166 assert!(
167 dag_state.set_committed(&leader_block_ref),
168 "Leader block with reference {leader_block_ref:?} attempted to be committed twice"
169 );
170
171 while let Some(x) = buffer.pop() {
172 to_commit.push(x.clone());
173
174 let ancestors: Vec<VerifiedBlock> = dag_state
175 .get_blocks(
176 &x.ancestors()
177 .iter()
178 .copied()
179 .filter(|ancestor| {
180 ancestor.round > gc_round && !dag_state.is_committed(ancestor)
181 })
182 .collect::<Vec<_>>(),
183 )
184 .into_iter()
185 .map(|ancestor_opt| {
186 ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
187 })
188 .collect();
189
190 for ancestor in ancestors {
191 buffer.push(ancestor.clone());
192 assert!(
193 dag_state.set_committed(&ancestor.reference()),
194 "Block with reference {:?} attempted to be committed twice",
195 ancestor.reference()
196 );
197 }
198 }
199 } else {
200 let mut committed = HashSet::new();
201 assert!(committed.insert(leader_block_ref));
202
203 while let Some(x) = buffer.pop() {
204 to_commit.push(x.clone());
205
206 let ancestors: Vec<VerifiedBlock> = dag_state
207 .get_blocks(
208 &x.ancestors()
209 .iter()
210 .copied()
211 .filter(|ancestor| {
212 !committed.contains(ancestor)
219 && last_committed_rounds[ancestor.author] < ancestor.round
220 })
221 .filter(|ancestor| {
222 !gc_enabled || ancestor.round > gc_round
227 })
228 .collect::<Vec<_>>(),
229 )
230 .into_iter()
231 .map(|ancestor_opt| {
232 ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
233 })
234 .collect();
235
236 for ancestor in ancestors {
237 buffer.push(ancestor.clone());
238 assert!(committed.insert(ancestor.reference()));
239 }
240 }
241 }
242
243 if gc_enabled {
247 assert!(
248 to_commit.iter().all(|block| block.round() > gc_round),
249 "No blocks <= {gc_round} should be committed. Leader round {leader_block_ref}, blocks {to_commit:?}."
250 );
251 }
252
253 sort_sub_dag_blocks(&mut to_commit);
255
256 to_commit
257 }
258
259 pub(crate) fn handle_commit(
263 &mut self,
264 committed_leaders: Vec<VerifiedBlock>,
265 ) -> Vec<CommittedSubDag> {
266 if committed_leaders.is_empty() {
267 return vec![];
268 }
269
270 let schedule_updated = self
273 .leader_schedule
274 .leader_schedule_updated(&self.dag_state);
275
276 let mut committed_sub_dags = vec![];
277 for (i, leader_block) in committed_leaders.into_iter().enumerate() {
278 let reputation_scores_desc = if schedule_updated && i == 0 {
279 self.leader_schedule
280 .leader_swap_table
281 .read()
282 .reputation_scores_desc
283 .clone()
284 } else {
285 vec![]
286 };
287
288 let (sub_dag, commit) =
291 self.collect_sub_dag_and_commit(leader_block, reputation_scores_desc);
292
293 self.update_blocks_pruned_metric(&sub_dag);
294
295 self.dag_state.write().add_commit(commit.clone());
298
299 committed_sub_dags.push(sub_dag);
300 }
301
302 self.dag_state.write().flush();
309
310 committed_sub_dags
311 }
312
313 fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
321 let (last_committed_rounds, gc_round) = {
322 let dag_state = self.dag_state.read();
323 (dag_state.last_committed_rounds(), dag_state.gc_round())
324 };
325
326 for block_ref in sub_dag
327 .blocks
328 .iter()
329 .flat_map(|block| block.ancestors())
330 .filter(
331 |ancestor_ref| {
332 ancestor_ref.round <= gc_round
333 && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
334 }, )
338 .unique()
339 {
340 let hostname = &self.context.committee.authority(block_ref.author).hostname;
341
342 let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
345 &[hostname, "uncommitted"]
346 } else {
347 &[hostname, "higher_committed"]
351 };
352
353 self.context
354 .metrics
355 .node_metrics
356 .blocks_pruned_on_commit
357 .with_label_values(label_values)
358 .inc();
359 }
360 }
361}
362
363#[cfg(test)]
364mod tests {
365 use rstest::rstest;
366
367 use super::*;
368 use crate::{
369 CommitIndex,
370 commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
371 context::Context,
372 leader_schedule::{LeaderSchedule, LeaderSwapTable},
373 storage::mem_store::MemStore,
374 test_dag_builder::DagBuilder,
375 test_dag_parser::parse_dag,
376 };
377
378 #[tokio::test]
379 async fn test_handle_commit() {
380 telemetry_subscribers::init_for_testing();
381 let num_authorities = 4;
382 let context = Arc::new(Context::new_for_test(num_authorities).0);
383 let dag_state = Arc::new(RwLock::new(DagState::new(
384 context.clone(),
385 Arc::new(MemStore::new()),
386 )));
387 let leader_schedule = Arc::new(LeaderSchedule::new(
388 context.clone(),
389 LeaderSwapTable::default(),
390 ));
391 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
392
393 let num_rounds: u32 = 10;
395 let mut dag_builder = DagBuilder::new(context.clone());
396 dag_builder
397 .layers(1..=num_rounds)
398 .build()
399 .persist_layers(dag_state.clone());
400
401 let leaders = dag_builder
402 .leader_blocks(1..=num_rounds)
403 .into_iter()
404 .map(Option::unwrap)
405 .collect::<Vec<_>>();
406
407 let commits = linearizer.handle_commit(leaders.clone());
408 for (idx, subdag) in commits.into_iter().enumerate() {
409 tracing::info!("{subdag:?}");
410 assert_eq!(subdag.leader, leaders[idx].reference());
411 assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
412 if idx == 0 {
413 assert_eq!(subdag.blocks.len(), 1);
415 } else {
416 assert_eq!(subdag.blocks.len(), num_authorities);
419 }
420 for block in subdag.blocks.iter() {
421 assert!(block.round() <= leaders[idx].round());
422 }
423 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
424 }
425 }
426
427 #[tokio::test]
428 async fn test_handle_commit_with_schedule_update() {
429 telemetry_subscribers::init_for_testing();
430 let num_authorities = 4;
431 let context = Arc::new(Context::new_for_test(num_authorities).0);
432 let dag_state = Arc::new(RwLock::new(DagState::new(
433 context.clone(),
434 Arc::new(MemStore::new()),
435 )));
436 const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
437 let leader_schedule = Arc::new(
438 LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
439 .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
440 );
441 let mut linearizer =
442 Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
443
444 let num_rounds: u32 = 20;
446 let mut dag_builder = DagBuilder::new(context.clone());
447 dag_builder
448 .layers(1..=num_rounds)
449 .build()
450 .persist_layers(dag_state.clone());
451
452 let leaders = dag_builder
454 .leader_blocks(1..=10)
455 .into_iter()
456 .map(Option::unwrap)
457 .collect::<Vec<_>>();
458
459 let commits = linearizer.handle_commit(leaders.clone());
461
462 dag_state.write().add_scoring_subdags(commits);
464 leader_schedule.update_leader_schedule_v2(&dag_state);
466 assert!(
467 leader_schedule.leader_schedule_updated(&dag_state),
468 "Leader schedule should have been updated"
469 );
470
471 let leaders = dag_builder
473 .leader_blocks(11..=20)
474 .into_iter()
475 .map(Option::unwrap)
476 .collect::<Vec<_>>();
477
478 let commits = linearizer.handle_commit(leaders.clone());
481 assert_eq!(commits.len(), 10);
482 let scores = vec![
483 (AuthorityIndex::new_for_test(1), 29),
484 (AuthorityIndex::new_for_test(0), 29),
485 (AuthorityIndex::new_for_test(3), 29),
486 (AuthorityIndex::new_for_test(2), 29),
487 ];
488 assert_eq!(commits[0].reputation_scores_desc, scores);
489 for commit in commits.into_iter().skip(1) {
490 assert_eq!(commit.reputation_scores_desc, vec![]);
491 }
492 }
493
494 #[tokio::test]
496 async fn test_handle_commit_with_schedule_update_with_unscored_subdags() {
497 telemetry_subscribers::init_for_testing();
498 let num_authorities = 4;
499 let context = Arc::new(Context::new_for_test(num_authorities).0);
500 let dag_state = Arc::new(RwLock::new(DagState::new(
501 context.clone(),
502 Arc::new(MemStore::new()),
503 )));
504 const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
505 let leader_schedule = Arc::new(
506 LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
507 .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
508 );
509 let mut linearizer =
510 Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
511
512 let num_rounds: u32 = 20;
514 let mut dag_builder = DagBuilder::new(context.clone());
515 dag_builder
516 .layers(1..=num_rounds)
517 .build()
518 .persist_layers(dag_state.clone());
519
520 let leaders = dag_builder
522 .leader_blocks(1..=10)
523 .into_iter()
524 .map(Option::unwrap)
525 .collect::<Vec<_>>();
526
527 let commits = linearizer.handle_commit(leaders.clone());
529
530 dag_state.write().add_unscored_committed_subdags(commits);
532
533 leader_schedule.update_leader_schedule_v1(&dag_state);
535
536 assert!(
537 leader_schedule.leader_schedule_updated(&dag_state),
538 "Leader schedule should have been updated"
539 );
540
541 let leaders = dag_builder
543 .leader_blocks(11..=20)
544 .into_iter()
545 .map(Option::unwrap)
546 .collect::<Vec<_>>();
547
548 let commits = linearizer.handle_commit(leaders.clone());
551 assert_eq!(commits.len(), 10);
552 let scores = vec![
553 (AuthorityIndex::new_for_test(2), 9),
554 (AuthorityIndex::new_for_test(1), 8),
555 (AuthorityIndex::new_for_test(0), 8),
556 (AuthorityIndex::new_for_test(3), 8),
557 ];
558 assert_eq!(commits[0].reputation_scores_desc, scores);
559
560 for commit in commits.into_iter().skip(1) {
561 assert_eq!(commit.reputation_scores_desc, vec![]);
562 }
563 }
564
565 #[tokio::test]
566 async fn test_handle_already_committed() {
567 telemetry_subscribers::init_for_testing();
568 let num_authorities = 4;
569 let (mut context, _) = Context::new_for_test(num_authorities);
570 context
571 .protocol_config
572 .set_consensus_gc_depth_for_testing(0);
573 context
574 .protocol_config
575 .set_consensus_linearize_subdag_v2_for_testing(false);
576
577 let context = Arc::new(context);
578
579 let dag_state = Arc::new(RwLock::new(DagState::new(
580 context.clone(),
581 Arc::new(MemStore::new()),
582 )));
583 let leader_schedule = Arc::new(LeaderSchedule::new(
584 context.clone(),
585 LeaderSwapTable::default(),
586 ));
587 let mut linearizer =
588 Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
589 let wave_length = DEFAULT_WAVE_LENGTH;
590
591 let leader_round_wave_1 = 3;
592 let leader_round_wave_2 = leader_round_wave_1 + wave_length;
593
594 let mut dag_builder = DagBuilder::new(context.clone());
596 dag_builder.layers(1..=leader_round_wave_2).build();
597
598 let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
602 blocks.push(
603 dag_builder
604 .leader_block(leader_round_wave_1)
605 .expect("Leader block should have been found"),
606 );
607 dag_state.write().accept_blocks(blocks.clone());
608
609 let first_leader = dag_builder
610 .leader_block(leader_round_wave_1)
611 .expect("Wave 1 leader round block should exist");
612 let mut last_commit_index = 1;
613 let first_commit_data = TrustedCommit::new_for_test(
614 last_commit_index,
615 CommitDigest::MIN,
616 0,
617 first_leader.reference(),
618 blocks.into_iter().map(|block| block.reference()).collect(),
619 );
620 dag_state.write().add_commit(first_commit_data);
621
622 let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
625 blocks.retain(|block| {
627 !(block.round() == leader_round_wave_1
628 && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
629 });
630 blocks.push(
632 dag_builder
633 .leader_block(leader_round_wave_2)
634 .expect("Leader block should have been found"),
635 );
636 dag_state.write().accept_blocks(blocks.clone());
638
639 let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
640
641 let leader = dag_builder
643 .leader_block(leader_round_wave_2)
644 .expect("Leader block should exist");
645
646 last_commit_index += 1;
647 let expected_second_commit = TrustedCommit::new_for_test(
648 last_commit_index,
649 CommitDigest::MIN,
650 0,
651 leader.reference(),
652 blocks.clone(),
653 );
654
655 let commit = linearizer.handle_commit(vec![leader.clone()]);
656 assert_eq!(commit.len(), 1);
657
658 let subdag = &commit[0];
659 tracing::info!("{subdag:?}");
660 assert_eq!(subdag.leader, leader.reference());
661 assert_eq!(subdag.timestamp_ms, leader.timestamp_ms());
662 assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
663
664 blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
666 assert_eq!(
667 subdag
668 .blocks
669 .clone()
670 .into_iter()
671 .map(|b| b.reference())
672 .collect::<Vec<_>>(),
673 blocks
674 );
675 for block in subdag.blocks.iter() {
676 assert!(block.round() <= expected_second_commit.leader().round);
677 }
678 }
679
680 #[rstest]
684 #[tokio::test]
685 async fn test_handle_commit_with_gc_simple(#[values(0, 3)] gc_depth: u32) {
686 telemetry_subscribers::init_for_testing();
687
688 let num_authorities = 4;
689 let (mut context, _keys) = Context::new_for_test(num_authorities);
690 context.protocol_config.set_gc_depth_for_testing(gc_depth);
691
692 if gc_depth == 0 {
693 context
694 .protocol_config
695 .set_consensus_linearize_subdag_v2_for_testing(false);
696 }
697
698 let context = Arc::new(context);
699 let dag_state = Arc::new(RwLock::new(DagState::new(
700 context.clone(),
701 Arc::new(MemStore::new()),
702 )));
703 let leader_schedule = Arc::new(LeaderSchedule::new(
704 context.clone(),
705 LeaderSwapTable::default(),
706 ));
707 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
708
709 let dag_str = "DAG {
718 Round 0 : { 4 },
719 Round 1 : { * },
720 Round 2 : {
721 A -> [-D1],
722 B -> [-D1],
723 C -> [-D1],
724 D -> [*],
725 },
726 Round 3 : {
727 A -> [-D2],
728 B -> [-D2],
729 C -> [-D2],
730 },
731 Round 4 : {
732 A -> [-D3],
733 B -> [-D3],
734 C -> [-D3],
735 D -> [A3, B3, C3, D2],
736 },
737 Round 5 : { * },
738 }";
739
740 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
741 dag_builder.print();
742 dag_builder.persist_all_blocks(dag_state.clone());
743
744 let leaders = dag_builder
745 .leader_blocks(1..=6)
746 .into_iter()
747 .flatten()
748 .collect::<Vec<_>>();
749
750 let commits = linearizer.handle_commit(leaders.clone());
751 for (idx, subdag) in commits.into_iter().enumerate() {
752 tracing::info!("{subdag:?}");
753 assert_eq!(subdag.leader, leaders[idx].reference());
754 assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
755 if idx == 0 {
756 assert_eq!(subdag.blocks.len(), 1);
758 } else if idx == 1 {
759 assert_eq!(subdag.blocks.len(), 3);
760 } else if idx == 2 {
761 assert_eq!(subdag.blocks.len(), 6);
768 } else {
769 if gc_depth > 0 {
771 assert_eq!(subdag.blocks.len(), 5);
778
779 assert!(
780 subdag.blocks.iter().all(|block| block.round() >= 2),
781 "Found blocks that are of round < 2."
782 );
783
784 assert_eq!(dag_state.read().gc_round(), subdag.leader.round - gc_depth);
786 } else {
787 assert_eq!(subdag.blocks.len(), 6);
789 assert!(
790 subdag.blocks.iter().all(|block| block.round() >= 1),
791 "Found blocks that are of round < 1."
792 );
793
794 assert_eq!(dag_state.read().gc_round(), 0);
796 }
797 }
798 for block in subdag.blocks.iter() {
799 assert!(block.round() <= leaders[idx].round());
800 }
801 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
802 }
803 }
804
805 #[rstest]
806 #[tokio::test]
807 async fn test_handle_commit_below_highest_committed_round(#[values(3)] gc_depth: u32) {
808 telemetry_subscribers::init_for_testing();
809
810 let num_authorities = 4;
811 let (mut context, _keys) = Context::new_for_test(num_authorities);
812 context
813 .protocol_config
814 .set_consensus_gc_depth_for_testing(gc_depth);
815 context
816 .protocol_config
817 .set_consensus_linearize_subdag_v2_for_testing(true);
818
819 let context = Arc::new(context);
820 let dag_state = Arc::new(RwLock::new(DagState::new(
821 context.clone(),
822 Arc::new(MemStore::new()),
823 )));
824 let leader_schedule = Arc::new(LeaderSchedule::new(
825 context.clone(),
826 LeaderSwapTable::default(),
827 ));
828 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
829
830 let dag_str = "DAG {
837 Round 0 : { 4 },
838 Round 1 : { * },
839 Round 2 : {
840 A -> [-D1],
841 B -> [-D1],
842 C -> [-D1],
843 D -> [-D1],
844 },
845 Round 3 : {
846 A -> [A2, B2, C2, D1],
847 B -> [A2, B2, C2, D1],
848 C -> [A2, B2, C2, D1],
849 D -> [A2, B2, C2, D2]
850 },
851 Round 4 : { * },
852 }";
853
854 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
855 dag_builder.print();
856 dag_builder.persist_all_blocks(dag_state.clone());
857
858 let leaders = dag_builder
859 .leader_blocks(1..=4)
860 .into_iter()
861 .flatten()
862 .collect::<Vec<_>>();
863
864 let commits = linearizer.handle_commit(leaders.clone());
865 for (idx, subdag) in commits.into_iter().enumerate() {
866 tracing::info!("{subdag:?}");
867 assert_eq!(subdag.leader, leaders[idx].reference());
868 assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
869 if idx == 0 {
870 assert_eq!(subdag.blocks.len(), 1);
872 } else if idx == 1 {
873 assert_eq!(subdag.blocks.len(), 3);
877 } else if idx == 2 {
878 assert_eq!(subdag.blocks.len(), 4);
882
883 assert!(
884 subdag.blocks.iter().any(|block| block.round() == 2
885 && block.author() == AuthorityIndex::new_for_test(3)),
886 "Block D2 should have been committed."
887 );
888 } else if idx == 3 {
889 assert_eq!(subdag.blocks.len(), 5);
894 assert!(
895 subdag.blocks.iter().any(|block| block.round() == 1
896 && block.author() == AuthorityIndex::new_for_test(3)),
897 "Block D1 should have been committed."
898 );
899 } else {
900 panic!("Unexpected subdag with index {idx:?}");
901 }
902
903 for block in subdag.blocks.iter() {
904 assert!(block.round() <= leaders[idx].round());
905 }
906 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
907 }
908 }
909}