consensus_core/
block_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, btree_map::Entry},
7    sync::Arc,
8    time::Instant,
9};
10
11use consensus_config::AuthorityIndex;
12use iota_metrics::monitored_scope;
13use itertools::Itertools as _;
14use parking_lot::RwLock;
15use tracing::{debug, instrument, trace, warn};
16
17use crate::{
18    Round,
19    block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
20    block_verifier::BlockVerifier,
21    context::Context,
22    dag_state::DagState,
23};
24
/// A received block that cannot be accepted yet because part of its causal
/// history is not available locally.
#[derive(Clone)]
pub(crate) struct SuspendedBlock {
    /// The block that is waiting for its ancestors.
    block: VerifiedBlock,
    /// Ancestors of `block` that were not found in `DagState` when the block
    /// was suspended. Entries are removed as those ancestors get accepted (or
    /// garbage collected); the block is released once the set is empty.
    missing_ancestors: BTreeSet<BlockRef>,
    /// The instant the block was suspended, used to report how long it stayed
    /// suspended.
    timestamp: Instant,
}
31
32impl SuspendedBlock {
33    pub(crate) fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
34        Self {
35            block,
36            missing_ancestors,
37            timestamp: Instant::now(),
38        }
39    }
40}
41
/// Block manager suspends incoming blocks until they are connected to the
/// existing graph, returning newly connected blocks.
/// TODO: As it is possible to have Byzantine validators who produce Blocks
/// without valid causal history we need to make sure that BlockManager takes
/// care of that and avoid OOM (Out Of Memory) situations.
pub(crate) struct BlockManager {
    /// Node context: committee, protocol configuration and metrics.
    context: Arc<Context>,
    /// Lock-protected DAG state; accepted blocks are written into it.
    dag_state: Arc<RwLock<DagState>>,
    /// Verifier used to check blocks against their ancestors before they are
    /// accepted (see `verify_block_timestamps`).
    block_verifier: Arc<dyn BlockVerifier>,

