consensus_core/
block_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet},
7    sync::Arc,
8    time::Instant,
9};
10
11use iota_metrics::monitored_scope;
12use itertools::Itertools as _;
13use parking_lot::RwLock;
14use tracing::{debug, trace, warn};
15
16use crate::{
17    Round,
18    block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
19    block_verifier::BlockVerifier,
20    context::Context,
21    dag_state::DagState,
22};
23
24struct SuspendedBlock {
25    block: VerifiedBlock,
26    missing_ancestors: BTreeSet<BlockRef>,
27    timestamp: Instant,
28}
29
30impl SuspendedBlock {
31    fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
32        Self {
33            block,
34            missing_ancestors,
35            timestamp: Instant::now(),
36        }
37    }
38}
39
/// Block manager suspends incoming blocks until they are connected to the
/// existing graph, returning newly connected blocks.
/// TODO: As it is possible to have Byzantine validators who produce blocks
/// without valid causal history, we need to make sure that BlockManager takes
/// care of that and avoids OOM (Out Of Memory) situations.
pub(crate) struct BlockManager {
    /// Shared node context; read here for committee lookups and metrics.
    context: Arc<Context>,
    /// Shared DAG state, queried to find out which blocks are already known
    /// and written to when blocks get accepted.
    dag_state: Arc<RwLock<DagState>>,
    /// Verifier used to check a block against its ancestors before it is
    /// accepted.
    block_verifier: Arc<dyn BlockVerifier>,

