consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::max,
7    collections::{BTreeMap, BTreeSet, VecDeque},
8    ops::Bound::{Excluded, Included, Unbounded},
9    panic,
10    sync::Arc,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info};
18
19use crate::{
20    CommittedSubDag,
21    block::{
22        BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23        VerifiedBlock, genesis_blocks,
24    },
25    commit::{
26        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28    },
29    context::Context,
30    leader_scoring::{ReputationScores, ScoringSubdag},
31    storage::{Store, WriteBatch},
32    threshold_clock::ThresholdClock,
33};
34
35/// DagState provides the API to write and read accepted blocks from the DAG.
36/// Only uncommitted and last committed blocks are cached in memory.
37/// The rest of blocks are stored on disk.
38/// Refs to cached blocks and additional refs are cached as well, to speed up
39/// existence checks.
40///
41/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
42/// concurrent access from multiple components.
43pub(crate) struct DagState {
44    context: Arc<Context>,
45
46    // The genesis blocks
47    genesis: BTreeMap<BlockRef, VerifiedBlock>,
48
49    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
50    // Note: all uncommitted blocks are kept in memory.
51    //
52    // When GC is enabled, this map has a different semantic. It holds all the recent data for each
53    // authority making sure that it always have available CACHED_ROUNDS worth of data. The
54    // entries are evicted based on the latest GC round, however the eviction process will respect
55    // the CACHED_ROUNDS. For each authority, blocks are only evicted when their round is less
56    // than or equal to both `gc_round`, and `highest authority round - cached rounds`.
57    // This ensures that the GC requirements are respected (we never clean up any block above
58    // `gc_round`), and there are enough blocks cached.
59    recent_blocks: BTreeMap<BlockRef, BlockInfo>,
60
61    // Indexes recent block refs by their authorities.
62    // Vec position corresponds to the authority index.
63    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
64
65    // Keeps track of the threshold clock for proposing blocks.
66    threshold_clock: ThresholdClock,
67
68    // Keeps track of the highest round that has been evicted for each authority. Any blocks that
69    // are of round <= evict_round should be considered evicted, and if any exist we should not
70    // consider the causauly complete in the order they appear. The `evicted_rounds` size
71    // should be the same as the committee size.
72    evicted_rounds: Vec<Round>,
73
74    // Highest round of blocks accepted.
75    highest_accepted_round: Round,
76
77    // Last consensus commit of the dag.
78    last_commit: Option<TrustedCommit>,
79
80    // Last wall time when commit round advanced. Does not persist across restarts.
81    last_commit_round_advancement_time: Option<std::time::Instant>,
82
83    // Last committed rounds per authority.
84    last_committed_rounds: Vec<Round>,
85
86    /// The committed subdags that have been scored but scores have not been
87    /// used for leader schedule yet.
88    scoring_subdag: ScoringSubdag,
89    // TODO: Remove when DistributedVoteScoring is enabled.
90    /// The list of committed subdags that have been sequenced by the universal
91    /// committer but have yet to be used to calculate reputation scores for the
92    /// next leader schedule. Until then we consider it as "unscored" subdags.
93    unscored_committed_subdags: Vec<CommittedSubDag>,
94
95    // Commit votes pending to be included in new blocks.
96    // TODO: limit to 1st commit per round with multi-leader.
97    pending_commit_votes: VecDeque<CommitVote>,
98
99    // Data to be flushed to storage.
100    blocks_to_write: Vec<VerifiedBlock>,
101    commits_to_write: Vec<TrustedCommit>,
102
103    // Buffer the reputation scores & last_committed_rounds to be flushed with the
104    // next dag state flush. This is okay because we can recover reputation scores
105    // & last_committed_rounds from the commits as needed.
106    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
107
108    // Persistent storage for blocks, commits and other consensus data.
109    store: Arc<dyn Store>,
110
111    // The number of cached rounds
112    cached_rounds: Round,
113}
114
115impl DagState {
116    /// Initializes DagState from storage.
117    pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
118        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
119        let num_authorities = context.committee.size();
120
121        let genesis = genesis_blocks(context.clone())
122            .into_iter()
123            .map(|block| (block.reference(), block))
124            .collect();
125
126        let threshold_clock = ThresholdClock::new(1, context.clone());
127
128        let last_commit = store
129            .read_last_commit()
130            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
131
132        let commit_info = store
133            .read_last_commit_info()
134            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
135        let (mut last_committed_rounds, commit_recovery_start_index) =
136            if let Some((commit_ref, commit_info)) = commit_info {
137                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
138                (commit_info.committed_rounds, commit_ref.index + 1)
139            } else {
140                tracing::info!("Found no stored CommitInfo to recover from");
141                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
142            };
143
144        let mut unscored_committed_subdags = Vec::new();
145        let mut scoring_subdag = ScoringSubdag::new(context.clone());
146
147        if let Some(last_commit) = last_commit.as_ref() {
148            store
149                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
150                .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
151                .iter()
152                .for_each(|commit| {
153                    for block_ref in commit.blocks() {
154                        last_committed_rounds[block_ref.author] =
155                            max(last_committed_rounds[block_ref.author], block_ref.round);
156                    }
157
158                    let committed_subdag =
159                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]); // We don't need to recover reputation scores for unscored_committed_subdags
160                    unscored_committed_subdags.push(committed_subdag);
161                });
162        }
163
164        tracing::info!(
165            "DagState was initialized with the following state: \
166            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
167            unscored_committed_subdags.len()
168        );
169
170        if context
171            .protocol_config
172            .consensus_distributed_vote_scoring_strategy()
173        {
174            scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
175        }
176
177        let mut state = Self {
178            context,
179            genesis,
180            recent_blocks: BTreeMap::new(),
181            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
182            threshold_clock,
183            highest_accepted_round: 0,
184            last_commit: last_commit.clone(),
185            last_commit_round_advancement_time: None,
186            last_committed_rounds: last_committed_rounds.clone(),
187            pending_commit_votes: VecDeque::new(),
188            blocks_to_write: vec![],
189            commits_to_write: vec![],
190            commit_info_to_write: vec![],
191            scoring_subdag,
192            unscored_committed_subdags,
193            store: store.clone(),
194            cached_rounds,
195            evicted_rounds: vec![0; num_authorities],
196        };
197
198        for (i, round) in last_committed_rounds.into_iter().enumerate() {
199            let authority_index = state.context.committee.to_authority_index(i).unwrap();
200            let (blocks, eviction_round) = if state.gc_enabled() {
201                // Find the latest block for the authority to calculate the eviction round. Then
202                // we want to scan and load the blocks from the eviction round and onwards only.
203                // As reminder, the eviction round is taking into account the gc_round.
204                let last_block = state
205                    .store
206                    .scan_last_blocks_by_author(authority_index, 1, None)
207                    .expect("Database error");
208                let last_block_round = last_block
209                    .last()
210                    .map(|b| b.round())
211                    .unwrap_or(GENESIS_ROUND);
212
213                let eviction_round = Self::gc_eviction_round(
214                    last_block_round,
215                    state.gc_round(),
216                    state.cached_rounds,
217                );
218                let blocks = state
219                    .store
220                    .scan_blocks_by_author(authority_index, eviction_round + 1)
221                    .expect("Database error");
222                (blocks, eviction_round)
223            } else {
224                let eviction_round = Self::eviction_round(round, cached_rounds);
225                let blocks = state
226                    .store
227                    .scan_blocks_by_author(authority_index, eviction_round + 1)
228                    .expect("Database error");
229                (blocks, eviction_round)
230            };
231
232            state.evicted_rounds[authority_index] = eviction_round;
233
234            // Update the block metadata for the authority.
235            for block in &blocks {
236                state.update_block_metadata(block);
237            }
238
239            info!(
240                "Recovered blocks {}: {:?}",
241                authority_index,
242                blocks
243                    .iter()
244                    .map(|b| b.reference())
245                    .collect::<Vec<BlockRef>>()
246            );
247        }
248
249        if state.gc_enabled() {
250            if let Some(last_commit) = last_commit {
251                let mut index = last_commit.index();
252                let gc_round = state.gc_round();
253                info!(
254                    "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
255                    index, gc_round
256                );
257
258                loop {
259                    let commits = store
260                        .scan_commits((index..=index).into())
261                        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
262                    let Some(commit) = commits.first() else {
263                        info!(
264                            "Recovering finished up to index {index}, no more commits to recover"
265                        );
266                        break;
267                    };
268
269                    // Check the commit leader round to see if it is within the gc_round. If it is
270                    // not then we can stop the recovery process.
271                    if gc_round > 0 && commit.leader().round <= gc_round {
272                        info!(
273                            "Recovering finished, reached commit leader round {} <= gc_round {}",
274                            commit.leader().round,
275                            gc_round
276                        );
277                        break;
278                    }
279
280                    commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
281                        debug!(
282                            "Setting block {:?} as committed based on commit {:?}",
283                            block_ref,
284                            commit.index()
285                        );
286                        assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
287                    });
288
289                    // All commits are indexed starting from 1, so one reach zero exit.
290                    index = index.saturating_sub(1);
291                    if index == 0 {
292                        break;
293                    }
294                }
295            }
296        }
297
298        state
299    }
300
301    /// Accepts a block into DagState and keeps it in memory.
302    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
303        assert_ne!(
304            block.round(),
305            0,
306            "Genesis block should not be accepted into DAG."
307        );
308
309        let block_ref = block.reference();
310        if self.contains_block(&block_ref) {
311            return;
312        }
313
314        let now = self.context.clock.timestamp_utc_ms();
315        if block.timestamp_ms() > now {
316            panic!(
317                "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
318                block,
319                block.timestamp_ms(),
320                now,
321            );
322        }
323
324        // TODO: Move this check to core
325        // Ensure we don't write multiple blocks per slot for our own index
326        if block_ref.author == self.context.own_index {
327            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
328            assert!(
329                existing_blocks.is_empty(),
330                "Block Rejected! Attempted to add block {block:#?} to own slot where \
331                block(s) {existing_blocks:#?} already exists."
332            );
333        }
334        self.update_block_metadata(&block);
335        self.blocks_to_write.push(block);
336        let source = if self.context.own_index == block_ref.author {
337            "own"
338        } else {
339            "others"
340        };
341        self.context
342            .metrics
343            .node_metrics
344            .accepted_blocks
345            .with_label_values(&[source])
346            .inc();
347    }
348
349    /// Updates internal metadata for a block.
350    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
351        let block_ref = block.reference();
352        self.recent_blocks
353            .insert(block_ref, BlockInfo::new(block.clone()));
354        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
355        self.threshold_clock.add_block(block_ref);
356        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
357        self.context
358            .metrics
359            .node_metrics
360            .highest_accepted_round
361            .set(self.highest_accepted_round as i64);
362
363        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
364            .last()
365            .map(|block_ref| block_ref.round)
366            .expect("There should be by now at least one block ref");
367        let hostname = &self.context.committee.authority(block_ref.author).hostname;
368        self.context
369            .metrics
370            .node_metrics
371            .highest_accepted_authority_round
372            .with_label_values(&[hostname])
373            .set(highest_accepted_round_for_author as i64);
374    }
375
376    /// Accepts a blocks into DagState and keeps it in memory.
377    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
378        debug!(
379            "Accepting blocks: {}",
380            blocks.iter().map(|b| b.reference().to_string()).join(",")
381        );
382        for block in blocks {
383            self.accept_block(block);
384        }
385    }
386
387    /// Gets a block by checking cached recent blocks then storage.
388    /// Returns None when the block is not found.
389    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
390        self.get_blocks(&[*reference])
391            .pop()
392            .expect("Exactly one element should be returned")
393    }
394
395    /// Gets blocks by checking genesis, cached recent blocks in memory, then
396    /// storage. An element is None when the corresponding block is not
397    /// found.
398    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
399        let mut blocks = vec![None; block_refs.len()];
400        let mut missing = Vec::new();
401
402        for (index, block_ref) in block_refs.iter().enumerate() {
403            if block_ref.round == GENESIS_ROUND {
404                // Allow the caller to handle the invalid genesis ancestor error.
405                if let Some(block) = self.genesis.get(block_ref) {
406                    blocks[index] = Some(block.clone());
407                }
408                continue;
409            }
410            if let Some(block_info) = self.recent_blocks.get(block_ref) {
411                blocks[index] = Some(block_info.block.clone());
412                continue;
413            }
414            missing.push((index, block_ref));
415        }
416
417        if missing.is_empty() {
418            return blocks;
419        }
420
421        let missing_refs = missing
422            .iter()
423            .map(|(_, block_ref)| **block_ref)
424            .collect::<Vec<_>>();
425        let store_results = self
426            .store
427            .read_blocks(&missing_refs)
428            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
429        self.context
430            .metrics
431            .node_metrics
432            .dag_state_store_read_count
433            .with_label_values(&["get_blocks"])
434            .inc();
435
436        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
437            blocks[index] = result;
438        }
439
440        blocks
441    }
442
443    // Sets the block as committed in the cache. If the block is set as committed
444    // for first time, then true is returned, otherwise false is returned instead.
445    // Method will panic if the block is not found in the cache.
446    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
447        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
448            if !block_info.committed {
449                block_info.committed = true;
450                return true;
451            }
452            false
453        } else {
454            panic!("Block {block_ref:?} not found in cache to set as committed.");
455        }
456    }
457
458    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
459        self.recent_blocks
460            .get(block_ref)
461            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
462            .committed
463    }
464
465    /// Gets all uncommitted blocks in a slot.
466    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
467    /// checked.
468    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
469        // TODO: either panic below when the slot is at or below the last committed
470        // round, or support reading from storage while limiting storage reads
471        // to edge cases.
472
473        let mut blocks = vec![];
474        for (_block_ref, block_info) in self.recent_blocks.range((
475            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
476            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
477        )) {
478            blocks.push(block_info.block.clone())
479        }
480        blocks
481    }
482
483    /// Gets all uncommitted blocks in a round.
484    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
485    /// checked.
486    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
487        if round <= self.last_commit_round() {
488            panic!("Round {round} have committed blocks!");
489        }
490
491        let mut blocks = vec![];
492        for (_block_ref, block_info) in self.recent_blocks.range((
493            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
494            Excluded(BlockRef::new(
495                round + 1,
496                AuthorityIndex::ZERO,
497                BlockDigest::MIN,
498            )),
499        )) {
500            blocks.push(block_info.block.clone())
501        }
502        blocks
503    }
504
505    /// Gets all ancestors in the history of a block at a certain round.
506    pub(crate) fn ancestors_at_round(
507        &self,
508        later_block: &VerifiedBlock,
509        earlier_round: Round,
510    ) -> Vec<VerifiedBlock> {
511        // Iterate through ancestors of later_block in round descending order.
512        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
513        while !linked.is_empty() {
514            let round = linked.last().unwrap().round;
515            // Stop after finishing traversal for ancestors above earlier_round.
516            if round <= earlier_round {
517                break;
518            }
519            let block_ref = linked.pop_last().unwrap();
520            let Some(block) = self.get_block(&block_ref) else {
521                panic!("Block {block_ref:?} should exist in DAG!");
522            };
523            linked.extend(block.ancestors().iter().cloned());
524        }
525        linked
526            .range((
527                Included(BlockRef::new(
528                    earlier_round,
529                    AuthorityIndex::ZERO,
530                    BlockDigest::MIN,
531                )),
532                Unbounded,
533            ))
534            .map(|r| {
535                self.get_block(r)
536                    .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
537                    .clone()
538            })
539            .collect()
540    }
541
542    /// Gets the last proposed block from this authority.
543    /// If no block is proposed yet, returns the genesis block.
544    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
545        self.get_last_block_for_authority(self.context.own_index)
546    }
547
548    /// Retrieves the last accepted block from the specified `authority`. If no
549    /// block is found in cache then the genesis block is returned as no other
550    /// block has been received from that authority.
551    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
552        if let Some(last) = self.recent_refs_by_authority[authority].last() {
553            return self
554                .recent_blocks
555                .get(last)
556                .expect("Block should be found in recent blocks")
557                .block
558                .clone();
559        }
560
561        // if none exists, then fallback to genesis
562        let (_, genesis_block) = self
563            .genesis
564            .iter()
565            .find(|(block_ref, _)| block_ref.author == authority)
566            .expect("Genesis should be found for authority {authority_index}");
567        genesis_block.clone()
568    }
569
570    /// Returns cached recent blocks from the specified authority.
571    /// Blocks returned are limited to round >= `start`, and cached.
572    /// NOTE: caller should not assume returned blocks are always chained.
573    /// "Disconnected" blocks can be returned when there are byzantine blocks,
574    /// or when received blocks are not deduped.
575    pub(crate) fn get_cached_blocks(
576        &self,
577        authority: AuthorityIndex,
578        start: Round,
579    ) -> Vec<VerifiedBlock> {
580        let mut blocks = vec![];
581        for block_ref in self.recent_refs_by_authority[authority].range((
582            Included(BlockRef::new(start, authority, BlockDigest::MIN)),
583            Unbounded,
584        )) {
585            let block_info = self
586                .recent_blocks
587                .get(block_ref)
588                .expect("Block should exist in recent blocks");
589            blocks.push(block_info.block.clone());
590        }
591        blocks
592    }
593
594    // Retrieves the cached block within the range [start_round, end_round) from a
595    // given authority. NOTE: end_round must be greater than GENESIS_ROUND.
596    pub(crate) fn get_last_cached_block_in_range(
597        &self,
598        authority: AuthorityIndex,
599        start_round: Round,
600        end_round: Round,
601    ) -> Option<VerifiedBlock> {
602        if end_round == GENESIS_ROUND {
603            panic!(
604                "Attempted to retrieve blocks earlier than the genesis round which is impossible"
605            );
606        }
607
608        let block_ref = self.recent_refs_by_authority[authority]
609            .range((
610                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
611                Excluded(BlockRef::new(
612                    end_round,
613                    AuthorityIndex::MIN,
614                    BlockDigest::MIN,
615                )),
616            ))
617            .last()?;
618
619        self.recent_blocks
620            .get(block_ref)
621            .map(|block_info| block_info.block.clone())
622    }
623
624    /// Returns the last block proposed per authority with `evicted round <
625    /// round < end_round`. The method is guaranteed to return results only
626    /// when the `end_round` is not earlier of the available cached data for
627    /// each authority (evicted round + 1), otherwise the method will panic.
628    /// It's the caller's responsibility to ensure that is not requesting for
629    /// earlier rounds. In case of equivocation for an authority's last
630    /// slot, one block will be returned (the last in order) and the other
631    /// equivocating blocks will be returned.
632    pub(crate) fn get_last_cached_block_per_authority(
633        &self,
634        end_round: Round,
635    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
636        // Initialize with the genesis blocks as fallback
637        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
638        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
639
640        if end_round == GENESIS_ROUND {
641            panic!(
642                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
643            );
644        }
645
646        if end_round == GENESIS_ROUND + 1 {
647            return blocks.into_iter().map(|b| (b, vec![])).collect();
648        }
649
650        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
651            let authority_index = self
652                .context
653                .committee
654                .to_authority_index(authority_index)
655                .unwrap();
656
657            let last_evicted_round = self.evicted_rounds[authority_index];
658            if end_round.saturating_sub(1) <= last_evicted_round {
659                panic!(
660                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
661                );
662            }
663
664            let block_ref_iter = block_refs
665                .range((
666                    Included(BlockRef::new(
667                        last_evicted_round + 1,
668                        authority_index,
669                        BlockDigest::MIN,
670                    )),
671                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
672                ))
673                .rev();
674
675            let mut last_round = 0;
676            for block_ref in block_ref_iter {
677                if last_round == 0 {
678                    last_round = block_ref.round;
679                    let block_info = self
680                        .recent_blocks
681                        .get(block_ref)
682                        .expect("Block should exist in recent blocks");
683                    blocks[authority_index] = block_info.block.clone();
684                    continue;
685                }
686                if block_ref.round < last_round {
687                    break;
688                }
689                equivocating_blocks[authority_index].push(*block_ref);
690            }
691        }
692
693        blocks.into_iter().zip(equivocating_blocks).collect()
694    }
695
696    /// Checks whether a block exists in the slot. The method checks only
697    /// against the cached data. If the user asks for a slot that is not
698    /// within the cached data then a panic is thrown.
699    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
700        // Always return true for genesis slots.
701        if slot.round == GENESIS_ROUND {
702            return true;
703        }
704
705        let eviction_round = self.evicted_rounds[slot.authority];
706        if slot.round <= eviction_round {
707            panic!(
708                "{}",
709                format!(
710                    "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
711                    if self.gc_enabled() { " gc " } else { " " }
712                )
713            );
714        }
715
716        let mut result = self.recent_refs_by_authority[slot.authority].range((
717            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
718            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
719        ));
720        result.next().is_some()
721    }
722
723    /// Checks whether the required blocks are in cache, if exist, or otherwise
724    /// will check in store. The method is not caching back the results, so
725    /// its expensive if keep asking for cache missing blocks.
726    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
727        let mut exist = vec![false; block_refs.len()];
728        let mut missing = Vec::new();
729
730        for (index, block_ref) in block_refs.into_iter().enumerate() {
731            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
732            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
733                exist[index] = true;
734            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
735            {
736                // Optimization: recent_refs contain the most recent blocks known to this
737                // authority. If a block ref is not found there and has a higher
738                // round, it definitely is missing from this authority and there
739                // is no need to check disk.
740                exist[index] = false;
741            } else {
742                missing.push((index, block_ref));
743            }
744        }
745
746        if missing.is_empty() {
747            return exist;
748        }
749
750        let missing_refs = missing
751            .iter()
752            .map(|(_, block_ref)| *block_ref)
753            .collect::<Vec<_>>();
754        let store_results = self
755            .store
756            .contains_blocks(&missing_refs)
757            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
758        self.context
759            .metrics
760            .node_metrics
761            .dag_state_store_read_count
762            .with_label_values(&["contains_blocks"])
763            .inc();
764
765        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
766            exist[index] = result;
767        }
768
769        exist
770    }
771
772    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
773        let blocks = self.contains_blocks(vec![*block_ref]);
774        blocks.first().cloned().unwrap()
775    }
776
777    pub(crate) fn threshold_clock_round(&self) -> Round {
778        self.threshold_clock.get_round()
779    }
780
781    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
782        self.threshold_clock.get_quorum_ts()
783    }
784
785    pub(crate) fn highest_accepted_round(&self) -> Round {
786        self.highest_accepted_round
787    }
788
789    // Buffers a new commit in memory and updates last committed rounds.
790    // REQUIRED: must not skip over any commit index.
791    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
792        if let Some(last_commit) = &self.last_commit {
793            if commit.index() <= last_commit.index() {
794                error!(
795                    "New commit index {} <= last commit index {}!",
796                    commit.index(),
797                    last_commit.index()
798                );
799                return;
800            }
801            assert_eq!(commit.index(), last_commit.index() + 1);
802
803            if commit.timestamp_ms() < last_commit.timestamp_ms() {
804                panic!(
805                    "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
806                );
807            }
808        } else {
809            assert_eq!(commit.index(), 1);
810        }
811
812        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
813            previous_commit.round() < commit.round()
814        } else {
815            true
816        };
817
818        self.last_commit = Some(commit.clone());
819
820        if commit_round_advanced {
821            let now = std::time::Instant::now();
822            if let Some(previous_time) = self.last_commit_round_advancement_time {
823                self.context
824                    .metrics
825                    .node_metrics
826                    .commit_round_advancement_interval
827                    .observe(now.duration_since(previous_time).as_secs_f64())
828            }
829            self.last_commit_round_advancement_time = Some(now);
830        }
831
832        for block_ref in commit.blocks().iter() {
833            self.last_committed_rounds[block_ref.author] = max(
834                self.last_committed_rounds[block_ref.author],
835                block_ref.round,
836            );
837        }
838
839        for (i, round) in self.last_committed_rounds.iter().enumerate() {
840            let index = self.context.committee.to_authority_index(i).unwrap();
841            let hostname = &self.context.committee.authority(index).hostname;
842            self.context
843                .metrics
844                .node_metrics
845                .last_committed_authority_round
846                .with_label_values(&[hostname])
847                .set((*round).into());
848        }
849
850        self.pending_commit_votes.push_back(commit.reference());
851        self.commits_to_write.push(commit);
852    }
853
854    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
855        // We empty the unscored committed subdags to calculate reputation scores.
856        assert!(self.unscored_committed_subdags.is_empty());
857
858        // We create an empty scoring subdag once reputation scores are calculated.
859        // Note: It is okay for this to not be gated by protocol config as the
860        // scoring_subdag should be empty in either case at this point.
861        assert!(self.scoring_subdag.is_empty());
862
863        let commit_info = CommitInfo {
864            committed_rounds: self.last_committed_rounds.clone(),
865            reputation_scores,
866        };
867        let last_commit = self
868            .last_commit
869            .as_ref()
870            .expect("Last commit should already be set.");
871        self.commit_info_to_write
872            .push((last_commit.reference(), commit_info));
873    }
874
875    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
876        let mut votes = Vec::new();
877        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
878            votes.push(self.pending_commit_votes.pop_front().unwrap());
879        }
880        votes
881    }
882
883    /// Index of the last commit.
884    pub(crate) fn last_commit_index(&self) -> CommitIndex {
885        match &self.last_commit {
886            Some(commit) => commit.index(),
887            None => 0,
888        }
889    }
890
891    /// Digest of the last commit.
892    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
893        match &self.last_commit {
894            Some(commit) => commit.digest(),
895            None => CommitDigest::MIN,
896        }
897    }
898
899    /// Timestamp of the last commit.
900    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
901        match &self.last_commit {
902            Some(commit) => commit.timestamp_ms(),
903            None => 0,
904        }
905    }
906
907    /// Leader slot of the last commit.
908    pub(crate) fn last_commit_leader(&self) -> Slot {
909        match &self.last_commit {
910            Some(commit) => commit.leader().into(),
911            None => self
912                .genesis
913                .iter()
914                .next()
915                .map(|(genesis_ref, _)| *genesis_ref)
916                .expect("Genesis blocks should always be available.")
917                .into(),
918        }
919    }
920
921    /// Highest round where a block is committed, which is last commit's leader
922    /// round.
923    pub(crate) fn last_commit_round(&self) -> Round {
924        match &self.last_commit {
925            Some(commit) => commit.leader().round,
926            None => 0,
927        }
928    }
929
930    /// Last committed round per authority.
931    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
932        self.last_committed_rounds.clone()
933    }
934
935    /// The GC round is the highest round that blocks of equal or lower round
936    /// are considered obsolete and no longer possible to be committed.
937    /// There is no meaning accepting any blocks with round <= gc_round. The
938    /// Garbage Collection (GC) round is calculated based on the latest
939    /// committed leader round. When GC is disabled that will return the genesis
940    /// round.
941    pub(crate) fn gc_round(&self) -> Round {
942        self.calculate_gc_round(self.last_commit_round())
943    }
944
945    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
946        let gc_depth = self.context.protocol_config.gc_depth();
947        if gc_depth > 0 {
948            // GC is enabled, only then calculate the diff
949            commit_round.saturating_sub(gc_depth)
950        } else {
951            // Otherwise just return genesis round. That also acts as a safety mechanism so
952            // we never attempt to truncate anything even accidentally.
953            GENESIS_ROUND
954        }
955    }
956
957    pub(crate) fn gc_enabled(&self) -> bool {
958        self.context.protocol_config.gc_depth() > 0
959    }
960
961    /// After each flush, DagState becomes persisted in storage and it expected
962    /// to recover all internal states from storage after restarts.
963    pub(crate) fn flush(&mut self) {
964        let _s = self
965            .context
966            .metrics
967            .node_metrics
968            .scope_processing_time
969            .with_label_values(&["DagState::flush"])
970            .start_timer();
971        // Flush buffered data to storage.
972        let blocks = std::mem::take(&mut self.blocks_to_write);
973        let commits = std::mem::take(&mut self.commits_to_write);
974        let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
975
976        if blocks.is_empty() && commits.is_empty() {
977            return;
978        }
979        debug!(
980            "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
981            blocks.len(),
982            blocks.iter().map(|b| b.reference().to_string()).join(","),
983            commits.len(),
984            commits.iter().map(|c| c.reference().to_string()).join(","),
985            commit_info_to_write.len(),
986            commit_info_to_write
987                .iter()
988                .map(|(commit_ref, _)| commit_ref.to_string())
989                .join(","),
990        );
991        self.store
992            .write(WriteBatch::new(blocks, commits, commit_info_to_write))
993            .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
994        self.context
995            .metrics
996            .node_metrics
997            .dag_state_store_write_count
998            .inc();
999
1000        // Clean up old cached data. After flushing, all cached blocks are guaranteed to
1001        // be persisted.
1002        for (authority_index, _) in self.context.committee.authorities() {
1003            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1004            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1005                if block_ref.round <= eviction_round {
1006                    self.recent_blocks.remove(block_ref);
1007                    self.recent_refs_by_authority[authority_index].pop_first();
1008                } else {
1009                    break;
1010                }
1011            }
1012            self.evicted_rounds[authority_index] = eviction_round;
1013        }
1014
1015        let metrics = &self.context.metrics.node_metrics;
1016        metrics
1017            .dag_state_recent_blocks
1018            .set(self.recent_blocks.len() as i64);
1019        metrics.dag_state_recent_refs.set(
1020            self.recent_refs_by_authority
1021                .iter()
1022                .map(BTreeSet::len)
1023                .sum::<usize>() as i64,
1024        );
1025    }
1026
1027    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1028        self.store
1029            .read_last_commit_info()
1030            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
1031    }
1032
1033    // TODO: Remove four methods below this when DistributedVoteScoring is enabled.
1034    pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
1035        self.unscored_committed_subdags.len() as u64
1036    }
1037
1038    #[cfg(test)]
1039    pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
1040        self.unscored_committed_subdags.clone()
1041    }
1042
1043    pub(crate) fn add_unscored_committed_subdags(
1044        &mut self,
1045        committed_subdags: Vec<CommittedSubDag>,
1046    ) {
1047        self.unscored_committed_subdags.extend(committed_subdags);
1048    }
1049
1050    pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
1051        std::mem::take(&mut self.unscored_committed_subdags)
1052    }
1053
1054    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1055        self.scoring_subdag.add_subdags(scoring_subdags);
1056    }
1057
1058    pub(crate) fn clear_scoring_subdag(&mut self) {
1059        self.scoring_subdag.clear();
1060    }
1061
1062    pub(crate) fn scoring_subdags_count(&self) -> usize {
1063        self.scoring_subdag.scored_subdags_count()
1064    }
1065
1066    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1067        self.scoring_subdag.is_empty()
1068    }
1069
1070    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1071        self.scoring_subdag.calculate_distributed_vote_scores()
1072    }
1073
1074    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1075        self.scoring_subdag
1076            .commit_range
1077            .as_ref()
1078            .expect("commit range should exist for scoring subdag")
1079            .end()
1080    }
1081
1082    /// The last round that should get evicted after a cache clean up operation.
1083    /// After this round we are guaranteed to have all the produced blocks
1084    /// from that authority. For any round that is <= `last_evicted_round`
1085    /// we don't have such guarantees as out of order blocks might exist.
1086    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1087        if self.gc_enabled() {
1088            let last_round = self.recent_refs_by_authority[authority_index]
1089                .last()
1090                .map(|block_ref| block_ref.round)
1091                .unwrap_or(GENESIS_ROUND);
1092
1093            Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1094        } else {
1095            let commit_round = self.last_committed_rounds[authority_index];
1096            Self::eviction_round(commit_round, self.cached_rounds)
1097        }
1098    }
1099
1100    /// Calculates the last eviction round based on the provided `commit_round`.
1101    /// Any blocks with round <= the evict round have been cleaned up.
1102    fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
1103        commit_round.saturating_sub(cached_rounds)
1104    }
1105
1106    /// Calculates the eviction round for the given authority. The goal is to
1107    /// keep at least `cached_rounds` of the latest blocks in the cache (if
1108    /// enough data is available), while evicting blocks with rounds <=
1109    /// `gc_round` when possible.
1110    fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1111        gc_round.min(last_round.saturating_sub(cached_rounds))
1112    }
1113
1114    /// Detects and returns the blocks of the round that forms the last quorum.
1115    /// The method will return the quorum even if that's genesis.
1116    #[cfg(test)]
1117    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1118        // the quorum should exist either on the highest accepted round or the one
1119        // before. If we fail to detect a quorum then it means that our DAG has
1120        // advanced with missing causal history.
1121        for round in
1122            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1123        {
1124            if round == GENESIS_ROUND {
1125                return self.genesis_blocks();
1126            }
1127            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1128            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1129
1130            // Since the minimum wave length is 3 we expect to find a quorum in the
1131            // uncommitted rounds.
1132            let blocks = self.get_uncommitted_blocks_at_round(round);
1133            for block in &blocks {
1134                if quorum.add(block.author(), &self.context.committee) {
1135                    return blocks;
1136                }
1137            }
1138        }
1139
1140        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1141    }
1142
1143    #[cfg(test)]
1144    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1145        self.genesis.values().cloned().collect()
1146    }
1147
1148    #[cfg(test)]
1149    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1150        self.last_commit = Some(commit);
1151    }
1152}
1153
1154struct BlockInfo {
1155    block: VerifiedBlock,
1156    // Whether the block has been committed
1157    committed: bool,
1158}
1159
1160impl BlockInfo {
1161    fn new(block: VerifiedBlock) -> Self {
1162        Self {
1163            block,
1164            committed: false,
1165        }
1166    }
1167}
1168
1169#[cfg(test)]
1170mod test {
1171    use std::vec;
1172
1173    use parking_lot::RwLock;
1174    use rstest::rstest;
1175
1176    use super::*;
1177    use crate::{
1178        block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1179        storage::{WriteBatch, mem_store::MemStore},
1180        test_dag_builder::DagBuilder,
1181        test_dag_parser::parse_dag,
1182    };
1183
1184    #[tokio::test]
1185    async fn test_get_blocks() {
1186        let (context, _) = Context::new_for_test(4);
1187        let context = Arc::new(context);
1188        let store = Arc::new(MemStore::new());
1189        let mut dag_state = DagState::new(context.clone(), store.clone());
1190        let own_index = AuthorityIndex::new_for_test(0);
1191
1192        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1193        let num_rounds: u32 = 10;
1194        let non_existent_round: u32 = 100;
1195        let num_authorities: u32 = 3;
1196        let num_blocks_per_slot: usize = 3;
1197        let mut blocks = BTreeMap::new();
1198        for round in 1..=num_rounds {
1199            for author in 0..num_authorities {
1200                // Create 3 blocks per slot, with different timestamps and digests.
1201                let base_ts = round as BlockTimestampMs * 1000;
1202                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1203                    let block = VerifiedBlock::new_for_test(
1204                        TestBlock::new(round, author)
1205                            .set_timestamp_ms(timestamp)
1206                            .build(),
1207                    );
1208                    dag_state.accept_block(block.clone());
1209                    blocks.insert(block.reference(), block);
1210
1211                    // Only write one block per slot for own index
1212                    if AuthorityIndex::new_for_test(author) == own_index {
1213                        break;
1214                    }
1215                }
1216            }
1217        }
1218
1219        // Check uncommitted blocks that exist.
1220        for (r, block) in &blocks {
1221            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1222        }
1223
1224        // Check uncommitted blocks that do not exist.
1225        let last_ref = blocks.keys().last().unwrap();
1226        assert!(
1227            dag_state
1228                .get_block(&BlockRef::new(
1229                    last_ref.round,
1230                    last_ref.author,
1231                    BlockDigest::MIN
1232                ))
1233                .is_none()
1234        );
1235
1236        // Check slots with uncommitted blocks.
1237        for round in 1..=num_rounds {
1238            for author in 0..num_authorities {
1239                let slot = Slot::new(
1240                    round,
1241                    context
1242                        .committee
1243                        .to_authority_index(author as usize)
1244                        .unwrap(),
1245                );
1246                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1247
1248                // We only write one block per slot for own index
1249                if AuthorityIndex::new_for_test(author) == own_index {
1250                    assert_eq!(blocks.len(), 1);
1251                } else {
1252                    assert_eq!(blocks.len(), num_blocks_per_slot);
1253                }
1254
1255                for b in blocks {
1256                    assert_eq!(b.round(), round);
1257                    assert_eq!(
1258                        b.author(),
1259                        context
1260                            .committee
1261                            .to_authority_index(author as usize)
1262                            .unwrap()
1263                    );
1264                }
1265            }
1266        }
1267
1268        // Check slots without uncommitted blocks.
1269        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1270        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1271
1272        // Check rounds with uncommitted blocks.
1273        for round in 1..=num_rounds {
1274            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1275            // Expect 3 blocks per authority except for own authority which should
1276            // have 1 block.
1277            assert_eq!(
1278                blocks.len(),
1279                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1280            );
1281            for b in blocks {
1282                assert_eq!(b.round(), round);
1283            }
1284        }
1285
1286        // Check rounds without uncommitted blocks.
1287        assert!(
1288            dag_state
1289                .get_uncommitted_blocks_at_round(non_existent_round)
1290                .is_empty()
1291        );
1292    }
1293
1294    #[tokio::test]
1295    async fn test_ancestors_at_uncommitted_round() {
1296        // Initialize DagState.
1297        let (context, _) = Context::new_for_test(4);
1298        let context = Arc::new(context);
1299        let store = Arc::new(MemStore::new());
1300        let mut dag_state = DagState::new(context.clone(), store.clone());
1301
1302        // Populate DagState.
1303
1304        // Round 10 refs will not have their blocks in DagState.
1305        let round_10_refs: Vec<_> = (0..4)
1306            .map(|a| {
1307                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1308                    .reference()
1309            })
1310            .collect();
1311
1312        // Round 11 blocks.
1313        let round_11 = vec![
1314            // This will connect to round 12.
1315            VerifiedBlock::new_for_test(
1316                TestBlock::new(11, 0)
1317                    .set_timestamp_ms(1100)
1318                    .set_ancestors(round_10_refs.clone())
1319                    .build(),
1320            ),
1321            // Slot(11, 1) has 3 blocks.
1322            // This will connect to round 12.
1323            VerifiedBlock::new_for_test(
1324                TestBlock::new(11, 1)
1325                    .set_timestamp_ms(1110)
1326                    .set_ancestors(round_10_refs.clone())
1327                    .build(),
1328            ),
1329            // This will connect to round 13.
1330            VerifiedBlock::new_for_test(
1331                TestBlock::new(11, 1)
1332                    .set_timestamp_ms(1111)
1333                    .set_ancestors(round_10_refs.clone())
1334                    .build(),
1335            ),
1336            // This will not connect to any block.
1337            VerifiedBlock::new_for_test(
1338                TestBlock::new(11, 1)
1339                    .set_timestamp_ms(1112)
1340                    .set_ancestors(round_10_refs.clone())
1341                    .build(),
1342            ),
1343            // This will not connect to any block.
1344            VerifiedBlock::new_for_test(
1345                TestBlock::new(11, 2)
1346                    .set_timestamp_ms(1120)
1347                    .set_ancestors(round_10_refs.clone())
1348                    .build(),
1349            ),
1350            // This will connect to round 12.
1351            VerifiedBlock::new_for_test(
1352                TestBlock::new(11, 3)
1353                    .set_timestamp_ms(1130)
1354                    .set_ancestors(round_10_refs.clone())
1355                    .build(),
1356            ),
1357        ];
1358
1359        // Round 12 blocks.
1360        let ancestors_for_round_12 = vec![
1361            round_11[0].reference(),
1362            round_11[1].reference(),
1363            round_11[5].reference(),
1364        ];
1365        let round_12 = vec![
1366            VerifiedBlock::new_for_test(
1367                TestBlock::new(12, 0)
1368                    .set_timestamp_ms(1200)
1369                    .set_ancestors(ancestors_for_round_12.clone())
1370                    .build(),
1371            ),
1372            VerifiedBlock::new_for_test(
1373                TestBlock::new(12, 2)
1374                    .set_timestamp_ms(1220)
1375                    .set_ancestors(ancestors_for_round_12.clone())
1376                    .build(),
1377            ),
1378            VerifiedBlock::new_for_test(
1379                TestBlock::new(12, 3)
1380                    .set_timestamp_ms(1230)
1381                    .set_ancestors(ancestors_for_round_12.clone())
1382                    .build(),
1383            ),
1384        ];
1385
1386        // Round 13 blocks.
1387        let ancestors_for_round_13 = vec![
1388            round_12[0].reference(),
1389            round_12[1].reference(),
1390            round_12[2].reference(),
1391            round_11[2].reference(),
1392        ];
1393        let round_13 = vec![
1394            VerifiedBlock::new_for_test(
1395                TestBlock::new(12, 1)
1396                    .set_timestamp_ms(1300)
1397                    .set_ancestors(ancestors_for_round_13.clone())
1398                    .build(),
1399            ),
1400            VerifiedBlock::new_for_test(
1401                TestBlock::new(12, 2)
1402                    .set_timestamp_ms(1320)
1403                    .set_ancestors(ancestors_for_round_13.clone())
1404                    .build(),
1405            ),
1406            VerifiedBlock::new_for_test(
1407                TestBlock::new(12, 3)
1408                    .set_timestamp_ms(1330)
1409                    .set_ancestors(ancestors_for_round_13.clone())
1410                    .build(),
1411            ),
1412        ];
1413
1414        // Round 14 anchor block.
1415        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1416        let anchor = VerifiedBlock::new_for_test(
1417            TestBlock::new(14, 1)
1418                .set_timestamp_ms(1410)
1419                .set_ancestors(ancestors_for_round_14)
1420                .build(),
1421        );
1422
1423        // Add all blocks (at and above round 11) to DagState.
1424        for b in round_11
1425            .iter()
1426            .chain(round_12.iter())
1427            .chain(round_13.iter())
1428            .chain([anchor.clone()].iter())
1429        {
1430            dag_state.accept_block(b.clone());
1431        }
1432
1433        // Check ancestors connected to anchor.
1434        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1435        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1436        ancestors_refs.sort();
1437        let mut expected_refs = vec![
1438            round_11[0].reference(),
1439            round_11[1].reference(),
1440            round_11[2].reference(),
1441            round_11[5].reference(),
1442        ];
1443        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1
1444        // & 2) might not be in right lexicographical order.
1445        assert_eq!(
1446            ancestors_refs, expected_refs,
1447            "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1448        );
1449    }
1450
1451    #[tokio::test]
1452    async fn test_contains_blocks_in_cache_or_store() {
1453        /// Only keep elements up to 2 rounds before the last committed round
1454        const CACHED_ROUNDS: Round = 2;
1455
1456        let (mut context, _) = Context::new_for_test(4);
1457        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1458
1459        let context = Arc::new(context);
1460        let store = Arc::new(MemStore::new());
1461        let mut dag_state = DagState::new(context.clone(), store.clone());
1462
1463        // Create test blocks for round 1 ~ 10
1464        let num_rounds: u32 = 10;
1465        let num_authorities: u32 = 4;
1466        let mut blocks = Vec::new();
1467
1468        for round in 1..=num_rounds {
1469            for author in 0..num_authorities {
1470                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1471                blocks.push(block);
1472            }
1473        }
1474
1475        // Now write in store the blocks from first 4 rounds and the rest to the dag
1476        // state
1477        blocks.clone().into_iter().for_each(|block| {
1478            if block.round() <= 4 {
1479                store
1480                    .write(WriteBatch::default().blocks(vec![block]))
1481                    .unwrap();
1482            } else {
1483                dag_state.accept_blocks(vec![block]);
1484            }
1485        });
1486
1487        // Now when trying to query whether we have all the blocks, we should
1488        // successfully retrieve a positive answer where the blocks of first 4
1489        // round should be found in DagState and the rest in store.
1490        let mut block_refs = blocks
1491            .iter()
1492            .map(|block| block.reference())
1493            .collect::<Vec<_>>();
1494        let result = dag_state.contains_blocks(block_refs.clone());
1495
1496        // Ensure everything is found
1497        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1498        assert_eq!(result, expected);
1499
1500        // Now try to ask also for one block ref that is neither in cache nor in store
1501        block_refs.insert(
1502            3,
1503            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1504        );
1505        let result = dag_state.contains_blocks(block_refs.clone());
1506
1507        // Then all should be found apart from the last one
1508        expected.insert(3, false);
1509        assert_eq!(result, expected.clone());
1510    }
1511
1512    #[tokio::test]
1513    async fn test_contains_cached_block_at_slot() {
1514        /// Only keep elements up to 2 rounds before the last committed round
1515        const CACHED_ROUNDS: Round = 2;
1516
1517        let num_authorities: u32 = 4;
1518        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1519        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1520
1521        let context = Arc::new(context);
1522        let store = Arc::new(MemStore::new());
1523        let mut dag_state = DagState::new(context.clone(), store.clone());
1524
1525        // Create test blocks for round 1 ~ 10
1526        let num_rounds: u32 = 10;
1527        let mut blocks = Vec::new();
1528
1529        for round in 1..=num_rounds {
1530            for author in 0..num_authorities {
1531                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1532                blocks.push(block.clone());
1533                dag_state.accept_block(block);
1534            }
1535        }
1536
1537        // Query for genesis round 0, genesis blocks should be returned
1538        for (author, _) in context.committee.authorities() {
1539            assert!(
1540                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1541                "Genesis should always be found"
1542            );
1543        }
1544
1545        // Now when trying to query whether we have all the blocks, we should
1546        // successfully retrieve a positive answer where the blocks of first 4
1547        // round should be found in DagState and the rest in store.
1548        let mut block_refs = blocks
1549            .iter()
1550            .map(|block| block.reference())
1551            .collect::<Vec<_>>();
1552
1553        for block_ref in block_refs.clone() {
1554            let slot = block_ref.into();
1555            let found = dag_state.contains_cached_block_at_slot(slot);
1556            assert!(found, "A block should be found at slot {slot}");
1557        }
1558
1559        // Now try to ask also for one block ref that is not in cache
1560        // Then all should be found apart from the last one
1561        block_refs.insert(
1562            3,
1563            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1564        );
1565        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1566        expected.insert(3, false);
1567
1568        // Attempt to check the same for via the contains slot method
1569        for block_ref in block_refs {
1570            let slot = block_ref.into();
1571            let found = dag_state.contains_cached_block_at_slot(slot);
1572
1573            assert_eq!(expected.remove(0), found);
1574        }
1575    }
1576
1577    #[tokio::test]
1578    #[should_panic(
1579        expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
1580    )]
1581    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1582        /// Only keep elements up to 2 rounds before the last committed round
1583        const CACHED_ROUNDS: Round = 2;
1584
1585        let (mut context, _) = Context::new_for_test(4);
1586        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1587        context
1588            .protocol_config
1589            .set_consensus_gc_depth_for_testing(0);
1590
1591        let context = Arc::new(context);
1592        let store = Arc::new(MemStore::new());
1593        let mut dag_state = DagState::new(context.clone(), store.clone());
1594
1595        // Create test blocks for round 1 ~ 10 for authority 0
1596        let mut blocks = Vec::new();
1597        for round in 1..=10 {
1598            let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1599            blocks.push(block.clone());
1600            dag_state.accept_block(block);
1601        }
1602
1603        // Now add a commit to trigger an eviction
1604        dag_state.add_commit(TrustedCommit::new_for_test(
1605            1 as CommitIndex,
1606            CommitDigest::MIN,
1607            0,
1608            blocks.last().unwrap().reference(),
1609            blocks
1610                .into_iter()
1611                .map(|block| block.reference())
1612                .collect::<Vec<_>>(),
1613        ));
1614
1615        dag_state.flush();
1616
1617        // When trying to request for authority 0 at block slot 8 it should panic, as
1618        // anything that is <= commit_round - cached_rounds = 10 - 2 = 8 should
1619        // be evicted
1620        let _ =
1621            dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1622    }
1623
1624    #[tokio::test]
1625    #[should_panic(
1626        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1627    )]
1628    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1629        /// Keep 2 rounds from the highest committed round. This is considered
1630        /// universal and minimum necessary blocks to hold
1631        /// for the correct node operation.
1632        const GC_DEPTH: u32 = 2;
1633        /// Keep at least 3 rounds in cache for each authority.
1634        const CACHED_ROUNDS: Round = 3;
1635
1636        let (mut context, _) = Context::new_for_test(4);
1637        context
1638            .protocol_config
1639            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1640        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1641
1642        let context = Arc::new(context);
1643        let store = Arc::new(MemStore::new());
1644        let mut dag_state = DagState::new(context.clone(), store.clone());
1645
1646        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 -
1647        // 6.
1648        let mut dag_builder = DagBuilder::new(context.clone());
1649        dag_builder.layers(1..=3).build();
1650        dag_builder
1651            .layers(4..=6)
1652            .authorities(vec![AuthorityIndex::new_for_test(0)])
1653            .skip_block()
1654            .build();
1655
1656        // Accept all blocks
1657        dag_builder
1658            .all_blocks()
1659            .into_iter()
1660            .for_each(|block| dag_state.accept_block(block));
1661
1662        // Now add a commit for leader round 5 to trigger an eviction
1663        dag_state.add_commit(TrustedCommit::new_for_test(
1664            1 as CommitIndex,
1665            CommitDigest::MIN,
1666            0,
1667            dag_builder.leader_block(5).unwrap().reference(),
1668            vec![],
1669        ));
1670
1671        dag_state.flush();
1672
1673        // Ensure that gc round has been updated
1674        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1675
1676        // Now what we expect to happen is for:
1677        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1678        // * Node 0 should have in cache blocks from it's latest round, 3, up to round
1679        //   1, which is the number of cached_rounds.
1680        for authority_index in 1..=3 {
1681            for round in 4..=6 {
1682                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1683                    round,
1684                    AuthorityIndex::new_for_test(authority_index)
1685                )));
1686            }
1687        }
1688
1689        for round in 1..=3 {
1690            assert!(
1691                dag_state.contains_cached_block_at_slot(Slot::new(
1692                    round,
1693                    AuthorityIndex::new_for_test(0)
1694                ))
1695            );
1696        }
1697
1698        // When trying to request for authority 1 at block slot 3 it should panic, as
1699        // anything that is <= 3 should be evicted
1700        let _ =
1701            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1702    }
1703
1704    #[tokio::test]
1705    async fn test_get_blocks_in_cache_or_store() {
1706        let (context, _) = Context::new_for_test(4);
1707        let context = Arc::new(context);
1708        let store = Arc::new(MemStore::new());
1709        let mut dag_state = DagState::new(context.clone(), store.clone());
1710
1711        // Create test blocks for round 1 ~ 10
1712        let num_rounds: u32 = 10;
1713        let num_authorities: u32 = 4;
1714        let mut blocks = Vec::new();
1715
1716        for round in 1..=num_rounds {
1717            for author in 0..num_authorities {
1718                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1719                blocks.push(block);
1720            }
1721        }
1722
1723        // Now write in store the blocks from first 4 rounds and the rest to the dag
1724        // state
1725        blocks.clone().into_iter().for_each(|block| {
1726            if block.round() <= 4 {
1727                store
1728                    .write(WriteBatch::default().blocks(vec![block]))
1729                    .unwrap();
1730            } else {
1731                dag_state.accept_blocks(vec![block]);
1732            }
1733        });
1734
1735        // Now when trying to query whether we have all the blocks, we should
1736        // successfully retrieve a positive answer where the blocks of first 4
1737        // round should be found in DagState and the rest in store.
1738        let mut block_refs = blocks
1739            .iter()
1740            .map(|block| block.reference())
1741            .collect::<Vec<_>>();
1742        let result = dag_state.get_blocks(&block_refs);
1743
1744        let mut expected = blocks
1745            .into_iter()
1746            .map(Some)
1747            .collect::<Vec<Option<VerifiedBlock>>>();
1748
1749        // Ensure everything is found
1750        assert_eq!(result, expected.clone());
1751
1752        // Now try to ask also for one block ref that is neither in cache nor in store
1753        block_refs.insert(
1754            3,
1755            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1756        );
1757        let result = dag_state.get_blocks(&block_refs);
1758
1759        // Then all should be found apart from the last one
1760        expected.insert(3, None);
1761        assert_eq!(result, expected);
1762    }
1763
1764    // TODO: Remove when DistributedVoteScoring is enabled.
1765    #[rstest]
1766    #[tokio::test]
1767    async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1768        telemetry_subscribers::init_for_testing();
1769        let num_authorities: u32 = 4;
1770        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1771        context
1772            .protocol_config
1773            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1774
1775        if gc_depth > 0 {
1776            context
1777                .protocol_config
1778                .set_consensus_gc_depth_for_testing(gc_depth);
1779        }
1780
1781        let context = Arc::new(context);
1782        let store = Arc::new(MemStore::new());
1783        let mut dag_state = DagState::new(context.clone(), store.clone());
1784
1785        // Create test blocks and commits for round 1 ~ 10
1786        let num_rounds: u32 = 10;
1787        let mut dag_builder = DagBuilder::new(context.clone());
1788        dag_builder.layers(1..=num_rounds).build();
1789        let mut commits = vec![];
1790
1791        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1792            commits.push(commit);
1793        }
1794
1795        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1796        let temp_commits = commits.split_off(5);
1797        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1798        for commit in commits.clone() {
1799            dag_state.add_commit(commit);
1800        }
1801
1802        // Flush the dag state
1803        dag_state.flush();
1804
1805        // Add the rest of the blocks and commits to the dag state
1806        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1807        for commit in temp_commits.clone() {
1808            dag_state.add_commit(commit);
1809        }
1810
1811        // All blocks should be found in DagState.
1812        let all_blocks = dag_builder.blocks(6..=num_rounds);
1813        let block_refs = all_blocks
1814            .iter()
1815            .map(|block| block.reference())
1816            .collect::<Vec<_>>();
1817
1818        let result = dag_state
1819            .get_blocks(&block_refs)
1820            .into_iter()
1821            .map(|b| b.unwrap())
1822            .collect::<Vec<_>>();
1823        assert_eq!(result, all_blocks);
1824
1825        // Last commit index should be 10.
1826        assert_eq!(dag_state.last_commit_index(), 10);
1827        assert_eq!(
1828            dag_state.last_committed_rounds(),
1829            dag_builder.last_committed_rounds.clone()
1830        );
1831
1832        // Destroy the dag state.
1833        drop(dag_state);
1834
1835        // Recover the state from the store
1836        let dag_state = DagState::new(context.clone(), store.clone());
1837
1838        // Blocks of first 5 rounds should be found in DagState.
1839        let blocks = dag_builder.blocks(1..=5);
1840        let block_refs = blocks
1841            .iter()
1842            .map(|block| block.reference())
1843            .collect::<Vec<_>>();
1844        let result = dag_state
1845            .get_blocks(&block_refs)
1846            .into_iter()
1847            .map(|b| b.unwrap())
1848            .collect::<Vec<_>>();
1849        assert_eq!(result, blocks);
1850
1851        // Blocks above round 5 should not be in DagState, because they are not flushed.
1852        let missing_blocks = dag_builder.blocks(6..=num_rounds);
1853        let block_refs = missing_blocks
1854            .iter()
1855            .map(|block| block.reference())
1856            .collect::<Vec<_>>();
1857        let retrieved_blocks = dag_state
1858            .get_blocks(&block_refs)
1859            .into_iter()
1860            .flatten()
1861            .collect::<Vec<_>>();
1862        assert!(retrieved_blocks.is_empty());
1863
1864        // Last commit index should be 5.
1865        assert_eq!(dag_state.last_commit_index(), 5);
1866
1867        // This is the last_commit_rounds of the first 5 commits that were flushed
1868        let expected_last_committed_rounds = vec![4, 5, 4, 4];
1869        assert_eq!(
1870            dag_state.last_committed_rounds(),
1871            expected_last_committed_rounds
1872        );
1873
1874        // Unscored subdags will be recovered based on the flushed commits and no commit
1875        // info
1876        assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1877    }
1878
1879    #[tokio::test]
1880    async fn test_flush_and_recovery() {
1881        telemetry_subscribers::init_for_testing();
1882        let num_authorities: u32 = 4;
1883        let (context, _) = Context::new_for_test(num_authorities as usize);
1884        let context = Arc::new(context);
1885        let store = Arc::new(MemStore::new());
1886        let mut dag_state = DagState::new(context.clone(), store.clone());
1887
1888        // Create test blocks and commits for round 1 ~ 10
1889        let num_rounds: u32 = 10;
1890        let mut dag_builder = DagBuilder::new(context.clone());
1891        dag_builder.layers(1..=num_rounds).build();
1892        let mut commits = vec![];
1893        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1894            commits.push(commit);
1895        }
1896
1897        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1898        let temp_commits = commits.split_off(5);
1899        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1900        for commit in commits.clone() {
1901            dag_state.add_commit(commit);
1902        }
1903
1904        // Flush the dag state
1905        dag_state.flush();
1906
1907        // Add the rest of the blocks and commits to the dag state
1908        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1909        for commit in temp_commits.clone() {
1910            dag_state.add_commit(commit);
1911        }
1912
1913        // All blocks should be found in DagState.
1914        let all_blocks = dag_builder.blocks(6..=num_rounds);
1915        let block_refs = all_blocks
1916            .iter()
1917            .map(|block| block.reference())
1918            .collect::<Vec<_>>();
1919        let result = dag_state
1920            .get_blocks(&block_refs)
1921            .into_iter()
1922            .map(|b| b.unwrap())
1923            .collect::<Vec<_>>();
1924        assert_eq!(result, all_blocks);
1925
1926        // Last commit index should be 10.
1927        assert_eq!(dag_state.last_commit_index(), 10);
1928        assert_eq!(
1929            dag_state.last_committed_rounds(),
1930            dag_builder.last_committed_rounds.clone()
1931        );
1932
1933        // Destroy the dag state.
1934        drop(dag_state);
1935
1936        // Recover the state from the store
1937        let dag_state = DagState::new(context.clone(), store.clone());
1938
1939        // Blocks of first 5 rounds should be found in DagState.
1940        let blocks = dag_builder.blocks(1..=5);
1941        let block_refs = blocks
1942            .iter()
1943            .map(|block| block.reference())
1944            .collect::<Vec<_>>();
1945        let result = dag_state
1946            .get_blocks(&block_refs)
1947            .into_iter()
1948            .map(|b| b.unwrap())
1949            .collect::<Vec<_>>();
1950        assert_eq!(result, blocks);
1951
1952        // Blocks above round 5 should not be in DagState, because they are not flushed.
1953        let missing_blocks = dag_builder.blocks(6..=num_rounds);
1954        let block_refs = missing_blocks
1955            .iter()
1956            .map(|block| block.reference())
1957            .collect::<Vec<_>>();
1958        let retrieved_blocks = dag_state
1959            .get_blocks(&block_refs)
1960            .into_iter()
1961            .flatten()
1962            .collect::<Vec<_>>();
1963        assert!(retrieved_blocks.is_empty());
1964
1965        // Last commit index should be 5.
1966        assert_eq!(dag_state.last_commit_index(), 5);
1967
1968        // This is the last_commit_rounds of the first 5 commits that were flushed
1969        let expected_last_committed_rounds = vec![4, 5, 4, 4];
1970        assert_eq!(
1971            dag_state.last_committed_rounds(),
1972            expected_last_committed_rounds
1973        );
1974        // Unscored subdags will be recovered based on the flushed commits and no commit
1975        // info
1976        assert_eq!(dag_state.scoring_subdags_count(), 5);
1977    }
1978
1979    #[tokio::test]
1980    async fn test_flush_and_recovery_gc_enabled() {
1981        telemetry_subscribers::init_for_testing();
1982
1983        const GC_DEPTH: u32 = 3;
1984        const CACHED_ROUNDS: u32 = 4;
1985
1986        let num_authorities: u32 = 4;
1987        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1988        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1989        context
1990            .protocol_config
1991            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1992        context
1993            .protocol_config
1994            .set_consensus_linearize_subdag_v2_for_testing(true);
1995
1996        let context = Arc::new(context);
1997
1998        let store = Arc::new(MemStore::new());
1999        let mut dag_state = DagState::new(context.clone(), store.clone());
2000
2001        let num_rounds: u32 = 10;
2002        let mut dag_builder = DagBuilder::new(context.clone());
2003        dag_builder.layers(1..=5).build();
2004        dag_builder
2005            .layers(6..=8)
2006            .authorities(vec![AuthorityIndex::new_for_test(0)])
2007            .skip_block()
2008            .build();
2009        dag_builder.layers(9..=num_rounds).build();
2010
2011        let mut commits = dag_builder
2012            .get_sub_dag_and_commits(1..=num_rounds)
2013            .into_iter()
2014            .map(|(_subdag, commit)| commit)
2015            .collect::<Vec<_>>();
2016
2017        // Add the blocks from first 8 rounds and first 7 commits to the dag state
2018        // It's 7 commits because we missing the commit of round 8 where authority 0 is
2019        // the leader, but produced no block
2020        let temp_commits = commits.split_off(7);
2021        dag_state.accept_blocks(dag_builder.blocks(1..=8));
2022        for commit in commits.clone() {
2023            dag_state.add_commit(commit);
2024        }
2025
2026        // Holds all the committed blocks from the commits that ended up being persisted
2027        // (flushed). Any commits that not flushed will not be considered.
2028        let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2029        for commit in commits.iter() {
2030            all_committed_blocks.extend(commit.blocks());
2031        }
2032        // Flush the dag state
2033        dag_state.flush();
2034
2035        // Add the rest of the blocks and commits to the dag state
2036        dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2037        for commit in temp_commits.clone() {
2038            dag_state.add_commit(commit);
2039        }
2040
2041        // All blocks should be found in DagState.
2042        let all_blocks = dag_builder.blocks(1..=num_rounds);
2043        let block_refs = all_blocks
2044            .iter()
2045            .map(|block| block.reference())
2046            .collect::<Vec<_>>();
2047        let result = dag_state
2048            .get_blocks(&block_refs)
2049            .into_iter()
2050            .map(|b| b.unwrap())
2051            .collect::<Vec<_>>();
2052        assert_eq!(result, all_blocks);
2053
2054        // Last commit index should be 9
2055        assert_eq!(dag_state.last_commit_index(), 9);
2056        assert_eq!(
2057            dag_state.last_committed_rounds(),
2058            dag_builder.last_committed_rounds.clone()
2059        );
2060
2061        // Destroy the dag state.
2062        drop(dag_state);
2063
2064        // Recover the state from the store
2065        let dag_state = DagState::new(context.clone(), store.clone());
2066
2067        // Blocks of first 5 rounds should be found in DagState.
2068        let blocks = dag_builder.blocks(1..=5);
2069        let block_refs = blocks
2070            .iter()
2071            .map(|block| block.reference())
2072            .collect::<Vec<_>>();
2073        let result = dag_state
2074            .get_blocks(&block_refs)
2075            .into_iter()
2076            .map(|b| b.unwrap())
2077            .collect::<Vec<_>>();
2078        assert_eq!(result, blocks);
2079
2080        // Blocks above round 9 should not be in DagState, because they are not flushed.
2081        let missing_blocks = dag_builder.blocks(9..=num_rounds);
2082        let block_refs = missing_blocks
2083            .iter()
2084            .map(|block| block.reference())
2085            .collect::<Vec<_>>();
2086        let retrieved_blocks = dag_state
2087            .get_blocks(&block_refs)
2088            .into_iter()
2089            .flatten()
2090            .collect::<Vec<_>>();
2091        assert!(retrieved_blocks.is_empty());
2092
2093        // Last commit index should be 7.
2094        assert_eq!(dag_state.last_commit_index(), 7);
2095
2096        // This is the last_commit_rounds of the first 7 commits that were flushed
2097        let expected_last_committed_rounds = vec![5, 6, 6, 7];
2098        assert_eq!(
2099            dag_state.last_committed_rounds(),
2100            expected_last_committed_rounds
2101        );
2102        // Unscored subdags will be recoverd based on the flushed commits and no commit
2103        // info
2104        assert_eq!(dag_state.scoring_subdags_count(), 7);
2105        // Ensure that cached blocks exist only for specific rounds per authority
2106        for (authority_index, _) in context.committee.authorities() {
2107            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2108
2109            // Ensure that eviction rounds have been properly recovered
2110            // DagState should hold cached blocks for authority 0 for rounds [2..=5] as no
2111            // higher blocks exist and due to CACHED_ROUNDS = 4 we want at max
2112            // to hold blocks for 4 rounds in cache.
2113            if authority_index == AuthorityIndex::new_for_test(0) {
2114                assert_eq!(blocks.len(), 4);
2115                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2116                assert!(
2117                    blocks
2118                        .into_iter()
2119                        .all(|block| block.round() >= 2 && block.round() <= 5)
2120                );
2121            } else {
2122                assert_eq!(blocks.len(), 4);
2123                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2124                assert!(
2125                    blocks
2126                        .into_iter()
2127                        .all(|block| block.round() >= 5 && block.round() <= 8)
2128                );
2129            }
2130        }
2131        // Ensure that committed blocks from > gc_round have been correctly marked as
2132        // committed according to committed sub dags
2133        let gc_round = dag_state.gc_round();
2134        assert_eq!(gc_round, 4);
2135        dag_state
2136            .recent_blocks
2137            .iter()
2138            .for_each(|(block_ref, block_info)| {
2139                if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2140                    assert!(
2141                        block_info.committed,
2142                        "Block {block_ref:?} should be committed"
2143                    );
2144                };
2145            });
2146    }
2147
2148    #[tokio::test]
2149    async fn test_block_info_as_committed() {
2150        let num_authorities: u32 = 4;
2151        let (context, _) = Context::new_for_test(num_authorities as usize);
2152        let context = Arc::new(context);
2153
2154        let store = Arc::new(MemStore::new());
2155        let mut dag_state = DagState::new(context.clone(), store.clone());
2156
2157        // Accept a block
2158        let block = VerifiedBlock::new_for_test(
2159            TestBlock::new(1, 0)
2160                .set_timestamp_ms(1000)
2161                .set_ancestors(vec![])
2162                .build(),
2163        );
2164
2165        dag_state.accept_block(block.clone());
2166
2167        // Query is committed
2168        assert!(!dag_state.is_committed(&block.reference()));
2169
2170        // Set block as committed for first time should return true
2171        assert!(
2172            dag_state.set_committed(&block.reference()),
2173            "Block should be successfully set as committed for first time"
2174        );
2175
2176        // Now it should appear as committed
2177        assert!(dag_state.is_committed(&block.reference()));
2178
2179        // Trying to set the block as committed again, it should return false.
2180        assert!(
2181            !dag_state.set_committed(&block.reference()),
2182            "Block should not be successfully set as committed"
2183        );
2184    }
2185
2186    #[tokio::test]
2187    async fn test_get_cached_blocks() {
2188        let (mut context, _) = Context::new_for_test(4);
2189        context.parameters.dag_state_cached_rounds = 5;
2190
2191        let context = Arc::new(context);
2192        let store = Arc::new(MemStore::new());
2193        let mut dag_state = DagState::new(context.clone(), store.clone());
2194
2195        // Create no blocks for authority 0
2196        // Create one block (round 10) for authority 1
2197        // Create two blocks (rounds 10,11) for authority 2
2198        // Create three blocks (rounds 10,11,12) for authority 3
2199        let mut all_blocks = Vec::new();
2200        for author in 1..=3 {
2201            for round in 10..(10 + author) {
2202                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2203                all_blocks.push(block.clone());
2204                dag_state.accept_block(block);
2205            }
2206        }
2207
2208        let cached_blocks =
2209            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2210        assert!(cached_blocks.is_empty());
2211
2212        let cached_blocks =
2213            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2214        assert_eq!(cached_blocks.len(), 1);
2215        assert_eq!(cached_blocks[0].round(), 10);
2216
2217        let cached_blocks =
2218            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2219        assert_eq!(cached_blocks.len(), 2);
2220        assert_eq!(cached_blocks[0].round(), 10);
2221        assert_eq!(cached_blocks[1].round(), 11);
2222
2223        let cached_blocks =
2224            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2225        assert_eq!(cached_blocks.len(), 1);
2226        assert_eq!(cached_blocks[0].round(), 11);
2227
2228        let cached_blocks =
2229            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2230        assert_eq!(cached_blocks.len(), 3);
2231        assert_eq!(cached_blocks[0].round(), 10);
2232        assert_eq!(cached_blocks[1].round(), 11);
2233        assert_eq!(cached_blocks[2].round(), 12);
2234
2235        let cached_blocks =
2236            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2237        assert_eq!(cached_blocks.len(), 1);
2238        assert_eq!(cached_blocks[0].round(), 12);
2239    }
2240
2241    #[rstest]
2242    #[tokio::test]
2243    async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2244        // GIVEN
2245        const CACHED_ROUNDS: Round = 2;
2246        let (mut context, _) = Context::new_for_test(4);
2247        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2248
2249        if gc_depth > 0 {
2250            context
2251                .protocol_config
2252                .set_consensus_gc_depth_for_testing(gc_depth);
2253        }
2254
2255        let context = Arc::new(context);
2256        let store = Arc::new(MemStore::new());
2257        let mut dag_state = DagState::new(context.clone(), store.clone());
2258
2259        // Create no blocks for authority 0
2260        // Create one block (round 1) for authority 1
2261        // Create two blocks (rounds 1,2) for authority 2
2262        // Create three blocks (rounds 1,2,3) for authority 3
2263        let dag_str = "DAG {
2264            Round 0 : { 4 },
2265            Round 1 : {
2266                B -> [*],
2267                C -> [*],
2268                D -> [*],
2269            },
2270            Round 2 : {
2271                C -> [*],
2272                D -> [*],
2273            },
2274            Round 3 : {
2275                D -> [*],
2276            },
2277        }";
2278
2279        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2280
        // Add equivocating block for round 2 authority 2