    /// Keeps all the suspended blocks. A suspended block is a block that is
    /// missing part of its causal history and thus can't be immediately
    /// processed. A block will remain in this map until all its causal history
    /// has been successfully processed.
    suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
    /// A map that keeps all the blocks that we are missing (keys) and the
    /// corresponding blocks that reference the missing blocks as ancestors
    /// and need them to get unsuspended. It is possible for a missing
    /// dependency (key) to be a suspended block, i.e. the block has already
    /// been fetched but is itself still missing some of its ancestors to be
    /// processed. A key may also map to an empty set when the block was
    /// requested explicitly via `try_find_blocks`, so it can still be GC'ed.
    missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
    /// A map of currently missing blocks to the set of authorities expected
    /// to have them available locally. This set is approximated based on the
    /// block's author and the authors of its direct children.
    /// A block is considered missing if it appears in `missing_ancestors`
    /// and has not yet been accepted or fetched. Blocks already stored or
    /// present in `suspended_blocks` are excluded.
    missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
    /// Per-authority (indexed by author) tuple of (lowest_round,
    /// highest_round) of received blocks. This is used for metrics reporting
    /// purposes only and resets during restarts.
    received_block_rounds: Vec<Option<(Round, Round)>>,
}
76
77impl BlockManager {
78    pub(crate) fn new(
79        context: Arc<Context>,
80        dag_state: Arc<RwLock<DagState>>,
81        block_verifier: Arc<dyn BlockVerifier>,
82    ) -> Self {
83        let committee_size = context.committee.size();
84        Self {
85            context,
86            dag_state,
87            block_verifier,
88            suspended_blocks: BTreeMap::new(),
89            missing_ancestors: BTreeMap::new(),
90            missing_blocks: BTreeMap::new(),
91            received_block_rounds: vec![None; committee_size],
92        }
93    }
94
95    /// Tries to accept the provided blocks assuming that all their causal
96    /// history exists. The method returns all the blocks that have been
97    /// successfully processed in round ascending order, that includes also
98    /// previously suspended blocks that have now been able to get accepted.
99    /// Method also returns a set with the missing ancestor blocks.
100    #[tracing::instrument(skip_all)]
101    pub(crate) fn try_accept_blocks(
102        &mut self,
103        blocks: Vec<VerifiedBlock>,
104    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
105        let _s = monitored_scope("BlockManager::try_accept_blocks");
106        self.try_accept_blocks_internal(blocks, false)
107    }
108
109    // Tries to accept blocks that have been committed. Returns all the blocks that
110    // have been accepted, both from the ones provided and any children blocks.
111    #[tracing::instrument(skip_all)]
112    pub(crate) fn try_accept_committed_blocks(
113        &mut self,
114        blocks: Vec<VerifiedBlock>,
115    ) -> Vec<VerifiedBlock> {
116        // If GC is disabled then should not run any of this logic.
117        if !self.dag_state.read().gc_enabled() {
118            return Vec::new();
119        }
120
121        // Just accept the blocks
122        let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
123        let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
124        assert!(
125            missing_blocks.is_empty(),
126            "No missing blocks should be returned for committed blocks"
127        );
128
129        accepted_blocks
130    }
131
132    /// Attempts to accept the provided blocks. When `committed = true` then the
133    /// blocks are considered to be committed via certified commits and
134    /// are handled differently.
135    fn try_accept_blocks_internal(
136        &mut self,
137        mut blocks: Vec<VerifiedBlock>,
138        committed: bool,
139    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
140        let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
141
142        blocks.sort_by_key(|b| b.round());
143        debug!(
144            "Trying to accept blocks: {}",
145            blocks.iter().map(|b| b.reference().to_string()).join(",")
146        );
147
148        let mut accepted_blocks = vec![];
149        let mut missing_blocks = BTreeSet::new();
150
151        for block in blocks {
152            self.update_block_received_metrics(&block);
153
154            // Try to accept the input block.
155            let block_ref = block.reference();
156
157            let mut to_verify_timestamps_and_accept = vec![];
158            if committed {
159                match self.try_accept_one_committed_block(block) {
160                    TryAcceptResult::Accepted(block) => {
161                        // As this is a committed block, then it's already accepted and there is no
162                        // need to verify its timestamps. Just add it to the
163                        // accepted blocks list.
164                        accepted_blocks.push(block);
165                    }
166                    TryAcceptResult::Processed => continue,
167                    TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => {
168                        panic!("Did not expect to suspend or skip a committed block: {block_ref:?}")
169                    }
170                };
171            } else {
172                match self.try_accept_one_block(block) {
173                    TryAcceptResult::Accepted(block) => {
174                        to_verify_timestamps_and_accept.push(block);
175                    }
176                    TryAcceptResult::Suspended(ancestors_to_fetch) => {
177                        debug!(
178                            "Missing ancestors to fetch for block {block_ref}: {}",
179                            ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
180                        );
181                        missing_blocks.extend(ancestors_to_fetch);
182                        continue;
183                    }
184                    TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
185                };
186            };
187
188            // If the block is accepted, try to unsuspend its children blocks if any.
189            let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
190            to_verify_timestamps_and_accept.extend(unsuspended_blocks);
191
192            // Verify block timestamps
193            let blocks_to_accept =
194                self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
195            accepted_blocks.extend(blocks_to_accept);
196        }
197
198        self.update_stats(missing_blocks.len() as u64);
199
200        // Figure out the new missing blocks
201        (accepted_blocks, missing_blocks)
202    }
203
204    fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
205        if self.dag_state.read().contains_block(&block.reference()) {
206            return TryAcceptResult::Processed;
207        }
208
209        // Remove the block from missing and suspended blocks
210        self.missing_blocks.remove(&block.reference());
211
212        // If the block has been already fetched and parked as suspended block, then
213        // remove it. Also find all the references of missing ancestors to
214        // remove those as well. If we don't do that then it's possible once the missing
215        // ancestor is fetched to cause a panic when trying to unsuspend this
216        // children as it won't be found in the suspended blocks map.
217        if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
218            suspended_block
219                .missing_ancestors
220                .iter()
221                .for_each(|ancestor| {
222                    if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
223                        references.remove(&block.reference());
224                    }
225                });
226        }
227
228        // Accept this block before any unsuspended children blocks
229        self.dag_state.write().accept_blocks(vec![block.clone()]);
230
231        TryAcceptResult::Accepted(block)
232    }
233
234    /// Tries to find the provided block_refs in DagState and BlockManager,
235    /// and returns missing block refs.
236    pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
237        let _s = monitored_scope("BlockManager::try_find_blocks");
238        let gc_round = self.dag_state.read().gc_round();
239
240        // No need to fetch blocks that are <= gc_round as they won't get processed
241        // anyways and they'll get skipped. So keep only the ones above.
242        let mut block_refs = block_refs
243            .into_iter()
244            .filter(|block_ref| block_ref.round > gc_round)
245            .collect::<Vec<_>>();
246
247        if block_refs.is_empty() {
248            return BTreeSet::new();
249        }
250
251        block_refs.sort_by_key(|b| b.round);
252
253        debug!(
254            "Trying to find blocks: {}",
255            block_refs.iter().map(|b| b.to_string()).join(",")
256        );
257
258        let mut missing_blocks = BTreeSet::new();
259
260        for (found, block_ref) in self
261            .dag_state
262            .read()
263            .contains_blocks(block_refs.clone())
264            .into_iter()
265            .zip(block_refs.iter())
266        {
267            if found || self.suspended_blocks.contains_key(block_ref) {
268                continue;
269            }
270            // Fetches the block if it is not in dag state or suspended.
271            missing_blocks.insert(*block_ref);
272            if self
273                .missing_blocks
274                .insert(*block_ref, BTreeSet::from([block_ref.author]))
275                .is_none()
276            {
277                // We want to report this as a missing ancestor even if there is no block that
278                // is actually references it right now. That will allow us
279                // to seamlessly GC the block later if needed.
280                self.missing_ancestors.entry(*block_ref).or_default();
281
282                let block_ref_hostname =
283                    &self.context.committee.authority(block_ref.author).hostname;
284                self.context
285                    .metrics
286                    .node_metrics
287                    .block_manager_missing_blocks_by_authority
288                    .with_label_values(&[block_ref_hostname])
289                    .inc();
290            }
291        }
292
293        let metrics = &self.context.metrics.node_metrics;
294        metrics
295            .missing_blocks_total
296            .inc_by(missing_blocks.len() as u64);
297        metrics
298            .block_manager_missing_blocks
299            .set(self.missing_blocks.len() as i64);
300
301        missing_blocks
302    }
303
304    // Persists in store all the valid blocks that should be accepted. Method
305    // returns the accepted and persisted blocks.
306    fn verify_block_timestamps_and_accept(
307        &mut self,
308        unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
309    ) -> Vec<VerifiedBlock> {
310        // If the median based timestamp is enabled, then we skip all the timestamp
311        // verification and we go straight and accept the blocks.
312        let blocks_to_accept = if self
313            .context
314            .protocol_config
315            .consensus_median_timestamp_with_checkpoint_enforcement()
316        {
317            unsuspended_blocks.into_iter().collect::<Vec<_>>()
318        } else {
319            self.verify_block_timestamps(unsuspended_blocks)
320        };
321
322        // Insert the accepted blocks into DAG state so future blocks including them as
323        // ancestors do not get suspended.
324        self.dag_state
325            .write()
326            .accept_blocks(blocks_to_accept.clone());
327
328        blocks_to_accept
329    }
330
331    // TODO: remove once timestamping is refactored to the new approach.
332    // Verifies each block's timestamp based on its ancestors.Method returns blocks
333    // with valid timestamps.
334    fn verify_block_timestamps(
335        &mut self,
336        unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
337    ) -> Vec<VerifiedBlock> {
338        let (gc_enabled, gc_round) = {
339            let dag_state = self.dag_state.read();
340            (dag_state.gc_enabled(), dag_state.gc_round())
341        };
342        // Try to verify the block and its children for timestamp, with ancestor blocks.
343        let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
344        let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
345        {
346            'block: for b in unsuspended_blocks {
347                let ancestors = self.dag_state.read().get_blocks(b.ancestors());
348                assert_eq!(b.ancestors().len(), ancestors.len());
349                let mut ancestor_blocks = vec![];
350                'ancestor: for (ancestor_ref, found) in
351                    b.ancestors().iter().zip(ancestors.into_iter())
352                {
353                    if let Some(found_block) = found {
354                        // This invariant should be guaranteed by DagState.
355                        assert_eq!(ancestor_ref, &found_block.reference());
356                        ancestor_blocks.push(Some(found_block));
357                        continue 'ancestor;
358                    }
359                    // blocks_to_accept have not been added to DagState yet, but they
360                    // can appear in ancestors.
361                    if blocks_to_accept.contains_key(ancestor_ref) {
362                        ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
363                        continue 'ancestor;
364                    }
365                    // If an ancestor is already rejected, reject this block as well.
366                    if blocks_to_reject.contains_key(ancestor_ref) {
367                        blocks_to_reject.insert(b.reference(), b);
368                        continue 'block;
369                    }
370
371                    // When gc is enabled it's possible that we indeed won't find any ancestors that
372                    // are passed gc_round. That's ok. We don't need to panic here.
373                    // We do want to panic if gc_enabled we and have an ancestor that is > gc_round,
374                    // or gc is disabled.
375                    if gc_enabled
376                        && ancestor_ref.round > GENESIS_ROUND
377                        && ancestor_ref.round <= gc_round
378                    {
379                        debug!(
380                            "Block {:?} has a missing ancestor: {:?} passed GC round {}",
381                            b.reference(),
382                            ancestor_ref,
383                            gc_round
384                        );
385                        ancestor_blocks.push(None);
386                    } else {
387                        panic!(
388                            "Unsuspended block {b:?} has a missing ancestor! Ancestor not found in DagState: {ancestor_ref:?}"
389                        );
390                    }
391                }
392                if let Err(e) =
393                    self.block_verifier
394                        .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
395                {
396                    warn!("Block {:?} failed to verify ancestors: {}", b, e);
397                    blocks_to_reject.insert(b.reference(), b);
398                } else {
399                    blocks_to_accept.insert(b.reference(), b);
400                }
401            }
402        }
403
404        // TODO: report blocks_to_reject to peers.
405        for (block_ref, block) in blocks_to_reject {
406            let hostname = self
407                .context
408                .committee
409                .authority(block_ref.author)
410                .hostname
411                .clone();
412
413            self.context
414                .metrics
415                .node_metrics
416                .invalid_blocks
417                .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
418                .inc();
419            warn!("Invalid block {:?} is rejected", block);
420        }
421
422        let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
423
424        // Insert the accepted blocks into DAG state so future blocks including them as
425        // ancestors do not get suspended.
426        self.dag_state
427            .write()
428            .accept_blocks(blocks_to_accept.clone());
429
430        blocks_to_accept
431    }
432
    /// Tries to accept the provided block. To accept a block its ancestors must
    /// have been already successfully accepted.
    ///
    /// Returns:
    /// - `Processed` if the block is already suspended or already stored in
    ///   `DagState`;
    /// - `Skipped` if GC is enabled and the block's round is <= the GC round;
    /// - `Suspended(ancestors_to_fetch)` if some ancestors (genesis, or above
    ///   the GC round when GC is enabled) are not in `DagState`. The block is
    ///   then parked in `suspended_blocks`, and the returned set contains only
    ///   the missing ancestors that are not themselves suspended, i.e. the ones
    ///   that still have to be fetched;
    /// - `Accepted(block)` otherwise. This method does not write the block to
    ///   `DagState`; the caller is responsible for that.
    fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
        let block_ref = block.reference();
        let mut missing_ancestors = BTreeSet::new();
        let mut ancestors_to_fetch = BTreeSet::new();
        // The read guard lives until the end of this method, so every check
        // below sees the same DagState snapshot.
        let dag_state = self.dag_state.read();
        let gc_round = dag_state.gc_round();
        let gc_enabled = dag_state.gc_enabled();

        // If the block has already been received and suspended, or already
        // processed and stored, then skip it. (Genesis blocks are presumably
        // covered by the `contains_block` check — NOTE(review): confirm.)
        if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
            return TryAcceptResult::Processed;
        }

        // If the block is <= gc_round, then we simply skip its processing as there is
        // no point in doing anything with it or even storing it.
        if gc_enabled && block.round() <= gc_round {
            let hostname = self
                .context
                .committee
                .authority(block.author())
                .hostname
                .as_str();
            self.context
                .metrics
                .node_metrics
                .block_manager_skipped_blocks
                .with_label_values(&[hostname])
                .inc();
            return TryAcceptResult::Skipped;
        }

        // Keep only the ancestors that are greater than the GC round (or genesis)
        // to check for their existence. Keep in mind that if GC is disabled then
        // all ancestors will be considered.
        let ancestors = if gc_enabled {
            block
                .ancestors()
                .iter()
                .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
                .cloned()
                .collect::<Vec<_>>()
        } else {
            block.ancestors().to_vec()
        };

        // make sure that we have all the required ancestors in store
        for (found, ancestor) in dag_state
            .contains_blocks(ancestors.clone())
            .into_iter()
            .zip(ancestors.iter())
        {
            if !found {
                missing_ancestors.insert(*ancestor);

                // mark the block as having missing ancestors
                self.missing_ancestors
                    .entry(*ancestor)
                    .or_default()
                    .insert(block_ref);

                let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
                self.context
                    .metrics
                    .node_metrics
                    .block_manager_missing_ancestors_by_authority
                    .with_label_values(&[ancestor_hostname])
                    .inc();

                // Add the ancestor to the missing blocks set only if it doesn't already exist
                // in the suspended blocks - meaning that we already have its
                // payload.
                if !self.suspended_blocks.contains_key(ancestor) {
                    // Fetches the block if it is not in dag state or suspended.
                    ancestors_to_fetch.insert(*ancestor);
                    // We also want to keep track of the authorities that have this block:
                    // its author, plus the author of this referencing block. The block
                    // could already be missing, in which case we only add this block's
                    // author to the set.
                    let entry = self.missing_blocks.entry(*ancestor);
                    match entry {
                        Entry::Vacant(v) => {
                            v.insert(BTreeSet::from([ancestor.author, block_ref.author]));
                            self.context
                                .metrics
                                .node_metrics
                                .block_manager_missing_blocks_by_authority
                                .with_label_values(&[ancestor_hostname])
                                .inc();
                        }
                        Entry::Occupied(mut o) => {
                            o.get_mut().insert(block_ref.author);
                        }
                    }
                }
            }
        }

        // Remove the block ref from the `missing_blocks` - if exists - since we now
        // have received the block. The block might still get suspended, but we
        // won't report it as missing in order to not re-fetch.
        self.missing_blocks.remove(&block.reference());

        if !missing_ancestors.is_empty() {
            let hostname = self
                .context
                .committee
                .authority(block.author())
                .hostname
                .as_str();
            self.context
                .metrics
                .node_metrics
                .block_suspensions
                .with_label_values(&[hostname])
                .inc();
            self.suspended_blocks
                .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
            return TryAcceptResult::Suspended(ancestors_to_fetch);
        }

        TryAcceptResult::Accepted(block)
    }
