1use std::{
6 collections::{BTreeMap, BTreeSet},
7 sync::Arc,
8 time::Instant,
9};
10
11use iota_metrics::monitored_scope;
12use itertools::Itertools as _;
13use parking_lot::RwLock;
14use tracing::{debug, trace, warn};
15
16use crate::{
17 Round,
18 block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
19 block_verifier::BlockVerifier,
20 context::Context,
21 dag_state::DagState,
22};
23
/// A verified block that cannot be accepted yet because some of its ancestors
/// are still unavailable.
struct SuspendedBlock {
    /// The block that is waiting for its ancestors.
    block: VerifiedBlock,
    /// References of the ancestors that must still be resolved before `block`
    /// can be unsuspended.
    missing_ancestors: BTreeSet<BlockRef>,
    /// Instant at which the block was suspended; used to report how long the
    /// block stayed suspended.
    timestamp: Instant,
}
29
30impl SuspendedBlock {
31 fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
32 Self {
33 block,
34 missing_ancestors,
35 timestamp: Instant::now(),
36 }
37 }
38}
39
/// Tracks blocks that cannot be accepted yet because of missing ancestors and
/// accepts them into `DagState` once their causal history becomes available
/// (or is garbage collected).
pub(crate) struct BlockManager {
    /// Shared node context; used here for the committee and the metrics.
    context: Arc<Context>,
    /// DAG state into which accepted blocks are written.
    dag_state: Arc<RwLock<DagState>>,
    /// Verifier whose `check_ancestors` is run on blocks right before they are
    /// accepted.
    block_verifier: Arc<dyn BlockVerifier>,

