consensus_core/
block_manager.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, btree_map::Entry},
7    sync::Arc,
8    time::Instant,
9};
10
11use consensus_config::AuthorityIndex;
12use iota_metrics::monitored_scope;
13use itertools::Itertools as _;
14use parking_lot::RwLock;
15use tracing::{debug, instrument, trace, warn};
16
17use crate::{
18    Round,
19    block::{BlockAPI, BlockRef, GENESIS_ROUND, VerifiedBlock},
20    block_verifier::BlockVerifier,
21    context::Context,
22    dag_state::DagState,
23};
24
25#[derive(Clone)]
26pub(crate) struct SuspendedBlock {
27    block: VerifiedBlock,
28    missing_ancestors: BTreeSet<BlockRef>,
29    timestamp: Instant,
30}
31
32impl SuspendedBlock {
33    pub(crate) fn new(block: VerifiedBlock, missing_ancestors: BTreeSet<BlockRef>) -> Self {
34        Self {
35            block,
36            missing_ancestors,
37            timestamp: Instant::now(),
38        }
39    }
40}
41
42/// Block manager suspends incoming blocks until they are connected to the
43/// existing graph, returning newly connected blocks.
44/// TODO: As it is possible to have Byzantine validators who produce Blocks
45/// without valid causal history we need to make sure that BlockManager takes
46/// care of that and avoid OOM (Out Of Memory) situations.
47pub(crate) struct BlockManager {
48    context: Arc<Context>,
49    dag_state: Arc<RwLock<DagState>>,
50    block_verifier: Arc<dyn BlockVerifier>,
51
52    /// Keeps all the suspended blocks. A suspended block is a block that is
53    /// missing part of its causal history and thus can't be immediately
54    /// processed. A block will remain in this map until all its causal history
55    /// has been successfully processed.
56    suspended_blocks: BTreeMap<BlockRef, SuspendedBlock>,
57    /// A map that keeps all the blocks that we are missing (keys) and the
58    /// corresponding blocks that reference the missing blocks as ancestors
59    /// and need them to get unsuspended. It is possible for a missing
60    /// dependency (key) to be a suspended block, so the block has been
61    /// already fetched but it self is still missing some of its ancestors to be
62    /// processed.
63    missing_ancestors: BTreeMap<BlockRef, BTreeSet<BlockRef>>,
64    /// A map of currently missing blocks to the set of authorities expected
65    /// to have them available locally. This set is approximated based on the
66    /// block's author and the authors of its direct children.
67    /// A block is considered missing if it appears in `missing_ancestors`
68    /// and has not yet been accepted or fetched. Blocks already stored or
69    /// present in `suspended_blocks` are excluded.
70    missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
71    /// A vector that holds a tuple of (lowest_round, highest_round) of received
72    /// blocks per authority. This is used for metrics reporting purposes
73    /// and resets during restarts.
74    received_block_rounds: Vec<Option<(Round, Round)>>,
75}
76
77impl BlockManager {
78    pub(crate) fn new(
79        context: Arc<Context>,
80        dag_state: Arc<RwLock<DagState>>,
81        block_verifier: Arc<dyn BlockVerifier>,
82    ) -> Self {
83        let committee_size = context.committee.size();
84        Self {
85            context,
86            dag_state,
87            block_verifier,
88            suspended_blocks: BTreeMap::new(),
89            missing_ancestors: BTreeMap::new(),
90            missing_blocks: BTreeMap::new(),
91            received_block_rounds: vec![None; committee_size],
92        }
93    }
94
95    /// Tries to accept the provided blocks assuming that all their causal
96    /// history exists. The method returns all the blocks that have been
97    /// successfully processed in round ascending order, that includes also
98    /// previously suspended blocks that have now been able to get accepted.
99    /// Method also returns a set with the missing ancestor blocks.
100    #[tracing::instrument(skip_all)]
101    pub(crate) fn try_accept_blocks(
102        &mut self,
103        blocks: Vec<VerifiedBlock>,
104    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
105        let _s = monitored_scope("BlockManager::try_accept_blocks");
106        self.try_accept_blocks_internal(blocks, false)
107    }
108
109    // Tries to accept blocks that have been committed. Returns all the blocks that
110    // have been accepted, both from the ones provided and any children blocks.
111    #[tracing::instrument(skip_all)]
112    pub(crate) fn try_accept_committed_blocks(
113        &mut self,
114        blocks: Vec<VerifiedBlock>,
115    ) -> Vec<VerifiedBlock> {
116        // If GC is disabled then should not run any of this logic.
117        if !self.dag_state.read().gc_enabled() {
118            return Vec::new();
119        }
120
121        // Just accept the blocks
122        let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
123        let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
124        assert!(
125            missing_blocks.is_empty(),
126            "No missing blocks should be returned for committed blocks"
127        );
128
129        accepted_blocks
130    }
131
132    /// Attempts to accept the provided blocks. When `committed = true` then the
133    /// blocks are considered to be committed via certified commits and
134    /// are handled differently.
135    fn try_accept_blocks_internal(
136        &mut self,
137        mut blocks: Vec<VerifiedBlock>,
138        committed: bool,
139    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
140        let _s = monitored_scope("BlockManager::try_accept_blocks_internal");
141
142        blocks.sort_by_key(|b| b.round());
143        debug!(
144            "Trying to accept blocks: {}",
145            blocks.iter().map(|b| b.reference().to_string()).join(",")
146        );
147
148        let mut accepted_blocks = vec![];
149        let mut missing_blocks = BTreeSet::new();
150
151        for block in blocks {
152            self.update_block_received_metrics(&block);
153
154            // Try to accept the input block.
155            let block_ref = block.reference();
156
157            let mut to_verify_timestamps_and_accept = vec![];
158            if committed {
159                match self.try_accept_one_committed_block(block) {
160                    TryAcceptResult::Accepted(block) => {
161                        // As this is a committed block, then it's already accepted and there is no
162                        // need to verify its timestamps. Just add it to the
163                        // accepted blocks list.
164                        accepted_blocks.push(block);
165                    }
166                    TryAcceptResult::Processed => continue,
167                    TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => {
168                        panic!("Did not expect to suspend or skip a committed block: {block_ref:?}")
169                    }
170                };
171            } else {
172                match self.try_accept_one_block(block) {
173                    TryAcceptResult::Accepted(block) => {
174                        to_verify_timestamps_and_accept.push(block);
175                    }
176                    TryAcceptResult::Suspended(ancestors_to_fetch) => {
177                        debug!(
178                            "Missing ancestors to fetch for block {block_ref}: {}",
179                            ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
180                        );
181                        missing_blocks.extend(ancestors_to_fetch);
182                        continue;
183                    }
184                    TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
185                };
186            };
187
188            // If the block is accepted, try to unsuspend its children blocks if any.
189            let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
190            to_verify_timestamps_and_accept.extend(unsuspended_blocks);
191
192            // Verify block timestamps
193            let blocks_to_accept =
194                self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
195            accepted_blocks.extend(blocks_to_accept);
196        }
197
198        self.update_stats(missing_blocks.len() as u64);
199
200        // Figure out the new missing blocks
201        (accepted_blocks, missing_blocks)
202    }
203
204    fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
205        if self.dag_state.read().contains_block(&block.reference()) {
206            return TryAcceptResult::Processed;
207        }
208
209        // Remove the block from missing and suspended blocks
210        self.missing_blocks.remove(&block.reference());
211
212        // If the block has been already fetched and parked as suspended block, then
213        // remove it. Also find all the references of missing ancestors to
214        // remove those as well. If we don't do that then it's possible once the missing
215        // ancestor is fetched to cause a panic when trying to unsuspend this
216        // children as it won't be found in the suspended blocks map.
217        if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
218            suspended_block
219                .missing_ancestors
220                .iter()
221                .for_each(|ancestor| {
222                    if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
223                        references.remove(&block.reference());
224                    }
225                });
226        }
227
228        // Accept this block before any unsuspended children blocks
229        self.dag_state.write().accept_blocks(vec![block.clone()]);
230
231        TryAcceptResult::Accepted(block)
232    }
233
234    /// Tries to find the provided block_refs in DagState and BlockManager,
235    /// and returns missing block refs.
236    pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
237        let _s = monitored_scope("BlockManager::try_find_blocks");
238        let gc_round = self.dag_state.read().gc_round();
239
240        // No need to fetch blocks that are <= gc_round as they won't get processed
241        // anyways and they'll get skipped. So keep only the ones above.
242        let mut block_refs = block_refs
243            .into_iter()
244            .filter(|block_ref| block_ref.round > gc_round)
245            .collect::<Vec<_>>();
246
247        if block_refs.is_empty() {
248            return BTreeSet::new();
249        }
250
251        block_refs.sort_by_key(|b| b.round);
252
253        debug!(
254            "Trying to find blocks: {}",
255            block_refs.iter().map(|b| b.to_string()).join(",")
256        );
257
258        let mut missing_blocks = BTreeSet::new();
259
260        for (found, block_ref) in self
261            .dag_state
262            .read()
263            .contains_blocks(block_refs.clone())
264            .into_iter()
265            .zip(block_refs.iter())
266        {
267            if found || self.suspended_blocks.contains_key(block_ref) {
268                continue;
269            }
270            // Fetches the block if it is not in dag state or suspended.
271            missing_blocks.insert(*block_ref);
272            if self
273                .missing_blocks
274                .insert(*block_ref, BTreeSet::from([block_ref.author]))
275                .is_none()
276            {
277                // We want to report this as a missing ancestor even if there is no block that
278                // is actually references it right now. That will allow us
279                // to seamlessly GC the block later if needed.
280                self.missing_ancestors.entry(*block_ref).or_default();
281
282                let block_ref_hostname =
283                    &self.context.committee.authority(block_ref.author).hostname;
284                self.context
285                    .metrics
286                    .node_metrics
287                    .block_manager_missing_blocks_by_authority
288                    .with_label_values(&[block_ref_hostname])
289                    .inc();
290            }
291        }
292
293        let metrics = &self.context.metrics.node_metrics;
294        metrics
295            .missing_blocks_total
296            .inc_by(missing_blocks.len() as u64);
297        metrics
298            .block_manager_missing_blocks
299            .set(self.missing_blocks.len() as i64);
300
301        missing_blocks
302    }
303
304    // Persists in store all the valid blocks that should be accepted. Method
305    // returns the accepted and persisted blocks.
306    fn verify_block_timestamps_and_accept(
307        &mut self,
308        unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
309    ) -> Vec<VerifiedBlock> {
310        // If the median based timestamp is enabled, then we skip all the timestamp
311        // verification and we go straight and accept the blocks.
312        let blocks_to_accept = if self
313            .context
314            .protocol_config
315            .consensus_median_timestamp_with_checkpoint_enforcement()
316        {
317            unsuspended_blocks.into_iter().collect::<Vec<_>>()
318        } else {
319            self.verify_block_timestamps(unsuspended_blocks)
320        };
321
322        // Insert the accepted blocks into DAG state so future blocks including them as
323        // ancestors do not get suspended.
324        self.dag_state
325            .write()
326            .accept_blocks(blocks_to_accept.clone());
327
328        blocks_to_accept
329    }
330
331    // TODO: remove once timestamping is refactored to the new approach.
332    // Verifies each block's timestamp based on its ancestors.Method returns blocks
333    // with valid timestamps.
334    fn verify_block_timestamps(
335        &mut self,
336        unsuspended_blocks: impl IntoIterator<Item = VerifiedBlock>,
337    ) -> Vec<VerifiedBlock> {
338        let (gc_enabled, gc_round) = {
339            let dag_state = self.dag_state.read();
340            (dag_state.gc_enabled(), dag_state.gc_round())
341        };
342        // Try to verify the block and its children for timestamp, with ancestor blocks.
343        let mut blocks_to_accept: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
344        let mut blocks_to_reject: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
345        {
346            'block: for b in unsuspended_blocks {
347                let ancestors = self.dag_state.read().get_blocks(b.ancestors());
348                assert_eq!(b.ancestors().len(), ancestors.len());
349                let mut ancestor_blocks = vec![];
350                'ancestor: for (ancestor_ref, found) in
351                    b.ancestors().iter().zip(ancestors.into_iter())
352                {
353                    if let Some(found_block) = found {
354                        // This invariant should be guaranteed by DagState.
355                        assert_eq!(ancestor_ref, &found_block.reference());
356                        ancestor_blocks.push(Some(found_block));
357                        continue 'ancestor;
358                    }
359                    // blocks_to_accept have not been added to DagState yet, but they
360                    // can appear in ancestors.
361                    if blocks_to_accept.contains_key(ancestor_ref) {
362                        ancestor_blocks.push(Some(blocks_to_accept[ancestor_ref].clone()));
363                        continue 'ancestor;
364                    }
365                    // If an ancestor is already rejected, reject this block as well.
366                    if blocks_to_reject.contains_key(ancestor_ref) {
367                        blocks_to_reject.insert(b.reference(), b);
368                        continue 'block;
369                    }
370
371                    // When gc is enabled it's possible that we indeed won't find any ancestors that
372                    // are passed gc_round. That's ok. We don't need to panic here.
373                    // We do want to panic if gc_enabled we and have an ancestor that is > gc_round,
374                    // or gc is disabled.
375                    if gc_enabled
376                        && ancestor_ref.round > GENESIS_ROUND
377                        && ancestor_ref.round <= gc_round
378                    {
379                        debug!(
380                            "Block {:?} has a missing ancestor: {:?} passed GC round {}",
381                            b.reference(),
382                            ancestor_ref,
383                            gc_round
384                        );
385                        ancestor_blocks.push(None);
386                    } else {
387                        panic!(
388                            "Unsuspended block {b:?} has a missing ancestor! Ancestor not found in DagState: {ancestor_ref:?}"
389                        );
390                    }
391                }
392                if let Err(e) =
393                    self.block_verifier
394                        .check_ancestors(&b, &ancestor_blocks, gc_enabled, gc_round)
395                {
396                    warn!("Block {:?} failed to verify ancestors: {}", b, e);
397                    blocks_to_reject.insert(b.reference(), b);
398                } else {
399                    blocks_to_accept.insert(b.reference(), b);
400                }
401            }
402        }
403
404        // TODO: report blocks_to_reject to peers.
405        for (block_ref, block) in blocks_to_reject {
406            let hostname = self
407                .context
408                .committee
409                .authority(block_ref.author)
410                .hostname
411                .clone();
412
413            self.context
414                .metrics
415                .node_metrics
416                .invalid_blocks
417                .with_label_values(&[hostname.as_str(), "accept_block", "InvalidAncestors"])
418                .inc();
419            warn!("Invalid block {:?} is rejected", block);
420        }
421
422        let blocks_to_accept = blocks_to_accept.values().cloned().collect::<Vec<_>>();
423
424        // Insert the accepted blocks into DAG state so future blocks including them as
425        // ancestors do not get suspended.
426        self.dag_state
427            .write()
428            .accept_blocks(blocks_to_accept.clone());
429
430        blocks_to_accept
431    }
432
433    /// Tries to accept the provided block. To accept a block its ancestors must
434    /// have been already successfully accepted. If block is accepted then
435    /// Some result is returned. None is returned when either the block is
436    /// suspended or the block has been already accepted before.
437    fn try_accept_one_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
438        let block_ref = block.reference();
439        let mut missing_ancestors = BTreeSet::new();
440        let mut ancestors_to_fetch = BTreeSet::new();
441        let dag_state = self.dag_state.read();
442        let gc_round = dag_state.gc_round();
443        let gc_enabled = dag_state.gc_enabled();
444
445        // If block has been already received and suspended, or already processed and
446        // stored, or is a genesis block, then skip it.
447        if self.suspended_blocks.contains_key(&block_ref) || dag_state.contains_block(&block_ref) {
448            return TryAcceptResult::Processed;
449        }
450
451        // If the block is <= gc_round, then we simply skip its processing as there is
452        // no meaning do any action on it or even store it.
453        if gc_enabled && block.round() <= gc_round {
454            let hostname = self
455                .context
456                .committee
457                .authority(block.author())
458                .hostname
459                .as_str();
460            self.context
461                .metrics
462                .node_metrics
463                .block_manager_skipped_blocks
464                .with_label_values(&[hostname])
465                .inc();
466            return TryAcceptResult::Skipped;
467        }
468
469        // Keep only the ancestors that are greater than the GC round to check for their
470        // existence. Keep in mind that if GC is disabled then gc_round will be
471        // 0 and all ancestors will be considered.
472        let ancestors = if gc_enabled {
473            block
474                .ancestors()
475                .iter()
476                .filter(|ancestor| ancestor.round == GENESIS_ROUND || ancestor.round > gc_round)
477                .cloned()
478                .collect::<Vec<_>>()
479        } else {
480            block.ancestors().to_vec()
481        };
482
483        // make sure that we have all the required ancestors in store
484        for (found, ancestor) in dag_state
485            .contains_blocks(ancestors.clone())
486            .into_iter()
487            .zip(ancestors.iter())
488        {
489            if !found {
490                missing_ancestors.insert(*ancestor);
491
492                // mark the block as having missing ancestors
493                self.missing_ancestors
494                    .entry(*ancestor)
495                    .or_default()
496                    .insert(block_ref);
497
498                let ancestor_hostname = &self.context.committee.authority(ancestor.author).hostname;
499                self.context
500                    .metrics
501                    .node_metrics
502                    .block_manager_missing_ancestors_by_authority
503                    .with_label_values(&[ancestor_hostname])
504                    .inc();
505
506                // Add the ancestor to the missing blocks set only if it doesn't already exist
507                // in the suspended blocks - meaning that we already have its
508                // payload.
509                if !self.suspended_blocks.contains_key(ancestor) {
510                    // Fetches the block if it is not in dag state or suspended.
511                    ancestors_to_fetch.insert(*ancestor);
512                    // We also want to keep track of the authorities that have this block.
513                    // This block could be already missing, so we just update the set  of
514                    // authorities who have it.
515                    let entry = self.missing_blocks.entry(*ancestor);
516                    match entry {
517                        Entry::Vacant(v) => {
518                            v.insert(BTreeSet::from([ancestor.author, block_ref.author]));
519                            self.context
520                                .metrics
521                                .node_metrics
522                                .block_manager_missing_blocks_by_authority
523                                .with_label_values(&[ancestor_hostname])
524                                .inc();
525                        }
526                        Entry::Occupied(mut o) => {
527                            o.get_mut().insert(block_ref.author);
528                        }
529                    }
530                }
531            }
532        }
533
534        // Remove the block ref from the `missing_blocks` - if exists - since we now
535        // have received the block. The block might still get suspended, but we
536        // won't report it as missing in order to not re-fetch.
537        self.missing_blocks.remove(&block.reference());
538
539        if !missing_ancestors.is_empty() {
540            let hostname = self
541                .context
542                .committee
543                .authority(block.author())
544                .hostname
545                .as_str();
546            self.context
547                .metrics
548                .node_metrics
549                .block_suspensions
550                .with_label_values(&[hostname])
551                .inc();
552            self.suspended_blocks
553                .insert(block_ref, SuspendedBlock::new(block, missing_ancestors));
554            return TryAcceptResult::Suspended(ancestors_to_fetch);
555        }
556
557        TryAcceptResult::Accepted(block)
558    }
559
560    /// Given an accepted block `accepted_block` it attempts to accept all the
561    /// suspended children blocks assuming such exist. All the unsuspended /
562    /// accepted blocks are returned as a vector in causal order.
563    fn try_unsuspend_children_blocks(&mut self, accepted_block: BlockRef) -> Vec<VerifiedBlock> {
564        let mut unsuspended_blocks = vec![];
565        let mut to_process_blocks = vec![accepted_block];
566
567        while let Some(block_ref) = to_process_blocks.pop() {
568            // And try to check if its direct children can be unsuspended
569            if let Some(block_refs_with_missing_deps) = self.missing_ancestors.remove(&block_ref) {
570                for r in block_refs_with_missing_deps {
571                    // For each dependency try to unsuspend it. If that's successful then we add it
572                    // to the queue so we can recursively try to unsuspend its
573                    // children.
574                    if let Some(block) = self.try_unsuspend_block(&r, &block_ref) {
575                        to_process_blocks.push(block.block.reference());
576                        unsuspended_blocks.push(block);
577                    }
578                }
579            }
580        }
581
582        let now = Instant::now();
583
584        // Report the unsuspended blocks
585        for block in &unsuspended_blocks {
586            let hostname = self
587                .context
588                .committee
589                .authority(block.block.author())
590                .hostname
591                .as_str();
592            self.context
593                .metrics
594                .node_metrics
595                .block_unsuspensions
596                .with_label_values(&[hostname])
597                .inc();
598            self.context
599                .metrics
600                .node_metrics
601                .suspended_block_time
602                .with_label_values(&[hostname])
603                .observe(now.saturating_duration_since(block.timestamp).as_secs_f64());
604        }
605
606        unsuspended_blocks
607            .into_iter()
608            .map(|block| block.block)
609            .collect()
610    }
611
612    /// Attempts to unsuspend a block by checking its ancestors and removing the
613    /// `accepted_dependency` by its local set. If there is no missing
614    /// dependency then this block can be unsuspended immediately and is removed
615    /// from the `suspended_blocks` map.
616    fn try_unsuspend_block(
617        &mut self,
618        block_ref: &BlockRef,
619        accepted_dependency: &BlockRef,
620    ) -> Option<SuspendedBlock> {
621        let block = self
622            .suspended_blocks
623            .get_mut(block_ref)
624            .expect("Block should be in suspended map");
625
626        assert!(
627            block.missing_ancestors.remove(accepted_dependency),
628            "Block reference {} should be present in missing dependencies of {:?}",
629            block_ref,
630            block.block
631        );
632
633        if block.missing_ancestors.is_empty() {
634            // we have no missing dependency, so we unsuspend the block and return it
635            return self.suspended_blocks.remove(block_ref);
636        }
637        None
638    }
639
640    /// Tries to unsuspend any blocks for the latest gc round. If gc round
641    /// hasn't changed then no blocks will be unsuspended due to
642    /// this action.
643    #[instrument(level = "trace", skip_all)]
644    pub(crate) fn try_unsuspend_blocks_for_latest_gc_round(&mut self) {
645        let _s = monitored_scope("BlockManager::try_unsuspend_blocks_for_latest_gc_round");
646        let (gc_enabled, gc_round) = {
647            let dag_state = self.dag_state.read();
648            (dag_state.gc_enabled(), dag_state.gc_round())
649        };
650        let mut blocks_unsuspended_below_gc_round = 0;
651        let mut blocks_gc_ed = 0;
652
653        if !gc_enabled {
654            trace!("GC is disabled, no blocks will attempt to get unsuspended.");
655            return;
656        }
657
658        while let Some((block_ref, _children_refs)) = self.missing_ancestors.first_key_value() {
659            // If the first block in the missing ancestors is higher than the gc_round, then
660            // we can't unsuspend it yet. So we just put it back
661            // and we terminate the iteration as any next entry will be of equal or higher
662            // round anyways.
663            if block_ref.round > gc_round {
664                return;
665            }
666
667            blocks_gc_ed += 1;
668
669            let hostname = self
670                .context
671                .committee
672                .authority(block_ref.author)
673                .hostname
674                .as_str();
675            self.context
676                .metrics
677                .node_metrics
678                .block_manager_gced_blocks
679                .with_label_values(&[hostname])
680                .inc();
681
682            assert!(
683                !self.suspended_blocks.contains_key(block_ref),
684                "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor."
685            );
686
687            // Also remove it from the missing list - we don't want to keep looking for it.
688            self.missing_blocks.remove(block_ref);
689
690            // Find all the children blocks that have a dependency on this one and try to
691            // unsuspend them
692            let unsuspended_blocks = self.try_unsuspend_children_blocks(*block_ref);
693
694            unsuspended_blocks.iter().for_each(|block| {
695                if block.round() <= gc_round {
696                    blocks_unsuspended_below_gc_round += 1;
697                }
698            });
699
700            // Now validate their timestamps and accept them
701            let accepted_blocks = self.verify_block_timestamps_and_accept(unsuspended_blocks);
702            for block in accepted_blocks {
703                let hostname = self
704                    .context
705                    .committee
706                    .authority(block.author())
707                    .hostname
708                    .as_str();
709                self.context
710                    .metrics
711                    .node_metrics
712                    .block_manager_gc_unsuspended_blocks
713                    .with_label_values(&[hostname])
714                    .inc();
715            }
716        }
717
718        debug!(
719            "Total {} blocks unsuspended and total blocks {} gc'ed <= gc_round {}",
720            blocks_unsuspended_below_gc_round, blocks_gc_ed, gc_round
721        );
722    }
723
724    /// Returns all the blocks that are currently missing and needed in order to
725    /// accept suspended blocks. For each block reference it returns the set of
726    /// authorities who have this block.
727    pub(crate) fn missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
728        self.missing_blocks.clone()
729    }
730
731    /// Returns all the block refs that are currently missing.
732    #[cfg(test)]
733    pub(crate) fn missing_block_refs(&self) -> BTreeSet<BlockRef> {
734        self.missing_blocks.keys().cloned().collect()
735    }
736
737    fn update_stats(&mut self, missing_blocks: u64) {
738        let metrics = &self.context.metrics.node_metrics;
739        metrics.missing_blocks_total.inc_by(missing_blocks);
740        metrics
741            .block_manager_suspended_blocks
742            .set(self.suspended_blocks.len() as i64);
743        metrics
744            .block_manager_missing_ancestors
745            .set(self.missing_ancestors.len() as i64);
746        metrics
747            .block_manager_missing_blocks
748            .set(self.missing_blocks.len() as i64);
749    }
750
751    fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
752        let (min_round, max_round) =
753            if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
754                (curr_min.min(block.round()), curr_max.max(block.round()))
755            } else {
756                (block.round(), block.round())
757            };
758        self.received_block_rounds[block.author()] = Some((min_round, max_round));
759
760        let hostname = &self.context.committee.authority(block.author()).hostname;
761        self.context
762            .metrics
763            .node_metrics
764            .lowest_verified_authority_round
765            .with_label_values(&[hostname])
766            .set(min_round.into());
767        self.context
768            .metrics
769            .node_metrics
770            .highest_verified_authority_round
771            .with_label_values(&[hostname])
772            .set(max_round.into());
773    }
774
775    /// Checks if block manager is empty.
776    #[cfg(test)]
777    pub(crate) fn is_empty(&self) -> bool {
778        self.suspended_blocks.is_empty()
779            && self.missing_ancestors.is_empty()
780            && self.missing_blocks.is_empty()
781    }
782
783    /// Returns all the suspended blocks whose causal history we miss hence we
784    /// can't accept them yet.
785    #[cfg(test)]
786    fn suspended_blocks_refs(&self) -> BTreeSet<BlockRef> {
787        self.suspended_blocks.keys().cloned().collect()
788    }
789}
790
/// Outcome of trying to accept a single block.
enum TryAcceptResult {
    /// The block is accepted. Wraps the block itself.
    Accepted(VerifiedBlock),
    /// The block is suspended. Wraps the ancestors that need to be fetched.
    Suspended(BTreeSet<BlockRef>),
    /// The block has been processed before: it already exists either in
    /// BlockManager (and is suspended) or in DagState (so it has already been
    /// accepted). No further processing has been done at this point.
    Processed,
    /// The received block is <= gc_round, so its processing is simply skipped:
    /// there is no point in taking any action on it or even storing it.
    Skipped,
}
805
806#[cfg(test)]
807mod tests {
808    use std::{collections::BTreeSet, sync::Arc};
809
810    use consensus_config::AuthorityIndex;
811    use parking_lot::RwLock;
812    use rand::{SeedableRng, prelude::StdRng, seq::SliceRandom};
813    use rstest::rstest;
814
815    use crate::{
816        CommitDigest, Round,
817        block::{BlockAPI, BlockDigest, BlockRef, SignedBlock, VerifiedBlock},
818        block_manager::BlockManager,
819        block_verifier::{BlockVerifier, NoopBlockVerifier, SignedBlockVerifier},
820        commit::TrustedCommit,
821        context::Context,
822        dag_state::DagState,
823        error::{ConsensusError, ConsensusResult},
824        storage::mem_store::MemStore,
825        test_dag_builder::DagBuilder,
826        test_dag_parser::parse_dag,
827        transaction::NoopTransactionVerifier,
828    };
829
830    #[tokio::test]
831    async fn suspend_blocks_with_missing_ancestors() {
832        // GIVEN
833        let (context, _key_pairs) = Context::new_for_test(4);
834        let context = Arc::new(context);
835        let store = Arc::new(MemStore::new());
836        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
837
838        let mut block_manager =
839            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
840
841        // create a DAG
842        let mut dag_builder = DagBuilder::new(context.clone());
843        dag_builder
844            .layers(1..=2) // 2 rounds
845            .authorities(vec![
846                AuthorityIndex::new_for_test(0),
847                AuthorityIndex::new_for_test(2),
848            ]) // Create equivocating blocks for 2 authorities
849            .equivocate(3)
850            .build();
851
852        // Take only the blocks of round 2 and try to accept them
853        let round_2_blocks = dag_builder
854            .blocks
855            .into_iter()
856            .filter_map(|(_, block)| (block.round() == 2).then_some(block))
857            .collect::<Vec<VerifiedBlock>>();
858
859        // WHEN
860        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
861
862        // THEN
863        assert!(accepted_blocks.is_empty());
864
865        // AND the returned missing ancestors should be the same as the provided block
866        // ancestors
867        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
868        let missing_block_refs = missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
869        assert_eq!(missing, missing_block_refs);
870
871        // AND the missing blocks are the parents of the round 2 blocks. Since this is a
872        // fully connected DAG taking the ancestors of the first element
873        // suffices.
874        assert_eq!(block_manager.missing_block_refs(), missing_block_refs);
875
876        // AND suspended blocks should return the round_2_blocks
877        assert_eq!(
878            block_manager.suspended_blocks_refs(),
879            round_2_blocks
880                .into_iter()
881                .map(|block| block.reference())
882                .collect::<BTreeSet<_>>()
883        );
884
885        // AND each missing block should be known to all authorities
886        let known_by_manager = block_manager
887            .missing_blocks()
888            .iter()
889            .next()
890            .expect("We should expect at least two elements there")
891            .1
892            .clone();
893        assert_eq!(
894            known_by_manager,
895            context
896                .committee
897                .authorities()
898                .map(|(a, _)| a)
899                .collect::<BTreeSet<_>>()
900        );
901    }
902
903    #[tokio::test]
904    async fn try_accept_block_returns_missing_blocks() {
905        let (context, _key_pairs) = Context::new_for_test(4);
906        let context = Arc::new(context);
907        let store = Arc::new(MemStore::new());
908        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
909
910        let mut block_manager =
911            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
912
913        // create a DAG
914        let mut dag_builder = DagBuilder::new(context.clone());
915        dag_builder
916            .layers(1..=4) // 4 rounds
917            .authorities(vec![
918                AuthorityIndex::new_for_test(0),
919                AuthorityIndex::new_for_test(2),
920            ]) // Create equivocating blocks for 2 authorities
921            .equivocate(3) // Use 3 equivocations blocks per authority
922            .build();
923
924        // Take the blocks from round 4 up to 2 (included). Only the first block of each
925        // round should return missing ancestors when try to accept
926        for (_, block) in dag_builder
927            .blocks
928            .into_iter()
929            .rev()
930            .take_while(|(_, block)| block.round() >= 2)
931        {
932            // WHEN
933            let (accepted_blocks, missing) = block_manager.try_accept_blocks(vec![block.clone()]);
934
935            // THEN
936            assert!(accepted_blocks.is_empty());
937
938            let block_ancestors = block.ancestors().iter().cloned().collect::<BTreeSet<_>>();
939            assert_eq!(missing, block_ancestors);
940        }
941    }
942
943    #[tokio::test]
944    async fn accept_blocks_with_complete_causal_history() {
945        // GIVEN
946        let (context, _key_pairs) = Context::new_for_test(4);
947        let context = Arc::new(context);
948        let store = Arc::new(MemStore::new());
949        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
950
951        let mut block_manager =
952            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
953
954        // create a DAG of 2 rounds
955        let mut dag_builder = DagBuilder::new(context.clone());
956        dag_builder.layers(1..=2).build();
957
958        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
959
960        // WHEN
961        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
962
963        // THEN
964        assert_eq!(accepted_blocks.len(), 8);
965        assert_eq!(
966            accepted_blocks,
967            all_blocks
968                .iter()
969                .filter(|block| block.round() > 0)
970                .cloned()
971                .collect::<Vec<VerifiedBlock>>()
972        );
973        assert!(missing.is_empty());
974        assert!(block_manager.is_empty());
975
976        // WHEN trying to accept same blocks again, then none will be returned as those
977        // have been already accepted
978        let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
979        assert!(accepted_blocks.is_empty());
980    }
981
982    /// Tests that the block manager accepts blocks when some or all of their
983    /// causal history is below or equal to the GC round.
984    #[tokio::test]
985    async fn accept_blocks_with_causal_history_below_gc_round() {
986        // GIVEN
987        let (mut context, _key_pairs) = Context::new_for_test(4);
988
989        // We set the gc depth to 4
990        context
991            .protocol_config
992            .set_consensus_gc_depth_for_testing(4);
993        let context = Arc::new(context);
994        let store = Arc::new(MemStore::new());
995        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
996
997        // We "fake" the commit for round 10, so we can test the GC round 6
998        // (commit_round - gc_depth = 10 - 4 = 6)
999        let last_commit = TrustedCommit::new_for_test(
1000            10,
1001            CommitDigest::MIN,
1002            context.clock.timestamp_utc_ms(),
1003            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1004            vec![],
1005        );
1006        dag_state.write().set_last_commit(last_commit);
1007        assert_eq!(
1008            dag_state.read().gc_round(),
1009            6,
1010            "GC round should have moved to round 6"
1011        );
1012
1013        let mut block_manager =
1014            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1015
1016        // create a DAG of 10 rounds with some weak links for the blocks of round 9
1017        let dag_str = "DAG {
1018            Round 0 : { 4 },
1019            Round 1 : { * },
1020            Round 2 : { * },
1021            Round 3 : { * },
1022            Round 4 : { * },
1023            Round 5 : { * },
1024            Round 6 : { * },
1025            Round 7 : {
1026                A -> [*],
1027                B -> [*],
1028                C -> [*],
1029            }
1030            Round 8 : {
1031                A -> [*],
1032                B -> [*],
1033                C -> [*],
1034            },
1035            Round 9 : {
1036                A -> [A8, B8, C8, D6],
1037                B -> [A8, B8, C8, D6],
1038                C -> [A8, B8, C8, D6],
1039                D -> [A8, B8, C8, D6],
1040            },
1041            Round 10 : { * },
1042        }";
1043
1044        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1045
1046        // Now take all the blocks for round 7 & 8 , which are above the gc_round = 6.
1047        // All those blocks should eventually be returned as accepted. Pay attention
1048        // that without GC none of those blocks should get accepted.
1049        let blocks_ranges = vec![7..=8 as Round, 9..=10 as Round];
1050
1051        for rounds_range in blocks_ranges {
1052            let all_blocks = dag_builder
1053                .blocks
1054                .values()
1055                .filter(|block| rounds_range.contains(&block.round()))
1056                .cloned()
1057                .collect::<Vec<_>>();
1058
1059            // WHEN
1060            let mut reversed_blocks = all_blocks.clone();
1061            reversed_blocks.sort_by_key(|b| std::cmp::Reverse(b.reference()));
1062            let (mut accepted_blocks, missing) = block_manager.try_accept_blocks(reversed_blocks);
1063            accepted_blocks.sort_by_key(|a| a.reference());
1064
1065            // THEN
1066            assert_eq!(accepted_blocks, all_blocks.to_vec());
1067            assert!(missing.is_empty());
1068            assert!(block_manager.is_empty());
1069
1070            let (accepted_blocks, _) = block_manager.try_accept_blocks(all_blocks);
1071            assert!(accepted_blocks.is_empty());
1072        }
1073    }
1074
1075    /// Blocks that are attempted to be accepted but are <= gc_round they will
1076    /// be skipped for processing. Nothing should be stored or trigger any
1077    /// unsuspension etc.
1078    #[tokio::test]
1079    async fn skip_accepting_blocks_below_gc_round() {
1080        // GIVEN
1081        let (mut context, _key_pairs) = Context::new_for_test(4);
1082        // We set the gc depth to 4
1083        context
1084            .protocol_config
1085            .set_consensus_gc_depth_for_testing(4);
1086        let context = Arc::new(context);
1087        let store = Arc::new(MemStore::new());
1088        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1089
1090        // We "fake" the commit for round 10, so we can test the GC round 6
1091        // (commit_round - gc_depth = 10 - 4 = 6)
1092        let last_commit = TrustedCommit::new_for_test(
1093            10,
1094            CommitDigest::MIN,
1095            context.clock.timestamp_utc_ms(),
1096            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1097            vec![],
1098        );
1099        dag_state.write().set_last_commit(last_commit);
1100        assert_eq!(
1101            dag_state.read().gc_round(),
1102            6,
1103            "GC round should have moved to round 6"
1104        );
1105
1106        let mut block_manager =
1107            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1108
1109        // create a DAG of 6 rounds
1110        let mut dag_builder = DagBuilder::new(context.clone());
1111        dag_builder.layers(1..=6).build();
1112
1113        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1114
1115        // WHEN
1116        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1117
1118        // THEN
1119        assert!(accepted_blocks.is_empty());
1120        assert!(missing.is_empty());
1121        assert!(block_manager.is_empty());
1122    }
1123
1124    /// The test generate blocks for a well connected DAG and feed them to block
1125    /// manager in random order. In the end all the blocks should be
1126    /// uniquely suspended and no missing blocks should exist. The test will run
1127    /// for both gc_enabled/disabled. When gc is enabled we set a high
1128    /// gc_depth value so in practice gc_round will be 0, but we'll be able to
1129    /// test in the common case that this work exactly the same way as when
1130    /// gc is disabled.
1131    #[rstest]
1132    #[tokio::test]
1133    async fn accept_blocks_unsuspend_children_blocks(#[values(false, true)] gc_enabled: bool) {
1134        // GIVEN
1135        let (mut context, _key_pairs) = Context::new_for_test(4);
1136
1137        if gc_enabled {
1138            context
1139                .protocol_config
1140                .set_consensus_gc_depth_for_testing(10);
1141        }
1142        let context = Arc::new(context);
1143
1144        // create a DAG of rounds 1 ~ 3
1145        let mut dag_builder = DagBuilder::new(context.clone());
1146        dag_builder.layers(1..=3).build();
1147
1148        let mut all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1149
1150        // Now randomize the sequence of sending the blocks to block manager. In the end
1151        // all the blocks should be uniquely suspended and no missing blocks
1152        // should exist.
1153        for seed in 0..100u8 {
1154            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1155
1156            let store = Arc::new(MemStore::new());
1157            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1158
1159            let mut block_manager =
1160                BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1161
1162            // WHEN
1163            let mut all_accepted_blocks = vec![];
1164            for block in &all_blocks {
1165                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1166
1167                all_accepted_blocks.extend(accepted_blocks);
1168            }
1169
1170            // THEN
1171            all_accepted_blocks.sort_by_key(|b| b.reference());
1172            all_blocks.sort_by_key(|b| b.reference());
1173
1174            assert_eq!(
1175                all_accepted_blocks, all_blocks,
1176                "Failed acceptance sequence for seed {seed}"
1177            );
1178            assert!(block_manager.is_empty());
1179        }
1180    }
1181
1182    /// Tests that `missing_blocks()` correctly infers the authorities
1183    /// referencing each missing block based on accepted blocks in the DAG.
1184    #[tokio::test]
1185    async fn authorities_that_know_missing_blocks() {
1186        let (context, _key_pairs) = Context::new_for_test(4);
1187
1188        let context = Arc::new(context);
1189
1190        // create a DAG of rounds 1 ~ 3
1191        let mut dag_builder = DagBuilder::new(context.clone());
1192        dag_builder.layers(1..=3).build();
1193
1194        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1195
1196        let blocks_round_2 = all_blocks
1197            .iter()
1198            .filter(|block| block.round() == 2)
1199            .cloned()
1200            .collect::<Vec<_>>();
1201
1202        let blocks_round_1 = all_blocks
1203            .iter()
1204            .filter(|block| block.round() == 1)
1205            .map(|block| block.reference())
1206            .collect::<BTreeSet<_>>();
1207
1208        let store = Arc::new(MemStore::new());
1209        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1210
1211        let mut block_manager =
1212            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1213
1214        let (_, missing_blocks) = block_manager.try_accept_blocks(vec![blocks_round_2[0].clone()]);
1215        // Blocks from round 1 are all missing, since the DAG is fully connected
1216        assert_eq!(missing_blocks, blocks_round_1);
1217
1218        let missing_blocks_with_authorities = block_manager.missing_blocks();
1219
1220        let block_round_1_authority_0 = all_blocks
1221            .iter()
1222            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(0))
1223            .map(|block| block.reference())
1224            .next()
1225            .unwrap();
1226        let block_round_1_authority_1 = all_blocks
1227            .iter()
1228            .filter(|block| block.round() == 1 && block.author() == AuthorityIndex::new_for_test(1))
1229            .map(|block| block.reference())
1230            .next()
1231            .unwrap();
1232        assert_eq!(
1233            missing_blocks_with_authorities[&block_round_1_authority_0],
1234            BTreeSet::from([AuthorityIndex::new_for_test(0)])
1235        );
1236        assert_eq!(
1237            missing_blocks_with_authorities[&block_round_1_authority_1],
1238            BTreeSet::from([
1239                AuthorityIndex::new_for_test(0),
1240                AuthorityIndex::new_for_test(1)
1241            ])
1242        );
1243
1244        // Add a new block from round 2 from authority 1, which updates the set of
1245        // authorities that are aware of the missing blocks
1246        block_manager.try_accept_blocks(vec![blocks_round_2[1].clone()]);
1247        let missing_blocks_with_authorities = block_manager.missing_blocks();
1248        assert_eq!(
1249            missing_blocks_with_authorities[&block_round_1_authority_0],
1250            BTreeSet::from([
1251                AuthorityIndex::new_for_test(0),
1252                AuthorityIndex::new_for_test(1)
1253            ])
1254        );
1255    }
1256
1257    #[rstest]
1258    #[tokio::test]
1259    async fn unsuspend_blocks_for_latest_gc_round(#[values(5, 10, 14)] gc_depth: u32) {
1260        telemetry_subscribers::init_for_testing();
1261        // GIVEN
1262        let (mut context, _key_pairs) = Context::new_for_test(4);
1263
1264        if gc_depth > 0 {
1265            context
1266                .protocol_config
1267                .set_consensus_gc_depth_for_testing(gc_depth);
1268        }
1269        let context = Arc::new(context);
1270
1271        // create a DAG of rounds 1 ~ gc_depth * 2
1272        let mut dag_builder = DagBuilder::new(context.clone());
1273        dag_builder.layers(1..=gc_depth * 2).build();
1274
1275        // Pay attention that we start from round 2. Round 1 will always be missing so
1276        // no matter what we do we can't unsuspend it unless gc_round has
1277        // advanced to round >= 1.
1278        let mut all_blocks = dag_builder
1279            .blocks
1280            .values()
1281            .filter(|block| block.round() > 1)
1282            .cloned()
1283            .collect::<Vec<_>>();
1284
1285        // Now randomize the sequence of sending the blocks to block manager. In the end
1286        // all the blocks should be uniquely suspended and no missing blocks
1287        // should exist.
1288        for seed in 0..100u8 {
1289            all_blocks.shuffle(&mut StdRng::from_seed([seed; 32]));
1290
1291            let store = Arc::new(MemStore::new());
1292            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1293
1294            let mut block_manager = BlockManager::new(
1295                context.clone(),
1296                dag_state.clone(),
1297                Arc::new(NoopBlockVerifier),
1298            );
1299
1300            // WHEN
1301            for block in &all_blocks {
1302                let (accepted_blocks, _) = block_manager.try_accept_blocks(vec![block.clone()]);
1303                assert!(accepted_blocks.is_empty());
1304            }
1305            assert!(!block_manager.is_empty());
1306
1307            // AND also call the try_to_find method with some non existing block refs. Those
1308            // should be cleaned up as well once GC kicks in.
1309            let non_existing_refs = (1..=3)
1310                .map(|round| {
1311                    BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN)
1312                })
1313                .collect::<Vec<_>>();
1314            assert_eq!(block_manager.try_find_blocks(non_existing_refs).len(), 3);
1315
1316            // AND
1317            // Trigger a commit which will advance GC round
1318            let last_commit = TrustedCommit::new_for_test(
1319                gc_depth * 2,
1320                CommitDigest::MIN,
1321                context.clock.timestamp_utc_ms(),
1322                BlockRef::new(
1323                    gc_depth * 2,
1324                    AuthorityIndex::new_for_test(0),
1325                    BlockDigest::MIN,
1326                ),
1327                vec![],
1328            );
1329            dag_state.write().set_last_commit(last_commit);
1330
1331            // AND
1332            block_manager.try_unsuspend_blocks_for_latest_gc_round();
1333
1334            // THEN
1335            assert!(block_manager.is_empty());
1336
1337            // AND ensure that all have been accepted to the DAG
1338            for block in &all_blocks {
1339                assert!(dag_state.read().contains_block(&block.reference()));
1340            }
1341        }
1342    }
1343
1344    #[rstest]
1345    #[tokio::test]
1346    async fn try_accept_committed_blocks() {
1347        // GIVEN
1348        let (mut context, _key_pairs) = Context::new_for_test(4);
1349        // We set the gc depth to 4
1350        context
1351            .protocol_config
1352            .set_consensus_gc_depth_for_testing(4);
1353        let context = Arc::new(context);
1354        let store = Arc::new(MemStore::new());
1355        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1356
1357        // We "fake" the commit for round 6, so GC round moves to (commit_round -
1358        // gc_depth = 6 - 4 = 2)
1359        let last_commit = TrustedCommit::new_for_test(
1360            10,
1361            CommitDigest::MIN,
1362            context.clock.timestamp_utc_ms(),
1363            BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1364            vec![],
1365        );
1366        dag_state.write().set_last_commit(last_commit);
1367        assert_eq!(
1368            dag_state.read().gc_round(),
1369            2,
1370            "GC round should have moved to round 2"
1371        );
1372
1373        let mut block_manager =
1374            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1375
1376        // create a DAG of 12 rounds
1377        let mut dag_builder = DagBuilder::new(context.clone());
1378        dag_builder.layers(1..=12).build();
1379
1380        // Now try to accept via the normal acceptance block path the blocks of rounds 7
1381        // ~ 12. None of them should be accepted
1382        let blocks = dag_builder.blocks(7..=12);
1383        let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
1384        assert!(accepted_blocks.is_empty());
1385        assert_eq!(missing.len(), 4);
1386
1387        // Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6.
1388        // All of them should be accepted and also the blocks of rounds 7 ~ 12
1389        // should be unsuspended and accepted as well.
1390        let blocks = dag_builder.blocks(3..=6);
1391
1392        // WHEN
1393        let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
1394
1395        // THEN
1396        accepted_blocks.sort_by_key(|b| b.reference());
1397
1398        let mut all_blocks = dag_builder.blocks(3..=12);
1399        all_blocks.sort_by_key(|b| b.reference());
1400
1401        assert_eq!(accepted_blocks, all_blocks);
1402        assert!(block_manager.is_empty());
1403    }
1404
1405    struct TestBlockVerifier {
1406        fail: BTreeSet<BlockRef>,
1407    }
1408
1409    impl TestBlockVerifier {
1410        fn new(fail: BTreeSet<BlockRef>) -> Self {
1411            Self { fail }
1412        }
1413    }
1414
1415    impl BlockVerifier for TestBlockVerifier {
1416        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1417            Ok(())
1418        }
1419
1420        fn check_ancestors(
1421            &self,
1422            block: &VerifiedBlock,
1423            _ancestors: &[Option<VerifiedBlock>],
1424            _gc_enabled: bool,
1425            _gc_round: Round,
1426        ) -> ConsensusResult<()> {
1427            if self.fail.contains(&block.reference()) {
1428                Err(ConsensusError::InvalidBlockTimestamp {
1429                    max_timestamp_ms: 0,
1430                    block_timestamp_ms: block.timestamp_ms(),
1431                })
1432            } else {
1433                Ok(())
1434            }
1435        }
1436    }
1437
1438    #[tokio::test]
1439    async fn reject_blocks_failing_verifications() {
1440        let (mut context, _key_pairs) = Context::new_for_test(4);
1441        context
1442            .protocol_config
1443            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
1444        let context = Arc::new(context);
1445
1446        // create a DAG of rounds 1 ~ 5.
1447        let mut dag_builder = DagBuilder::new(context.clone());
1448        dag_builder.layers(1..=5).build();
1449
1450        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1451
1452        // Create a test verifier that fails the blocks of round 3
1453        let test_verifier = TestBlockVerifier::new(
1454            all_blocks
1455                .iter()
1456                .filter(|block| block.round() == 3)
1457                .map(|block| block.reference())
1458                .collect(),
1459        );
1460
1461        // Create BlockManager.
1462        let store = Arc::new(MemStore::new());
1463        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1464        let mut block_manager =
1465            BlockManager::new(context.clone(), dag_state, Arc::new(test_verifier));
1466
1467        // Try to accept blocks from round 2 ~ 5 into block manager. All of them should
1468        // be suspended.
1469        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1470            all_blocks
1471                .iter()
1472                .filter(|block| block.round() > 1)
1473                .cloned()
1474                .collect(),
1475        );
1476
1477        // Missing refs should all come from round 1.
1478        assert!(accepted_blocks.is_empty());
1479        assert_eq!(missing_refs.len(), 4);
1480        missing_refs.iter().for_each(|missing_ref| {
1481            assert_eq!(missing_ref.round, 1);
1482        });
1483
1484        // Now add round 1 blocks into block manager.
1485        let (accepted_blocks, missing_refs) = block_manager.try_accept_blocks(
1486            all_blocks
1487                .iter()
1488                .filter(|block| block.round() == 1)
1489                .cloned()
1490                .collect(),
1491        );
1492
1493        // Only round 1 and round 2 blocks should be accepted.
1494        assert_eq!(accepted_blocks.len(), 8);
1495        accepted_blocks.iter().for_each(|block| {
1496            assert!(block.round() <= 2);
1497        });
1498        assert!(missing_refs.is_empty());
1499
1500        // Other blocks should be rejected and there should be no remaining suspended
1501        // block.
1502        assert!(block_manager.suspended_blocks_refs().is_empty());
1503    }
1504
1505    #[tokio::test]
1506    async fn try_find_blocks() {
1507        // GIVEN
1508        let (context, _key_pairs) = Context::new_for_test(4);
1509        let context = Arc::new(context);
1510        let store = Arc::new(MemStore::new());
1511        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1512
1513        let mut block_manager =
1514            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
1515
1516        // create a DAG
1517        let mut dag_builder = DagBuilder::new(context.clone());
1518        dag_builder
1519            .layers(1..=2) // 2 rounds
1520            .authorities(vec![
1521                AuthorityIndex::new_for_test(0),
1522                AuthorityIndex::new_for_test(2),
1523            ]) // Create equivocating blocks for 2 authorities
1524            .equivocate(3)
1525            .build();
1526
1527        // Take only the blocks of round 2 and try to accept them
1528        let round_2_blocks = dag_builder
1529            .blocks
1530            .iter()
1531            .filter_map(|(_, block)| (block.round() == 2).then_some(block.clone()))
1532            .collect::<Vec<VerifiedBlock>>();
1533
1534        // All blocks should be missing
1535        let missing_block_refs_from_find =
1536            block_manager.try_find_blocks(round_2_blocks.iter().map(|b| b.reference()).collect());
1537        assert_eq!(missing_block_refs_from_find.len(), 10);
1538        assert!(
1539            missing_block_refs_from_find
1540                .iter()
1541                .all(|block_ref| block_ref.round == 2)
1542        );
1543
1544        // Try accept blocks which will cause blocks to be suspended and added to
1545        // missing in block manager.
1546        let (accepted_blocks, missing) = block_manager.try_accept_blocks(round_2_blocks.clone());
1547        assert!(accepted_blocks.is_empty());
1548
1549        let missing_block_refs = round_2_blocks.first().unwrap().ancestors();
1550        let missing_block_refs_from_accept =
1551            missing_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1552        assert_eq!(missing, missing_block_refs_from_accept);
1553        assert_eq!(
1554            block_manager.missing_block_refs(),
1555            missing_block_refs_from_accept
1556        );
1557
1558        // No blocks should be accepted and block manager should have made note
1559        // of the missing & suspended blocks.
1560        // Now we can check get the result of try find block with all of the blocks
1561        // from newly created but not accepted round 3.
1562        dag_builder.layer(3).build();
1563
1564        let round_3_blocks = dag_builder
1565            .blocks
1566            .iter()
1567            .filter_map(|(_, block)| (block.round() == 3).then_some(block.reference()))
1568            .collect::<Vec<BlockRef>>();
1569
1570        let missing_block_refs_from_find = block_manager.try_find_blocks(
1571            round_2_blocks
1572                .iter()
1573                .map(|b| b.reference())
1574                .chain(round_3_blocks.into_iter())
1575                .collect(),
1576        );
1577
1578        assert_eq!(missing_block_refs_from_find.len(), 4);
1579        assert!(
1580            missing_block_refs_from_find
1581                .iter()
1582                .all(|block_ref| block_ref.round == 3)
1583        );
1584        assert_eq!(
1585            block_manager.missing_block_refs(),
1586            missing_block_refs_from_accept
1587                .into_iter()
1588                .chain(missing_block_refs_from_find.into_iter())
1589                .collect()
1590        );
1591    }
1592
1593    #[rstest]
1594    #[tokio::test]
1595    async fn test_verify_block_timestamps_and_accept(
1596        #[values(false, true)] median_based_timestamp: bool,
1597    ) {
1598        telemetry_subscribers::init_for_testing();
1599        let (mut context, _key_pairs) = Context::new_for_test(4);
1600        context
1601            .protocol_config
1602            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
1603                median_based_timestamp,
1604            );
1605
1606        let context = Arc::new(context);
1607        let store = Arc::new(MemStore::new());
1608        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1609
1610        let mut block_manager = BlockManager::new(
1611            context.clone(),
1612            dag_state.clone(),
1613            Arc::new(SignedBlockVerifier::new(
1614                context.clone(),
1615                Arc::new(NoopTransactionVerifier {}),
1616            )),
1617        );
1618
1619        // create a DAG where authority 0 timestamp is always higher than the others.
1620        let mut dag_builder = DagBuilder::new(context.clone());
1621        let authorities = context
1622            .committee
1623            .authorities()
1624            .map(|(index, _)| index)
1625            .collect::<Vec<_>>();
1626        dag_builder
1627            .layers(1..=1)
1628            .authorities(authorities.clone())
1629            .with_timestamps(vec![1000, 500, 550, 580])
1630            .build();
1631        dag_builder
1632            .layers(2..=2)
1633            .authorities(authorities.clone())
1634            .with_timestamps(vec![2000, 600, 650, 680])
1635            .build();
1636        dag_builder
1637            .layers(3..=3)
1638            .authorities(authorities)
1639            .with_timestamps(vec![3000, 700, 750, 780])
1640            .build();
1641
1642        // take all the blocks and try to accept them.
1643        let all_blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
1644
1645        // All blocks should get accepted
1646        let (accepted_blocks, missing) = block_manager.try_accept_blocks(all_blocks.clone());
1647
1648        if median_based_timestamp {
1649            // If the median based timestamp is enabled then all the blocks should be
1650            // accepted
1651            assert_eq!(all_blocks, accepted_blocks);
1652            assert!(missing.is_empty());
1653        } else {
1654            // only the blocks of first round will be accepted (and the block of round 2 for
1655            // authority 0) as the rest will be rejected
1656            assert_eq!(accepted_blocks.len(), 5);
1657            for block in accepted_blocks {
1658                if block.author() == AuthorityIndex::new_for_test(0) {
1659                    assert!(block.round() == 1 || block.round() == 2);
1660                } else {
1661                    assert_eq!(block.round(), 1);
1662                }
1663            }
1664        }
1665    }
1666}