559
560    /// Given an accepted block `accepted_block` it attempts to accept all the
561    /// suspended children blocks assuming such exist. All the unsuspended /
562    /// accepted blocks are returned as a vector in causal order.
563    fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
564        let mut unsuspended_blocks = vec![];
565        let mut to_process_blocks = vec![accepted_block];
566
567        while let Some(block_ref) = to_process_blocks.pop() {
568            // And try to check if its direct children can be unsuspended
569            if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
570                for r in block_refs_with_missing_deps {
571                    // For each dependency try to unsuspend it. If that's successful then we add it
572                    // to the queue so we can recursively try to unsuspend its
573                    // children.
574                    if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
575                        to_process_blocks.push(block.block.reference());
576                        unsuspended_blocks.push(block);
577                    }
578                }
579            }
580        }
581
582        let now = Instant::now();
583
584        // Report the unsuspended blocks
585        for block in &unsuspended_blocks {
586            let hostname = self
587                .context
588                .committee
589                .authority(block.block.author())
590                .hostname
591                .as_str();
592            self.context
593                .metrics
594                .node_metrics
595                .block_unsuspensions
596                .with_label_values(&[hostname])
597                .inc();
598            self.context
599                .metrics
600                .node_metrics
601                .suspended_block_time
602                .with_label_values(&[hostname])
603                .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
604        }
605
606        unsuspended_blocks
607            .into_iter()
608            .map(|block| block.block)
609            .collect()
610    }
611
612    /// Attempts to unsuspend a block by checking its ancestors and removing the
613    /// `accepted_dependency` by its local set. If there is no missing
614    /// dependency then this block can be unsuspended immediately and is removed
615    /// from the `suspended_blocks` map.
616    fn try_unsuspend_block(
617        &mut self,
618        block_ref: &BlockRef,
619        accepted_dependency: &BlockRef,
620    ) -> Option<SuspendedBlock> {
621        let block = self
622            .suspended_blocks
623            .get_mut(block_ref)
624            .expect("Block should be in suspended map");
625
626        assert!(
627            block.missing_ancestors.remove(accepted_dependency),
628            "Block reference {} should be present in missing dependencies of {:?}",
629            block_ref,
630            block.block
631        );
632
633        if block.missing_ancestors.is_empty() {
634            // we have no missing dependency, so we unsuspend the block and return it
635            return self.suspended_blocks.remove(block_ref);
636        }
637        None
638    }
639
640    /// Tries to unsuspend any blocks for the latest gc round. If gc round
641    /// hasn't changed then no blocks will be unsuspended due to
642    /// this action.
643    #[instrument(level = "trace", skip_all)]
644    pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
645        let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
646        let (gc_enabled, gc_round) = {
647            let dag_state = self.dag_state.read();
648            (dag_state.gc_enabled(), dag_state.gc_round())
649        };
650        let mut blocks_unsuspended_below_gc_round = 0;
651        let mut blocks_gc_ed = 0;
652
653        if !gc_enabled {
654            trace!("GC is disabled, no blocks will attempt to get unsuspended.");
655            return;
656        }
657
658        while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
659            // If the first block in the missing ancestors is higher than the gc_round, then
660            // we can't unsuspend it yet. So we just put it back
661            // and we terminate the iteration as any next entry will be of equal or higher
662            // round anyways.
663            if block_ref.round > gc_round {
664                return;
665            }
666
667            blocks_gc_ed += 1;
668
669            let hostname = self
670                .context
671                .committee
672                .authority(block_ref.author)
673                .hostname
674                .as_str();
675            self.context
676                .metrics
677                .node_metrics
678                .block_manager_gced_blocks
679                .with_label_values(&[hostname])
680                .inc();
681
682            assert!(
683                !self.suspended_blocks.contains_key(block_ref),
684                "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
685            );
686
687            // Also remove it from the missing list - we don't want to keep looking for it.
688            self.missing_blocks.remove(block_ref);
689
690            // Find all the children blocks that have a dependency on this one and try to
691            // unsuspend them
692            let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
693
694            unsuspended_blocks.iter().for_each(|block| {
695                if block.round() <= gc_round {
696                    blocks_unsuspended_below_gc_round += 1;
697                }
698            });
699
700            // Now validate their timestamps and accept them
701            let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
702            for block in accepted_blocks {
703                let hostname = self
704                    .context
705                    .committee
706                    .authority(block.author())
707                    .hostname
708                    .as_str();
709                self.context
710                    .metrics
711                    .node_metrics
712                    .block_manager_gc_unsuspended_blocks
713                    .with_label_values(&[hostname])
714                    .inc();
715            }
716        }
717
718        debug!(
719            "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
720            blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
721        );
722    }
723
724    /// Returns all the blocks that are currently missing and needed in order to
725    /// accept suspended blocks. For each block reference it returns the set of
726    /// authorities who have this block.
727    pub(crate) fn missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
728        self.missing_blocks.clone()
729    }
730
731    /// Returns all the block refs that are currently missing.
732    #[cfg(test)]
733    pub(crate) fn missing_block_refs(&self) -> BTreeSet<BlockRef> {
734        self.missing_blocks.keys().cloned().collect()
735    }
736
737    fn update_stats(&mut self, missing_blocks: u64) {
738        let metrics = &self.context.metrics.node_metrics;
739        metrics.missing_blocks_total.inc_by(missing_blocks);
740        metrics
741            .block_manager_suspended_blocks
742            .set(self.suspended_blocks.len() as i64);
743        metrics
744            .block_manager_missing_ancestors
745            .set(self.missing_ancestors.len() as i64);
746        metrics
747            .block_manager_missing_blocks
748            .set(self.missing_blocks.len() as i64);
749    }
750
751    fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
752        let (min_round, max_round) =
753            if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
754                (curr_min.min(block.round()), curr_max.max(block.round()))
755            } else {
756                (block.round(), block.round())
757            };
758        self.received_block_rounds[block.author()] = Some((min_round, max_round));
759
760        let hostname = &self.context.committee.authority(block.author()).hostname;
761        self.context
762            .metrics
763            .node_metrics
764            .lowest_verified_authority_round
765            .with_label_values(&[hostname])
766            .set(min_round.into());
767        self.context
768            .metrics
769            .node_metrics
770            .highest_verified_authority_round
771            .with_label_values(&[hostname])
772            .set(max_round.into());
773    }
774
775    /// Checks if block manager is empty.
776    #[cfg(test)]
777    pub(crate) fn is_empty(&self) -> bool {
778        self.suspended_blocks.is_empty()
779            && self.missing_ancestors.is_empty()
780            && self.missing_blocks.is_empty()
781    }
782
783    /// Returns all the suspended blocks whose causal history we miss hence we
784    /// can't accept them yet.
785    #[cfg(test)]
786    fn suspended_blocks_refs(&self) -> BTreeSet<BlockRef> {
787        self.suspended_blocks.keys().cloned().collect()
788    }
789}
790
/// Result of trying to accept one block.
enum TryAcceptResult {
    /// The block is accepted. Wraps the block itself.
    Accepted(VerifiedBlock),
    /// The block is suspended. Wraps the ancestors that need to be fetched.
    Suspended(BTreeSet<BlockRef>),
    /// The block has been processed before: it already exists either in the
    /// `BlockManager` (as a suspended block) or in the `DagState` (so it has
    /// already been accepted). No further processing is done for it.
    Processed,
    /// The block's round is `<= gc_round`, so its processing is skipped: there
    /// is no point in taking any action on it, or even storing it.
    Skipped,
}
805
806#[cfg(test)]
807mod tests {
808    use std::{collections::BTreeSet, sync::Arc};
809
810    use consensus_config::AuthorityIndex;
811    use parking_lot::RwLock;
812    use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
813    use rstest::rstest;
814
815    use crate::{
816        CommitDigest, Round,
817        block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
818        block_manager::BlockManager,
819        block_verifier::{BlockVerifier, NoopBlockVerifier, SignedBlockVerifier},
820        commit::TrustedCommit,
821        context::Context,
822        dag_state::DagState,
823        error::{ConsensusError, ConsensusResult},
824        storage::mem_store::MemStore,
825        test_dag_builder::DagBuilder,
826        test_dag_parser::parse_dag,
827        transaction::NoopTransactionVerifier,
828    };
829
830    #[tokio::test]
831    async fn suspend_blocks_with_missing_ancestors() {
832        // GIVEN
833        let (context, _key_pairs) = Context::new_for_test(4);
834        let context = Arc::new(context);
835        let store = Arc::new(MemStore::new());
836        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
837
838        let mut block_manager =
839            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
840
841        // create a DAG
842        let mut dag_builder = DagBuilder::new(context.clone());
843        dag_builder
844            .layers(1..=2) // 2 rounds
845            .authorities(vec![
846                AuthorityIndex::new_for_test(0),
847                AuthorityIndex::new_for_test(2),
848            ]) // Create equivocating blocks for 2 authorities
849            .equivocate(3)
850            .build();
851
852        // Take only the blocks of round 2 and try to accept them
853        let round_2_blocks = dag_builder
854            .blocks
855            .into_iter()
856            .filter_map(|(_, block)| (block.round() == 2).then_some(block))
857            .collect::<Vec<VerifiedBlock>>();
858
859        // WHEN
860        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
861
862        // THEN
863        assert!(accepted_blocks.is_empty());
864
865        // AND the returned missing ancestors should be the same as the provided block
866        // ancestors
867        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
868        let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
869        assert_eq!(missing, missing_block_refs);
870
871        // AND the missing blocks are the parents of the round 2 blocks. Since this is a
872        // fully connected DAG taking the ancestors of the first element
873        // suffices.
874        assert_eq!(block_manager.missing_block_refs(), missing_block_refs);
875
876        // AND suspended blocks should return the round_2_blocks
877        assert_eq!(
878            block_manager.suspended_blocks_refs(),
879            round_2_blocks
880                .into_iter()
881                .map(|block| block.reference())
882                .collect::<BTreeSet<_>>()
883        );
884
885        // AND each missing block should be known to all authorities
886        let known_by_manager = block_manager
887            .missing_blocks()
888            .iter()
889            .next()
890            .expect("We should expect at least two elements there")
891            .1
892            .clone();
893        assert_eq!(
894            known_by_manager,
895            context
896                .committee
897                .authorities()
898                .map(|(a, _)| a)
899                .collect::<BTreeSet<_>>()
900        );
901    }
902
    /// Feeding the blocks of rounds 4 down to 2 one at a time, highest reference
    /// first, must suspend every one of them. Each must report all of its
    /// ancestors as missing.
    #[tokio::test]
    async fn try_accept_block_returns_missing_blocks() {
        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

        // create a DAG
        let mut dag_builder = DagBuilder::new(context);
        dag_builder
            .layers(1..=4) // 4 rounds
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(2),
            ]) // Create equivocating blocks for 2 authorities
            .equivocate(3) // Create 3 equivocating blocks per authority
            .build();

        // Take the blocks from round 4 down to round 2 (included), highest reference
        // first. A block's ancestors live in the round below it, which has not been
        // fed yet. So every block (not only the first of each round) is expected to
        // be suspended and to report all of its ancestors as missing.
        for (_, block) in dag_builder
            .blocks
            .into_iter()
            .rev()
            .take_while(|(_, block)| block.round() >= 2)
        {
            // WHEN
            let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);

            // THEN
            assert!(accepted_blocks.is_empty());

            let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
            assert_eq!(missing, block_ancestors);
        }
    }
