consensus_core/
block_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, btree_map::Entry},
7    sync::Arc,
8    time::Instant,
9};
10
11use consensus_config::AuthorityIndex;
12use iota_metrics::monitored_scope;
13use itertools::Itertools as _;
14use parking_lot::RwLock;
15use tracing::{debug, instrument, trace, warn};
16
17use crate::{
18    Round,
19    block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
20    block_verifier::BlockVerifier,
21    context::Context,
22    dag_state::DagState,
23};
24
/// A verified block that has been received but cannot be accepted yet because
/// at least one of its ancestors is not available.
#[derive(Clone)]
pub(crate) struct SuspendedBlock {
    /// The block that is parked until its ancestors show up.
    block: VerifiedBlock,
    /// Ancestors of `block` that were missing when it got suspended. Entries
    /// are removed as the ancestors get accepted; once the set is empty the
    /// block can be unsuspended.
    missing_ancestors: BTreeSet<BlockRef>,
    /// Instant at which this `SuspendedBlock` was created. Used to report how
    /// long the block stayed suspended.
    timestamp: Instant,
}
31
32impl SuspendedBlock {
33    pub(crate) fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
34        Self {
35            block,
36            missing_ancestors,
37            timestamp: Instant::now(),
38        }
39    }
40}
41
/// Block manager suspends incoming blocks until they are connected to the
/// existing graph, returning newly connected blocks.
/// TODO: As it is possible to have Byzantine validators who produce Blocks
/// without valid causal history we need to make sure that BlockManager takes
/// care of that and avoid OOM (Out Of Memory) situations.
pub(crate) struct BlockManager {
    /// Shared node context; used here to look up committee members (for
    /// hostnames) and to report metrics.
    context: Arc<Context>,
    /// Shared DAG state. Consulted to check which blocks are already stored
    /// and for the GC settings, and written to when blocks get accepted.
    dag_state: Arc<RwLock<DagState>>,
    /// Verifier used to check a block against its ancestors before the block
    /// is accepted.
    block_verifier: Arc<dyn BlockVerifier>,

