consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::max,
7    collections::{BTreeMap, BTreeSet, VecDeque},
8    ops::Bound::{Excluded, Included, Unbounded},
9    panic,
10    sync::Arc,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, info, trace, warn};
18
19use crate::{
20    CommittedSubDag,
21    block::{
22        BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23        VerifiedBlock, genesis_blocks,
24    },
25    commit::{
26        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28    },
29    context::Context,
30    leader_scoring::{ReputationScores, ScoringSubdag},
31    storage::{Store, WriteBatch},
32    threshold_clock::ThresholdClock,
33};
34
/// DagState provides the API to write and read accepted blocks from the DAG.
/// Only uncommitted and last committed blocks are cached in memory.
/// The rest of blocks are stored on disk.
/// Refs to cached blocks and additional refs are cached as well, to speed up
/// existence checks.
///
/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
/// concurrent access from multiple components.
pub(crate) struct DagState {
    context: Arc<Context>,

    // The genesis blocks, keyed by their block reference.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
    // Note: all uncommitted blocks are kept in memory.
    //
    // When GC is enabled, this map has different semantics. It holds all the recent data for
    // each authority, making sure that CACHED_ROUNDS worth of data is always available. The
    // entries are evicted based on the latest GC round, however the eviction process will respect
    // the CACHED_ROUNDS. For each authority, blocks are only evicted when their round is less
    // than or equal to both `gc_round`, and `highest authority round - cached rounds`.
    // This ensures that the GC requirements are respected (we never clean up any block above
    // `gc_round`), and there are enough blocks cached.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    // Indexes recent block refs by their authorities.
    // Vec position corresponds to the authority index.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    // Keeps track of the threshold clock for proposing blocks.
    threshold_clock: ThresholdClock,

    // Keeps track of the highest round that has been evicted for each authority. Any blocks that
    // are of round <= the evicted round should be considered evicted, and if any exist we should
    // not consider them causally complete in the order they appear. The `evicted_rounds` size
    // should be the same as the committee size.
    evicted_rounds: Vec<Round>,

    // Highest round of blocks accepted.
    highest_accepted_round: Round,

    // Last consensus commit of the dag. `None` when no commit was read from storage.
    last_commit: Option<TrustedCommit>,

    // Last wall time when commit round advanced. Does not persist across restarts.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    // Last committed rounds per authority.
    last_committed_rounds: Vec<Round>,

    /// The committed subdags that have been scored but scores have not been
    /// used for leader schedule yet.
    scoring_subdag: ScoringSubdag,
    // TODO: Remove when DistributedVoteScoring is enabled.
    /// The list of committed subdags that have been sequenced by the universal
    /// committer but have yet to be used to calculate reputation scores for the
    /// next leader schedule. Until then we consider it as "unscored" subdags.
    unscored_committed_subdags: Vec<CommittedSubDag>,

    // Commit votes pending to be included in new blocks.
    // TODO: limit to 1st commit per round with multi-leader.
    pending_commit_votes: VecDeque<CommitVote>,

    // Data to be flushed to storage.
    blocks_to_write: Vec<VerifiedBlock>,
    commits_to_write: Vec<TrustedCommit>,

    // Buffer the reputation scores & last_committed_rounds to be flushed with the
    // next dag state flush. This is okay because we can recover reputation scores
    // & last_committed_rounds from the commits as needed.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    // Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,

    // The number of cached rounds, read from `dag_state_cached_rounds` in the
    // context parameters.
    cached_rounds: Round,
}
114
115impl DagState {
    /// Initializes DagState from storage.
    ///
    /// Recovery proceeds in the following steps:
    /// 1. Read the last commit and the last stored `CommitInfo`. The latter
    ///    provides the starting `last_committed_rounds`; when none is stored,
    ///    all rounds start at zero.
    /// 2. Scan the commits that follow the recovered `CommitInfo`, up to the
    ///    last commit, to bring `last_committed_rounds` up to date and to
    ///    rebuild the committed subdags that are pending scoring.
    /// 3. For each authority, compute the eviction round and load every stored
    ///    block above it into the in-memory cache.
    /// 4. Initialize the scoring metrics from the store and the cached blocks.
    /// 5. When GC is enabled, walk the commits backwards from the last commit
    ///    and mark the cached blocks they reference as committed.
    ///
    /// # Panics
    ///
    /// Panics when a storage read fails, or when step 5 attempts to mark the
    /// same block as committed twice.
    pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
        let num_authorities = context.committee.size();

        let genesis = genesis_blocks(&context)
            .into_iter()
            .map(|block| (block.reference(), block))
            .collect();

        let threshold_clock = ThresholdClock::new(1, context.clone());

        let last_commit = store
            .read_last_commit()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));

        // The stored CommitInfo is a checkpoint of the committed rounds. Commits
        // after it (if any) are replayed below, starting at
        // `commit_recovery_start_index`.
        let commit_info = store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
        let (mut last_committed_rounds, commit_recovery_start_index) =
            if let Some((commit_ref, commit_info)) = commit_info {
                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
                (commit_info.committed_rounds, commit_ref.index + 1)
            } else {
                tracing::info!("Found no stored CommitInfo to recover from");
                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
            };

        let mut unscored_committed_subdags = Vec::new();
        let mut scoring_subdag = ScoringSubdag::new(context.clone());

        // Replay the commits that are not covered by the recovered CommitInfo.
        if let Some(last_commit) = last_commit.as_ref() {
            store
                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
                .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
                .iter()
                .for_each(|commit| {
                    // Advance the per-authority committed round to the highest
                    // round seen among the committed blocks.
                    for block_ref in commit.blocks() {
                        last_committed_rounds[block_ref.author] =
                            max(last_committed_rounds[block_ref.author], block_ref.round);
                    }

                    let committed_subdag =
                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]); // We don't need to recover reputation scores for unscored_committed_subdags
                    unscored_committed_subdags.push(committed_subdag);
                });
        }

        tracing::info!(
            "DagState was initialized with the following state: \
            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
            unscored_committed_subdags.len()
        );

        // With the distributed vote scoring strategy the recovered subdags are
        // moved into `scoring_subdag`, leaving `unscored_committed_subdags` empty.
        if context
            .protocol_config
            .consensus_distributed_vote_scoring_strategy()
        {
            scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
        }

        let mut state = Self {
            context,
            genesis,
            recent_blocks: BTreeMap::new(),
            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
            threshold_clock,
            highest_accepted_round: 0,
            last_commit: last_commit.clone(),
            last_commit_round_advancement_time: None,
            last_committed_rounds: last_committed_rounds.clone(),
            pending_commit_votes: VecDeque::new(),
            blocks_to_write: vec![],
            commits_to_write: vec![],
            commit_info_to_write: vec![],
            scoring_subdag,
            unscored_committed_subdags,
            store: store.clone(),
            cached_rounds,
            evicted_rounds: vec![0; num_authorities],
        };

        // Load into the cache, per authority, every stored block above the
        // authority's eviction round.
        for (i, round) in last_committed_rounds.into_iter().enumerate() {
            let authority_index = state.context.committee.to_authority_index(i).unwrap();
            let (blocks, eviction_round) = if state.gc_enabled() {
                // Find the latest block for the authority to calculate the eviction round. Then
                // we want to scan and load the blocks from the eviction round and onwards only.
                // As reminder, the eviction round is taking into account the gc_round.
                let last_block = state
                    .store
                    .scan_last_blocks_by_author(authority_index, 1, None)
                    .expect("Database error");
                // No stored block for the authority: fall back to the genesis round.
                let last_block_round = last_block
                    .last()
                    .map(|b| b.round())
                    .unwrap_or(GENESIS_ROUND);

                let eviction_round = Self::gc_eviction_round(
                    last_block_round,
                    state.gc_round(),
                    state.cached_rounds,
                );
                let blocks = state
                    .store
                    .scan_blocks_by_author(authority_index, eviction_round + 1)
                    .expect("Database error");
                (blocks, eviction_round)
            } else {
                // Without GC the eviction round is derived from the authority's
                // last committed round.
                let eviction_round = Self::eviction_round(round, cached_rounds);
                let blocks = state
                    .store
                    .scan_blocks_by_author(authority_index, eviction_round + 1)
                    .expect("Database error");
                (blocks, eviction_round)
            };

            state.evicted_rounds[authority_index] = eviction_round;

            // Update the block metadata for the authority.
            for block in &blocks {
                state.update_block_metadata(block);
            }

            info!(
                "Recovered blocks {}: {:?}",
                authority_index,
                blocks
                    .iter()
                    .map(|b| b.reference())
                    .collect::<Vec<BlockRef>>()
            );
        }

        // Initialize scoring metrics according to the metrics in store and the blocks
        // that were loaded to cache.
        let recovered_scoring_metrics = state.store.scan_scoring_metrics().expect("Database error");
        state
            .context
            .scoring_metrics_store
            .initialize_scoring_metrics(
                recovered_scoring_metrics,
                &state.recent_refs_by_authority,
                state.threshold_clock_round(),
                &state.evicted_rounds,
                state.context.clone(),
            );

        // The committed flag of cached blocks is in-memory only, so with GC
        // enabled it is rebuilt here by walking the commits backwards.
        if state.gc_enabled() {
            if let Some(last_commit) = last_commit {
                let mut index = last_commit.index();
                let gc_round = state.gc_round();
                info!(
                    "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
                    index, gc_round
                );

                loop {
                    // Read exactly one commit per iteration, at `index`.
                    let commits = store
                        .scan_commits((index..=index).into())
                        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
                    let Some(commit) = commits.first() else {
                        info!(
                            "Recovering finished up to index {index}, no more commits to recover"
                        );
                        break;
                    };

                    // Check the commit leader round to see if it is within the gc_round. If it is
                    // not then we can stop the recovery process.
                    if gc_round > 0 && commit.leader().round <= gc_round {
                        info!(
                            "Recovering finished, reached commit leader round {} <= gc_round {}",
                            commit.leader().round,
                            gc_round
                        );
                        break;
                    }

                    // Only blocks above gc_round are marked; blocks at or below
                    // it are skipped by the filter.
                    commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
                        debug!(
                            "Setting block {:?} as committed based on commit {:?}",
                            block_ref,
                            commit.index()
                        );
                        assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
                    });

                    // All commits are indexed starting from 1, so once we reach zero we exit.
                    index = index.saturating_sub(1);
                    if index == 0 {
                        break;
                    }
                }
            }
        }

        state
    }