942
    /// Feeding all blocks of a 2-round DAG at once must accept all 8 of them,
    /// leaving nothing missing or suspended. Feeding the same blocks a second
    /// time must accept nothing.
    #[tokio::test]
    async fn accept_blocks_with_complete_causal_history() {
        // GIVEN
        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

        // create a DAG of 2 rounds
        let mut dag_builder = DagBuilder::new(context);
        dag_builder.layers(1..=2).build();

        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // WHEN
        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());

        // THEN
        assert_eq!(accepted_blocks.len(), 8);
        assert_eq!(
            accepted_blocks,
            all_blocks
                .iter()
                // Round 0 (genesis) blocks, if any are present, are not expected back.
                .filter(|block| block.round() > 0)
                .cloned()
                .collect::<Vec<VerifiedBlock>>()
        );
        assert!(missing.is_empty());
        assert!(block_manager.is_empty());

        // WHEN trying to accept same blocks again, then none will be returned as those
        // have been already accepted
        let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
        assert!(accepted_blocks.is_empty());
    }
981
982    /// Tests that the block manager accepts blocks when some or all of their
983    /// causal history is below or equal to the GC round.
984    #[tokio::test]
985    async fn accept_blocks_with_causal_history_below_gc_round() {
986        // GIVEN
987        let (mut context, _key_pairs) = Context::new_for_test(4);
988
989        // We set the gc depth to 4
990        context
991            .protocol_config
992            .set_consensus_gc_depth_for_testing(4);
993        let context = Arc::new(context);
994        let store = Arc::new(MemStore::new());
995        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
996
997        // We "fake" the commit for round 10, so we can test the GC round 6
998        // (commit_round - gc_depth = 10 - 4 = 6)
999        let last_commit = TrustedCommit::new_for_test(
1000            10,
1001            CommitDigest::MIN,
1002            context.clock.timestamp_utc_ms(),
1003            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1004            vec![],
1005        );
1006        dag_state.write().set_last_commit(last_commit);
1007        assert_eq!(
1008            dag_state.read().gc_round(),
1009            6,
1010            "GC round should have moved to round 6"
1011        );
1012
1013        let mut block_manager = BlockManager::new(context, dag_state, Arc::new(NoopBlockVerifier));
1014
1015        // create a DAG of 10 rounds with some weak links for the blocks of round 9
1016        let dag_str = "DAG {
1017            Round 0 : { 4 },
1018            Round 1 : { * },
1019            Round 2 : { * },
1020            Round 3 : { * },
1021            Round 4 : { * },
1022            Round 5 : { * },
1023            Round 6 : { * },
1024            Round 7 : {
1025                A -> [*],
1026                B -> [*],
1027                C -> [*],
1028            }
1029            Round 8 : {
1030                A -> [*],
1031                B -> [*],
1032                C -> [*],
1033            },
1034            Round 9 : {
1035                A -> [A8, B8, C8, D6],
1036                B -> [A8, B8, C8, D6],
1037                C -> [A8, B8, C8, D6],
1038                D -> [A8, B8, C8, D6],
1039            },
1040            Round 10 : { * },
1041        }";
1042
1043        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1044
1045        // Now take all the blocks for round 7 & 8 , which are above the gc_round = 6.
1046        // All those blocks should eventually be returned as accepted. Pay attention
1047        // that without GC none of those blocks should get accepted.
1048        let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
1049
1050        for rounds_range in blocks_ranges {
1051            let all_blocks = dag_builder
1052                .blocks
1053                .values()
1054                .filter(|block| rounds_range.contains(&block.round()))
1055                .cloned()
1056                .collect::<Vec<_>>();
1057
1058            // WHEN
1059            let mut reversed_blocks = all_blocks.clone();
1060            reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
1061            let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
1062            accepted_blocks.sort_by_key(|a| a.reference());
1063
1064            // THEN
1065            assert_eq!(accepted_blocks, all_blocks.to_vec());
1066            assert!(missing.is_empty());
1067            assert!(block_manager.is_empty());
1068
1069            let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1070            assert!(accepted_blocks.is_empty());
1071        }
1072    }
1073
    /// Blocks whose round is <= gc_round are skipped when offered for
    /// acceptance. None of them is accepted or reported as missing, and the
    /// block manager stores nothing (so nothing can be unsuspended by them
    /// either).
    #[tokio::test]
    async fn skip_accepting_blocks_below_gc_round() {
        // GIVEN
        let (mut context, _key_pairs) = Context::new_for_test(4);
        // We set the gc depth to 4
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // We "fake" the commit for round 10, so we can test the GC round 6
        // (commit_round - gc_depth = 10 - 4 = 6)
        let last_commit = TrustedCommit::new_for_test(
            10,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            vec![],
        );
        dag_state.write().set_last_commit(last_commit);
        assert_eq!(
            dag_state.read().gc_round(),
            6,
            "GC round should have moved to round 6"
        );

        let mut block_manager =
            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

        // create a DAG of 6 rounds, i.e. every block is at or below gc_round = 6
        let mut dag_builder = DagBuilder::new(context);
        dag_builder.layers(1..=6).build();

        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // WHEN
        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks);

        // THEN
        assert!(accepted_blocks.is_empty());
        assert!(missing.is_empty());
        assert!(block_manager.is_empty());
    }
