1use std::{
6 collections::{BTreeMap, BTreeSet, btree_map::Entry},
7 sync::Arc,
8 time::Instant,
9};
10
11use consensus_config::AuthorityIndex;
12use iota_metrics::monitored_scope;
13use itertools::Itertools as _;
14use parking_lot::RwLock;
15use tracing::{debug, instrument, trace, warn};
16
17use crate::{
18 Round,
19 block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
20 block_verifier::BlockVerifier,
21 context::Context,
22 dag_state::DagState,
23};
24
25#[derive(Clone)]
26pub(crate) struct SuspendedBlock {
27 block: VerifiedBlock,
28 missing_ancestors: BTreeSet<BlockRef>,
29 timestamp: Instant,
30}
31
32impl SuspendedBlock {
33 pub(crate) fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
34 Self {
35 block,
36 missing_ancestors,
37 timestamp: Instant::now(),
38 }
39 }
40}
41
42pub(crate) struct BlockManager {
48 context: Arc<Context>,
49 dag_state: Arc<RwLock<DagState>>,
50 block_verifier: Arc<dyn BlockVerifier>,
51
52 suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
57 missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
64 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
71 received_block_rounds: Vec<Option<(Round, Round)>>,
75}
76
77impl BlockManager {
78 pub(crate) fn new(
79 context: Arc<Context>,
80 dag_state: Arc<RwLock<DagState>>,
81 block_verifier: Arc<dyn BlockVerifier>,
82 ) -> Self {
83 let committee_size = context.committee.size();
84 Self {
85 context,
86 dag_state,
87 block_verifier,
88 suspended_blocks: BTreeMap::new(),
89 missing_ancestors: BTreeMap::new(),
90 missing_blocks: BTreeMap::new(),
91 received_block_rounds: vec![None; committee_size],
92 }
93 }
94
95 #[tracing::instrument(skip_all)]
101 pub(crate) fn try_accept_blocks(
102 &mut self,
103 blocks: Vec<VerifiedBlock>,
104 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
105 let _s = monitored_scope("BlockManager::try_accept_blocks");
106 self.try_accept_blocks_internal(blocks, false)
107 }
108
109 #[tracing::instrument(skip_all)]
112 pub(crate) fn try_accept_committed_blocks(
113 &mut self,
114 blocks: Vec<VerifiedBlock>,
115 ) -> Vec<VerifiedBlock> {
116 if !self.dag_state.read().gc_enabled() {
118 return Vec::new();
119 }
120
121 let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
123 let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
124 assert!(
125 missing_blocks.is_empty(),
126 "No missing blocks should be returned for committed blocks"
127 );
128
129 accepted_blocks
130 }
131
132 fn try_accept_blocks_internal(
136 &mut self,
137 mut blocks: Vec<VerifiedBlock>,
138 committed: bool,
139 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
140 let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
141
142 blocks.sort_by_key(|b| b.round());
143 debug!(
144 "Trying to accept blocks: {}",
145 blocks.iter().map(|b| b.reference().to_string()).join(",")
146 );
147
148 let mut accepted_blocks = vec![];
149 let mut missing_blocks = BTreeSet::new();
150
151 for block in blocks {
152 self.update_block_received_metrics(&block);
153
154 let block_ref = block.reference();
156
157 let mut to_verify_timestamps_and_accept = vec![];
158 if committed {
159 match self.try_accept_one_committed_block(block) {
160 TryAcceptResult::Accepted(block) => {
161 accepted_blocks.push(block);
165 }
166 TryAcceptResult::Processed => continue,
167 TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => {
168 panic!("Did not expect to suspend or skip a committed block: {block_ref:?}")
169 }
170 };
171 } else {
172 match self.try_accept_one_block(block) {
173 TryAcceptResult::Accepted(block) => {
174 to_verify_timestamps_and_accept.push(block);
175 }
176 TryAcceptResult::Suspended(ancestors_to_fetch) => {
177 debug!(
178 "Missing ancestors to fetch for block {block_ref}: {}",
179 ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
180 );
181 missing_blocks.extend(ancestors_to_fetch);
182 continue;
183 }
184 TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
185 };
186 };
187
188 let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
190 to_verify_timestamps_and_accept.extend(unsuspended_blocks);
191
192 let blocks_to_accept =
194 self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
195 accepted_blocks.extend(blocks_to_accept);
196 }
197
198 self.update_stats(missing_blocks.len() as u64);
199
200 (accepted_blocks, missing_blocks)
202 }
203
204 fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
205 if self.dag_state.read().contains_block(&block.reference()) {
206 return TryAcceptResult::Processed;
207 }
208
209 self.missing_blocks.remove(&block.reference());
211
212 if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
218 suspended_block
219 .missing_ancestors
220 .iter()
221 .for_each(|ancestor| {
222 if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
223 references.remove(&block.reference());
224 }
225 });
226 }
227
228 self.dag_state.write().accept_blocks(vec![block.clone()]);
230
231 TryAcceptResult::Accepted(block)
232 }
233
234 pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
237 let _s = monitored_scope("BlockManager::try_find_blocks");
238 let gc_round = self.dag_state.read().gc_round();
239
240 let mut block_refs = block_refs
243 .into_iter()
244 .filter(|block_ref| block_ref.round > gc_round)
245 .collect::<Vec<_>>();
246
247 if block_refs.is_empty() {
248 return BTreeSet::new();
249 }
250
251 block_refs.sort_by_key(|b| b.round);
252
253 debug!(
254 "Trying to find blocks: {}",
255 block_refs.iter().map(|b| b.to_string()).join(",")
256 );
257
258 let mut missing_blocks = BTreeSet::new();
259
260 for (found, block_ref) in self
261 .dag_state
262 .read()
263 .contains_blocks(block_refs.clone())
264 .into_iter()
265 .zip(block_refs.iter())
266 {
267 if found || self.suspended_blocks.contains_key(block_ref) {
268 continue;
269 }
270 missing_blocks.insert(*block_ref);
272 if self
273 .missing_blocks
274 .insert(*block_ref, BTreeSet::from([block_ref.author]))
275 .is_none()
276 {
277 self.missing_ancestors.entry(*block_ref).or_default();
281
282 let block_ref_hostname =
283 &self.context.committee.authority(block_ref.author).hostname;
284 self.context
285 .metrics
286 .node_metrics
287 .block_manager_missing_blocks_by_authority
288 .with_label_values(&[block_ref_hostname])
289 .inc();
290 }
291 }
292
293 let metrics = &self.context.metrics.node_metrics;
294 metrics
295 .missing_blocks_total
296 .inc_by(missing_blocks.len() as u64);
297 metrics
298 .block_manager_missing_blocks
299 .set(self.missing_blocks.len() as i64);
300
301 missing_blocks
302 }
303
304 fn verify_block_timestamps_and_accept(
307 &mut self,
308 unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
309 ) -> Vec<VerifiedBlock> {
310 let blocks_to_accept = if self
313 .context
314 .protocol_config
315 .consensus_median_timestamp_with_checkpoint_enforcement()
316 {
317 unsuspended_blocks.into_iter().collect::<Vec<_>>()
318 } else {
319 self.verify_block_timestamps(unsuspended_blocks)
320 };
321
322 self.dag_state
325 .write()
326 .accept_blocks(blocks_to_accept.clone());
327
328 blocks_to_accept
329 }
330
331 fn verify_block_timestamps(
335 &mut self,
336 unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
337 ) -> Vec<VerifiedBlock> {
338 let (gc_enabled, gc_round) = {
339 let dag_state = self.dag_state.read();
340 (dag_state.gc_enabled(), dag_state.gc_round())
341 };
342 let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
344 let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
345 {
346 'block: for b in unsuspended_blocks {
347 let ancestors = self.dag_state.read().get_blocks(b.ancestors());
348 assert_eq!(b.ancestors().len(), ancestors.len());
349 let mut ancestor_blocks = vec![];
350 'ancestor: for (ancestor_ref, found) in
351 b.ancestors().iter().zip(ancestors.into_iter())
352 {
353 if let Some(found_block) = found {
354 assert_eq!(ancestor_ref, &found_block.reference());
356 ancestor_blocks.push(Some(found_block));
357 continue 'ancestor;
358 }
359 if blocks_to_accept.contains_key(ancestor_ref) {
362 ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
363 continue 'ancestor;
364 }
365 if blocks_to_reject.contains_key(ancestor_ref) {
367 blocks_to_reject.insert(b.reference(), b);
368 continue 'block;
369 }
370
371 if gc_enabled
376 && ancestor_ref.round > GENESIS_ROUND
377 && ancestor_ref.round <= gc_round
378 {
379 debug!(
380 "Block {:?} has a missing ancestor: {:?} passed GC round {}",
381 b.reference(),
382 ancestor_ref,
383 gc_round
384 );
385 ancestor_blocks.push(None);
386 } else {
387 panic!(
388 "Unsuspended block {b:?} has a missing ancestor! Ancestor not found in DagState: {ancestor_ref:?}"
389 );
390 }
391 }
392 if let Err(e) =
393 self.block_verifier
394 .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
395 {
396 warn!("Block {:?} failed to verify ancestors: {}", b, e);
397 blocks_to_reject.insert(b.reference(), b);
398 } else {
399 blocks_to_accept.insert(b.reference(), b);
400 }
401 }
402 }
403
404 for (block_ref, block) in blocks_to_reject {
406 let hostname = self
407 .context
408 .committee
409 .authority(block_ref.author)
410 .hostname
411 .clone();
412
413 self.context
414 .metrics
415 .node_metrics
416 .invalid_blocks
417 .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
418 .inc();
419 warn!("Invalid block {:?} is rejected", block);
420 }
421
422 let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
423
424 self.dag_state
427 .write()
428 .accept_blocks(blocks_to_accept.clone());
429
430 blocks_to_accept
431 }
432
433 fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
438 let block_ref = block.reference();
439 let mut missing_ancestors = BTreeSet::new();
440 let mut ancestors_to_fetch = BTreeSet::new();
441 let dag_state = self.dag_state.read();
442 let gc_round = dag_state.gc_round();
443 let gc_enabled = dag_state.gc_enabled();
444
445 if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
448 return TryAcceptResult::Processed;
449 }
450
451 if gc_enabled && block.round() <= gc_round {
454 let hostname = self
455 .context
456 .committee
457 .authority(block.author())
458 .hostname
459 .as_str();
460 self.context
461 .metrics
462 .node_metrics
463 .block_manager_skipped_blocks
464 .with_label_values(&[hostname])
465 .inc();
466 return TryAcceptResult::Skipped;
467 }
468
469 let ancestors = if gc_enabled {
473 block
474 .ancestors()
475 .iter()
476 .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
477 .cloned()
478 .collect::<Vec<_>>()
479 } else {
480 block.ancestors().to_vec()
481 };
482
483 for (found, ancestor) in dag_state
485 .contains_blocks(ancestors.clone())
486 .into_iter()
487 .zip(ancestors.iter())
488 {
489 if !found {
490 missing_ancestors.insert(*ancestor);
491
492 self.missing_ancestors
494 .entry(*ancestor)
495 .or_default()
496 .insert(block_ref);
497
498 let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
499 self.context
500 .metrics
501 .node_metrics
502 .block_manager_missing_ancestors_by_authority
503 .with_label_values(&[ancestor_hostname])
504 .inc();
505
506 if !self.suspended_blocks.contains_key(ancestor) {
510 ancestors_to_fetch.insert(*ancestor);
512 let entry = self.missing_blocks.entry(*ancestor);
516 match entry {
517 Entry::Vacant(v) => {
518 v.insert(BTreeSet::from([ancestor.author, block_ref.author]));
519 self.context
520 .metrics
521 .node_metrics
522 .block_manager_missing_blocks_by_authority
523 .with_label_values(&[ancestor_hostname])
524 .inc();
525 }
526 Entry::Occupied(mut o) => {
527 o.get_mut().insert(block_ref.author);
528 }
529 }
530 }
531 }
532 }
533
534 self.missing_blocks.remove(&block.reference());
538
539 if !missing_ancestors.is_empty() {
540 let hostname = self
541 .context
542 .committee
543 .authority(block.author())
544 .hostname
545 .as_str();
546 self.context
547 .metrics
548 .node_metrics
549 .block_suspensions
550 .with_label_values(&[hostname])
551 .inc();
552 self.suspended_blocks
553 .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
554 return TryAcceptResult::Suspended(ancestors_to_fetch);
555 }
556
557 TryAcceptResult::Accepted(block)
558 }
559
560 fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
564 let mut unsuspended_blocks = vec![];
565 let mut to_process_blocks = vec![accepted_block];
566
567 while let Some(block_ref) = to_process_blocks.pop() {
568 if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
570 for r in block_refs_with_missing_deps {
571 if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
575 to_process_blocks.push(block.block.reference());
576 unsuspended_blocks.push(block);
577 }
578 }
579 }
580 }
581
582 let now = Instant::now();
583
584 for block in &unsuspended_blocks {
586 let hostname = self
587 .context
588 .committee
589 .authority(block.block.author())
590 .hostname
591 .as_str();
592 self.context
593 .metrics
594 .node_metrics
595 .block_unsuspensions
596 .with_label_values(&[hostname])
597 .inc();
598 self.context
599 .metrics
600 .node_metrics
601 .suspended_block_time
602 .with_label_values(&[hostname])
603 .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
604 }
605
606 unsuspended_blocks
607 .into_iter()
608 .map(|block| block.block)
609 .collect()
610 }
611
612 fn try_unsuspend_block(
617 &mut self,
618 block_ref: &BlockRef,
619 accepted_dependency: &BlockRef,
620 ) -> Option<SuspendedBlock> {
621 let block = self
622 .suspended_blocks
623 .get_mut(block_ref)
624 .expect("Block should be in suspended map");
625
626 assert!(
627 block.missing_ancestors.remove(accepted_dependency),
628 "Block reference {} should be present in missing dependencies of {:?}",
629 block_ref,
630 block.block
631 );
632
633 if block.missing_ancestors.is_empty() {
634 return self.suspended_blocks.remove(block_ref);
636 }
637 None
638 }
639
640 #[instrument(level = "trace", skip_all)]
644 pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
645 let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
646 let (gc_enabled, gc_round) = {
647 let dag_state = self.dag_state.read();
648 (dag_state.gc_enabled(), dag_state.gc_round())
649 };
650 let mut blocks_unsuspended_below_gc_round = 0;
651 let mut blocks_gc_ed = 0;
652
653 if !gc_enabled {
654 trace!("GC is disabled, no blocks will attempt to get unsuspended.");
655 return;
656 }
657
658 while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
659 if block_ref.round > gc_round {
664 return;
665 }
666
667 blocks_gc_ed += 1;
668
669 let hostname = self
670 .context
671 .committee
672 .authority(block_ref.author)
673 .hostname
674 .as_str();
675 self.context
676 .metrics
677 .node_metrics
678 .block_manager_gced_blocks
679 .with_label_values(&[hostname])
680 .inc();
681
682 assert!(
683 !self.suspended_blocks.contains_key(block_ref),
684 "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
685 );
686
687 self.missing_blocks.remove(block_ref);
689
690 let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
693
694 unsuspended_blocks.iter().for_each(|block| {
695 if block.round() <= gc_round {
696 blocks_unsuspended_below_gc_round += 1;
697 }
698 });
699
700 let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
702 for block in accepted_blocks {
703 let hostname = self
704 .context
705 .committee
706 .authority(block.author())
707 .hostname
708 .as_str();
709 self.context
710 .metrics
711 .node_metrics
712 .block_manager_gc_unsuspended_blocks
713 .with_label_values(&[hostname])
714 .inc();
715 }
716 }
717
718 debug!(
719 "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
720 blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
721 );
722 }
723
724 pub(crate) fn missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
728 self.missing_blocks.clone()
729 }
730
731 #[cfg(test)]
733 pub(crate) fn missing_block_refs(&self) -> BTreeSet<BlockRef> {
734 self.missing_blocks.keys().cloned().collect()
735 }
736
737 fn update_stats(&mut self, missing_blocks: u64) {
738 let metrics = &self.context.metrics.node_metrics;
739 metrics.missing_blocks_total.inc_by(missing_blocks);
740 metrics
741 .block_manager_suspended_blocks
742 .set(self.suspended_blocks.len() as i64);
743 metrics
744 .block_manager_missing_ancestors
745 .set(self.missing_ancestors.len() as i64);
746 metrics
747 .block_manager_missing_blocks
748 .set(self.missing_blocks.len() as i64);
749 }
750
751 fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
752 let (min_round, max_round) =
753 if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
754 (curr_min.min(block.round()), curr_max.max(block.round()))
755 } else {
756 (block.round(), block.round())
757 };
758 self.received_block_rounds[block.author()] = Some((min_round, max_round));
759
760 let hostname = &self.context.committee.authority(block.author()).hostname;
761 self.context
762 .metrics
763 .node_metrics
764 .lowest_verified_authority_round
765 .with_label_values(&[hostname])
766 .set(min_round.into());
767 self.context
768 .metrics
769 .node_metrics
770 .highest_verified_authority_round
771 .with_label_values(&[hostname])
772 .set(max_round.into());
773 }
774
775 #[cfg(test)]
777 pub(crate) fn is_empty(&self) -> bool {
778 self.suspended_blocks.is_empty()
779 && self.missing_ancestors.is_empty()
780 && self.missing_blocks.is_empty()
781 }
782
783 #[cfg(test)]
786 fn suspended_blocks_refs(&self) -> BTreeSet<BlockRef> {
787 self.suspended_blocks.keys().cloned().collect()
788 }
789}
790
791enum TryAcceptResult {
793 Accepted(VerifiedBlock),
795 Suspended(BTreeSet<BlockRef>),
797 Processed,
801 Skipped,
804}
805
806#[cfg(test)]
807mod tests {
808 use std::{collections::BTreeSet, sync::Arc};
809
810 use consensus_config::AuthorityIndex;
811 use parking_lot::RwLock;
812 use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
813 use rstest::rstest;
814
815 use crate::{
816 CommitDigest, Round,
817 block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
818 block_manager::BlockManager,
819 block_verifier::{BlockVerifier, NoopBlockVerifier, SignedBlockVerifier},
820 commit::TrustedCommit,
821 context::Context,
822 dag_state::DagState,
823 error::{ConsensusError, ConsensusResult},
824 storage::mem_store::MemStore,
825 test_dag_builder::DagBuilder,
826 test_dag_parser::parse_dag,
827 transaction::NoopTransactionVerifier,
828 };
829
830 #[tokio::test]
831 async fn suspend_blocks_with_missing_ancestors() {
832 let (context, _key_pairs) = Context::new_for_test(4);
834 let context = Arc::new(context);
835 let store = Arc::new(MemStore::new());
836 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
837
838 let mut block_manager =
839 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
840
841 let mut dag_builder = DagBuilder::new(context.clone());
843 dag_builder
844 .layers(1..=2) .authorities(vec![
846 AuthorityIndex::new_for_test(0),
847 AuthorityIndex::new_for_test(2),
848 ]) .equivocate(3)
850 .build();
851
852 let round_2_blocks = dag_builder
854 .blocks
855 .into_iter()
856 .filter_map(|(_, block)| (block.round() == 2).then_some(block))
857 .collect::<Vec<VerifiedBlock>>();
858
859 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
861
862 assert!(accepted_blocks.is_empty());
864
865 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
868 let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
869 assert_eq!(missing, missing_block_refs);
870
871 assert_eq!(block_manager.missing_block_refs(), missing_block_refs);
875
876 assert_eq!(
878 block_manager.suspended_blocks_refs(),
879 round_2_blocks
880 .into_iter()
881 .map(|block| block.reference())
882 .collect::<BTreeSet<_>>()
883 );
884
885 let known_by_manager = block_manager
887 .missing_blocks()
888 .iter()
889 .next()
890 .expect("We should expect at least two elements there")
891 .1
892 .clone();
893 assert_eq!(
894 known_by_manager,
895 context
896 .committee
897 .authorities()
898 .map(|(a, _)| a)
899 .collect::<BTreeSet<_>>()
900 );
901 }
902
903 #[tokio::test]
904 async fn try_accept_block_returns_missing_blocks() {
905 let (context, _key_pairs) = Context::new_for_test(4);
906 let context = Arc::new(context);
907 let store = Arc::new(MemStore::new());
908 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
909
910 let mut block_manager =
911 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
912
913 let mut dag_builder = DagBuilder::new(context.clone());
915 dag_builder
916 .layers(1..=4) .authorities(vec![
918 AuthorityIndex::new_for_test(0),
919 AuthorityIndex::new_for_test(2),
920 ]) .equivocate(3) .build();
923
924 for (_, block) in dag_builder
927 .blocks
928 .into_iter()
929 .rev()
930 .take_while(|(_, block)| block.round() >= 2)
931 {
932 let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);
934
935 assert!(accepted_blocks.is_empty());
937
938 let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
939 assert_eq!(missing, block_ancestors);
940 }
941 }
942
943 #[tokio::test]
944 async fn accept_blocks_with_complete_causal_history() {
945 let (context, _key_pairs) = Context::new_for_test(4);
947 let context = Arc::new(context);
948 let store = Arc::new(MemStore::new());
949 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
950
951 let mut block_manager =
952 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
953
954 let mut dag_builder = DagBuilder::new(context.clone());
956 dag_builder.layers(1..=2).build();
957
958 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
959
960 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
962
963 assert_eq!(accepted_blocks.len(), 8);
965 assert_eq!(
966 accepted_blocks,
967 all_blocks
968 .iter()
969 .filter(|block| block.round() > 0)
970 .cloned()
971 .collect::<Vec<VerifiedBlock>>()
972 );
973 assert!(missing.is_empty());
974 assert!(block_manager.is_empty());
975
976 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
979 assert!(accepted_blocks.is_empty());
980 }
981
982 #[tokio::test]
985 async fn accept_blocks_with_causal_history_below_gc_round() {
986 let (mut context, _key_pairs) = Context::new_for_test(4);
988
989 context
991 .protocol_config
992 .set_consensus_gc_depth_for_testing(4);
993 let context = Arc::new(context);
994 let store = Arc::new(MemStore::new());
995 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
996
997 let last_commit = TrustedCommit::new_for_test(
1000 10,
1001 CommitDigest::MIN,
1002 context.clock.timestamp_utc_ms(),
1003 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1004 vec![],
1005 );
1006 dag_state.write().set_last_commit(last_commit);
1007 assert_eq!(
1008 dag_state.read().gc_round(),
1009 6,
1010 "GC round should have moved to round 6"
1011 );
1012
1013 let mut block_manager =
1014 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1015
1016 let dag_str = "DAG {
1018 Round 0 : { 4 },
1019 Round 1 : { * },
1020 Round 2 : { * },
1021 Round 3 : { * },
1022 Round 4 : { * },
1023 Round 5 : { * },
1024 Round 6 : { * },
1025 Round 7 : {
1026 A -> [*],
1027 B -> [*],
1028 C -> [*],
1029 }
1030 Round 8 : {
1031 A -> [*],
1032 B -> [*],
1033 C -> [*],
1034 },
1035 Round 9 : {
1036 A -> [A8, B8, C8, D6],
1037 B -> [A8, B8, C8, D6],
1038 C -> [A8, B8, C8, D6],
1039 D -> [A8, B8, C8, D6],
1040 },
1041 Round 10 : { * },
1042 }";
1043
1044 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1045
1046 let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
1050
1051 for rounds_range in blocks_ranges {
1052 let all_blocks = dag_builder
1053 .blocks
1054 .values()
1055 .filter(|block| rounds_range.contains(&block.round()))
1056 .cloned()
1057 .collect::<Vec<_>>();
1058
1059 let mut reversed_blocks = all_blocks.clone();
1061 reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
1062 let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
1063 accepted_blocks.sort_by_key(|a| a.reference());
1064
1065 assert_eq!(accepted_blocks, all_blocks.to_vec());
1067 assert!(missing.is_empty());
1068 assert!(block_manager.is_empty());
1069
1070 let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1071 assert!(accepted_blocks.is_empty());
1072 }
1073 }
1074
1075 #[tokio::test]
1079 async fn skip_accepting_blocks_below_gc_round() {
1080 let (mut context, _key_pairs) = Context::new_for_test(4);
1082 context
1084 .protocol_config
1085 .set_consensus_gc_depth_for_testing(4);
1086 let context = Arc::new(context);
1087 let store = Arc::new(MemStore::new());
1088 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1089
1090 let last_commit = TrustedCommit::new_for_test(
1093 10,
1094 CommitDigest::MIN,
1095 context.clock.timestamp_utc_ms(),
1096 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1097 vec![],
1098 );
1099 dag_state.write().set_last_commit(last_commit);
1100 assert_eq!(
1101 dag_state.read().gc_round(),
1102 6,
1103 "GC round should have moved to round 6"
1104 );
1105
1106 let mut block_manager =
1107 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1108
1109 let mut dag_builder = DagBuilder::new(context.clone());
1111 dag_builder.layers(1..=6).build();
1112
1113 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1114
1115 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1117
1118 assert!(accepted_blocks.is_empty());
1120 assert!(missing.is_empty());
1121 assert!(block_manager.is_empty());
1122 }
1123
1124 #[rstest]
1132 #[tokio::test]
1133 async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
1134 let (mut context, _key_pairs) = Context::new_for_test(4);
1136
1137 if gc_enabled {
1138 context
1139 .protocol_config
1140 .set_consensus_gc_depth_for_testing(10);
1141 }
1142 let context = Arc::new(context);
1143
1144 let mut dag_builder = DagBuilder::new(context.clone());
1146 dag_builder.layers(1..=3).build();
1147
1148 let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1149
1150 for seed in 0..100u8 {
1154 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1155
1156 let store = Arc::new(MemStore::new());
1157 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1158
1159 let mut block_manager =
1160 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1161
1162 let mut all_accepted_blocks = vec![];
1164 for block in &all_blocks {
1165 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1166
1167 all_accepted_blocks.extend(accepted_blocks);
1168 }
1169
1170 all_accepted_blocks.sort_by_key(|b| b.reference());
1172 all_blocks.sort_by_key(|b| b.reference());
1173
1174 assert_eq!(
1175 all_accepted_blocks, all_blocks,
1176 "Failed acceptance sequence for seed {seed}"
1177 );
1178 assert!(block_manager.is_empty());
1179 }
1180 }
1181
1182 #[tokio::test]
1185 async fn authorities_that_know_missing_blocks() {
1186 let (context, _key_pairs) = Context::new_for_test(4);
1187
1188 let context = Arc::new(context);
1189
1190 let mut dag_builder = DagBuilder::new(context.clone());
1192 dag_builder.layers(1..=3).build();
1193
1194 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1195
1196 let blocks_round_2 = all_blocks
1197 .iter()
1198 .filter(|block| block.round() == 2)
1199 .cloned()
1200 .collect::<Vec<_>>();
1201
1202 let blocks_round_1 = all_blocks
1203 .iter()
1204 .filter(|block| block.round() == 1)
1205 .map(|block| block.reference())
1206 .collect::<BTreeSet<_>>();
1207
1208 let store = Arc::new(MemStore::new());
1209 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1210
1211 let mut block_manager =
1212 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1213
1214 let (_, missing_blocks) = block_manager.try_accept_blocks(vec![blocks_round_2[0].clone()]);
1215 assert_eq!(missing_blocks, blocks_round_1);
1217
1218 let missing_blocks_with_authorities = block_manager.missing_blocks();
1219
1220 let block_round_1_authority_0 = all_blocks
1221 .iter()
1222 .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(0))
1223 .map(|block| block.reference())
1224 .next()
1225 .unwrap();
1226 let block_round_1_authority_1 = all_blocks
1227 .iter()
1228 .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(1))
1229 .map(|block| block.reference())
1230 .next()
1231 .unwrap();
1232 assert_eq!(
1233 missing_blocks_with_authorities[&block_round_1_authority_0],
1234 BTreeSet::from([AuthorityIndex::new_for_test(0)])
1235 );
1236 assert_eq!(
1237 missing_blocks_with_authorities[&block_round_1_authority_1],
1238 BTreeSet::from([
1239 AuthorityIndex::new_for_test(0),
1240 AuthorityIndex::new_for_test(1)
1241 ])
1242 );
1243
1244 block_manager.try_accept_blocks(vec![blocks_round_2[1].clone()]);
1247 let missing_blocks_with_authorities = block_manager.missing_blocks();
1248 assert_eq!(
1249 missing_blocks_with_authorities[&block_round_1_authority_0],
1250 BTreeSet::from([
1251 AuthorityIndex::new_for_test(0),
1252 AuthorityIndex::new_for_test(1)
1253 ])
1254 );
1255 }
1256
1257 #[rstest]
1258 #[tokio::test]
1259 async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
1260 telemetry_subscribers::init_for_testing();
1261 let (mut context, _key_pairs) = Context::new_for_test(4);
1263
1264 if gc_depth > 0 {
1265 context
1266 .protocol_config
1267 .set_consensus_gc_depth_for_testing(gc_depth);
1268 }
1269 let context = Arc::new(context);
1270
1271 let mut dag_builder = DagBuilder::new(context.clone());
1273 dag_builder.layers(1..=gc_depth * 2).build();
1274
1275 let mut all_blocks = dag_builder
1279 .blocks
1280 .values()
1281 .filter(|block| block.round() > 1)
1282 .cloned()
1283 .collect::<Vec<_>>();
1284
1285 for seed in 0..100u8 {
1289 all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1290
1291 let store = Arc::new(MemStore::new());
1292 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1293
1294 let mut block_manager = BlockManager::new(
1295 context.clone(),
1296 dag_state.clone(),
1297 Arc::new(NoopBlockVerifier),
1298 );
1299
1300 for block in &all_blocks {
1302 let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1303 assert!(accepted_blocks.is_empty());
1304 }
1305 assert!(!block_manager.is_empty());
1306
1307 let non_existing_refs = (1..=3)
1310 .map(|round| {
1311 BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
1312 })
1313 .collect::<Vec<_>>();
1314 assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);
1315
1316 let last_commit = TrustedCommit::new_for_test(
1319 gc_depth * 2,
1320 CommitDigest::MIN,
1321 context.clock.timestamp_utc_ms(),
1322 BlockRef::new(
1323 gc_depth * 2,
1324 AuthorityIndex::new_for_test(0),
1325 BlockDigest::MIN,
1326 ),
1327 vec![],
1328 );
1329 dag_state.write().set_last_commit(last_commit);
1330
1331 block_manager.try_unsuspend_blocks_for_latest_gc_round();
1333
1334 assert!(block_manager.is_empty());
1336
1337 for block in &all_blocks {
1339 assert!(dag_state.read().contains_block(&block.reference()));
1340 }
1341 }
1342 }
1343
1344 #[rstest]
1345 #[tokio::test]
1346 async fn try_accept_committed_blocks() {
1347 let (mut context, _key_pairs) = Context::new_for_test(4);
1349 context
1351 .protocol_config
1352 .set_consensus_gc_depth_for_testing(4);
1353 let context = Arc::new(context);
1354 let store = Arc::new(MemStore::new());
1355 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1356
1357 let last_commit = TrustedCommit::new_for_test(
1360 10,
1361 CommitDigest::MIN,
1362 context.clock.timestamp_utc_ms(),
1363 BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1364 vec![],
1365 );
1366 dag_state.write().set_last_commit(last_commit);
1367 assert_eq!(
1368 dag_state.read().gc_round(),
1369 2,
1370 "GC round should have moved to round 2"
1371 );
1372
1373 let mut block_manager =
1374 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1375
1376 let mut dag_builder = DagBuilder::new(context.clone());
1378 dag_builder.layers(1..=12).build();
1379
1380 let blocks = dag_builder.blocks(7..=12);
1383 let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1384 assert!(accepted_blocks.is_empty());
1385 assert_eq!(missing.len(), 4);
1386
1387 let blocks = dag_builder.blocks(3..=6);
1391
1392 let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1394
1395 accepted_blocks.sort_by_key(|b| b.reference());
1397
1398 let mut all_blocks = dag_builder.blocks(3..=12);
1399 all_blocks.sort_by_key(|b| b.reference());
1400
1401 assert_eq!(accepted_blocks, all_blocks);
1402 assert!(block_manager.is_empty());
1403 }
1404
1405 struct TestBlockVerifier {
1406 fail: BTreeSet<BlockRef>,
1407 }
1408
1409 impl TestBlockVerifier {
1410 fn new(fail: BTreeSet<BlockRef>) -> Self {
1411 Self { fail }
1412 }
1413 }
1414
1415 impl BlockVerifier for TestBlockVerifier {
1416 fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1417 Ok(())
1418 }
1419
1420 fn check_ancestors(
1421 &self,
1422 block: &VerifiedBlock,
1423 _ancestors: &[Option<VerifiedBlock>],
1424 _gc_enabled: bool,
1425 _gc_round: Round,
1426 ) -> ConsensusResult<()> {
1427 if self.fail.contains(&block.reference()) {
1428 Err(ConsensusError::InvalidBlockTimestamp {
1429 max_timestamp_ms: 0,
1430 block_timestamp_ms: block.timestamp_ms(),
1431 })
1432 } else {
1433 Ok(())
1434 }
1435 }
1436 }
1437
1438 #[tokio::test]
1439 async fn reject_blocks_failing_verifications() {
1440 let (mut context, _key_pairs) = Context::new_for_test(4);
1441 context
1442 .protocol_config
1443 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
1444 let context = Arc::new(context);
1445
1446 let mut dag_builder = DagBuilder::new(context.clone());
1448 dag_builder.layers(1..=5).build();
1449
1450 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1451
1452 let test_verifier = TestBlockVerifier::new(
1454 all_blocks
1455 .iter()
1456 .filter(|block| block.round() == 3)
1457 .map(|block| block.reference())
1458 .collect(),
1459 );
1460
1461 let store = Arc::new(MemStore::new());
1463 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1464 let mut block_manager =
1465 BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));
1466
1467 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1470 all_blocks
1471 .iter()
1472 .filter(|block| block.round() > 1)
1473 .cloned()
1474 .collect(),
1475 );
1476
1477 assert!(accepted_blocks.is_empty());
1479 assert_eq!(missing_refs.len(), 4);
1480 missing_refs.iter().for_each(|missing_ref| {
1481 assert_eq!(missing_ref.round, 1);
1482 });
1483
1484 let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1486 all_blocks
1487 .iter()
1488 .filter(|block| block.round() == 1)
1489 .cloned()
1490 .collect(),
1491 );
1492
1493 assert_eq!(accepted_blocks.len(), 8);
1495 accepted_blocks.iter().for_each(|block| {
1496 assert!(block.round() <= 2);
1497 });
1498 assert!(missing_refs.is_empty());
1499
1500 assert!(block_manager.suspended_blocks_refs().is_empty());
1503 }
1504
1505 #[tokio::test]
1506 async fn try_find_blocks() {
1507 let (context, _key_pairs) = Context::new_for_test(4);
1509 let context = Arc::new(context);
1510 let store = Arc::new(MemStore::new());
1511 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1512
1513 let mut block_manager =
1514 BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1515
1516 let mut dag_builder = DagBuilder::new(context.clone());
1518 dag_builder
1519 .layers(1..=2) .authorities(vec![
1521 AuthorityIndex::new_for_test(0),
1522 AuthorityIndex::new_for_test(2),
1523 ]) .equivocate(3)
1525 .build();
1526
1527 let round_2_blocks = dag_builder
1529 .blocks
1530 .iter()
1531 .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1532 .collect::<Vec<VerifiedBlock>>();
1533
1534 let missing_block_refs_from_find =
1536 block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1537 assert_eq!(missing_block_refs_from_find.len(), 10);
1538 assert!(
1539 missing_block_refs_from_find
1540 .iter()
1541 .all(|block_ref| block_ref.round == 2)
1542 );
1543
1544 let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1547 assert!(accepted_blocks.is_empty());
1548
1549 let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1550 let missing_block_refs_from_accept =
1551 missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1552 assert_eq!(missing, missing_block_refs_from_accept);
1553 assert_eq!(
1554 block_manager.missing_block_refs(),
1555 missing_block_refs_from_accept
1556 );
1557
1558 dag_builder.layer(3).build();
1563
1564 let round_3_blocks = dag_builder
1565 .blocks
1566 .iter()
1567 .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1568 .collect::<Vec<BlockRef>>();
1569
1570 let missing_block_refs_from_find = block_manager.try_find_blocks(
1571 round_2_blocks
1572 .iter()
1573 .map(|b| b.reference())
1574 .chain(round_3_blocks.into_iter())
1575 .collect(),
1576 );
1577
1578 assert_eq!(missing_block_refs_from_find.len(), 4);
1579 assert!(
1580 missing_block_refs_from_find
1581 .iter()
1582 .all(|block_ref| block_ref.round == 3)
1583 );
1584 assert_eq!(
1585 block_manager.missing_block_refs(),
1586 missing_block_refs_from_accept
1587 .into_iter()
1588 .chain(missing_block_refs_from_find.into_iter())
1589 .collect()
1590 );
1591 }
1592
1593 #[rstest]
1594 #[tokio::test]
1595 async fn test_verify_block_timestamps_and_accept(
1596 #[values(false, true)] median_based_timestamp: bool,
1597 ) {
1598 telemetry_subscribers::init_for_testing();
1599 let (mut context, _key_pairs) = Context::new_for_test(4);
1600 context
1601 .protocol_config
1602 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
1603 median_based_timestamp,
1604 );
1605
1606 let context = Arc::new(context);
1607 let store = Arc::new(MemStore::new());
1608 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1609
1610 let mut block_manager = BlockManager::new(
1611 context.clone(),
1612 dag_state.clone(),
1613 Arc::new(SignedBlockVerifier::new(
1614 context.clone(),
1615 Arc::new(NoopTransactionVerifier {}),
1616 )),
1617 );
1618
1619 let mut dag_builder = DagBuilder::new(context.clone());
1621 let authorities = context
1622 .committee
1623 .authorities()
1624 .map(|(index, _)| index)
1625 .collect::<Vec<_>>();
1626 dag_builder
1627 .layers(1..=1)
1628 .authorities(authorities.clone())
1629 .with_timestamps(vec![1000, 500, 550, 580])
1630 .build();
1631 dag_builder
1632 .layers(2..=2)
1633 .authorities(authorities.clone())
1634 .with_timestamps(vec![2000, 600, 650, 680])
1635 .build();
1636 dag_builder
1637 .layers(3..=3)
1638 .authorities(authorities)
1639 .with_timestamps(vec![3000, 700, 750, 780])
1640 .build();
1641
1642 let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1644
1645 let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1647
1648 if median_based_timestamp {
1649 assert_eq!(all_blocks, accepted_blocks);
1652 assert!(missing.is_empty());
1653 } else {
1654 assert_eq!(accepted_blocks.len(), 5);
1657 for block in accepted_blocks {
1658 if block.author() == AuthorityIndex::new_for_test(0) {
1659 assert!(block.round() == 1 || block.round() == 2);
1660 } else {
1661 assert_eq!(block.round(), 1);
1662 }
1663 }
1664 }
1665 }
1666}