    /// Keeps all the suspended blocks. A suspended block is a block that is
    /// missing part of its causal history and thus can't be immediately
    /// processed. A block will remain in this map until all its causal history
    /// has been successfully processed.
    suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
    /// A map that keeps all the blocks that we are missing (keys) and the
    /// corresponding blocks that reference the missing blocks as ancestors
    /// and need them to get unsuspended. It is possible for a missing
    /// dependency (key) to be a suspended block, i.e. the block has already
    /// been fetched but is itself still missing some of its ancestors and
    /// therefore can't be processed yet.
    missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
    /// A map of currently missing blocks to the set of authorities expected
    /// to have them available locally. This set is approximated based on the
    /// block's author and the authors of its direct children.
    /// A block is considered missing if it appears in `missing_ancestors`
    /// and has not yet been accepted or fetched. Blocks already stored or
    /// present in `suspended_blocks` are excluded.
    missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
    /// A vector that holds a tuple of (lowest_round, highest_round) of received
    /// blocks per authority (one slot per committee member, `None` until the
    /// first block of that authority is received). This is used for metrics
    /// reporting purposes and resets during restarts.
    received_block_rounds: Vec<Option<(Round, Round)>>,
}
76
77impl BlockManager {
78    pub(crate) fn new(
79        context: Arc<Context>,
80        dag_state: Arc<RwLock<DagState>>,
81        block_verifier: Arc<dyn BlockVerifier>,
82    ) -> Self {
83        let committee_size = context.committee.size();
84        Self {
85            context,
86            dag_state,
87            block_verifier,
88            suspended_blocks: BTreeMap::new(),
89            missing_ancestors: BTreeMap::new(),
90            missing_blocks: BTreeMap::new(),
91            received_block_rounds: vec![None; committee_size],
92        }
93    }
94
95    /// Tries to accept the provided blocks assuming that all their causal
96    /// history exists. The method returns all the blocks that have been
97    /// successfully processed in round ascending order, that includes also
98    /// previously suspended blocks that have now been able to get accepted.
99    /// Method also returns a set with the missing ancestor blocks.
100    #[tracing::instrument(skip_all)]
101    pub(crate) fn try_accept_blocks(
102        &mut self,
103        blocks: Vec<VerifiedBlock>,
104    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
105        let _s = monitored_scope("BlockManager::try_accept_blocks");
106        self.try_accept_blocks_internal(blocks, false)
107    }
108
109    // Tries to accept blocks that have been committed. Returns all the blocks that
110    // have been accepted, both from the ones provided and any children blocks.
111    #[tracing::instrument(skip_all)]
112    pub(crate) fn try_accept_committed_blocks(
113        &mut self,
114        blocks: Vec<VerifiedBlock>,
115    ) -> Vec<VerifiedBlock> {
116        // If GC is disabled then should not run any of this logic.
117        if !self.dag_state.read().gc_enabled() {
118            return Vec::new();
119        }
120
121        // Just accept the blocks
122        let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
123        let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
124        assert!(
125            missing_blocks.is_empty(),
126            "No missing blocks should be returned for committed blocks"
127        );
128
129        accepted_blocks
130    }
131
132    /// Attempts to accept the provided blocks. When `committed = true` then the
133    /// blocks are considered to be committed via certified commits and
134    /// are handled differently.
135    fn try_accept_blocks_internal(
136        &mut self,
137        mut blocks: Vec<VerifiedBlock>,
138        committed: bool,
139    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
140        let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
141
142        blocks.sort_by_key(|b| b.round());
143        debug!(
144            "Trying to accept blocks: {}",
145            blocks.iter().map(|b| b.reference().to_string()).join(",")
146        );
147
148        let mut accepted_blocks = vec![];
149        let mut missing_blocks = BTreeSet::new();
150
151        for block in blocks {
152            self.update_block_received_metrics(&block);
153
154            // Try to accept the input block.
155            let block_ref = block.reference();
156
157            let mut to_verify_timestamps_and_accept = vec![];
158            if committed {
159                match self.try_accept_one_committed_block(block) {
160                    TryAcceptResult::Accepted(block) => {
161                        // As this is a committed block, then it's already accepted and there is no
162                        // need to verify its timestamps. Just add it to the
163                        // accepted blocks list.
164                        accepted_blocks.push(block);
165                    }
166                    TryAcceptResult::Processed => continue,
167                    TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => {
168                        panic!("Did not expect to suspend or skip a committed block: {block_ref:?}")
169                    }
170                };
171            } else {
172                match self.try_accept_one_block(block) {
173                    TryAcceptResult::Accepted(block) => {
174                        to_verify_timestamps_and_accept.push(block);
175                    }
176                    TryAcceptResult::Suspended(ancestors_to_fetch) => {
177                        debug!(
178                            "Missing ancestors to fetch for block {block_ref}: {}",
179                            ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
180                        );
181                        missing_blocks.extend(ancestors_to_fetch);
182                        continue;
183                    }
184                    TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
185                };
186            };
187
188            // If the block is accepted, try to unsuspend its children blocks if any.
189            let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
190            to_verify_timestamps_and_accept.extend(unsuspended_blocks);
191
192            // Verify block timestamps
193            let blocks_to_accept =
194                self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
195            accepted_blocks.extend(blocks_to_accept);
196        }
197
198        self.update_stats(missing_blocks.len() as u64);
199
200        // Figure out the new missing blocks
201        (accepted_blocks, missing_blocks)
202    }
203
204    fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
205        if self.dag_state.read().contains_block(&block.reference()) {
206            return TryAcceptResult::Processed;
207        }
208
209        // Remove the block from missing and suspended blocks
210        self.missing_blocks.remove(&block.reference());
211
212        // If the block has been already fetched and parked as suspended block, then
213        // remove it. Also find all the references of missing ancestors to
214        // remove those as well. If we don't do that then it's possible once the missing
215        // ancestor is fetched to cause a panic when trying to unsuspend this
216        // children as it won't be found in the suspended blocks map.
217        if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
218            suspended_block
219                .missing_ancestors
220                .iter()
221                .for_each(|ancestor| {
222                    if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
223                        references.remove(&block.reference());
224                    }
225                });
226        }
227
228        // Accept this block before any unsuspended children blocks
229        self.dag_state.write().accept_blocks(vec![block.clone()]);
230
231        TryAcceptResult::Accepted(block)
232    }
233
234    /// Tries to find the provided block_refs in DagState and BlockManager,
235    /// and returns missing block refs.
236    pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
237        let _s = monitored_scope("BlockManager::try_find_blocks");
238        let gc_round = self.dag_state.read().gc_round();
239
240        // No need to fetch blocks that are <= gc_round as they won't get processed
241        // anyways and they'll get skipped. So keep only the ones above.
242        let mut block_refs = block_refs
243            .into_iter()
244            .filter(|block_ref| block_ref.round > gc_round)
245            .collect::<Vec<_>>();
246
247        if block_refs.is_empty() {
248            return BTreeSet::new();
249        }
250
251        block_refs.sort_by_key(|b| b.round);
252
253        debug!(
254            "Trying to find blocks: {}",
255            block_refs.iter().map(|b| b.to_string()).join(",")
256        );
257
258        let mut missing_blocks = BTreeSet::new();
259
260        for (found, block_ref) in self
261            .dag_state
262            .read()
263            .contains_blocks(block_refs.clone())
264            .into_iter()
265            .zip(block_refs.iter())
266        {
267            if found || self.suspended_blocks.contains_key(block_ref) {
268                continue;
269            }
270            // Fetches the block if it is not in dag state or suspended.
271            missing_blocks.insert(*block_ref);
272            if self
273                .missing_blocks
274                .insert(*block_ref, BTreeSet::from([block_ref.author]))
275                .is_none()
276            {
277                // We want to report this as a missing ancestor even if there is no block that
278                // is actually references it right now. That will allow us
279                // to seamlessly GC the block later if needed.
280                self.missing_ancestors.entry(*block_ref).or_default();
281
282                let block_ref_hostname =
283                    &self.context.committee.authority(block_ref.author).hostname;
284                self.context
285                    .metrics
286                    .node_metrics
287                    .block_manager_missing_blocks_by_authority
288                    .with_label_values(&[block_ref_hostname])
289                    .inc();
290            }
291        }
292
293        let metrics = &self.context.metrics.node_metrics;
294        metrics
295            .missing_blocks_total
296            .inc_by(missing_blocks.len() as u64);
297        metrics
298            .block_manager_missing_blocks
299            .set(self.missing_blocks.len() as i64);
300
301        missing_blocks
302    }
303
304    // TODO: remove once timestamping is refactored to the new approach.
305    // Verifies each block's timestamp based on its ancestors, and persists in store
306    // all the valid blocks that should be accepted. Method returns the accepted
307    // and persisted blocks.
308    fn verify_block_timestamps_and_accept(
309        &mut self,
310        unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
311    ) -> Vec<VerifiedBlock> {
312        let (gc_enabled, gc_round) = {
313            let dag_state = self.dag_state.read();
314            (dag_state.gc_enabled(), dag_state.gc_round())
315        };
316        // Try to verify the block and its children for timestamp, with ancestor blocks.
317        let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
318        let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
319        {
320            'block: for b in unsuspended_blocks {
321                let ancestors = self.dag_state.read().get_blocks(b.ancestors());
322                assert_eq!(b.ancestors().len(), ancestors.len());
323                let mut ancestor_blocks = vec![];
324                'ancestor: for (ancestor_ref, found) in
325                    b.ancestors().iter().zip(ancestors.into_iter())
326                {
327                    if let Some(found_block) = found {
328                        // This invariant should be guaranteed by DagState.
329                        assert_eq!(ancestor_ref, &found_block.reference());
330                        ancestor_blocks.push(Some(found_block));
331                        continue 'ancestor;
332                    }
333                    // blocks_to_accept have not been added to DagState yet, but they
334                    // can appear in ancestors.
335                    if blocks_to_accept.contains_key(ancestor_ref) {
336                        ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
337                        continue 'ancestor;
338                    }
339                    // If an ancestor is already rejected, reject this block as well.
340                    if blocks_to_reject.contains_key(ancestor_ref) {
341                        blocks_to_reject.insert(b.reference(), b);
342                        continue 'block;
343                    }
344
345                    // When gc is enabled it's possible that we indeed won't find any ancestors that
346                    // are passed gc_round. That's ok. We don't need to panic here.
347                    // We do want to panic if gc_enabled we and have an ancestor that is > gc_round,
348                    // or gc is disabled.
349                    if gc_enabled
350                        && ancestor_ref.round > GENESIS_ROUND
351                        && ancestor_ref.round <= gc_round
352                    {
353                        debug!(
354                            "Block {:?} has a missing ancestor: {:?} passed GC round {}",
355                            b.reference(),
356                            ancestor_ref,
357                            gc_round
358                        );
359                        ancestor_blocks.push(None);
360                    } else {
361                        panic!(
362                            "Unsuspended block {b:?} has a missing ancestor! Ancestor not found in DagState: {ancestor_ref:?}"
363                        );
364                    }
365                }
366                if let Err(e) =
367                    self.block_verifier
368                        .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
369                {
370                    warn!("Block {:?} failed to verify ancestors: {}", b, e);
371                    blocks_to_reject.insert(b.reference(), b);
372                } else {
373                    blocks_to_accept.insert(b.reference(), b);
374                }
375            }
376        }
377
378        // TODO: report blocks_to_reject to peers.
379        for (block_ref, block) in blocks_to_reject {
380            let hostname = self
381                .context
382                .committee
383                .authority(block_ref.author)
384                .hostname
385                .clone();
386
387            self.context
388                .metrics
389                .node_metrics
390                .invalid_blocks
391                .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
392                .inc();
393            warn!("Invalid block {:?} is rejected", block);
394        }
395
396        let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
397
398        // Insert the accepted blocks into DAG state so future blocks including them as
399        // ancestors do not get suspended.
400        self.dag_state
401            .write()
402            .accept_blocks(blocks_to_accept.clone());
403
404        blocks_to_accept
405    }
406
407    /// Tries to accept the provided block. To accept a block its ancestors must
408    /// have been already successfully accepted. If block is accepted then
409    /// Some result is returned. None is returned when either the block is
410    /// suspended or the block has been already accepted before.
411    fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
412        let block_ref = block.reference();
413        let mut missing_ancestors = BTreeSet::new();
414        let mut ancestors_to_fetch = BTreeSet::new();
415        let dag_state = self.dag_state.read();
416        let gc_round = dag_state.gc_round();
417        let gc_enabled = dag_state.gc_enabled();
418
419        // If block has been already received and suspended, or already processed and
420        // stored, or is a genesis block, then skip it.
421        if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
422            return TryAcceptResult::Processed;
423        }
424
425        // If the block is <= gc_round, then we simply skip its processing as there is
426        // no meaning do any action on it or even store it.
427        if gc_enabled && block.round() <= gc_round {
428            let hostname = self
429                .context
430                .committee
431                .authority(block.author())
432                .hostname
433                .as_str();
434            self.context
435                .metrics
436                .node_metrics
437                .block_manager_skipped_blocks
438                .with_label_values(&[hostname])
439                .inc();
440            return TryAcceptResult::Skipped;
441        }
442
443        // Keep only the ancestors that are greater than the GC round to check for their
444        // existence. Keep in mind that if GC is disabled then gc_round will be
445        // 0 and all ancestors will be considered.
446        let ancestors = if gc_enabled {
447            block
448                .ancestors()
449                .iter()
450                .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
451                .cloned()
452                .collect::<Vec<_>>()
453        } else {
454            block.ancestors().to_vec()
455        };
456
457        // make sure that we have all the required ancestors in store
458        for (found, ancestor) in dag_state
459            .contains_blocks(ancestors.clone())
460            .into_iter()
461            .zip(ancestors.iter())
462        {
463            if !found {
464                missing_ancestors.insert(*ancestor);
465
466                // mark the block as having missing ancestors
467                self.missing_ancestors
468                    .entry(*ancestor)
469                    .or_default()
470                    .insert(block_ref);
471
472                let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
473                self.context
474                    .metrics
475                    .node_metrics
476                    .block_manager_missing_ancestors_by_authority
477                    .with_label_values(&[ancestor_hostname])
478                    .inc();
479
480                // Add the ancestor to the missing blocks set only if it doesn't already exist
481                // in the suspended blocks - meaning that we already have its
482                // payload.
483                if !self.suspended_blocks.contains_key(ancestor) {
484                    // Fetches the block if it is not in dag state or suspended.
485                    ancestors_to_fetch.insert(*ancestor);
486                    // We also want to keep track of the authorities that have this block.
487                    // This block could be already missing, so we just update the set  of
488                    // authorities who have it.
489                    let entry = self.missing_blocks.entry(*ancestor);
490                    match entry {
491                        Entry::Vacant(v) => {
492                            v.insert(BTreeSet::from([ancestor.author, block_ref.author]));
493                            self.context
494                                .metrics
495                                .node_metrics
496                                .block_manager_missing_blocks_by_authority
497                                .with_label_values(&[ancestor_hostname])
498                                .inc();
499                        }
500                        Entry::Occupied(mut o) => {
501                            o.get_mut().insert(block_ref.author);
502                        }
503                    }
504                }
505            }
506        }
507
508        // Remove the block ref from the `missing_blocks` - if exists - since we now
509        // have received the block. The block might still get suspended, but we
510        // won't report it as missing in order to not re-fetch.
511        self.missing_blocks.remove(&block.reference());
512
513        if !missing_ancestors.is_empty() {
514            let hostname = self
515                .context
516                .committee
517                .authority(block.author())
518                .hostname
519                .as_str();
520            self.context
521                .metrics
522                .node_metrics
523                .block_suspensions
524                .with_label_values(&[hostname])
525                .inc();
526            self.suspended_blocks
527                .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
528            return TryAcceptResult::Suspended(ancestors_to_fetch);
529        }
530
531        TryAcceptResult::Accepted(block)
532    }
533
534    /// Given an accepted block `accepted_block` it attempts to accept all the
535    /// suspended children blocks assuming such exist. All the unsuspended /
536    /// accepted blocks are returned as a vector in causal order.
537    fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
538        let mut unsuspended_blocks = vec![];
539        let mut to_process_blocks = vec![accepted_block];
540
541        while let Some(block_ref) = to_process_blocks.pop() {
542            // And try to check if its direct children can be unsuspended
543            if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
544                for r in block_refs_with_missing_deps {
545                    // For each dependency try to unsuspend it. If that's successful then we add it
546                    // to the queue so we can recursively try to unsuspend its
547                    // children.
548                    if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
549                        to_process_blocks.push(block.block.reference());
550                        unsuspended_blocks.push(block);
551                    }
552                }
553            }
554        }
555
556        let now = Instant::now();
557
558        // Report the unsuspended blocks
559        for block in &unsuspended_blocks {
560            let hostname = self
561                .context
562                .committee
563                .authority(block.block.author())
564                .hostname
565                .as_str();
566            self.context
567                .metrics
568                .node_metrics
569                .block_unsuspensions
570                .with_label_values(&[hostname])
571                .inc();
572            self.context
573                .metrics
574                .node_metrics
575                .suspended_block_time
576                .with_label_values(&[hostname])
577                .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
578        }
579
580        unsuspended_blocks
581            .into_iter()
582            .map(|block| block.block)
583            .collect()
584    }
585
586    /// Attempts to unsuspend a block by checking its ancestors and removing the
587    /// `accepted_dependency` by its local set. If there is no missing
588    /// dependency then this block can be unsuspended immediately and is removed
589    /// from the `suspended_blocks` map.
590    fn try_unsuspend_block(
591        &mut self,
592        block_ref: &BlockRef,
593        accepted_dependency: &BlockRef,
594    ) -> Option<SuspendedBlock> {
595        let block = self
596            .suspended_blocks
597            .get_mut(block_ref)
598            .expect("Block should be in suspended map");
599
600        assert!(
601            block.missing_ancestors.remove(accepted_dependency),
602            "Block reference {} should be present in missing dependencies of {:?}",
603            block_ref,
604            block.block
605        );
606
607        if block.missing_ancestors.is_empty() {
608            // we have no missing dependency, so we unsuspend the block and return it
609            return self.suspended_blocks.remove(block_ref);
610        }
611        None
612    }
613
614    /// Tries to unsuspend any blocks for the latest gc round. If gc round
615    /// hasn't changed then no blocks will be unsuspended due to
616    /// this action.
617    #[instrument(level = "trace", skip_all)]
618    pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
619        let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
620        let (gc_enabled, gc_round) = {
621            let dag_state = self.dag_state.read();
622            (dag_state.gc_enabled(), dag_state.gc_round())
623        };
624        let mut blocks_unsuspended_below_gc_round = 0;
625        let mut blocks_gc_ed = 0;
626
627        if !gc_enabled {
628            trace!("GC is disabled, no blocks will attempt to get unsuspended.");
629            return;
630        }
631
632        while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
633            // If the first block in the missing ancestors is higher than the gc_round, then
634            // we can't unsuspend it yet. So we just put it back
635            // and we terminate the iteration as any next entry will be of equal or higher
636            // round anyways.
637            if block_ref.round > gc_round {
638                return;
639            }
640
641            blocks_gc_ed += 1;
642
643            let hostname = self
644                .context
645                .committee
646                .authority(block_ref.author)
647                .hostname
648                .as_str();
649            self.context
650                .metrics
651                .node_metrics
652                .block_manager_gced_blocks
653                .with_label_values(&[hostname])
654                .inc();
655
656            assert!(
657                !self.suspended_blocks.contains_key(block_ref),
658                "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
659            );
660
661            // Also remove it from the missing list - we don't want to keep looking for it.
662            self.missing_blocks.remove(block_ref);
663
664            // Find all the children blocks that have a dependency on this one and try to
665            // unsuspend them
666            let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
667
668            unsuspended_blocks.iter().for_each(|block| {
669                if block.round() <= gc_round {
670                    blocks_unsuspended_below_gc_round += 1;
671                }
672            });
673
674            // Now validate their timestamps and accept them
675            let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
676            for block in accepted_blocks {
677                let hostname = self
678                    .context
679                    .committee
680                    .authority(block.author())
681                    .hostname
682                    .as_str();
683                self.context
684                    .metrics
685                    .node_metrics
686                    .block_manager_gc_unsuspended_blocks
687                    .with_label_values(&[hostname])
688                    .inc();
689            }
690        }
691
692        debug!(
693            "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
694            blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
695        );
696    }
697
698    /// Returns all the blocks that are currently missing and needed in order to
699    /// accept suspended blocks. For each block reference it returns the set of
700    /// authorities who have this block.
701    pub(crate) fn missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
702        self.missing_blocks.clone()
703    }
704
705    /// Returns all the block refs that are currently missing.
706    #[cfg(test)]
707    pub(crate) fn missing_block_refs(&self) -> BTreeSet<BlockRef> {
708        self.missing_blocks.keys().cloned().collect()
709    }
710
711    fn update_stats(&mut self, missing_blocks: u64) {
712        let metrics = &self.context.metrics.node_metrics;
713        metrics.missing_blocks_total.inc_by(missing_blocks);
714        metrics
715            .block_manager_suspended_blocks
716            .set(self.suspended_blocks.len() as i64);
717        metrics
718            .block_manager_missing_ancestors
719            .set(self.missing_ancestors.len() as i64);
720        metrics
721            .block_manager_missing_blocks
722            .set(self.missing_blocks.len() as i64);
723    }
724
725    fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
726        let (min_round, max_round) =
727            if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
728                (curr_min.min(block.round()), curr_max.max(block.round()))
729            } else {
730                (block.round(), block.round())
731            };
732        self.received_block_rounds[block.author()] = Some((min_round, max_round));
733
734        let hostname = &self.context.committee.authority(block.author()).hostname;
735        self.context
736            .metrics
737            .node_metrics
738            .lowest_verified_authority_round
739            .with_label_values(&[hostname])
740            .set(min_round.into());
741        self.context
742            .metrics
743            .node_metrics
744            .highest_verified_authority_round
745            .with_label_values(&[hostname])
746            .set(max_round.into());
747    }
748
749    /// Checks if block manager is empty.
750    #[cfg(test)]
751    pub(crate) fn is_empty(&self) -> bool {
752        self.suspended_blocks.is_empty()
753            && self.missing_ancestors.is_empty()
754            && self.missing_blocks.is_empty()
755    }
756
757    /// Returns all the suspended blocks whose causal history we miss hence we
758    /// can't accept them yet.
759    #[cfg(test)]
760    fn suspended_blocks_refs(&self) -> BTreeSet<BlockRef> {
761        self.suspended_blocks.keys().cloned().collect()
762    }
763}
764
/// Result of trying to accept one block.
enum TryAcceptResult {
    /// The block is accepted. Wraps the block itself.
    Accepted(VerifiedBlock),
    /// The block is suspended. Wraps the ancestors to be fetched.
    Suspended(BTreeSet<BlockRef>),
    /// The block has been processed before and already exists in BlockManager
    /// (and is suspended) or in DagState (so it has already been accepted). No
    /// further processing has been done at this point.
    Processed,
    /// The received block is <= gc_round, so its processing is simply skipped:
    /// there is no point in taking any action on it or even storing it.
    Skipped,
}
779
780#[cfg(test)]
781mod tests {
782    use std::{collections::BTreeSet, sync::Arc};
783
784    use consensus_config::AuthorityIndex;
785    use parking_lot::RwLock;
786    use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
787    use rstest::rstest;
788
789    use crate::{
790        CommitDigest, Round,
791        block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
792        block_manager::BlockManager,
793        block_verifier::{BlockVerifier, NoopBlockVerifier},
794        commit::TrustedCommit,
795        context::Context,
796        dag_state::DagState,
797        error::{ConsensusError, ConsensusResult},
798        storage::mem_store::MemStore,
799        test_dag_builder::DagBuilder,
800        test_dag_parser::parse_dag,
801    };
802
803    #[tokio::test]
804    async fn suspend_blocks_with_missing_ancestors() {
805        // GIVEN
806        let (context, _key_pairs) = Context::new_for_test(4);
807        let context = Arc::new(context);
808        let store = Arc::new(MemStore::new());
809        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
810
811        let mut block_manager =
812            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
813
814        // create a DAG
815        let mut dag_builder = DagBuilder::new(context.clone());
816        dag_builder
817            .layers(1..=2) // 2 rounds
818            .authorities(vec![
819                AuthorityIndex::new_for_test(0),
820                AuthorityIndex::new_for_test(2),
821            ]) // Create equivocating blocks for 2 authorities
822            .equivocate(3)
823            .build();
824
825        // Take only the blocks of round 2 and try to accept them
826        let round_2_blocks = dag_builder
827            .blocks
828            .into_iter()
829            .filter_map(|(_, block)| (block.round() == 2).then_some(block))
830            .collect::<Vec<VerifiedBlock>>();
831
832        // WHEN
833        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
834
835        // THEN
836        assert!(accepted_blocks.is_empty());
837
838        // AND the returned missing ancestors should be the same as the provided block
839        // ancestors
840        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
841        let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
842        assert_eq!(missing, missing_block_refs);
843
844        // AND the missing blocks are the parents of the round 2 blocks. Since this is a
845        // fully connected DAG taking the ancestors of the first element
846        // suffices.
847        assert_eq!(block_manager.missing_block_refs(), missing_block_refs);
848
849        // AND suspended blocks should return the round_2_blocks
850        assert_eq!(
851            block_manager.suspended_blocks_refs(),
852            round_2_blocks
853                .into_iter()
854                .map(|block| block.reference())
855                .collect::<BTreeSet<_>>()
856        );
857
858        // AND each missing block should be known to all authorities
859        let known_by_manager = block_manager
860            .missing_blocks()
861            .iter()
862            .next()
863            .expect("We should expect at least two elements there")
864            .1
865            .clone();
866        assert_eq!(
867            known_by_manager,
868            context
869                .committee
870                .authorities()
871                .map(|(a, _)| a)
872                .collect::<BTreeSet<_>>()
873        );
874    }
875
876    #[tokio::test]
877    async fn try_accept_block_returns_missing_blocks() {
878        let (context, _key_pairs) = Context::new_for_test(4);
879        let context = Arc::new(context);
880        let store = Arc::new(MemStore::new());
881        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
882
883        let mut block_manager =
884            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
885
886        // create a DAG
887        let mut dag_builder = DagBuilder::new(context.clone());
888        dag_builder
889            .layers(1..=4) // 4 rounds
890            .authorities(vec![
891                AuthorityIndex::new_for_test(0),
892                AuthorityIndex::new_for_test(2),
893            ]) // Create equivocating blocks for 2 authorities
894            .equivocate(3) // Use 3 equivocations blocks per authority
895            .build();
896
897        // Take the blocks from round 4 up to 2 (included). Only the first block of each
898        // round should return missing ancestors when try to accept
899        for (_, block) in dag_builder
900            .blocks
901            .into_iter()
902            .rev()
903            .take_while(|(_, block)| block.round() >= 2)
904        {
905            // WHEN
906            let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);
907
908            // THEN
909            assert!(accepted_blocks.is_empty());
910
911            let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
912            assert_eq!(missing, block_ancestors);
913        }
914    }
915
916    #[tokio::test]
917    async fn accept_blocks_with_complete_causal_history() {
918        // GIVEN
919        let (context, _key_pairs) = Context::new_for_test(4);
920        let context = Arc::new(context);
921        let store = Arc::new(MemStore::new());
922        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
923
924        let mut block_manager =
925            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
926
927        // create a DAG of 2 rounds
928        let mut dag_builder = DagBuilder::new(context.clone());
929        dag_builder.layers(1..=2).build();
930
931        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
932
933        // WHEN
934        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
935
936        // THEN
937        assert_eq!(accepted_blocks.len(), 8);
938        assert_eq!(
939            accepted_blocks,
940            all_blocks
941                .iter()
942                .filter(|block| block.round() > 0)
943                .cloned()
944                .collect::<Vec<VerifiedBlock>>()
945        );
946        assert!(missing.is_empty());
947        assert!(block_manager.is_empty());
948
949        // WHEN trying to accept same blocks again, then none will be returned as those
950        // have been already accepted
951        let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
952        assert!(accepted_blocks.is_empty());
953    }
954
955    /// Tests that the block manager accepts blocks when some or all of their
956    /// causal history is below or equal to the GC round.
957    #[tokio::test]
958    async fn accept_blocks_with_causal_history_below_gc_round() {
959        // GIVEN
960        let (mut context, _key_pairs) = Context::new_for_test(4);
961
962        // We set the gc depth to 4
963        context
964            .protocol_config
965            .set_consensus_gc_depth_for_testing(4);
966        let context = Arc::new(context);
967        let store = Arc::new(MemStore::new());
968        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
969
970        // We "fake" the commit for round 10, so we can test the GC round 6
971        // (commit_round - gc_depth = 10 - 4 = 6)
972        let last_commit = TrustedCommit::new_for_test(
973            10,
974            CommitDigest::MIN,
975            context.clock.timestamp_utc_ms(),
976            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
977            vec![],
978        );
979        dag_state.write().set_last_commit(last_commit);
980        assert_eq!(
981            dag_state.read().gc_round(),
982            6,
983            "GC round should have moved to round 6"
984        );
985
986        let mut block_manager =
987            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
988
989        // create a DAG of 10 rounds with some weak links for the blocks of round 9
990        let dag_str = "DAG {
991            Round 0 : { 4 },
992            Round 1 : { * },
993            Round 2 : { * },
994            Round 3 : { * },
995            Round 4 : { * },
996            Round 5 : { * },
997            Round 6 : { * },
998            Round 7 : {
999                A -> [*],
1000                B -> [*],
1001                C -> [*],
1002            }
1003            Round 8 : {
1004                A -> [*],
1005                B -> [*],
1006                C -> [*],
1007            },
1008            Round 9 : {
1009                A -> [A8, B8, C8, D6],
1010                B -> [A8, B8, C8, D6],
1011                C -> [A8, B8, C8, D6],
1012                D -> [A8, B8, C8, D6],
1013            },
1014            Round 10 : { * },
1015        }";
1016
1017        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1018
1019        // Now take all the blocks for round 7 & 8 , which are above the gc_round = 6.
1020        // All those blocks should eventually be returned as accepted. Pay attention
1021        // that without GC none of those blocks should get accepted.
1022        let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
1023
1024        for rounds_range in blocks_ranges {
1025            let all_blocks = dag_builder
1026                .blocks
1027                .values()
1028                .filter(|block| rounds_range.contains(&block.round()))
1029                .cloned()
1030                .collect::<Vec<_>>();
1031
1032            // WHEN
1033            let mut reversed_blocks = all_blocks.clone();
1034            reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
1035            let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
1036            accepted_blocks.sort_by_key(|a| a.reference());
1037
1038            // THEN
1039            assert_eq!(accepted_blocks, all_blocks.to_vec());
1040            assert!(missing.is_empty());
1041            assert!(block_manager.is_empty());
1042
1043            let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1044            assert!(accepted_blocks.is_empty());
1045        }
1046    }
1047
1048    /// Blocks that are attempted to be accepted but are <= gc_round they will
1049    /// be skipped for processing. Nothing should be stored or trigger any
1050    /// unsuspension etc.
1051    #[tokio::test]
1052    async fn skip_accepting_blocks_below_gc_round() {
1053        // GIVEN
1054        let (mut context, _key_pairs) = Context::new_for_test(4);
1055        // We set the gc depth to 4
1056        context
1057            .protocol_config
1058            .set_consensus_gc_depth_for_testing(4);
1059        let context = Arc::new(context);
1060        let store = Arc::new(MemStore::new());
1061        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1062
1063        // We "fake" the commit for round 10, so we can test the GC round 6
1064        // (commit_round - gc_depth = 10 - 4 = 6)
1065        let last_commit = TrustedCommit::new_for_test(
1066            10,
1067            CommitDigest::MIN,
1068            context.clock.timestamp_utc_ms(),
1069            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1070            vec![],
1071        );
1072        dag_state.write().set_last_commit(last_commit);
1073        assert_eq!(
1074            dag_state.read().gc_round(),
1075            6,
1076            "GC round should have moved to round 6"
1077        );
1078
1079        let mut block_manager =
1080            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1081
1082        // create a DAG of 6 rounds
1083        let mut dag_builder = DagBuilder::new(context.clone());
1084        dag_builder.layers(1..=6).build();
1085
1086        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1087
1088        // WHEN
1089        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1090
1091        // THEN
1092        assert!(accepted_blocks.is_empty());
1093        assert!(missing.is_empty());
1094        assert!(block_manager.is_empty());
1095    }
1096
1097    /// The test generate blocks for a well connected DAG and feed them to block
1098    /// manager in random order. In the end all the blocks should be
1099    /// uniquely suspended and no missing blocks should exist. The test will run
1100    /// for both gc_enabled/disabled. When gc is enabeld we set a high
1101    /// gc_depth value so in practice gc_round will be 0, but we'll be able to
1102    /// test in the common case that this work exactly the same way as when
1103    /// gc is disabled.
1104    #[rstest]
1105    #[tokio::test]
1106    async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
1107        // GIVEN
1108        let (mut context, _key_pairs) = Context::new_for_test(4);
1109
1110        if gc_enabled {
1111            context
1112                .protocol_config
1113                .set_consensus_gc_depth_for_testing(10);
1114        }
1115        let context = Arc::new(context);
1116
1117        // create a DAG of rounds 1 ~ 3
1118        let mut dag_builder = DagBuilder::new(context.clone());
1119        dag_builder.layers(1..=3).build();
1120
1121        let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1122
1123        // Now randomize the sequence of sending the blocks to block manager. In the end
1124        // all the blocks should be uniquely suspended and no missing blocks
1125        // should exist.
1126        for seed in 0..100u8 {
1127            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1128
1129            let store = Arc::new(MemStore::new());
1130            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1131
1132            let mut block_manager =
1133                BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1134
1135            // WHEN
1136            let mut all_accepted_blocks = vec![];
1137            for block in &all_blocks {
1138                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1139
1140                all_accepted_blocks.extend(accepted_blocks);
1141            }
1142
1143            // THEN
1144            all_accepted_blocks.sort_by_key(|b| b.reference());
1145            all_blocks.sort_by_key(|b| b.reference());
1146
1147            assert_eq!(
1148                all_accepted_blocks, all_blocks,
1149                "Failed acceptance sequence for seed {seed}"
1150            );
1151            assert!(block_manager.is_empty());
1152        }
1153    }
1154
1155    /// Tests that `missing_blocks()` correctly infers the authorities
1156    /// referencing each missing block based on accepted blocks in the DAG.
1157    #[tokio::test]
1158    async fn authorities_that_know_missing_blocks() {
1159        let (context, _key_pairs) = Context::new_for_test(4);
1160
1161        let context = Arc::new(context);
1162
1163        // create a DAG of rounds 1 ~ 3
1164        let mut dag_builder = DagBuilder::new(context.clone());
1165        dag_builder.layers(1..=3).build();
1166
1167        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1168
1169        let blocks_round_2 = all_blocks
1170            .iter()
1171            .filter(|block| block.round() == 2)
1172            .cloned()
1173            .collect::<Vec<_>>();
1174
1175        let blocks_round_1 = all_blocks
1176            .iter()
1177            .filter(|block| block.round() == 1)
1178            .map(|block| block.reference())
1179            .collect::<BTreeSet<_>>();
1180
1181        let store = Arc::new(MemStore::new());
1182        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1183
1184        let mut block_manager =
1185            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1186
1187        let (_, missing_blocks) = block_manager.try_accept_blocks(vec![blocks_round_2[0].clone()]);
1188        // Blocks from round 1 are all missing, since the DAG is fully connected
1189        assert_eq!(missing_blocks, blocks_round_1);
1190
1191        let missing_blocks_with_authorities = block_manager.missing_blocks();
1192
1193        let block_round_1_authority_0 = all_blocks
1194            .iter()
1195            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(0))
1196            .map(|block| block.reference())
1197            .next()
1198            .unwrap();
1199        let block_round_1_authority_1 = all_blocks
1200            .iter()
1201            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(1))
1202            .map(|block| block.reference())
1203            .next()
1204            .unwrap();
1205        assert_eq!(
1206            missing_blocks_with_authorities[&block_round_1_authority_0],
1207            BTreeSet::from([AuthorityIndex::new_for_test(0)])
1208        );
1209        assert_eq!(
1210            missing_blocks_with_authorities[&block_round_1_authority_1],
1211            BTreeSet::from([
1212                AuthorityIndex::new_for_test(0),
1213                AuthorityIndex::new_for_test(1)
1214            ])
1215        );
1216
1217        // Add a new block from round 2 from authority 1, which updates the set of
1218        // authorities that are aware of the missing blocks
1219        block_manager.try_accept_blocks(vec![blocks_round_2[1].clone()]);
1220        let missing_blocks_with_authorities = block_manager.missing_blocks();
1221        assert_eq!(
1222            missing_blocks_with_authorities[&block_round_1_authority_0],
1223            BTreeSet::from([
1224                AuthorityIndex::new_for_test(0),
1225                AuthorityIndex::new_for_test(1)
1226            ])
1227        );
1228    }
1229
1230    #[rstest]
1231    #[tokio::test]
1232    async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
1233        telemetry_subscribers::init_for_testing();
1234        // GIVEN
1235        let (mut context, _key_pairs) = Context::new_for_test(4);
1236
1237        if gc_depth > 0 {
1238            context
1239                .protocol_config
1240                .set_consensus_gc_depth_for_testing(gc_depth);
1241        }
1242        let context = Arc::new(context);
1243
1244        // create a DAG of rounds 1 ~ gc_depth * 2
1245        let mut dag_builder = DagBuilder::new(context.clone());
1246        dag_builder.layers(1..=gc_depth * 2).build();
1247
1248        // Pay attention that we start from round 2. Round 1 will always be missing so
1249        // no matter what we do we can't unsuspend it unless gc_round has
1250        // advanced to round >= 1.
1251        let mut all_blocks = dag_builder
1252            .blocks
1253            .values()
1254            .filter(|block| block.round() > 1)
1255            .cloned()
1256            .collect::<Vec<_>>();
1257
1258        // Now randomize the sequence of sending the blocks to block manager. In the end
1259        // all the blocks should be uniquely suspended and no missing blocks
1260        // should exist.
1261        for seed in 0..100u8 {
1262            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1263
1264            let store = Arc::new(MemStore::new());
1265            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1266
1267            let mut block_manager = BlockManager::new(
1268                context.clone(),
1269                dag_state.clone(),
1270                Arc::new(NoopBlockVerifier),
1271            );
1272
1273            // WHEN
1274            for block in &all_blocks {
1275                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1276                assert!(accepted_blocks.is_empty());
1277            }
1278            assert!(!block_manager.is_empty());
1279
1280            // AND also call the try_to_find method with some non existing block refs. Those
1281            // should be cleaned up as well once GC kicks in.
1282            let non_existing_refs = (1..=3)
1283                .map(|round| {
1284                    BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
1285                })
1286                .collect::<Vec<_>>();
1287            assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);
1288
1289            // AND
1290            // Trigger a commit which will advance GC round
1291            let last_commit = TrustedCommit::new_for_test(
1292                gc_depth * 2,
1293                CommitDigest::MIN,
1294                context.clock.timestamp_utc_ms(),
1295                BlockRef::new(
1296                    gc_depth * 2,
1297                    AuthorityIndex::new_for_test(0),
1298                    BlockDigest::MIN,
1299                ),
1300                vec![],
1301            );
1302            dag_state.write().set_last_commit(last_commit);
1303
1304            // AND
1305            block_manager.try_unsuspend_blocks_for_latest_gc_round();
1306
1307            // THEN
1308            assert!(block_manager.is_empty());
1309
1310            // AND ensure that all have been accepted to the DAG
1311            for block in &all_blocks {
1312                assert!(dag_state.read().contains_block(&block.reference()));
1313            }
1314        }
1315    }
1316
1317    #[rstest]
1318    #[tokio::test]
1319    async fn try_accept_committed_blocks() {
1320        // GIVEN
1321        let (mut context, _key_pairs) = Context::new_for_test(4);
1322        // We set the gc depth to 4
1323        context
1324            .protocol_config
1325            .set_consensus_gc_depth_for_testing(4);
1326        let context = Arc::new(context);
1327        let store = Arc::new(MemStore::new());
1328        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1329
1330        // We "fake" the commit for round 6, so GC round moves to (commit_round -
1331        // gc_depth = 6 - 4 = 2)
1332        let last_commit = TrustedCommit::new_for_test(
1333            10,
1334            CommitDigest::MIN,
1335            context.clock.timestamp_utc_ms(),
1336            BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1337            vec![],
1338        );
1339        dag_state.write().set_last_commit(last_commit);
1340        assert_eq!(
1341            dag_state.read().gc_round(),
1342            2,
1343            "GC round should have moved to round 2"
1344        );
1345
1346        let mut block_manager =
1347            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1348
1349        // create a DAG of 12 rounds
1350        let mut dag_builder = DagBuilder::new(context.clone());
1351        dag_builder.layers(1..=12).build();
1352
1353        // Now try to accept via the normal acceptance block path the blocks of rounds 7
1354        // ~ 12. None of them should be accepted
1355        let blocks = dag_builder.blocks(7..=12);
1356        let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1357        assert!(accepted_blocks.is_empty());
1358        assert_eq!(missing.len(), 4);
1359
1360        // Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6.
1361        // All of them should be accepted and also the blocks of rounds 7 ~ 12
1362        // should be unsuspended and accepted as well.
1363        let blocks = dag_builder.blocks(3..=6);
1364
1365        // WHEN
1366        let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1367
1368        // THEN
1369        accepted_blocks.sort_by_key(|b| b.reference());
1370
1371        let mut all_blocks = dag_builder.blocks(3..=12);
1372        all_blocks.sort_by_key(|b| b.reference());
1373
1374        assert_eq!(accepted_blocks, all_blocks);
1375        assert!(block_manager.is_empty());
1376    }
1377
    /// Block verifier used by the tests to make the ancestors check fail for
    /// a chosen set of blocks.
    struct TestBlockVerifier {
        /// References of the blocks whose ancestors check returns an error.
        fail: BTreeSet<BlockRef>,
    }