1122
    /// Generates the blocks of a well connected DAG (rounds 1 ~ 3) and feeds
    /// them to the block manager one at a time in random order, for 100
    /// different seeds. Whatever the order, every block must eventually be
    /// accepted exactly once, with children unsuspended as their parents
    /// arrive. The block manager must end up with nothing suspended or missing.
    ///
    /// The test runs with gc both enabled and disabled. When gc is enabled we
    /// set a high gc_depth value, so in practice gc_round will be 0. This lets
    /// us check that, in the common case, acceptance works exactly the same
    /// way as when gc is disabled.
    #[rstest]
    #[tokio::test]
    async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
        // GIVEN
        let (mut context, _key_pairs) = Context::new_for_test(4);

        if gc_enabled {
            context
                .protocol_config
                .set_consensus_gc_depth_for_testing(10);
        }
        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ 3
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();

        let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // Now randomize the sequence of sending the blocks to block manager. In the end
        // all the blocks should have been accepted exactly once, and neither suspended
        // nor missing blocks should remain.
        for seed in 0..100u8 {
            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));

            // A fresh store/DagState/BlockManager per seed, so every run starts empty.
            let store = Arc::new(MemStore::new());
            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

            let mut block_manager =
                BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

            // WHEN
            let mut all_accepted_blocks = vec![];
            for block in &all_blocks {
                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);

                all_accepted_blocks.extend(accepted_blocks);
            }

            // THEN
            // Sort both sides so the comparison is independent of the acceptance order.
            all_accepted_blocks.sort_by_key(|b| b.reference());
            all_blocks.sort_by_key(|b| b.reference());

            assert_eq!(
                all_accepted_blocks, all_blocks,
                "Failed acceptance sequence for seed {seed}"
            );
            assert!(block_manager.is_empty());
        }
    }
