consensus_core/
block_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, btree_map::Entry},
7    sync::Arc,
8    time::Instant,
9};
10
11use consensus_config::AuthorityIndex;
12use iota_metrics::monitored_scope;
13use itertools::Itertools as _;
14use parking_lot::RwLock;
15use tracing::{debug, trace, warn};
16
17use crate::{
18    Round,
19    block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
20    block_verifier::BlockVerifier,
21    context::Context,
22    dag_state::DagState,
23};
24
25#[derive(Clone)]
26pub(crate) struct SuspendedBlock {
27    block: VerifiedBlock,
28    missing_ancestors: BTreeSet<BlockRef>,
29    timestamp: Instant,
30}
31
32impl SuspendedBlock {
33    pub(crate) fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
34        Self {
35            block,
36            missing_ancestors,
37            timestamp: Instant::now(),
38        }
39    }
40}
41
42/// Block manager suspends incoming blocks until they are connected to the
43/// existing graph, returning newly connected blocks.
44/// TODO: As it is possible to have Byzantine validators who produce Blocks
45/// without valid causal history we need to make sure that BlockManager takes
46/// care of that and avoid OOM (Out Of Memory) situations.
47pub(crate) struct BlockManager {
48    context: Arc<Context>,
49    dag_state: Arc<RwLock<DagState>>,
50    block_verifier: Arc<dyn BlockVerifier>,
51
52    /// Keeps all the suspended blocks. A suspended block is a block that is
53    /// missing part of its causal history and thus can't be immediately
54    /// processed. A block will remain in this map until all its causal history
55    /// has been successfully processed.
56    suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
57    /// A map that keeps all the blocks that we are missing (keys) and the
58    /// corresponding blocks that reference the missing blocks as ancestors
59    /// and need them to get unsuspended. It is possible for a missing
60    /// dependency (key) to be a suspended block, so the block has been
61    /// already fetched but it self is still missing some of its ancestors to be
62    /// processed.
63    missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
64    /// A map of currently missing blocks to the set of authorities expected
65    /// to have them available locally. This set is approximated based on the
66    /// block's author and the authors of its direct children.
67    /// A block is considered missing if it appears in `missing_ancestors`
68    /// and has not yet been accepted or fetched. Blocks already stored or
69    /// present in `suspended_blocks` are excluded.
70    missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
71    /// A vector that holds a tuple of (lowest_round, highest_round) of received
72    /// blocks per authority. This is used for metrics reporting purposes
73    /// and resets during restarts.
74    received_block_rounds: Vec<Option<(Round, Round)>>,
75}
76
77impl BlockManager {
78    pub(crate) fn new(
79        context: Arc<Context>,
80        dag_state: Arc<RwLock<DagState>>,
81        block_verifier: Arc<dyn BlockVerifier>,
82    ) -> Self {
83        let committee_size = context.committee.size();
84        Self {
85            context,
86            dag_state,
87            block_verifier,
88            suspended_blocks: BTreeMap::new(),
89            missing_ancestors: BTreeMap::new(),
90            missing_blocks: BTreeMap::new(),
91            received_block_rounds: vec![None; committee_size],
92        }
93    }
94
95    /// Tries to accept the provided blocks assuming that all their causal
96    /// history exists. The method returns all the blocks that have been
97    /// successfully processed in round ascending order, that includes also
98    /// previously suspended blocks that have now been able to get accepted.
99    /// Method also returns a set with the missing ancestor blocks.
100    #[tracing::instrument(skip_all)]
101    pub(crate) fn try_accept_blocks(
102        &mut self,
103        blocks: Vec<VerifiedBlock>,
104    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
105        let _s = monitored_scope("BlockManager::try_accept_blocks");
106        self.try_accept_blocks_internal(blocks, false)
107    }
108
109    // Tries to accept blocks that have been committed. Returns all the blocks that
110    // have been accepted, both from the ones provided and any children blocks.
111    #[tracing::instrument(skip_all)]
112    pub(crate) fn try_accept_committed_blocks(
113        &mut self,
114        blocks: Vec<VerifiedBlock>,
115    ) -> Vec<VerifiedBlock> {
116        // If GC is disabled then should not run any of this logic.
117        if !self.dag_state.read().gc_enabled() {
118            return Vec::new();
119        }
120
121        // Just accept the blocks
122        let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
123        let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
124        assert!(
125            missing_blocks.is_empty(),
126            "No missing blocks should be returned for committed blocks"
127        );
128
129        accepted_blocks
130    }
131
132    /// Attempts to accept the provided blocks. When `committed = true` then the
133    /// blocks are considered to be committed via certified commits and
134    /// are handled differently.
135    fn try_accept_blocks_internal(
136        &mut self,
137        mut blocks: Vec<VerifiedBlock>,
138        committed: bool,
139    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
140        let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
141
142        blocks.sort_by_key(|b| b.round());
143        debug!(
144            "Trying to accept blocks: {}",
145            blocks.iter().map(|b| b.reference().to_string()).join(",")
146        );
147
148        let mut accepted_blocks = vec![];
149        let mut missing_blocks = BTreeSet::new();
150
151        for block in blocks {
152            self.update_block_received_metrics(&block);
153
154            // Try to accept the input block.
155            let block_ref = block.reference();
156
157            let mut to_verify_timestamps_and_accept = vec![];
158            if committed {
159                match self.try_accept_one_committed_block(block) {
160                    TryAcceptResult::Accepted(block) => {
161                        // As this is a committed block, then it's already accepted and there is no
162                        // need to verify its timestamps. Just add it to the
163                        // accepted blocks list.
164                        accepted_blocks.push(block);
165                    }
166                    TryAcceptResult::Processed => continue,
167                    TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => {
168                        panic!("Did not expect to suspend or skip a committed block: {block_ref:?}")
169                    }
170                };
171            } else {
172                match self.try_accept_one_block(block) {
173                    TryAcceptResult::Accepted(block) => {
174                        to_verify_timestamps_and_accept.push(block);
175                    }
176                    TryAcceptResult::Suspended(ancestors_to_fetch) => {
177                        debug!(
178                            "Missing ancestors to fetch for block {block_ref}: {}",
179                            ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
180                        );
181                        missing_blocks.extend(ancestors_to_fetch);
182                        continue;
183                    }
184                    TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
185                };
186            };
187
188            // If the block is accepted, try to unsuspend its children blocks if any.
189            let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
190            to_verify_timestamps_and_accept.extend(unsuspended_blocks);
191
192            // Verify block timestamps
193            let blocks_to_accept =
194                self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
195            accepted_blocks.extend(blocks_to_accept);
196        }
197
198        self.update_stats(missing_blocks.len() as u64);
199
200        // Figure out the new missing blocks
201        (accepted_blocks, missing_blocks)
202    }
203
204    fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
205        if self.dag_state.read().contains_block(&block.reference()) {
206            return TryAcceptResult::Processed;
207        }
208
209        // Remove the block from missing and suspended blocks
210        self.missing_blocks.remove(&block.reference());
211
212        // If the block has been already fetched and parked as suspended block, then
213        // remove it. Also find all the references of missing ancestors to
214        // remove those as well. If we don't do that then it's possible once the missing
215        // ancestor is fetched to cause a panic when trying to unsuspend this
216        // children as it won't be found in the suspended blocks map.
217        if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
218            suspended_block
219                .missing_ancestors
220                .iter()
221                .for_each(|ancestor| {
222                    if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
223                        references.remove(&block.reference());
224                    }
225                });
226        }
227
228        // Accept this block before any unsuspended children blocks
229        self.dag_state.write().accept_blocks(vec![block.clone()]);
230
231        TryAcceptResult::Accepted(block)
232    }
233
234    /// Tries to find the provided block_refs in DagState and BlockManager,
235    /// and returns missing block refs.
236    pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
237        let _s = monitored_scope("BlockManager::try_find_blocks");
238        let gc_round = self.dag_state.read().gc_round();
239
240        // No need to fetch blocks that are <= gc_round as they won't get processed
241        // anyways and they'll get skipped. So keep only the ones above.
242        let mut block_refs = block_refs
243            .into_iter()
244            .filter(|block_ref| block_ref.round > gc_round)
245            .collect::<Vec<_>>();
246
247        if block_refs.is_empty() {
248            return BTreeSet::new();
249        }
250
251        block_refs.sort_by_key(|b| b.round);
252
253        debug!(
254            "Trying to find blocks: {}",
255            block_refs.iter().map(|b| b.to_string()).join(",")
256        );
257
258        let mut missing_blocks = BTreeSet::new();
259
260        for (found, block_ref) in self
261            .dag_state
262            .read()
263            .contains_blocks(block_refs.clone())
264            .into_iter()
265            .zip(block_refs.iter())
266        {
267            if found || self.suspended_blocks.contains_key(block_ref) {
268                continue;
269            }
270            // Fetches the block if it is not in dag state or suspended.
271            missing_blocks.insert(*block_ref);
272            if self
273                .missing_blocks
274                .insert(*block_ref, BTreeSet::from([block_ref.author]))
275                .is_none()
276            {
277                // We want to report this as a missing ancestor even if there is no block that
278                // is actually references it right now. That will allow us
279                // to seamlessly GC the block later if needed.
280                self.missing_ancestors.entry(*block_ref).or_default();
281
282                let block_ref_hostname =
283                    &self.context.committee.authority(block_ref.author).hostname;
284                self.context
285                    .metrics
286                    .node_metrics
287                    .block_manager_missing_blocks_by_authority
288                    .with_label_values(&[block_ref_hostname])
289                    .inc();
290            }
291        }
292
293        let metrics = &self.context.metrics.node_metrics;
294        metrics
295            .missing_blocks_total
296            .inc_by(missing_blocks.len() as u64);
297        metrics
298            .block_manager_missing_blocks
299            .set(self.missing_blocks.len() as i64);
300
301        missing_blocks
302    }
303
304    // TODO: remove once timestamping is refactored to the new approach.
305    // Verifies each block's timestamp based on its ancestors, and persists in store
306    // all the valid blocks that should be accepted. Method returns the accepted
307    // and persisted blocks.
308    fn verify_block_timestamps_and_accept(
309        &mut self,
310        unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
311    ) -> Vec<VerifiedBlock> {
312        let (gc_enabled, gc_round) = {
313            let dag_state = self.dag_state.read();
314            (dag_state.gc_enabled(), dag_state.gc_round())
315        };
316        // Try to verify the block and its children for timestamp, with ancestor blocks.
317        let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
318        let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
319        {
320            'block: for b in unsuspended_blocks {
321                let ancestors = self.dag_state.read().get_blocks(b.ancestors());
322                assert_eq!(b.ancestors().len(), ancestors.len());
323                let mut ancestor_blocks = vec![];
324                'ancestor: for (ancestor_ref, found) in
325                    b.ancestors().iter().zip(ancestors.into_iter())
326                {
327                    if let Some(found_block) = found {
328                        // This invariant should be guaranteed by DagState.
329                        assert_eq!(ancestor_ref, &found_block.reference());
330                        ancestor_blocks.push(Some(found_block));
331                        continue 'ancestor;
332                    }
333                    // blocks_to_accept have not been added to DagState yet, but they
334                    // can appear in ancestors.
335                    if blocks_to_accept.contains_key(ancestor_ref) {
336                        ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
337                        continue 'ancestor;
338                    }
339                    // If an ancestor is already rejected, reject this block as well.
340                    if blocks_to_reject.contains_key(ancestor_ref) {
341                        blocks_to_reject.insert(b.reference(), b);
342                        continue 'block;
343                    }
344
345                    // When gc is enabled it's possible that we indeed won't find any ancestors that
346                    // are passed gc_round. That's ok. We don't need to panic here.
347                    // We do want to panic if gc_enabled we and have an ancestor that is > gc_round,
348                    // or gc is disabled.
349                    if gc_enabled
350                        && ancestor_ref.round > GENESIS_ROUND
351                        && ancestor_ref.round <= gc_round
352                    {
353                        debug!(
354                            "Block {:?} has a missing ancestor: {:?} passed GC round {}",
355                            b.reference(),
356                            ancestor_ref,
357                            gc_round
358                        );
359                        ancestor_blocks.push(None);
360                    } else {
361                        panic!(
362                            "Unsuspended block {b:?} has a missing ancestor! Ancestor not found in DagState: {ancestor_ref:?}"
363                        );
364                    }
365                }
366                if let Err(e) =
367                    self.block_verifier
368                        .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
369                {
370                    warn!("Block {:?} failed to verify ancestors: {}", b, e);
371                    blocks_to_reject.insert(b.reference(), b);
372                } else {
373                    blocks_to_accept.insert(b.reference(), b);
374                }
375            }
376        }
377
378        // TODO: report blocks_to_reject to peers.
379        for (block_ref, block) in blocks_to_reject {
380            let hostname = self
381                .context
382                .committee
383                .authority(block_ref.author)
384                .hostname
385                .clone();
386
387            self.context
388                .metrics
389                .node_metrics
390                .invalid_blocks
391                .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
392                .inc();
393            warn!("Invalid block {:?} is rejected", block);
394        }
395
396        let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
397
398        // Insert the accepted blocks into DAG state so future blocks including them as
399        // ancestors do not get suspended.
400        self.dag_state
401            .write()
402            .accept_blocks(blocks_to_accept.clone());
403
404        blocks_to_accept
405    }
406
407    /// Tries to accept the provided block. To accept a block its ancestors must
408    /// have been already successfully accepted. If block is accepted then
409    /// Some result is returned. None is returned when either the block is
410    /// suspended or the block has been already accepted before.
411    fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
412        let block_ref = block.reference();
413        let mut missing_ancestors = BTreeSet::new();
414        let mut ancestors_to_fetch = BTreeSet::new();
415        let dag_state = self.dag_state.read();
416        let gc_round = dag_state.gc_round();
417        let gc_enabled = dag_state.gc_enabled();
418
419        // If block has been already received and suspended, or already processed and
420        // stored, or is a genesis block, then skip it.
421        if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
422            return TryAcceptResult::Processed;
423        }
424
425        // If the block is <= gc_round, then we simply skip its processing as there is
426        // no meaning do any action on it or even store it.
427        if gc_enabled && block.round() <= gc_round {
428            let hostname = self
429                .context
430                .committee
431                .authority(block.author())
432                .hostname
433                .as_str();
434            self.context
435                .metrics
436                .node_metrics
437                .block_manager_skipped_blocks
438                .with_label_values(&[hostname])
439                .inc();
440            return TryAcceptResult::Skipped;
441        }
442
443        // Keep only the ancestors that are greater than the GC round to check for their
444        // existence. Keep in mind that if GC is disabled then gc_round will be
445        // 0 and all ancestors will be considered.
446        let ancestors = if gc_enabled {
447            block
448                .ancestors()
449                .iter()
450                .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
451                .cloned()
452                .collect::<Vec<_>>()
453        } else {
454            block.ancestors().to_vec()
455        };
456
457        // make sure that we have all the required ancestors in store
458        for (found, ancestor) in dag_state
459            .contains_blocks(ancestors.clone())
460            .into_iter()
461            .zip(ancestors.iter())
462        {
463            if !found {
464                missing_ancestors.insert(*ancestor);
465
466                // mark the block as having missing ancestors
467                self.missing_ancestors
468                    .entry(*ancestor)
469                    .or_default()
470                    .insert(block_ref);
471
472                let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
473                self.context
474                    .metrics
475                    .node_metrics
476                    .block_manager_missing_ancestors_by_authority
477                    .with_label_values(&[ancestor_hostname])
478                    .inc();
479
480                // Add the ancestor to the missing blocks set only if it doesn't already exist
481                // in the suspended blocks - meaning that we already have its
482                // payload.
483                if !self.suspended_blocks.contains_key(ancestor) {
484                    // Fetches the block if it is not in dag state or suspended.
485                    ancestors_to_fetch.insert(*ancestor);
486                    // We also want to keep track of the authorities that have this block.
487                    // This block could be already missing, so we just update the set  of
488                    // authorities who have it.
489                    let entry = self.missing_blocks.entry(*ancestor);
490                    match entry {
491                        Entry::Vacant(v) => {
492                            v.insert(BTreeSet::from([ancestor.author, block_ref.author]));
493                            self.context
494                                .metrics
495                                .node_metrics
496                                .block_manager_missing_blocks_by_authority
497                                .with_label_values(&[ancestor_hostname])
498                                .inc();
499                        }
500                        Entry::Occupied(mut o) => {
501                            o.get_mut().insert(block_ref.author);
502                        }
503                    }
504                }
505            }
506        }
507
508        // Remove the block ref from the `missing_blocks` - if exists - since we now
509        // have received the block. The block might still get suspended, but we
510        // won't report it as missing in order to not re-fetch.
511        self.missing_blocks.remove(&block.reference());
512
513        if !missing_ancestors.is_empty() {
514            let hostname = self
515                .context
516                .committee
517                .authority(block.author())
518                .hostname
519                .as_str();
520            self.context
521                .metrics
522                .node_metrics
523                .block_suspensions
524                .with_label_values(&[hostname])
525                .inc();
526            self.suspended_blocks
527                .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
528            return TryAcceptResult::Suspended(ancestors_to_fetch);
529        }
530
531        TryAcceptResult::Accepted(block)
532    }
533
534    /// Given an accepted block `accepted_block` it attempts to accept all the
535    /// suspended children blocks assuming such exist. All the unsuspended /
536    /// accepted blocks are returned as a vector in causal order.
537    fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
538        let mut unsuspended_blocks = vec![];
539        let mut to_process_blocks = vec![accepted_block];
540
541        while let Some(block_ref) = to_process_blocks.pop() {
542            // And try to check if its direct children can be unsuspended
543            if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
544                for r in block_refs_with_missing_deps {
545                    // For each dependency try to unsuspend it. If that's successful then we add it
546                    // to the queue so we can recursively try to unsuspend its
547                    // children.
548                    if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
549                        to_process_blocks.push(block.block.reference());
550                        unsuspended_blocks.push(block);
551                    }
552                }
553            }
554        }
555
556        let now = Instant::now();
557
558        // Report the unsuspended blocks
559        for block in &unsuspended_blocks {
560            let hostname = self
561                .context
562                .committee
563                .authority(block.block.author())
564                .hostname
565                .as_str();
566            self.context
567                .metrics
568                .node_metrics
569                .block_unsuspensions
570                .with_label_values(&[hostname])
571                .inc();
572            self.context
573                .metrics
574                .node_metrics
575                .suspended_block_time
576                .with_label_values(&[hostname])
577                .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
578        }
579
580        unsuspended_blocks
581            .into_iter()
582            .map(|block| block.block)
583            .collect()
584    }
585
586    /// Attempts to unsuspend a block by checking its ancestors and removing the
587    /// `accepted_dependency` by its local set. If there is no missing
588    /// dependency then this block can be unsuspended immediately and is removed
589    /// from the `suspended_blocks` map.
590    fn try_unsuspend_block(
591        &mut self,
592        block_ref: &BlockRef,
593        accepted_dependency: &BlockRef,
594    ) -> Option<SuspendedBlock> {
595        let block = self
596            .suspended_blocks
597            .get_mut(block_ref)
598            .expect("Block should be in suspended map");
599
600        assert!(
601            block.missing_ancestors.remove(accepted_dependency),
602            "Block reference {} should be present in missing dependencies of {:?}",
603            block_ref,
604            block.block
605        );
606
607        if block.missing_ancestors.is_empty() {
608            // we have no missing dependency, so we unsuspend the block and return it
609            return self.suspended_blocks.remove(block_ref);
610        }
611        None
612    }
613
614    /// Tries to unsuspend any blocks for the latest gc round. If gc round
615    /// hasn't changed then no blocks will be unsuspended due to
616    /// this action.
617    pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
618        let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
619        let (gc_enabled, gc_round) = {
620            let dag_state = self.dag_state.read();
621            (dag_state.gc_enabled(), dag_state.gc_round())
622        };
623        let mut blocks_unsuspended_below_gc_round = 0;
624        let mut blocks_gc_ed = 0;
625
626        if !gc_enabled {
627            trace!("GC is disabled, no blocks will attempt to get unsuspended.");
628            return;
629        }
630
631        while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
632            // If the first block in the missing ancestors is higher than the gc_round, then
633            // we can't unsuspend it yet. So we just put it back
634            // and we terminate the iteration as any next entry will be of equal or higher
635            // round anyways.
636            if block_ref.round > gc_round {
637                return;
638            }
639
640            blocks_gc_ed += 1;
641
642            let hostname = self
643                .context
644                .committee
645                .authority(block_ref.author)
646                .hostname
647                .as_str();
648            self.context
649                .metrics
650                .node_metrics
651                .block_manager_gced_blocks
652                .with_label_values(&[hostname])
653                .inc();
654
655            assert!(
656                !self.suspended_blocks.contains_key(block_ref),
657                "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
658            );
659
660            // Also remove it from the missing list - we don't want to keep looking for it.
661            self.missing_blocks.remove(block_ref);
662
663            // Find all the children blocks that have a dependency on this one and try to
664            // unsuspend them
665            let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
666
667            unsuspended_blocks.iter().for_each(|block| {
668                if block.round() <= gc_round {
669                    blocks_unsuspended_below_gc_round += 1;
670                }
671            });
672
673            // Now validate their timestamps and accept them
674            let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
675            for block in accepted_blocks {
676                let hostname = self
677                    .context
678                    .committee
679                    .authority(block.author())
680                    .hostname
681                    .as_str();
682                self.context
683                    .metrics
684                    .node_metrics
685                    .block_manager_gc_unsuspended_blocks
686                    .with_label_values(&[hostname])
687                    .inc();
688            }
689        }
690
691        debug!(
692            "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
693            blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
694        );
695    }
696
697    /// Returns all the blocks that are currently missing and needed in order to
698    /// accept suspended blocks. For each block reference it returns the set of
699    /// authorities who have this block.
700    pub(crate) fn missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
701        self.missing_blocks.clone()
702    }
703
704    /// Returns all the block refs that are currently missing.
705    #[cfg(test)]
706    pub(crate) fn missing_block_refs(&self) -> BTreeSet<BlockRef> {
707        self.missing_blocks.keys().cloned().collect()
708    }
709
710    fn update_stats(&mut self, missing_blocks: u64) {
711        let metrics = &self.context.metrics.node_metrics;
712        metrics.missing_blocks_total.inc_by(missing_blocks);
713        metrics
714            .block_manager_suspended_blocks
715            .set(self.suspended_blocks.len() as i64);
716        metrics
717            .block_manager_missing_ancestors
718            .set(self.missing_ancestors.len() as i64);
719        metrics
720            .block_manager_missing_blocks
721            .set(self.missing_blocks.len() as i64);
722    }
723
724    fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
725        let (min_round, max_round) =
726            if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
727                (curr_min.min(block.round()), curr_max.max(block.round()))
728            } else {
729                (block.round(), block.round())
730            };
731        self.received_block_rounds[block.author()] = Some((min_round, max_round));
732
733        let hostname = &self.context.committee.authority(block.author()).hostname;
734        self.context
735            .metrics
736            .node_metrics
737            .lowest_verified_authority_round
738            .with_label_values(&[hostname])
739            .set(min_round.into());
740        self.context
741            .metrics
742            .node_metrics
743            .highest_verified_authority_round
744            .with_label_values(&[hostname])
745            .set(max_round.into());
746    }
747
748    /// Checks if block manager is empty.
749    #[cfg(test)]
750    pub(crate) fn is_empty(&self) -> bool {
751        self.suspended_blocks.is_empty()
752            && self.missing_ancestors.is_empty()
753            && self.missing_blocks.is_empty()
754    }
755
756    /// Returns all the suspended blocks whose causal history we miss hence we
757    /// can't accept them yet.
758    #[cfg(test)]
759    fn suspended_blocks_refs(&self) -> BTreeSet<BlockRef> {
760        self.suspended_blocks.keys().cloned().collect()
761    }
762}
763
764// Result of trying to accept one block.
765enum TryAcceptResult {
766    // The block is accepted. Wraps the block itself.
767    Accepted(VerifiedBlock),
768    // The block is suspended. Wraps ancestors to be fetched.
769    Suspended(BTreeSet<BlockRef>),
770    // The block has been processed before and already exists in BlockManager (and is suspended)
771    // or in DagState (so has been already accepted). No further processing has been done at
772    // this point.
773    Processed,
774    // When a received block is <= gc_round, then we simply skip its processing as there is no
775    // meaning do any action on it or even store it.
776    Skipped,
777}
778
779#[cfg(test)]
780mod tests {
781    use std::{collections::BTreeSet, sync::Arc};
782
783    use consensus_config::AuthorityIndex;
784    use parking_lot::RwLock;
785    use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
786    use rstest::rstest;
787
788    use crate::{
789        CommitDigest, Round,
790        block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
791        block_manager::BlockManager,
792        block_verifier::{BlockVerifier, NoopBlockVerifier},
793        commit::TrustedCommit,
794        context::Context,
795        dag_state::DagState,
796        error::{ConsensusError, ConsensusResult},
797        storage::mem_store::MemStore,
798        test_dag_builder::DagBuilder,
799        test_dag_parser::parse_dag,
800    };
801
802    #[tokio::test]
803    async fn suspend_blocks_with_missing_ancestors() {
804        // GIVEN
805        let (context, _key_pairs) = Context::new_for_test(4);
806        let context = Arc::new(context);
807        let store = Arc::new(MemStore::new());
808        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
809
810        let mut block_manager =
811            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
812
813        // create a DAG
814        let mut dag_builder = DagBuilder::new(context.clone());
815        dag_builder
816            .layers(1..=2) // 2 rounds
817            .authorities(vec![
818                AuthorityIndex::new_for_test(0),
819                AuthorityIndex::new_for_test(2),
820            ]) // Create equivocating blocks for 2 authorities
821            .equivocate(3)
822            .build();
823
824        // Take only the blocks of round 2 and try to accept them
825        let round_2_blocks = dag_builder
826            .blocks
827            .into_iter()
828            .filter_map(|(_, block)| (block.round() == 2).then_some(block))
829            .collect::<Vec<VerifiedBlock>>();
830
831        // WHEN
832        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
833
834        // THEN
835        assert!(accepted_blocks.is_empty());
836
837        // AND the returned missing ancestors should be the same as the provided block
838        // ancestors
839        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
840        let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
841        assert_eq!(missing, missing_block_refs);
842
843        // AND the missing blocks are the parents of the round 2 blocks. Since this is a
844        // fully connected DAG taking the ancestors of the first element
845        // suffices.
846        assert_eq!(block_manager.missing_block_refs(), missing_block_refs);
847
848        // AND suspended blocks should return the round_2_blocks
849        assert_eq!(
850            block_manager.suspended_blocks_refs(),
851            round_2_blocks
852                .into_iter()
853                .map(|block| block.reference())
854                .collect::<BTreeSet<_>>()
855        );
856
857        // AND each missing block should be known to all authorities
858        let known_by_manager = block_manager
859            .missing_blocks()
860            .iter()
861            .next()
862            .expect("We should expect at least two elements there")
863            .1
864            .clone();
865        assert_eq!(
866            known_by_manager,
867            context
868                .committee
869                .authorities()
870                .map(|(a, _)| a)
871                .collect::<BTreeSet<_>>()
872        );
873    }
874
    /// Offering the blocks of rounds 4 down to 2 one at a time, with none of
    /// their ancestors available, must suspend every block and report the
    /// block's full set of ancestors as missing each time.
    #[tokio::test]
    async fn try_accept_block_returns_missing_blocks() {
        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

        // create a DAG
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=4) // 4 rounds
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(2),
            ]) // Create equivocating blocks for 2 authorities
            .equivocate(3) // Use 3 equivocating blocks per authority
            .build();

        // Take the blocks from round 4 down to round 2 (inclusive) by iterating the
        // DAG in reverse order, and offer them one by one. Every block should be
        // suspended, and the returned missing set should equal that block's
        // ancestors.
        for (_, block) in dag_builder
            .blocks
            .into_iter()
            .rev()
            .take_while(|(_, block)| block.round() >= 2)
        {
            // WHEN
            let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);

            // THEN
            assert!(accepted_blocks.is_empty());

            let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
            assert_eq!(missing, block_ancestors);
        }
    }