    /// Blocks waiting for ancestors, keyed by their own reference.
    suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
    /// Maps the reference of a missing block to the references of the
    /// suspended blocks that wait for it. `try_find_blocks` also registers
    /// entries with an empty set for references it could not find.
    missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
    /// References of blocks known to be missing, i.e. neither accepted nor
    /// suspended.
    missing_blocks: BTreeSet<BlockRef>,
    /// Per authority, the lowest and highest round of the blocks received so
    /// far (`None` until a block from that authority is seen). Only used for
    /// metrics.
    received_block_rounds: Vec<Option<(Round, Round)>>,
}
71
72impl BlockManager {
73 pub(crate) fn new(
74 context: Arc<Context>,
75 dag_state: Arc<RwLock<DagState>>,
76 block_verifier: Arc<dyn BlockVerifier>,
77 ) -> Self {
78 let committee_size = context.committee.size();
79 Self {
80 context,
81 dag_state,
82 block_verifier,
83 suspended_blocks: BTreeMap::new(),
84 missing_ancestors: BTreeMap::new(),
85 missing_blocks: BTreeSet::new(),
86 received_block_rounds: vec![None; committee_size],
87 }
88 }
89
90 #[tracing::instrument(skip_all)]
96 pub(crate) fn try_accept_blocks(
97 &mut self,
98 blocks: Vec<VerifiedBlock>,
99 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
100 let _s = monitored_scope("BlockManager::try_accept_blocks");
101 self.try_accept_blocks_internal(blocks, false)
102 }
103
104 #[tracing::instrument(skip_all)]
107 pub(crate) fn try_accept_committed_blocks(
108 &mut self,
109 blocks: Vec<VerifiedBlock>,
110 ) -> Vec<VerifiedBlock> {
111 if !self.dag_state.read().gc_enabled() {
113 return Vec::new();
114 }
115
116 let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
118 let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
119 assert!(
120 missing_blocks.is_empty(),
121 "No missing blocks should be returned for committed blocks"
122 );
123
124 accepted_blocks
125 }
126
127 fn try_accept_blocks_internal(
131 &mut self,
132 mut blocks: Vec<VerifiedBlock>,
133 committed: bool,
134 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
135 let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
136
137 blocks.sort_by_key(|b| b.round());
138 debug!(
139 "Trying to accept blocks: {}",
140 blocks.iter().map(|b| b.reference().to_string()).join(",")
141 );
142
143 let mut accepted_blocks = vec![];
144 let mut missing_blocks = BTreeSet::new();
145
146 for block in blocks {
147 self.update_block_received_metrics(&block);
148
149 let block_ref = block.reference();
151
152 let mut to_verify_timestamps_and_accept = vec![];
153 if committed {
154 match self.try_accept_one_committed_block(block) {
155 TryAcceptResult::Accepted(block) => {
156 accepted_blocks.push(block);
160 }
161 TryAcceptResult::Processed => continue,
162 TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => panic!(
163 "Did not expect to suspend or skip a committed block: {:?}",
164 block_ref
165 ),
166 };
167 } else {
168 match self.try_accept_one_block(block) {
169 TryAcceptResult::Accepted(block) => {
170 to_verify_timestamps_and_accept.push(block);
171 }
172 TryAcceptResult::Suspended(ancestors_to_fetch) => {
173 debug!(
174 "Missing ancestors to fetch for block {block_ref}: {}",
175 ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
176 );
177 missing_blocks.extend(ancestors_to_fetch);
178 continue;
179 }
180 TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
181 };
182 };
183
184 let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
186 to_verify_timestamps_and_accept.extend(unsuspended_blocks);
187
188 let blocks_to_accept =
190 self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
191 accepted_blocks.extend(blocks_to_accept);
192 }
193
194 self.update_stats(missing_blocks.len() as u64);
195
196 (accepted_blocks, missing_blocks)
198 }
199
200 fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
201 if self.dag_state.read().contains_block(&block.reference()) {
202 return TryAcceptResult::Processed;
203 }
204
205 self.missing_blocks.remove(&block.reference());
207
208 if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
214 suspended_block
215 .missing_ancestors
216 .iter()
217 .for_each(|ancestor| {
218 if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
219 references.remove(&block.reference());
220 }
221 });
222 }
223
224 self.dag_state.write().accept_blocks(vec![block.clone()]);
226
227 TryAcceptResult::Accepted(block)
228 }
229
230 pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
233 let _s = monitored_scope("BlockManager::try_find_blocks");
234 let gc_round = self.dag_state.read().gc_round();
235
236 let mut block_refs = block_refs
239 .into_iter()
240 .filter(|block_ref| block_ref.round > gc_round)
241 .collect::<Vec<_>>();
242
243 if block_refs.is_empty() {
244 return BTreeSet::new();
245 }
246
247 block_refs.sort_by_key(|b| b.round);
248
249 debug!(
250 "Trying to find blocks: {}",
251 block_refs.iter().map(|b| b.to_string()).join(",")
252 );
253
254 let mut missing_blocks = BTreeSet::new();
255
256 for (found, block_ref) in self
257 .dag_state
258 .read()
259 .contains_blocks(block_refs.clone())
260 .into_iter()
261 .zip(block_refs.iter())
262 {
263 if found || self.suspended_blocks.contains_key(block_ref) {
264 continue;
265 }
266 missing_blocks.insert(*block_ref);
268 if self.missing_blocks.insert(*block_ref) {
269 self.missing_ancestors.entry(*block_ref).or_default();
273
274 let block_ref_hostname =
275 &self.context.committee.authority(block_ref.author).hostname;
276 self.context
277 .metrics
278 .node_metrics
279 .block_manager_missing_blocks_by_authority
280 .with_label_values(&[block_ref_hostname])
281 .inc();
282 }
283 }
284
285 let metrics = &self.context.metrics.node_metrics;
286 metrics
287 .missing_blocks_total
288 .inc_by(missing_blocks.len() as u64);
289 metrics
290 .block_manager_missing_blocks
291 .set(self.missing_blocks.len() as i64);
292
293 missing_blocks
294 }
295
296 fn verify_block_timestamps_and_accept(
301 &mut self,
302 unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
303 ) -> Vec<VerifiedBlock> {
304 let (gc_enabled, gc_round) = {
305 let dag_state = self.dag_state.read();
306 (dag_state.gc_enabled(), dag_state.gc_round())
307 };
308 let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
310 let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
311 {
312 'block: for b in unsuspended_blocks {
313 let ancestors = self.dag_state.read().get_blocks(b.ancestors());
314 assert_eq!(b.ancestors().len(), ancestors.len());
315 let mut ancestor_blocks = vec![];
316 'ancestor: for (ancestor_ref, found) in
317 b.ancestors().iter().zip(ancestors.into_iter())
318 {
319 if let Some(found_block) = found {
320 assert_eq!(ancestor_ref, &found_block.reference());
322 ancestor_blocks.push(Some(found_block));
323 continue 'ancestor;
324 }
325 if blocks_to_accept.contains_key(ancestor_ref) {
328 ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
329 continue 'ancestor;
330 }
331 if blocks_to_reject.contains_key(ancestor_ref) {
333 blocks_to_reject.insert(b.reference(), b);
334 continue 'block;
335 }
336
337 if gc_enabled
342 && ancestor_ref.round > GENESIS_ROUND
343 && ancestor_ref.round <= gc_round
344 {
345 debug!(
346 "Block {:?} has a missing ancestor: {:?} passed GC round {}",
347 b.reference(),
348 ancestor_ref,
349 gc_round
350 );
351 ancestor_blocks.push(None);
352 } else {
353 panic!(
354 "Unsuspended block {:?} has a missing ancestor! Ancestor not found in DagState: {:?}",
355 b, ancestor_ref
356 );
357 }
358 }
359 if let Err(e) =
360 self.block_verifier
361 .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
362 {
363 warn!("Block {:?} failed to verify ancestors: {}", b, e);
364 blocks_to_reject.insert(b.reference(), b);
365 } else {
366 blocks_to_accept.insert(b.reference(), b);
367 }
368 }
369 }
370
371 for (block_ref, block) in blocks_to_reject {
373 let hostname = self
374 .context
375 .committee
376 .authority(block_ref.author)
377 .hostname
378 .clone();
379
380 self.context
381 .metrics
382 .node_metrics
383 .invalid_blocks
384 .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
385 .inc();
386 warn!("Invalid block {:?} is rejected", block);
387 }
388
389 let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
390
391 self.dag_state
394 .write()
395 .accept_blocks(blocks_to_accept.clone());
396
397 blocks_to_accept
398 }
399
400 fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
405 let block_ref = block.reference();
406 let mut missing_ancestors = BTreeSet::new();
407 let mut ancestors_to_fetch = BTreeSet::new();
408 let dag_state = self.dag_state.read();
409 let gc_round = dag_state.gc_round();
410 let gc_enabled = dag_state.gc_enabled();
411
412 if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
415 return TryAcceptResult::Processed;
416 }
417
418 if gc_enabled && block.round() <= gc_round {
421 let hostname = self
422 .context
423 .committee
424 .authority(block.author())
425 .hostname
426 .as_str();
427 self.context
428 .metrics
429 .node_metrics
430 .block_manager_skipped_blocks
431 .with_label_values(&[hostname])
432 .inc();
433 return TryAcceptResult::Skipped;
434 }
435
436 let ancestors = if gc_enabled {
440 block
441 .ancestors()
442 .iter()
443 .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
444 .cloned()
445 .collect::<Vec<_>>()
446 } else {
447 block.ancestors().to_vec()
448 };
449
450 for (found, ancestor) in dag_state
452 .contains_blocks(ancestors.clone())
453 .into_iter()
454 .zip(ancestors.iter())
455 {
456 if !found {
457 missing_ancestors.insert(*ancestor);
458
459 self.missing_ancestors
461 .entry(*ancestor)
462 .or_default()
463 .insert(block_ref);
464
465 let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
466 self.context
467 .metrics
468 .node_metrics
469 .block_manager_missing_ancestors_by_authority
470 .with_label_values(&[ancestor_hostname])
471 .inc();
472
473 if !self.suspended_blocks.contains_key(ancestor) {
477 ancestors_to_fetch.insert(*ancestor);
479 if self.missing_blocks.insert(*ancestor) {
480 self.context
481 .metrics
482 .node_metrics
483 .block_manager_missing_blocks_by_authority
484 .with_label_values(&[ancestor_hostname])
485 .inc();
486 }
487 }
488 }
489 }
490
491 self.missing_blocks.remove(&block.reference());
495
496 if !missing_ancestors.is_empty() {
497 let hostname = self
498 .context
499 .committee
500 .authority(block.author())
501 .hostname
502 .as_str();
503 self.context
504 .metrics
505 .node_metrics
506 .block_suspensions
507 .with_label_values(&[hostname])
508 .inc();
509 self.suspended_blocks
510 .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
511 return TryAcceptResult::Suspended(ancestors_to_fetch);
512 }
513
514 TryAcceptResult::Accepted(block)
515 }
516
517 fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
521 let mut unsuspended_blocks = vec![];
522 let mut to_process_blocks = vec![accepted_block];
523
524 while let Some(block_ref) = to_process_blocks.pop() {
525 if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
527 for r in block_refs_with_missing_deps {
528 if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
532 to_process_blocks.push(block.block.reference());
533 unsuspended_blocks.push(block);
534 }
535 }
536 }
537 }
538
539 let now = Instant::now();
540
541 for block in &unsuspended_blocks {
543 let hostname = self
544 .context
545 .committee
546 .authority(block.block.author())
547 .hostname
548 .as_str();
549 self.context
550 .metrics
551 .node_metrics
552 .block_unsuspensions
553 .with_label_values(&[hostname])
554 .inc();
555 self.context
556 .metrics
557 .node_metrics
558 .suspended_block_time
559 .with_label_values(&[hostname])
560 .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
561 }
562
563 unsuspended_blocks
564 .into_iter()
565 .map(|block| block.block)
566 .collect()
567 }
568
569 fn try_unsuspend_block(
574 &mut self,
575 block_ref: &BlockRef,
576 accepted_dependency: &BlockRef,
577 ) -> Option<SuspendedBlock> {
578 let block = self
579 .suspended_blocks
580 .get_mut(block_ref)
581 .expect("Block should be in suspended map");
582
583 assert!(
584 block.missing_ancestors.remove(accepted_dependency),
585 "Block reference {} should be present in missing dependencies of {:?}",
586 block_ref,
587 block.block
588 );
589
590 if block.missing_ancestors.is_empty() {
591 return self.suspended_blocks.remove(block_ref);
593 }
594 None
595 }
596
597 pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
601 let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
602 let (gc_enabled, gc_round) = {
603 let dag_state = self.dag_state.read();
604 (dag_state.gc_enabled(), dag_state.gc_round())
605 };
606 let mut blocks_unsuspended_below_gc_round = 0;
607 let mut blocks_gc_ed = 0;
608
609 if !gc_enabled {
610 trace!("GC is disabled, no blocks will attempt to get unsuspended.");
611 return;
612 }
613
614 while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
615 if block_ref.round > gc_round {
620 return;
621 }
622
623 blocks_gc_ed += 1;
624
625 let hostname = self
626 .context
627 .committee
628 .authority(block_ref.author)
629 .hostname
630 .as_str();
631 self.context
632 .metrics
633 .node_metrics
634 .block_manager_gced_blocks
635 .with_label_values(&[hostname])
636 .inc();
637
638 assert!(
639 !self.suspended_blocks.contains_key(block_ref),
640 "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
641 );
642
643 self.missing_blocks.remove(block_ref);
645
646 let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
649
650 unsuspended_blocks.iter().for_each(|block| {
651 if block.round() <= gc_round {
652 blocks_unsuspended_below_gc_round += 1;
653 }
654 });
655
656 let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
658 for block in accepted_blocks {
659 let hostname = self
660 .context
661 .committee
662 .authority(block.author())
663 .hostname
664 .as_str();
665 self.context
666 .metrics
667 .node_metrics
668 .block_manager_gc_unsuspended_blocks
669 .with_label_values(&[hostname])
670 .inc();
671 }
672 }
673
674 debug!(
675 "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
676 blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
677 );
678 }
679
680 pub(crate) fn missing_blocks(&self) -> BTreeSet<BlockRef> {
683 self.missing_blocks.clone()
684 }
685
686 fn update_stats(&mut self, missing_blocks: u64) {
687 let metrics = &self.context.metrics.node_metrics;
688 metrics.missing_blocks_total.inc_by(missing_blocks);
689 metrics
690 .block_manager_suspended_blocks
691 .set(self.suspended_blocks.len() as i64);
692 metrics
693 .block_manager_missing_ancestors
694 .set(self.missing_ancestors.len() as i64);
695 metrics
696 .block_manager_missing_blocks
697 .set(self.missing_blocks.len() as i64);
698 }
699
700 fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
701 let (min_round, max_round) =
702 if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
703 (curr_min.min(block.round()), curr_max.max(block.round()))
704 } else {
705 (block.round(), block.round())
706 };
707 self.received_block_rounds[block.author()] = Some((min_round, max_round));
708
709 let hostname = &self.context.committee.authority(block.author()).hostname;
710 self.context
711 .metrics
712 .node_metrics
713 .lowest_verified_authority_round
714 .with_label_values(&[hostname])
715 .set(min_round.into());
716 self.context
717 .metrics
718 .node_metrics
719 .highest_verified_authority_round
720 .with_label_values(&[hostname])
721 .set(max_round.into());
722 }
723
724 #[cfg(test)]
726 pub(crate) fn is_empty(&self) -> bool {
727 self.suspended_blocks.is_empty()
728 && self.missing_ancestors.is_empty()
729 && self.missing_blocks.is_empty()
730 }
731
732 #[cfg(test)]
735 fn suspended_blocks(&self) -> Vec<BlockRef> {
736 self.suspended_blocks.keys().cloned().collect()
737 }
738}
739
/// Outcome of trying to accept a single block.
enum TryAcceptResult {
    /// The block can proceed: either all its relevant ancestors are present
    /// (regular path) or it was written directly to the DAG state (committed
    /// path).
    Accepted(VerifiedBlock),
    /// The block was suspended because of missing ancestors; carries the
    /// missing ancestors that are not themselves suspended and so need to be
    /// fetched.
    Suspended(BTreeSet<BlockRef>),
    /// The block is already known: it is either suspended or present in the
    /// DAG state.
    Processed,
    /// The block was ignored because GC is enabled and its round is at or
    /// below the GC round.
    Skipped,
}
754
755#[cfg(test)]
756mod tests {
757 use std::{collections::BTreeSet, sync::Arc};
758
759 use consensus_config::AuthorityIndex;
760 use parking_lot::RwLock;
761 use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
762 use rstest::rstest;
763
764 use crate::{
765 CommitDigest, Round,
766 block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
767 block_manager::BlockManager,
768 block_verifier::{BlockVerifier, NoopBlockVerifier},
769 commit::TrustedCommit,
770 context::Context,
771 dag_state::DagState,
772 error::{ConsensusError, ConsensusResult},
773 storage::mem_store::MemStore,
774 test_dag_builder::DagBuilder,
775 test_dag_parser::parse_dag,
776 };
777
778 #[tokio::test]
779 async fn suspend_blocks_with_missing_ancestors() {
780 let (context, _key_pairs) = Context::new_for_test(4);
782 let context = Arc::new(context);
783 let store = Arc::new(MemStore::new());
784 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
785
786 let mut block_manager =
787 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
788
789 let mut dag_builder = DagBuilder::new(context.clone());
791 dag_builder
792 .layers(1..=2) .authorities(vec![
794 AuthorityIndex::new_for_test(0),
795 AuthorityIndex::new_for_test(2),
796 ]) .equivocate(3)
798 .build();
799
800 let round_2_blocks = dag_builder
802 .blocks
803 .into_iter()
804 .filter_map(|(_, block)| (block.round() == 2).then_some(block))
805 .collect::<Vec<VerifiedBlock>>();
806
807 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
809
810 assert!(accepted_blocks.is_empty());
812
813 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
816 let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
817 assert_eq!(missing, missing_block_refs);
818
819 assert_eq!(block_manager.missing_blocks(), missing_block_refs);
823
824 assert_eq!(
826 block_manager.suspended_blocks(),
827 round_2_blocks
828 .into_iter()
829 .map(|block| block.reference())
830 .collect::<Vec<_>>()
831 );
832 }
833
834 #[tokio::test]
835 async fn try_accept_block_returns_missing_blocks() {
836 let (context, _key_pairs) = Context::new_for_test(4);
837 let context = Arc::new(context);
838 let store = Arc::new(MemStore::new());
839 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
840
841 let mut block_manager =
842 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
843
844 let mut dag_builder = DagBuilder::new(context.clone());
846 dag_builder
847 .layers(1..=4) .authorities(vec![
849 AuthorityIndex::new_for_test(0),
850 AuthorityIndex::new_for_test(2),
851 ]) .equivocate(3) .build();
854
855 for (_, block) in dag_builder
858 .blocks
859 .into_iter()
860 .rev()
861 .take_while(|(_, block)| block.round() >= 2)
862 {
863 let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);
865
866 assert!(accepted_blocks.is_empty());
868
869 let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
870 assert_eq!(missing, block_ancestors);
871 }
872 }
873
874 #[tokio::test]
875 async fn accept_blocks_with_complete_causal_history() {
876 let (context, _key_pairs) = Context::new_for_test(4);
878 let context = Arc::new(context);
879 let store = Arc::new(MemStore::new());
880 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
881
882 let mut block_manager =
883 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
884
885 let mut dag_builder = DagBuilder::new(context.clone());
887 dag_builder.layers(1..=2).build();
888
889 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
890
891 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
893
894 assert_eq!(accepted_blocks.len(), 8);
896 assert_eq!(
897 accepted_blocks,
898 all_blocks
899 .iter()
900 .filter(|block| block.round() > 0)
901 .cloned()
902 .collect::<Vec<VerifiedBlock>>()
903 );
904 assert!(missing.is_empty());
905 assert!(block_manager.is_empty());
906
907 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
910 assert!(accepted_blocks.is_empty());
911 }
912
913 #[tokio::test]
916 async fn accept_blocks_with_causal_history_below_gc_round() {
917 let (mut context, _key_pairs) = Context::new_for_test(4);
919
920 context
922 .protocol_config
923 .set_consensus_gc_depth_for_testing(4);
924 let context = Arc::new(context);
925 let store = Arc::new(MemStore::new());
926 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
927
928 let last_commit = TrustedCommit::new_for_test(
931 10,
932 CommitDigest::MIN,
933 context.clock.timestamp_utc_ms(),
934 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
935 vec![],
936 );
937 dag_state.write().set_last_commit(last_commit);
938 assert_eq!(
939 dag_state.read().gc_round(),
940 6,
941 "GC round should have moved to round 6"
942 );
943
944 let mut block_manager =
945 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
946
947 let dag_str = "DAG {
949 Round 0 : { 4 },
950 Round 1 : { * },
951 Round 2 : { * },
952 Round 3 : { * },
953 Round 4 : { * },
954 Round 5 : { * },
955 Round 6 : { * },
956 Round 7 : {
957 A -> [*],
958 B -> [*],
959 C -> [*],
960 }
961 Round 8 : {
962 A -> [*],
963 B -> [*],
964 C -> [*],
965 },
966 Round 9 : {
967 A -> [A8, B8, C8, D6],
968 B -> [A8, B8, C8, D6],
969 C -> [A8, B8, C8, D6],
970 D -> [A8, B8, C8, D6],
971 },
972 Round 10 : { * },
973 }";
974
975 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
976
977 let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
981
982 for rounds_range in blocks_ranges {
983 let all_blocks = dag_builder
984 .blocks
985 .values()
986 .filter(|block| rounds_range.contains(&block.round()))
987 .cloned()
988 .collect::<Vec<_>>();
989
990 let mut reversed_blocks = all_blocks.clone();
992 reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
993 let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
994 accepted_blocks.sort_by_key(|a| a.reference());
995
996 assert_eq!(accepted_blocks, all_blocks.to_vec());
998 assert!(missing.is_empty());
999 assert!(block_manager.is_empty());
1000
1001 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1002 assert!(accepted_blocks.is_empty());
1003 }
1004 }
1005
1006 #[tokio::test]
1010 async fn skip_accepting_blocks_below_gc_round() {
1011 let (mut context, _key_pairs) = Context::new_for_test(4);
1013 context
1015 .protocol_config
1016 .set_consensus_gc_depth_for_testing(4);
1017 let context = Arc::new(context);
1018 let store = Arc::new(MemStore::new());
1019 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1020
1021 let last_commit = TrustedCommit::new_for_test(
1024 10,
1025 CommitDigest::MIN,
1026 context.clock.timestamp_utc_ms(),
1027 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1028 vec![],
1029 );
1030 dag_state.write().set_last_commit(last_commit);
1031 assert_eq!(
1032 dag_state.read().gc_round(),
1033 6,
1034 "GC round should have moved to round 6"
1035 );
1036
1037 let mut block_manager =
1038 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1039
1040 let mut dag_builder = DagBuilder::new(context.clone());
1042 dag_builder.layers(1..=6).build();
1043
1044 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1045
1046 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1048
1049 assert!(accepted_blocks.is_empty());
1051 assert!(missing.is_empty());
1052 assert!(block_manager.is_empty());
1053 }
1054
1055 #[rstest]
1063 #[tokio::test]
1064 async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
1065 let (mut context, _key_pairs) = Context::new_for_test(4);
1067
1068 if gc_enabled {
1069 context
1070 .protocol_config
1071 .set_consensus_gc_depth_for_testing(10);
1072 }
1073 let context = Arc::new(context);
1074
1075 let mut dag_builder = DagBuilder::new(context.clone());
1077 dag_builder.layers(1..=3).build();
1078
1079 let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1080
1081 for seed in 0..100u8 {
1085 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1086
1087 let store = Arc::new(MemStore::new());
1088 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1089
1090 let mut block_manager =
1091 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1092
1093 let mut all_accepted_blocks = vec![];
1095 for block in &all_blocks {
1096 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1097
1098 all_accepted_blocks.extend(accepted_blocks);
1099 }
1100
1101 all_accepted_blocks.sort_by_key(|b| b.reference());
1103 all_blocks.sort_by_key(|b| b.reference());
1104
1105 assert_eq!(
1106 all_accepted_blocks, all_blocks,
1107 "Failed acceptance sequence for seed {}",
1108 seed
1109 );
1110 assert!(block_manager.is_empty());
1111 }
1112 }
1113
1114 #[rstest]
1115 #[tokio::test]
1116 async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
1117 telemetry_subscribers::init_for_testing();
1118 let (mut context, _key_pairs) = Context::new_for_test(4);
1120
1121 if gc_depth > 0 {
1122 context
1123 .protocol_config
1124 .set_consensus_gc_depth_for_testing(gc_depth);
1125 }
1126 let context = Arc::new(context);
1127
1128 let mut dag_builder = DagBuilder::new(context.clone());
1130 dag_builder.layers(1..=gc_depth * 2).build();
1131
1132 let mut all_blocks = dag_builder
1136 .blocks
1137 .values()
1138 .filter(|block| block.round() > 1)
1139 .cloned()
1140 .collect::<Vec<_>>();
1141
1142 for seed in 0..100u8 {
1146 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1147
1148 let store = Arc::new(MemStore::new());
1149 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1150
1151 let mut block_manager = BlockManager::new(
1152 context.clone(),
1153 dag_state.clone(),
1154 Arc::new(NoopBlockVerifier),
1155 );
1156
1157 for block in &all_blocks {
1159 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1160 assert!(accepted_blocks.is_empty());
1161 }
1162 assert!(!block_manager.is_empty());
1163
1164 let non_existing_refs = (1..=3)
1167 .map(|round| {
1168 BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
1169 })
1170 .collect::<Vec<_>>();
1171 assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);
1172
1173 let last_commit = TrustedCommit::new_for_test(
1176 gc_depth * 2,
1177 CommitDigest::MIN,
1178 context.clock.timestamp_utc_ms(),
1179 BlockRef::new(
1180 gc_depth * 2,
1181 AuthorityIndex::new_for_test(0),
1182 BlockDigest::MIN,
1183 ),
1184 vec![],
1185 );
1186 dag_state.write().set_last_commit(last_commit);
1187
1188 block_manager.try_unsuspend_blocks_for_latest_gc_round();
1190
1191 assert!(block_manager.is_empty());
1193
1194 for block in &all_blocks {
1196 assert!(dag_state.read().contains_block(&block.reference()));
1197 }
1198 }
1199 }
1200
1201 #[rstest]
1202 #[tokio::test]
1203 async fn try_accept_committed_blocks() {
1204 let (mut context, _key_pairs) = Context::new_for_test(4);
1206 context
1208 .protocol_config
1209 .set_consensus_gc_depth_for_testing(4);
1210 let context = Arc::new(context);
1211 let store = Arc::new(MemStore::new());
1212 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1213
1214 let last_commit = TrustedCommit::new_for_test(
1217 10,
1218 CommitDigest::MIN,
1219 context.clock.timestamp_utc_ms(),
1220 BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1221 vec![],
1222 );
1223 dag_state.write().set_last_commit(last_commit);
1224 assert_eq!(
1225 dag_state.read().gc_round(),
1226 2,
1227 "GC round should have moved to round 2"
1228 );
1229
1230 let mut block_manager =
1231 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1232
1233 let mut dag_builder = DagBuilder::new(context.clone());
1235 dag_builder.layers(1..=12).build();
1236
1237 let blocks = dag_builder.blocks(7..=12);
1240 let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1241 assert!(accepted_blocks.is_empty());
1242 assert_eq!(missing.len(), 4);
1243
1244 let blocks = dag_builder.blocks(3..=6);
1248
1249 let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1251
1252 accepted_blocks.sort_by_key(|b| b.reference());
1254
1255 let mut all_blocks = dag_builder.blocks(3..=12);
1256 all_blocks.sort_by_key(|b| b.reference());
1257
1258 assert_eq!(accepted_blocks, all_blocks);
1259 assert!(block_manager.is_empty());
1260 }
1261
1262 struct TestBlockVerifier {
1263 fail: BTreeSet<BlockRef>,
1264 }
1265
1266 impl TestBlockVerifier {
1267 fn new(fail: BTreeSet<BlockRef>) -> Self {
1268 Self { fail }
1269 }
1270 }
1271
1272 impl BlockVerifier for TestBlockVerifier {
1273 fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1274 Ok(())
1275 }
1276
1277 fn check_ancestors(
1278 &self,
1279 block: &VerifiedBlock,
1280 _ancestors: &[Option<VerifiedBlock>],
1281 _gc_enabled: bool,
1282 _gc_round: Round,
1283 ) -> ConsensusResult<()> {
1284 if self.fail.contains(&block.reference()) {
1285 Err(ConsensusError::InvalidBlockTimestamp {
1286 max_timestamp_ms: 0,
1287 block_timestamp_ms: block.timestamp_ms(),
1288 })
1289 } else {
1290 Ok(())
1291 }
1292 }
1293 }
1294
1295 #[tokio::test]
1296 async fn reject_blocks_failing_verifications() {
1297 let (context, _key_pairs) = Context::new_for_test(4);
1298 let context = Arc::new(context);
1299
1300 let mut dag_builder = DagBuilder::new(context.clone());
1302 dag_builder.layers(1..=5).build();
1303
1304 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1305
1306 let test_verifier = TestBlockVerifier::new(
1308 all_blocks
1309 .iter()
1310 .filter(|block| block.round() == 3)
1311 .map(|block| block.reference())
1312 .collect(),
1313 );
1314
1315 let store = Arc::new(MemStore::new());
1317 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1318 let mut block_manager =
1319 BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));
1320
1321 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1324 all_blocks
1325 .iter()
1326 .filter(|block| block.round() > 1)
1327 .cloned()
1328 .collect(),
1329 );
1330
1331 assert!(accepted_blocks.is_empty());
1333 assert_eq!(missing_refs.len(), 4);
1334 missing_refs.iter().for_each(|missing_ref| {
1335 assert_eq!(missing_ref.round, 1);
1336 });
1337
1338 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1340 all_blocks
1341 .iter()
1342 .filter(|block| block.round() == 1)
1343 .cloned()
1344 .collect(),
1345 );
1346
1347 assert_eq!(accepted_blocks.len(), 8);
1349 accepted_blocks.iter().for_each(|block| {
1350 assert!(block.round() <= 2);
1351 });
1352 assert!(missing_refs.is_empty());
1353
1354 assert!(block_manager.suspended_blocks().is_empty());
1357 }
1358
1359 #[tokio::test]
1360 async fn try_find_blocks() {
1361 let (context, _key_pairs) = Context::new_for_test(4);
1363 let context = Arc::new(context);
1364 let store = Arc::new(MemStore::new());
1365 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1366
1367 let mut block_manager =
1368 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1369
1370 let mut dag_builder = DagBuilder::new(context.clone());
1372 dag_builder
1373 .layers(1..=2) .authorities(vec![
1375 AuthorityIndex::new_for_test(0),
1376 AuthorityIndex::new_for_test(2),
1377 ]) .equivocate(3)
1379 .build();
1380
1381 let round_2_blocks = dag_builder
1383 .blocks
1384 .iter()
1385 .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1386 .collect::<Vec<VerifiedBlock>>();
1387
1388 let missing_block_refs_from_find =
1390 block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1391 assert_eq!(missing_block_refs_from_find.len(), 10);
1392 assert!(
1393 missing_block_refs_from_find
1394 .iter()
1395 .all(|block_ref| block_ref.round == 2)
1396 );
1397
1398 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1401 assert!(accepted_blocks.is_empty());
1402
1403 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1404 let missing_block_refs_from_accept =
1405 missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1406 assert_eq!(missing, missing_block_refs_from_accept);
1407 assert_eq!(
1408 block_manager.missing_blocks(),
1409 missing_block_refs_from_accept
1410 );
1411
1412 dag_builder.layer(3).build();
1417
1418 let round_3_blocks = dag_builder
1419 .blocks
1420 .iter()
1421 .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1422 .collect::<Vec<BlockRef>>();
1423
1424 let missing_block_refs_from_find = block_manager.try_find_blocks(
1425 round_2_blocks
1426 .iter()
1427 .map(|b| b.reference())
1428 .chain(round_3_blocks.into_iter())
1429 .collect(),
1430 );
1431
1432 assert_eq!(missing_block_refs_from_find.len(), 4);
1433 assert!(
1434 missing_block_refs_from_find
1435 .iter()
1436 .all(|block_ref| block_ref.round == 3)
1437 );
1438 assert_eq!(
1439 block_manager.missing_blocks(),
1440 missing_block_refs_from_accept
1441 .into_iter()
1442 .chain(missing_block_refs_from_find.into_iter())
1443 .collect()
1444 );
1445 }
1446}