consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::max,
7    collections::{BTreeMap, BTreeSet, VecDeque},
8    ops::Bound::{Excluded, Included, Unbounded},
9    panic,
10    sync::Arc,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info};
18
19use crate::{
20    CommittedSubDag,
21    block::{
22        BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23        VerifiedBlock, genesis_blocks,
24    },
25    commit::{
26        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28    },
29    context::Context,
30    leader_scoring::{ReputationScores, ScoringSubdag},
31    storage::{Store, WriteBatch},
32    threshold_clock::ThresholdClock,
33};
34
/// DagState provides the API to write and read accepted blocks from the DAG.
/// Only uncommitted and last committed blocks are cached in memory.
/// The rest of blocks are stored on disk.
/// Refs to cached blocks and additional refs are cached as well, to speed up
/// existence checks.
///
/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
/// concurrent access from multiple components.
pub(crate) struct DagState {
    // Shared node context (committee, parameters, protocol config, metrics, clock).
    context: Arc<Context>,

    // The genesis blocks, keyed by their reference.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
    // Note: all uncommitted blocks are kept in memory.
    //
    // When GC is enabled, this map has different semantics. It holds all the recent data for
    // each authority, making sure that CACHED_ROUNDS worth of data is always available. The
    // entries are evicted based on the latest GC round, however the eviction process respects
    // CACHED_ROUNDS. For each authority, blocks are only evicted when their round is less
    // than or equal to both `gc_round`, and `highest authority round - cached rounds`.
    // This ensures that the GC requirements are respected (we never clean up any block above
    // `gc_round`), and there are enough blocks cached.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    // Indexes recent block refs by their authorities.
    // Vec position corresponds to the authority index.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    // Keeps track of the threshold clock for proposing blocks.
    threshold_clock: ThresholdClock,

    // Keeps track of the highest round that has been evicted for each authority. Any blocks
    // of round <= evicted round should be considered evicted, and if any exist we should not
    // consider them causally complete in the order they appear. The `evicted_rounds` size
    // should be the same as the committee size.
    evicted_rounds: Vec<Round>,

    // Highest round of blocks accepted.
    highest_accepted_round: Round,

    // Last consensus commit of the dag. `None` until a commit is known.
    last_commit: Option<TrustedCommit>,

    // Last wall time when commit round advanced. Does not persist across restarts.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    // Last committed rounds per authority.
    // Vec position corresponds to the authority index.
    last_committed_rounds: Vec<Round>,

    /// The committed subdags that have been scored but scores have not been
    /// used for leader schedule yet.
    scoring_subdag: ScoringSubdag,
    // TODO: Remove when DistributedVoteScoring is enabled.
    /// The list of committed subdags that have been sequenced by the universal
    /// committer but have yet to be used to calculate reputation scores for the
    /// next leader schedule. Until then we consider it as "unscored" subdags.
    unscored_committed_subdags: Vec<CommittedSubDag>,

    // Commit votes pending to be included in new blocks.
    // TODO: limit to 1st commit per round with multi-leader.
    pending_commit_votes: VecDeque<CommitVote>,

    // Data to be flushed to storage.
    blocks_to_write: Vec<VerifiedBlock>,
    commits_to_write: Vec<TrustedCommit>,

    // Buffer the reputation scores & last_committed_rounds to be flushed with the
    // next dag state flush. This is okay because we can recover reputation scores
    // & last_committed_rounds from the commits as needed.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    // Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,

    // The number of cached rounds, read from `dag_state_cached_rounds` in the parameters.
    cached_rounds: Round,
}
114
115impl DagState {
    /// Initializes DagState from storage.
    ///
    /// Recovery proceeds in four steps:
    /// 1. Read the last commit and the last stored `CommitInfo`.
    /// 2. Replay every commit after the stored `CommitInfo` (or after genesis
    ///    when none is stored) to rebuild `last_committed_rounds` and the list
    ///    of committed subdags that have not been scored yet.
    /// 3. For each authority, compute its eviction round and reload the blocks
    ///    scanned from `eviction_round + 1` into the in-memory cache.
    /// 4. When GC is enabled, walk the commits backwards from the last one and
    ///    flag the cached blocks they reference as committed.
    ///
    /// # Panics
    /// Panics when any storage read fails, and when a block is flagged as
    /// committed twice (or is not in the cache) during step 4.
    pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
        let num_authorities = context.committee.size();

        let genesis = genesis_blocks(context.clone())
            .into_iter()
            .map(|block| (block.reference(), block))
            .collect();

        let threshold_clock = ThresholdClock::new(1, context.clone());

        let last_commit = store
            .read_last_commit()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));

        // The stored CommitInfo is a checkpoint of the per-authority committed rounds:
        // only commits after it have to be replayed below.
        let commit_info = store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
        let (mut last_committed_rounds, commit_recovery_start_index) =
            if let Some((commit_ref, commit_info)) = commit_info {
                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
                (commit_info.committed_rounds, commit_ref.index + 1)
            } else {
                tracing::info!("Found no stored CommitInfo to recover from");
                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
            };

        let mut unscored_committed_subdags = Vec::new();
        let mut scoring_subdag = ScoringSubdag::new(context.clone());

        if let Some(last_commit) = last_commit.as_ref() {
            store
                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
                .iter()
                .for_each(|commit| {
                    // Raise the committed round of every author that has a block in this commit.
                    for block_ref in commit.blocks() {
                        last_committed_rounds[block_ref.author] =
                            max(last_committed_rounds[block_ref.author], block_ref.round);
                    }

                    // We don't need to recover reputation scores for
                    // unscored_committed_subdags, hence the empty vec.
                    let committed_subdag =
                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
                    unscored_committed_subdags.push(committed_subdag);
                });
        }

        tracing::info!(
            "DagState was initialized with the following state: \
            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
            unscored_committed_subdags.len()
        );

        // With distributed vote scoring the recovered subdags are handed over to
        // `scoring_subdag`; `mem::take` leaves `unscored_committed_subdags` empty.
        if context
            .protocol_config
            .consensus_distributed_vote_scoring_strategy()
        {
            scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
        }

        let mut state = Self {
            context,
            genesis,
            recent_blocks: BTreeMap::new(),
            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
            threshold_clock,
            highest_accepted_round: 0,
            last_commit: last_commit.clone(),
            last_commit_round_advancement_time: None,
            last_committed_rounds: last_committed_rounds.clone(),
            pending_commit_votes: VecDeque::new(),
            blocks_to_write: vec![],
            commits_to_write: vec![],
            commit_info_to_write: vec![],
            scoring_subdag,
            unscored_committed_subdags,
            store: store.clone(),
            cached_rounds,
            evicted_rounds: vec![0; num_authorities],
        };

        // Reload the cached blocks of every authority. `round` is the authority's last
        // committed round recovered above.
        for (i, round) in last_committed_rounds.into_iter().enumerate() {
            let authority_index = state.context.committee.to_authority_index(i).unwrap();
            let (blocks, eviction_round) = if state.gc_enabled() {
                // Find the latest block for the authority to calculate the eviction round. Then
                // we want to scan and load the blocks from the eviction round and onwards only.
                // As reminder, the eviction round is taking into account the gc_round.
                let last_block = state
                    .store
                    .scan_last_blocks_by_author(authority_index, 1, None)
                    .expect("Database error");
                // No stored block for the authority: fall back to the genesis round.
                let last_block_round = last_block
                    .last()
                    .map(|b| b.round())
                    .unwrap_or(GENESIS_ROUND);

                let eviction_round = Self::gc_eviction_round(
                    last_block_round,
                    state.gc_round(),
                    state.cached_rounds,
                );
                let blocks = state
                    .store
                    .scan_blocks_by_author(authority_index, eviction_round + 1)
                    .expect("Database error");
                (blocks, eviction_round)
            } else {
                // Without GC the eviction round is derived from the last committed round only.
                let eviction_round = Self::eviction_round(round, cached_rounds);
                let blocks = state
                    .store
                    .scan_blocks_by_author(authority_index, eviction_round + 1)
                    .expect("Database error");
                (blocks, eviction_round)
            };

            state.evicted_rounds[authority_index] = eviction_round;

            // Update the block metadata for the authority. The blocks come from storage,
            // so they are not queued in `blocks_to_write` again.
            for block in &blocks {
                state.update_block_metadata(block);
            }

            info!(
                "Recovered blocks {}: {:?}",
                authority_index,
                blocks
                    .iter()
                    .map(|b| b.reference())
                    .collect::<Vec<BlockRef>>()
            );
        }

        // Blocks reloaded above start as not committed, so the committed flags are
        // rebuilt from the stored commits.
        if state.gc_enabled() {
            if let Some(last_commit) = last_commit {
                let mut index = last_commit.index();
                let gc_round = state.gc_round();
                info!(
                    "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
                    index, gc_round
                );

                loop {
                    // Commits are read one at a time, newest first.
                    let commits = store
                        .scan_commits((index..=index).into())
                        .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
                    let Some(commit) = commits.first() else {
                        info!(
                            "Recovering finished up to index {index}, no more commits to recover"
                        );
                        break;
                    };

                    // Check the commit leader round to see if it is within the gc_round. If it is
                    // not then we can stop the recovery process.
                    if gc_round > 0 && commit.leader().round <= gc_round {
                        info!(
                            "Recovering finished, reached commit leader round {} <= gc_round {}",
                            commit.leader().round,
                            gc_round
                        );
                        break;
                    }

                    // Only blocks above gc_round are flagged; `set_committed` panics for a
                    // block missing from the cache, and the assert rejects a block that was
                    // already flagged by a later commit.
                    commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
                        debug!(
                            "Setting block {:?} as committed based on commit {:?}",
                            block_ref,
                            commit.index()
                        );
                        assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
                    });

                    // All commits are indexed starting from 1, so once we reach zero we exit.
                    index = index.saturating_sub(1);
                    if index == 0 {
                        break;
                    }
                }
            }
        }

        state
    }