914
    /// Offering a DAG together with its complete causal history must accept
    /// every block at once and leave nothing suspended or missing. Offering
    /// the same blocks a second time must return nothing.
    #[tokio::test]
    async fn accept_blocks_with_complete_causal_history() {
        // GIVEN
        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

        // create a DAG of 2 rounds
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=2).build();

        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // WHEN
        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());

        // THEN
        // 2 rounds x 4 authorities
        assert_eq!(accepted_blocks.len(), 8);
        // The `round() > 0` filter keeps any genesis blocks out of the expectation.
        assert_eq!(
            accepted_blocks,
            all_blocks
                .iter()
                .filter(|block| block.round() > 0)
                .cloned()
                .collect::<Vec<VerifiedBlock>>()
        );
        assert!(missing.is_empty());
        assert!(block_manager.is_empty());

        // WHEN trying to accept same blocks again, then none will be returned as those
        // have been already accepted
        let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
        assert!(accepted_blocks.is_empty());
    }
953
954    /// Tests that the block manager accepts blocks when some or all of their
955    /// causal history is below or equal to the GC round.
956    #[tokio::test]
957    async fn accept_blocks_with_causal_history_below_gc_round() {
958        // GIVEN
959        let (mut context, _key_pairs) = Context::new_for_test(4);
960
961        // We set the gc depth to 4
962        context
963            .protocol_config
964            .set_consensus_gc_depth_for_testing(4);
965        let context = Arc::new(context);
966        let store = Arc::new(MemStore::new());
967        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
968
969        // We "fake" the commit for round 10, so we can test the GC round 6
970        // (commit_round - gc_depth = 10 - 4 = 6)
971        let last_commit = TrustedCommit::new_for_test(
972            10,
973            CommitDigest::MIN,
974            context.clock.timestamp_utc_ms(),
975            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
976            vec![],
977        );
978        dag_state.write().set_last_commit(last_commit);
979        assert_eq!(
980            dag_state.read().gc_round(),
981            6,
982            "GC round should have moved to round 6"
983        );
984
985        let mut block_manager =
986            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
987
988        // create a DAG of 10 rounds with some weak links for the blocks of round 9
989        let dag_str = "DAG {
990            Round 0 : { 4 },
991            Round 1 : { * },
992            Round 2 : { * },
993            Round 3 : { * },
994            Round 4 : { * },
995            Round 5 : { * },
996            Round 6 : { * },
997            Round 7 : {
998                A -> [*],
999                B -> [*],
1000                C -> [*],
1001            }
1002            Round 8 : {
1003                A -> [*],
1004                B -> [*],
1005                C -> [*],
1006            },
1007            Round 9 : {
1008                A -> [A8, B8, C8, D6],
1009                B -> [A8, B8, C8, D6],
1010                C -> [A8, B8, C8, D6],
1011                D -> [A8, B8, C8, D6],
1012            },
1013            Round 10 : { * },
1014        }";
1015
1016        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1017
1018        // Now take all the blocks for round 7 & 8 , which are above the gc_round = 6.
1019        // All those blocks should eventually be returned as accepted. Pay attention
1020        // that without GC none of those blocks should get accepted.
1021        let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
1022
1023        for rounds_range in blocks_ranges {
1024            let all_blocks = dag_builder
1025                .blocks
1026                .values()
1027                .filter(|block| rounds_range.contains(&block.round()))
1028                .cloned()
1029                .collect::<Vec<_>>();
1030
1031            // WHEN
1032            let mut reversed_blocks = all_blocks.clone();
1033            reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
1034            let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
1035            accepted_blocks.sort_by_key(|a| a.reference());
1036
1037            // THEN
1038            assert_eq!(accepted_blocks, all_blocks.to_vec());
1039            assert!(missing.is_empty());
1040            assert!(block_manager.is_empty());
1041
1042            let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1043            assert!(accepted_blocks.is_empty());
1044        }
1045    }
1046
1047    /// Blocks that are attempted to be accepted but are <= gc_round they will
1048    /// be skipped for processing. Nothing should be stored or trigger any
1049    /// unsuspension etc.
1050    #[tokio::test]
1051    async fn skip_accepting_blocks_below_gc_round() {
1052        // GIVEN
1053        let (mut context, _key_pairs) = Context::new_for_test(4);
1054        // We set the gc depth to 4
1055        context
1056            .protocol_config
1057            .set_consensus_gc_depth_for_testing(4);
1058        let context = Arc::new(context);
1059        let store = Arc::new(MemStore::new());
1060        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1061
1062        // We "fake" the commit for round 10, so we can test the GC round 6
1063        // (commit_round - gc_depth = 10 - 4 = 6)
1064        let last_commit = TrustedCommit::new_for_test(
1065            10,
1066            CommitDigest::MIN,
1067            context.clock.timestamp_utc_ms(),
1068            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1069            vec![],
1070        );
1071        dag_state.write().set_last_commit(last_commit);
1072        assert_eq!(
1073            dag_state.read().gc_round(),
1074            6,
1075            "GC round should have moved to round 6"
1076        );
1077
1078        let mut block_manager =
1079            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1080
1081        // create a DAG of 6 rounds
1082        let mut dag_builder = DagBuilder::new(context.clone());
1083        dag_builder.layers(1..=6).build();
1084
1085        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1086
1087        // WHEN
1088        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1089
1090        // THEN
1091        assert!(accepted_blocks.is_empty());
1092        assert!(missing.is_empty());
1093        assert!(block_manager.is_empty());
1094    }
1095
    /// The test generates blocks for a well connected DAG and feeds them to the
    /// block manager in random order. In the end every block should have been
    /// accepted exactly once, and no suspended or missing blocks should remain.
    /// The test runs with GC both disabled and enabled. When GC is enabled we
    /// set a high gc_depth value, so in practice gc_round will be 0. This lets
    /// us check that in the common case everything works exactly the same way
    /// as when GC is disabled.
    #[rstest]
    #[tokio::test]
    async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
        // GIVEN
        let (mut context, _key_pairs) = Context::new_for_test(4);

        if gc_enabled {
            context
                .protocol_config
                .set_consensus_gc_depth_for_testing(10);
        }
        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ 3
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();

        let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // Now randomize the sequence of sending the blocks to block manager. In the end
        // all the blocks should be accepted exactly once and no missing blocks
        // should exist.
        for seed in 0..100u8 {
            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));

            let store = Arc::new(MemStore::new());
            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

            let mut block_manager =
                BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

            // WHEN
            let mut all_accepted_blocks = vec![];
            for block in &all_blocks {
                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);

                all_accepted_blocks.extend(accepted_blocks);
            }

            // THEN
            all_accepted_blocks.sort_by_key(|b| b.reference());
            all_blocks.sort_by_key(|b| b.reference());

            assert_eq!(
                all_accepted_blocks, all_blocks,
                "Failed acceptance sequence for seed {seed}"
            );
            assert!(block_manager.is_empty());
        }
    }
