1use std::{
6 collections::{BTreeMap, BTreeSet, btree_map::Entry},
7 sync::Arc,
8 time::Instant,
9};
10
11use consensus_config::AuthorityIndex;
12use iota_metrics::monitored_scope;
13use itertools::Itertools as _;
14use parking_lot::RwLock;
15use tracing::{debug, instrument, trace, warn};
16
17use crate::{
18 Round,
19 block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
20 block_verifier::BlockVerifier,
21 context::Context,
22 dag_state::DagState,
23};
24
25#[derive(Clone)]
26pub(crate) struct SuspendedBlock {
27 block: VerifiedBlock,
28 missing_ancestors: BTreeSet<BlockRef>,
29 timestamp: Instant,
30}
31
32impl SuspendedBlock {
33 pub(crate) fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
34 Self {
35 block,
36 missing_ancestors,
37 timestamp: Instant::now(),
38 }
39 }
40}
41
42pub(crate) struct BlockManager {
48 context: Arc<Context>,
49 dag_state: Arc<RwLock<DagState>>,
50 block_verifier: Arc<dyn BlockVerifier>,
51
52 suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
57 missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
64 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
71 received_block_rounds: Vec<Option<(Round, Round)>>,
75}
76
77impl BlockManager {
78 pub(crate) fn new(
79 context: Arc<Context>,
80 dag_state: Arc<RwLock<DagState>>,
81 block_verifier: Arc<dyn BlockVerifier>,
82 ) -> Self {
83 let committee_size = context.committee.size();
84 Self {
85 context,
86 dag_state,
87 block_verifier,
88 suspended_blocks: BTreeMap::new(),
89 missing_ancestors: BTreeMap::new(),
90 missing_blocks: BTreeMap::new(),
91 received_block_rounds: vec![None; committee_size],
92 }
93 }
94
95 #[tracing::instrument(skip_all)]
101 pub(crate) fn try_accept_blocks(
102 &mut self,
103 blocks: Vec<VerifiedBlock>,
104 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
105 let _s = monitored_scope("BlockManager::try_accept_blocks");
106 self.try_accept_blocks_internal(blocks, false)
107 }
108
109 #[tracing::instrument(skip_all)]
112 pub(crate) fn try_accept_committed_blocks(
113 &mut self,
114 blocks: Vec<VerifiedBlock>,
115 ) -> Vec<VerifiedBlock> {
116 if !self.dag_state.read().gc_enabled() {
118 return Vec::new();
119 }
120
121 let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
123 let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
124 assert!(
125 missing_blocks.is_empty(),
126 "No missing blocks should be returned for committed blocks"
127 );
128
129 accepted_blocks
130 }
131
132 fn try_accept_blocks_internal(
136 &mut self,
137 mut blocks: Vec<VerifiedBlock>,
138 committed: bool,
139 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
140 let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
141
142 blocks.sort_by_key(|b| b.round());
143 debug!(
144 "Trying to accept blocks: {}",
145 blocks.iter().map(|b| b.reference().to_string()).join(",")
146 );
147
148 let mut accepted_blocks = vec![];
149 let mut missing_blocks = BTreeSet::new();
150
151 for block in blocks {
152 self.update_block_received_metrics(&block);
153
154 let block_ref = block.reference();
156
157 let mut to_verify_timestamps_and_accept = vec![];
158 if committed {
159 match self.try_accept_one_committed_block(block) {
160 TryAcceptResult::Accepted(block) => {
161 accepted_blocks.push(block);
165 }
166 TryAcceptResult::Processed => continue,
167 TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => {
168 panic!("Did not expect to suspend or skip a committed block: {block_ref:?}")
169 }
170 };
171 } else {
172 match self.try_accept_one_block(block) {
173 TryAcceptResult::Accepted(block) => {
174 to_verify_timestamps_and_accept.push(block);
175 }
176 TryAcceptResult::Suspended(ancestors_to_fetch) => {
177 debug!(
178 "Missing ancestors to fetch for block {block_ref}: {}",
179 ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
180 );
181 missing_blocks.extend(ancestors_to_fetch);
182 continue;
183 }
184 TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
185 };
186 };
187
188 let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
190 to_verify_timestamps_and_accept.extend(unsuspended_blocks);
191
192 let blocks_to_accept =
194 self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
195 accepted_blocks.extend(blocks_to_accept);
196 }
197
198 self.update_stats(missing_blocks.len() as u64);
199
200 (accepted_blocks, missing_blocks)
202 }
203
204 fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
205 if self.dag_state.read().contains_block(&block.reference()) {
206 return TryAcceptResult::Processed;
207 }
208
209 self.missing_blocks.remove(&block.reference());
211
212 if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
218 suspended_block
219 .missing_ancestors
220 .iter()
221 .for_each(|ancestor| {
222 if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
223 references.remove(&block.reference());
224 }
225 });
226 }
227
228 self.dag_state.write().accept_blocks(vec![block.clone()]);
230
231 TryAcceptResult::Accepted(block)
232 }
233
234 pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
237 let _s = monitored_scope("BlockManager::try_find_blocks");
238 let gc_round = self.dag_state.read().gc_round();
239
240 let mut block_refs = block_refs
243 .into_iter()
244 .filter(|block_ref| block_ref.round > gc_round)
245 .collect::<Vec<_>>();
246
247 if block_refs.is_empty() {
248 return BTreeSet::new();
249 }
250
251 block_refs.sort_by_key(|b| b.round);
252
253 debug!(
254 "Trying to find blocks: {}",
255 block_refs.iter().map(|b| b.to_string()).join(",")
256 );
257
258 let mut missing_blocks = BTreeSet::new();
259
260 for (found, block_ref) in self
261 .dag_state
262 .read()
263 .contains_blocks(block_refs.clone())
264 .into_iter()
265 .zip(block_refs.iter())
266 {
267 if found || self.suspended_blocks.contains_key(block_ref) {
268 continue;
269 }
270 missing_blocks.insert(*block_ref);
272 if self
273 .missing_blocks
274 .insert(*block_ref, BTreeSet::from([block_ref.author]))
275 .is_none()
276 {
277 self.missing_ancestors.entry(*block_ref).or_default();
281
282 let block_ref_hostname =
283 &self.context.committee.authority(block_ref.author).hostname;
284 self.context
285 .metrics
286 .node_metrics
287 .block_manager_missing_blocks_by_authority
288 .with_label_values(&[block_ref_hostname])
289 .inc();
290 }
291 }
292
293 let metrics = &self.context.metrics.node_metrics;
294 metrics
295 .missing_blocks_total
296 .inc_by(missing_blocks.len() as u64);
297 metrics
298 .block_manager_missing_blocks
299 .set(self.missing_blocks.len() as i64);
300
301 missing_blocks
302 }
303
304 fn verify_block_timestamps_and_accept(
309 &mut self,
310 unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
311 ) -> Vec<VerifiedBlock> {
312 let (gc_enabled, gc_round) = {
313 let dag_state = self.dag_state.read();
314 (dag_state.gc_enabled(), dag_state.gc_round())
315 };
316 let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
318 let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
319 {
320 'block: for b in unsuspended_blocks {
321 let ancestors = self.dag_state.read().get_blocks(b.ancestors());
322 assert_eq!(b.ancestors().len(), ancestors.len());
323 let mut ancestor_blocks = vec![];
324 'ancestor: for (ancestor_ref, found) in
325 b.ancestors().iter().zip(ancestors.into_iter())
326 {
327 if let Some(found_block) = found {
328 assert_eq!(ancestor_ref, &found_block.reference());
330 ancestor_blocks.push(Some(found_block));
331 continue 'ancestor;
332 }
333 if blocks_to_accept.contains_key(ancestor_ref) {
336 ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
337 continue 'ancestor;
338 }
339 if blocks_to_reject.contains_key(ancestor_ref) {
341 blocks_to_reject.insert(b.reference(), b);
342 continue 'block;
343 }
344
345 if gc_enabled
350 && ancestor_ref.round > GENESIS_ROUND
351 && ancestor_ref.round <= gc_round
352 {
353 debug!(
354 "Block {:?} has a missing ancestor: {:?} passed GC round {}",
355 b.reference(),
356 ancestor_ref,
357 gc_round
358 );
359 ancestor_blocks.push(None);
360 } else {
361 panic!(
362 "Unsuspended block {b:?} has a missing ancestor! Ancestor not found in DagState: {ancestor_ref:?}"
363 );
364 }
365 }
366 if let Err(e) =
367 self.block_verifier
368 .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
369 {
370 warn!("Block {:?} failed to verify ancestors: {}", b, e);
371 blocks_to_reject.insert(b.reference(), b);
372 } else {
373 blocks_to_accept.insert(b.reference(), b);
374 }
375 }
376 }
377
378 for (block_ref, block) in blocks_to_reject {
380 let hostname = self
381 .context
382 .committee
383 .authority(block_ref.author)
384 .hostname
385 .clone();
386
387 self.context
388 .metrics
389 .node_metrics
390 .invalid_blocks
391 .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
392 .inc();
393 warn!("Invalid block {:?} is rejected", block);
394 }
395
396 let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
397
398 self.dag_state
401 .write()
402 .accept_blocks(blocks_to_accept.clone());
403
404 blocks_to_accept
405 }
406
407 fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
412 let block_ref = block.reference();
413 let mut missing_ancestors = BTreeSet::new();
414 let mut ancestors_to_fetch = BTreeSet::new();
415 let dag_state = self.dag_state.read();
416 let gc_round = dag_state.gc_round();
417 let gc_enabled = dag_state.gc_enabled();
418
419 if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
422 return TryAcceptResult::Processed;
423 }
424
425 if gc_enabled && block.round() <= gc_round {
428 let hostname = self
429 .context
430 .committee
431 .authority(block.author())
432 .hostname
433 .as_str();
434 self.context
435 .metrics
436 .node_metrics
437 .block_manager_skipped_blocks
438 .with_label_values(&[hostname])
439 .inc();
440 return TryAcceptResult::Skipped;
441 }
442
443 let ancestors = if gc_enabled {
447 block
448 .ancestors()
449 .iter()
450 .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
451 .cloned()
452 .collect::<Vec<_>>()
453 } else {
454 block.ancestors().to_vec()
455 };
456
457 for (found, ancestor) in dag_state
459 .contains_blocks(ancestors.clone())
460 .into_iter()
461 .zip(ancestors.iter())
462 {
463 if !found {
464 missing_ancestors.insert(*ancestor);
465
466 self.missing_ancestors
468 .entry(*ancestor)
469 .or_default()
470 .insert(block_ref);
471
472 let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
473 self.context
474 .metrics
475 .node_metrics
476 .block_manager_missing_ancestors_by_authority
477 .with_label_values(&[ancestor_hostname])
478 .inc();
479
480 if !self.suspended_blocks.contains_key(ancestor) {
484 ancestors_to_fetch.insert(*ancestor);
486 let entry = self.missing_blocks.entry(*ancestor);
490 match entry {
491 Entry::Vacant(v) => {
492 v.insert(BTreeSet::from([ancestor.author, block_ref.author]));
493 self.context
494 .metrics
495 .node_metrics
496 .block_manager_missing_blocks_by_authority
497 .with_label_values(&[ancestor_hostname])
498 .inc();
499 }
500 Entry::Occupied(mut o) => {
501 o.get_mut().insert(block_ref.author);
502 }
503 }
504 }
505 }
506 }
507
508 self.missing_blocks.remove(&block.reference());
512
513 if !missing_ancestors.is_empty() {
514 let hostname = self
515 .context
516 .committee
517 .authority(block.author())
518 .hostname
519 .as_str();
520 self.context
521 .metrics
522 .node_metrics
523 .block_suspensions
524 .with_label_values(&[hostname])
525 .inc();
526 self.suspended_blocks
527 .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
528 return TryAcceptResult::Suspended(ancestors_to_fetch);
529 }
530
531 TryAcceptResult::Accepted(block)
532 }
533
534 fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
538 let mut unsuspended_blocks = vec![];
539 let mut to_process_blocks = vec![accepted_block];
540
541 while let Some(block_ref) = to_process_blocks.pop() {
542 if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
544 for r in block_refs_with_missing_deps {
545 if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
549 to_process_blocks.push(block.block.reference());
550 unsuspended_blocks.push(block);
551 }
552 }
553 }
554 }
555
556 let now = Instant::now();
557
558 for block in &unsuspended_blocks {
560 let hostname = self
561 .context
562 .committee
563 .authority(block.block.author())
564 .hostname
565 .as_str();
566 self.context
567 .metrics
568 .node_metrics
569 .block_unsuspensions
570 .with_label_values(&[hostname])
571 .inc();
572 self.context
573 .metrics
574 .node_metrics
575 .suspended_block_time
576 .with_label_values(&[hostname])
577 .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
578 }
579
580 unsuspended_blocks
581 .into_iter()
582 .map(|block| block.block)
583 .collect()
584 }
585
586 fn try_unsuspend_block(
591 &mut self,
592 block_ref: &BlockRef,
593 accepted_dependency: &BlockRef,
594 ) -> Option<SuspendedBlock> {
595 let block = self
596 .suspended_blocks
597 .get_mut(block_ref)
598 .expect("Block should be in suspended map");
599
600 assert!(
601 block.missing_ancestors.remove(accepted_dependency),
602 "Block reference {} should be present in missing dependencies of {:?}",
603 block_ref,
604 block.block
605 );
606
607 if block.missing_ancestors.is_empty() {
608 return self.suspended_blocks.remove(block_ref);
610 }
611 None
612 }
613
614 #[instrument(level = "trace", skip_all)]
618 pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
619 let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
620 let (gc_enabled, gc_round) = {
621 let dag_state = self.dag_state.read();
622 (dag_state.gc_enabled(), dag_state.gc_round())
623 };
624 let mut blocks_unsuspended_below_gc_round = 0;
625 let mut blocks_gc_ed = 0;
626
627 if !gc_enabled {
628 trace!("GC is disabled, no blocks will attempt to get unsuspended.");
629 return;
630 }
631
632 while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
633 if block_ref.round > gc_round {
638 return;
639 }
640
641 blocks_gc_ed += 1;
642
643 let hostname = self
644 .context
645 .committee
646 .authority(block_ref.author)
647 .hostname
648 .as_str();
649 self.context
650 .metrics
651 .node_metrics
652 .block_manager_gced_blocks
653 .with_label_values(&[hostname])
654 .inc();
655
656 assert!(
657 !self.suspended_blocks.contains_key(block_ref),
658 "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
659 );
660
661 self.missing_blocks.remove(block_ref);
663
664 let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
667
668 unsuspended_blocks.iter().for_each(|block| {
669 if block.round() <= gc_round {
670 blocks_unsuspended_below_gc_round += 1;
671 }
672 });
673
674 let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
676 for block in accepted_blocks {
677 let hostname = self
678 .context
679 .committee
680 .authority(block.author())
681 .hostname
682 .as_str();
683 self.context
684 .metrics
685 .node_metrics
686 .block_manager_gc_unsuspended_blocks
687 .with_label_values(&[hostname])
688 .inc();
689 }
690 }
691
692 debug!(
693 "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
694 blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
695 );
696 }
697
698 pub(crate) fn missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
702 self.missing_blocks.clone()
703 }
704
705 #[cfg(test)]
707 pub(crate) fn missing_block_refs(&self) -> BTreeSet<BlockRef> {
708 self.missing_blocks.keys().cloned().collect()
709 }
710
711 fn update_stats(&mut self, missing_blocks: u64) {
712 let metrics = &self.context.metrics.node_metrics;
713 metrics.missing_blocks_total.inc_by(missing_blocks);
714 metrics
715 .block_manager_suspended_blocks
716 .set(self.suspended_blocks.len() as i64);
717 metrics
718 .block_manager_missing_ancestors
719 .set(self.missing_ancestors.len() as i64);
720 metrics
721 .block_manager_missing_blocks
722 .set(self.missing_blocks.len() as i64);
723 }
724
725 fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
726 let (min_round, max_round) =
727 if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
728 (curr_min.min(block.round()), curr_max.max(block.round()))
729 } else {
730 (block.round(), block.round())
731 };
732 self.received_block_rounds[block.author()] = Some((min_round, max_round));
733
734 let hostname = &self.context.committee.authority(block.author()).hostname;
735 self.context
736 .metrics
737 .node_metrics
738 .lowest_verified_authority_round
739 .with_label_values(&[hostname])
740 .set(min_round.into());
741 self.context
742 .metrics
743 .node_metrics
744 .highest_verified_authority_round
745 .with_label_values(&[hostname])
746 .set(max_round.into());
747 }
748
749 #[cfg(test)]
751 pub(crate) fn is_empty(&self) -> bool {
752 self.suspended_blocks.is_empty()
753 && self.missing_ancestors.is_empty()
754 && self.missing_blocks.is_empty()
755 }
756
757 #[cfg(test)]
760 fn suspended_blocks_refs(&self) -> BTreeSet<BlockRef> {
761 self.suspended_blocks.keys().cloned().collect()
762 }
763}
764
765enum TryAcceptResult {
767 Accepted(VerifiedBlock),
769 Suspended(BTreeSet<BlockRef>),
771 Processed,
775 Skipped,
778}
779
780#[cfg(test)]
781mod tests {
782 use std::{collections::BTreeSet, sync::Arc};
783
784 use consensus_config::AuthorityIndex;
785 use parking_lot::RwLock;
786 use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
787 use rstest::rstest;
788
789 use crate::{
790 CommitDigest, Round,
791 block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
792 block_manager::BlockManager,
793 block_verifier::{BlockVerifier, NoopBlockVerifier},
794 commit::TrustedCommit,
795 context::Context,
796 dag_state::DagState,
797 error::{ConsensusError, ConsensusResult},
798 storage::mem_store::MemStore,
799 test_dag_builder::DagBuilder,
800 test_dag_parser::parse_dag,
801 };
802
803 #[tokio::test]
804 async fn suspend_blocks_with_missing_ancestors() {
805 let (context, _key_pairs) = Context::new_for_test(4);
807 let context = Arc::new(context);
808 let store = Arc::new(MemStore::new());
809 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
810
811 let mut block_manager =
812 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
813
814 let mut dag_builder = DagBuilder::new(context.clone());
816 dag_builder
817 .layers(1..=2) .authorities(vec![
819 AuthorityIndex::new_for_test(0),
820 AuthorityIndex::new_for_test(2),
821 ]) .equivocate(3)
823 .build();
824
825 let round_2_blocks = dag_builder
827 .blocks
828 .into_iter()
829 .filter_map(|(_, block)| (block.round() == 2).then_some(block))
830 .collect::<Vec<VerifiedBlock>>();
831
832 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
834
835 assert!(accepted_blocks.is_empty());
837
838 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
841 let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
842 assert_eq!(missing, missing_block_refs);
843
844 assert_eq!(block_manager.missing_block_refs(), missing_block_refs);
848
849 assert_eq!(
851 block_manager.suspended_blocks_refs(),
852 round_2_blocks
853 .into_iter()
854 .map(|block| block.reference())
855 .collect::<BTreeSet<_>>()
856 );
857
858 let known_by_manager = block_manager
860 .missing_blocks()
861 .iter()
862 .next()
863 .expect("We should expect at least two elements there")
864 .1
865 .clone();
866 assert_eq!(
867 known_by_manager,
868 context
869 .committee
870 .authorities()
871 .map(|(a, _)| a)
872 .collect::<BTreeSet<_>>()
873 );
874 }
875
876 #[tokio::test]
877 async fn try_accept_block_returns_missing_blocks() {
878 let (context, _key_pairs) = Context::new_for_test(4);
879 let context = Arc::new(context);
880 let store = Arc::new(MemStore::new());
881 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
882
883 let mut block_manager =
884 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
885
886 let mut dag_builder = DagBuilder::new(context.clone());
888 dag_builder
889 .layers(1..=4) .authorities(vec![
891 AuthorityIndex::new_for_test(0),
892 AuthorityIndex::new_for_test(2),
893 ]) .equivocate(3) .build();
896
897 for (_, block) in dag_builder
900 .blocks
901 .into_iter()
902 .rev()
903 .take_while(|(_, block)| block.round() >= 2)
904 {
905 let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);
907
908 assert!(accepted_blocks.is_empty());
910
911 let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
912 assert_eq!(missing, block_ancestors);
913 }
914 }
915
916 #[tokio::test]
917 async fn accept_blocks_with_complete_causal_history() {
918 let (context, _key_pairs) = Context::new_for_test(4);
920 let context = Arc::new(context);
921 let store = Arc::new(MemStore::new());
922 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
923
924 let mut block_manager =
925 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
926
927 let mut dag_builder = DagBuilder::new(context.clone());
929 dag_builder.layers(1..=2).build();
930
931 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
932
933 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
935
936 assert_eq!(accepted_blocks.len(), 8);
938 assert_eq!(
939 accepted_blocks,
940 all_blocks
941 .iter()
942 .filter(|block| block.round() > 0)
943 .cloned()
944 .collect::<Vec<VerifiedBlock>>()
945 );
946 assert!(missing.is_empty());
947 assert!(block_manager.is_empty());
948
949 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
952 assert!(accepted_blocks.is_empty());
953 }
954
955 #[tokio::test]
958 async fn accept_blocks_with_causal_history_below_gc_round() {
959 let (mut context, _key_pairs) = Context::new_for_test(4);
961
962 context
964 .protocol_config
965 .set_consensus_gc_depth_for_testing(4);
966 let context = Arc::new(context);
967 let store = Arc::new(MemStore::new());
968 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
969
970 let last_commit = TrustedCommit::new_for_test(
973 10,
974 CommitDigest::MIN,
975 context.clock.timestamp_utc_ms(),
976 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
977 vec![],
978 );
979 dag_state.write().set_last_commit(last_commit);
980 assert_eq!(
981 dag_state.read().gc_round(),
982 6,
983 "GC round should have moved to round 6"
984 );
985
986 let mut block_manager =
987 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
988
989 let dag_str = "DAG {
991 Round 0 : { 4 },
992 Round 1 : { * },
993 Round 2 : { * },
994 Round 3 : { * },
995 Round 4 : { * },
996 Round 5 : { * },
997 Round 6 : { * },
998 Round 7 : {
999 A -> [*],
1000 B -> [*],
1001 C -> [*],
1002 }
1003 Round 8 : {
1004 A -> [*],
1005 B -> [*],
1006 C -> [*],
1007 },
1008 Round 9 : {
1009 A -> [A8, B8, C8, D6],
1010 B -> [A8, B8, C8, D6],
1011 C -> [A8, B8, C8, D6],
1012 D -> [A8, B8, C8, D6],
1013 },
1014 Round 10 : { * },
1015 }";
1016
1017 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1018
1019 let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
1023
1024 for rounds_range in blocks_ranges {
1025 let all_blocks = dag_builder
1026 .blocks
1027 .values()
1028 .filter(|block| rounds_range.contains(&block.round()))
1029 .cloned()
1030 .collect::<Vec<_>>();
1031
1032 let mut reversed_blocks = all_blocks.clone();
1034 reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
1035 let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
1036 accepted_blocks.sort_by_key(|a| a.reference());
1037
1038 assert_eq!(accepted_blocks, all_blocks.to_vec());
1040 assert!(missing.is_empty());
1041 assert!(block_manager.is_empty());
1042
1043 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1044 assert!(accepted_blocks.is_empty());
1045 }
1046 }
1047
1048 #[tokio::test]
1052 async fn skip_accepting_blocks_below_gc_round() {
1053 let (mut context, _key_pairs) = Context::new_for_test(4);
1055 context
1057 .protocol_config
1058 .set_consensus_gc_depth_for_testing(4);
1059 let context = Arc::new(context);
1060 let store = Arc::new(MemStore::new());
1061 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1062
1063 let last_commit = TrustedCommit::new_for_test(
1066 10,
1067 CommitDigest::MIN,
1068 context.clock.timestamp_utc_ms(),
1069 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1070 vec![],
1071 );
1072 dag_state.write().set_last_commit(last_commit);
1073 assert_eq!(
1074 dag_state.read().gc_round(),
1075 6,
1076 "GC round should have moved to round 6"
1077 );
1078
1079 let mut block_manager =
1080 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1081
1082 let mut dag_builder = DagBuilder::new(context.clone());
1084 dag_builder.layers(1..=6).build();
1085
1086 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1087
1088 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1090
1091 assert!(accepted_blocks.is_empty());
1093 assert!(missing.is_empty());
1094 assert!(block_manager.is_empty());
1095 }
1096
1097 #[rstest]
1105 #[tokio::test]
1106 async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
1107 let (mut context, _key_pairs) = Context::new_for_test(4);
1109
1110 if gc_enabled {
1111 context
1112 .protocol_config
1113 .set_consensus_gc_depth_for_testing(10);
1114 }
1115 let context = Arc::new(context);
1116
1117 let mut dag_builder = DagBuilder::new(context.clone());
1119 dag_builder.layers(1..=3).build();
1120
1121 let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1122
1123 for seed in 0..100u8 {
1127 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1128
1129 let store = Arc::new(MemStore::new());
1130 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1131
1132 let mut block_manager =
1133 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1134
1135 let mut all_accepted_blocks = vec![];
1137 for block in &all_blocks {
1138 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1139
1140 all_accepted_blocks.extend(accepted_blocks);
1141 }
1142
1143 all_accepted_blocks.sort_by_key(|b| b.reference());
1145 all_blocks.sort_by_key(|b| b.reference());
1146
1147 assert_eq!(
1148 all_accepted_blocks, all_blocks,
1149 "Failed acceptance sequence for seed {seed}"
1150 );
1151 assert!(block_manager.is_empty());
1152 }
1153 }
1154
1155 #[tokio::test]
1158 async fn authorities_that_know_missing_blocks() {
1159 let (context, _key_pairs) = Context::new_for_test(4);
1160
1161 let context = Arc::new(context);
1162
1163 let mut dag_builder = DagBuilder::new(context.clone());
1165 dag_builder.layers(1..=3).build();
1166
1167 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1168
1169 let blocks_round_2 = all_blocks
1170 .iter()
1171 .filter(|block| block.round() == 2)
1172 .cloned()
1173 .collect::<Vec<_>>();
1174
1175 let blocks_round_1 = all_blocks
1176 .iter()
1177 .filter(|block| block.round() == 1)
1178 .map(|block| block.reference())
1179 .collect::<BTreeSet<_>>();
1180
1181 let store = Arc::new(MemStore::new());
1182 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1183
1184 let mut block_manager =
1185 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1186
1187 let (_, missing_blocks) = block_manager.try_accept_blocks(vec![blocks_round_2[0].clone()]);
1188 assert_eq!(missing_blocks, blocks_round_1);
1190
1191 let missing_blocks_with_authorities = block_manager.missing_blocks();
1192
1193 let block_round_1_authority_0 = all_blocks
1194 .iter()
1195 .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(0))
1196 .map(|block| block.reference())
1197 .next()
1198 .unwrap();
1199 let block_round_1_authority_1 = all_blocks
1200 .iter()
1201 .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(1))
1202 .map(|block| block.reference())
1203 .next()
1204 .unwrap();
1205 assert_eq!(
1206 missing_blocks_with_authorities[&block_round_1_authority_0],
1207 BTreeSet::from([AuthorityIndex::new_for_test(0)])
1208 );
1209 assert_eq!(
1210 missing_blocks_with_authorities[&block_round_1_authority_1],
1211 BTreeSet::from([
1212 AuthorityIndex::new_for_test(0),
1213 AuthorityIndex::new_for_test(1)
1214 ])
1215 );
1216
1217 block_manager.try_accept_blocks(vec![blocks_round_2[1].clone()]);
1220 let missing_blocks_with_authorities = block_manager.missing_blocks();
1221 assert_eq!(
1222 missing_blocks_with_authorities[&block_round_1_authority_0],
1223 BTreeSet::from([
1224 AuthorityIndex::new_for_test(0),
1225 AuthorityIndex::new_for_test(1)
1226 ])
1227 );
1228 }
1229
1230 #[rstest]
1231 #[tokio::test]
1232 async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
1233 telemetry_subscribers::init_for_testing();
1234 let (mut context, _key_pairs) = Context::new_for_test(4);
1236
1237 if gc_depth > 0 {
1238 context
1239 .protocol_config
1240 .set_consensus_gc_depth_for_testing(gc_depth);
1241 }
1242 let context = Arc::new(context);
1243
1244 let mut dag_builder = DagBuilder::new(context.clone());
1246 dag_builder.layers(1..=gc_depth * 2).build();
1247
1248 let mut all_blocks = dag_builder
1252 .blocks
1253 .values()
1254 .filter(|block| block.round() > 1)
1255 .cloned()
1256 .collect::<Vec<_>>();
1257
1258 for seed in 0..100u8 {
1262 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1263
1264 let store = Arc::new(MemStore::new());
1265 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1266
1267 let mut block_manager = BlockManager::new(
1268 context.clone(),
1269 dag_state.clone(),
1270 Arc::new(NoopBlockVerifier),
1271 );
1272
1273 for block in &all_blocks {
1275 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1276 assert!(accepted_blocks.is_empty());
1277 }
1278 assert!(!block_manager.is_empty());
1279
1280 let non_existing_refs = (1..=3)
1283 .map(|round| {
1284 BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
1285 })
1286 .collect::<Vec<_>>();
1287 assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);
1288
1289 let last_commit = TrustedCommit::new_for_test(
1292 gc_depth * 2,
1293 CommitDigest::MIN,
1294 context.clock.timestamp_utc_ms(),
1295 BlockRef::new(
1296 gc_depth * 2,
1297 AuthorityIndex::new_for_test(0),
1298 BlockDigest::MIN,
1299 ),
1300 vec![],
1301 );
1302 dag_state.write().set_last_commit(last_commit);
1303
1304 block_manager.try_unsuspend_blocks_for_latest_gc_round();
1306
1307 assert!(block_manager.is_empty());
1309
1310 for block in &all_blocks {
1312 assert!(dag_state.read().contains_block(&block.reference()));
1313 }
1314 }
1315 }
1316
1317 #[rstest]
1318 #[tokio::test]
1319 async fn try_accept_committed_blocks() {
1320 let (mut context, _key_pairs) = Context::new_for_test(4);
1322 context
1324 .protocol_config
1325 .set_consensus_gc_depth_for_testing(4);
1326 let context = Arc::new(context);
1327 let store = Arc::new(MemStore::new());
1328 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1329
1330 let last_commit = TrustedCommit::new_for_test(
1333 10,
1334 CommitDigest::MIN,
1335 context.clock.timestamp_utc_ms(),
1336 BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1337 vec![],
1338 );
1339 dag_state.write().set_last_commit(last_commit);
1340 assert_eq!(
1341 dag_state.read().gc_round(),
1342 2,
1343 "GC round should have moved to round 2"
1344 );
1345
1346 let mut block_manager =
1347 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1348
1349 let mut dag_builder = DagBuilder::new(context.clone());
1351 dag_builder.layers(1..=12).build();
1352
1353 let blocks = dag_builder.blocks(7..=12);
1356 let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1357 assert!(accepted_blocks.is_empty());
1358 assert_eq!(missing.len(), 4);
1359
1360 let blocks = dag_builder.blocks(3..=6);
1364
1365 let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1367
1368 accepted_blocks.sort_by_key(|b| b.reference());
1370
1371 let mut all_blocks = dag_builder.blocks(3..=12);
1372 all_blocks.sort_by_key(|b| b.reference());
1373
1374 assert_eq!(accepted_blocks, all_blocks);
1375 assert!(block_manager.is_empty());
1376 }
1377
1378 struct TestBlockVerifier {
1379 fail: BTreeSet<BlockRef>,
1380 }
1381
1382 impl TestBlockVerifier {
1383 fn new(fail: BTreeSet<BlockRef>) -> Self {
1384 Self { fail }
1385 }
1386 }
1387
1388 impl BlockVerifier for TestBlockVerifier {
1389 fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1390 Ok(())
1391 }
1392
1393 fn check_ancestors(
1394 &self,
1395 block: &VerifiedBlock,
1396 _ancestors: &[Option<VerifiedBlock>],
1397 _gc_enabled: bool,
1398 _gc_round: Round,
1399 ) -> ConsensusResult<()> {
1400 if self.fail.contains(&block.reference()) {
1401 Err(ConsensusError::InvalidBlockTimestamp {
1402 max_timestamp_ms: 0,
1403 block_timestamp_ms: block.timestamp_ms(),
1404 })
1405 } else {
1406 Ok(())
1407 }
1408 }
1409 }
1410
1411 #[tokio::test]
1412 async fn reject_blocks_failing_verifications() {
1413 let (context, _key_pairs) = Context::new_for_test(4);
1414 let context = Arc::new(context);
1415
1416 let mut dag_builder = DagBuilder::new(context.clone());
1418 dag_builder.layers(1..=5).build();
1419
1420 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1421
1422 let test_verifier = TestBlockVerifier::new(
1424 all_blocks
1425 .iter()
1426 .filter(|block| block.round() == 3)
1427 .map(|block| block.reference())
1428 .collect(),
1429 );
1430
1431 let store = Arc::new(MemStore::new());
1433 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1434 let mut block_manager =
1435 BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));
1436
1437 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1440 all_blocks
1441 .iter()
1442 .filter(|block| block.round() > 1)
1443 .cloned()
1444 .collect(),
1445 );
1446
1447 assert!(accepted_blocks.is_empty());
1449 assert_eq!(missing_refs.len(), 4);
1450 missing_refs.iter().for_each(|missing_ref| {
1451 assert_eq!(missing_ref.round, 1);
1452 });
1453
1454 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1456 all_blocks
1457 .iter()
1458 .filter(|block| block.round() == 1)
1459 .cloned()
1460 .collect(),
1461 );
1462
1463 assert_eq!(accepted_blocks.len(), 8);
1465 accepted_blocks.iter().for_each(|block| {
1466 assert!(block.round() <= 2);
1467 });
1468 assert!(missing_refs.is_empty());
1469
1470 assert!(block_manager.suspended_blocks_refs().is_empty());
1473 }
1474
1475 #[tokio::test]
1476 async fn try_find_blocks() {
1477 let (context, _key_pairs) = Context::new_for_test(4);
1479 let context = Arc::new(context);
1480 let store = Arc::new(MemStore::new());
1481 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1482
1483 let mut block_manager =
1484 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1485
1486 let mut dag_builder = DagBuilder::new(context.clone());
1488 dag_builder
1489 .layers(1..=2) .authorities(vec![
1491 AuthorityIndex::new_for_test(0),
1492 AuthorityIndex::new_for_test(2),
1493 ]) .equivocate(3)
1495 .build();
1496
1497 let round_2_blocks = dag_builder
1499 .blocks
1500 .iter()
1501 .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1502 .collect::<Vec<VerifiedBlock>>();
1503
1504 let missing_block_refs_from_find =
1506 block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1507 assert_eq!(missing_block_refs_from_find.len(), 10);
1508 assert!(
1509 missing_block_refs_from_find
1510 .iter()
1511 .all(|block_ref| block_ref.round == 2)
1512 );
1513
1514 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1517 assert!(accepted_blocks.is_empty());
1518
1519 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1520 let missing_block_refs_from_accept =
1521 missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1522 assert_eq!(missing, missing_block_refs_from_accept);
1523 assert_eq!(
1524 block_manager.missing_block_refs(),
1525 missing_block_refs_from_accept
1526 );
1527
1528 dag_builder.layer(3).build();
1533
1534 let round_3_blocks = dag_builder
1535 .blocks
1536 .iter()
1537 .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1538 .collect::<Vec<BlockRef>>();
1539
1540 let missing_block_refs_from_find = block_manager.try_find_blocks(
1541 round_2_blocks
1542 .iter()
1543 .map(|b| b.reference())
1544 .chain(round_3_blocks.into_iter())
1545 .collect(),
1546 );
1547
1548 assert_eq!(missing_block_refs_from_find.len(), 4);
1549 assert!(
1550 missing_block_refs_from_find
1551 .iter()
1552 .all(|block_ref| block_ref.round == 3)
1553 );
1554 assert_eq!(
1555 block_manager.missing_block_refs(),
1556 missing_block_refs_from_accept
1557 .into_iter()
1558 .chain(missing_block_refs_from_find.into_iter())
1559 .collect()
1560 );
1561 }
1562}