314
315    /// Accepts a block into DagState and keeps it in memory.
316    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
317        assert_ne!(
318            block.round(),
319            0,
320            "Genesis block should not be accepted into DAG."
321        );
322
323        let block_ref = block.reference();
324        if self.contains_block(&block_ref) {
325            return;
326        }
327
328        let now = self.context.clock.timestamp_utc_ms();
329        if block.timestamp_ms() > now {
330            if self
331                .context
332                .protocol_config
333                .consensus_median_timestamp_with_checkpoint_enforcement()
334            {
335                trace!(
336                    "Block {:?} with timestamp {} is greater than local timestamp {}.",
337                    block,
338                    block.timestamp_ms(),
339                    now,
340                );
341            } else {
342                panic!(
343                    "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
344                    block,
345                    block.timestamp_ms(),
346                    now,
347                );
348            }
349        }
350        let hostname = &self.context.committee.authority(block_ref.author).hostname;
351        self.context
352            .metrics
353            .node_metrics
354            .accepted_block_time_drift_ms
355            .with_label_values(&[hostname])
356            .inc_by(block.timestamp_ms().saturating_sub(now));
357
358        // TODO: Move this check to core
359        // Ensure we don't write multiple blocks per slot for our own index
360        if block_ref.author == self.context.own_index {
361            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
362            assert!(
363                existing_blocks.is_empty(),
364                "Block Rejected! Attempted to add block {block:#?} to own slot where \
365                block(s) {existing_blocks:#?} already exists."
366            );
367        }
368        self.update_block_metadata(&block);
369        self.blocks_to_write.push(block);
370        let source = if self.context.own_index == block_ref.author {
371            "own"
372        } else {
373            "others"
374        };
375        self.context
376            .metrics
377            .node_metrics
378            .accepted_blocks
379            .with_label_values(&[source])
380            .inc();
381    }
382
383    /// Updates internal metadata for a block.
384    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
385        let block_ref = block.reference();
386        self.recent_blocks
387            .insert(block_ref, BlockInfo::new(block.clone()));
388        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
389        self.threshold_clock.add_block(block_ref);
390        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
391        self.context
392            .metrics
393            .node_metrics
394            .highest_accepted_round
395            .set(self.highest_accepted_round as i64);
396
397        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
398            .last()
399            .map(|block_ref| block_ref.round)
400            .expect("There should be by now at least one block ref");
401        let hostname = &self.context.committee.authority(block_ref.author).hostname;
402        self.context
403            .metrics
404            .node_metrics
405            .highest_accepted_authority_round
406            .with_label_values(&[hostname])
407            .set(highest_accepted_round_for_author as i64);
408    }
409
410    /// Accepts a blocks into DagState and keeps it in memory.
411    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
412        debug!(
413            "Accepting blocks: {}",
414            blocks.iter().map(|b| b.reference().to_string()).join(",")
415        );
416        for block in blocks {
417            self.accept_block(block);
418        }
419    }
420
421    /// Gets a block by checking cached recent blocks then storage.
422    /// Returns None when the block is not found.
423    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
424        self.get_blocks(&[*reference])
425            .pop()
426            .expect("Exactly one element should be returned")
427    }
428
429    /// Gets blocks by checking genesis, cached recent blocks in memory, then
430    /// storage. An element is None when the corresponding block is not
431    /// found.
432    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
433        let mut blocks = vec![None; block_refs.len()];
434        let mut missing = Vec::new();
435
436        for (index, block_ref) in block_refs.iter().enumerate() {
437            if block_ref.round == GENESIS_ROUND {
438                // Allow the caller to handle the invalid genesis ancestor error.
439                if let Some(block) = self.genesis.get(block_ref) {
440                    blocks[index] = Some(block.clone());
441                }
442                continue;
443            }
444            if let Some(block_info) = self.recent_blocks.get(block_ref) {
445                blocks[index] = Some(block_info.block.clone());
446                continue;
447            }
448            missing.push((index, block_ref));
449        }
450
451        if missing.is_empty() {
452            return blocks;
453        }
454
455        let missing_refs = missing
456            .iter()
457            .map(|(_, block_ref)| **block_ref)
458            .collect::<Vec<_>>();
459        let store_results = self
460            .store
461            .read_blocks(&missing_refs)
462            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
463        self.context
464            .metrics
465            .node_metrics
466            .dag_state_store_read_count
467            .with_label_values(&["get_blocks"])
468            .inc();
469
470        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
471            blocks[index] = result;
472        }
473
474        blocks
475    }
476
477    // Sets the block as committed in the cache. If the block is set as committed
478    // for first time, then true is returned, otherwise false is returned instead.
479    // Method will panic if the block is not found in the cache.
480    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
481        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
482            if !block_info.committed {
483                block_info.committed = true;
484                return true;
485            }
486            false
487        } else {
488            panic!("Block {block_ref:?} not found in cache to set as committed.");
489        }
490    }
491
492    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
493        self.recent_blocks
494            .get(block_ref)
495            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
496            .committed
497    }
498
499    /// Gets all uncommitted blocks in a slot.
500    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
501    /// checked.
502    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
503        // TODO: either panic below when the slot is at or below the last committed
504        // round, or support reading from storage while limiting storage reads
505        // to edge cases.
506
507        let mut blocks = vec![];
508        for (_block_ref, block_info) in self.recent_blocks.range((
509            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
510            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
511        )) {
512            blocks.push(block_info.block.clone())
513        }
514        blocks
515    }
516
517    /// Gets all uncommitted blocks in a round.
518    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
519    /// checked.
520    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
521        if round <= self.last_commit_round() {
522            panic!("Round {round} have committed blocks!");
523        }
524
525        let mut blocks = vec![];
526        for (_block_ref, block_info) in self.recent_blocks.range((
527            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
528            Excluded(BlockRef::new(
529                round + 1,
530                AuthorityIndex::ZERO,
531                BlockDigest::MIN,
532            )),
533        )) {
534            blocks.push(block_info.block.clone())
535        }
536        blocks
537    }
538
539    /// Gets all ancestors in the history of a block at a certain round.
540    pub(crate) fn ancestors_at_round(
541        &self,
542        later_block: &VerifiedBlock,
543        earlier_round: Round,
544    ) -> Vec<VerifiedBlock> {
545        // Iterate through ancestors of later_block in round descending order.
546        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
547        while !linked.is_empty() {
548            let round = linked.last().unwrap().round;
549            // Stop after finishing traversal for ancestors above earlier_round.
550            if round <= earlier_round {
551                break;
552            }
553            let block_ref = linked.pop_last().unwrap();
554            let Some(block) = self.get_block(&block_ref) else {
555                panic!("Block {block_ref:?} should exist in DAG!");
556            };
557            linked.extend(block.ancestors().iter().cloned());
558        }
559        linked
560            .range((
561                Included(BlockRef::new(
562                    earlier_round,
563                    AuthorityIndex::ZERO,
564                    BlockDigest::MIN,
565                )),
566                Unbounded,
567            ))
568            .map(|r| {
569                self.get_block(r)
570                    .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
571                    .clone()
572            })
573            .collect()
574    }
575
576    /// Gets the last proposed block from this authority.
577    /// If no block is proposed yet, returns the genesis block.
578    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
579        self.get_last_block_for_authority(self.context.own_index)
580    }
581
582    /// Retrieves the last accepted block from the specified `authority`. If no
583    /// block is found in cache then the genesis block is returned as no other
584    /// block has been received from that authority.
585    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
586        if let Some(last) = self.recent_refs_by_authority[authority].last() {
587            return self
588                .recent_blocks
589                .get(last)
590                .expect("Block should be found in recent blocks")
591                .block
592                .clone();
593        }
594
595        // if none exists, then fallback to genesis
596        let (_, genesis_block) = self
597            .genesis
598            .iter()
599            .find(|(block_ref, _)| block_ref.author == authority)
600            .expect("Genesis should be found for authority {authority_index}");
601        genesis_block.clone()
602    }
603
604    /// Returns cached recent blocks from the specified authority.
605    /// Blocks returned are limited to round >= `start`, and cached.
606    /// NOTE: caller should not assume returned blocks are always chained.
607    /// "Disconnected" blocks can be returned when there are byzantine blocks,
608    /// or a previously evicted block is accepted again.
609    pub(crate) fn get_cached_blocks(
610        &self,
611        authority: AuthorityIndex,
612        start: Round,
613    ) -> Vec<VerifiedBlock> {
614        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
615    }
616
617    // Retrieves the cached block within the range [start_round, end_round) from a
618    // given authority, limited in total number of blocks.
619    pub(crate) fn get_cached_blocks_in_range(
620        &self,
621        authority: AuthorityIndex,
622        start_round: Round,
623        end_round: Round,
624        limit: usize,
625    ) -> Vec<VerifiedBlock> {
626        if start_round >= end_round || limit == 0 {
627            return vec![];
628        }
629
630        let mut blocks = vec![];
631        for block_ref in self.recent_refs_by_authority[authority].range((
632            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
633            Excluded(BlockRef::new(
634                end_round,
635                AuthorityIndex::MIN,
636                BlockDigest::MIN,
637            )),
638        )) {
639            let block_info = self
640                .recent_blocks
641                .get(block_ref)
642                .expect("Block should exist in recent blocks");
643            blocks.push(block_info.block.clone());
644            if blocks.len() >= limit {
645                break;
646            }
647        }
648        blocks
649    }
650
651    // Retrieves the cached block within the range [start_round, end_round) from a
652    // given authority. NOTE: end_round must be greater than GENESIS_ROUND.
653    pub(crate) fn get_last_cached_block_in_range(
654        &self,
655        authority: AuthorityIndex,
656        start_round: Round,
657        end_round: Round,
658    ) -> Option<VerifiedBlock> {
659        if start_round >= end_round {
660            return None;
661        }
662
663        let block_ref = self.recent_refs_by_authority[authority]
664            .range((
665                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
666                Excluded(BlockRef::new(
667                    end_round,
668                    AuthorityIndex::MIN,
669                    BlockDigest::MIN,
670                )),
671            ))
672            .last()?;
673
674        self.recent_blocks
675            .get(block_ref)
676            .map(|block_info| block_info.block.clone())
677    }
678
679    /// Returns the last block proposed per authority with `evicted round <
680    /// round < end_round`. The method is guaranteed to return results only
681    /// when the `end_round` is not earlier of the available cached data for
682    /// each authority (evicted round + 1), otherwise the method will panic.
683    /// It's the caller's responsibility to ensure that is not requesting for
684    /// earlier rounds. In case of equivocation for an authority's last
685    /// slot, one block will be returned (the last in order) and the other
686    /// equivocating blocks will be returned.
687    pub(crate) fn get_last_cached_block_per_authority(
688        &self,
689        end_round: Round,
690    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
691        // Initialize with the genesis blocks as fallback
692        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
693        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
694
695        if end_round == GENESIS_ROUND {
696            panic!(
697                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
698            );
699        }
700
701        if end_round == GENESIS_ROUND + 1 {
702            return blocks.into_iter().map(|b| (b, vec![])).collect();
703        }
704
705        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
706            let authority_index = self
707                .context
708                .committee
709                .to_authority_index(authority_index)
710                .unwrap();
711
712            let last_evicted_round = self.evicted_rounds[authority_index];
713            if end_round.saturating_sub(1) <= last_evicted_round {
714                panic!(
715                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
716                );
717            }
718
719            let block_ref_iter = block_refs
720                .range((
721                    Included(BlockRef::new(
722                        last_evicted_round + 1,
723                        authority_index,
724                        BlockDigest::MIN,
725                    )),
726                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
727                ))
728                .rev();
729
730            let mut last_round = 0;
731            for block_ref in block_ref_iter {
732                if last_round == 0 {
733                    last_round = block_ref.round;
734                    let block_info = self
735                        .recent_blocks
736                        .get(block_ref)
737                        .expect("Block should exist in recent blocks");
738                    blocks[authority_index] = block_info.block.clone();
739                    continue;
740                }
741                if block_ref.round < last_round {
742                    break;
743                }
744                equivocating_blocks[authority_index].push(*block_ref);
745            }
746        }
747
748        blocks.into_iter().zip(equivocating_blocks).collect()
749    }
750
751    /// Checks whether a block exists in the slot. The method checks only
752    /// against the cached data. If the user asks for a slot that is not
753    /// within the cached data then a panic is thrown.
754    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
755        // Always return true for genesis slots.
756        if slot.round == GENESIS_ROUND {
757            return true;
758        }
759
760        let eviction_round = self.evicted_rounds[slot.authority];
761        if slot.round <= eviction_round {
762            panic!(
763                "{}",
764                format!(
765                    "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
766                    if self.gc_enabled() { " gc " } else { " " }
767                )
768            );
769        }
770
771        let mut result = self.recent_refs_by_authority[slot.authority].range((
772            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
773            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
774        ));
775        result.next().is_some()
776    }
777
778    /// Checks whether the required blocks are in cache, if exist, or otherwise
779    /// will check in store. The method is not caching back the results, so
780    /// its expensive if keep asking for cache missing blocks.
781    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
782        let mut exist = vec![false; block_refs.len()];
783        let mut missing = Vec::new();
784
785        for (index, block_ref) in block_refs.into_iter().enumerate() {
786            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
787            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
788                exist[index] = true;
789            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
790            {
791                // Optimization: recent_refs contain the most recent blocks known to this
792                // authority. If a block ref is not found there and has a higher
793                // round, it definitely is missing from this authority and there
794                // is no need to check disk.
795                exist[index] = false;
796            } else {
797                missing.push((index, block_ref));
798            }
799        }
800
801        if missing.is_empty() {
802            return exist;
803        }
804
805        let missing_refs = missing
806            .iter()
807            .map(|(_, block_ref)| *block_ref)
808            .collect::<Vec<_>>();
809        let store_results = self
810            .store
811            .contains_blocks(&missing_refs)
812            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
813        self.context
814            .metrics
815            .node_metrics
816            .dag_state_store_read_count
817            .with_label_values(&["contains_blocks"])
818            .inc();
819
820        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
821            exist[index] = result;
822        }
823
824        exist
825    }
826
827    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
828        let blocks = self.contains_blocks(vec![*block_ref]);
829        blocks.first().cloned().unwrap()
830    }
831
    /// Returns the round reported by the threshold clock.
    pub(crate) fn threshold_clock_round(&self) -> Round {
        self.threshold_clock.get_round()
    }