300
301    /// Accepts a block into DagState and keeps it in memory.
302    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
303        assert_ne!(
304            block.round(),
305            0,
306            "Genesis block should not be accepted into DAG."
307        );
308
309        let block_ref = block.reference();
310        if self.contains_block(&block_ref) {
311            return;
312        }
313
314        let now = self.context.clock.timestamp_utc_ms();
315        if block.timestamp_ms() > now {
316            panic!(
317                "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
318                block,
319                block.timestamp_ms(),
320                now,
321            );
322        }
323
324        // TODO: Move this check to core
325        // Ensure we don't write multiple blocks per slot for our own index
326        if block_ref.author == self.context.own_index {
327            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
328            assert!(
329                existing_blocks.is_empty(),
330                "Block Rejected! Attempted to add block {block:#?} to own slot where \
331                block(s) {existing_blocks:#?} already exists."
332            );
333        }
334        self.update_block_metadata(&block);
335        self.blocks_to_write.push(block);
336        let source = if self.context.own_index == block_ref.author {
337            "own"
338        } else {
339            "others"
340        };
341        self.context
342            .metrics
343            .node_metrics
344            .accepted_blocks
345            .with_label_values(&[source])
346            .inc();
347    }
348
349    /// Updates internal metadata for a block.
350    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
351        let block_ref = block.reference();
352        self.recent_blocks
353            .insert(block_ref, BlockInfo::new(block.clone()));
354        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
355        self.threshold_clock.add_block(block_ref);
356        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
357        self.context
358            .metrics
359            .node_metrics
360            .highest_accepted_round
361            .set(self.highest_accepted_round as i64);
362
363        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
364            .last()
365            .map(|block_ref| block_ref.round)
366            .expect("There should be by now at least one block ref");
367        let hostname = &self.context.committee.authority(block_ref.author).hostname;
368        self.context
369            .metrics
370            .node_metrics
371            .highest_accepted_authority_round
372            .with_label_values(&[hostname])
373            .set(highest_accepted_round_for_author as i64);
374    }
375
376    /// Accepts a blocks into DagState and keeps it in memory.
377    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
378        debug!(
379            "Accepting blocks: {}",
380            blocks.iter().map(|b| b.reference().to_string()).join(",")
381        );
382        for block in blocks {
383            self.accept_block(block);
384        }
385    }
386
387    /// Gets a block by checking cached recent blocks then storage.
388    /// Returns None when the block is not found.
389    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
390        self.get_blocks(&[*reference])
391            .pop()
392            .expect("Exactly one element should be returned")
393    }
394
395    /// Gets blocks by checking genesis, cached recent blocks in memory, then
396    /// storage. An element is None when the corresponding block is not
397    /// found.
398    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
399        let mut blocks = vec![None; block_refs.len()];
400        let mut missing = Vec::new();
401
402        for (index, block_ref) in block_refs.iter().enumerate() {
403            if block_ref.round == GENESIS_ROUND {
404                // Allow the caller to handle the invalid genesis ancestor error.
405                if let Some(block) = self.genesis.get(block_ref) {
406                    blocks[index] = Some(block.clone());
407                }
408                continue;
409            }
410            if let Some(block_info) = self.recent_blocks.get(block_ref) {
411                blocks[index] = Some(block_info.block.clone());
412                continue;
413            }
414            missing.push((index, block_ref));
415        }
416
417        if missing.is_empty() {
418            return blocks;
419        }
420
421        let missing_refs = missing
422            .iter()
423            .map(|(_, block_ref)| **block_ref)
424            .collect::<Vec<_>>();
425        let store_results = self
426            .store
427            .read_blocks(&missing_refs)
428            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
429        self.context
430            .metrics
431            .node_metrics
432            .dag_state_store_read_count
433            .with_label_values(&["get_blocks"])
434            .inc();
435
436        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
437            blocks[index] = result;
438        }
439
440        blocks
441    }
442
443    // Sets the block as committed in the cache. If the block is set as committed
444    // for first time, then true is returned, otherwise false is returned instead.
445    // Method will panic if the block is not found in the cache.
446    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
447        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
448            if !block_info.committed {
449                block_info.committed = true;
450                return true;
451            }
452            false
453        } else {
454            panic!(
455                "Block {:?} not found in cache to set as committed.",
456                block_ref
457            );
458        }
459    }
460
461    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
462        self.recent_blocks
463            .get(block_ref)
464            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
465            .committed
466    }
467
468    /// Gets all uncommitted blocks in a slot.
469    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
470    /// checked.
471    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
472        // TODO: either panic below when the slot is at or below the last committed
473        // round, or support reading from storage while limiting storage reads
474        // to edge cases.
475
476        let mut blocks = vec![];
477        for (_block_ref, block_info) in self.recent_blocks.range((
478            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
479            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
480        )) {
481            blocks.push(block_info.block.clone())
482        }
483        blocks
484    }
485
486    /// Gets all uncommitted blocks in a round.
487    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
488    /// checked.
489    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
490        if round <= self.last_commit_round() {
491            panic!("Round {} have committed blocks!", round);
492        }
493
494        let mut blocks = vec![];
495        for (_block_ref, block_info) in self.recent_blocks.range((
496            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
497            Excluded(BlockRef::new(
498                round + 1,
499                AuthorityIndex::ZERO,
500                BlockDigest::MIN,
501            )),
502        )) {
503            blocks.push(block_info.block.clone())
504        }
505        blocks
506    }
507
508    /// Gets all ancestors in the history of a block at a certain round.
509    pub(crate) fn ancestors_at_round(
510        &self,
511        later_block: &VerifiedBlock,
512        earlier_round: Round,
513    ) -> Vec<VerifiedBlock> {
514        // Iterate through ancestors of later_block in round descending order.
515        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
516        while !linked.is_empty() {
517            let round = linked.last().unwrap().round;
518            // Stop after finishing traversal for ancestors above earlier_round.
519            if round <= earlier_round {
520                break;
521            }
522            let block_ref = linked.pop_last().unwrap();
523            let Some(block) = self.get_block(&block_ref) else {
524                panic!("Block {:?} should exist in DAG!", block_ref);
525            };
526            linked.extend(block.ancestors().iter().cloned());
527        }
528        linked
529            .range((
530                Included(BlockRef::new(
531                    earlier_round,
532                    AuthorityIndex::ZERO,
533                    BlockDigest::MIN,
534                )),
535                Unbounded,
536            ))
537            .map(|r| {
538                self.get_block(r)
539                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
540                    .clone()
541            })
542            .collect()
543    }
544
    /// Gets the last proposed block from this authority.
    /// If no block is proposed yet, returns the genesis block.
    ///
    /// Delegates to `get_last_block_for_authority` with this node's own
    /// authority index (`context.own_index`).
    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
        self.get_last_block_for_authority(self.context.own_index)
    }
550
551    /// Retrieves the last accepted block from the specified `authority`. If no
552    /// block is found in cache then the genesis block is returned as no other
553    /// block has been received from that authority.
554    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
555        if let Some(last) = self.recent_refs_by_authority[authority].last() {
556            return self
557                .recent_blocks
558                .get(last)
559                .expect("Block should be found in recent blocks")
560                .block
561                .clone();
562        }
563
564        // if none exists, then fallback to genesis
565        let (_, genesis_block) = self
566            .genesis
567            .iter()
568            .find(|(block_ref, _)| block_ref.author == authority)
569            .expect("Genesis should be found for authority {authority_index}");
570        genesis_block.clone()
571    }
572
573    /// Returns cached recent blocks from the specified authority.
574    /// Blocks returned are limited to round >= `start`, and cached.
575    /// NOTE: caller should not assume returned blocks are always chained.
576    /// "Disconnected" blocks can be returned when there are byzantine blocks,
577    /// or when received blocks are not deduped.
578    pub(crate) fn get_cached_blocks(
579        &self,
580        authority: AuthorityIndex,
581        start: Round,
582    ) -> Vec<VerifiedBlock> {
583        let mut blocks = vec![];
584        for block_ref in self.recent_refs_by_authority[authority].range((
585            Included(BlockRef::new(start, authority, BlockDigest::MIN)),
586            Unbounded,
587        )) {
588            let block_info = self
589                .recent_blocks
590                .get(block_ref)
591                .expect("Block should exist in recent blocks");
592            blocks.push(block_info.block.clone());
593        }
594        blocks
595    }
596
597    // Retrieves the cached block within the range [start_round, end_round) from a
598    // given authority. NOTE: end_round must be greater than GENESIS_ROUND.
599    pub(crate) fn get_last_cached_block_in_range(
600        &self,
601        authority: AuthorityIndex,
602        start_round: Round,
603        end_round: Round,
604    ) -> Option<VerifiedBlock> {
605        if end_round == GENESIS_ROUND {
606            panic!(
607                "Attempted to retrieve blocks earlier than the genesis round which is impossible"
608            );
609        }
610
611        let block_ref = self.recent_refs_by_authority[authority]
612            .range((
613                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
614                Excluded(BlockRef::new(
615                    end_round,
616                    AuthorityIndex::MIN,
617                    BlockDigest::MIN,
618                )),
619            ))
620            .last()?;
621
622        self.recent_blocks
623            .get(block_ref)
624            .map(|block_info| block_info.block.clone())
625    }
626
627    /// Returns the last block proposed per authority with `evicted round <
628    /// round < end_round`. The method is guaranteed to return results only
629    /// when the `end_round` is not earlier of the available cached data for
630    /// each authority (evicted round + 1), otherwise the method will panic.
631    /// It's the caller's responsibility to ensure that is not requesting for
632    /// earlier rounds. In case of equivocation for an authority's last
633    /// slot, one block will be returned (the last in order) and the other
634    /// equivocating blocks will be returned.
635    pub(crate) fn get_last_cached_block_per_authority(
636        &self,
637        end_round: Round,
638    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
639        // Initialize with the genesis blocks as fallback
640        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
641        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
642
643        if end_round == GENESIS_ROUND {
644            panic!(
645                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
646            );
647        }
648
649        if end_round == GENESIS_ROUND + 1 {
650            return blocks.into_iter().map(|b| (b, vec![])).collect();
651        }
652
653        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
654            let authority_index = self
655                .context
656                .committee
657                .to_authority_index(authority_index)
658                .unwrap();
659
660            let last_evicted_round = self.evicted_rounds[authority_index];
661            if end_round.saturating_sub(1) <= last_evicted_round {
662                panic!(
663                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
664                );
665            }
666
667            let block_ref_iter = block_refs
668                .range((
669                    Included(BlockRef::new(
670                        last_evicted_round + 1,
671                        authority_index,
672                        BlockDigest::MIN,
673                    )),
674                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
675                ))
676                .rev();
677
678            let mut last_round = 0;
679            for block_ref in block_ref_iter {
680                if last_round == 0 {
681                    last_round = block_ref.round;
682                    let block_info = self
683                        .recent_blocks
684                        .get(block_ref)
685                        .expect("Block should exist in recent blocks");
686                    blocks[authority_index] = block_info.block.clone();
687                    continue;
688                }
689                if block_ref.round < last_round {
690                    break;
691                }
692                equivocating_blocks[authority_index].push(*block_ref);
693            }
694        }
695
696        blocks.into_iter().zip(equivocating_blocks).collect()
697    }
698
699    /// Checks whether a block exists in the slot. The method checks only
700    /// against the cached data. If the user asks for a slot that is not
701    /// within the cached data then a panic is thrown.
702    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
703        // Always return true for genesis slots.
704        if slot.round == GENESIS_ROUND {
705            return true;
706        }
707
708        let eviction_round = self.evicted_rounds[slot.authority];
709        if slot.round <= eviction_round {
710            panic!(
711                "{}",
712                format!(
713                    "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
714                    if self.gc_enabled() { " gc " } else { " " }
715                )
716            );
717        }
718
719        let mut result = self.recent_refs_by_authority[slot.authority].range((
720            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
721            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
722        ));
723        result.next().is_some()
724    }
725
726    /// Checks whether the required blocks are in cache, if exist, or otherwise
727    /// will check in store. The method is not caching back the results, so
728    /// its expensive if keep asking for cache missing blocks.
729    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
730        let mut exist = vec![false; block_refs.len()];
731        let mut missing = Vec::new();
732
733        for (index, block_ref) in block_refs.into_iter().enumerate() {
734            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
735            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
736                exist[index] = true;
737            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
738            {
739                // Optimization: recent_refs contain the most recent blocks known to this
740                // authority. If a block ref is not found there and has a higher
741                // round, it definitely is missing from this authority and there
742                // is no need to check disk.
743                exist[index] = false;
744            } else {
745                missing.push((index, block_ref));
746            }
747        }
748
749        if missing.is_empty() {
750            return exist;
751        }
752
753        let missing_refs = missing
754            .iter()
755            .map(|(_, block_ref)| *block_ref)
756            .collect::<Vec<_>>();
757        let store_results = self
758            .store
759            .contains_blocks(&missing_refs)
760            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
761        self.context
762            .metrics
763            .node_metrics
764            .dag_state_store_read_count
765            .with_label_values(&["contains_blocks"])
766            .inc();
767
768        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
769            exist[index] = result;
770        }
771
772        exist
773    }
774
775    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
776        let blocks = self.contains_blocks(vec![*block_ref]);
777        blocks.first().cloned().unwrap()
778    }
779
    /// Returns the round currently reported by the threshold clock.
    pub(crate) fn threshold_clock_round(&self) -> Round {
        self.threshold_clock.get_round()
    }