1153
    /// Tests that `missing_blocks()` correctly infers the authorities
    /// referencing each missing block, based on the blocks offered to the
    /// block manager.
    #[tokio::test]
    async fn authorities_that_know_missing_blocks() {
        let (context, _key_pairs) = Context::new_for_test(4);

        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ 3
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();

        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        let blocks_round_2 = all_blocks
            .iter()
            .filter(|block| block.round() == 2)
            .cloned()
            .collect::<Vec<_>>();

        let blocks_round_1 = all_blocks
            .iter()
            .filter(|block| block.round() == 1)
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

        let (_, missing_blocks) = block_manager.try_accept_blocks(vec![blocks_round_2[0].clone()]);
        // Blocks from round 1 are all missing, since the DAG is fully connected
        assert_eq!(missing_blocks, blocks_round_1);

        let missing_blocks_with_authorities = block_manager.missing_blocks();

        let block_round_1_authority_0 = all_blocks
            .iter()
            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(0))
            .map(|block| block.reference())
            .next()
            .unwrap();
        let block_round_1_authority_1 = all_blocks
            .iter()
            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(1))
            .map(|block| block.reference())
            .next()
            .unwrap();
        // NOTE(review): the expected sets below imply that a missing block is
        // attributed both to the authors of the blocks referencing it and to its
        // own author. blocks_round_2[0] is presumably authority 0's block.
        assert_eq!(
            missing_blocks_with_authorities[&block_round_1_authority_0],
            BTreeSet::from([AuthorityIndex::new_for_test(0)])
        );
        assert_eq!(
            missing_blocks_with_authorities[&block_round_1_authority_1],
            BTreeSet::from([
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1)
            ])
        );

        // Add a new block from round 2 from authority 1, which updates the set of
        // authorities that are aware of the missing blocks
        block_manager.try_accept_blocks(vec![blocks_round_2[1].clone()]);
        let missing_blocks_with_authorities = block_manager.missing_blocks();
        assert_eq!(
            missing_blocks_with_authorities[&block_round_1_authority_0],
            BTreeSet::from([
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1)
            ])
        );
    }