835
    /// Returns the quorum timestamp reported by the threshold clock.
    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
        self.threshold_clock.get_quorum_ts()
    }
839
    /// Returns the highest accepted round tracked by this DagState.
    pub(crate) fn highest_accepted_round(&self) -> Round {
        self.highest_accepted_round
    }
843
844    // Buffers a new commit in memory and updates last committed rounds.
845    // REQUIRED: must not skip over any commit index.
846    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
847        let time_diff = if let Some(last_commit) = &self.last_commit {
848            if commit.index() <= last_commit.index() {
849                warn!(
850                    "New commit index {} <= last commit index {}!",
851                    commit.index(),
852                    last_commit.index()
853                ); // This could happen in case of fast commit syncer downloading transactions from last solid commit (not pending).
854                return;
855            }
856            assert_eq!(commit.index(), last_commit.index() + 1);
857
858            if commit.timestamp_ms() < last_commit.timestamp_ms() {
859                panic!(
860                    "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
861                );
862            }
863            commit
864                .timestamp_ms()
865                .saturating_sub(last_commit.timestamp_ms())
866        } else {
867            assert_eq!(commit.index(), 1);
868            0
869        };
870
871        self.context
872            .metrics
873            .node_metrics
874            .last_commit_time_diff
875            .observe(time_diff as f64);
876
877        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
878            previous_commit.round() < commit.round()
879        } else {
880            true
881        };
882
883        self.last_commit = Some(commit.clone());
884
885        if commit_round_advanced {
886            let now = std::time::Instant::now();
887            if let Some(previous_time) = self.last_commit_round_advancement_time {
888                self.context
889                    .metrics
890                    .node_metrics
891                    .commit_round_advancement_interval
892                    .observe(now.duration_since(previous_time).as_secs_f64())
893            }
894            self.last_commit_round_advancement_time = Some(now);
895        }
896
897        for block_ref in commit.blocks().iter() {
898            self.last_committed_rounds[block_ref.author] = max(
899                self.last_committed_rounds[block_ref.author],
900                block_ref.round,
901            );
902        }
903
904        for (i, round) in self.last_committed_rounds.iter().enumerate() {
905            let index = self.context.committee.to_authority_index(i).unwrap();
906            let hostname = &self.context.committee.authority(index).hostname;
907            self.context
908                .metrics
909                .node_metrics
910                .last_committed_authority_round
911                .with_label_values(&[hostname])
912                .set((*round).into());
913        }
914
915        self.pending_commit_votes.push_back(commit.reference());
916        self.commits_to_write.push(commit);
917    }
918
919    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
920        // We empty the unscored committed subdags to calculate reputation scores.
921        assert!(self.unscored_committed_subdags.is_empty());
922
923        // We create an empty scoring subdag once reputation scores are calculated.
924        // Note: It is okay for this to not be gated by protocol config as the
925        // scoring_subdag should be empty in either case at this point.
926        assert!(self.scoring_subdag.is_empty());
927
928        let commit_info = CommitInfo {
929            committed_rounds: self.last_committed_rounds.clone(),
930            reputation_scores,
931        };
932        let last_commit = self
933            .last_commit
934            .as_ref()
935            .expect("Last commit should already be set.");
936        self.commit_info_to_write
937            .push((last_commit.reference(), commit_info));
938    }
939
940    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
941        let mut votes = Vec::new();
942        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
943            votes.push(self.pending_commit_votes.pop_front().unwrap());
944        }
945        votes
946    }
947
948    /// Index of the last commit.
949    pub(crate) fn last_commit_index(&self) -> CommitIndex {
950        match &self.last_commit {
951            Some(commit) => commit.index(),
952            None => 0,
953        }
954    }
955
956    /// Digest of the last commit.
957    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
958        match &self.last_commit {
959            Some(commit) => commit.digest(),
960            None => CommitDigest::MIN,
961        }
962    }
963
964    /// Timestamp of the last commit.
965    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
966        match &self.last_commit {
967            Some(commit) => commit.timestamp_ms(),
968            None => 0,
969        }
970    }
971
972    /// Leader slot of the last commit.
973    pub(crate) fn last_commit_leader(&self) -> Slot {
974        match &self.last_commit {
975            Some(commit) => commit.leader().into(),
976            None => self
977                .genesis
978                .iter()
979                .next()
980                .map(|(genesis_ref, _)| *genesis_ref)
981                .expect("Genesis blocks should always be available.")
982                .into(),
983        }
984    }
985
986    /// Highest round where a block is committed, which is last commit's leader
987    /// round.
988    pub(crate) fn last_commit_round(&self) -> Round {
989        match &self.last_commit {
990            Some(commit) => commit.leader().round,
991            None => 0,
992        }
993    }
994
    /// Last committed round per authority. Returns a copy of the internal
    /// vector.
    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
        self.last_committed_rounds.clone()
    }
999
    /// The GC round is the highest round for which blocks of equal or lower
    /// round are considered obsolete and can no longer be committed.
    /// There is no point in accepting any blocks with round <= gc_round. The
    /// Garbage Collection (GC) round is calculated based on the latest
    /// committed leader round. When GC is disabled this returns the genesis
    /// round.
    pub(crate) fn gc_round(&self) -> Round {
        self.calculate_gc_round(self.last_commit_round())
    }
1009
1010    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1011        let gc_depth = self.context.protocol_config.gc_depth();
1012        if gc_depth > 0 {
1013            // GC is enabled, only then calculate the diff
1014            commit_round.saturating_sub(gc_depth)
1015        } else {
1016            // Otherwise just return genesis round. That also acts as a safety mechanism so
1017            // we never attempt to truncate anything even accidentally.
1018            GENESIS_ROUND
1019        }
1020    }
1021
    /// Returns true when the configured GC depth is greater than zero.
    pub(crate) fn gc_enabled(&self) -> bool {
        self.context.protocol_config.gc_depth() > 0
    }
1025
1026    /// After each flush, DagState becomes persisted in storage and it expected
1027    /// to recover all internal states from storage after restarts.
1028    pub(crate) fn flush(&mut self) {
1029        let _s = self
1030            .context
1031            .metrics
1032            .node_metrics
1033            .scope_processing_time
1034            .with_label_values(&["DagState::flush"])
1035            .start_timer();
1036        // Flush buffered data to storage.
1037        let blocks = std::mem::take(&mut self.blocks_to_write);
1038        let commits = std::mem::take(&mut self.commits_to_write);
1039        let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
1040
1041        if blocks.is_empty() && commits.is_empty() {
1042            return;
1043        }
1044        debug!(
1045            "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
1046            blocks.len(),
1047            blocks.iter().map(|b| b.reference().to_string()).join(","),
1048            commits.len(),
1049            commits.iter().map(|c| c.reference().to_string()).join(","),
1050            commit_info_to_write.len(),
1051            commit_info_to_write
1052                .iter()
1053                .map(|(commit_ref, _)| commit_ref.to_string())
1054                .join(","),
1055        );
1056
1057        // Update the scoring metrics accordingly to the blocks being flushed.
1058        let mut metrics_to_write = vec![];
1059        let threshold_clock_round = self.threshold_clock_round();
1060        for (authority_index, authority) in self.context.committee.authorities() {
1061            let last_eviction_round = self.evicted_rounds[authority_index];
1062            let current_eviction_round = self.calculate_authority_eviction_round(authority_index);
1063            let metrics_to_write_from_authority = self
1064                .context
1065                .scoring_metrics_store
1066                .update_scoring_metrics_on_eviction(
1067                    authority_index,
1068                    authority.hostname.as_str(),
1069                    &self.recent_refs_by_authority[authority_index],
1070                    current_eviction_round,
1071                    last_eviction_round,
1072                    threshold_clock_round,
1073                    &self.context.metrics.node_metrics,
1074                );
1075            if let Some(metrics_to_write_from_authority) = metrics_to_write_from_authority {
1076                metrics_to_write.push((authority_index, metrics_to_write_from_authority));
1077            }
1078        }
1079
1080        self.store
1081            .write(WriteBatch::new(
1082                blocks,
1083                commits,
1084                commit_info_to_write,
1085                metrics_to_write,
1086            ))
1087            .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
1088        self.context
1089            .metrics
1090            .node_metrics
1091            .dag_state_store_write_count
1092            .inc();
1093
1094        // Clean up old cached data. After flushing, all cached blocks are guaranteed to
1095        // be persisted. This clean up also triggers some of the scoring metrics
1096        // updates.
1097        for (authority_index, _) in self.context.committee.authorities() {
1098            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1099            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1100                let block_round = block_ref.round;
1101                if block_round <= eviction_round {
1102                    self.recent_blocks.remove(block_ref);
1103                    self.recent_refs_by_authority[authority_index].pop_first();
1104                } else {
1105                    break;
1106                }
1107            }
1108            self.evicted_rounds[authority_index] = eviction_round;
1109        }
1110
1111        let metrics = &self.context.metrics.node_metrics;
1112        metrics
1113            .dag_state_recent_blocks
1114            .set(self.recent_blocks.len() as i64);
1115        metrics.dag_state_recent_refs.set(
1116            self.recent_refs_by_authority
1117                .iter()
1118                .map(BTreeSet::len)
1119                .sum::<usize>() as i64,
1120        );
1121    }
1122
    /// Reads the last commit info from the store, if any is present.
    ///
    /// # Panics
    ///
    /// Panics if reading from the store fails.
    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
        self.store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
    }