783
    /// Returns the quorum timestamp reported by the threshold clock.
    // NOTE(review): presumably the instant at which the clock last advanced on
    // a quorum; confirm against `ThresholdClock::get_quorum_ts`.
    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
        self.threshold_clock.get_quorum_ts()
    }
787
    /// Returns the stored `highest_accepted_round` value.
    pub(crate) fn highest_accepted_round(&self) -> Round {
        self.highest_accepted_round
    }
791
792    // Buffers a new commit in memory and updates last committed rounds.
793    // REQUIRED: must not skip over any commit index.
794    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
795        if let Some(last_commit) = &self.last_commit {
796            if commit.index() <= last_commit.index() {
797                error!(
798                    "New commit index {} <= last commit index {}!",
799                    commit.index(),
800                    last_commit.index()
801                );
802                return;
803            }
804            assert_eq!(commit.index(), last_commit.index() + 1);
805
806            if commit.timestamp_ms() < last_commit.timestamp_ms() {
807                panic!(
808                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
809                    last_commit, commit
810                );
811            }
812        } else {
813            assert_eq!(commit.index(), 1);
814        }
815
816        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
817            previous_commit.round() < commit.round()
818        } else {
819            true
820        };
821
822        self.last_commit = Some(commit.clone());
823
824        if commit_round_advanced {
825            let now = std::time::Instant::now();
826            if let Some(previous_time) = self.last_commit_round_advancement_time {
827                self.context
828                    .metrics
829                    .node_metrics
830                    .commit_round_advancement_interval
831                    .observe(now.duration_since(previous_time).as_secs_f64())
832            }
833            self.last_commit_round_advancement_time = Some(now);
834        }
835
836        for block_ref in commit.blocks().iter() {
837            self.last_committed_rounds[block_ref.author] = max(
838                self.last_committed_rounds[block_ref.author],
839                block_ref.round,
840            );
841        }
842
843        for (i, round) in self.last_committed_rounds.iter().enumerate() {
844            let index = self.context.committee.to_authority_index(i).unwrap();
845            let hostname = &self.context.committee.authority(index).hostname;
846            self.context
847                .metrics
848                .node_metrics
849                .last_committed_authority_round
850                .with_label_values(&[hostname])
851                .set((*round).into());
852        }
853
854        self.pending_commit_votes.push_back(commit.reference());
855        self.commits_to_write.push(commit);
856    }
857
858    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
859        // We empty the unscored committed subdags to calculate reputation scores.
860        assert!(self.unscored_committed_subdags.is_empty());
861
862        // We create an empty scoring subdag once reputation scores are calculated.
863        // Note: It is okay for this to not be gated by protocol config as the
864        // scoring_subdag should be empty in either case at this point.
865        assert!(self.scoring_subdag.is_empty());
866
867        let commit_info = CommitInfo {
868            committed_rounds: self.last_committed_rounds.clone(),
869            reputation_scores,
870        };
871        let last_commit = self
872            .last_commit
873            .as_ref()
874            .expect("Last commit should already be set.");
875        self.commit_info_to_write
876            .push((last_commit.reference(), commit_info));
877    }
878
879    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
880        let mut votes = Vec::new();
881        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
882            votes.push(self.pending_commit_votes.pop_front().unwrap());
883        }
884        votes
885    }
886
887    /// Index of the last commit.
888    pub(crate) fn last_commit_index(&self) -> CommitIndex {
889        match &self.last_commit {
890            Some(commit) => commit.index(),
891            None => 0,
892        }
893    }
894
895    /// Digest of the last commit.
896    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
897        match &self.last_commit {
898            Some(commit) => commit.digest(),
899            None => CommitDigest::MIN,
900        }
901    }
902
903    /// Timestamp of the last commit.
904    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
905        match &self.last_commit {
906            Some(commit) => commit.timestamp_ms(),
907            None => 0,
908        }
909    }
910
911    /// Leader slot of the last commit.
912    pub(crate) fn last_commit_leader(&self) -> Slot {
913        match &self.last_commit {
914            Some(commit) => commit.leader().into(),
915            None => self
916                .genesis
917                .iter()
918                .next()
919                .map(|(genesis_ref, _)| *genesis_ref)
920                .expect("Genesis blocks should always be available.")
921                .into(),
922        }
923    }
924
925    /// Highest round where a block is committed, which is last commit's leader
926    /// round.
927    pub(crate) fn last_commit_round(&self) -> Round {
928        match &self.last_commit {
929            Some(commit) => commit.leader().round,
930            None => 0,
931        }
932    }
933
934    /// Last committed round per authority.
935    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
936        self.last_committed_rounds.clone()
937    }
938
939    /// The GC round is the highest round that blocks of equal or lower round
940    /// are considered obsolete and no longer possible to be committed.
941    /// There is no meaning accepting any blocks with round <= gc_round. The
942    /// Garbage Collection (GC) round is calculated based on the latest
943    /// committed leader round. When GC is disabled that will return the genesis
944    /// round.
945    pub(crate) fn gc_round(&self) -> Round {
946        self.calculate_gc_round(self.last_commit_round())
947    }
948
949    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
950        let gc_depth = self.context.protocol_config.gc_depth();
951        if gc_depth > 0 {
952            // GC is enabled, only then calculate the diff
953            commit_round.saturating_sub(gc_depth)
954        } else {
955            // Otherwise just return genesis round. That also acts as a safety mechanism so
956            // we never attempt to truncate anything even accidentally.
957            GENESIS_ROUND
958        }
959    }
960
961    pub(crate) fn gc_enabled(&self) -> bool {
962        self.context.protocol_config.gc_depth() > 0
963    }
964
965    /// After each flush, DagState becomes persisted in storage and it expected
966    /// to recover all internal states from storage after restarts.
967    pub(crate) fn flush(&mut self) {
968        let _s = self
969            .context
970            .metrics
971            .node_metrics
972            .scope_processing_time
973            .with_label_values(&["DagState::flush"])
974            .start_timer();
975        // Flush buffered data to storage.
976        let blocks = std::mem::take(&mut self.blocks_to_write);
977        let commits = std::mem::take(&mut self.commits_to_write);
978        let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
979
980        if blocks.is_empty() && commits.is_empty() {
981            return;
982        }
983        debug!(
984            "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
985            blocks.len(),
986            blocks.iter().map(|b| b.reference().to_string()).join(","),
987            commits.len(),
988            commits.iter().map(|c| c.reference().to_string()).join(","),
989            commit_info_to_write.len(),
990            commit_info_to_write
991                .iter()
992                .map(|(commit_ref, _)| commit_ref.to_string())
993                .join(","),
994        );
995        self.store
996            .write(WriteBatch::new(blocks, commits, commit_info_to_write))
997            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
998        self.context
999            .metrics
1000            .node_metrics
1001            .dag_state_store_write_count
1002            .inc();
1003
1004        // Clean up old cached data. After flushing, all cached blocks are guaranteed to
1005        // be persisted.
1006        for (authority_index, _) in self.context.committee.authorities() {
1007            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1008            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1009                if block_ref.round <= eviction_round {
1010                    self.recent_blocks.remove(block_ref);
1011                    self.recent_refs_by_authority[authority_index].pop_first();
1012                } else {
1013                    break;
1014                }
1015            }
1016            self.evicted_rounds[authority_index] = eviction_round;
1017        }
1018
1019        let metrics = &self.context.metrics.node_metrics;
1020        metrics
1021            .dag_state_recent_blocks
1022            .set(self.recent_blocks.len() as i64);
1023        metrics.dag_state_recent_refs.set(
1024            self.recent_refs_by_authority
1025                .iter()
1026                .map(BTreeSet::len)
1027                .sum::<usize>() as i64,
1028        );
1029    }
1030
1031    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1032        self.store
1033            .read_last_commit_info()
1034            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1035    }
1036
1037    // TODO: Remove four methods below this when DistributedVoteScoring is enabled.
1038    pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
1039        self.unscored_committed_subdags.len() as u64
1040    }
1041
1042    #[cfg(test)]
1043    pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
1044        self.unscored_committed_subdags.clone()
1045    }
1046
1047    pub(crate) fn add_unscored_committed_subdags(
1048        &mut self,
1049        committed_subdags: Vec<CommittedSubDag>,
1050    ) {
1051        self.unscored_committed_subdags.extend(committed_subdags);
1052    }
1053
    /// Moves all buffered unscored committed subdags out to the caller,
    /// leaving the internal buffer empty.
    pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
        std::mem::take(&mut self.unscored_committed_subdags)
    }