1228
    /// Blocks that stay suspended because part of their history (round 1) is
    /// never provided must be unsuspended and accepted into the DAG once a
    /// commit advances the GC round. Refs registered via `try_find_blocks`
    /// must be cleaned up as well.
    #[rstest]
    #[tokio::test]
    async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        // GIVEN
        let (mut context, _key_pairs) = Context::new_for_test(4);

        // NOTE(review): always true for the rstest values above.
        if gc_depth > 0 {
            context
                .protocol_config
                .set_consensus_gc_depth_for_testing(gc_depth);
        }
        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ gc_depth * 2
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=gc_depth * 2).build();

        // Pay attention that we start from round 2. Round 1 will always be missing,
        // so no matter what we do the remaining blocks can't be unsuspended unless
        // gc_round has advanced to round >= 1.
        let mut all_blocks = dag_builder
            .blocks
            .values()
            .filter(|block| block.round() > 1)
            .cloned()
            .collect::<Vec<_>>();

        // Now randomize the sequence of sending the blocks to block manager. Since
        // round 1 is never provided, every block should stay suspended regardless
        // of the order, until GC kicks in.
        for seed in 0..100u8 {
            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));

            let store = Arc::new(MemStore::new());
            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

            let mut block_manager = BlockManager::new(
                context.clone(),
                dag_state.clone(),
                Arc::new(NoopBlockVerifier),
            );

            // WHEN
            for block in &all_blocks {
                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
                assert!(accepted_blocks.is_empty());
            }
            assert!(!block_manager.is_empty());

            // AND also call the try_to_find method with some non existing block refs. Those
            // should be cleaned up as well once GC kicks in.
            let non_existing_refs = (1..=3)
                .map(|round| {
                    BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
                })
                .collect::<Vec<_>>();
            assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);

            // AND
            // Trigger a commit at round gc_depth * 2, which will advance the GC round
            // (presumably to gc_depth * 2 - gc_depth = gc_depth >= 1)
            let last_commit = TrustedCommit::new_for_test(
                gc_depth * 2,
                CommitDigest::MIN,
                context.clock.timestamp_utc_ms(),
                BlockRef::new(
                    gc_depth * 2,
                    AuthorityIndex::new_for_test(0),
                    BlockDigest::MIN,
                ),
                vec![],
            );
            dag_state.write().set_last_commit(last_commit);

            // AND
            block_manager.try_unsuspend_blocks_for_latest_gc_round();

            // THEN
            assert!(block_manager.is_empty());

            // AND ensure that all have been accepted to the DAG
            for block in &all_blocks {
                assert!(dag_state.read().contains_block(&block.reference()));
            }
        }
    }