    /// Keeps all the suspended blocks. A suspended block is a block that is
    /// missing part of its causal history and thus can't be immediately
    /// processed. A block will remain in this map until all its causal history
    /// has been successfully processed.
    suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
    /// A map that keeps all the blocks that we are missing (keys) and the
    /// corresponding blocks that reference the missing blocks as ancestors
    /// and need them to get unsuspended. It is possible for a missing
    /// dependency (key) to be a suspended block itself: the block has already
    /// been fetched, but it is still missing some of its own ancestors and so
    /// cannot be processed yet.
    missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
    /// Keeps all the blocks that we actually miss and haven't fetched yet.
    /// This set basically contains all the keys of `missing_ancestors` minus
    /// any keys that exist in `suspended_blocks`.
    missing_blocks: BTreeSet<BlockRef>,
    /// A vector that holds a tuple of (lowest_round, highest_round) of received
    /// blocks per authority. This is used for metrics reporting purposes
    /// and resets during restarts.
    received_block_rounds: Vec<Option<(Round, Round)>>,
}
71
72impl BlockManager {
73    pub(crate) fn new(
74        context: Arc<Context>,
75        dag_state: Arc<RwLock<DagState>>,
76        block_verifier: Arc<dyn BlockVerifier>,
77    ) -> Self {
78        let committee_size = context.committee.size();
79        Self {
80            context,
81            dag_state,
82            block_verifier,
83            suspended_blocks: BTreeMap::new(),
84            missing_ancestors: BTreeMap::new(),
85            missing_blocks: BTreeSet::new(),
86            received_block_rounds: vec![None; committee_size],
87        }
88    }
89
90    /// Tries to accept the provided blocks assuming that all their causal
91    /// history exists. The method returns all the blocks that have been
92    /// successfully processed in round ascending order, that includes also
93    /// previously suspended blocks that have now been able to get accepted.
94    /// Method also returns a set with the missing ancestor blocks.
95    #[tracing::instrument(skip_all)]
96    pub(crate) fn try_accept_blocks(
97        &mut self,
98        blocks: Vec<VerifiedBlock>,
99    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
100        let _s = monitored_scope("BlockManager::try_accept_blocks");
101        self.try_accept_blocks_internal(blocks, false)
102    }
103
104    // Tries to accept blocks that have been committed. Returns all the blocks that
105    // have been accepted, both from the ones provided and any children blocks.
106    #[tracing::instrument(skip_all)]
107    pub(crate) fn try_accept_committed_blocks(
108        &mut self,
109        blocks: Vec<VerifiedBlock>,
110    ) -> Vec<VerifiedBlock> {
111        // If GC is disabled then should not run any of this logic.
112        if !self.dag_state.read().gc_enabled() {
113            return Vec::new();
114        }
115
116        // Just accept the blocks
117        let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
118        let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
119        assert!(
120            missing_blocks.is_empty(),
121            "No missing blocks should be returned for committed blocks"
122        );
123
124        accepted_blocks
125    }
126
127    /// Attempts to accept the provided blocks. When `committed = true` then the
128    /// blocks are considered to be committed via certified commits and
129    /// are handled differently.
130    fn try_accept_blocks_internal(
131        &mut self,
132        mut blocks: Vec<VerifiedBlock>,
133        committed: bool,
134    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
135        let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
136
137        blocks.sort_by_key(|b| b.round());
138        debug!(
139            "Trying to accept blocks: {}",
140            blocks.iter().map(|b| b.reference().to_string()).join(",")
141        );
142
143        let mut accepted_blocks = vec![];
144        let mut missing_blocks = BTreeSet::new();
145
146        for block in blocks {
147            self.update_block_received_metrics(&block);
148
149            // Try to accept the input block.
150            let block_ref = block.reference();
151
152            let mut to_verify_timestamps_and_accept = vec![];
153            if committed {
154                match self.try_accept_one_committed_block(block) {
155                    TryAcceptResult::Accepted(block) => {
156                        // As this is a committed block, then it's already accepted and there is no
157                        // need to verify its timestamps. Just add it to the
158                        // accepted blocks list.
159                        accepted_blocks.push(block);
160                    }
161                    TryAcceptResult::Processed => continue,
162                    TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => panic!(
163                        "Did not expect to suspend or skip a committed block: {:?}",
164                        block_ref
165                    ),
166                };
167            } else {
168                match self.try_accept_one_block(block) {
169                    TryAcceptResult::Accepted(block) => {
170                        to_verify_timestamps_and_accept.push(block);
171                    }
172                    TryAcceptResult::Suspended(ancestors_to_fetch) => {
173                        debug!(
174                            "Missing ancestors to fetch for block {block_ref}: {}",
175                            ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
176                        );
177                        missing_blocks.extend(ancestors_to_fetch);
178                        continue;
179                    }
180                    TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
181                };
182            };
183
184            // If the block is accepted, try to unsuspend its children blocks if any.
185            let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
186            to_verify_timestamps_and_accept.extend(unsuspended_blocks);
187
188            // Verify block timestamps
189            let blocks_to_accept =
190                self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
191            accepted_blocks.extend(blocks_to_accept);
192        }
193
194        self.update_stats(missing_blocks.len() as u64);
195
196        // Figure out the new missing blocks
197        (accepted_blocks, missing_blocks)
198    }
199
200    fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
201        if self.dag_state.read().contains_block(&block.reference()) {
202            return TryAcceptResult::Processed;
203        }
204
205        // Remove the block from missing and suspended blocks
206        self.missing_blocks.remove(&block.reference());
207
208        // If the block has been already fetched and parked as suspended block, then
209        // remove it. Also find all the references of missing ancestors to
210        // remove those as well. If we don't do that then it's possible once the missing
211        // ancestor is fetched to cause a panic when trying to unsuspend this
212        // children as it won't be found in the suspended blocks map.
213        if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
214            suspended_block
215                .missing_ancestors
216                .iter()
217                .for_each(|ancestor| {
218                    if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
219                        references.remove(&block.reference());
220                    }
221                });
222        }
223
224        // Accept this block before any unsuspended children blocks
225        self.dag_state.write().accept_blocks(vec![block.clone()]);
226
227        TryAcceptResult::Accepted(block)
228    }
229
230    /// Tries to find the provided block_refs in DagState and BlockManager,
231    /// and returns missing block refs.
232    pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
233        let _s = monitored_scope("BlockManager::try_find_blocks");
234        let gc_round = self.dag_state.read().gc_round();
235
236        // No need to fetch blocks that are <= gc_round as they won't get processed
237        // anyways and they'll get skipped. So keep only the ones above.
238        let mut block_refs = block_refs
239            .into_iter()
240            .filter(|block_ref| block_ref.round > gc_round)
241            .collect::<Vec<_>>();
242
243        if block_refs.is_empty() {
244            return BTreeSet::new();
245        }
246
247        block_refs.sort_by_key(|b| b.round);
248
249        debug!(
250            "Trying to find blocks: {}",
251            block_refs.iter().map(|b| b.to_string()).join(",")
252        );
253
254        let mut missing_blocks = BTreeSet::new();
255
256        for (found, block_ref) in self
257            .dag_state
258            .read()
259            .contains_blocks(block_refs.clone())
260            .into_iter()
261            .zip(block_refs.iter())
262        {
263            if found || self.suspended_blocks.contains_key(block_ref) {
264                continue;
265            }
266            // Fetches the block if it is not in dag state or suspended.
267            missing_blocks.insert(*block_ref);
268            if self.missing_blocks.insert(*block_ref) {
269                // We want to report this as a missing ancestor even if there is no block that
270                // is actually references it right now. That will allow us
271                // to seamlessly GC the block later if needed.
272                self.missing_ancestors.entry(*block_ref).or_default();
273
274                let block_ref_hostname =
275                    &self.context.committee.authority(block_ref.author).hostname;
276                self.context
277                    .metrics
278                    .node_metrics
279                    .block_manager_missing_blocks_by_authority
280                    .with_label_values(&[block_ref_hostname])
281                    .inc();
282            }
283        }
284
285        let metrics = &self.context.metrics.node_metrics;
286        metrics
287            .missing_blocks_total
288            .inc_by(missing_blocks.len() as u64);
289        metrics
290            .block_manager_missing_blocks
291            .set(self.missing_blocks.len() as i64);
292
293        missing_blocks
294    }
295
296    // TODO: remove once timestamping is refactored to the new approach.
297    // Verifies each block's timestamp based on its ancestors, and persists in store
298    // all the valid blocks that should be accepted. Method returns the accepted
299    // and persisted blocks.
300    fn verify_block_timestamps_and_accept(
301        &mut self,
302        unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
303    ) -> Vec<VerifiedBlock> {
304        let (gc_enabled, gc_round) = {
305            let dag_state = self.dag_state.read();
306            (dag_state.gc_enabled(), dag_state.gc_round())
307        };
308        // Try to verify the block and its children for timestamp, with ancestor blocks.
309        let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
310        let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
311        {
312            'block: for b in unsuspended_blocks {
313                let ancestors = self.dag_state.read().get_blocks(b.ancestors());
314                assert_eq!(b.ancestors().len(), ancestors.len());
315                let mut ancestor_blocks = vec![];
316                'ancestor: for (ancestor_ref, found) in
317                    b.ancestors().iter().zip(ancestors.into_iter())
318                {
319                    if let Some(found_block) = found {
320                        // This invariant should be guaranteed by DagState.
321                        assert_eq!(ancestor_ref, &found_block.reference());
322                        ancestor_blocks.push(Some(found_block));
323                        continue 'ancestor;
324                    }
325                    // blocks_to_accept have not been added to DagState yet, but they
326                    // can appear in ancestors.
327                    if blocks_to_accept.contains_key(ancestor_ref) {
328                        ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
329                        continue 'ancestor;
330                    }
331                    // If an ancestor is already rejected, reject this block as well.
332                    if blocks_to_reject.contains_key(ancestor_ref) {
333                        blocks_to_reject.insert(b.reference(), b);
334                        continue 'block;
335                    }
336
337                    // When gc is enabled it's possible that we indeed won't find any ancestors that
338                    // are passed gc_round. That's ok. We don't need to panic here.
339                    // We do want to panic if gc_enabled we and have an ancestor that is > gc_round,
340                    // or gc is disabled.
341                    if gc_enabled
342                        && ancestor_ref.round > GENESIS_ROUND
343                        && ancestor_ref.round <= gc_round
344                    {
345                        debug!(
346                            "Block {:?} has a missing ancestor: {:?} passed GC round {}",
347                            b.reference(),
348                            ancestor_ref,
349                            gc_round
350                        );
351                        ancestor_blocks.push(None);
352                    } else {
353                        panic!(
354                            "Unsuspended block {:?} has a missing ancestor! Ancestor not found in DagState: {:?}",
355                            b, ancestor_ref
356                        );
357                    }
358                }
359                if let Err(e) =
360                    self.block_verifier
361                        .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
362                {
363                    warn!("Block {:?} failed to verify ancestors: {}", b, e);
364                    blocks_to_reject.insert(b.reference(), b);
365                } else {
366                    blocks_to_accept.insert(b.reference(), b);
367                }
368            }
369        }
370
371        // TODO: report blocks_to_reject to peers.
372        for (block_ref, block) in blocks_to_reject {
373            let hostname = self
374                .context
375                .committee
376                .authority(block_ref.author)
377                .hostname
378                .clone();
379
380            self.context
381                .metrics
382                .node_metrics
383                .invalid_blocks
384                .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
385                .inc();
386            warn!("Invalid block {:?} is rejected", block);
387        }
388
389        let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
390
391        // Insert the accepted blocks into DAG state so future blocks including them as
392        // ancestors do not get suspended.
393        self.dag_state
394            .write()
395            .accept_blocks(blocks_to_accept.clone());
396
397        blocks_to_accept
398    }
399
400    /// Tries to accept the provided block. To accept a block its ancestors must
401    /// have been already successfully accepted. If block is accepted then
402    /// Some result is returned. None is returned when either the block is
403    /// suspended or the block has been already accepted before.
404    fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
405        let block_ref = block.reference();
406        let mut missing_ancestors = BTreeSet::new();
407        let mut ancestors_to_fetch = BTreeSet::new();
408        let dag_state = self.dag_state.read();
409        let gc_round = dag_state.gc_round();
410        let gc_enabled = dag_state.gc_enabled();
411
412        // If block has been already received and suspended, or already processed and
413        // stored, or is a genesis block, then skip it.
414        if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
415            return TryAcceptResult::Processed;
416        }
417
418        // If the block is <= gc_round, then we simply skip its processing as there is
419        // no meaning do any action on it or even store it.
420        if gc_enabled && block.round() <= gc_round {
421            let hostname = self
422                .context
423                .committee
424                .authority(block.author())
425                .hostname
426                .as_str();
427            self.context
428                .metrics
429                .node_metrics
430                .block_manager_skipped_blocks
431                .with_label_values(&[hostname])
432                .inc();
433            return TryAcceptResult::Skipped;
434        }
435
436        // Keep only the ancestors that are greater than the GC round to check for their
437        // existence. Keep in mind that if GC is disabled then gc_round will be
438        // 0 and all ancestors will be considered.
439        let ancestors = if gc_enabled {
440            block
441                .ancestors()
442                .iter()
443                .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
444                .cloned()
445                .collect::<Vec<_>>()
446        } else {
447            block.ancestors().to_vec()
448        };
449
450        // make sure that we have all the required ancestors in store
451        for (found, ancestor) in dag_state
452            .contains_blocks(ancestors.clone())
453            .into_iter()
454            .zip(ancestors.iter())
455        {
456            if !found {
457                missing_ancestors.insert(*ancestor);
458
459                // mark the block as having missing ancestors
460                self.missing_ancestors
461                    .entry(*ancestor)
462                    .or_default()
463                    .insert(block_ref);
464
465                let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
466                self.context
467                    .metrics
468                    .node_metrics
469                    .block_manager_missing_ancestors_by_authority
470                    .with_label_values(&[ancestor_hostname])
471                    .inc();
472
473                // Add the ancestor to the missing blocks set only if it doesn't already exist
474                // in the suspended blocks - meaning that we already have its
475                // payload.
476                if !self.suspended_blocks.contains_key(ancestor) {
477                    // Fetches the block if it is not in dag state or suspended.
478                    ancestors_to_fetch.insert(*ancestor);
479                    if self.missing_blocks.insert(*ancestor) {
480                        self.context
481                            .metrics
482                            .node_metrics
483                            .block_manager_missing_blocks_by_authority
484                            .with_label_values(&[ancestor_hostname])
485                            .inc();
486                    }
487                }
488            }
489        }
490
491        // Remove the block ref from the `missing_blocks` - if exists - since we now
492        // have received the block. The block might still get suspended, but we
493        // won't report it as missing in order to not re-fetch.
494        self.missing_blocks.remove(&block.reference());
495
496        if !missing_ancestors.is_empty() {
497            let hostname = self
498                .context
499                .committee
500                .authority(block.author())
501                .hostname
502                .as_str();
503            self.context
504                .metrics
505                .node_metrics
506                .block_suspensions
507                .with_label_values(&[hostname])
508                .inc();
509            self.suspended_blocks
510                .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
511            return TryAcceptResult::Suspended(ancestors_to_fetch);
512        }
513
514        TryAcceptResult::Accepted(block)
515    }
516
517    /// Given an accepted block `accepted_block` it attempts to accept all the
518    /// suspended children blocks assuming such exist. All the unsuspended /
519    /// accepted blocks are returned as a vector in causal order.
520    fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
521        let mut unsuspended_blocks = vec![];
522        let mut to_process_blocks = vec![accepted_block];
523
524        while let Some(block_ref) = to_process_blocks.pop() {
525            // And try to check if its direct children can be unsuspended
526            if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
527                for r in block_refs_with_missing_deps {
528                    // For each dependency try to unsuspend it. If that's successful then we add it
529                    // to the queue so we can recursively try to unsuspend its
530                    // children.
531                    if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
532                        to_process_blocks.push(block.block.reference());
533                        unsuspended_blocks.push(block);
534                    }
535                }
536            }
537        }
538
539        let now = Instant::now();
540
541        // Report the unsuspended blocks
542        for block in &unsuspended_blocks {
543            let hostname = self
544                .context
545                .committee
546                .authority(block.block.author())
547                .hostname
548                .as_str();
549            self.context
550                .metrics
551                .node_metrics
552                .block_unsuspensions
553                .with_label_values(&[hostname])
554                .inc();
555            self.context
556                .metrics
557                .node_metrics
558                .suspended_block_time
559                .with_label_values(&[hostname])
560                .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
561        }
562
563        unsuspended_blocks
564            .into_iter()
565            .map(|block| block.block)
566            .collect()
567    }
568
569    /// Attempts to unsuspend a block by checking its ancestors and removing the
570    /// `accepted_dependency` by its local set. If there is no missing
571    /// dependency then this block can be unsuspended immediately and is removed
572    /// from the `suspended_blocks` map.
573    fn try_unsuspend_block(
574        &mut self,
575        block_ref: &BlockRef,
576        accepted_dependency: &BlockRef,
577    ) -> Option<SuspendedBlock> {
578        let block = self
579            .suspended_blocks
580            .get_mut(block_ref)
581            .expect("Block should be in suspended map");
582
583        assert!(
584            block.missing_ancestors.remove(accepted_dependency),
585            "Block reference {} should be present in missing dependencies of {:?}",
586            block_ref,
587            block.block
588        );
589
590        if block.missing_ancestors.is_empty() {
591            // we have no missing dependency, so we unsuspend the block and return it
592            return self.suspended_blocks.remove(block_ref);
593        }
594        None
595    }
596
597    /// Tries to unsuspend any blocks for the latest gc round. If gc round
598    /// hasn't changed then no blocks will be unsuspended due to
599    /// this action.
600    pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
601        let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
602        let (gc_enabled, gc_round) = {
603            let dag_state = self.dag_state.read();
604            (dag_state.gc_enabled(), dag_state.gc_round())
605        };
606        let mut blocks_unsuspended_below_gc_round = 0;
607        let mut blocks_gc_ed = 0;
608
609        if !gc_enabled {
610            trace!("GC is disabled, no blocks will attempt to get unsuspended.");
611            return;
612        }
613
614        while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
615            // If the first block in the missing ancestors is higher than the gc_round, then
616            // we can't unsuspend it yet. So we just put it back
617            // and we terminate the iteration as any next entry will be of equal or higher
618            // round anyways.
619            if block_ref.round > gc_round {
620                return;
621            }
622
623            blocks_gc_ed += 1;
624
625            let hostname = self
626                .context
627                .committee
628                .authority(block_ref.author)
629                .hostname
630                .as_str();
631            self.context
632                .metrics
633                .node_metrics
634                .block_manager_gced_blocks
635                .with_label_values(&[hostname])
636                .inc();
637
638            assert!(
639                !self.suspended_blocks.contains_key(block_ref),
640                "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
641            );
642
643            // Also remove it from the missing list - we don't want to keep looking for it.
644            self.missing_blocks.remove(block_ref);
645
646            // Find all the children blocks that have a dependency on this one and try to
647            // unsuspend them
648            let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
649
650            unsuspended_blocks.iter().for_each(|block| {
651                if block.round() <= gc_round {
652                    blocks_unsuspended_below_gc_round += 1;
653                }
654            });
655
656            // Now validate their timestamps and accept them
657            let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
658            for block in accepted_blocks {
659                let hostname = self
660                    .context
661                    .committee
662                    .authority(block.author())
663                    .hostname
664                    .as_str();
665                self.context
666                    .metrics
667                    .node_metrics
668                    .block_manager_gc_unsuspended_blocks
669                    .with_label_values(&[hostname])
670                    .inc();
671            }
672        }
673
674        debug!(
675            "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
676            blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
677        );
678    }
679
680    /// Returns all the blocks that are currently missing and needed in order to
681    /// accept suspended blocks.
682    pub(crate) fn missing_blocks(&self) -> BTreeSet<BlockRef> {
683        self.missing_blocks.clone()
684    }
685
686    fn update_stats(&mut self, missing_blocks: u64) {
687        let metrics = &self.context.metrics.node_metrics;
688        metrics.missing_blocks_total.inc_by(missing_blocks);
689        metrics
690            .block_manager_suspended_blocks
691            .set(self.suspended_blocks.len() as i64);
692        metrics
693            .block_manager_missing_ancestors
694            .set(self.missing_ancestors.len() as i64);
695        metrics
696            .block_manager_missing_blocks
697            .set(self.missing_blocks.len() as i64);
698    }
699
700    fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
701        let (min_round, max_round) =
702            if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
703                (curr_min.min(block.round()), curr_max.max(block.round()))
704            } else {
705                (block.round(), block.round())
706            };
707        self.received_block_rounds[block.author()] = Some((min_round, max_round));
708
709        let hostname = &self.context.committee.authority(block.author()).hostname;
710        self.context
711            .metrics
712            .node_metrics
713            .lowest_verified_authority_round
714            .with_label_values(&[hostname])
715            .set(min_round.into());
716        self.context
717            .metrics
718            .node_metrics
719            .highest_verified_authority_round
720            .with_label_values(&[hostname])
721            .set(max_round.into());
722    }
723
724    /// Checks if block manager is empty.
725    #[cfg(test)]
726    pub(crate) fn is_empty(&self) -> bool {
727        self.suspended_blocks.is_empty()
728            && self.missing_ancestors.is_empty()
729            && self.missing_blocks.is_empty()
730    }
731
732    /// Returns all the suspended blocks whose causal history we miss hence we
733    /// can't accept them yet.
734    #[cfg(test)]
735    fn suspended_blocks(&self) -> Vec<BlockRef> {
736        self.suspended_blocks.keys().cloned().collect()
737    }
738}
739
// Result of trying to accept one block.
enum TryAcceptResult {
    // The block is accepted. Wraps the block itself.
    Accepted(VerifiedBlock),
    // The block is suspended. Wraps the ancestors that need to be fetched.
    Suspended(BTreeSet<BlockRef>),
    // The block has been processed before and already exists in BlockManager (and is
    // suspended) or in DagState (so it has already been accepted). No further processing
    // has been done at this point.
    Processed,
    // The received block is <= gc_round, so its processing is simply skipped: there is no
    // point in taking any action on it or even storing it.
    Skipped,
}
754
755#[cfg(test)]
756mod tests {
757    use std::{collections::BTreeSet, sync::Arc};
758
759    use consensus_config::AuthorityIndex;
760    use parking_lot::RwLock;
761    use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
762    use rstest::rstest;
763
764    use crate::{
765        CommitDigest, Round,
766        block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
767        block_manager::BlockManager,
768        block_verifier::{BlockVerifier, NoopBlockVerifier},
769        commit::TrustedCommit,
770        context::Context,
771        dag_state::DagState,
772        error::{ConsensusError, ConsensusResult},
773        storage::mem_store::MemStore,
774        test_dag_builder::DagBuilder,
775        test_dag_parser::parse_dag,
776    };
777
778    #[tokio::test]
779    async fn suspend_blocks_with_missing_ancestors() {
780        // GIVEN
781        let (context, _key_pairs) = Context::new_for_test(4);
782        let context = Arc::new(context);
783        let store = Arc::new(MemStore::new());
784        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
785
786        let mut block_manager =
787            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
788
789        // create a DAG
790        let mut dag_builder = DagBuilder::new(context.clone());
791        dag_builder
792            .layers(1..=2) // 2 rounds
793            .authorities(vec![
794                AuthorityIndex::new_for_test(0),
795                AuthorityIndex::new_for_test(2),
796            ]) // Create equivocating blocks for 2 authorities
797            .equivocate(3)
798            .build();
799
800        // Take only the blocks of round 2 and try to accept them
801        let round_2_blocks = dag_builder
802            .blocks
803            .into_iter()
804            .filter_map(|(_, block)| (block.round() == 2).then_some(block))
805            .collect::<Vec<VerifiedBlock>>();
806
807        // WHEN
808        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
809
810        // THEN
811        assert!(accepted_blocks.is_empty());
812
813        // AND the returned missing ancestors should be the same as the provided block
814        // ancestors
815        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
816        let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
817        assert_eq!(missing, missing_block_refs);
818
819        // AND the missing blocks are the parents of the round 2 blocks. Since this is a
820        // fully connected DAG taking the ancestors of the first element
821        // suffices.
822        assert_eq!(block_manager.missing_blocks(), missing_block_refs);
823
824        // AND suspended blocks should return the round_2_blocks
825        assert_eq!(
826            block_manager.suspended_blocks(),
827            round_2_blocks
828                .into_iter()
829                .map(|block| block.reference())
830                .collect::<Vec<_>>()
831        );
832    }
833
834    #[tokio::test]
835    async fn try_accept_block_returns_missing_blocks() {
836        let (context, _key_pairs) = Context::new_for_test(4);
837        let context = Arc::new(context);
838        let store = Arc::new(MemStore::new());
839        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
840
841        let mut block_manager =
842            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
843
844        // create a DAG
845        let mut dag_builder = DagBuilder::new(context.clone());
846        dag_builder
847            .layers(1..=4) // 4 rounds
848            .authorities(vec![
849                AuthorityIndex::new_for_test(0),
850                AuthorityIndex::new_for_test(2),
851            ]) // Create equivocating blocks for 2 authorities
852            .equivocate(3) // Use 3 equivocations blocks per authority
853            .build();
854
855        // Take the blocks from round 4 up to 2 (included). Only the first block of each
856        // round should return missing ancestors when try to accept
857        for (_, block) in dag_builder
858            .blocks
859            .into_iter()
860            .rev()
861            .take_while(|(_, block)| block.round() >= 2)
862        {
863            // WHEN
864            let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);
865
866            // THEN
867            assert!(accepted_blocks.is_empty());
868
869            let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
870            assert_eq!(missing, block_ancestors);
871        }
872    }
873
874    #[tokio::test]
875    async fn accept_blocks_with_complete_causal_history() {
876        // GIVEN
877        let (context, _key_pairs) = Context::new_for_test(4);
878        let context = Arc::new(context);
879        let store = Arc::new(MemStore::new());
880        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
881
882        let mut block_manager =
883            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
884
885        // create a DAG of 2 rounds
886        let mut dag_builder = DagBuilder::new(context.clone());
887        dag_builder.layers(1..=2).build();
888
889        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
890
891        // WHEN
892        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
893
894        // THEN
895        assert_eq!(accepted_blocks.len(), 8);
896        assert_eq!(
897            accepted_blocks,
898            all_blocks
899                .iter()
900                .filter(|block| block.round() > 0)
901                .cloned()
902                .collect::<Vec<VerifiedBlock>>()
903        );
904        assert!(missing.is_empty());
905        assert!(block_manager.is_empty());
906
907        // WHEN trying to accept same blocks again, then none will be returned as those
908        // have been already accepted
909        let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
910        assert!(accepted_blocks.is_empty());
911    }
912
913    /// Tests that the block manager accepts blocks when some or all of their
914    /// causal history is below or equal to the GC round.
915    #[tokio::test]
916    async fn accept_blocks_with_causal_history_below_gc_round() {
917        // GIVEN
918        let (mut context, _key_pairs) = Context::new_for_test(4);
919
920        // We set the gc depth to 4
921        context
922            .protocol_config
923            .set_consensus_gc_depth_for_testing(4);
924        let context = Arc::new(context);
925        let store = Arc::new(MemStore::new());
926        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
927
928        // We "fake" the commit for round 10, so we can test the GC round 6
929        // (commit_round - gc_depth = 10 - 4 = 6)
930        let last_commit = TrustedCommit::new_for_test(
931            10,
932            CommitDigest::MIN,
933            context.clock.timestamp_utc_ms(),
934            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
935            vec![],
936        );
937        dag_state.write().set_last_commit(last_commit);
938        assert_eq!(
939            dag_state.read().gc_round(),
940            6,
941            "GC round should have moved to round 6"
942        );
943
944        let mut block_manager =
945            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
946
947        // create a DAG of 10 rounds with some weak links for the blocks of round 9
948        let dag_str = "DAG {
949            Round 0 : { 4 },
950            Round 1 : { * },
951            Round 2 : { * },
952            Round 3 : { * },
953            Round 4 : { * },
954            Round 5 : { * },
955            Round 6 : { * },
956            Round 7 : {
957                A -> [*],
958                B -> [*],
959                C -> [*],
960            }
961            Round 8 : {
962                A -> [*],
963                B -> [*],
964                C -> [*],
965            },
966            Round 9 : {
967                A -> [A8, B8, C8, D6],
968                B -> [A8, B8, C8, D6],
969                C -> [A8, B8, C8, D6],
970                D -> [A8, B8, C8, D6],
971            },
972            Round 10 : { * },
973        }";
974
975        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
976
977        // Now take all the blocks for round 7 & 8 , which are above the gc_round = 6.
978        // All those blocks should eventually be returned as accepted. Pay attention
979        // that without GC none of those blocks should get accepted.
980        let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
981
982        for rounds_range in blocks_ranges {
983            let all_blocks = dag_builder
984                .blocks
985                .values()
986                .filter(|block| rounds_range.contains(&block.round()))
987                .cloned()
988                .collect::<Vec<_>>();
989
990            // WHEN
991            let mut reversed_blocks = all_blocks.clone();
992            reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
993            let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
994            accepted_blocks.sort_by_key(|a| a.reference());
995
996            // THEN
997            assert_eq!(accepted_blocks, all_blocks.to_vec());
998            assert!(missing.is_empty());
999            assert!(block_manager.is_empty());
1000
1001            let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1002            assert!(accepted_blocks.is_empty());
1003        }
1004    }
1005
1006    /// Blocks that are attempted to be accepted but are <= gc_round they will
1007    /// be skipped for processing. Nothing should be stored or trigger any
1008    /// unsuspension etc.
1009    #[tokio::test]
1010    async fn skip_accepting_blocks_below_gc_round() {
1011        // GIVEN
1012        let (mut context, _key_pairs) = Context::new_for_test(4);
1013        // We set the gc depth to 4
1014        context
1015            .protocol_config
1016            .set_consensus_gc_depth_for_testing(4);
1017        let context = Arc::new(context);
1018        let store = Arc::new(MemStore::new());
1019        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1020
1021        // We "fake" the commit for round 10, so we can test the GC round 6
1022        // (commit_round - gc_depth = 10 - 4 = 6)
1023        let last_commit = TrustedCommit::new_for_test(
1024            10,
1025            CommitDigest::MIN,
1026            context.clock.timestamp_utc_ms(),
1027            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1028            vec![],
1029        );
1030        dag_state.write().set_last_commit(last_commit);
1031        assert_eq!(
1032            dag_state.read().gc_round(),
1033            6,
1034            "GC round should have moved to round 6"
1035        );
1036
1037        let mut block_manager =
1038            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1039
1040        // create a DAG of 6 rounds
1041        let mut dag_builder = DagBuilder::new(context.clone());
1042        dag_builder.layers(1..=6).build();
1043
1044        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1045
1046        // WHEN
1047        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1048
1049        // THEN
1050        assert!(accepted_blocks.is_empty());
1051        assert!(missing.is_empty());
1052        assert!(block_manager.is_empty());
1053    }
1054
1055    /// The test generate blocks for a well connected DAG and feed them to block
1056    /// manager in random order. In the end all the blocks should be
1057    /// uniquely suspended and no missing blocks should exist. The test will run
1058    /// for both gc_enabled/disabled. When gc is enabeld we set a high
1059    /// gc_depth value so in practice gc_round will be 0, but we'll be able to
1060    /// test in the common case that this work exactly the same way as when
1061    /// gc is disabled.
1062    #[rstest]
1063    #[tokio::test]
1064    async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
1065        // GIVEN
1066        let (mut context, _key_pairs) = Context::new_for_test(4);
1067
1068        if gc_enabled {
1069            context
1070                .protocol_config
1071                .set_consensus_gc_depth_for_testing(10);
1072        }
1073        let context = Arc::new(context);
1074
1075        // create a DAG of rounds 1 ~ 3
1076        let mut dag_builder = DagBuilder::new(context.clone());
1077        dag_builder.layers(1..=3).build();
1078
1079        let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1080
1081        // Now randomize the sequence of sending the blocks to block manager. In the end
1082        // all the blocks should be uniquely suspended and no missing blocks
1083        // should exist.
1084        for seed in 0..100u8 {
1085            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1086
1087            let store = Arc::new(MemStore::new());
1088            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1089
1090            let mut block_manager =
1091                BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1092
1093            // WHEN
1094            let mut all_accepted_blocks = vec![];
1095            for block in &all_blocks {
1096                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1097
1098                all_accepted_blocks.extend(accepted_blocks);
1099            }
1100
1101            // THEN
1102            all_accepted_blocks.sort_by_key(|b| b.reference());
1103            all_blocks.sort_by_key(|b| b.reference());
1104
1105            assert_eq!(
1106                all_accepted_blocks, all_blocks,
1107                "Failed acceptance sequence for seed {}",
1108                seed
1109            );
1110            assert!(block_manager.is_empty());
1111        }
1112    }
1113
1114    #[rstest]
1115    #[tokio::test]
1116    async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
1117        telemetry_subscribers::init_for_testing();
1118        // GIVEN
1119        let (mut context, _key_pairs) = Context::new_for_test(4);
1120
1121        if gc_depth > 0 {
1122            context
1123                .protocol_config
1124                .set_consensus_gc_depth_for_testing(gc_depth);
1125        }
1126        let context = Arc::new(context);
1127
1128        // create a DAG of rounds 1 ~ gc_depth * 2
1129        let mut dag_builder = DagBuilder::new(context.clone());
1130        dag_builder.layers(1..=gc_depth * 2).build();
1131
1132        // Pay attention that we start from round 2. Round 1 will always be missing so
1133        // no matter what we do we can't unsuspend it unless gc_round has
1134        // advanced to round >= 1.
1135        let mut all_blocks = dag_builder
1136            .blocks
1137            .values()
1138            .filter(|block| block.round() > 1)
1139            .cloned()
1140            .collect::<Vec<_>>();
1141
1142        // Now randomize the sequence of sending the blocks to block manager. In the end
1143        // all the blocks should be uniquely suspended and no missing blocks
1144        // should exist.
1145        for seed in 0..100u8 {
1146            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1147
1148            let store = Arc::new(MemStore::new());
1149            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1150
1151            let mut block_manager = BlockManager::new(
1152                context.clone(),
1153                dag_state.clone(),
1154                Arc::new(NoopBlockVerifier),
1155            );
1156
1157            // WHEN
1158            for block in &all_blocks {
1159                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1160                assert!(accepted_blocks.is_empty());
1161            }
1162            assert!(!block_manager.is_empty());
1163
1164            // AND also call the try_to_find method with some non existing block refs. Those
1165            // should be cleaned up as well once GC kicks in.
1166            let non_existing_refs = (1..=3)
1167                .map(|round| {
1168                    BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
1169                })
1170                .collect::<Vec<_>>();
1171            assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);
1172
1173            // AND
1174            // Trigger a commit which will advance GC round
1175            let last_commit = TrustedCommit::new_for_test(
1176                gc_depth * 2,
1177                CommitDigest::MIN,
1178                context.clock.timestamp_utc_ms(),
1179                BlockRef::new(
1180                    gc_depth * 2,
1181                    AuthorityIndex::new_for_test(0),
1182                    BlockDigest::MIN,
1183                ),
1184                vec![],
1185            );
1186            dag_state.write().set_last_commit(last_commit);
1187
1188            // AND
1189            block_manager.try_unsuspend_blocks_for_latest_gc_round();
1190
1191            // THEN
1192            assert!(block_manager.is_empty());
1193
1194            // AND ensure that all have been accepted to the DAG
1195            for block in &all_blocks {
1196                assert!(dag_state.read().contains_block(&block.reference()));
1197            }
1198        }
1199    }
1200
1201    #[rstest]
1202    #[tokio::test]
1203    async fn try_accept_committed_blocks() {
1204        // GIVEN
1205        let (mut context, _key_pairs) = Context::new_for_test(4);
1206        // We set the gc depth to 4
1207        context
1208            .protocol_config
1209            .set_consensus_gc_depth_for_testing(4);
1210        let context = Arc::new(context);
1211        let store = Arc::new(MemStore::new());
1212        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1213
1214        // We "fake" the commit for round 6, so GC round moves to (commit_round -
1215        // gc_depth = 6 - 4 = 2)
1216        let last_commit = TrustedCommit::new_for_test(
1217            10,
1218            CommitDigest::MIN,
1219            context.clock.timestamp_utc_ms(),
1220            BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1221            vec![],
1222        );
1223        dag_state.write().set_last_commit(last_commit);
1224        assert_eq!(
1225            dag_state.read().gc_round(),
1226            2,
1227            "GC round should have moved to round 2"
1228        );
1229
1230        let mut block_manager =
1231            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1232
1233        // create a DAG of 12 rounds
1234        let mut dag_builder = DagBuilder::new(context.clone());
1235        dag_builder.layers(1..=12).build();
1236
1237        // Now try to accept via the normal acceptance block path the blocks of rounds 7
1238        // ~ 12. None of them should be accepted
1239        let blocks = dag_builder.blocks(7..=12);
1240        let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1241        assert!(accepted_blocks.is_empty());
1242        assert_eq!(missing.len(), 4);
1243
1244        // Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6.
1245        // All of them should be accepted and also the blocks of rounds 7 ~ 12
1246        // should be unsuspended and accepted as well.
1247        let blocks = dag_builder.blocks(3..=6);
1248
1249        // WHEN
1250        let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1251
1252        // THEN
1253        accepted_blocks.sort_by_key(|b| b.reference());
1254
1255        let mut all_blocks = dag_builder.blocks(3..=12);
1256        all_blocks.sort_by_key(|b| b.reference());
1257
1258        assert_eq!(accepted_blocks, all_blocks);
1259        assert!(block_manager.is_empty());
1260    }
1261
1262    struct TestBlockVerifier {
1263        fail: BTreeSet<BlockRef>,
1264    }
1265
1266    impl TestBlockVerifier {
1267        fn new(fail: BTreeSet<BlockRef>) -> Self {
1268            Self { fail }
1269        }
1270    }
1271
1272    impl BlockVerifier for TestBlockVerifier {
1273        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1274            Ok(())
1275        }
1276
1277        fn check_ancestors(
1278            &self,
1279            block: &VerifiedBlock,
1280            _ancestors: &[Option<VerifiedBlock>],
1281            _gc_enabled: bool,
1282            _gc_round: Round,
1283        ) -> ConsensusResult<()> {
1284            if self.fail.contains(&block.reference()) {
1285                Err(ConsensusError::InvalidBlockTimestamp {
1286                    max_timestamp_ms: 0,
1287                    block_timestamp_ms: block.timestamp_ms(),
1288                })
1289            } else {
1290                Ok(())
1291            }
1292        }
1293    }
1294
1295    #[tokio::test]
1296    async fn reject_blocks_failing_verifications() {
1297        let (context, _key_pairs) = Context::new_for_test(4);
1298        let context = Arc::new(context);
1299
1300        // create a DAG of rounds 1 ~ 5.
1301        let mut dag_builder = DagBuilder::new(context.clone());
1302        dag_builder.layers(1..=5).build();
1303
1304        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1305
1306        // Create a test verifier that fails the blocks of round 3
1307        let test_verifier = TestBlockVerifier::new(
1308            all_blocks
1309                .iter()
1310                .filter(|block| block.round() == 3)
1311                .map(|block| block.reference())
1312                .collect(),
1313        );
1314
1315        // Create BlockManager.
1316        let store = Arc::new(MemStore::new());
1317        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1318        let mut block_manager =
1319            BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));
1320
1321        // Try to accept blocks from round 2 ~ 5 into block manager. All of them should
1322        // be suspended.
1323        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1324            all_blocks
1325                .iter()
1326                .filter(|block| block.round() > 1)
1327                .cloned()
1328                .collect(),
1329        );
1330
1331        // Missing refs should all come from round 1.
1332        assert!(accepted_blocks.is_empty());
1333        assert_eq!(missing_refs.len(), 4);
1334        missing_refs.iter().for_each(|missing_ref| {
1335            assert_eq!(missing_ref.round, 1);
1336        });
1337
1338        // Now add round 1 blocks into block manager.
1339        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1340            all_blocks
1341                .iter()
1342                .filter(|block| block.round() == 1)
1343                .cloned()
1344                .collect(),
1345        );
1346
1347        // Only round 1 and round 2 blocks should be accepted.
1348        assert_eq!(accepted_blocks.len(), 8);
1349        accepted_blocks.iter().for_each(|block| {
1350            assert!(block.round() <= 2);
1351        });
1352        assert!(missing_refs.is_empty());
1353
1354        // Other blocks should be rejected and there should be no remaining suspended
1355        // block.
1356        assert!(block_manager.suspended_blocks().is_empty());
1357    }
1358
1359    #[tokio::test]
1360    async fn try_find_blocks() {
1361        // GIVEN
1362        let (context, _key_pairs) = Context::new_for_test(4);
1363        let context = Arc::new(context);
1364        let store = Arc::new(MemStore::new());
1365        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1366
1367        let mut block_manager =
1368            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1369
1370        // create a DAG
1371        let mut dag_builder = DagBuilder::new(context.clone());
1372        dag_builder
1373            .layers(1..=2) // 2 rounds
1374            .authorities(vec![
1375                AuthorityIndex::new_for_test(0),
1376                AuthorityIndex::new_for_test(2),
1377            ]) // Create equivocating blocks for 2 authorities
1378            .equivocate(3)
1379            .build();
1380
1381        // Take only the blocks of round 2 and try to accept them
1382        let round_2_blocks = dag_builder
1383            .blocks
1384            .iter()
1385            .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1386            .collect::<Vec<VerifiedBlock>>();
1387
1388        // All blocks should be missing
1389        let missing_block_refs_from_find =
1390            block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1391        assert_eq!(missing_block_refs_from_find.len(), 10);
1392        assert!(
1393            missing_block_refs_from_find
1394                .iter()
1395                .all(|block_ref| block_ref.round == 2)
1396        );
1397
1398        // Try accept blocks which will cause blocks to be suspended and added to
1399        // missing in block manager.
1400        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1401        assert!(accepted_blocks.is_empty());
1402
1403        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1404        let missing_block_refs_from_accept =
1405            missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1406        assert_eq!(missing, missing_block_refs_from_accept);
1407        assert_eq!(
1408            block_manager.missing_blocks(),
1409            missing_block_refs_from_accept
1410        );
1411
1412        // No blocks should be accepted and block manager should have made note
1413        // of the missing & suspended blocks.
1414        // Now we can check get the result of try find block with all of the blocks
1415        // from newly created but not accepted round 3.
1416        dag_builder.layer(3).build();
1417
1418        let round_3_blocks = dag_builder
1419            .blocks
1420            .iter()
1421            .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1422            .collect::<Vec<BlockRef>>();
1423
1424        let missing_block_refs_from_find = block_manager.try_find_blocks(
1425            round_2_blocks
1426                .iter()
1427                .map(|b| b.reference())
1428                .chain(round_3_blocks.into_iter())
1429                .collect(),
1430        );
1431
1432        assert_eq!(missing_block_refs_from_find.len(), 4);
1433        assert!(
1434            missing_block_refs_from_find
1435                .iter()
1436                .all(|block_ref| block_ref.round == 3)
1437        );
1438        assert_eq!(
1439            block_manager.missing_blocks(),
1440            missing_block_refs_from_accept
1441                .into_iter()
1442                .chain(missing_block_refs_from_find.into_iter())
1443                .collect()
1444        );
1445    }
1446}