1381
1382    impl TestBlockVerifier {
1383        fn new(fail: BTreeSet<BlockRef>) -> Self {
1384            Self { fail }
1385        }
1386    }
1387
1388    impl BlockVerifier for TestBlockVerifier {
1389        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1390            Ok(())
1391        }
1392
1393        fn check_ancestors(
1394            &self,
1395            block: &VerifiedBlock,
1396            _ancestors: &[Option<VerifiedBlock>],
1397            _gc_enabled: bool,
1398            _gc_round: Round,
1399        ) -> ConsensusResult<()> {
1400            if self.fail.contains(&block.reference()) {
1401                Err(ConsensusError::InvalidBlockTimestamp {
1402                    max_timestamp_ms: 0,
1403                    block_timestamp_ms: block.timestamp_ms(),
1404                })
1405            } else {
1406                Ok(())
1407            }
1408        }
1409    }
1410
1411    #[tokio::test]
1412    async fn reject_blocks_failing_verifications() {
1413        let (context, _key_pairs) = Context::new_for_test(4);
1414        let context = Arc::new(context);
1415
1416        // create a DAG of rounds 1 ~ 5.
1417        let mut dag_builder = DagBuilder::new(context.clone());
1418        dag_builder.layers(1..=5).build();
1419
1420        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1421
1422        // Create a test verifier that fails the blocks of round 3
1423        let test_verifier = TestBlockVerifier::new(
1424            all_blocks
1425                .iter()
1426                .filter(|block| block.round() == 3)
1427                .map(|block| block.reference())
1428                .collect(),
1429        );
1430
1431        // Create BlockManager.
1432        let store = Arc::new(MemStore::new());
1433        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1434        let mut block_manager =
1435            BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));
1436
1437        // Try to accept blocks from round 2 ~ 5 into block manager. All of them should
1438        // be suspended.
1439        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1440            all_blocks
1441                .iter()
1442                .filter(|block| block.round() > 1)
1443                .cloned()
1444                .collect(),
1445        );
1446
1447        // Missing refs should all come from round 1.
1448        assert!(accepted_blocks.is_empty());
1449        assert_eq!(missing_refs.len(), 4);
1450        missing_refs.iter().for_each(|missing_ref| {
1451            assert_eq!(missing_ref.round, 1);
1452        });
1453
1454        // Now add round 1 blocks into block manager.
1455        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1456            all_blocks
1457                .iter()
1458                .filter(|block| block.round() == 1)
1459                .cloned()
1460                .collect(),
1461        );
1462
1463        // Only round 1 and round 2 blocks should be accepted.
1464        assert_eq!(accepted_blocks.len(), 8);
1465        accepted_blocks.iter().for_each(|block| {
1466            assert!(block.round() <= 2);
1467        });
1468        assert!(missing_refs.is_empty());
1469
1470        // Other blocks should be rejected and there should be no remaining suspended
1471        // block.
1472        assert!(block_manager.suspended_blocks_refs().is_empty());
1473    }
1474
1475    #[tokio::test]
1476    async fn try_find_blocks() {
1477        // GIVEN
1478        let (context, _key_pairs) = Context::new_for_test(4);
1479        let context = Arc::new(context);
1480        let store = Arc::new(MemStore::new());
1481        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1482
1483        let mut block_manager =
1484            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1485
1486        // create a DAG
1487        let mut dag_builder = DagBuilder::new(context.clone());
1488        dag_builder
1489            .layers(1..=2) // 2 rounds
1490            .authorities(vec![
1491                AuthorityIndex::new_for_test(0),
1492                AuthorityIndex::new_for_test(2),
1493            ]) // Create equivocating blocks for 2 authorities
1494            .equivocate(3)
1495            .build();
1496
1497        // Take only the blocks of round 2 and try to accept them
1498        let round_2_blocks = dag_builder
1499            .blocks
1500            .iter()
1501            .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1502            .collect::<Vec<VerifiedBlock>>();
1503
1504        // All blocks should be missing
1505        let missing_block_refs_from_find =
1506            block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1507        assert_eq!(missing_block_refs_from_find.len(), 10);
1508        assert!(
1509            missing_block_refs_from_find
1510                .iter()
1511                .all(|block_ref| block_ref.round == 2)
1512        );
1513
1514        // Try accept blocks which will cause blocks to be suspended and added to
1515        // missing in block manager.
1516        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1517        assert!(accepted_blocks.is_empty());
1518
1519        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1520        let missing_block_refs_from_accept =
1521            missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1522        assert_eq!(missing, missing_block_refs_from_accept);
1523        assert_eq!(
1524            block_manager.missing_block_refs(),
1525            missing_block_refs_from_accept
1526        );
1527
1528        // No blocks should be accepted and block manager should have made note
1529        // of the missing & suspended blocks.
1530        // Now we can check get the result of try find block with all of the blocks
1531        // from newly created but not accepted round 3.
1532        dag_builder.layer(3).build();
1533
1534        let round_3_blocks = dag_builder
1535            .blocks
1536            .iter()
1537            .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1538            .collect::<Vec<BlockRef>>();
1539
1540        let missing_block_refs_from_find = block_manager.try_find_blocks(
1541            round_2_blocks
1542                .iter()
1543                .map(|b| b.reference())
1544                .chain(round_3_blocks.into_iter())
1545                .collect(),
1546        );
1547
1548        assert_eq!(missing_block_refs_from_find.len(), 4);
1549        assert!(
1550            missing_block_refs_from_find
1551                .iter()
1552                .all(|block_ref| block_ref.round == 3)
1553        );
1554        assert_eq!(
1555            block_manager.missing_block_refs(),
1556            missing_block_refs_from_accept
1557                .into_iter()
1558                .chain(missing_block_refs_from_find.into_iter())
1559                .collect()
1560        );
1561    }
1562}