1180
    /// Tests that `missing_blocks()` reports, for each missing block, the
    /// authorities known to have it. These are the authors of the (suspended)
    /// blocks that reference it and, as the assertions show, the missing
    /// block's own author. The set grows as more referencing blocks arrive.
    #[tokio::test]
    async fn authorities_that_know_missing_blocks() {
        let (context, _key_pairs) = Context::new_for_test(4);

        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ 3
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();

        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // Round-2 blocks in reference order; the assertions below rely on index 0
        // being authored by authority 0 and index 1 by authority 1.
        let blocks_round_2 = all_blocks
            .iter()
            .filter(|block| block.round() == 2)
            .cloned()
            .collect::<Vec<_>>();

        let blocks_round_1 = all_blocks
            .iter()
            .filter(|block| block.round() == 1)
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let mut block_manager = BlockManager::new(context, dag_state, Arc::new(NoopBlockVerifier));

        let (_, missing_blocks) = block_manager.try_accept_blocks(vec![blocks_round_2[0].clone()]);
        // Blocks from round 1 are all missing, since the DAG is fully connected
        assert_eq!(missing_blocks, blocks_round_1);

        let missing_blocks_with_authorities = block_manager.missing_blocks();

        let block_round_1_authority_0 = all_blocks
            .iter()
            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(0))
            .map(|block| block.reference())
            .next()
            .unwrap();
        let block_round_1_authority_1 = all_blocks
            .iter()
            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(1))
            .map(|block| block.reference())
            .next()
            .unwrap();
        // Only authority 0 has referenced anything so far. Authority 0's own round-1
        // block is therefore known only by authority 0. Authority 1's round-1 block
        // additionally lists authority 1, presumably because it authored that block.
        assert_eq!(
            missing_blocks_with_authorities[&block_round_1_authority_0],
            BTreeSet::from([AuthorityIndex::new_for_test(0)])
        );
        assert_eq!(
            missing_blocks_with_authorities[&block_round_1_authority_1],
            BTreeSet::from([
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1)
            ])
        );

        // Add a new block from round 2 from authority 1, which updates the set of
        // authorities that are aware of the missing blocks
        block_manager.try_accept_blocks(vec![blocks_round_2[1].clone()]);
        let missing_blocks_with_authorities = block_manager.missing_blocks();
        assert_eq!(
            missing_blocks_with_authorities[&block_round_1_authority_0],
            BTreeSet::from([
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1)
            ])
        );
    }