1315
1316    #[rstest]
1317    #[tokio::test]
1318    async fn try_accept_committed_blocks() {
1319        // GIVEN
1320        let (mut context, _key_pairs) = Context::new_for_test(4);
1321        // We set the gc depth to 4
1322        context
1323            .protocol_config
1324            .set_consensus_gc_depth_for_testing(4);
1325        let context = Arc::new(context);
1326        let store = Arc::new(MemStore::new());
1327        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1328
1329        // We "fake" the commit for round 6, so GC round moves to (commit_round -
1330        // gc_depth = 6 - 4 = 2)
1331        let last_commit = TrustedCommit::new_for_test(
1332            10,
1333            CommitDigest::MIN,
1334            context.clock.timestamp_utc_ms(),
1335            BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1336            vec![],
1337        );
1338        dag_state.write().set_last_commit(last_commit);
1339        assert_eq!(
1340            dag_state.read().gc_round(),
1341            2,
1342            "GC round should have moved to round 2"
1343        );
1344
1345        let mut block_manager =
1346            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1347
1348        // create a DAG of 12 rounds
1349        let mut dag_builder = DagBuilder::new(context.clone());
1350        dag_builder.layers(1..=12).build();
1351
1352        // Now try to accept via the normal acceptance block path the blocks of rounds 7
1353        // ~ 12. None of them should be accepted
1354        let blocks = dag_builder.blocks(7..=12);
1355        let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1356        assert!(accepted_blocks.is_empty());
1357        assert_eq!(missing.len(), 4);
1358
1359        // Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6.
1360        // All of them should be accepted and also the blocks of rounds 7 ~ 12
1361        // should be unsuspended and accepted as well.
1362        let blocks = dag_builder.blocks(3..=6);
1363
1364        // WHEN
1365        let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1366
1367        // THEN
1368        accepted_blocks.sort_by_key(|b| b.reference());
1369
1370        let mut all_blocks = dag_builder.blocks(3..=12);
1371        all_blocks.sort_by_key(|b| b.reference());
1372
1373        assert_eq!(accepted_blocks, all_blocks);
1374        assert!(block_manager.is_empty());
1375    }
1376
1377    struct TestBlockVerifier {
1378        fail: BTreeSet<BlockRef>,
1379    }
1380
1381    impl TestBlockVerifier {
1382        fn new(fail: BTreeSet<BlockRef>) -> Self {
1383            Self { fail }
1384        }
1385    }
1386
1387    impl BlockVerifier for TestBlockVerifier {
1388        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1389            Ok(())
1390        }
1391
1392        fn check_ancestors(
1393            &self,
1394            block: &VerifiedBlock,
1395            _ancestors: &[Option<VerifiedBlock>],
1396            _gc_enabled: bool,
1397            _gc_round: Round,
1398        ) -> ConsensusResult<()> {
1399            if self.fail.contains(&block.reference()) {
1400                Err(ConsensusError::InvalidBlockTimestamp {
1401                    max_timestamp_ms: 0,
1402                    block_timestamp_ms: block.timestamp_ms(),
1403                })
1404            } else {
1405                Ok(())
1406            }
1407        }
1408    }
1409
    /// Blocks that fail `check_ancestors` (here: every round 3 block) must be
    /// rejected rather than accepted. The blocks of later rounds must not end
    /// up accepted or left suspended either.
    #[tokio::test]
    async fn reject_blocks_failing_verifications() {
        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ 5.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=5).build();

        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // Create a test verifier that fails the blocks of round 3
        let test_verifier = TestBlockVerifier::new(
            all_blocks
                .iter()
                .filter(|block| block.round() == 3)
                .map(|block| block.reference())
                .collect(),
        );

        // Create BlockManager.
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));

        // Try to accept blocks from round 2 ~ 5 into block manager. All of them should
        // be suspended.
        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
            all_blocks
                .iter()
                .filter(|block| block.round() > 1)
                .cloned()
                .collect(),
        );

        // Missing refs should all come from round 1.
        assert!(accepted_blocks.is_empty());
        assert_eq!(missing_refs.len(), 4);
        missing_refs.iter().for_each(|missing_ref| {
            assert_eq!(missing_ref.round, 1);
        });

        // Now add round 1 blocks into block manager.
        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
            all_blocks
                .iter()
                .filter(|block| block.round() == 1)
                .cloned()
                .collect(),
        );

        // Only round 1 and round 2 blocks should be accepted.
        assert_eq!(accepted_blocks.len(), 8);
        accepted_blocks.iter().for_each(|block| {
            assert!(block.round() <= 2);
        });
        assert!(missing_refs.is_empty());

        // Other blocks should be rejected and there should be no remaining suspended
        // block.
        assert!(block_manager.suspended_blocks_refs().is_empty());
    }