1128
    // TODO: Remove the four methods below once DistributedVoteScoring is enabled.
    /// Returns the number of buffered unscored committed subdags.
    pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
        self.unscored_committed_subdags.len() as u64
    }
1133
    /// Test-only: returns a copy of the buffered unscored committed subdags.
    #[cfg(test)]
    pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
        self.unscored_committed_subdags.clone()
    }
1138
    /// Appends the given committed subdags to the buffer of unscored
    /// committed subdags.
    pub(crate) fn add_unscored_committed_subdags(
        &mut self,
        committed_subdags: Vec<CommittedSubDag>,
    ) {
        self.unscored_committed_subdags.extend(committed_subdags);
    }
1145
    /// Returns all buffered unscored committed subdags and leaves the buffer
    /// empty.
    pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
        std::mem::take(&mut self.unscored_committed_subdags)
    }
1149
    /// Adds the given committed subdags to the scoring subdag.
    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
        self.scoring_subdag.add_subdags(scoring_subdags);
    }
1153
    /// Clears the scoring subdag.
    pub(crate) fn clear_scoring_subdag(&mut self) {
        self.scoring_subdag.clear();
    }
1157
    /// Returns the number of scored subdags reported by the scoring subdag.
    pub(crate) fn scoring_subdags_count(&self) -> usize {
        self.scoring_subdag.scored_subdags_count()
    }
1161
    /// Returns true when the scoring subdag is empty.
    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
        self.scoring_subdag.is_empty()
    }
1165
    /// Calculates the reputation scores from the scoring subdag using its
    /// distributed vote scoring.
    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
        self.scoring_subdag.calculate_distributed_vote_scores()
    }
1169
    /// Returns the end of the scoring subdag's commit range.
    ///
    /// # Panics
    ///
    /// Panics if the scoring subdag has no commit range.
    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
        self.scoring_subdag
            .commit_range
            .as_ref()
            .expect("commit range should exist for scoring subdag")
            .end()
    }
1177
1178    /// The last round that should get evicted after a cache clean up operation.
1179    /// After this round we are guaranteed to have all the produced blocks
1180    /// from that authority. For any round that is <= `last_evicted_round`
1181    /// we don't have such guarantees as out of order blocks might exist.
1182    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1183        if self.gc_enabled() {
1184            let last_round = self.recent_refs_by_authority[authority_index]
1185                .last()
1186                .map(|block_ref| block_ref.round)
1187                .unwrap_or(GENESIS_ROUND);
1188
1189            Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1190        } else {
1191            let commit_round = self.last_committed_rounds[authority_index];
1192            Self::eviction_round(commit_round, self.cached_rounds)
1193        }
1194    }
1195
    /// Calculates the last eviction round based on the provided `commit_round`.
    /// Any blocks with round <= the evict round have been cleaned up.
    /// The result is `commit_round - cached_rounds`, saturating at zero.
    fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
        commit_round.saturating_sub(cached_rounds)
    }
1201
1202    /// Calculates the eviction round for the given authority. The goal is to
1203    /// keep at least `cached_rounds` of the latest blocks in the cache (if
1204    /// enough data is available), while evicting blocks with rounds <=
1205    /// `gc_round` when possible.
1206    fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1207        gc_round.min(last_round.saturating_sub(cached_rounds))
1208    }
1209
1210    /// Detects and returns the blocks of the round that forms the last quorum.
1211    /// The method will return the quorum even if that's genesis.
1212    #[cfg(test)]
1213    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1214        // the quorum should exist either on the highest accepted round or the one
1215        // before. If we fail to detect a quorum then it means that our DAG has
1216        // advanced with missing causal history.
1217        for round in
1218            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1219        {
1220            if round == GENESIS_ROUND {
1221                return self.genesis_blocks();
1222            }
1223            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1224            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1225
1226            // Since the minimum wave length is 3 we expect to find a quorum in the
1227            // uncommitted rounds.
1228            let blocks = self.get_uncommitted_blocks_at_round(round);
1229            for block in &blocks {
1230                if quorum.add(block.author(), &self.context.committee) {
1231                    return blocks;
1232                }
1233            }
1234        }
1235
1236        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1237    }
1238
    /// Test-only: returns copies of all genesis blocks.
    #[cfg(test)]
    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
        self.genesis.values().cloned().collect()
    }
1243
    /// Test-only: overwrites the last commit directly. Unlike `add_commit`,
    /// no other field is updated.
    #[cfg(test)]
    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
        self.last_commit = Some(commit);
    }
1248}
1249
/// A verified block together with a flag recording whether it has been
/// committed.
struct BlockInfo {
    // The verified block itself
    block: VerifiedBlock,
    // Whether the block has been committed
    committed: bool,
}
1255
impl BlockInfo {
    /// Wraps `block` in a `BlockInfo` that starts out as not committed.
    fn new(block: VerifiedBlock) -> Self {
        Self {
            block,
            committed: false,
        }
    }
}
1264
1265#[cfg(test)]
1266mod test {
1267    use std::vec;
1268
1269    use parking_lot::RwLock;
1270    use rstest::rstest;
1271
1272    use super::*;
1273    use crate::{
1274        block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1275        storage::{WriteBatch, mem_store::MemStore},
1276        test_dag_builder::DagBuilder,
1277        test_dag_parser::parse_dag,
1278    };
1279
    // Accepts blocks for rounds 1 ~ 10 (with several blocks per slot for
    // non-own authorities) and checks that they can be read back by ref, by
    // slot and by round, and that lookups for absent refs, slots and rounds
    // return nothing.
    #[tokio::test]
    async fn test_get_blocks() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());
        let own_index = AuthorityIndex::new_for_test(0);

        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
        let num_rounds: u32 = 10;
        let non_existent_round: u32 = 100;
        let num_authorities: u32 = 3;
        let num_blocks_per_slot: usize = 3;
        let mut blocks = BTreeMap::new();
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                // Create 3 blocks per slot, with different timestamps and digests.
                let base_ts = round as BlockTimestampMs * 1000;
                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
                    let block = VerifiedBlock::new_for_test(
                        TestBlock::new(round, author)
                            .set_timestamp_ms(timestamp)
                            .build(),
                    );
                    dag_state.accept_block(block.clone());
                    blocks.insert(block.reference(), block);

                    // Only write one block per slot for own index
                    if AuthorityIndex::new_for_test(author) == own_index {
                        break;
                    }
                }
            }
        }

        // Check uncommitted blocks that exist.
        for (r, block) in &blocks {
            assert_eq!(&dag_state.get_block(r).unwrap(), block);
        }

        // Check uncommitted blocks that do not exist.
        // The ref reuses an existing round and author but carries the MIN digest.
        let last_ref = blocks.keys().last().unwrap();
        assert!(
            dag_state
                .get_block(&BlockRef::new(
                    last_ref.round,
                    last_ref.author,
                    BlockDigest::MIN
                ))
                .is_none()
        );

        // Check slots with uncommitted blocks.
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                let slot = Slot::new(
                    round,
                    context
                        .committee
                        .to_authority_index(author as usize)
                        .unwrap(),
                );
                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);

                // We only write one block per slot for own index
                if AuthorityIndex::new_for_test(author) == own_index {
                    assert_eq!(blocks.len(), 1);
                } else {
                    assert_eq!(blocks.len(), num_blocks_per_slot);
                }

                for b in blocks {
                    assert_eq!(b.round(), round);
                    assert_eq!(
                        b.author(),
                        context
                            .committee
                            .to_authority_index(author as usize)
                            .unwrap()
                    );
                }
            }
        }

        // Check slots without uncommitted blocks.
        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());

        // Check rounds with uncommitted blocks.
        for round in 1..=num_rounds {
            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
            // Expect 3 blocks per authority except for own authority which should
            // have 1 block.
            assert_eq!(
                blocks.len(),
                (num_authorities - 1) as usize * num_blocks_per_slot + 1
            );
            for b in blocks {
                assert_eq!(b.round(), round);
            }
        }

        // Check rounds without uncommitted blocks.
        assert!(
            dag_state
                .get_uncommitted_blocks_at_round(non_existent_round)
                .is_empty()
        );
    }