1057
    /// Forwards `scoring_subdags` to the scoring subdag via `add_subdags`.
    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
        self.scoring_subdag.add_subdags(scoring_subdags);
    }
1061
    /// Clears the scoring subdag.
    pub(crate) fn clear_scoring_subdag(&mut self) {
        self.scoring_subdag.clear();
    }
1065
    /// Returns the scored subdags count reported by the scoring subdag.
    pub(crate) fn scoring_subdags_count(&self) -> usize {
        self.scoring_subdag.scored_subdags_count()
    }
1069
    /// Returns true when the scoring subdag reports itself as empty.
    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
        self.scoring_subdag.is_empty()
    }
1073
    /// Returns the reputation scores computed by the scoring subdag's
    /// distributed vote scoring.
    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
        self.scoring_subdag.calculate_distributed_vote_scores()
    }
1077
1078    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1079        self.scoring_subdag
1080            .commit_range
1081            .as_ref()
1082            .expect("commit range should exist for scoring subdag")
1083            .end()
1084    }
1085
1086    /// The last round that should get evicted after a cache clean up operation.
1087    /// After this round we are guaranteed to have all the produced blocks
1088    /// from that authority. For any round that is <= `last_evicted_round`
1089    /// we don't have such guarantees as out of order blocks might exist.
1090    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1091        if self.gc_enabled() {
1092            let last_round = self.recent_refs_by_authority[authority_index]
1093                .last()
1094                .map(|block_ref| block_ref.round)
1095                .unwrap_or(GENESIS_ROUND);
1096
1097            Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1098        } else {
1099            let commit_round = self.last_committed_rounds[authority_index];
1100            Self::eviction_round(commit_round, self.cached_rounds)
1101        }
1102    }
1103
1104    /// Calculates the last eviction round based on the provided `commit_round`.
1105    /// Any blocks with round <= the evict round have been cleaned up.
1106    fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
1107        commit_round.saturating_sub(cached_rounds)
1108    }
1109
1110    /// Calculates the eviction round for the given authority. The goal is to
1111    /// keep at least `cached_rounds` of the latest blocks in the cache (if
1112    /// enough data is available), while evicting blocks with rounds <=
1113    /// `gc_round` when possible.
1114    fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1115        gc_round.min(last_round.saturating_sub(cached_rounds))
1116    }
1117
1118    /// Detects and returns the blocks of the round that forms the last quorum.
1119    /// The method will return the quorum even if that's genesis.
1120    #[cfg(test)]
1121    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1122        // the quorum should exist either on the highest accepted round or the one
1123        // before. If we fail to detect a quorum then it means that our DAG has
1124        // advanced with missing causal history.
1125        for round in
1126            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1127        {
1128            if round == GENESIS_ROUND {
1129                return self.genesis_blocks();
1130            }
1131            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1132            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1133
1134            // Since the minimum wave length is 3 we expect to find a quorum in the
1135            // uncommitted rounds.
1136            let blocks = self.get_uncommitted_blocks_at_round(round);
1137            for block in &blocks {
1138                if quorum.add(block.author(), &self.context.committee) {
1139                    return blocks;
1140                }
1141            }
1142        }
1143
1144        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1145    }
1146
1147    #[cfg(test)]
1148    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1149        self.genesis.values().cloned().collect()
1150    }
1151
    /// Test-only: overwrites the last commit directly. Unlike `add_commit`,
    /// this performs no validation and buffers nothing for storage.
    #[cfg(test)]
    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
        self.last_commit = Some(commit);
    }