1254
    /// Blocks whose round-1 ancestors are never delivered stay suspended until
    /// a commit advances the GC round. Following the `leader_round - gc_depth`
    /// rule the other tests assert, a commit led at round `gc_depth * 2` moves
    /// gc_round to `gc_depth`. After that, `try_unsuspend_blocks_for_latest_gc_round`
    /// must accept every suspended block into the DAG and leave the block
    /// manager empty. That includes cleaning up refs that were only requested
    /// through `try_find_blocks`.
    #[rstest]
    #[tokio::test]
    async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        // GIVEN
        let (mut context, _key_pairs) = Context::new_for_test(4);

        // NOTE(review): always true for the current `#[values]`.
        if gc_depth > 0 {
            context
                .protocol_config
                .set_consensus_gc_depth_for_testing(gc_depth);
        }
        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ gc_depth * 2
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=gc_depth * 2).build();

        // Pay attention that we start from round 2. Round-1 blocks are never fed, so
        // no matter what we do the round-2 blocks (and therefore everything above
        // them) can't be unsuspended unless gc_round has advanced to round >= 1.
        let mut all_blocks = dag_builder
            .blocks
            .values()
            .filter(|block| block.round() > 1)
            .cloned()
            .collect::<Vec<_>>();

        // Now randomize the sequence of sending the blocks to block manager. Whatever
        // the order, every block should stay suspended (round 1 is missing) until GC
        // advances, at which point all of them should be unsuspended and accepted.
        for seed in 0..100u8 {
            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));

            let store = Arc::new(MemStore::new());
            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

            let mut block_manager = BlockManager::new(
                context.clone(),
                dag_state.clone(),
                Arc::new(NoopBlockVerifier),
            );

            // WHEN
            for block in &all_blocks {
                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
                assert!(accepted_blocks.is_empty());
            }
            assert!(!block_manager.is_empty());

            // AND also call `try_find_blocks` with some non-existing block refs (all 3
            // are expected to be reported back). Those should be cleaned up as well
            // once GC kicks in.
            let non_existing_refs = (1..=3)
                .map(|round| {
                    BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
                })
                .collect::<Vec<_>>();
            assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);

            // AND
            // Trigger a commit which will advance GC round
            let last_commit = TrustedCommit::new_for_test(
                gc_depth * 2,
                CommitDigest::MIN,
                context.clock.timestamp_utc_ms(),
                BlockRef::new(
                    gc_depth * 2,
                    AuthorityIndex::new_for_test(0),
                    BlockDigest::MIN,
                ),
                vec![],
            );
            dag_state.write().set_last_commit(last_commit);

            // AND
            block_manager.try_unsuspend_blocks_for_latest_gc_round();

            // THEN
            assert!(block_manager.is_empty());

            // AND ensure that all have been accepted to the DAG
            for block in &all_blocks {
                assert!(dag_state.read().contains_block(&block.reference()));
            }
        }
    }
1341
1342    #[rstest]
1343    #[tokio::test]
1344    async fn try_accept_committed_blocks() {
1345        // GIVEN
1346        let (mut context, _key_pairs) = Context::new_for_test(4);
1347        // We set the gc depth to 4
1348        context
1349            .protocol_config
1350            .set_consensus_gc_depth_for_testing(4);
1351        let context = Arc::new(context);
1352        let store = Arc::new(MemStore::new());
1353        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1354
1355        // We "fake" the commit for round 6, so GC round moves to (commit_round -
1356        // gc_depth = 6 - 4 = 2)
1357        let last_commit = TrustedCommit::new_for_test(
1358            10,
1359            CommitDigest::MIN,
1360            context.clock.timestamp_utc_ms(),
1361            BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1362            vec![],
1363        );
1364        dag_state.write().set_last_commit(last_commit);
1365        assert_eq!(
1366            dag_state.read().gc_round(),
1367            2,
1368            "GC round should have moved to round 2"
1369        );
1370
1371        let mut block_manager =
1372            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1373
1374        // create a DAG of 12 rounds
1375        let mut dag_builder = DagBuilder::new(context);
1376        dag_builder.layers(1..=12).build();
1377
1378        // Now try to accept via the normal acceptance block path the blocks of rounds 7
1379        // ~ 12. None of them should be accepted
1380        let blocks = dag_builder.blocks(7..=12);
1381        let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks);
1382        assert!(accepted_blocks.is_empty());
1383        assert_eq!(missing.len(), 4);
1384
1385        // Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6.
1386        // All of them should be accepted and also the blocks of rounds 7 ~ 12
1387        // should be unsuspended and accepted as well.
1388        let blocks = dag_builder.blocks(3..=6);
1389
1390        // WHEN
1391        let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1392
1393        // THEN
1394        accepted_blocks.sort_by_key(|b| b.reference());
1395
1396        let mut all_blocks = dag_builder.blocks(3..=12);
1397        all_blocks.sort_by_key(|b| b.reference());
1398
1399        assert_eq!(accepted_blocks, all_blocks);
1400        assert!(block_manager.is_empty());
1401    }
1402
1403    struct TestBlockVerifier {
1404        fail: BTreeSet<BlockRef>,
1405    }
1406
1407    impl TestBlockVerifier {
1408        fn new(fail: BTreeSet<BlockRef>) -> Self {
1409            Self { fail }
1410        }
1411    }
1412
1413    impl BlockVerifier for TestBlockVerifier {
1414        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1415            Ok(())
1416        }
1417
1418        fn check_ancestors(
1419            &self,
1420            block: &VerifiedBlock,
1421            _ancestors: &[Option<VerifiedBlock>],
1422            _gc_enabled: bool,
1423            _gc_round: Round,
1424        ) -> ConsensusResult<()> {
1425            if self.fail.contains(&block.reference()) {
1426                Err(ConsensusError::InvalidBlockTimestamp {
1427                    max_timestamp_ms: 0,
1428                    block_timestamp_ms: block.timestamp_ms(),
1429                })
1430            } else {
1431                Ok(())
1432            }
1433        }
1434    }
1435
    /// Blocks that fail `check_ancestors` (the round-3 blocks, via
    /// `TestBlockVerifier`) are rejected once their causal history arrives, and
    /// so are the blocks built on top of them. After round 1 is delivered only
    /// rounds 1 and 2 are accepted, and no suspended block remains.
    #[tokio::test]
    async fn reject_blocks_failing_verifications() {
        let (mut context, _key_pairs) = Context::new_for_test(4);
        // NOTE(review): presumably disabled so that the failure comes from
        // `TestBlockVerifier::check_ancestors` rather than another timestamp path;
        // confirm.
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
        let context = Arc::new(context);

        // create a DAG of rounds 1 ~ 5.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=5).build();

        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();

        // Create a test verifier that fails the blocks of round 3
        let test_verifier = TestBlockVerifier::new(
            all_blocks
                .iter()
                .filter(|block| block.round() == 3)
                .map(|block| block.reference())
                .collect(),
        );

        // Create BlockManager.
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let mut block_manager = BlockManager::new(context, dag_state, Arc::new(test_verifier));

        // Try to accept blocks from round 2 ~ 5 into block manager. All of them should
        // be suspended.
        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
            all_blocks
                .iter()
                .filter(|block| block.round() > 1)
                .cloned()
                .collect(),
        );

        // Missing refs should all come from round 1.
        assert!(accepted_blocks.is_empty());
        assert_eq!(missing_refs.len(), 4);
        missing_refs.iter().for_each(|missing_ref| {
            assert_eq!(missing_ref.round, 1);
        });

        // Now add round 1 blocks into block manager.
        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
            all_blocks
                .iter()
                .filter(|block| block.round() == 1)
                .cloned()
                .collect(),
        );

        // Only round 1 and round 2 blocks should be accepted (4 + 4 = 8 blocks).
        assert_eq!(accepted_blocks.len(), 8);
        accepted_blocks.iter().for_each(|block| {
            assert!(block.round() <= 2);
        });
        assert!(missing_refs.is_empty());

        // Other blocks should be rejected and there should be no remaining suspended
        // block.
        assert!(block_manager.suspended_blocks_refs().is_empty());
    }
