1use std::{collections::HashSet, sync::Arc};
6
7use consensus_config::AuthorityIndex;
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12 Round,
13 block::{BlockAPI, BlockRef, VerifiedBlock},
14 commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
15 context::Context,
16 dag_state::DagState,
17 leader_schedule::LeaderSchedule,
18};
19
20pub(crate) trait BlockStoreAPI {
24 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;
25
26 fn gc_round(&self) -> Round;
27
28 fn gc_enabled(&self) -> bool;
29
30 fn set_committed(&mut self, block_ref: &BlockRef) -> bool;
31
32 fn is_committed(&self, block_ref: &BlockRef) -> bool;
33}
34
35impl BlockStoreAPI
36 for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
37{
38 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
39 DagState::get_blocks(self, refs)
40 }
41
42 fn gc_round(&self) -> Round {
43 DagState::gc_round(self)
44 }
45
46 fn gc_enabled(&self) -> bool {
47 DagState::gc_enabled(self)
48 }
49
50 fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
51 DagState::set_committed(self, block_ref)
52 }
53
54 fn is_committed(&self, block_ref: &BlockRef) -> bool {
55 DagState::is_committed(self, block_ref)
56 }
57}
58
59#[derive(Clone)]
61pub(crate) struct Linearizer {
62 context: Arc<Context>,
64 dag_state: Arc<RwLock<DagState>>,
65 leader_schedule: Arc<LeaderSchedule>,
66}
67
68impl Linearizer {
69 pub(crate) fn new(
70 context: Arc<Context>,
71 dag_state: Arc<RwLock<DagState>>,
72 leader_schedule: Arc<LeaderSchedule>,
73 ) -> Self {
74 Self {
75 dag_state,
76 leader_schedule,
77 context,
78 }
79 }
80
81 fn collect_sub_dag_and_commit(
85 &mut self,
86 leader_block: VerifiedBlock,
87 reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
88 ) -> (CommittedSubDag, TrustedCommit) {
89 let _s = self
90 .context
91 .metrics
92 .node_metrics
93 .scope_processing_time
94 .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
95 .start_timer();
96 let mut dag_state = self.dag_state.write();
98 let last_commit_index = dag_state.last_commit_index();
99 let last_commit_digest = dag_state.last_commit_digest();
100 let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
101 let last_committed_rounds = dag_state.last_committed_rounds();
102 let timestamp_ms = leader_block.timestamp_ms().max(last_commit_timestamp_ms);
103
104 let to_commit = Self::linearize_sub_dag(
106 &self.context,
107 leader_block.clone(),
108 last_committed_rounds,
109 &mut dag_state,
110 );
111
112 drop(dag_state);
113
114 let commit = Commit::new(
116 last_commit_index + 1,
117 last_commit_digest,
118 timestamp_ms,
119 leader_block.reference(),
120 to_commit
121 .iter()
122 .map(|block| block.reference())
123 .collect::<Vec<_>>(),
124 );
125 let serialized = commit
126 .serialize()
127 .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
128 let commit = TrustedCommit::new_trusted(commit, serialized);
129
130 let sub_dag = CommittedSubDag::new(
132 leader_block.reference(),
133 to_commit,
134 timestamp_ms,
135 commit.reference(),
136 reputation_scores_desc,
137 );
138
139 (sub_dag, commit)
140 }
141
142 pub(crate) fn linearize_sub_dag(
143 context: &Context,
144 leader_block: VerifiedBlock,
145 last_committed_rounds: Vec<u32>,
146 dag_state: &mut impl BlockStoreAPI,
147 ) -> Vec<VerifiedBlock> {
148 let gc_enabled = dag_state.gc_enabled();
149 let gc_round: Round = dag_state.gc_round();
156 let leader_block_ref = leader_block.reference();
157 let mut buffer = vec![leader_block];
158
159 let mut to_commit = Vec::new();
160
161 if context.protocol_config.consensus_linearize_subdag_v2() {
166 assert!(
167 dag_state.set_committed(&leader_block_ref),
168 "Leader block with reference {:?} attempted to be committed twice",
169 leader_block_ref
170 );
171
172 while let Some(x) = buffer.pop() {
173 to_commit.push(x.clone());
174
175 let ancestors: Vec<VerifiedBlock> = dag_state
176 .get_blocks(
177 &x.ancestors()
178 .iter()
179 .copied()
180 .filter(|ancestor| {
181 ancestor.round > gc_round && !dag_state.is_committed(ancestor)
182 })
183 .collect::<Vec<_>>(),
184 )
185 .into_iter()
186 .map(|ancestor_opt| {
187 ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
188 })
189 .collect();
190
191 for ancestor in ancestors {
192 buffer.push(ancestor.clone());
193 assert!(
194 dag_state.set_committed(&ancestor.reference()),
195 "Block with reference {:?} attempted to be committed twice",
196 ancestor.reference()
197 );
198 }
199 }
200 } else {
201 let mut committed = HashSet::new();
202 assert!(committed.insert(leader_block_ref));
203
204 while let Some(x) = buffer.pop() {
205 to_commit.push(x.clone());
206
207 let ancestors: Vec<VerifiedBlock> = dag_state
208 .get_blocks(
209 &x.ancestors()
210 .iter()
211 .copied()
212 .filter(|ancestor| {
213 !committed.contains(ancestor)
220 && last_committed_rounds[ancestor.author] < ancestor.round
221 })
222 .filter(|ancestor| {
223 !gc_enabled || ancestor.round > gc_round
228 })
229 .collect::<Vec<_>>(),
230 )
231 .into_iter()
232 .map(|ancestor_opt| {
233 ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
234 })
235 .collect();
236
237 for ancestor in ancestors {
238 buffer.push(ancestor.clone());
239 assert!(committed.insert(ancestor.reference()));
240 }
241 }
242 }
243
244 if gc_enabled {
248 assert!(
249 to_commit.iter().all(|block| block.round() > gc_round),
250 "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
251 leader_block_ref
252 );
253 }
254
255 sort_sub_dag_blocks(&mut to_commit);
257
258 to_commit
259 }
260
261 pub(crate) fn handle_commit(
265 &mut self,
266 committed_leaders: Vec<VerifiedBlock>,
267 ) -> Vec<CommittedSubDag> {
268 if committed_leaders.is_empty() {
269 return vec![];
270 }
271
272 let schedule_updated = self
275 .leader_schedule
276 .leader_schedule_updated(&self.dag_state);
277
278 let mut committed_sub_dags = vec![];
279 for (i, leader_block) in committed_leaders.into_iter().enumerate() {
280 let reputation_scores_desc = if schedule_updated && i == 0 {
281 self.leader_schedule
282 .leader_swap_table
283 .read()
284 .reputation_scores_desc
285 .clone()
286 } else {
287 vec![]
288 };
289
290 let (sub_dag, commit) =
293 self.collect_sub_dag_and_commit(leader_block, reputation_scores_desc);
294
295 self.update_blocks_pruned_metric(&sub_dag);
296
297 self.dag_state.write().add_commit(commit.clone());
300
301 committed_sub_dags.push(sub_dag);
302 }
303
304 self.dag_state.write().flush();
311
312 committed_sub_dags
313 }
314
315 fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
323 let (last_committed_rounds, gc_round) = {
324 let dag_state = self.dag_state.read();
325 (dag_state.last_committed_rounds(), dag_state.gc_round())
326 };
327
328 for block_ref in sub_dag
329 .blocks
330 .iter()
331 .flat_map(|block| block.ancestors())
332 .filter(
333 |ancestor_ref| {
334 ancestor_ref.round <= gc_round
335 && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
336 }, )
340 .unique()
341 {
342 let hostname = &self.context.committee.authority(block_ref.author).hostname;
343
344 let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
347 &[hostname, "uncommitted"]
348 } else {
349 &[hostname, "higher_committed"]
353 };
354
355 self.context
356 .metrics
357 .node_metrics
358 .blocks_pruned_on_commit
359 .with_label_values(label_values)
360 .inc();
361 }
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use rstest::rstest;
368
369 use super::*;
370 use crate::{
371 CommitIndex,
372 commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
373 context::Context,
374 leader_schedule::{LeaderSchedule, LeaderSwapTable},
375 storage::mem_store::MemStore,
376 test_dag_builder::DagBuilder,
377 test_dag_parser::parse_dag,
378 };
379
380 #[tokio::test]
381 async fn test_handle_commit() {
382 telemetry_subscribers::init_for_testing();
383 let num_authorities = 4;
384 let context = Arc::new(Context::new_for_test(num_authorities).0);
385 let dag_state = Arc::new(RwLock::new(DagState::new(
386 context.clone(),
387 Arc::new(MemStore::new()),
388 )));
389 let leader_schedule = Arc::new(LeaderSchedule::new(
390 context.clone(),
391 LeaderSwapTable::default(),
392 ));
393 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
394
395 let num_rounds: u32 = 10;
397 let mut dag_builder = DagBuilder::new(context.clone());
398 dag_builder
399 .layers(1..=num_rounds)
400 .build()
401 .persist_layers(dag_state.clone());
402
403 let leaders = dag_builder
404 .leader_blocks(1..=num_rounds)
405 .into_iter()
406 .map(Option::unwrap)
407 .collect::<Vec<_>>();
408
409 let commits = linearizer.handle_commit(leaders.clone());
410 for (idx, subdag) in commits.into_iter().enumerate() {
411 tracing::info!("{subdag:?}");
412 assert_eq!(subdag.leader, leaders[idx].reference());
413 assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
414 if idx == 0 {
415 assert_eq!(subdag.blocks.len(), 1);
417 } else {
418 assert_eq!(subdag.blocks.len(), num_authorities);
421 }
422 for block in subdag.blocks.iter() {
423 assert!(block.round() <= leaders[idx].round());
424 }
425 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
426 }
427 }
428
429 #[tokio::test]
430 async fn test_handle_commit_with_schedule_update() {
431 telemetry_subscribers::init_for_testing();
432 let num_authorities = 4;
433 let context = Arc::new(Context::new_for_test(num_authorities).0);
434 let dag_state = Arc::new(RwLock::new(DagState::new(
435 context.clone(),
436 Arc::new(MemStore::new()),
437 )));
438 const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
439 let leader_schedule = Arc::new(
440 LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
441 .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
442 );
443 let mut linearizer =
444 Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
445
446 let num_rounds: u32 = 20;
448 let mut dag_builder = DagBuilder::new(context.clone());
449 dag_builder
450 .layers(1..=num_rounds)
451 .build()
452 .persist_layers(dag_state.clone());
453
454 let leaders = dag_builder
456 .leader_blocks(1..=10)
457 .into_iter()
458 .map(Option::unwrap)
459 .collect::<Vec<_>>();
460
461 let commits = linearizer.handle_commit(leaders.clone());
463
464 dag_state.write().add_scoring_subdags(commits);
466 leader_schedule.update_leader_schedule_v2(&dag_state);
468 assert!(
469 leader_schedule.leader_schedule_updated(&dag_state),
470 "Leader schedule should have been updated"
471 );
472
473 let leaders = dag_builder
475 .leader_blocks(11..=20)
476 .into_iter()
477 .map(Option::unwrap)
478 .collect::<Vec<_>>();
479
480 let commits = linearizer.handle_commit(leaders.clone());
483 assert_eq!(commits.len(), 10);
484 let scores = vec![
485 (AuthorityIndex::new_for_test(1), 29),
486 (AuthorityIndex::new_for_test(0), 29),
487 (AuthorityIndex::new_for_test(3), 29),
488 (AuthorityIndex::new_for_test(2), 29),
489 ];
490 assert_eq!(commits[0].reputation_scores_desc, scores);
491 for commit in commits.into_iter().skip(1) {
492 assert_eq!(commit.reputation_scores_desc, vec![]);
493 }
494 }
495
496 #[tokio::test]
498 async fn test_handle_commit_with_schedule_update_with_unscored_subdags() {
499 telemetry_subscribers::init_for_testing();
500 let num_authorities = 4;
501 let context = Arc::new(Context::new_for_test(num_authorities).0);
502 let dag_state = Arc::new(RwLock::new(DagState::new(
503 context.clone(),
504 Arc::new(MemStore::new()),
505 )));
506 const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
507 let leader_schedule = Arc::new(
508 LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
509 .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
510 );
511 let mut linearizer =
512 Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
513
514 let num_rounds: u32 = 20;
516 let mut dag_builder = DagBuilder::new(context.clone());
517 dag_builder
518 .layers(1..=num_rounds)
519 .build()
520 .persist_layers(dag_state.clone());
521
522 let leaders = dag_builder
524 .leader_blocks(1..=10)
525 .into_iter()
526 .map(Option::unwrap)
527 .collect::<Vec<_>>();
528
529 let commits = linearizer.handle_commit(leaders.clone());
531
532 dag_state.write().add_unscored_committed_subdags(commits);
534
535 leader_schedule.update_leader_schedule_v1(&dag_state);
537
538 assert!(
539 leader_schedule.leader_schedule_updated(&dag_state),
540 "Leader schedule should have been updated"
541 );
542
543 let leaders = dag_builder
545 .leader_blocks(11..=20)
546 .into_iter()
547 .map(Option::unwrap)
548 .collect::<Vec<_>>();
549
550 let commits = linearizer.handle_commit(leaders.clone());
553 assert_eq!(commits.len(), 10);
554 let scores = vec![
555 (AuthorityIndex::new_for_test(2), 9),
556 (AuthorityIndex::new_for_test(1), 8),
557 (AuthorityIndex::new_for_test(0), 8),
558 (AuthorityIndex::new_for_test(3), 8),
559 ];
560 assert_eq!(commits[0].reputation_scores_desc, scores);
561
562 for commit in commits.into_iter().skip(1) {
563 assert_eq!(commit.reputation_scores_desc, vec![]);
564 }
565 }
566
567 #[tokio::test]
568 async fn test_handle_already_committed() {
569 telemetry_subscribers::init_for_testing();
570 let num_authorities = 4;
571 let (mut context, _) = Context::new_for_test(num_authorities);
572 context
573 .protocol_config
574 .set_consensus_gc_depth_for_testing(0);
575 context
576 .protocol_config
577 .set_consensus_linearize_subdag_v2_for_testing(false);
578
579 let context = Arc::new(context);
580
581 let dag_state = Arc::new(RwLock::new(DagState::new(
582 context.clone(),
583 Arc::new(MemStore::new()),
584 )));
585 let leader_schedule = Arc::new(LeaderSchedule::new(
586 context.clone(),
587 LeaderSwapTable::default(),
588 ));
589 let mut linearizer =
590 Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
591 let wave_length = DEFAULT_WAVE_LENGTH;
592
593 let leader_round_wave_1 = 3;
594 let leader_round_wave_2 = leader_round_wave_1 + wave_length;
595
596 let mut dag_builder = DagBuilder::new(context.clone());
598 dag_builder.layers(1..=leader_round_wave_2).build();
599
600 let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
604 blocks.push(
605 dag_builder
606 .leader_block(leader_round_wave_1)
607 .expect("Leader block should have been found"),
608 );
609 dag_state.write().accept_blocks(blocks.clone());
610
611 let first_leader = dag_builder
612 .leader_block(leader_round_wave_1)
613 .expect("Wave 1 leader round block should exist");
614 let mut last_commit_index = 1;
615 let first_commit_data = TrustedCommit::new_for_test(
616 last_commit_index,
617 CommitDigest::MIN,
618 0,
619 first_leader.reference(),
620 blocks.into_iter().map(|block| block.reference()).collect(),
621 );
622 dag_state.write().add_commit(first_commit_data);
623
624 let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
627 blocks.retain(|block| {
629 !(block.round() == leader_round_wave_1
630 && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
631 });
632 blocks.push(
634 dag_builder
635 .leader_block(leader_round_wave_2)
636 .expect("Leader block should have been found"),
637 );
638 dag_state.write().accept_blocks(blocks.clone());
640
641 let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
642
643 let leader = dag_builder
645 .leader_block(leader_round_wave_2)
646 .expect("Leader block should exist");
647
648 last_commit_index += 1;
649 let expected_second_commit = TrustedCommit::new_for_test(
650 last_commit_index,
651 CommitDigest::MIN,
652 0,
653 leader.reference(),
654 blocks.clone(),
655 );
656
657 let commit = linearizer.handle_commit(vec![leader.clone()]);
658 assert_eq!(commit.len(), 1);
659
660 let subdag = &commit[0];
661 tracing::info!("{subdag:?}");
662 assert_eq!(subdag.leader, leader.reference());
663 assert_eq!(subdag.timestamp_ms, leader.timestamp_ms());
664 assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
665
666 blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
668 assert_eq!(
669 subdag
670 .blocks
671 .clone()
672 .into_iter()
673 .map(|b| b.reference())
674 .collect::<Vec<_>>(),
675 blocks
676 );
677 for block in subdag.blocks.iter() {
678 assert!(block.round() <= expected_second_commit.leader().round);
679 }
680 }
681
682 #[rstest]
686 #[tokio::test]
687 async fn test_handle_commit_with_gc_simple(#[values(0, 3)] gc_depth: u32) {
688 telemetry_subscribers::init_for_testing();
689
690 let num_authorities = 4;
691 let (mut context, _keys) = Context::new_for_test(num_authorities);
692 context.protocol_config.set_gc_depth_for_testing(gc_depth);
693
694 if gc_depth == 0 {
695 context
696 .protocol_config
697 .set_consensus_linearize_subdag_v2_for_testing(false);
698 }
699
700 let context = Arc::new(context);
701 let dag_state = Arc::new(RwLock::new(DagState::new(
702 context.clone(),
703 Arc::new(MemStore::new()),
704 )));
705 let leader_schedule = Arc::new(LeaderSchedule::new(
706 context.clone(),
707 LeaderSwapTable::default(),
708 ));
709 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
710
711 let dag_str = "DAG {
720 Round 0 : { 4 },
721 Round 1 : { * },
722 Round 2 : {
723 A -> [-D1],
724 B -> [-D1],
725 C -> [-D1],
726 D -> [*],
727 },
728 Round 3 : {
729 A -> [-D2],
730 B -> [-D2],
731 C -> [-D2],
732 },
733 Round 4 : {
734 A -> [-D3],
735 B -> [-D3],
736 C -> [-D3],
737 D -> [A3, B3, C3, D2],
738 },
739 Round 5 : { * },
740 }";
741
742 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
743 dag_builder.print();
744 dag_builder.persist_all_blocks(dag_state.clone());
745
746 let leaders = dag_builder
747 .leader_blocks(1..=6)
748 .into_iter()
749 .flatten()
750 .collect::<Vec<_>>();
751
752 let commits = linearizer.handle_commit(leaders.clone());
753 for (idx, subdag) in commits.into_iter().enumerate() {
754 tracing::info!("{subdag:?}");
755 assert_eq!(subdag.leader, leaders[idx].reference());
756 assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
757 if idx == 0 {
758 assert_eq!(subdag.blocks.len(), 1);
760 } else if idx == 1 {
761 assert_eq!(subdag.blocks.len(), 3);
762 } else if idx == 2 {
763 assert_eq!(subdag.blocks.len(), 6);
770 } else {
771 if gc_depth > 0 {
773 assert_eq!(subdag.blocks.len(), 5);
780
781 assert!(
782 subdag.blocks.iter().all(|block| block.round() >= 2),
783 "Found blocks that are of round < 2."
784 );
785
786 assert_eq!(dag_state.read().gc_round(), subdag.leader.round - gc_depth);
788 } else {
789 assert_eq!(subdag.blocks.len(), 6);
791 assert!(
792 subdag.blocks.iter().all(|block| block.round() >= 1),
793 "Found blocks that are of round < 1."
794 );
795
796 assert_eq!(dag_state.read().gc_round(), 0);
798 }
799 }
800 for block in subdag.blocks.iter() {
801 assert!(block.round() <= leaders[idx].round());
802 }
803 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
804 }
805 }
806
807 #[rstest]
808 #[tokio::test]
809 async fn test_handle_commit_below_highest_committed_round(#[values(3)] gc_depth: u32) {
810 telemetry_subscribers::init_for_testing();
811
812 let num_authorities = 4;
813 let (mut context, _keys) = Context::new_for_test(num_authorities);
814 context
815 .protocol_config
816 .set_consensus_gc_depth_for_testing(gc_depth);
817 context
818 .protocol_config
819 .set_consensus_linearize_subdag_v2_for_testing(true);
820
821 let context = Arc::new(context);
822 let dag_state = Arc::new(RwLock::new(DagState::new(
823 context.clone(),
824 Arc::new(MemStore::new()),
825 )));
826 let leader_schedule = Arc::new(LeaderSchedule::new(
827 context.clone(),
828 LeaderSwapTable::default(),
829 ));
830 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
831
832 let dag_str = "DAG {
839 Round 0 : { 4 },
840 Round 1 : { * },
841 Round 2 : {
842 A -> [-D1],
843 B -> [-D1],
844 C -> [-D1],
845 D -> [-D1],
846 },
847 Round 3 : {
848 A -> [A2, B2, C2, D1],
849 B -> [A2, B2, C2, D1],
850 C -> [A2, B2, C2, D1],
851 D -> [A2, B2, C2, D2]
852 },
853 Round 4 : { * },
854 }";
855
856 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
857 dag_builder.print();
858 dag_builder.persist_all_blocks(dag_state.clone());
859
860 let leaders = dag_builder
861 .leader_blocks(1..=4)
862 .into_iter()
863 .flatten()
864 .collect::<Vec<_>>();
865
866 let commits = linearizer.handle_commit(leaders.clone());
867 for (idx, subdag) in commits.into_iter().enumerate() {
868 tracing::info!("{subdag:?}");
869 assert_eq!(subdag.leader, leaders[idx].reference());
870 assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
871 if idx == 0 {
872 assert_eq!(subdag.blocks.len(), 1);
874 } else if idx == 1 {
875 assert_eq!(subdag.blocks.len(), 3);
879 } else if idx == 2 {
880 assert_eq!(subdag.blocks.len(), 4);
884
885 assert!(
886 subdag.blocks.iter().any(|block| block.round() == 2
887 && block.author() == AuthorityIndex::new_for_test(3)),
888 "Block D2 should have been committed."
889 );
890 } else if idx == 3 {
891 assert_eq!(subdag.blocks.len(), 5);
896 assert!(
897 subdag.blocks.iter().any(|block| block.round() == 1
898 && block.author() == AuthorityIndex::new_for_test(3)),
899 "Block D1 should have been committed."
900 );
901 } else {
902 panic!("Unexpected subdag with index {:?}", idx);
903 }
904
905 for block in subdag.blocks.iter() {
906 assert!(block.round() <= leaders[idx].round());
907 }
908 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
909 }
910 }
911}