1156}
1157
1158struct BlockInfo {
1159    block: VerifiedBlock,
1160    // Whether the block has been committed
1161    committed: bool,
1162}
1163
1164impl BlockInfo {
1165    fn new(block: VerifiedBlock) -> Self {
1166        Self {
1167            block,
1168            committed: false,
1169        }
1170    }
1171}
1172
1173#[cfg(test)]
1174mod test {
1175    use std::vec;
1176
1177    use parking_lot::RwLock;
1178    use rstest::rstest;
1179
1180    use super::*;
1181    use crate::{
1182        block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1183        storage::{WriteBatch, mem_store::MemStore},
1184        test_dag_builder::DagBuilder,
1185        test_dag_parser::parse_dag,
1186    };
1187
1188    #[tokio::test]
1189    async fn test_get_blocks() {
1190        let (context, _) = Context::new_for_test(4);
1191        let context = Arc::new(context);
1192        let store = Arc::new(MemStore::new());
1193        let mut dag_state = DagState::new(context.clone(), store.clone());
1194        let own_index = AuthorityIndex::new_for_test(0);
1195
1196        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1197        let num_rounds: u32 = 10;
1198        let non_existent_round: u32 = 100;
1199        let num_authorities: u32 = 3;
1200        let num_blocks_per_slot: usize = 3;
1201        let mut blocks = BTreeMap::new();
1202        for round in 1..=num_rounds {
1203            for author in 0..num_authorities {
1204                // Create 3 blocks per slot, with different timestamps and digests.
1205                let base_ts = round as BlockTimestampMs * 1000;
1206                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1207                    let block = VerifiedBlock::new_for_test(
1208                        TestBlock::new(round, author)
1209                            .set_timestamp_ms(timestamp)
1210                            .build(),
1211                    );
1212                    dag_state.accept_block(block.clone());
1213                    blocks.insert(block.reference(), block);
1214
1215                    // Only write one block per slot for own index
1216                    if AuthorityIndex::new_for_test(author) == own_index {
1217                        break;
1218                    }
1219                }
1220            }
1221        }
1222
1223        // Check uncommitted blocks that exist.
1224        for (r, block) in &blocks {
1225            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1226        }
1227
1228        // Check uncommitted blocks that do not exist.
1229        let last_ref = blocks.keys().last().unwrap();
1230        assert!(
1231            dag_state
1232                .get_block(&BlockRef::new(
1233                    last_ref.round,
1234                    last_ref.author,
1235                    BlockDigest::MIN
1236                ))
1237                .is_none()
1238        );
1239
1240        // Check slots with uncommitted blocks.
1241        for round in 1..=num_rounds {
1242            for author in 0..num_authorities {
1243                let slot = Slot::new(
1244                    round,
1245                    context
1246                        .committee
1247                        .to_authority_index(author as usize)
1248                        .unwrap(),
1249                );
1250                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1251
1252                // We only write one block per slot for own index
1253                if AuthorityIndex::new_for_test(author) == own_index {
1254                    assert_eq!(blocks.len(), 1);
1255                } else {
1256                    assert_eq!(blocks.len(), num_blocks_per_slot);
1257                }
1258
1259                for b in blocks {
1260                    assert_eq!(b.round(), round);
1261                    assert_eq!(
1262                        b.author(),
1263                        context
1264                            .committee
1265                            .to_authority_index(author as usize)
1266                            .unwrap()
1267                    );
1268                }
1269            }
1270        }
1271
1272        // Check slots without uncommitted blocks.
1273        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1274        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1275
1276        // Check rounds with uncommitted blocks.
1277        for round in 1..=num_rounds {
1278            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1279            // Expect 3 blocks per authority except for own authority which should
1280            // have 1 block.
1281            assert_eq!(
1282                blocks.len(),
1283                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1284            );
1285            for b in blocks {
1286                assert_eq!(b.round(), round);
1287            }
1288        }
1289
1290        // Check rounds without uncommitted blocks.
1291        assert!(
1292            dag_state
1293                .get_uncommitted_blocks_at_round(non_existent_round)
1294                .is_empty()
1295        );
1296    }
1297
1298    #[tokio::test]
1299    async fn test_ancestors_at_uncommitted_round() {
1300        // Initialize DagState.
1301        let (context, _) = Context::new_for_test(4);
1302        let context = Arc::new(context);
1303        let store = Arc::new(MemStore::new());
1304        let mut dag_state = DagState::new(context.clone(), store.clone());
1305
1306        // Populate DagState.
1307
1308        // Round 10 refs will not have their blocks in DagState.
1309        let round_10_refs: Vec<_> = (0..4)
1310            .map(|a| {
1311                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1312                    .reference()
1313            })
1314            .collect();
1315
1316        // Round 11 blocks.
1317        let round_11 = vec![
1318            // This will connect to round 12.
1319            VerifiedBlock::new_for_test(
1320                TestBlock::new(11, 0)
1321                    .set_timestamp_ms(1100)
1322                    .set_ancestors(round_10_refs.clone())
1323                    .build(),
1324            ),
1325            // Slot(11, 1) has 3 blocks.
1326            // This will connect to round 12.
1327            VerifiedBlock::new_for_test(
1328                TestBlock::new(11, 1)
1329                    .set_timestamp_ms(1110)
1330                    .set_ancestors(round_10_refs.clone())
1331                    .build(),
1332            ),
1333            // This will connect to round 13.
1334            VerifiedBlock::new_for_test(
1335                TestBlock::new(11, 1)
1336                    .set_timestamp_ms(1111)
1337                    .set_ancestors(round_10_refs.clone())
1338                    .build(),
1339            ),
1340            // This will not connect to any block.
1341            VerifiedBlock::new_for_test(
1342                TestBlock::new(11, 1)
1343                    .set_timestamp_ms(1112)
1344                    .set_ancestors(round_10_refs.clone())
1345                    .build(),
1346            ),
1347            // This will not connect to any block.
1348            VerifiedBlock::new_for_test(
1349                TestBlock::new(11, 2)
1350                    .set_timestamp_ms(1120)
1351                    .set_ancestors(round_10_refs.clone())
1352                    .build(),
1353            ),
1354            // This will connect to round 12.
1355            VerifiedBlock::new_for_test(
1356                TestBlock::new(11, 3)
1357                    .set_timestamp_ms(1130)
1358                    .set_ancestors(round_10_refs.clone())
1359                    .build(),
1360            ),
1361        ];
1362
1363        // Round 12 blocks.
1364        let ancestors_for_round_12 = vec![
1365            round_11[0].reference(),
1366            round_11[1].reference(),
1367            round_11[5].reference(),
1368        ];
1369        let round_12 = vec![
1370            VerifiedBlock::new_for_test(
1371                TestBlock::new(12, 0)
1372                    .set_timestamp_ms(1200)
1373                    .set_ancestors(ancestors_for_round_12.clone())
1374                    .build(),
1375            ),
1376            VerifiedBlock::new_for_test(
1377                TestBlock::new(12, 2)
1378                    .set_timestamp_ms(1220)
1379                    .set_ancestors(ancestors_for_round_12.clone())
1380                    .build(),
1381            ),
1382            VerifiedBlock::new_for_test(
1383                TestBlock::new(12, 3)
1384                    .set_timestamp_ms(1230)
1385                    .set_ancestors(ancestors_for_round_12.clone())
1386                    .build(),
1387            ),
1388        ];
1389
1390        // Round 13 blocks.
1391        let ancestors_for_round_13 = vec![
1392            round_12[0].reference(),
1393            round_12[1].reference(),
1394            round_12[2].reference(),
1395            round_11[2].reference(),
1396        ];
1397        let round_13 = vec![
1398            VerifiedBlock::new_for_test(
1399                TestBlock::new(12, 1)
1400                    .set_timestamp_ms(1300)
1401                    .set_ancestors(ancestors_for_round_13.clone())
1402                    .build(),
1403            ),
1404            VerifiedBlock::new_for_test(
1405                TestBlock::new(12, 2)
1406                    .set_timestamp_ms(1320)
1407                    .set_ancestors(ancestors_for_round_13.clone())
1408                    .build(),
1409            ),
1410            VerifiedBlock::new_for_test(
1411                TestBlock::new(12, 3)
1412                    .set_timestamp_ms(1330)
1413                    .set_ancestors(ancestors_for_round_13.clone())
1414                    .build(),
1415            ),
1416        ];
1417
1418        // Round 14 anchor block.
1419        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1420        let anchor = VerifiedBlock::new_for_test(
1421            TestBlock::new(14, 1)
1422                .set_timestamp_ms(1410)
1423                .set_ancestors(ancestors_for_round_14)
1424                .build(),
1425        );
1426
1427        // Add all blocks (at and above round 11) to DagState.
1428        for b in round_11
1429            .iter()
1430            .chain(round_12.iter())
1431            .chain(round_13.iter())
1432            .chain([anchor.clone()].iter())
1433        {
1434            dag_state.accept_block(b.clone());
1435        }
1436
1437        // Check ancestors connected to anchor.
1438        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1439        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1440        ancestors_refs.sort();
1441        let mut expected_refs = vec![
1442            round_11[0].reference(),
1443            round_11[1].reference(),
1444            round_11[2].reference(),
1445            round_11[5].reference(),
1446        ];
1447        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1
1448        // & 2) might not be in right lexicographical order.
1449        assert_eq!(
1450            ancestors_refs, expected_refs,
1451            "Expected round 11 ancestors: {:?}. Got: {:?}",
1452            expected_refs, ancestors_refs
1453        );
1454    }
1455
1456    #[tokio::test]
1457    async fn test_contains_blocks_in_cache_or_store() {
1458        /// Only keep elements up to 2 rounds before the last committed round
1459        const CACHED_ROUNDS: Round = 2;
1460
1461        let (mut context, _) = Context::new_for_test(4);
1462        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1463
1464        let context = Arc::new(context);
1465        let store = Arc::new(MemStore::new());
1466        let mut dag_state = DagState::new(context.clone(), store.clone());
1467
1468        // Create test blocks for round 1 ~ 10
1469        let num_rounds: u32 = 10;
1470        let num_authorities: u32 = 4;
1471        let mut blocks = Vec::new();
1472
1473        for round in 1..=num_rounds {
1474            for author in 0..num_authorities {
1475                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1476                blocks.push(block);
1477            }
1478        }
1479
1480        // Now write in store the blocks from first 4 rounds and the rest to the dag
1481        // state
1482        blocks.clone().into_iter().for_each(|block| {
1483            if block.round() <= 4 {
1484                store
1485                    .write(WriteBatch::default().blocks(vec![block]))
1486                    .unwrap();
1487            } else {
1488                dag_state.accept_blocks(vec![block]);
1489            }
1490        });
1491
1492        // Now when trying to query whether we have all the blocks, we should
1493        // successfully retrieve a positive answer where the blocks of first 4
1494        // round should be found in DagState and the rest in store.
1495        let mut block_refs = blocks
1496            .iter()
1497            .map(|block| block.reference())
1498            .collect::<Vec<_>>();
1499        let result = dag_state.contains_blocks(block_refs.clone());
1500
1501        // Ensure everything is found
1502        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1503        assert_eq!(result, expected);
1504
1505        // Now try to ask also for one block ref that is neither in cache nor in store
1506        block_refs.insert(
1507            3,
1508            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1509        );
1510        let result = dag_state.contains_blocks(block_refs.clone());
1511
1512        // Then all should be found apart from the last one
1513        expected.insert(3, false);
1514        assert_eq!(result, expected.clone());
1515    }
1516
1517    #[tokio::test]
1518    async fn test_contains_cached_block_at_slot() {
1519        /// Only keep elements up to 2 rounds before the last committed round
1520        const CACHED_ROUNDS: Round = 2;
1521
1522        let num_authorities: u32 = 4;
1523        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1524        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1525
1526        let context = Arc::new(context);
1527        let store = Arc::new(MemStore::new());
1528        let mut dag_state = DagState::new(context.clone(), store.clone());
1529
1530        // Create test blocks for round 1 ~ 10
1531        let num_rounds: u32 = 10;
1532        let mut blocks = Vec::new();
1533
1534        for round in 1..=num_rounds {
1535            for author in 0..num_authorities {
1536                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1537                blocks.push(block.clone());
1538                dag_state.accept_block(block);
1539            }
1540        }
1541
1542        // Query for genesis round 0, genesis blocks should be returned
1543        for (author, _) in context.committee.authorities() {
1544            assert!(
1545                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1546                "Genesis should always be found"
1547            );
1548        }
1549
1550        // Now when trying to query whether we have all the blocks, we should
1551        // successfully retrieve a positive answer where the blocks of first 4
1552        // round should be found in DagState and the rest in store.
1553        let mut block_refs = blocks
1554            .iter()
1555            .map(|block| block.reference())
1556            .collect::<Vec<_>>();
1557
1558        for block_ref in block_refs.clone() {
1559            let slot = block_ref.into();
1560            let found = dag_state.contains_cached_block_at_slot(slot);
1561            assert!(found, "A block should be found at slot {}", slot);
1562        }
1563
1564        // Now try to ask also for one block ref that is not in cache
1565        // Then all should be found apart from the last one
1566        block_refs.insert(
1567            3,
1568            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1569        );
1570        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1571        expected.insert(3, false);
1572
1573        // Attempt to check the same for via the contains slot method
1574        for block_ref in block_refs {
1575            let slot = block_ref.into();
1576            let found = dag_state.contains_cached_block_at_slot(slot);
1577
1578            assert_eq!(expected.remove(0), found);
1579        }
1580    }
1581
1582    #[tokio::test]
1583    #[should_panic(
1584        expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
1585    )]
1586    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1587        /// Only keep elements up to 2 rounds before the last committed round
1588        const CACHED_ROUNDS: Round = 2;
1589
1590        let (mut context, _) = Context::new_for_test(4);
1591        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1592        context
1593            .protocol_config
1594            .set_consensus_gc_depth_for_testing(0);
1595
1596        let context = Arc::new(context);
1597        let store = Arc::new(MemStore::new());
1598        let mut dag_state = DagState::new(context.clone(), store.clone());
1599
1600        // Create test blocks for round 1 ~ 10 for authority 0
1601        let mut blocks = Vec::new();
1602        for round in 1..=10 {
1603            let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1604            blocks.push(block.clone());
1605            dag_state.accept_block(block);
1606        }
1607
1608        // Now add a commit to trigger an eviction
1609        dag_state.add_commit(TrustedCommit::new_for_test(
1610            1 as CommitIndex,
1611            CommitDigest::MIN,
1612            0,
1613            blocks.last().unwrap().reference(),
1614            blocks
1615                .into_iter()
1616                .map(|block| block.reference())
1617                .collect::<Vec<_>>(),
1618        ));
1619
1620        dag_state.flush();
1621
1622        // When trying to request for authority 0 at block slot 8 it should panic, as
1623        // anything that is <= commit_round - cached_rounds = 10 - 2 = 8 should
1624        // be evicted
1625        let _ =
1626            dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1627    }
1628
1629    #[tokio::test]
1630    #[should_panic(
1631        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1632    )]
1633    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1634        /// Keep 2 rounds from the highest committed round. This is considered
1635        /// universal and minimum necessary blocks to hold
1636        /// for the correct node operation.
1637        const GC_DEPTH: u32 = 2;
1638        /// Keep at least 3 rounds in cache for each authority.
1639        const CACHED_ROUNDS: Round = 3;
1640
1641        let (mut context, _) = Context::new_for_test(4);
1642        context
1643            .protocol_config
1644            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1645        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1646
1647        let context = Arc::new(context);
1648        let store = Arc::new(MemStore::new());
1649        let mut dag_state = DagState::new(context.clone(), store.clone());
1650
1651        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 -
1652        // 6.
1653        let mut dag_builder = DagBuilder::new(context.clone());
1654        dag_builder.layers(1..=3).build();
1655        dag_builder
1656            .layers(4..=6)
1657            .authorities(vec![AuthorityIndex::new_for_test(0)])
1658            .skip_block()
1659            .build();
1660
1661        // Accept all blocks
1662        dag_builder
1663            .all_blocks()
1664            .into_iter()
1665            .for_each(|block| dag_state.accept_block(block));
1666
1667        // Now add a commit for leader round 5 to trigger an eviction
1668        dag_state.add_commit(TrustedCommit::new_for_test(
1669            1 as CommitIndex,
1670            CommitDigest::MIN,
1671            0,
1672            dag_builder.leader_block(5).unwrap().reference(),
1673            vec![],
1674        ));
1675
1676        dag_state.flush();
1677
1678        // Ensure that gc round has been updated
1679        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1680
1681        // Now what we expect to happen is for:
1682        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1683        // * Node 0 should have in cache blocks from it's latest round, 3, up to round
1684        //   1, which is the number of cached_rounds.
1685        for authority_index in 1..=3 {
1686            for round in 4..=6 {
1687                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1688                    round,
1689                    AuthorityIndex::new_for_test(authority_index)
1690                )));
1691            }
1692        }
1693
1694        for round in 1..=3 {
1695            assert!(
1696                dag_state.contains_cached_block_at_slot(Slot::new(
1697                    round,
1698                    AuthorityIndex::new_for_test(0)
1699                ))
1700            );
1701        }
1702
1703        // When trying to request for authority 1 at block slot 3 it should panic, as
1704        // anything that is <= 3 should be evicted
1705        let _ =
1706            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1707    }
1708
1709    #[tokio::test]
1710    async fn test_get_blocks_in_cache_or_store() {
1711        let (context, _) = Context::new_for_test(4);
1712        let context = Arc::new(context);
1713        let store = Arc::new(MemStore::new());
1714        let mut dag_state = DagState::new(context.clone(), store.clone());
1715
1716        // Create test blocks for round 1 ~ 10
1717        let num_rounds: u32 = 10;
1718        let num_authorities: u32 = 4;
1719        let mut blocks = Vec::new();
1720
1721        for round in 1..=num_rounds {
1722            for author in 0..num_authorities {
1723                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1724                blocks.push(block);
1725            }
1726        }
1727
1728        // Now write in store the blocks from first 4 rounds and the rest to the dag
1729        // state
1730        blocks.clone().into_iter().for_each(|block| {
1731            if block.round() <= 4 {
1732                store
1733                    .write(WriteBatch::default().blocks(vec![block]))
1734                    .unwrap();
1735            } else {
1736                dag_state.accept_blocks(vec![block]);
1737            }
1738        });
1739
1740        // Now when trying to query whether we have all the blocks, we should
1741        // successfully retrieve a positive answer where the blocks of first 4
1742        // round should be found in DagState and the rest in store.
1743        let mut block_refs = blocks
1744            .iter()
1745            .map(|block| block.reference())
1746            .collect::<Vec<_>>();
1747        let result = dag_state.get_blocks(&block_refs);
1748
1749        let mut expected = blocks
1750            .into_iter()
1751            .map(Some)
1752            .collect::<Vec<Option<VerifiedBlock>>>();
1753
1754        // Ensure everything is found
1755        assert_eq!(result, expected.clone());
1756
1757        // Now try to ask also for one block ref that is neither in cache nor in store
1758        block_refs.insert(
1759            3,
1760            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1761        );
1762        let result = dag_state.get_blocks(&block_refs);
1763
1764        // Then all should be found apart from the last one
1765        expected.insert(3, None);
1766        assert_eq!(result, expected);
1767    }
1768
1769    // TODO: Remove when DistributedVoteScoring is enabled.
1770    #[rstest]
1771    #[tokio::test]
1772    async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1773        telemetry_subscribers::init_for_testing();
1774        let num_authorities: u32 = 4;
1775        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1776        context
1777            .protocol_config
1778            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1779
1780        if gc_depth > 0 {
1781            context
1782                .protocol_config
1783                .set_consensus_gc_depth_for_testing(gc_depth);
1784        }
1785
1786        let context = Arc::new(context);
1787        let store = Arc::new(MemStore::new());
1788        let mut dag_state = DagState::new(context.clone(), store.clone());
1789
1790        // Create test blocks and commits for round 1 ~ 10
1791        let num_rounds: u32 = 10;
1792        let mut dag_builder = DagBuilder::new(context.clone());
1793        dag_builder.layers(1..=num_rounds).build();
1794        let mut commits = vec![];
1795
1796        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1797            commits.push(commit);
1798        }
1799
1800        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1801        let temp_commits = commits.split_off(5);
1802        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1803        for commit in commits.clone() {
1804            dag_state.add_commit(commit);
1805        }
1806
1807        // Flush the dag state
1808        dag_state.flush();
1809
1810        // Add the rest of the blocks and commits to the dag state
1811        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1812        for commit in temp_commits.clone() {
1813            dag_state.add_commit(commit);
1814        }
1815
1816        // All blocks should be found in DagState.
1817        let all_blocks = dag_builder.blocks(6..=num_rounds);
1818        let block_refs = all_blocks
1819            .iter()
1820            .map(|block| block.reference())
1821            .collect::<Vec<_>>();
1822
1823        let result = dag_state
1824            .get_blocks(&block_refs)
1825            .into_iter()
1826            .map(|b| b.unwrap())
1827            .collect::<Vec<_>>();
1828        assert_eq!(result, all_blocks);
1829
1830        // Last commit index should be 10.
1831        assert_eq!(dag_state.last_commit_index(), 10);
1832        assert_eq!(
1833            dag_state.last_committed_rounds(),
1834            dag_builder.last_committed_rounds.clone()
1835        );
1836
1837        // Destroy the dag state.
1838        drop(dag_state);
1839
1840        // Recover the state from the store
1841        let dag_state = DagState::new(context.clone(), store.clone());
1842
1843        // Blocks of first 5 rounds should be found in DagState.
1844        let blocks = dag_builder.blocks(1..=5);
1845        let block_refs = blocks
1846            .iter()
1847            .map(|block| block.reference())
1848            .collect::<Vec<_>>();
1849        let result = dag_state
1850            .get_blocks(&block_refs)
1851            .into_iter()
1852            .map(|b| b.unwrap())
1853            .collect::<Vec<_>>();
1854        assert_eq!(result, blocks);
1855
1856        // Blocks above round 5 should not be in DagState, because they are not flushed.
1857        let missing_blocks = dag_builder.blocks(6..=num_rounds);
1858        let block_refs = missing_blocks
1859            .iter()
1860            .map(|block| block.reference())
1861            .collect::<Vec<_>>();
1862        let retrieved_blocks = dag_state
1863            .get_blocks(&block_refs)
1864            .into_iter()
1865            .flatten()
1866            .collect::<Vec<_>>();
1867        assert!(retrieved_blocks.is_empty());
1868
1869        // Last commit index should be 5.
1870        assert_eq!(dag_state.last_commit_index(), 5);
1871
1872        // This is the last_commit_rounds of the first 5 commits that were flushed
1873        let expected_last_committed_rounds = vec![4, 5, 4, 4];
1874        assert_eq!(
1875            dag_state.last_committed_rounds(),
1876            expected_last_committed_rounds
1877        );
1878
1879        // Unscored subdags will be recovered based on the flushed commits and no commit
1880        // info
1881        assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1882    }
1883
1884    #[tokio::test]
1885    async fn test_flush_and_recovery() {
1886        telemetry_subscribers::init_for_testing();
1887        let num_authorities: u32 = 4;
1888        let (context, _) = Context::new_for_test(num_authorities as usize);
1889        let context = Arc::new(context);
1890        let store = Arc::new(MemStore::new());
1891        let mut dag_state = DagState::new(context.clone(), store.clone());
1892
1893        // Create test blocks and commits for round 1 ~ 10
1894        let num_rounds: u32 = 10;
1895        let mut dag_builder = DagBuilder::new(context.clone());
1896        dag_builder.layers(1..=num_rounds).build();
1897        let mut commits = vec![];
1898        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1899            commits.push(commit);
1900        }
1901
1902        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1903        let temp_commits = commits.split_off(5);
1904        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1905        for commit in commits.clone() {
1906            dag_state.add_commit(commit);
1907        }
1908
1909        // Flush the dag state
1910        dag_state.flush();
1911
1912        // Add the rest of the blocks and commits to the dag state
1913        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1914        for commit in temp_commits.clone() {
1915            dag_state.add_commit(commit);
1916        }
1917
1918        // All blocks should be found in DagState.
1919        let all_blocks = dag_builder.blocks(6..=num_rounds);
1920        let block_refs = all_blocks
1921            .iter()
1922            .map(|block| block.reference())
1923            .collect::<Vec<_>>();
1924        let result = dag_state
1925            .get_blocks(&block_refs)
1926            .into_iter()
1927            .map(|b| b.unwrap())
1928            .collect::<Vec<_>>();
1929        assert_eq!(result, all_blocks);
1930
1931        // Last commit index should be 10.
1932        assert_eq!(dag_state.last_commit_index(), 10);
1933        assert_eq!(
1934            dag_state.last_committed_rounds(),
1935            dag_builder.last_committed_rounds.clone()
1936        );
1937
1938        // Destroy the dag state.
1939        drop(dag_state);
1940
1941        // Recover the state from the store
1942        let dag_state = DagState::new(context.clone(), store.clone());
1943
1944        // Blocks of first 5 rounds should be found in DagState.
1945        let blocks = dag_builder.blocks(1..=5);
1946        let block_refs = blocks
1947            .iter()
1948            .map(|block| block.reference())
1949            .collect::<Vec<_>>();
1950        let result = dag_state
1951            .get_blocks(&block_refs)
1952            .into_iter()
1953            .map(|b| b.unwrap())
1954            .collect::<Vec<_>>();
1955        assert_eq!(result, blocks);
1956
1957        // Blocks above round 5 should not be in DagState, because they are not flushed.
1958        let missing_blocks = dag_builder.blocks(6..=num_rounds);
1959        let block_refs = missing_blocks
1960            .iter()
1961            .map(|block| block.reference())
1962            .collect::<Vec<_>>();
1963        let retrieved_blocks = dag_state
1964            .get_blocks(&block_refs)
1965            .into_iter()
1966            .flatten()
1967            .collect::<Vec<_>>();
1968        assert!(retrieved_blocks.is_empty());
1969
1970        // Last commit index should be 5.
1971        assert_eq!(dag_state.last_commit_index(), 5);
1972
1973        // This is the last_commit_rounds of the first 5 commits that were flushed
1974        let expected_last_committed_rounds = vec![4, 5, 4, 4];
1975        assert_eq!(
1976            dag_state.last_committed_rounds(),
1977            expected_last_committed_rounds
1978        );
1979        // Unscored subdags will be recovered based on the flushed commits and no commit
1980        // info
1981        assert_eq!(dag_state.scoring_subdags_count(), 5);
1982    }
1983
1984    #[tokio::test]
1985    async fn test_flush_and_recovery_gc_enabled() {
1986        telemetry_subscribers::init_for_testing();
1987
1988        const GC_DEPTH: u32 = 3;
1989        const CACHED_ROUNDS: u32 = 4;
1990
1991        let num_authorities: u32 = 4;
1992        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1993        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1994        context
1995            .protocol_config
1996            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1997        context
1998            .protocol_config
1999            .set_consensus_linearize_subdag_v2_for_testing(true);
2000
2001        let context = Arc::new(context);
2002
2003        let store = Arc::new(MemStore::new());
2004        let mut dag_state = DagState::new(context.clone(), store.clone());
2005
2006        let num_rounds: u32 = 10;
2007        let mut dag_builder = DagBuilder::new(context.clone());
2008        dag_builder.layers(1..=5).build();
2009        dag_builder
2010            .layers(6..=8)
2011            .authorities(vec![AuthorityIndex::new_for_test(0)])
2012            .skip_block()
2013            .build();
2014        dag_builder.layers(9..=num_rounds).build();
2015
2016        let mut commits = dag_builder
2017            .get_sub_dag_and_commits(1..=num_rounds)
2018            .into_iter()
2019            .map(|(_subdag, commit)| commit)
2020            .collect::<Vec<_>>();
2021
2022        // Add the blocks from first 8 rounds and first 7 commits to the dag state
2023        // It's 7 commits because we missing the commit of round 8 where authority 0 is
2024        // the leader, but produced no block
2025        let temp_commits = commits.split_off(7);
2026        dag_state.accept_blocks(dag_builder.blocks(1..=8));
2027        for commit in commits.clone() {
2028            dag_state.add_commit(commit);
2029        }
2030
2031        // Holds all the committed blocks from the commits that ended up being persisted
2032        // (flushed). Any commits that not flushed will not be considered.
2033        let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2034        for commit in commits.iter() {
2035            all_committed_blocks.extend(commit.blocks());
2036        }
2037        // Flush the dag state
2038        dag_state.flush();
2039
2040        // Add the rest of the blocks and commits to the dag state
2041        dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2042        for commit in temp_commits.clone() {
2043            dag_state.add_commit(commit);
2044        }
2045
2046        // All blocks should be found in DagState.
2047        let all_blocks = dag_builder.blocks(1..=num_rounds);
2048        let block_refs = all_blocks
2049            .iter()
2050            .map(|block| block.reference())
2051            .collect::<Vec<_>>();
2052        let result = dag_state
2053            .get_blocks(&block_refs)
2054            .into_iter()
2055            .map(|b| b.unwrap())
2056            .collect::<Vec<_>>();
2057        assert_eq!(result, all_blocks);
2058
2059        // Last commit index should be 9
2060        assert_eq!(dag_state.last_commit_index(), 9);
2061        assert_eq!(
2062            dag_state.last_committed_rounds(),
2063            dag_builder.last_committed_rounds.clone()
2064        );
2065
2066        // Destroy the dag state.
2067        drop(dag_state);
2068
2069        // Recover the state from the store
2070        let dag_state = DagState::new(context.clone(), store.clone());
2071
2072        // Blocks of first 5 rounds should be found in DagState.
2073        let blocks = dag_builder.blocks(1..=5);
2074        let block_refs = blocks
2075            .iter()
2076            .map(|block| block.reference())
2077            .collect::<Vec<_>>();
2078        let result = dag_state
2079            .get_blocks(&block_refs)
2080            .into_iter()
2081            .map(|b| b.unwrap())
2082            .collect::<Vec<_>>();
2083        assert_eq!(result, blocks);
2084
2085        // Blocks above round 9 should not be in DagState, because they are not flushed.
2086        let missing_blocks = dag_builder.blocks(9..=num_rounds);
2087        let block_refs = missing_blocks
2088            .iter()
2089            .map(|block| block.reference())
2090            .collect::<Vec<_>>();
2091        let retrieved_blocks = dag_state
2092            .get_blocks(&block_refs)
2093            .into_iter()
2094            .flatten()
2095            .collect::<Vec<_>>();
2096        assert!(retrieved_blocks.is_empty());
2097
2098        // Last commit index should be 7.
2099        assert_eq!(dag_state.last_commit_index(), 7);
2100
2101        // This is the last_commit_rounds of the first 7 commits that were flushed
2102        let expected_last_committed_rounds = vec![5, 6, 6, 7];
2103        assert_eq!(
2104            dag_state.last_committed_rounds(),
2105            expected_last_committed_rounds
2106        );
2107        // Unscored subdags will be recoverd based on the flushed commits and no commit
2108        // info
2109        assert_eq!(dag_state.scoring_subdags_count(), 7);
2110        // Ensure that cached blocks exist only for specific rounds per authority
2111        for (authority_index, _) in context.committee.authorities() {
2112            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2113
2114            // Ensure that eviction rounds have been properly recovered
2115            // DagState should hold cached blocks for authority 0 for rounds [2..=5] as no
2116            // higher blocks exist and due to CACHED_ROUNDS = 4 we want at max
2117            // to hold blocks for 4 rounds in cache.
2118            if authority_index == AuthorityIndex::new_for_test(0) {
2119                assert_eq!(blocks.len(), 4);
2120                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2121                assert!(
2122                    blocks
2123                        .into_iter()
2124                        .all(|block| block.round() >= 2 && block.round() <= 5)
2125                );
2126            } else {
2127                assert_eq!(blocks.len(), 4);
2128                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2129                assert!(
2130                    blocks
2131                        .into_iter()
2132                        .all(|block| block.round() >= 5 && block.round() <= 8)
2133                );
2134            }
2135        }
2136        // Ensure that committed blocks from > gc_round have been correctly marked as
2137        // committed according to committed sub dags
2138        let gc_round = dag_state.gc_round();
2139        assert_eq!(gc_round, 4);
2140        dag_state
2141            .recent_blocks
2142            .iter()
2143            .for_each(|(block_ref, block_info)| {
2144                if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2145                    assert!(
2146                        block_info.committed,
2147                        "Block {:?} should be committed",
2148                        block_ref
2149                    );
2150                };
2151            });
2152    }
2153
2154    #[tokio::test]
2155    async fn test_block_info_as_committed() {
2156        let num_authorities: u32 = 4;
2157        let (context, _) = Context::new_for_test(num_authorities as usize);
2158        let context = Arc::new(context);
2159
2160        let store = Arc::new(MemStore::new());
2161        let mut dag_state = DagState::new(context.clone(), store.clone());
2162
2163        // Accept a block
2164        let block = VerifiedBlock::new_for_test(
2165            TestBlock::new(1, 0)
2166                .set_timestamp_ms(1000)
2167                .set_ancestors(vec![])
2168                .build(),
2169        );
2170
2171        dag_state.accept_block(block.clone());
2172
2173        // Query is committed
2174        assert!(!dag_state.is_committed(&block.reference()));
2175
2176        // Set block as committed for first time should return true
2177        assert!(
2178            dag_state.set_committed(&block.reference()),
2179            "Block should be successfully set as committed for first time"
2180        );
2181
2182        // Now it should appear as committed
2183        assert!(dag_state.is_committed(&block.reference()));
2184
2185        // Trying to set the block as committed again, it should return false.
2186        assert!(
2187            !dag_state.set_committed(&block.reference()),
2188            "Block should not be successfully set as committed"
2189        );
2190    }
2191
2192    #[tokio::test]
2193    async fn test_get_cached_blocks() {
2194        let (mut context, _) = Context::new_for_test(4);
2195        context.parameters.dag_state_cached_rounds = 5;
2196
2197        let context = Arc::new(context);
2198        let store = Arc::new(MemStore::new());
2199        let mut dag_state = DagState::new(context.clone(), store.clone());
2200
2201        // Create no blocks for authority 0
2202        // Create one block (round 10) for authority 1
2203        // Create two blocks (rounds 10,11) for authority 2
2204        // Create three blocks (rounds 10,11,12) for authority 3
2205        let mut all_blocks = Vec::new();
2206        for author in 1..=3 {
2207            for round in 10..(10 + author) {
2208                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2209                all_blocks.push(block.clone());
2210                dag_state.accept_block(block);
2211            }
2212        }
2213
2214        let cached_blocks =
2215            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2216        assert!(cached_blocks.is_empty());
2217
2218        let cached_blocks =
2219            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2220        assert_eq!(cached_blocks.len(), 1);
2221        assert_eq!(cached_blocks[0].round(), 10);
2222
2223        let cached_blocks =
2224            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2225        assert_eq!(cached_blocks.len(), 2);
2226        assert_eq!(cached_blocks[0].round(), 10);
2227        assert_eq!(cached_blocks[1].round(), 11);
2228
2229        let cached_blocks =
2230            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2231        assert_eq!(cached_blocks.len(), 1);
2232        assert_eq!(cached_blocks[0].round(), 11);
2233
2234        let cached_blocks =
2235            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2236        assert_eq!(cached_blocks.len(), 3);
2237        assert_eq!(cached_blocks[0].round(), 10);
2238        assert_eq!(cached_blocks[1].round(), 11);
2239        assert_eq!(cached_blocks[2].round(), 12);
2240
2241        let cached_blocks =
2242            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2243        assert_eq!(cached_blocks.len(), 1);
2244        assert_eq!(cached_blocks[0].round(), 12);
2245    }
2246
2247    #[rstest]
2248    #[tokio::test]
2249    async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2250        // GIVEN
2251        const CACHED_ROUNDS: Round = 2;
2252        let (mut context, _) = Context::new_for_test(4);
2253        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2254
2255        if gc_depth > 0 {
2256            context
2257                .protocol_config
2258                .set_consensus_gc_depth_for_testing(gc_depth);
2259        }
2260
2261        let context = Arc::new(context);
2262        let store = Arc::new(MemStore::new());
2263        let mut dag_state = DagState::new(context.clone(), store.clone());
2264
2265        // Create no blocks for authority 0
2266        // Create one block (round 1) for authority 1
2267        // Create two blocks (rounds 1,2) for authority 2
2268        // Create three blocks (rounds 1,2,3) for authority 3
2269        let dag_str = "DAG {
2270            Round 0 : { 4 },
2271            Round 1 : {
2272                B -> [*],
2273                C -> [*],
2274                D -> [*],
2275            },
2276            Round 2 : {
2277                C -> [*],
2278                D -> [*],
2279            },
2280            Round 3 : {
2281                D -> [*],
2282            },
2283        }";
2284
2285        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2286
2287        // Add equivocating block for round 2 authority 3
2288        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2289
2290        // Accept all blocks
2291        for block in dag_builder
2292            .all_blocks()
2293            .into_iter()
2294            .chain(std::iter::once(block))
2295        {
2296            dag_state.accept_block(block);
2297        }
2298
2299        dag_state.add_commit(TrustedCommit::new_for_test(
2300            1 as CommitIndex,
2301            CommitDigest::MIN,
2302            context.clock.timestamp_utc_ms(),
2303            dag_builder.leader_block(3).unwrap().reference(),
2304            vec![],
2305        ));
2306
2307        // WHEN search for the latest blocks
2308        let end_round = 4;
2309        let expected_rounds = vec![0, 1, 2, 3];
2310        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2311        // THEN
2312        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2313        assert_eq!(
2314            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2315            expected_rounds
2316        );
2317        assert_eq!(
2318            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2319            expected_excluded_and_equivocating_blocks
2320        );
2321
2322        // THEN
2323        for (i, expected_round) in expected_rounds.iter().enumerate() {
2324            let round = dag_state
2325                .get_last_cached_block_in_range(
2326                    context.committee.to_authority_index(i).unwrap(),
2327                    0,
2328                    end_round,
2329                )
2330                .map(|b| b.round())
2331                .unwrap_or_default();
2332            assert_eq!(round, *expected_round, "Authority {i}");
2333        }
2334
2335        // WHEN starting from round 2
2336        let start_round = 2;
2337        let expected_rounds = [0, 0, 2, 3];
2338
2339        // THEN
2340        for (i, expected_round) in expected_rounds.iter().enumerate() {
2341            let round = dag_state
2342                .get_last_cached_block_in_range(
2343                    context.committee.to_authority_index(i).unwrap(),
2344                    start_round,
2345                    end_round,
2346                )
2347                .map(|b| b.round())
2348                .unwrap_or_default();
2349            assert_eq!(round, *expected_round, "Authority {i}");
2350        }
2351
2352        // WHEN we flush the DagState - after adding a
2353        // commit with all the blocks, we expect this to trigger a clean up in
        // the internal cache. That will keep all the blocks with rounds >=