1473
    /// `try_find_blocks` must report the refs that are unknown to the block
    /// manager, not those it already holds as suspended. The refs it reports
    /// must be recorded as missing blocks.
    #[tokio::test]
    async fn try_find_blocks() {
        // GIVEN
        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

        // create a DAG
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=2) // 2 rounds
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(2),
            ]) // Create equivocating blocks for 2 authorities
            .equivocate(3)
            .build();

        // Take only the blocks of round 2 and try to accept them
        let round_2_blocks = dag_builder
            .blocks
            .iter()
            .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
            .collect::<Vec<VerifiedBlock>>();

        // All blocks should be missing: all 10 round 2 blocks, including the
        // equivocating ones.
        let missing_block_refs_from_find =
            block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
        assert_eq!(missing_block_refs_from_find.len(), 10);
        assert!(
            missing_block_refs_from_find
                .iter()
                .all(|block_ref| block_ref.round == 2)
        );

        // Try accept blocks which will cause blocks to be suspended and added to
        // missing in block manager.
        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
        assert!(accepted_blocks.is_empty());

        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
        let missing_block_refs_from_accept =
            missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
        assert_eq!(missing, missing_block_refs_from_accept);
        assert_eq!(
            block_manager.missing_block_refs(),
            missing_block_refs_from_accept
        );

        // No blocks should be accepted and block manager should have made note
        // of the missing & suspended blocks.
        // Now build round 3 (never offered to the block manager) and call
        // try_find_blocks with both the round 2 and round 3 refs. The round 2 blocks
        // are already suspended, so only the round 3 refs should be reported.
        dag_builder.layer(3).build();

        let round_3_blocks = dag_builder
            .blocks
            .iter()
            .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
            .collect::<Vec<BlockRef>>();

        let missing_block_refs_from_find = block_manager.try_find_blocks(
            round_2_blocks
                .iter()
                .map(|b| b.reference())
                .chain(round_3_blocks.into_iter())
                .collect(),
        );

        assert_eq!(missing_block_refs_from_find.len(), 4);
        assert!(
            missing_block_refs_from_find
                .iter()
                .all(|block_ref| block_ref.round == 3)
        );
        // The refs reported by try_find_blocks are now tracked as missing as well,
        // next to the round 1 ancestors recorded by try_accept_blocks.
        assert_eq!(
            block_manager.missing_block_refs(),
            missing_block_refs_from_accept
                .into_iter()
                .chain(missing_block_refs_from_find.into_iter())
                .collect()
        );
    }
1561}