2282        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2283
2284        // Accept all blocks
2285        for block in dag_builder
2286            .all_blocks()
2287            .into_iter()
2288            .chain(std::iter::once(block))
2289        {
2290            dag_state.accept_block(block);
2291        }
2292
2293        dag_state.add_commit(TrustedCommit::new_for_test(
2294            1 as CommitIndex,
2295            CommitDigest::MIN,
2296            context.clock.timestamp_utc_ms(),
2297            dag_builder.leader_block(3).unwrap().reference(),
2298            vec![],
2299        ));
2300
2301        // WHEN search for the latest blocks
2302        let end_round = 4;
2303        let expected_rounds = vec![0, 1, 2, 3];
2304        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2305        // THEN
2306        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2307        assert_eq!(
2308            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2309            expected_rounds
2310        );
2311        assert_eq!(
2312            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2313            expected_excluded_and_equivocating_blocks
2314        );
2315
2316        // THEN
2317        for (i, expected_round) in expected_rounds.iter().enumerate() {
2318            let round = dag_state
2319                .get_last_cached_block_in_range(
2320                    context.committee.to_authority_index(i).unwrap(),
2321                    0,
2322                    end_round,
2323                )
2324                .map(|b| b.round())
2325                .unwrap_or_default();
2326            assert_eq!(round, *expected_round, "Authority {i}");
2327        }
2328
2329        // WHEN starting from round 2
2330        let start_round = 2;
2331        let expected_rounds = [0, 0, 2, 3];
2332
2333        // THEN
2334        for (i, expected_round) in expected_rounds.iter().enumerate() {
2335            let round = dag_state
2336                .get_last_cached_block_in_range(
2337                    context.committee.to_authority_index(i).unwrap(),
2338                    start_round,
2339                    end_round,
2340                )
2341                .map(|b| b.round())
2342                .unwrap_or_default();
2343            assert_eq!(round, *expected_round, "Authority {i}");
2344        }
2345
2346        // WHEN we flush the DagState - after adding a
2347        // commit with all the blocks, we expect this to trigger a clean up in
        // the internal cache. That will keep all the blocks with rounds >=
