1use std::{
6 collections::{BTreeMap, BTreeSet, btree_map::Entry},
7 sync::Arc,
8 time::Instant,
9};
10
11use consensus_config::AuthorityIndex;
12use iota_metrics::monitored_scope;
13use itertools::Itertools as _;
14use parking_lot::RwLock;
15use tracing::{debug, trace, warn};
16
17use crate::{
18 Round,
19 block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
20 block_verifier::BlockVerifier,
21 context::Context,
22 dag_state::DagState,
23};
24
25#[derive(Clone)]
26pub(crate) struct SuspendedBlock {
27 block: VerifiedBlock,
28 missing_ancestors: BTreeSet<BlockRef>,
29 timestamp: Instant,
30}
31
32impl SuspendedBlock {
33 pub(crate) fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
34 Self {
35 block,
36 missing_ancestors,
37 timestamp: Instant::now(),
38 }
39 }
40}
41
42pub(crate) struct BlockManager {
48 context: Arc<Context>,
49 dag_state: Arc<RwLock<DagState>>,
50 block_verifier: Arc<dyn BlockVerifier>,
51
52 suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
57 missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
64 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
71 received_block_rounds: Vec<Option<(Round, Round)>>,
75}
76
77impl BlockManager {
78 pub(crate) fn new(
79 context: Arc<Context>,
80 dag_state: Arc<RwLock<DagState>>,
81 block_verifier: Arc<dyn BlockVerifier>,
82 ) -> Self {
83 let committee_size = context.committee.size();
84 Self {
85 context,
86 dag_state,
87 block_verifier,
88 suspended_blocks: BTreeMap::new(),
89 missing_ancestors: BTreeMap::new(),
90 missing_blocks: BTreeMap::new(),
91 received_block_rounds: vec![None; committee_size],
92 }
93 }
94
95 #[tracing::instrument(skip_all)]
101 pub(crate) fn try_accept_blocks(
102 &mut self,
103 blocks: Vec<VerifiedBlock>,
104 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
105 let _s = monitored_scope("BlockManager::try_accept_blocks");
106 self.try_accept_blocks_internal(blocks, false)
107 }
108
109 #[tracing::instrument(skip_all)]
112 pub(crate) fn try_accept_committed_blocks(
113 &mut self,
114 blocks: Vec<VerifiedBlock>,
115 ) -> Vec<VerifiedBlock> {
116 if !self.dag_state.read().gc_enabled() {
118 return Vec::new();
119 }
120
121 let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
123 let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
124 assert!(
125 missing_blocks.is_empty(),
126 "No missing blocks should be returned for committed blocks"
127 );
128
129 accepted_blocks
130 }
131
132 fn try_accept_blocks_internal(
136 &mut self,
137 mut blocks: Vec<VerifiedBlock>,
138 committed: bool,
139 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
140 let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
141
142 blocks.sort_by_key(|b| b.round());
143 debug!(
144 "Trying to accept blocks: {}",
145 blocks.iter().map(|b| b.reference().to_string()).join(",")
146 );
147
148 let mut accepted_blocks = vec![];
149 let mut missing_blocks = BTreeSet::new();
150
151 for block in blocks {
152 self.update_block_received_metrics(&block);
153
154 let block_ref = block.reference();
156
157 let mut to_verify_timestamps_and_accept = vec![];
158 if committed {
159 match self.try_accept_one_committed_block(block) {
160 TryAcceptResult::Accepted(block) => {
161 accepted_blocks.push(block);
165 }
166 TryAcceptResult::Processed => continue,
167 TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => {
168 panic!("Did not expect to suspend or skip a committed block: {block_ref:?}")
169 }
170 };
171 } else {
172 match self.try_accept_one_block(block) {
173 TryAcceptResult::Accepted(block) => {
174 to_verify_timestamps_and_accept.push(block);
175 }
176 TryAcceptResult::Suspended(ancestors_to_fetch) => {
177 debug!(
178 "Missing ancestors to fetch for block {block_ref}: {}",
179 ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
180 );
181 missing_blocks.extend(ancestors_to_fetch);
182 continue;
183 }
184 TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
185 };
186 };
187
188 let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
190 to_verify_timestamps_and_accept.extend(unsuspended_blocks);
191
192 let blocks_to_accept =
194 self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
195 accepted_blocks.extend(blocks_to_accept);
196 }
197
198 self.update_stats(missing_blocks.len() as u64);
199
200 (accepted_blocks, missing_blocks)
202 }
203
204 fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
205 if self.dag_state.read().contains_block(&block.reference()) {
206 return TryAcceptResult::Processed;
207 }
208
209 self.missing_blocks.remove(&block.reference());
211
212 if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
218 suspended_block
219 .missing_ancestors
220 .iter()
221 .for_each(|ancestor| {
222 if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
223 references.remove(&block.reference());
224 }
225 });
226 }
227
228 self.dag_state.write().accept_blocks(vec![block.clone()]);
230
231 TryAcceptResult::Accepted(block)
232 }
233
234 pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
237 let _s = monitored_scope("BlockManager::try_find_blocks");
238 let gc_round = self.dag_state.read().gc_round();
239
240 let mut block_refs = block_refs
243 .into_iter()
244 .filter(|block_ref| block_ref.round > gc_round)
245 .collect::<Vec<_>>();
246
247 if block_refs.is_empty() {
248 return BTreeSet::new();
249 }
250
251 block_refs.sort_by_key(|b| b.round);
252
253 debug!(
254 "Trying to find blocks: {}",
255 block_refs.iter().map(|b| b.to_string()).join(",")
256 );
257
258 let mut missing_blocks = BTreeSet::new();
259
260 for (found, block_ref) in self
261 .dag_state
262 .read()
263 .contains_blocks(block_refs.clone())
264 .into_iter()
265 .zip(block_refs.iter())
266 {
267 if found || self.suspended_blocks.contains_key(block_ref) {
268 continue;
269 }
270 missing_blocks.insert(*block_ref);
272 if self
273 .missing_blocks
274 .insert(*block_ref, BTreeSet::from([block_ref.author]))
275 .is_none()
276 {
277 self.missing_ancestors.entry(*block_ref).or_default();
281
282 let block_ref_hostname =
283 &self.context.committee.authority(block_ref.author).hostname;
284 self.context
285 .metrics
286 .node_metrics
287 .block_manager_missing_blocks_by_authority
288 .with_label_values(&[block_ref_hostname])
289 .inc();
290 }
291 }
292
293 let metrics = &self.context.metrics.node_metrics;
294 metrics
295 .missing_blocks_total
296 .inc_by(missing_blocks.len() as u64);
297 metrics
298 .block_manager_missing_blocks
299 .set(self.missing_blocks.len() as i64);
300
301 missing_blocks
302 }
303
304 fn verify_block_timestamps_and_accept(
309 &mut self,
310 unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
311 ) -> Vec<VerifiedBlock> {
312 let (gc_enabled, gc_round) = {
313 let dag_state = self.dag_state.read();
314 (dag_state.gc_enabled(), dag_state.gc_round())
315 };
316 let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
318 let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
319 {
320 'block: for b in unsuspended_blocks {
321 let ancestors = self.dag_state.read().get_blocks(b.ancestors());
322 assert_eq!(b.ancestors().len(), ancestors.len());
323 let mut ancestor_blocks = vec![];
324 'ancestor: for (ancestor_ref, found) in
325 b.ancestors().iter().zip(ancestors.into_iter())
326 {
327 if let Some(found_block) = found {
328 assert_eq!(ancestor_ref, &found_block.reference());
330 ancestor_blocks.push(Some(found_block));
331 continue 'ancestor;
332 }
333 if blocks_to_accept.contains_key(ancestor_ref) {
336 ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
337 continue 'ancestor;
338 }
339 if blocks_to_reject.contains_key(ancestor_ref) {
341 blocks_to_reject.insert(b.reference(), b);
342 continue 'block;
343 }
344
345 if gc_enabled
350 && ancestor_ref.round > GENESIS_ROUND
351 && ancestor_ref.round <= gc_round
352 {
353 debug!(
354 "Block {:?} has a missing ancestor: {:?} passed GC round {}",
355 b.reference(),
356 ancestor_ref,
357 gc_round
358 );
359 ancestor_blocks.push(None);
360 } else {
361 panic!(
362 "Unsuspended block {b:?} has a missing ancestor! Ancestor not found in DagState: {ancestor_ref:?}"
363 );
364 }
365 }
366 if let Err(e) =
367 self.block_verifier
368 .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
369 {
370 warn!("Block {:?} failed to verify ancestors: {}", b, e);
371 blocks_to_reject.insert(b.reference(), b);
372 } else {
373 blocks_to_accept.insert(b.reference(), b);
374 }
375 }
376 }
377
378 for (block_ref, block) in blocks_to_reject {
380 let hostname = self
381 .context
382 .committee
383 .authority(block_ref.author)
384 .hostname
385 .clone();
386
387 self.context
388 .metrics
389 .node_metrics
390 .invalid_blocks
391 .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
392 .inc();
393 warn!("Invalid block {:?} is rejected", block);
394 }
395
396 let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
397
398 self.dag_state
401 .write()
402 .accept_blocks(blocks_to_accept.clone());
403
404 blocks_to_accept
405 }
406
407 fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
412 let block_ref = block.reference();
413 let mut missing_ancestors = BTreeSet::new();
414 let mut ancestors_to_fetch = BTreeSet::new();
415 let dag_state = self.dag_state.read();
416 let gc_round = dag_state.gc_round();
417 let gc_enabled = dag_state.gc_enabled();
418
419 if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
422 return TryAcceptResult::Processed;
423 }
424
425 if gc_enabled && block.round() <= gc_round {
428 let hostname = self
429 .context
430 .committee
431 .authority(block.author())
432 .hostname
433 .as_str();
434 self.context
435 .metrics
436 .node_metrics
437 .block_manager_skipped_blocks
438 .with_label_values(&[hostname])
439 .inc();
440 return TryAcceptResult::Skipped;
441 }
442
443 let ancestors = if gc_enabled {
447 block
448 .ancestors()
449 .iter()
450 .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
451 .cloned()
452 .collect::<Vec<_>>()
453 } else {
454 block.ancestors().to_vec()
455 };
456
457 for (found, ancestor) in dag_state
459 .contains_blocks(ancestors.clone())
460 .into_iter()
461 .zip(ancestors.iter())
462 {
463 if !found {
464 missing_ancestors.insert(*ancestor);
465
466 self.missing_ancestors
468 .entry(*ancestor)
469 .or_default()
470 .insert(block_ref);
471
472 let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
473 self.context
474 .metrics
475 .node_metrics
476 .block_manager_missing_ancestors_by_authority
477 .with_label_values(&[ancestor_hostname])
478 .inc();
479
480 if !self.suspended_blocks.contains_key(ancestor) {
484 ancestors_to_fetch.insert(*ancestor);
486 let entry = self.missing_blocks.entry(*ancestor);
490 match entry {
491 Entry::Vacant(v) => {
492 v.insert(BTreeSet::from([ancestor.author, block_ref.author]));
493 self.context
494 .metrics
495 .node_metrics
496 .block_manager_missing_blocks_by_authority
497 .with_label_values(&[ancestor_hostname])
498 .inc();
499 }
500 Entry::Occupied(mut o) => {
501 o.get_mut().insert(block_ref.author);
502 }
503 }
504 }
505 }
506 }
507
508 self.missing_blocks.remove(&block.reference());
512
513 if !missing_ancestors.is_empty() {
514 let hostname = self
515 .context
516 .committee
517 .authority(block.author())
518 .hostname
519 .as_str();
520 self.context
521 .metrics
522 .node_metrics
523 .block_suspensions
524 .with_label_values(&[hostname])
525 .inc();
526 self.suspended_blocks
527 .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
528 return TryAcceptResult::Suspended(ancestors_to_fetch);
529 }
530
531 TryAcceptResult::Accepted(block)
532 }
533
534 fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
538 let mut unsuspended_blocks = vec![];
539 let mut to_process_blocks = vec![accepted_block];
540
541 while let Some(block_ref) = to_process_blocks.pop() {
542 if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
544 for r in block_refs_with_missing_deps {
545 if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
549 to_process_blocks.push(block.block.reference());
550 unsuspended_blocks.push(block);
551 }
552 }
553 }
554 }
555
556 let now = Instant::now();
557
558 for block in &unsuspended_blocks {
560 let hostname = self
561 .context
562 .committee
563 .authority(block.block.author())
564 .hostname
565 .as_str();
566 self.context
567 .metrics
568 .node_metrics
569 .block_unsuspensions
570 .with_label_values(&[hostname])
571 .inc();
572 self.context
573 .metrics
574 .node_metrics
575 .suspended_block_time
576 .with_label_values(&[hostname])
577 .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
578 }
579
580 unsuspended_blocks
581 .into_iter()
582 .map(|block| block.block)
583 .collect()
584 }
585
586 fn try_unsuspend_block(
591 &mut self,
592 block_ref: &BlockRef,
593 accepted_dependency: &BlockRef,
594 ) -> Option<SuspendedBlock> {
595 let block = self
596 .suspended_blocks
597 .get_mut(block_ref)
598 .expect("Block should be in suspended map");
599
600 assert!(
601 block.missing_ancestors.remove(accepted_dependency),
602 "Block reference {} should be present in missing dependencies of {:?}",
603 block_ref,
604 block.block
605 );
606
607 if block.missing_ancestors.is_empty() {
608 return self.suspended_blocks.remove(block_ref);
610 }
611 None
612 }
613
614 pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
618 let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
619 let (gc_enabled, gc_round) = {
620 let dag_state = self.dag_state.read();
621 (dag_state.gc_enabled(), dag_state.gc_round())
622 };
623 let mut blocks_unsuspended_below_gc_round = 0;
624 let mut blocks_gc_ed = 0;
625
626 if !gc_enabled {
627 trace!("GC is disabled, no blocks will attempt to get unsuspended.");
628 return;
629 }
630
631 while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
632 if block_ref.round > gc_round {
637 return;
638 }
639
640 blocks_gc_ed += 1;
641
642 let hostname = self
643 .context
644 .committee
645 .authority(block_ref.author)
646 .hostname
647 .as_str();
648 self.context
649 .metrics
650 .node_metrics
651 .block_manager_gced_blocks
652 .with_label_values(&[hostname])
653 .inc();
654
655 assert!(
656 !self.suspended_blocks.contains_key(block_ref),
657 "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
658 );
659
660 self.missing_blocks.remove(block_ref);
662
663 let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
666
667 unsuspended_blocks.iter().for_each(|block| {
668 if block.round() <= gc_round {
669 blocks_unsuspended_below_gc_round += 1;
670 }
671 });
672
673 let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
675 for block in accepted_blocks {
676 let hostname = self
677 .context
678 .committee
679 .authority(block.author())
680 .hostname
681 .as_str();
682 self.context
683 .metrics
684 .node_metrics
685 .block_manager_gc_unsuspended_blocks
686 .with_label_values(&[hostname])
687 .inc();
688 }
689 }
690
691 debug!(
692 "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
693 blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
694 );
695 }
696
697 pub(crate) fn missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
701 self.missing_blocks.clone()
702 }
703
704 #[cfg(test)]
706 pub(crate) fn missing_block_refs(&self) -> BTreeSet<BlockRef> {
707 self.missing_blocks.keys().cloned().collect()
708 }
709
710 fn update_stats(&mut self, missing_blocks: u64) {
711 let metrics = &self.context.metrics.node_metrics;
712 metrics.missing_blocks_total.inc_by(missing_blocks);
713 metrics
714 .block_manager_suspended_blocks
715 .set(self.suspended_blocks.len() as i64);
716 metrics
717 .block_manager_missing_ancestors
718 .set(self.missing_ancestors.len() as i64);
719 metrics
720 .block_manager_missing_blocks
721 .set(self.missing_blocks.len() as i64);
722 }
723
724 fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
725 let (min_round, max_round) =
726 if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
727 (curr_min.min(block.round()), curr_max.max(block.round()))
728 } else {
729 (block.round(), block.round())
730 };
731 self.received_block_rounds[block.author()] = Some((min_round, max_round));
732
733 let hostname = &self.context.committee.authority(block.author()).hostname;
734 self.context
735 .metrics
736 .node_metrics
737 .lowest_verified_authority_round
738 .with_label_values(&[hostname])
739 .set(min_round.into());
740 self.context
741 .metrics
742 .node_metrics
743 .highest_verified_authority_round
744 .with_label_values(&[hostname])
745 .set(max_round.into());
746 }
747
748 #[cfg(test)]
750 pub(crate) fn is_empty(&self) -> bool {
751 self.suspended_blocks.is_empty()
752 && self.missing_ancestors.is_empty()
753 && self.missing_blocks.is_empty()
754 }
755
756 #[cfg(test)]
759 fn suspended_blocks_refs(&self) -> BTreeSet<BlockRef> {
760 self.suspended_blocks.keys().cloned().collect()
761 }
762}
763
764enum TryAcceptResult {
766 Accepted(VerifiedBlock),
768 Suspended(BTreeSet<BlockRef>),
770 Processed,
774 Skipped,
777}
778
779#[cfg(test)]
780mod tests {
781 use std::{collections::BTreeSet, sync::Arc};
782
783 use consensus_config::AuthorityIndex;
784 use parking_lot::RwLock;
785 use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
786 use rstest::rstest;
787
788 use crate::{
789 CommitDigest, Round,
790 block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
791 block_manager::BlockManager,
792 block_verifier::{BlockVerifier, NoopBlockVerifier},
793 commit::TrustedCommit,
794 context::Context,
795 dag_state::DagState,
796 error::{ConsensusError, ConsensusResult},
797 storage::mem_store::MemStore,
798 test_dag_builder::DagBuilder,
799 test_dag_parser::parse_dag,
800 };
801
802 #[tokio::test]
803 async fn suspend_blocks_with_missing_ancestors() {
804 let (context, _key_pairs) = Context::new_for_test(4);
806 let context = Arc::new(context);
807 let store = Arc::new(MemStore::new());
808 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
809
810 let mut block_manager =
811 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
812
813 let mut dag_builder = DagBuilder::new(context.clone());
815 dag_builder
816 .layers(1..=2) .authorities(vec![
818 AuthorityIndex::new_for_test(0),
819 AuthorityIndex::new_for_test(2),
820 ]) .equivocate(3)
822 .build();
823
824 let round_2_blocks = dag_builder
826 .blocks
827 .into_iter()
828 .filter_map(|(_, block)| (block.round() == 2).then_some(block))
829 .collect::<Vec<VerifiedBlock>>();
830
831 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
833
834 assert!(accepted_blocks.is_empty());
836
837 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
840 let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
841 assert_eq!(missing, missing_block_refs);
842
843 assert_eq!(block_manager.missing_block_refs(), missing_block_refs);
847
848 assert_eq!(
850 block_manager.suspended_blocks_refs(),
851 round_2_blocks
852 .into_iter()
853 .map(|block| block.reference())
854 .collect::<BTreeSet<_>>()
855 );
856
857 let known_by_manager = block_manager
859 .missing_blocks()
860 .iter()
861 .next()
862 .expect("We should expect at least two elements there")
863 .1
864 .clone();
865 assert_eq!(
866 known_by_manager,
867 context
868 .committee
869 .authorities()
870 .map(|(a, _)| a)
871 .collect::<BTreeSet<_>>()
872 );
873 }
874
875 #[tokio::test]
876 async fn try_accept_block_returns_missing_blocks() {
877 let (context, _key_pairs) = Context::new_for_test(4);
878 let context = Arc::new(context);
879 let store = Arc::new(MemStore::new());
880 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
881
882 let mut block_manager =
883 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
884
885 let mut dag_builder = DagBuilder::new(context.clone());
887 dag_builder
888 .layers(1..=4) .authorities(vec![
890 AuthorityIndex::new_for_test(0),
891 AuthorityIndex::new_for_test(2),
892 ]) .equivocate(3) .build();
895
896 for (_, block) in dag_builder
899 .blocks
900 .into_iter()
901 .rev()
902 .take_while(|(_, block)| block.round() >= 2)
903 {
904 let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);
906
907 assert!(accepted_blocks.is_empty());
909
910 let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
911 assert_eq!(missing, block_ancestors);
912 }
913 }
914
915 #[tokio::test]
916 async fn accept_blocks_with_complete_causal_history() {
917 let (context, _key_pairs) = Context::new_for_test(4);
919 let context = Arc::new(context);
920 let store = Arc::new(MemStore::new());
921 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
922
923 let mut block_manager =
924 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
925
926 let mut dag_builder = DagBuilder::new(context.clone());
928 dag_builder.layers(1..=2).build();
929
930 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
931
932 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
934
935 assert_eq!(accepted_blocks.len(), 8);
937 assert_eq!(
938 accepted_blocks,
939 all_blocks
940 .iter()
941 .filter(|block| block.round() > 0)
942 .cloned()
943 .collect::<Vec<VerifiedBlock>>()
944 );
945 assert!(missing.is_empty());
946 assert!(block_manager.is_empty());
947
948 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
951 assert!(accepted_blocks.is_empty());
952 }
953
954 #[tokio::test]
957 async fn accept_blocks_with_causal_history_below_gc_round() {
958 let (mut context, _key_pairs) = Context::new_for_test(4);
960
961 context
963 .protocol_config
964 .set_consensus_gc_depth_for_testing(4);
965 let context = Arc::new(context);
966 let store = Arc::new(MemStore::new());
967 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
968
969 let last_commit = TrustedCommit::new_for_test(
972 10,
973 CommitDigest::MIN,
974 context.clock.timestamp_utc_ms(),
975 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
976 vec![],
977 );
978 dag_state.write().set_last_commit(last_commit);
979 assert_eq!(
980 dag_state.read().gc_round(),
981 6,
982 "GC round should have moved to round 6"
983 );
984
985 let mut block_manager =
986 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
987
988 let dag_str = "DAG {
990 Round 0 : { 4 },
991 Round 1 : { * },
992 Round 2 : { * },
993 Round 3 : { * },
994 Round 4 : { * },
995 Round 5 : { * },
996 Round 6 : { * },
997 Round 7 : {
998 A -> [*],
999 B -> [*],
1000 C -> [*],
1001 }
1002 Round 8 : {
1003 A -> [*],
1004 B -> [*],
1005 C -> [*],
1006 },
1007 Round 9 : {
1008 A -> [A8, B8, C8, D6],
1009 B -> [A8, B8, C8, D6],
1010 C -> [A8, B8, C8, D6],
1011 D -> [A8, B8, C8, D6],
1012 },
1013 Round 10 : { * },
1014 }";
1015
1016 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1017
1018 let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
1022
1023 for rounds_range in blocks_ranges {
1024 let all_blocks = dag_builder
1025 .blocks
1026 .values()
1027 .filter(|block| rounds_range.contains(&block.round()))
1028 .cloned()
1029 .collect::<Vec<_>>();
1030
1031 let mut reversed_blocks = all_blocks.clone();
1033 reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
1034 let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
1035 accepted_blocks.sort_by_key(|a| a.reference());
1036
1037 assert_eq!(accepted_blocks, all_blocks.to_vec());
1039 assert!(missing.is_empty());
1040 assert!(block_manager.is_empty());
1041
1042 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1043 assert!(accepted_blocks.is_empty());
1044 }
1045 }
1046
1047 #[tokio::test]
1051 async fn skip_accepting_blocks_below_gc_round() {
1052 let (mut context, _key_pairs) = Context::new_for_test(4);
1054 context
1056 .protocol_config
1057 .set_consensus_gc_depth_for_testing(4);
1058 let context = Arc::new(context);
1059 let store = Arc::new(MemStore::new());
1060 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1061
1062 let last_commit = TrustedCommit::new_for_test(
1065 10,
1066 CommitDigest::MIN,
1067 context.clock.timestamp_utc_ms(),
1068 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1069 vec![],
1070 );
1071 dag_state.write().set_last_commit(last_commit);
1072 assert_eq!(
1073 dag_state.read().gc_round(),
1074 6,
1075 "GC round should have moved to round 6"
1076 );
1077
1078 let mut block_manager =
1079 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1080
1081 let mut dag_builder = DagBuilder::new(context.clone());
1083 dag_builder.layers(1..=6).build();
1084
1085 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1086
1087 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1089
1090 assert!(accepted_blocks.is_empty());
1092 assert!(missing.is_empty());
1093 assert!(block_manager.is_empty());
1094 }
1095
1096 #[rstest]
1104 #[tokio::test]
1105 async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
1106 let (mut context, _key_pairs) = Context::new_for_test(4);
1108
1109 if gc_enabled {
1110 context
1111 .protocol_config
1112 .set_consensus_gc_depth_for_testing(10);
1113 }
1114 let context = Arc::new(context);
1115
1116 let mut dag_builder = DagBuilder::new(context.clone());
1118 dag_builder.layers(1..=3).build();
1119
1120 let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1121
1122 for seed in 0..100u8 {
1126 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1127
1128 let store = Arc::new(MemStore::new());
1129 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1130
1131 let mut block_manager =
1132 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1133
1134 let mut all_accepted_blocks = vec![];
1136 for block in &all_blocks {
1137 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1138
1139 all_accepted_blocks.extend(accepted_blocks);
1140 }
1141
1142 all_accepted_blocks.sort_by_key(|b| b.reference());
1144 all_blocks.sort_by_key(|b| b.reference());
1145
1146 assert_eq!(
1147 all_accepted_blocks, all_blocks,
1148 "Failed acceptance sequence for seed {seed}"
1149 );
1150 assert!(block_manager.is_empty());
1151 }
1152 }
1153
1154 #[tokio::test]
1157 async fn authorities_that_know_missing_blocks() {
1158 let (context, _key_pairs) = Context::new_for_test(4);
1159
1160 let context = Arc::new(context);
1161
1162 let mut dag_builder = DagBuilder::new(context.clone());
1164 dag_builder.layers(1..=3).build();
1165
1166 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1167
1168 let blocks_round_2 = all_blocks
1169 .iter()
1170 .filter(|block| block.round() == 2)
1171 .cloned()
1172 .collect::<Vec<_>>();
1173
1174 let blocks_round_1 = all_blocks
1175 .iter()
1176 .filter(|block| block.round() == 1)
1177 .map(|block| block.reference())
1178 .collect::<BTreeSet<_>>();
1179
1180 let store = Arc::new(MemStore::new());
1181 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1182
1183 let mut block_manager =
1184 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1185
1186 let (_, missing_blocks) = block_manager.try_accept_blocks(vec![blocks_round_2[0].clone()]);
1187 assert_eq!(missing_blocks, blocks_round_1);
1189
1190 let missing_blocks_with_authorities = block_manager.missing_blocks();
1191
1192 let block_round_1_authority_0 = all_blocks
1193 .iter()
1194 .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(0))
1195 .map(|block| block.reference())
1196 .next()
1197 .unwrap();
1198 let block_round_1_authority_1 = all_blocks
1199 .iter()
1200 .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(1))
1201 .map(|block| block.reference())
1202 .next()
1203 .unwrap();
1204 assert_eq!(
1205 missing_blocks_with_authorities[&block_round_1_authority_0],
1206 BTreeSet::from([AuthorityIndex::new_for_test(0)])
1207 );
1208 assert_eq!(
1209 missing_blocks_with_authorities[&block_round_1_authority_1],
1210 BTreeSet::from([
1211 AuthorityIndex::new_for_test(0),
1212 AuthorityIndex::new_for_test(1)
1213 ])
1214 );
1215
1216 block_manager.try_accept_blocks(vec![blocks_round_2[1].clone()]);
1219 let missing_blocks_with_authorities = block_manager.missing_blocks();
1220 assert_eq!(
1221 missing_blocks_with_authorities[&block_round_1_authority_0],
1222 BTreeSet::from([
1223 AuthorityIndex::new_for_test(0),
1224 AuthorityIndex::new_for_test(1)
1225 ])
1226 );
1227 }
1228
1229 #[rstest]
1230 #[tokio::test]
1231 async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
1232 telemetry_subscribers::init_for_testing();
1233 let (mut context, _key_pairs) = Context::new_for_test(4);
1235
1236 if gc_depth > 0 {
1237 context
1238 .protocol_config
1239 .set_consensus_gc_depth_for_testing(gc_depth);
1240 }
1241 let context = Arc::new(context);
1242
1243 let mut dag_builder = DagBuilder::new(context.clone());
1245 dag_builder.layers(1..=gc_depth * 2).build();
1246
1247 let mut all_blocks = dag_builder
1251 .blocks
1252 .values()
1253 .filter(|block| block.round() > 1)
1254 .cloned()
1255 .collect::<Vec<_>>();
1256
1257 for seed in 0..100u8 {
1261 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1262
1263 let store = Arc::new(MemStore::new());
1264 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1265
1266 let mut block_manager = BlockManager::new(
1267 context.clone(),
1268 dag_state.clone(),
1269 Arc::new(NoopBlockVerifier),
1270 );
1271
1272 for block in &all_blocks {
1274 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1275 assert!(accepted_blocks.is_empty());
1276 }
1277 assert!(!block_manager.is_empty());
1278
1279 let non_existing_refs = (1..=3)
1282 .map(|round| {
1283 BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
1284 })
1285 .collect::<Vec<_>>();
1286 assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);
1287
1288 let last_commit = TrustedCommit::new_for_test(
1291 gc_depth * 2,
1292 CommitDigest::MIN,
1293 context.clock.timestamp_utc_ms(),
1294 BlockRef::new(
1295 gc_depth * 2,
1296 AuthorityIndex::new_for_test(0),
1297 BlockDigest::MIN,
1298 ),
1299 vec![],
1300 );
1301 dag_state.write().set_last_commit(last_commit);
1302
1303 block_manager.try_unsuspend_blocks_for_latest_gc_round();
1305
1306 assert!(block_manager.is_empty());
1308
1309 for block in &all_blocks {
1311 assert!(dag_state.read().contains_block(&block.reference()));
1312 }
1313 }
1314 }
1315
1316 #[rstest]
1317 #[tokio::test]
1318 async fn try_accept_committed_blocks() {
1319 let (mut context, _key_pairs) = Context::new_for_test(4);
1321 context
1323 .protocol_config
1324 .set_consensus_gc_depth_for_testing(4);
1325 let context = Arc::new(context);
1326 let store = Arc::new(MemStore::new());
1327 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1328
1329 let last_commit = TrustedCommit::new_for_test(
1332 10,
1333 CommitDigest::MIN,
1334 context.clock.timestamp_utc_ms(),
1335 BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1336 vec![],
1337 );
1338 dag_state.write().set_last_commit(last_commit);
1339 assert_eq!(
1340 dag_state.read().gc_round(),
1341 2,
1342 "GC round should have moved to round 2"
1343 );
1344
1345 let mut block_manager =
1346 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1347
1348 let mut dag_builder = DagBuilder::new(context.clone());
1350 dag_builder.layers(1..=12).build();
1351
1352 let blocks = dag_builder.blocks(7..=12);
1355 let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1356 assert!(accepted_blocks.is_empty());
1357 assert_eq!(missing.len(), 4);
1358
1359 let blocks = dag_builder.blocks(3..=6);
1363
1364 let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1366
1367 accepted_blocks.sort_by_key(|b| b.reference());
1369
1370 let mut all_blocks = dag_builder.blocks(3..=12);
1371 all_blocks.sort_by_key(|b| b.reference());
1372
1373 assert_eq!(accepted_blocks, all_blocks);
1374 assert!(block_manager.is_empty());
1375 }
1376
1377 struct TestBlockVerifier {
1378 fail: BTreeSet<BlockRef>,
1379 }
1380
1381 impl TestBlockVerifier {
1382 fn new(fail: BTreeSet<BlockRef>) -> Self {
1383 Self { fail }
1384 }
1385 }
1386
1387 impl BlockVerifier for TestBlockVerifier {
1388 fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1389 Ok(())
1390 }
1391
1392 fn check_ancestors(
1393 &self,
1394 block: &VerifiedBlock,
1395 _ancestors: &[Option<VerifiedBlock>],
1396 _gc_enabled: bool,
1397 _gc_round: Round,
1398 ) -> ConsensusResult<()> {
1399 if self.fail.contains(&block.reference()) {
1400 Err(ConsensusError::InvalidBlockTimestamp {
1401 max_timestamp_ms: 0,
1402 block_timestamp_ms: block.timestamp_ms(),
1403 })
1404 } else {
1405 Ok(())
1406 }
1407 }
1408 }
1409
1410 #[tokio::test]
1411 async fn reject_blocks_failing_verifications() {
1412 let (context, _key_pairs) = Context::new_for_test(4);
1413 let context = Arc::new(context);
1414
1415 let mut dag_builder = DagBuilder::new(context.clone());
1417 dag_builder.layers(1..=5).build();
1418
1419 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1420
1421 let test_verifier = TestBlockVerifier::new(
1423 all_blocks
1424 .iter()
1425 .filter(|block| block.round() == 3)
1426 .map(|block| block.reference())
1427 .collect(),
1428 );
1429
1430 let store = Arc::new(MemStore::new());
1432 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1433 let mut block_manager =
1434 BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));
1435
1436 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1439 all_blocks
1440 .iter()
1441 .filter(|block| block.round() > 1)
1442 .cloned()
1443 .collect(),
1444 );
1445
1446 assert!(accepted_blocks.is_empty());
1448 assert_eq!(missing_refs.len(), 4);
1449 missing_refs.iter().for_each(|missing_ref| {
1450 assert_eq!(missing_ref.round, 1);
1451 });
1452
1453 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1455 all_blocks
1456 .iter()
1457 .filter(|block| block.round() == 1)
1458 .cloned()
1459 .collect(),
1460 );
1461
1462 assert_eq!(accepted_blocks.len(), 8);
1464 accepted_blocks.iter().for_each(|block| {
1465 assert!(block.round() <= 2);
1466 });
1467 assert!(missing_refs.is_empty());
1468
1469 assert!(block_manager.suspended_blocks_refs().is_empty());
1472 }
1473
1474 #[tokio::test]
1475 async fn try_find_blocks() {
1476 let (context, _key_pairs) = Context::new_for_test(4);
1478 let context = Arc::new(context);
1479 let store = Arc::new(MemStore::new());
1480 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1481
1482 let mut block_manager =
1483 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1484
1485 let mut dag_builder = DagBuilder::new(context.clone());
1487 dag_builder
1488 .layers(1..=2) .authorities(vec![
1490 AuthorityIndex::new_for_test(0),
1491 AuthorityIndex::new_for_test(2),
1492 ]) .equivocate(3)
1494 .build();
1495
1496 let round_2_blocks = dag_builder
1498 .blocks
1499 .iter()
1500 .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1501 .collect::<Vec<VerifiedBlock>>();
1502
1503 let missing_block_refs_from_find =
1505 block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1506 assert_eq!(missing_block_refs_from_find.len(), 10);
1507 assert!(
1508 missing_block_refs_from_find
1509 .iter()
1510 .all(|block_ref| block_ref.round == 2)
1511 );
1512
1513 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1516 assert!(accepted_blocks.is_empty());
1517
1518 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1519 let missing_block_refs_from_accept =
1520 missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1521 assert_eq!(missing, missing_block_refs_from_accept);
1522 assert_eq!(
1523 block_manager.missing_block_refs(),
1524 missing_block_refs_from_accept
1525 );
1526
1527 dag_builder.layer(3).build();
1532
1533 let round_3_blocks = dag_builder
1534 .blocks
1535 .iter()
1536 .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1537 .collect::<Vec<BlockRef>>();
1538
1539 let missing_block_refs_from_find = block_manager.try_find_blocks(
1540 round_2_blocks
1541 .iter()
1542 .map(|b| b.reference())
1543 .chain(round_3_blocks.into_iter())
1544 .collect(),
1545 );
1546
1547 assert_eq!(missing_block_refs_from_find.len(), 4);
1548 assert!(
1549 missing_block_refs_from_find
1550 .iter()
1551 .all(|block_ref| block_ref.round == 3)
1552 );
1553 assert_eq!(
1554 block_manager.missing_block_refs(),
1555 missing_block_refs_from_accept
1556 .into_iter()
1557 .chain(missing_block_refs_from_find.into_iter())
1558 .collect()
1559 );
1560 }
1561}