1389
1390    #[tokio::test]
1391    async fn test_ancestors_at_uncommitted_round() {
1392        // Initialize DagState.
1393        let (context, _) = Context::new_for_test(4);
1394        let context = Arc::new(context);
1395        let store = Arc::new(MemStore::new());
1396        let mut dag_state = DagState::new(context.clone(), store.clone());
1397
1398        // Populate DagState.
1399
1400        // Round 10 refs will not have their blocks in DagState.
1401        let round_10_refs: Vec<_> = (0..4)
1402            .map(|a| {
1403                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1404                    .reference()
1405            })
1406            .collect();
1407
1408        // Round 11 blocks.
1409        let round_11 = [
1410            // This will connect to round 12.
1411            VerifiedBlock::new_for_test(
1412                TestBlock::new(11, 0)
1413                    .set_timestamp_ms(1100)
1414                    .set_ancestors(round_10_refs.clone())
1415                    .build(),
1416            ),
1417            // Slot(11, 1) has 3 blocks.
1418            // This will connect to round 12.
1419            VerifiedBlock::new_for_test(
1420                TestBlock::new(11, 1)
1421                    .set_timestamp_ms(1110)
1422                    .set_ancestors(round_10_refs.clone())
1423                    .build(),
1424            ),
1425            // This will connect to round 13.
1426            VerifiedBlock::new_for_test(
1427                TestBlock::new(11, 1)
1428                    .set_timestamp_ms(1111)
1429                    .set_ancestors(round_10_refs.clone())
1430                    .build(),
1431            ),
1432            // This will not connect to any block.
1433            VerifiedBlock::new_for_test(
1434                TestBlock::new(11, 1)
1435                    .set_timestamp_ms(1112)
1436                    .set_ancestors(round_10_refs.clone())
1437                    .build(),
1438            ),
1439            // This will not connect to any block.
1440            VerifiedBlock::new_for_test(
1441                TestBlock::new(11, 2)
1442                    .set_timestamp_ms(1120)
1443                    .set_ancestors(round_10_refs.clone())
1444                    .build(),
1445            ),
1446            // This will connect to round 12.
1447            VerifiedBlock::new_for_test(
1448                TestBlock::new(11, 3)
1449                    .set_timestamp_ms(1130)
1450                    .set_ancestors(round_10_refs.clone())
1451                    .build(),
1452            ),
1453        ];
1454
1455        // Round 12 blocks.
1456        let ancestors_for_round_12 = vec![
1457            round_11[0].reference(),
1458            round_11[1].reference(),
1459            round_11[5].reference(),
1460        ];
1461        let round_12 = [
1462            VerifiedBlock::new_for_test(
1463                TestBlock::new(12, 0)
1464                    .set_timestamp_ms(1200)
1465                    .set_ancestors(ancestors_for_round_12.clone())
1466                    .build(),
1467            ),
1468            VerifiedBlock::new_for_test(
1469                TestBlock::new(12, 2)
1470                    .set_timestamp_ms(1220)
1471                    .set_ancestors(ancestors_for_round_12.clone())
1472                    .build(),
1473            ),
1474            VerifiedBlock::new_for_test(
1475                TestBlock::new(12, 3)
1476                    .set_timestamp_ms(1230)
1477                    .set_ancestors(ancestors_for_round_12.clone())
1478                    .build(),
1479            ),
1480        ];
1481
1482        // Round 13 blocks.
1483        let ancestors_for_round_13 = vec![
1484            round_12[0].reference(),
1485            round_12[1].reference(),
1486            round_12[2].reference(),
1487            round_11[2].reference(),
1488        ];
1489        let round_13 = [
1490            VerifiedBlock::new_for_test(
1491                TestBlock::new(12, 1)
1492                    .set_timestamp_ms(1300)
1493                    .set_ancestors(ancestors_for_round_13.clone())
1494                    .build(),
1495            ),
1496            VerifiedBlock::new_for_test(
1497                TestBlock::new(12, 2)
1498                    .set_timestamp_ms(1320)
1499                    .set_ancestors(ancestors_for_round_13.clone())
1500                    .build(),
1501            ),
1502            VerifiedBlock::new_for_test(
1503                TestBlock::new(12, 3)
1504                    .set_timestamp_ms(1330)
1505                    .set_ancestors(ancestors_for_round_13.clone())
1506                    .build(),
1507            ),
1508        ];
1509
1510        // Round 14 anchor block.
1511        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1512        let anchor = VerifiedBlock::new_for_test(
1513            TestBlock::new(14, 1)
1514                .set_timestamp_ms(1410)
1515                .set_ancestors(ancestors_for_round_14)
1516                .build(),
1517        );
1518
1519        // Add all blocks (at and above round 11) to DagState.
1520        for b in round_11
1521            .iter()
1522            .chain(round_12.iter())
1523            .chain(round_13.iter())
1524            .chain([anchor.clone()].iter())
1525        {
1526            dag_state.accept_block(b.clone());
1527        }
1528
1529        // Check ancestors connected to anchor.
1530        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1531        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1532        ancestors_refs.sort();
1533        let mut expected_refs = vec![
1534            round_11[0].reference(),
1535            round_11[1].reference(),
1536            round_11[2].reference(),
1537            round_11[5].reference(),
1538        ];
1539        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1
1540        // & 2) might not be in right lexicographical order.
1541        assert_eq!(
1542            ancestors_refs, expected_refs,
1543            "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1544        );
1545    }
1546
1547    #[tokio::test]
1548    async fn test_contains_blocks_in_cache_or_store() {
1549        /// Only keep elements up to 2 rounds before the last committed round
1550        const CACHED_ROUNDS: Round = 2;
1551
1552        let (mut context, _) = Context::new_for_test(4);
1553        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1554
1555        let context = Arc::new(context);
1556        let store = Arc::new(MemStore::new());
1557        let mut dag_state = DagState::new(context.clone(), store.clone());
1558
1559        // Create test blocks for round 1 ~ 10
1560        let num_rounds: u32 = 10;
1561        let num_authorities: u32 = 4;
1562        let mut blocks = Vec::new();
1563
1564        for round in 1..=num_rounds {
1565            for author in 0..num_authorities {
1566                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1567                blocks.push(block);
1568            }
1569        }
1570
1571        // Now write in store the blocks from first 4 rounds and the rest to the dag
1572        // state
1573        blocks.clone().into_iter().for_each(|block| {
1574            if block.round() <= 4 {
1575                store
1576                    .write(WriteBatch::default().blocks(vec![block]))
1577                    .unwrap();
1578            } else {
1579                dag_state.accept_blocks(vec![block]);
1580            }
1581        });
1582
1583        // Now when trying to query whether we have all the blocks, we should
1584        // successfully retrieve a positive answer where the blocks of first 4
1585        // round should be found in DagState and the rest in store.
1586        let mut block_refs = blocks
1587            .iter()
1588            .map(|block| block.reference())
1589            .collect::<Vec<_>>();
1590        let result = dag_state.contains_blocks(block_refs.clone());
1591
1592        // Ensure everything is found
1593        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1594        assert_eq!(result, expected);
1595
1596        // Now try to ask also for one block ref that is neither in cache nor in store
1597        block_refs.insert(
1598            3,
1599            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1600        );
1601        let result = dag_state.contains_blocks(block_refs.clone());
1602
1603        // Then all should be found apart from the last one
1604        expected.insert(3, false);
1605        assert_eq!(result, expected.clone());
1606    }
1607
1608    #[tokio::test]
1609    async fn test_contains_cached_block_at_slot() {
1610        /// Only keep elements up to 2 rounds before the last committed round
1611        const CACHED_ROUNDS: Round = 2;
1612
1613        let num_authorities: u32 = 4;
1614        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1615        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1616
1617        let context = Arc::new(context);
1618        let store = Arc::new(MemStore::new());
1619        let mut dag_state = DagState::new(context.clone(), store.clone());
1620
1621        // Create test blocks for round 1 ~ 10
1622        let num_rounds: u32 = 10;
1623        let mut blocks = Vec::new();
1624
1625        for round in 1..=num_rounds {
1626            for author in 0..num_authorities {
1627                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1628                blocks.push(block.clone());
1629                dag_state.accept_block(block);
1630            }
1631        }
1632
1633        // Query for genesis round 0, genesis blocks should be returned
1634        for (author, _) in context.committee.authorities() {
1635            assert!(
1636                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1637                "Genesis should always be found"
1638            );
1639        }
1640
1641        // Now when trying to query whether we have all the blocks, we should
1642        // successfully retrieve a positive answer where the blocks of first 4
1643        // round should be found in DagState and the rest in store.
1644        let mut block_refs = blocks
1645            .iter()
1646            .map(|block| block.reference())
1647            .collect::<Vec<_>>();
1648
1649        for block_ref in block_refs.clone() {
1650            let slot = block_ref.into();
1651            let found = dag_state.contains_cached_block_at_slot(slot);
1652            assert!(found, "A block should be found at slot {slot}");
1653        }
1654
1655        // Now try to ask also for one block ref that is not in cache
1656        // Then all should be found apart from the last one
1657        block_refs.insert(
1658            3,
1659            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1660        );
1661        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1662        expected.insert(3, false);
1663
1664        // Attempt to check the same for via the contains slot method
1665        for block_ref in block_refs {
1666            let slot = block_ref.into();
1667            let found = dag_state.contains_cached_block_at_slot(slot);
1668
1669            assert_eq!(expected.remove(0), found);
1670        }
1671    }
1672
1673    #[tokio::test]
1674    #[should_panic(
1675        expected = "Attempted to check for slot S8[0] that is <= the last gc evicted round 8"
1676    )]
1677    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1678        /// Only keep elements up to 2 rounds before the last committed round
1679        const CACHED_ROUNDS: Round = 2;
1680        const GC_DEPTH: u32 = 1;
1681        let (mut context, _) = Context::new_for_test(4);
1682        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1683        context
1684            .protocol_config
1685            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1686
1687        let context = Arc::new(context);
1688        let store = Arc::new(MemStore::new());
1689        let mut dag_state = DagState::new(context.clone(), store.clone());
1690
1691        // Create test blocks for round 1 ~ 10 for authority 0
1692        let mut blocks = Vec::new();
1693        for round in 1..=10 {
1694            let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1695            blocks.push(block.clone());
1696            dag_state.accept_block(block);
1697        }
1698
1699        // Now add a commit to trigger an eviction
1700        dag_state.add_commit(TrustedCommit::new_for_test(
1701            1 as CommitIndex,
1702            CommitDigest::MIN,
1703            0,
1704            blocks.last().unwrap().reference(),
1705            blocks
1706                .into_iter()
1707                .map(|block| block.reference())
1708                .collect::<Vec<_>>(),
1709        ));
1710
1711        dag_state.flush();
1712
1713        // When trying to request for authority 0 at block slot 8 it should panic, as
1714        // anything that is <= commit_round - cached_rounds = 10 - 2 = 8 should
1715        // be evicted
1716        let _ =
1717            dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1718    }
1719
1720    #[tokio::test]
1721    #[should_panic(
1722        expected = "Attempted to check for slot S3[1] that is <= the last gc evicted round 3"
1723    )]
1724    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1725        /// Keep 2 rounds from the highest committed round. This is considered
1726        /// universal and minimum necessary blocks to hold
1727        /// for the correct node operation.
1728        const GC_DEPTH: u32 = 2;
1729        /// Keep at least 3 rounds in cache for each authority.
1730        const CACHED_ROUNDS: Round = 3;
1731
1732        let (mut context, _) = Context::new_for_test(4);
1733        context
1734            .protocol_config
1735            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1736        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1737
1738        let context = Arc::new(context);
1739        let store = Arc::new(MemStore::new());
1740        let mut dag_state = DagState::new(context.clone(), store.clone());
1741
1742        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 -
1743        // 6.
1744        let mut dag_builder = DagBuilder::new(context.clone());
1745        dag_builder.layers(1..=3).build();
1746        dag_builder
1747            .layers(4..=6)
1748            .authorities(vec![AuthorityIndex::new_for_test(0)])
1749            .skip_block()
1750            .build();
1751
1752        // Accept all blocks
1753        dag_builder
1754            .all_blocks()
1755            .into_iter()
1756            .for_each(|block| dag_state.accept_block(block));
1757
1758        // Now add a commit for leader round 5 to trigger an eviction
1759        dag_state.add_commit(TrustedCommit::new_for_test(
1760            1 as CommitIndex,
1761            CommitDigest::MIN,
1762            0,
1763            dag_builder.leader_block(5).unwrap().reference(),
1764            vec![],
1765        ));
1766
1767        dag_state.flush();
1768
1769        // Ensure that gc round has been updated
1770        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1771
1772        // Now what we expect to happen is for:
1773        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1774        // * Node 0 should have in cache blocks from it's latest round, 3, up to round
1775        //   1, which is the number of cached_rounds.
1776        for authority_index in 1..=3 {
1777            for round in 4..=6 {
1778                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1779                    round,
1780                    AuthorityIndex::new_for_test(authority_index)
1781                )));
1782            }
1783        }
1784
1785        for round in 1..=3 {
1786            assert!(
1787                dag_state.contains_cached_block_at_slot(Slot::new(
1788                    round,
1789                    AuthorityIndex::new_for_test(0)
1790                ))
1791            );
1792        }
1793
1794        // When trying to request for authority 1 at block slot 3 it should panic, as
1795        // anything that is <= 3 should be evicted
1796        let _ =
1797            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1798    }
1799
1800    #[tokio::test]
1801    async fn test_get_blocks_in_cache_or_store() {
1802        let (context, _) = Context::new_for_test(4);
1803        let context = Arc::new(context);
1804        let store = Arc::new(MemStore::new());
1805        let mut dag_state = DagState::new(context.clone(), store.clone());
1806
1807        // Create test blocks for round 1 ~ 10
1808        let num_rounds: u32 = 10;
1809        let num_authorities: u32 = 4;
1810        let mut blocks = Vec::new();
1811
1812        for round in 1..=num_rounds {
1813            for author in 0..num_authorities {
1814                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1815                blocks.push(block);
1816            }
1817        }
1818
1819        // Now write in store the blocks from first 4 rounds and the rest to the dag
1820        // state
1821        blocks.clone().into_iter().for_each(|block| {
1822            if block.round() <= 4 {
1823                store
1824                    .write(WriteBatch::default().blocks(vec![block]))
1825                    .unwrap();
1826            } else {
1827                dag_state.accept_blocks(vec![block]);
1828            }
1829        });
1830
1831        // Now when trying to query whether we have all the blocks, we should
1832        // successfully retrieve a positive answer where the blocks of first 4
1833        // round should be found in DagState and the rest in store.
1834        let mut block_refs = blocks
1835            .iter()
1836            .map(|block| block.reference())
1837            .collect::<Vec<_>>();
1838        let result = dag_state.get_blocks(&block_refs);
1839
1840        let mut expected = blocks
1841            .into_iter()
1842            .map(Some)
1843            .collect::<Vec<Option<VerifiedBlock>>>();
1844
1845        // Ensure everything is found
1846        assert_eq!(result, expected.clone());
1847
1848        // Now try to ask also for one block ref that is neither in cache nor in store
1849        block_refs.insert(
1850            3,
1851            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1852        );
1853        let result = dag_state.get_blocks(&block_refs);
1854
1855        // Then all should be found apart from the last one
1856        expected.insert(3, None);
1857        assert_eq!(result, expected);
1858    }
1859
1860    // TODO: Remove when DistributedVoteScoring is enabled.
1861    #[rstest]
1862    #[tokio::test]
1863    async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1864        telemetry_subscribers::init_for_testing();
1865        let num_authorities: u32 = 4;
1866        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1867        context
1868            .protocol_config
1869            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1870
1871        if gc_depth > 0 {
1872            context
1873                .protocol_config
1874                .set_consensus_gc_depth_for_testing(gc_depth);
1875        }
1876
1877        let context = Arc::new(context);
1878        let store = Arc::new(MemStore::new());
1879        let mut dag_state = DagState::new(context.clone(), store.clone());
1880
1881        // Create test blocks and commits for round 1 ~ 10
1882        let num_rounds: u32 = 10;
1883        let mut dag_builder = DagBuilder::new(context.clone());
1884        dag_builder.layers(1..=num_rounds).build();
1885        let mut commits = vec![];
1886
1887        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1888            commits.push(commit);
1889        }
1890
1891        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1892        let temp_commits = commits.split_off(5);
1893        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1894        for commit in commits.clone() {
1895            dag_state.add_commit(commit);
1896        }
1897
1898        // Flush the dag state
1899        dag_state.flush();
1900
1901        // Add the rest of the blocks and commits to the dag state
1902        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1903        for commit in temp_commits.clone() {
1904            dag_state.add_commit(commit);
1905        }
1906
1907        // All blocks should be found in DagState.
1908        let all_blocks = dag_builder.blocks(6..=num_rounds);
1909        let block_refs = all_blocks
1910            .iter()
1911            .map(|block| block.reference())
1912            .collect::<Vec<_>>();
1913
1914        let result = dag_state
1915            .get_blocks(&block_refs)
1916            .into_iter()
1917            .map(|b| b.unwrap())
1918            .collect::<Vec<_>>();
1919        assert_eq!(result, all_blocks);
1920
1921        // Last commit index should be 10.
1922        assert_eq!(dag_state.last_commit_index(), 10);
1923        assert_eq!(
1924            dag_state.last_committed_rounds(),
1925            dag_builder.last_committed_rounds.clone()
1926        );
1927
1928        // Destroy the dag state.
1929        drop(dag_state);
1930
1931        // Recover the state from the store
1932        let dag_state = DagState::new(context.clone(), store.clone());
1933
1934        // Blocks of first 5 rounds should be found in DagState.
1935        let blocks = dag_builder.blocks(1..=5);
1936        let block_refs = blocks
1937            .iter()
1938            .map(|block| block.reference())
1939            .collect::<Vec<_>>();
1940        let result = dag_state
1941            .get_blocks(&block_refs)
1942            .into_iter()
1943            .map(|b| b.unwrap())
1944            .collect::<Vec<_>>();
1945        assert_eq!(result, blocks);
1946
1947        // Blocks above round 5 should not be in DagState, because they are not flushed.
1948        let missing_blocks = dag_builder.blocks(6..=num_rounds);
1949        let block_refs = missing_blocks
1950            .iter()
1951            .map(|block| block.reference())
1952            .collect::<Vec<_>>();
1953        let retrieved_blocks = dag_state
1954            .get_blocks(&block_refs)
1955            .into_iter()
1956            .flatten()
1957            .collect::<Vec<_>>();
1958        assert!(retrieved_blocks.is_empty());
1959
1960        // Last commit index should be 5.
1961        assert_eq!(dag_state.last_commit_index(), 5);
1962
1963        // This is the last_commit_rounds of the first 5 commits that were flushed
1964        let expected_last_committed_rounds = vec![4, 5, 4, 4];
1965        assert_eq!(
1966            dag_state.last_committed_rounds(),
1967            expected_last_committed_rounds
1968        );
1969
1970        // Unscored subdags will be recovered based on the flushed commits and no commit
1971        // info
1972        assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1973    }
1974
1975    #[tokio::test]
1976    async fn test_flush_and_recovery() {
1977        telemetry_subscribers::init_for_testing();
1978        let num_authorities: u32 = 4;
1979        let (context, _) = Context::new_for_test(num_authorities as usize);
1980        let context = Arc::new(context);
1981        let store = Arc::new(MemStore::new());
1982        let mut dag_state = DagState::new(context.clone(), store.clone());
1983
1984        // Create test blocks and commits for round 1 ~ 10
1985        let num_rounds: u32 = 10;
1986        let mut dag_builder = DagBuilder::new(context.clone());
1987        dag_builder.layers(1..=num_rounds).build();
1988        let mut commits = vec![];
1989        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1990            commits.push(commit);
1991        }
1992
1993        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1994        let temp_commits = commits.split_off(5);
1995        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1996        for commit in commits.clone() {
1997            dag_state.add_commit(commit);
1998        }
1999
2000        // Flush the dag state
2001        dag_state.flush();
2002
2003        // Add the rest of the blocks and commits to the dag state
2004        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
2005        for commit in temp_commits.clone() {
2006            dag_state.add_commit(commit);
2007        }
2008
2009        // All blocks should be found in DagState.
2010        let all_blocks = dag_builder.blocks(6..=num_rounds);
2011        let block_refs = all_blocks
2012            .iter()
2013            .map(|block| block.reference())
2014            .collect::<Vec<_>>();
2015        let result = dag_state
2016            .get_blocks(&block_refs)
2017            .into_iter()
2018            .map(|b| b.unwrap())
2019            .collect::<Vec<_>>();
2020        assert_eq!(result, all_blocks);
2021
2022        // Last commit index should be 10.
2023        assert_eq!(dag_state.last_commit_index(), 10);
2024        assert_eq!(
2025            dag_state.last_committed_rounds(),
2026            dag_builder.last_committed_rounds.clone()
2027        );
2028
2029        // Destroy the dag state.
2030        drop(dag_state);
2031
2032        // Recover the state from the store
2033        let dag_state = DagState::new(context.clone(), store.clone());
2034
2035        // Blocks of first 5 rounds should be found in DagState.
2036        let blocks = dag_builder.blocks(1..=5);
2037        let block_refs = blocks
2038            .iter()
2039            .map(|block| block.reference())
2040            .collect::<Vec<_>>();
2041        let result = dag_state
2042            .get_blocks(&block_refs)
2043            .into_iter()
2044            .map(|b| b.unwrap())
2045            .collect::<Vec<_>>();
2046        assert_eq!(result, blocks);
2047
2048        // Blocks above round 5 should not be in DagState, because they are not flushed.
2049        let missing_blocks = dag_builder.blocks(6..=num_rounds);
2050        let block_refs = missing_blocks
2051            .iter()
2052            .map(|block| block.reference())
2053            .collect::<Vec<_>>();
2054        let retrieved_blocks = dag_state
2055            .get_blocks(&block_refs)
2056            .into_iter()
2057            .flatten()
2058            .collect::<Vec<_>>();
2059        assert!(retrieved_blocks.is_empty());
2060
2061        // Last commit index should be 5.
2062        assert_eq!(dag_state.last_commit_index(), 5);
2063
2064        // This is the last_commit_rounds of the first 5 commits that were flushed
2065        let expected_last_committed_rounds = vec![4, 5, 4, 4];
2066        assert_eq!(
2067            dag_state.last_committed_rounds(),
2068            expected_last_committed_rounds
2069        );
2070        // Unscored subdags will be recovered based on the flushed commits and no commit
2071        // info
2072        assert_eq!(dag_state.scoring_subdags_count(), 5);
2073    }
2074
2075    #[tokio::test]
2076    async fn test_flush_and_recovery_gc_enabled() {
2077        telemetry_subscribers::init_for_testing();
2078
2079        const GC_DEPTH: u32 = 3;
2080        const CACHED_ROUNDS: u32 = 4;
2081
2082        let num_authorities: u32 = 4;
2083        let (mut context, _) = Context::new_for_test(num_authorities as usize);
2084        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2085        context
2086            .protocol_config
2087            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2088        context
2089            .protocol_config
2090            .set_consensus_linearize_subdag_v2_for_testing(true);
2091
2092        let context = Arc::new(context);
2093
2094        let store = Arc::new(MemStore::new());
2095        let mut dag_state = DagState::new(context.clone(), store.clone());
2096
2097        let num_rounds: u32 = 10;
2098        let mut dag_builder = DagBuilder::new(context.clone());
2099        dag_builder.layers(1..=5).build();
2100        dag_builder
2101            .layers(6..=8)
2102            .authorities(vec![AuthorityIndex::new_for_test(0)])
2103            .skip_block()
2104            .build();
2105        dag_builder.layers(9..=num_rounds).build();
2106
2107        let mut commits = dag_builder
2108            .get_sub_dag_and_commits(1..=num_rounds)
2109            .into_iter()
2110            .map(|(_subdag, commit)| commit)
2111            .collect::<Vec<_>>();
2112
2113        // Add the blocks from first 8 rounds and first 7 commits to the dag state
2114        // It's 7 commits because we missing the commit of round 8 where authority 0 is
2115        // the leader, but produced no block
2116        let temp_commits = commits.split_off(7);
2117        dag_state.accept_blocks(dag_builder.blocks(1..=8));
2118        for commit in commits.clone() {
2119            dag_state.add_commit(commit);
2120        }
2121
2122        // Holds all the committed blocks from the commits that ended up being persisted
2123        // (flushed). Any commits that not flushed will not be considered.
2124        let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2125        for commit in commits.iter() {
2126            all_committed_blocks.extend(commit.blocks());
2127        }
2128        // Flush the dag state
2129        dag_state.flush();
2130
2131        // Add the rest of the blocks and commits to the dag state
2132        dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2133        for commit in temp_commits.clone() {
2134            dag_state.add_commit(commit);
2135        }
2136
2137        // All blocks should be found in DagState.
2138        let all_blocks = dag_builder.blocks(1..=num_rounds);
2139        let block_refs = all_blocks
2140            .iter()
2141            .map(|block| block.reference())
2142            .collect::<Vec<_>>();
2143        let result = dag_state
2144            .get_blocks(&block_refs)
2145            .into_iter()
2146            .map(|b| b.unwrap())
2147            .collect::<Vec<_>>();
2148        assert_eq!(result, all_blocks);
2149
2150        // Last commit index should be 9
2151        assert_eq!(dag_state.last_commit_index(), 9);
2152        assert_eq!(
2153            dag_state.last_committed_rounds(),
2154            dag_builder.last_committed_rounds.clone()
2155        );
2156
2157        // Destroy the dag state.
2158        drop(dag_state);
2159
2160        // Recover the state from the store
2161        let dag_state = DagState::new(context.clone(), store.clone());
2162
2163        // Blocks of first 5 rounds should be found in DagState.
2164        let blocks = dag_builder.blocks(1..=5);
2165        let block_refs = blocks
2166            .iter()
2167            .map(|block| block.reference())
2168            .collect::<Vec<_>>();
2169        let result = dag_state
2170            .get_blocks(&block_refs)
2171            .into_iter()
2172            .map(|b| b.unwrap())
2173            .collect::<Vec<_>>();
2174        assert_eq!(result, blocks);
2175
2176        // Blocks above round 9 should not be in DagState, because they are not flushed.
2177        let missing_blocks = dag_builder.blocks(9..=num_rounds);
2178        let block_refs = missing_blocks
2179            .iter()
2180            .map(|block| block.reference())
2181            .collect::<Vec<_>>();
2182        let retrieved_blocks = dag_state
2183            .get_blocks(&block_refs)
2184            .into_iter()
2185            .flatten()
2186            .collect::<Vec<_>>();
2187        assert!(retrieved_blocks.is_empty());
2188
2189        // Last commit index should be 7.
2190        assert_eq!(dag_state.last_commit_index(), 7);
2191
2192        // This is the last_commit_rounds of the first 7 commits that were flushed
2193        let expected_last_committed_rounds = vec![5, 6, 6, 7];
2194        assert_eq!(
2195            dag_state.last_committed_rounds(),
2196            expected_last_committed_rounds
2197        );
2198        // Unscored subdags will be recoverd based on the flushed commits and no commit
2199        // info
2200        assert_eq!(dag_state.scoring_subdags_count(), 7);
2201        // Ensure that cached blocks exist only for specific rounds per authority
2202        for (authority_index, _) in context.committee.authorities() {
2203            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2204
2205            // Ensure that eviction rounds have been properly recovered
2206            // DagState should hold cached blocks for authority 0 for rounds [2..=5] as no
2207            // higher blocks exist and due to CACHED_ROUNDS = 4 we want at max
2208            // to hold blocks for 4 rounds in cache.
2209            if authority_index == AuthorityIndex::new_for_test(0) {
2210                assert_eq!(blocks.len(), 4);
2211                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2212                assert!(
2213                    blocks
2214                        .into_iter()
2215                        .all(|block| block.round() >= 2 && block.round() <= 5)
2216                );
2217            } else {
2218                assert_eq!(blocks.len(), 4);
2219                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2220                assert!(
2221                    blocks
2222                        .into_iter()
2223                        .all(|block| block.round() >= 5 && block.round() <= 8)
2224                );
2225            }
2226        }
2227        // Ensure that committed blocks from > gc_round have been correctly marked as
2228        // committed according to committed sub dags
2229        let gc_round = dag_state.gc_round();
2230        assert_eq!(gc_round, 4);
2231        dag_state
2232            .recent_blocks
2233            .iter()
2234            .for_each(|(block_ref, block_info)| {
2235                if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2236                    assert!(
2237                        block_info.committed,
2238                        "Block {block_ref:?} should be committed"
2239                    );
2240                };
2241            });
2242    }
2243
2244    #[tokio::test]
2245    async fn test_block_info_as_committed() {
2246        let num_authorities: u32 = 4;
2247        let (context, _) = Context::new_for_test(num_authorities as usize);
2248        let context = Arc::new(context);
2249
2250        let store = Arc::new(MemStore::new());
2251        let mut dag_state = DagState::new(context.clone(), store.clone());
2252
2253        // Accept a block
2254        let block = VerifiedBlock::new_for_test(
2255            TestBlock::new(1, 0)
2256                .set_timestamp_ms(1000)
2257                .set_ancestors(vec![])
2258                .build(),
2259        );
2260
2261        dag_state.accept_block(block.clone());
2262
2263        // Query is committed
2264        assert!(!dag_state.is_committed(&block.reference()));
2265
2266        // Set block as committed for first time should return true
2267        assert!(
2268            dag_state.set_committed(&block.reference()),
2269            "Block should be successfully set as committed for first time"
2270        );
2271
2272        // Now it should appear as committed
2273        assert!(dag_state.is_committed(&block.reference()));
2274
2275        // Trying to set the block as committed again, it should return false.
2276        assert!(
2277            !dag_state.set_committed(&block.reference()),
2278            "Block should not be successfully set as committed"
2279        );
2280    }
2281
2282    #[tokio::test]
2283    async fn test_get_cached_blocks() {
2284        let (mut context, _) = Context::new_for_test(4);
2285        context.parameters.dag_state_cached_rounds = 5;
2286
2287        let context = Arc::new(context);
2288        let store = Arc::new(MemStore::new());
2289        let mut dag_state = DagState::new(context.clone(), store.clone());
2290
2291        // Create no blocks for authority 0
2292        // Create one block (round 10) for authority 1
2293        // Create two blocks (rounds 10,11) for authority 2
2294        // Create three blocks (rounds 10,11,12) for authority 3
2295        let mut all_blocks = Vec::new();
2296        for author in 1..=3 {
2297            for round in 10..(10 + author) {
2298                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2299                all_blocks.push(block.clone());
2300                dag_state.accept_block(block);
2301            }
2302        }
2303
2304        let cached_blocks =
2305            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2306        assert!(cached_blocks.is_empty());
2307
2308        let cached_blocks =
2309            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2310        assert_eq!(cached_blocks.len(), 1);
2311        assert_eq!(cached_blocks[0].round(), 10);
2312
2313        let cached_blocks =
2314            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2315        assert_eq!(cached_blocks.len(), 2);
2316        assert_eq!(cached_blocks[0].round(), 10);
2317        assert_eq!(cached_blocks[1].round(), 11);
2318
2319        let cached_blocks =
2320            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2321        assert_eq!(cached_blocks.len(), 1);
2322        assert_eq!(cached_blocks[0].round(), 11);
2323
2324        let cached_blocks =
2325            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2326        assert_eq!(cached_blocks.len(), 3);
2327        assert_eq!(cached_blocks[0].round(), 10);
2328        assert_eq!(cached_blocks[1].round(), 11);
2329        assert_eq!(cached_blocks[2].round(), 12);
2330
2331        let cached_blocks =
2332            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2333        assert_eq!(cached_blocks.len(), 1);
2334        assert_eq!(cached_blocks[0].round(), 12);
2335
2336        // Test get_cached_blocks_in_range()
2337
2338        // Start == end
2339        let cached_blocks = dag_state.get_cached_blocks_in_range(
2340            context.committee.to_authority_index(3).unwrap(),
2341            10,
2342            10,
2343            1,
2344        );
2345        assert!(cached_blocks.is_empty());
2346
2347        // Start > end
2348        let cached_blocks = dag_state.get_cached_blocks_in_range(
2349            context.committee.to_authority_index(3).unwrap(),
2350            11,
2351            10,
2352            1,
2353        );
2354        assert!(cached_blocks.is_empty());
2355
2356        // Empty result.
2357        let cached_blocks = dag_state.get_cached_blocks_in_range(
2358            context.committee.to_authority_index(0).unwrap(),
2359            9,
2360            10,
2361            1,
2362        );
2363        assert!(cached_blocks.is_empty());
2364
2365        // Single block, one round before the end.
2366        let cached_blocks = dag_state.get_cached_blocks_in_range(
2367            context.committee.to_authority_index(1).unwrap(),
2368            9,
2369            11,
2370            1,
2371        );
2372        assert_eq!(cached_blocks.len(), 1);
2373        assert_eq!(cached_blocks[0].round(), 10);
2374
2375        // Respect end round.
2376        let cached_blocks = dag_state.get_cached_blocks_in_range(
2377            context.committee.to_authority_index(2).unwrap(),
2378            9,
2379            12,
2380            5,
2381        );
2382        assert_eq!(cached_blocks.len(), 2);
2383        assert_eq!(cached_blocks[0].round(), 10);
2384        assert_eq!(cached_blocks[1].round(), 11);
2385
2386        // Respect start round.
2387        let cached_blocks = dag_state.get_cached_blocks_in_range(
2388            context.committee.to_authority_index(3).unwrap(),
2389            11,
2390            20,
2391            5,
2392        );
2393        assert_eq!(cached_blocks.len(), 2);
2394        assert_eq!(cached_blocks[0].round(), 11);
2395        assert_eq!(cached_blocks[1].round(), 12);
2396
2397        // Respect limit
2398        let cached_blocks = dag_state.get_cached_blocks_in_range(
2399            context.committee.to_authority_index(3).unwrap(),
2400            10,
2401            20,
2402            1,
2403        );
2404        assert_eq!(cached_blocks.len(), 1);
2405        assert_eq!(cached_blocks[0].round(), 10);
2406    }
2407
2408    #[rstest]
2409    #[tokio::test]
2410    async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2411        // GIVEN
2412        const CACHED_ROUNDS: Round = 2;
2413        let (mut context, _) = Context::new_for_test(4);
2414        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2415
2416        if gc_depth > 0 {
2417            context
2418                .protocol_config
2419                .set_consensus_gc_depth_for_testing(gc_depth);
2420        }
2421
2422        let context = Arc::new(context);
2423        let store = Arc::new(MemStore::new());
2424        let mut dag_state = DagState::new(context.clone(), store.clone());
2425
2426        // Create no blocks for authority 0
2427        // Create one block (round 1) for authority 1
2428        // Create two blocks (rounds 1,2) for authority 2
2429        // Create three blocks (rounds 1,2,3) for authority 3
2430        let dag_str = "DAG {
2431            Round 0 : { 4 },
2432            Round 1 : {
2433                B -> [*],
2434                C -> [*],
2435                D -> [*],
2436            },
2437            Round 2 : {
2438                C -> [*],
2439                D -> [*],
2440            },
2441            Round 3 : {
2442                D -> [*],
2443            },
2444        }";
2445
2446        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2447
2448        // Add equivocating block for round 2 authority 3
2449        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2450
2451        // Accept all blocks
2452        for block in dag_builder
2453            .all_blocks()
2454            .into_iter()
2455            .chain(std::iter::once(block))
2456        {
2457            dag_state.accept_block(block);
2458        }
2459
2460        dag_state.add_commit(TrustedCommit::new_for_test(
2461            1 as CommitIndex,
2462            CommitDigest::MIN,
2463            context.clock.timestamp_utc_ms(),
2464            dag_builder.leader_block(3).unwrap().reference(),
2465            vec![],
2466        ));
2467
2468        // WHEN search for the latest blocks
2469        let end_round = 4;
2470        let expected_rounds = vec![0, 1, 2, 3];
2471        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2472        // THEN
2473        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2474        assert_eq!(
2475            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2476            expected_rounds
2477        );
2478        assert_eq!(
2479            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2480            expected_excluded_and_equivocating_blocks
2481        );
2482
2483        // THEN
2484        for (i, expected_round) in expected_rounds.iter().enumerate() {
2485            let round = dag_state
2486                .get_last_cached_block_in_range(
2487                    context.committee.to_authority_index(i).unwrap(),
2488                    0,
2489                    end_round,
2490                )
2491                .map(|b| b.round())
2492                .unwrap_or_default();
2493            assert_eq!(round, *expected_round, "Authority {i}");
2494        }
2495
2496        // WHEN starting from round 2
2497        let start_round = 2;
2498        let expected_rounds = [0, 0, 2, 3];
2499
2500        // THEN
2501        for (i, expected_round) in expected_rounds.iter().enumerate() {
2502            let round = dag_state
2503                .get_last_cached_block_in_range(
2504                    context.committee.to_authority_index(i).unwrap(),
2505                    start_round,
2506                    end_round,
2507                )
2508                .map(|b| b.round())
2509                .unwrap_or_default();
2510            assert_eq!(round, *expected_round, "Authority {i}");
2511        }
2512
2513        // WHEN we flush the DagState - after adding a
2514        // commit with all the blocks, we expect this to trigger a clean up in
2515        // the internal cache. That will keep the all the blocks with rounds >=
2516        // authority_commit_round - CACHED_ROUND.
2517        //
2518        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and
2519        // for those who don't have blocks > gc_round, we'll keep
2520        // all their highest round blocks for CACHED_ROUNDS.
2521        dag_state.flush();
2522
2523        // AND we request before round 3
2524        let end_round = 3;
2525        let expected_rounds = vec![0, 1, 2, 2];
2526
2527        // THEN
2528        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2529        assert_eq!(
2530            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2531            expected_rounds
2532        );
2533
2534        // THEN
2535        for (i, expected_round) in expected_rounds.iter().enumerate() {
2536            let round = dag_state
2537                .get_last_cached_block_in_range(
2538                    context.committee.to_authority_index(i).unwrap(),
2539                    0,
2540                    end_round,
2541                )
2542                .map(|b| b.round())
2543                .unwrap_or_default();
2544            assert_eq!(round, *expected_round, "Authority {i}");
2545        }
2546    }
2547
2548    #[tokio::test]
2549    #[should_panic(
2550        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2551    )]
2552    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2553        // GIVEN
2554        const CACHED_ROUNDS: Round = 1;
2555        const GC_DEPTH: u32 = 1;
2556        let (mut context, _) = Context::new_for_test(4);
2557        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2558        context
2559            .protocol_config
2560            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2561
2562        let context = Arc::new(context);
2563        let store = Arc::new(MemStore::new());
2564        let mut dag_state = DagState::new(context.clone(), store.clone());
2565
2566        // Create no blocks for authority 0
2567        // Create one block (round 1) for authority 1
2568        // Create two blocks (rounds 1,2) for authority 2
2569        // Create three blocks (rounds 1,2,3) for authority 3
2570        let mut dag_builder = DagBuilder::new(context.clone());
2571        dag_builder
2572            .layers(1..=1)
2573            .authorities(vec![AuthorityIndex::new_for_test(0)])
2574            .skip_block()
2575            .build();
2576        dag_builder
2577            .layers(2..=2)
2578            .authorities(vec![
2579                AuthorityIndex::new_for_test(0),
2580                AuthorityIndex::new_for_test(1),
2581            ])
2582            .skip_block()
2583            .build();
2584        dag_builder
2585            .layers(3..=3)
2586            .authorities(vec![
2587                AuthorityIndex::new_for_test(0),
2588                AuthorityIndex::new_for_test(1),
2589                AuthorityIndex::new_for_test(2),
2590            ])
2591            .skip_block()
2592            .build();
2593
2594        // Accept all blocks
2595        for block in dag_builder.all_blocks() {
2596            dag_state.accept_block(block);
2597        }
2598
2599        dag_state.add_commit(TrustedCommit::new_for_test(
2600            1 as CommitIndex,
2601            CommitDigest::MIN,
2602            0,
2603            dag_builder.leader_block(3).unwrap().reference(),
2604            vec![],
2605        ));
2606
2607        // Flush the store so we update the evict rounds
2608        dag_state.flush();
2609
2610        // THEN the method should panic, as some authorities have already evicted rounds
2611        // <= round 2
2612        dag_state.get_last_cached_block_per_authority(2);
2613    }
2614
2615    #[tokio::test]
2616    async fn test_last_quorum() {
2617        // GIVEN
2618        let (context, _) = Context::new_for_test(4);
2619        let context = Arc::new(context);
2620        let store = Arc::new(MemStore::new());
2621        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2622
2623        // WHEN no blocks exist then genesis should be returned
2624        {
2625            let genesis = genesis_blocks(&context);
2626
2627            assert_eq!(dag_state.read().last_quorum(), genesis);
2628        }
2629
2630        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks
2631        // should be returned as quorum
2632        {
2633            let mut dag_builder = DagBuilder::new(context.clone());
2634            dag_builder
2635                .layers(1..=4)
2636                .build()
2637                .persist_layers(dag_state.clone());
2638            let round_4_blocks: Vec<_> = dag_builder
2639                .blocks(4..=4)
2640                .into_iter()
2641                .map(|block| block.reference())
2642                .collect();
2643
2644            let last_quorum = dag_state.read().last_quorum();
2645
2646            assert_eq!(
2647                last_quorum
2648                    .into_iter()
2649                    .map(|block| block.reference())
2650                    .collect::<Vec<_>>(),
2651                round_4_blocks
2652            );
2653        }
2654
2655        // WHEN adding one more block at round 5, still round 4 should be returned as
2656        // quorum
2657        {
2658            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2659            dag_state.write().accept_block(block);
2660
2661            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2662
2663            let last_quorum = dag_state.read().last_quorum();
2664
2665            assert_eq!(last_quorum, round_4_blocks);
2666        }
2667    }
2668
2669    #[tokio::test]
2670    async fn test_last_block_for_authority() {
2671        // GIVEN
2672        let (context, _) = Context::new_for_test(4);
2673        let context = Arc::new(context);
2674        let store = Arc::new(MemStore::new());
2675        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2676
2677        // WHEN no blocks exist then genesis should be returned
2678        {
2679            let genesis = genesis_blocks(&context);
2680            let my_genesis = genesis
2681                .into_iter()
2682                .find(|block| block.author() == context.own_index)
2683                .unwrap();
2684
2685            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2686        }
2687
2688        // WHEN adding some blocks for authorities, only the last ones should be
2689        // returned
2690        {
2691            // add blocks up to round 4
2692            let mut dag_builder = DagBuilder::new(context.clone());
2693            dag_builder
2694                .layers(1..=4)
2695                .build()
2696                .persist_layers(dag_state.clone());
2697
2698            // add block 5 for authority 0
2699            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2700            dag_state.write().accept_block(block);
2701
2702            let block = dag_state
2703                .read()
2704                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2705            assert_eq!(block.round(), 5);
2706
2707            for (authority_index, _) in context.committee.authorities() {
2708                let block = dag_state
2709                    .read()
2710                    .get_last_block_for_authority(authority_index);
2711
2712                if authority_index.value() == 0 {
2713                    assert_eq!(block.round(), 5);
2714                } else {
2715                    assert_eq!(block.round(), 4);
2716                }
2717            }
2718        }
2719    }
2720
2721    #[tokio::test]
2722    #[should_panic]
2723    async fn test_accept_block_panics_when_timestamp_is_ahead() {
2724        // GIVEN
2725        let (mut context, _) = Context::new_for_test(4);
2726        context
2727            .protocol_config
2728            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
2729        let context = Arc::new(context);
2730        let store = Arc::new(MemStore::new());
2731        let mut dag_state = DagState::new(context.clone(), store.clone());
2732
2733        // Set a timestamp for the block that is ahead of the current time
2734        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2735
2736        let block = VerifiedBlock::new_for_test(
2737            TestBlock::new(10, 0)
2738                .set_timestamp_ms(block_timestamp)
2739                .build(),
2740        );
2741
2742        // Try to accept the block - it will panic as accepted block timestamp is ahead
2743        // of the current time
2744        dag_state.accept_block(block);
2745    }
2746
2747    #[tokio::test]
2748    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2749        // GIVEN
2750        let (mut context, _) = Context::new_for_test(4);
2751        context
2752            .protocol_config
2753            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(true);
2754
2755        let context = Arc::new(context);
2756        let store = Arc::new(MemStore::new());
2757        let mut dag_state = DagState::new(context.clone(), store.clone());
2758
2759        // Set a timestamp for the block that is ahead of the current time
2760        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2761
2762        let block = VerifiedBlock::new_for_test(
2763            TestBlock::new(10, 0)
2764                .set_timestamp_ms(block_timestamp)
2765                .build(),
2766        );
2767
2768        // Try to accept the block - it should not panic
2769        dag_state.accept_block(block);
2770    }
2771}