2349        // authority_commit_round - CACHED_ROUND.
2350        //
2351        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and
2352        // for those who don't have blocks > gc_round, we'll keep
2353        // all their highest round blocks for CACHED_ROUNDS.
2354        dag_state.flush();
2355
2356        // AND we request before round 3
2357        let end_round = 3;
2358        let expected_rounds = vec![0, 1, 2, 2];
2359
2360        // THEN
2361        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2362        assert_eq!(
2363            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2364            expected_rounds
2365        );
2366
2367        // THEN
2368        for (i, expected_round) in expected_rounds.iter().enumerate() {
2369            let round = dag_state
2370                .get_last_cached_block_in_range(
2371                    context.committee.to_authority_index(i).unwrap(),
2372                    0,
2373                    end_round,
2374                )
2375                .map(|b| b.round())
2376                .unwrap_or_default();
2377            assert_eq!(round, *expected_round, "Authority {i}");
2378        }
2379    }
2380
2381    #[tokio::test]
2382    #[should_panic(
2383        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2384    )]
2385    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2386        // GIVEN
2387        const CACHED_ROUNDS: Round = 1;
2388        let (mut context, _) = Context::new_for_test(4);
2389        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2390        context
2391            .protocol_config
2392            .set_consensus_gc_depth_for_testing(0);
2393
2394        let context = Arc::new(context);
2395        let store = Arc::new(MemStore::new());
2396        let mut dag_state = DagState::new(context.clone(), store.clone());
2397
2398        // Create no blocks for authority 0
2399        // Create one block (round 1) for authority 1
2400        // Create two blocks (rounds 1,2) for authority 2
2401        // Create three blocks (rounds 1,2,3) for authority 3
2402        let mut all_blocks = Vec::new();
2403        for author in 1..=3 {
2404            for round in 1..=author {
2405                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2406                all_blocks.push(block.clone());
2407                dag_state.accept_block(block);
2408            }
2409        }
2410
2411        dag_state.add_commit(TrustedCommit::new_for_test(
2412            1 as CommitIndex,
2413            CommitDigest::MIN,
2414            0,
2415            all_blocks.last().unwrap().reference(),
2416            all_blocks
2417                .into_iter()
2418                .map(|block| block.reference())
2419                .collect::<Vec<_>>(),
2420        ));
2421
2422        // Flush the store so we keep in memory only the last 1 round from the last
2423        // commit for each authority.
2424        dag_state.flush();
2425
2426        // THEN the method should panic, as some authorities have already evicted rounds
2427        // <= round 2
2428        let end_round = 2;
2429        dag_state.get_last_cached_block_per_authority(end_round);
2430    }
2431
2432    #[tokio::test]
2433    #[should_panic(
2434        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2435    )]
2436    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
2437        // GIVEN
2438        const CACHED_ROUNDS: Round = 1;
2439        const GC_DEPTH: u32 = 1;
2440        let (mut context, _) = Context::new_for_test(4);
2441        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2442        context
2443            .protocol_config
2444            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2445
2446        let context = Arc::new(context);
2447        let store = Arc::new(MemStore::new());
2448        let mut dag_state = DagState::new(context.clone(), store.clone());
2449
2450        // Create no blocks for authority 0
2451        // Create one block (round 1) for authority 1
2452        // Create two blocks (rounds 1,2) for authority 2
2453        // Create three blocks (rounds 1,2,3) for authority 3
2454        let mut dag_builder = DagBuilder::new(context.clone());
2455        dag_builder
2456            .layers(1..=1)
2457            .authorities(vec![AuthorityIndex::new_for_test(0)])
2458            .skip_block()
2459            .build();
2460        dag_builder
2461            .layers(2..=2)
2462            .authorities(vec![
2463                AuthorityIndex::new_for_test(0),
2464                AuthorityIndex::new_for_test(1),
2465            ])
2466            .skip_block()
2467            .build();
2468        dag_builder
2469            .layers(3..=3)
2470            .authorities(vec![
2471                AuthorityIndex::new_for_test(0),
2472                AuthorityIndex::new_for_test(1),
2473                AuthorityIndex::new_for_test(2),
2474            ])
2475            .skip_block()
2476            .build();
2477
2478        // Accept all blocks
2479        for block in dag_builder.all_blocks() {
2480            dag_state.accept_block(block);
2481        }
2482
2483        dag_state.add_commit(TrustedCommit::new_for_test(
2484            1 as CommitIndex,
2485            CommitDigest::MIN,
2486            0,
2487            dag_builder.leader_block(3).unwrap().reference(),
2488            vec![],
2489        ));
2490
2491        // Flush the store so we update the evict rounds
2492        dag_state.flush();
2493
2494        // THEN the method should panic, as some authorities have already evicted rounds
2495        // <= round 2
2496        dag_state.get_last_cached_block_per_authority(2);
2497    }
2498
2499    #[tokio::test]
2500    async fn test_last_quorum() {
2501        // GIVEN
2502        let (context, _) = Context::new_for_test(4);
2503        let context = Arc::new(context);
2504        let store = Arc::new(MemStore::new());
2505        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2506
2507        // WHEN no blocks exist then genesis should be returned
2508        {
2509            let genesis = genesis_blocks(context.clone());
2510
2511            assert_eq!(dag_state.read().last_quorum(), genesis);
2512        }
2513
2514        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks
2515        // should be returned as quorum
2516        {
2517            let mut dag_builder = DagBuilder::new(context.clone());
2518            dag_builder
2519                .layers(1..=4)
2520                .build()
2521                .persist_layers(dag_state.clone());
2522            let round_4_blocks: Vec<_> = dag_builder
2523                .blocks(4..=4)
2524                .into_iter()
2525                .map(|block| block.reference())
2526                .collect();
2527
2528            let last_quorum = dag_state.read().last_quorum();
2529
2530            assert_eq!(
2531                last_quorum
2532                    .into_iter()
2533                    .map(|block| block.reference())
2534                    .collect::<Vec<_>>(),
2535                round_4_blocks
2536            );
2537        }
2538
2539        // WHEN adding one more block at round 5, still round 4 should be returned as
2540        // quorum
2541        {
2542            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2543            dag_state.write().accept_block(block);
2544
2545            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2546
2547            let last_quorum = dag_state.read().last_quorum();
2548
2549            assert_eq!(last_quorum, round_4_blocks);
2550        }
2551    }
2552
2553    #[tokio::test]
2554    async fn test_last_block_for_authority() {
2555        // GIVEN
2556        let (context, _) = Context::new_for_test(4);
2557        let context = Arc::new(context);
2558        let store = Arc::new(MemStore::new());
2559        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2560
2561        // WHEN no blocks exist then genesis should be returned
2562        {
2563            let genesis = genesis_blocks(context.clone());
2564            let my_genesis = genesis
2565                .into_iter()
2566                .find(|block| block.author() == context.own_index)
2567                .unwrap();
2568
2569            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2570        }
2571
2572        // WHEN adding some blocks for authorities, only the last ones should be
2573        // returned
2574        {
2575            // add blocks up to round 4
2576            let mut dag_builder = DagBuilder::new(context.clone());
2577            dag_builder
2578                .layers(1..=4)
2579                .build()
2580                .persist_layers(dag_state.clone());
2581
2582            // add block 5 for authority 0
2583            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2584            dag_state.write().accept_block(block);
2585
2586            let block = dag_state
2587                .read()
2588                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2589            assert_eq!(block.round(), 5);
2590
2591            for (authority_index, _) in context.committee.authorities() {
2592                let block = dag_state
2593                    .read()
2594                    .get_last_block_for_authority(authority_index);
2595
2596                if authority_index.value() == 0 {
2597                    assert_eq!(block.round(), 5);
2598                } else {
2599                    assert_eq!(block.round(), 4);
2600                }
2601            }
2602        }
2603    }
2604}