1501
1502    #[tokio::test]
1503    async fn try_find_blocks() {
1504        // GIVEN
1505        let (context, _key_pairs) = Context::new_for_test(4);
1506        let context = Arc::new(context);
1507        let store = Arc::new(MemStore::new());
1508        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1509
1510        let mut block_manager =
1511            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1512
1513        // create a DAG
1514        let mut dag_builder = DagBuilder::new(context);
1515        dag_builder
1516            .layers(1..=2) // 2 rounds
1517            .authorities(vec![
1518                AuthorityIndex::new_for_test(0),
1519                AuthorityIndex::new_for_test(2),
1520            ]) // Create equivocating blocks for 2 authorities
1521            .equivocate(3)
1522            .build();
1523
1524        // Take only the blocks of round 2 and try to accept them
1525        let round_2_blocks = dag_builder
1526            .blocks
1527            .iter()
1528            .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1529            .collect::<Vec<VerifiedBlock>>();
1530
1531        // All blocks should be missing
1532        let missing_block_refs_from_find =
1533            block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1534        assert_eq!(missing_block_refs_from_find.len(), 10);
1535        assert!(
1536            missing_block_refs_from_find
1537                .iter()
1538                .all(|block_ref| block_ref.round == 2)
1539        );
1540
1541        // Try accept blocks which will cause blocks to be suspended and added to
1542        // missing in block manager.
1543        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1544        assert!(accepted_blocks.is_empty());
1545
1546        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1547        let missing_block_refs_from_accept =
1548            missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1549        assert_eq!(missing, missing_block_refs_from_accept);
1550        assert_eq!(
1551            block_manager.missing_block_refs(),
1552            missing_block_refs_from_accept
1553        );
1554
1555        // No blocks should be accepted and block manager should have made note
1556        // of the missing & suspended blocks.
1557        // Now we can check get the result of try find block with all of the blocks
1558        // from newly created but not accepted round 3.
1559        dag_builder.layer(3).build();
1560
1561        let round_3_blocks = dag_builder
1562            .blocks
1563            .iter()
1564            .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1565            .collect::<Vec<BlockRef>>();
1566
1567        let missing_block_refs_from_find = block_manager.try_find_blocks(
1568            round_2_blocks
1569                .iter()
1570                .map(|b| b.reference())
1571                .chain(round_3_blocks.into_iter())
1572                .collect(),
1573        );
1574
1575        assert_eq!(missing_block_refs_from_find.len(), 4);
1576        assert!(
1577            missing_block_refs_from_find
1578                .iter()
1579                .all(|block_ref| block_ref.round == 3)
1580        );
1581        assert_eq!(
1582            block_manager.missing_block_refs(),
1583            missing_block_refs_from_accept
1584                .into_iter()
1585                .chain(missing_block_refs_from_find.into_iter())
1586                .collect()
1587        );
1588    }
1589
1590    #[rstest]
1591    #[tokio::test]
1592    async fn test_verify_block_timestamps_and_accept(
1593        #[values(false, true)] median_based_timestamp: bool,
1594    ) {
1595        telemetry_subscribers::init_for_testing();
1596        let (mut context, _key_pairs) = Context::new_for_test(4);
1597        context
1598            .protocol_config
1599            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
1600                median_based_timestamp,
1601            );
1602
1603        let context = Arc::new(context);
1604        let store = Arc::new(MemStore::new());
1605        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1606
1607        let mut block_manager = BlockManager::new(
1608            context.clone(),
1609            dag_state,
1610            Arc::new(SignedBlockVerifier::new(
1611                context.clone(),
1612                Arc::new(NoopTransactionVerifier {}),
1613            )),
1614        );
1615
1616        // create a DAG where authority 0 timestamp is always higher than the others.
1617        let mut dag_builder = DagBuilder::new(context.clone());
1618        let authorities = context
1619            .committee
1620            .authorities()
1621            .map(|(index, _)| index)
1622            .collect::<Vec<_>>();
1623        dag_builder
1624            .layers(1..=1)
1625            .authorities(authorities.clone())
1626            .with_timestamps(vec![1000, 500, 550, 580])
1627            .build();
1628        dag_builder
1629            .layers(2..=2)
1630            .authorities(authorities.clone())
1631            .with_timestamps(vec![2000, 600, 650, 680])
1632            .build();
1633        dag_builder
1634            .layers(3..=3)
1635            .authorities(authorities)
1636            .with_timestamps(vec![3000, 700, 750, 780])
1637            .build();
1638
1639        // take all the blocks and try to accept them.
1640        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1641
1642        // All blocks should get accepted
1643        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1644
1645        if median_based_timestamp {
1646            // If the median based timestamp is enabled then all the blocks should be
1647            // accepted
1648            assert_eq!(all_blocks, accepted_blocks);
1649            assert!(missing.is_empty());
1650        } else {
1651            // only the blocks of first round will be accepted (and the block of round 2 for
1652            // authority 0) as the rest will be rejected
1653            assert_eq!(accepted_blocks.len(), 5);
1654            for block in accepted_blocks {
1655                if block.author() == AuthorityIndex::new_for_test(0) {
1656                    assert!(block.round() == 1 || block.round() == 2);
1657                } else {
1658                    assert_eq!(block.round(), 1);
1659                }
1660            }
1661        }
1662    }
1663}