2355        // authority_commit_round - CACHED_ROUND.
2356        //
2357        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and
2358        // for those who don't have blocks > gc_round, we'll keep
2359        // all their highest round blocks for CACHED_ROUNDS.
2360        dag_state.flush();
2361
2362        // AND we request before round 3
2363        let end_round = 3;
2364        let expected_rounds = vec![0, 1, 2, 2];
2365
2366        // THEN
2367        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2368        assert_eq!(
2369            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2370            expected_rounds
2371        );
2372
2373        // THEN
2374        for (i, expected_round) in expected_rounds.iter().enumerate() {
2375            let round = dag_state
2376                .get_last_cached_block_in_range(
2377                    context.committee.to_authority_index(i).unwrap(),
2378                    0,
2379                    end_round,
2380                )
2381                .map(|b| b.round())
2382                .unwrap_or_default();
2383            assert_eq!(round, *expected_round, "Authority {i}");
2384        }
2385    }
2386
2387    #[tokio::test]
2388    #[should_panic(
2389        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2390    )]
2391    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2392        // GIVEN
2393        const CACHED_ROUNDS: Round = 1;
2394        let (mut context, _) = Context::new_for_test(4);
2395        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2396        context
2397            .protocol_config
2398            .set_consensus_gc_depth_for_testing(0);
2399
2400        let context = Arc::new(context);
2401        let store = Arc::new(MemStore::new());
2402        let mut dag_state = DagState::new(context.clone(), store.clone());
2403
2404        // Create no blocks for authority 0
2405        // Create one block (round 1) for authority 1
2406        // Create two blocks (rounds 1,2) for authority 2
2407        // Create three blocks (rounds 1,2,3) for authority 3
2408        let mut all_blocks = Vec::new();
2409        for author in 1..=3 {
2410            for round in 1..=author {
2411                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2412                all_blocks.push(block.clone());
2413                dag_state.accept_block(block);
2414            }
2415        }
2416
2417        dag_state.add_commit(TrustedCommit::new_for_test(
2418            1 as CommitIndex,
2419            CommitDigest::MIN,
2420            0,
2421            all_blocks.last().unwrap().reference(),
2422            all_blocks
2423                .into_iter()
2424                .map(|block| block.reference())
2425                .collect::<Vec<_>>(),
2426        ));
2427
2428        // Flush the store so we keep in memory only the last 1 round from the last
2429        // commit for each authority.
2430        dag_state.flush();
2431
2432        // THEN the method should panic, as some authorities have already evicted rounds
2433        // <= round 2
2434        let end_round = 2;
2435        dag_state.get_last_cached_block_per_authority(end_round);
2436    }
2437
2438    #[tokio::test]
2439    #[should_panic(
2440        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2441    )]
2442    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
2443        // GIVEN
2444        const CACHED_ROUNDS: Round = 1;
2445        const GC_DEPTH: u32 = 1;
2446        let (mut context, _) = Context::new_for_test(4);
2447        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2448        context
2449            .protocol_config
2450            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2451
2452        let context = Arc::new(context);
2453        let store = Arc::new(MemStore::new());
2454        let mut dag_state = DagState::new(context.clone(), store.clone());
2455
2456        // Create no blocks for authority 0
2457        // Create one block (round 1) for authority 1
2458        // Create two blocks (rounds 1,2) for authority 2
2459        // Create three blocks (rounds 1,2,3) for authority 3
2460        let mut dag_builder = DagBuilder::new(context.clone());
2461        dag_builder
2462            .layers(1..=1)
2463            .authorities(vec![AuthorityIndex::new_for_test(0)])
2464            .skip_block()
2465            .build();
2466        dag_builder
2467            .layers(2..=2)
2468            .authorities(vec![
2469                AuthorityIndex::new_for_test(0),
2470                AuthorityIndex::new_for_test(1),
2471            ])
2472            .skip_block()
2473            .build();
2474        dag_builder
2475            .layers(3..=3)
2476            .authorities(vec![
2477                AuthorityIndex::new_for_test(0),
2478                AuthorityIndex::new_for_test(1),
2479                AuthorityIndex::new_for_test(2),
2480            ])
2481            .skip_block()
2482            .build();
2483
2484        // Accept all blocks
2485        for block in dag_builder.all_blocks() {
2486            dag_state.accept_block(block);
2487        }
2488
2489        dag_state.add_commit(TrustedCommit::new_for_test(
2490            1 as CommitIndex,
2491            CommitDigest::MIN,
2492            0,
2493            dag_builder.leader_block(3).unwrap().reference(),
2494            vec![],
2495        ));
2496
2497        // Flush the store so we update the evict rounds
2498        dag_state.flush();
2499
2500        // THEN the method should panic, as some authorities have already evicted rounds
2501        // <= round 2
2502        dag_state.get_last_cached_block_per_authority(2);
2503    }
2504
2505    #[tokio::test]
2506    async fn test_last_quorum() {
2507        // GIVEN
2508        let (context, _) = Context::new_for_test(4);
2509        let context = Arc::new(context);
2510        let store = Arc::new(MemStore::new());
2511        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2512
2513        // WHEN no blocks exist then genesis should be returned
2514        {
2515            let genesis = genesis_blocks(context.clone());
2516
2517            assert_eq!(dag_state.read().last_quorum(), genesis);
2518        }
2519
2520        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks
2521        // should be returned as quorum
2522        {
2523            let mut dag_builder = DagBuilder::new(context.clone());
2524            dag_builder
2525                .layers(1..=4)
2526                .build()
2527                .persist_layers(dag_state.clone());
2528            let round_4_blocks: Vec<_> = dag_builder
2529                .blocks(4..=4)
2530                .into_iter()
2531                .map(|block| block.reference())
2532                .collect();
2533
2534            let last_quorum = dag_state.read().last_quorum();
2535
2536            assert_eq!(
2537                last_quorum
2538                    .into_iter()
2539                    .map(|block| block.reference())
2540                    .collect::<Vec<_>>(),
2541                round_4_blocks
2542            );
2543        }
2544
2545        // WHEN adding one more block at round 5, still round 4 should be returned as
2546        // quorum
2547        {
2548            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2549            dag_state.write().accept_block(block);
2550
2551            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2552
2553            let last_quorum = dag_state.read().last_quorum();
2554
2555            assert_eq!(last_quorum, round_4_blocks);
2556        }
2557    }
2558
2559    #[tokio::test]
2560    async fn test_last_block_for_authority() {
2561        // GIVEN
2562        let (context, _) = Context::new_for_test(4);
2563        let context = Arc::new(context);
2564        let store = Arc::new(MemStore::new());
2565        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2566
2567        // WHEN no blocks exist then genesis should be returned
2568        {
2569            let genesis = genesis_blocks(context.clone());
2570            let my_genesis = genesis
2571                .into_iter()
2572                .find(|block| block.author() == context.own_index)
2573                .unwrap();
2574
2575            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2576        }
2577
2578        // WHEN adding some blocks for authorities, only the last ones should be
2579        // returned
2580        {
2581            // add blocks up to round 4
2582            let mut dag_builder = DagBuilder::new(context.clone());
2583            dag_builder
2584                .layers(1..=4)
2585                .build()
2586                .persist_layers(dag_state.clone());
2587
2588            // add block 5 for authority 0
2589            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2590            dag_state.write().accept_block(block);
2591
2592            let block = dag_state
2593                .read()
2594                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2595            assert_eq!(block.round(), 5);
2596
2597            for (authority_index, _) in context.committee.authorities() {
2598                let block = dag_state
2599                    .read()
2600                    .get_last_block_for_authority(authority_index);
2601
2602                if authority_index.value() == 0 {
2603                    assert_eq!(block.round(), 5);
2604                } else {
2605                    assert_eq!(block.round(), 4);
2606                }
2607            }
2608        }
2609    }
2610}