consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::max,
7    collections::{BTreeMap, BTreeSet, VecDeque},
8    ops::Bound::{Excluded, Included, Unbounded},
9    panic,
10    sync::Arc,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info};
18
19use crate::{
20    CommittedSubDag,
21    block::{
22        BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23        VerifiedBlock, genesis_blocks,
24    },
25    commit::{
26        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28    },
29    context::Context,
30    leader_scoring::{ReputationScores, ScoringSubdag},
31    storage::{Store, WriteBatch},
32    threshold_clock::ThresholdClock,
33};
34
35/// DagState provides the API to write and read accepted blocks from the DAG.
36/// Only uncommitted and last committed blocks are cached in memory.
37/// The rest of blocks are stored on disk.
38/// Refs to cached blocks and additional refs are cached as well, to speed up
39/// existence checks.
40///
41/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
42/// concurrent access from multiple components.
43pub(crate) struct DagState {
44    context: Arc<Context>,
45
46    // The genesis blocks
47    genesis: BTreeMap<BlockRef, VerifiedBlock>,
48
49    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
50    // Note: all uncommitted blocks are kept in memory.
51    //
52    // When GC is enabled, this map has a different semantic. It holds all the recent data for each
53    // authority making sure that it always have available CACHED_ROUNDS worth of data. The
54    // entries are evicted based on the latest GC round, however the eviction process will respect
55    // the CACHED_ROUNDS. For each authority, blocks are only evicted when their round is less
56    // than or equal to both `gc_round`, and `highest authority round - cached rounds`.
57    // This ensures that the GC requirements are respected (we never clean up any block above
58    // `gc_round`), and there are enough blocks cached.
59    recent_blocks: BTreeMap<BlockRef, BlockInfo>,
60
61    // Indexes recent block refs by their authorities.
62    // Vec position corresponds to the authority index.
63    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
64
65    // Keeps track of the threshold clock for proposing blocks.
66    threshold_clock: ThresholdClock,
67
68    // Keeps track of the highest round that has been evicted for each authority. Any blocks that
69    // are of round <= evict_round should be considered evicted, and if any exist we should not
70    // consider the causauly complete in the order they appear. The `evicted_rounds` size
71    // should be the same as the committee size.
72    evicted_rounds: Vec<Round>,
73
74    // Highest round of blocks accepted.
75    highest_accepted_round: Round,
76
77    // Last consensus commit of the dag.
78    last_commit: Option<TrustedCommit>,
79
80    // Last wall time when commit round advanced. Does not persist across restarts.
81    last_commit_round_advancement_time: Option<std::time::Instant>,
82
83    // Last committed rounds per authority.
84    last_committed_rounds: Vec<Round>,
85
86    /// The committed subdags that have been scored but scores have not been
87    /// used for leader schedule yet.
88    scoring_subdag: ScoringSubdag,
89    // TODO: Remove when DistributedVoteScoring is enabled.
90    /// The list of committed subdags that have been sequenced by the universal
91    /// committer but have yet to be used to calculate reputation scores for the
92    /// next leader schedule. Until then we consider it as "unscored" subdags.
93    unscored_committed_subdags: Vec<CommittedSubDag>,
94
95    // Commit votes pending to be included in new blocks.
96    // TODO: limit to 1st commit per round with multi-leader.
97    pending_commit_votes: VecDeque<CommitVote>,
98
99    // Data to be flushed to storage.
100    blocks_to_write: Vec<VerifiedBlock>,
101    commits_to_write: Vec<TrustedCommit>,
102
103    // Buffer the reputation scores & last_committed_rounds to be flushed with the
104    // next dag state flush. This is okay because we can recover reputation scores
105    // & last_committed_rounds from the commits as needed.
106    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
107
108    // Persistent storage for blocks, commits and other consensus data.
109    store: Arc<dyn Store>,
110
111    // The number of cached rounds
112    cached_rounds: Round,
113}
114
115impl DagState {
116    /// Initializes DagState from storage.
117    pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
118        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
119        let num_authorities = context.committee.size();
120
121        let genesis = genesis_blocks(context.clone())
122            .into_iter()
123            .map(|block| (block.reference(), block))
124            .collect();
125
126        let threshold_clock = ThresholdClock::new(1, context.clone());
127
128        let last_commit = store
129            .read_last_commit()
130            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
131
132        let commit_info = store
133            .read_last_commit_info()
134            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
135        let (mut last_committed_rounds, commit_recovery_start_index) =
136            if let Some((commit_ref, commit_info)) = commit_info {
137                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
138                (commit_info.committed_rounds, commit_ref.index + 1)
139            } else {
140                tracing::info!("Found no stored CommitInfo to recover from");
141                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
142            };
143
144        let mut unscored_committed_subdags = Vec::new();
145        let mut scoring_subdag = ScoringSubdag::new(context.clone());
146
147        if let Some(last_commit) = last_commit.as_ref() {
148            store
149                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
150                .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
151                .iter()
152                .for_each(|commit| {
153                    for block_ref in commit.blocks() {
154                        last_committed_rounds[block_ref.author] =
155                            max(last_committed_rounds[block_ref.author], block_ref.round);
156                    }
157
158                    let committed_subdag =
159                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]); // We don't need to recover reputation scores for unscored_committed_subdags
160                    unscored_committed_subdags.push(committed_subdag);
161                });
162        }
163
164        tracing::info!(
165            "DagState was initialized with the following state: \
166            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
167            unscored_committed_subdags.len()
168        );
169
170        if context
171            .protocol_config
172            .consensus_distributed_vote_scoring_strategy()
173        {
174            scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
175        }
176
177        let mut state = Self {
178            context,
179            genesis,
180            recent_blocks: BTreeMap::new(),
181            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
182            threshold_clock,
183            highest_accepted_round: 0,
184            last_commit: last_commit.clone(),
185            last_commit_round_advancement_time: None,
186            last_committed_rounds: last_committed_rounds.clone(),
187            pending_commit_votes: VecDeque::new(),
188            blocks_to_write: vec![],
189            commits_to_write: vec![],
190            commit_info_to_write: vec![],
191            scoring_subdag,
192            unscored_committed_subdags,
193            store: store.clone(),
194            cached_rounds,
195            evicted_rounds: vec![0; num_authorities],
196        };
197
198        for (i, round) in last_committed_rounds.into_iter().enumerate() {
199            let authority_index = state.context.committee.to_authority_index(i).unwrap();
200            let (blocks, eviction_round) = if state.gc_enabled() {
201                // Find the latest block for the authority to calculate the eviction round. Then
202                // we want to scan and load the blocks from the eviction round and onwards only.
203                // As reminder, the eviction round is taking into account the gc_round.
204                let last_block = state
205                    .store
206                    .scan_last_blocks_by_author(authority_index, 1, None)
207                    .expect("Database error");
208                let last_block_round = last_block
209                    .last()
210                    .map(|b| b.round())
211                    .unwrap_or(GENESIS_ROUND);
212
213                let eviction_round = Self::gc_eviction_round(
214                    last_block_round,
215                    state.gc_round(),
216                    state.cached_rounds,
217                );
218                let blocks = state
219                    .store
220                    .scan_blocks_by_author(authority_index, eviction_round + 1)
221                    .expect("Database error");
222                (blocks, eviction_round)
223            } else {
224                let eviction_round = Self::eviction_round(round, cached_rounds);
225                let blocks = state
226                    .store
227                    .scan_blocks_by_author(authority_index, eviction_round + 1)
228                    .expect("Database error");
229                (blocks, eviction_round)
230            };
231
232            state.evicted_rounds[authority_index] = eviction_round;
233
234            // Update the block metadata for the authority.
235            for block in &blocks {
236                state.update_block_metadata(block);
237            }
238
239            info!(
240                "Recovered blocks {}: {:?}",
241                authority_index,
242                blocks
243                    .iter()
244                    .map(|b| b.reference())
245                    .collect::<Vec<BlockRef>>()
246            );
247        }
248
249        // Initialize scoring metrics according to the metrics in store and the blocks
250        // that were loaded to cache.
251        let recovered_scoring_metrics = state.store.scan_metrics().expect("Database error");
252        state.context.metrics.initialize_scoring_metrics(
253            recovered_scoring_metrics,
254            &state.recent_refs_by_authority,
255            state.threshold_clock_round(),
256            &state.evicted_rounds,
257            state.context.clone(),
258        );
259
260        if state.gc_enabled() {
261            if let Some(last_commit) = last_commit {
262                let mut index = last_commit.index();
263                let gc_round = state.gc_round();
264                info!(
265                    "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
266                    index, gc_round
267                );
268
269                loop {
270                    let commits = store
271                        .scan_commits((index..=index).into())
272                        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
273                    let Some(commit) = commits.first() else {
274                        info!(
275                            "Recovering finished up to index {index}, no more commits to recover"
276                        );
277                        break;
278                    };
279
280                    // Check the commit leader round to see if it is within the gc_round. If it is
281                    // not then we can stop the recovery process.
282                    if gc_round > 0 && commit.leader().round <= gc_round {
283                        info!(
284                            "Recovering finished, reached commit leader round {} <= gc_round {}",
285                            commit.leader().round,
286                            gc_round
287                        );
288                        break;
289                    }
290
291                    commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
292                        debug!(
293                            "Setting block {:?} as committed based on commit {:?}",
294                            block_ref,
295                            commit.index()
296                        );
297                        assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
298                    });
299
300                    // All commits are indexed starting from 1, so one reach zero exit.
301                    index = index.saturating_sub(1);
302                    if index == 0 {
303                        break;
304                    }
305                }
306            }
307        }
308
309        state
310    }
311
312    /// Accepts a block into DagState and keeps it in memory.
313    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
314        assert_ne!(
315            block.round(),
316            0,
317            "Genesis block should not be accepted into DAG."
318        );
319
320        let block_ref = block.reference();
321        if self.contains_block(&block_ref) {
322            return;
323        }
324
325        let now = self.context.clock.timestamp_utc_ms();
326        if block.timestamp_ms() > now {
327            panic!(
328                "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
329                block,
330                block.timestamp_ms(),
331                now,
332            );
333        }
334
335        // TODO: Move this check to core
336        // Ensure we don't write multiple blocks per slot for our own index
337        if block_ref.author == self.context.own_index {
338            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
339            assert!(
340                existing_blocks.is_empty(),
341                "Block Rejected! Attempted to add block {block:#?} to own slot where \
342                block(s) {existing_blocks:#?} already exists."
343            );
344        }
345        self.update_block_metadata(&block);
346        self.blocks_to_write.push(block);
347        let source = if self.context.own_index == block_ref.author {
348            "own"
349        } else {
350            "others"
351        };
352        self.context
353            .metrics
354            .node_metrics
355            .accepted_blocks
356            .with_label_values(&[source])
357            .inc();
358    }
359
360    /// Updates internal metadata for a block.
361    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
362        let block_ref = block.reference();
363        self.recent_blocks
364            .insert(block_ref, BlockInfo::new(block.clone()));
365        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
366        self.threshold_clock.add_block(block_ref);
367        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
368        self.context
369            .metrics
370            .node_metrics
371            .highest_accepted_round
372            .set(self.highest_accepted_round as i64);
373
374        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
375            .last()
376            .map(|block_ref| block_ref.round)
377            .expect("There should be by now at least one block ref");
378        let hostname = &self.context.committee.authority(block_ref.author).hostname;
379        self.context
380            .metrics
381            .node_metrics
382            .highest_accepted_authority_round
383            .with_label_values(&[hostname])
384            .set(highest_accepted_round_for_author as i64);
385    }
386
387    /// Accepts a blocks into DagState and keeps it in memory.
388    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
389        debug!(
390            "Accepting blocks: {}",
391            blocks.iter().map(|b| b.reference().to_string()).join(",")
392        );
393        for block in blocks {
394            self.accept_block(block);
395        }
396    }
397
398    /// Gets a block by checking cached recent blocks then storage.
399    /// Returns None when the block is not found.
400    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
401        self.get_blocks(&[*reference])
402            .pop()
403            .expect("Exactly one element should be returned")
404    }
405
406    /// Gets blocks by checking genesis, cached recent blocks in memory, then
407    /// storage. An element is None when the corresponding block is not
408    /// found.
409    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
410        let mut blocks = vec![None; block_refs.len()];
411        let mut missing = Vec::new();
412
413        for (index, block_ref) in block_refs.iter().enumerate() {
414            if block_ref.round == GENESIS_ROUND {
415                // Allow the caller to handle the invalid genesis ancestor error.
416                if let Some(block) = self.genesis.get(block_ref) {
417                    blocks[index] = Some(block.clone());
418                }
419                continue;
420            }
421            if let Some(block_info) = self.recent_blocks.get(block_ref) {
422                blocks[index] = Some(block_info.block.clone());
423                continue;
424            }
425            missing.push((index, block_ref));
426        }
427
428        if missing.is_empty() {
429            return blocks;
430        }
431
432        let missing_refs = missing
433            .iter()
434            .map(|(_, block_ref)| **block_ref)
435            .collect::<Vec<_>>();
436        let store_results = self
437            .store
438            .read_blocks(&missing_refs)
439            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
440        self.context
441            .metrics
442            .node_metrics
443            .dag_state_store_read_count
444            .with_label_values(&["get_blocks"])
445            .inc();
446
447        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
448            blocks[index] = result;
449        }
450
451        blocks
452    }
453
454    // Sets the block as committed in the cache. If the block is set as committed
455    // for first time, then true is returned, otherwise false is returned instead.
456    // Method will panic if the block is not found in the cache.
457    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
458        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
459            if !block_info.committed {
460                block_info.committed = true;
461                return true;
462            }
463            false
464        } else {
465            panic!("Block {block_ref:?} not found in cache to set as committed.");
466        }
467    }
468
469    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
470        self.recent_blocks
471            .get(block_ref)
472            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
473            .committed
474    }
475
476    /// Gets all uncommitted blocks in a slot.
477    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
478    /// checked.
479    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
480        // TODO: either panic below when the slot is at or below the last committed
481        // round, or support reading from storage while limiting storage reads
482        // to edge cases.
483
484        let mut blocks = vec![];
485        for (_block_ref, block_info) in self.recent_blocks.range((
486            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
487            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
488        )) {
489            blocks.push(block_info.block.clone())
490        }
491        blocks
492    }
493
494    /// Gets all uncommitted blocks in a round.
495    /// Uncommitted blocks must exist in memory, so only in-memory blocks are
496    /// checked.
497    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
498        if round <= self.last_commit_round() {
499            panic!("Round {round} have committed blocks!");
500        }
501
502        let mut blocks = vec![];
503        for (_block_ref, block_info) in self.recent_blocks.range((
504            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
505            Excluded(BlockRef::new(
506                round + 1,
507                AuthorityIndex::ZERO,
508                BlockDigest::MIN,
509            )),
510        )) {
511            blocks.push(block_info.block.clone())
512        }
513        blocks
514    }
515
516    /// Gets all ancestors in the history of a block at a certain round.
517    pub(crate) fn ancestors_at_round(
518        &self,
519        later_block: &VerifiedBlock,
520        earlier_round: Round,
521    ) -> Vec<VerifiedBlock> {
522        // Iterate through ancestors of later_block in round descending order.
523        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
524        while !linked.is_empty() {
525            let round = linked.last().unwrap().round;
526            // Stop after finishing traversal for ancestors above earlier_round.
527            if round <= earlier_round {
528                break;
529            }
530            let block_ref = linked.pop_last().unwrap();
531            let Some(block) = self.get_block(&block_ref) else {
532                panic!("Block {block_ref:?} should exist in DAG!");
533            };
534            linked.extend(block.ancestors().iter().cloned());
535        }
536        linked
537            .range((
538                Included(BlockRef::new(
539                    earlier_round,
540                    AuthorityIndex::ZERO,
541                    BlockDigest::MIN,
542                )),
543                Unbounded,
544            ))
545            .map(|r| {
546                self.get_block(r)
547                    .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
548                    .clone()
549            })
550            .collect()
551    }
552
553    /// Gets the last proposed block from this authority.
554    /// If no block is proposed yet, returns the genesis block.
555    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
556        self.get_last_block_for_authority(self.context.own_index)
557    }
558
559    /// Retrieves the last accepted block from the specified `authority`. If no
560    /// block is found in cache then the genesis block is returned as no other
561    /// block has been received from that authority.
562    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
563        if let Some(last) = self.recent_refs_by_authority[authority].last() {
564            return self
565                .recent_blocks
566                .get(last)
567                .expect("Block should be found in recent blocks")
568                .block
569                .clone();
570        }
571
572        // if none exists, then fallback to genesis
573        let (_, genesis_block) = self
574            .genesis
575            .iter()
576            .find(|(block_ref, _)| block_ref.author == authority)
577            .expect("Genesis should be found for authority {authority_index}");
578        genesis_block.clone()
579    }
580
581    /// Returns cached recent blocks from the specified authority.
582    /// Blocks returned are limited to round >= `start`, and cached.
583    /// NOTE: caller should not assume returned blocks are always chained.
584    /// "Disconnected" blocks can be returned when there are byzantine blocks,
585    /// or a previously evicted block is accepted again.
586    pub(crate) fn get_cached_blocks(
587        &self,
588        authority: AuthorityIndex,
589        start: Round,
590    ) -> Vec<VerifiedBlock> {
591        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
592    }
593
594    // Retrieves the cached block within the range [start_round, end_round) from a
595    // given authority, limited in total number of blocks.
596    pub(crate) fn get_cached_blocks_in_range(
597        &self,
598        authority: AuthorityIndex,
599        start_round: Round,
600        end_round: Round,
601        limit: usize,
602    ) -> Vec<VerifiedBlock> {
603        if start_round >= end_round || limit == 0 {
604            return vec![];
605        }
606
607        let mut blocks = vec![];
608        for block_ref in self.recent_refs_by_authority[authority].range((
609            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
610            Excluded(BlockRef::new(
611                end_round,
612                AuthorityIndex::MIN,
613                BlockDigest::MIN,
614            )),
615        )) {
616            let block_info = self
617                .recent_blocks
618                .get(block_ref)
619                .expect("Block should exist in recent blocks");
620            blocks.push(block_info.block.clone());
621            if blocks.len() >= limit {
622                break;
623            }
624        }
625        blocks
626    }
627
628    // Retrieves the cached block within the range [start_round, end_round) from a
629    // given authority. NOTE: end_round must be greater than GENESIS_ROUND.
630    pub(crate) fn get_last_cached_block_in_range(
631        &self,
632        authority: AuthorityIndex,
633        start_round: Round,
634        end_round: Round,
635    ) -> Option<VerifiedBlock> {
636        if start_round >= end_round {
637            return None;
638        }
639
640        let block_ref = self.recent_refs_by_authority[authority]
641            .range((
642                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
643                Excluded(BlockRef::new(
644                    end_round,
645                    AuthorityIndex::MIN,
646                    BlockDigest::MIN,
647                )),
648            ))
649            .last()?;
650
651        self.recent_blocks
652            .get(block_ref)
653            .map(|block_info| block_info.block.clone())
654    }
655
656    /// Returns the last block proposed per authority with `evicted round <
657    /// round < end_round`. The method is guaranteed to return results only
658    /// when the `end_round` is not earlier of the available cached data for
659    /// each authority (evicted round + 1), otherwise the method will panic.
660    /// It's the caller's responsibility to ensure that is not requesting for
661    /// earlier rounds. In case of equivocation for an authority's last
662    /// slot, one block will be returned (the last in order) and the other
663    /// equivocating blocks will be returned.
664    pub(crate) fn get_last_cached_block_per_authority(
665        &self,
666        end_round: Round,
667    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
668        // Initialize with the genesis blocks as fallback
669        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
670        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
671
672        if end_round == GENESIS_ROUND {
673            panic!(
674                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
675            );
676        }
677
678        if end_round == GENESIS_ROUND + 1 {
679            return blocks.into_iter().map(|b| (b, vec![])).collect();
680        }
681
682        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
683            let authority_index = self
684                .context
685                .committee
686                .to_authority_index(authority_index)
687                .unwrap();
688
689            let last_evicted_round = self.evicted_rounds[authority_index];
690            if end_round.saturating_sub(1) <= last_evicted_round {
691                panic!(
692                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
693                );
694            }
695
696            let block_ref_iter = block_refs
697                .range((
698                    Included(BlockRef::new(
699                        last_evicted_round + 1,
700                        authority_index,
701                        BlockDigest::MIN,
702                    )),
703                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
704                ))
705                .rev();
706
707            let mut last_round = 0;
708            for block_ref in block_ref_iter {
709                if last_round == 0 {
710                    last_round = block_ref.round;
711                    let block_info = self
712                        .recent_blocks
713                        .get(block_ref)
714                        .expect("Block should exist in recent blocks");
715                    blocks[authority_index] = block_info.block.clone();
716                    continue;
717                }
718                if block_ref.round < last_round {
719                    break;
720                }
721                equivocating_blocks[authority_index].push(*block_ref);
722            }
723        }
724
725        blocks.into_iter().zip(equivocating_blocks).collect()
726    }
727
728    /// Checks whether a block exists in the slot. The method checks only
729    /// against the cached data. If the user asks for a slot that is not
730    /// within the cached data then a panic is thrown.
731    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
732        // Always return true for genesis slots.
733        if slot.round == GENESIS_ROUND {
734            return true;
735        }
736
737        let eviction_round = self.evicted_rounds[slot.authority];
738        if slot.round <= eviction_round {
739            panic!(
740                "{}",
741                format!(
742                    "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
743                    if self.gc_enabled() { " gc " } else { " " }
744                )
745            );
746        }
747
748        let mut result = self.recent_refs_by_authority[slot.authority].range((
749            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
750            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
751        ));
752        result.next().is_some()
753    }
754
755    /// Checks whether the required blocks are in cache, if exist, or otherwise
756    /// will check in store. The method is not caching back the results, so
757    /// its expensive if keep asking for cache missing blocks.
758    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
759        let mut exist = vec![false; block_refs.len()];
760        let mut missing = Vec::new();
761
762        for (index, block_ref) in block_refs.into_iter().enumerate() {
763            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
764            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
765                exist[index] = true;
766            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
767            {
768                // Optimization: recent_refs contain the most recent blocks known to this
769                // authority. If a block ref is not found there and has a higher
770                // round, it definitely is missing from this authority and there
771                // is no need to check disk.
772                exist[index] = false;
773            } else {
774                missing.push((index, block_ref));
775            }
776        }
777
778        if missing.is_empty() {
779            return exist;
780        }
781
782        let missing_refs = missing
783            .iter()
784            .map(|(_, block_ref)| *block_ref)
785            .collect::<Vec<_>>();
786        let store_results = self
787            .store
788            .contains_blocks(&missing_refs)
789            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
790        self.context
791            .metrics
792            .node_metrics
793            .dag_state_store_read_count
794            .with_label_values(&["contains_blocks"])
795            .inc();
796
797        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
798            exist[index] = result;
799        }
800
801        exist
802    }
803
804    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
805        let blocks = self.contains_blocks(vec![*block_ref]);
806        blocks.first().cloned().unwrap()
807    }
808
809    pub(crate) fn threshold_clock_round(&self) -> Round {
810        self.threshold_clock.get_round()
811    }
812
813    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
814        self.threshold_clock.get_quorum_ts()
815    }
816
817    pub(crate) fn highest_accepted_round(&self) -> Round {
818        self.highest_accepted_round
819    }
820
821    // Buffers a new commit in memory and updates last committed rounds.
822    // REQUIRED: must not skip over any commit index.
823    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
824        if let Some(last_commit) = &self.last_commit {
825            if commit.index() <= last_commit.index() {
826                error!(
827                    "New commit index {} <= last commit index {}!",
828                    commit.index(),
829                    last_commit.index()
830                );
831                return;
832            }
833            assert_eq!(commit.index(), last_commit.index() + 1);
834
835            if commit.timestamp_ms() < last_commit.timestamp_ms() {
836                panic!(
837                    "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
838                );
839            }
840        } else {
841            assert_eq!(commit.index(), 1);
842        }
843
844        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
845            previous_commit.round() < commit.round()
846        } else {
847            true
848        };
849
850        self.last_commit = Some(commit.clone());
851
852        if commit_round_advanced {
853            let now = std::time::Instant::now();
854            if let Some(previous_time) = self.last_commit_round_advancement_time {
855                self.context
856                    .metrics
857                    .node_metrics
858                    .commit_round_advancement_interval
859                    .observe(now.duration_since(previous_time).as_secs_f64())
860            }
861            self.last_commit_round_advancement_time = Some(now);
862        }
863
864        for block_ref in commit.blocks().iter() {
865            self.last_committed_rounds[block_ref.author] = max(
866                self.last_committed_rounds[block_ref.author],
867                block_ref.round,
868            );
869        }
870
871        for (i, round) in self.last_committed_rounds.iter().enumerate() {
872            let index = self.context.committee.to_authority_index(i).unwrap();
873            let hostname = &self.context.committee.authority(index).hostname;
874            self.context
875                .metrics
876                .node_metrics
877                .last_committed_authority_round
878                .with_label_values(&[hostname])
879                .set((*round).into());
880        }
881
882        self.pending_commit_votes.push_back(commit.reference());
883        self.commits_to_write.push(commit);
884    }
885
886    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
887        // We empty the unscored committed subdags to calculate reputation scores.
888        assert!(self.unscored_committed_subdags.is_empty());
889
890        // We create an empty scoring subdag once reputation scores are calculated.
891        // Note: It is okay for this to not be gated by protocol config as the
892        // scoring_subdag should be empty in either case at this point.
893        assert!(self.scoring_subdag.is_empty());
894
895        let commit_info = CommitInfo {
896            committed_rounds: self.last_committed_rounds.clone(),
897            reputation_scores,
898        };
899        let last_commit = self
900            .last_commit
901            .as_ref()
902            .expect("Last commit should already be set.");
903        self.commit_info_to_write
904            .push((last_commit.reference(), commit_info));
905    }
906
907    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
908        let mut votes = Vec::new();
909        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
910            votes.push(self.pending_commit_votes.pop_front().unwrap());
911        }
912        votes
913    }
914
915    /// Index of the last commit.
916    pub(crate) fn last_commit_index(&self) -> CommitIndex {
917        match &self.last_commit {
918            Some(commit) => commit.index(),
919            None => 0,
920        }
921    }
922
923    /// Digest of the last commit.
924    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
925        match &self.last_commit {
926            Some(commit) => commit.digest(),
927            None => CommitDigest::MIN,
928        }
929    }
930
931    /// Timestamp of the last commit.
932    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
933        match &self.last_commit {
934            Some(commit) => commit.timestamp_ms(),
935            None => 0,
936        }
937    }
938
939    /// Leader slot of the last commit.
940    pub(crate) fn last_commit_leader(&self) -> Slot {
941        match &self.last_commit {
942            Some(commit) => commit.leader().into(),
943            None => self
944                .genesis
945                .iter()
946                .next()
947                .map(|(genesis_ref, _)| *genesis_ref)
948                .expect("Genesis blocks should always be available.")
949                .into(),
950        }
951    }
952
953    /// Highest round where a block is committed, which is last commit's leader
954    /// round.
955    pub(crate) fn last_commit_round(&self) -> Round {
956        match &self.last_commit {
957            Some(commit) => commit.leader().round,
958            None => 0,
959        }
960    }
961
962    /// Last committed round per authority.
963    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
964        self.last_committed_rounds.clone()
965    }
966
967    /// The GC round is the highest round that blocks of equal or lower round
968    /// are considered obsolete and no longer possible to be committed.
969    /// There is no meaning accepting any blocks with round <= gc_round. The
970    /// Garbage Collection (GC) round is calculated based on the latest
971    /// committed leader round. When GC is disabled that will return the genesis
972    /// round.
973    pub(crate) fn gc_round(&self) -> Round {
974        self.calculate_gc_round(self.last_commit_round())
975    }
976
977    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
978        let gc_depth = self.context.protocol_config.gc_depth();
979        if gc_depth > 0 {
980            // GC is enabled, only then calculate the diff
981            commit_round.saturating_sub(gc_depth)
982        } else {
983            // Otherwise just return genesis round. That also acts as a safety mechanism so
984            // we never attempt to truncate anything even accidentally.
985            GENESIS_ROUND
986        }
987    }
988
989    pub(crate) fn gc_enabled(&self) -> bool {
990        self.context.protocol_config.gc_depth() > 0
991    }
992
993    /// After each flush, DagState becomes persisted in storage and it expected
994    /// to recover all internal states from storage after restarts.
995    pub(crate) fn flush(&mut self) {
996        let _s = self
997            .context
998            .metrics
999            .node_metrics
1000            .scope_processing_time
1001            .with_label_values(&["DagState::flush"])
1002            .start_timer();
1003        // Flush buffered data to storage.
1004        let blocks = std::mem::take(&mut self.blocks_to_write);
1005        let commits = std::mem::take(&mut self.commits_to_write);
1006        let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
1007
1008        if blocks.is_empty() && commits.is_empty() {
1009            return;
1010        }
1011        debug!(
1012            "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
1013            blocks.len(),
1014            blocks.iter().map(|b| b.reference().to_string()).join(","),
1015            commits.len(),
1016            commits.iter().map(|c| c.reference().to_string()).join(","),
1017            commit_info_to_write.len(),
1018            commit_info_to_write
1019                .iter()
1020                .map(|(commit_ref, _)| commit_ref.to_string())
1021                .join(","),
1022        );
1023
1024        // Update the scoring metrics accordingly to the blocks being flushed.
1025        let mut metrics_to_write = vec![];
1026        let threshold_clock_round = self.threshold_clock_round();
1027        for (authority_index, authority) in self.context.committee.authorities() {
1028            let last_eviction_round = self.evicted_rounds[authority_index];
1029            let current_eviction_round = self.calculate_authority_eviction_round(authority_index);
1030            let metrics_to_write_from_authority =
1031                self.context.metrics.update_scoring_metrics_on_eviction(
1032                    authority_index,
1033                    authority.hostname.as_str(),
1034                    &self.recent_refs_by_authority[authority_index],
1035                    current_eviction_round,
1036                    last_eviction_round,
1037                    threshold_clock_round,
1038                );
1039            if let Some(metrics_to_write_from_authority) = metrics_to_write_from_authority {
1040                metrics_to_write.push((authority_index, metrics_to_write_from_authority));
1041            }
1042        }
1043
1044        self.store
1045            .write(WriteBatch::new(
1046                blocks,
1047                commits,
1048                commit_info_to_write,
1049                metrics_to_write,
1050            ))
1051            .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
1052        self.context
1053            .metrics
1054            .node_metrics
1055            .dag_state_store_write_count
1056            .inc();
1057
1058        // Clean up old cached data. After flushing, all cached blocks are guaranteed to
1059        // be persisted. This clean up also triggers some of the scoring metrics
1060        // updates.
1061        for (authority_index, _) in self.context.committee.authorities() {
1062            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1063            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1064                let block_round = block_ref.round;
1065                if block_round <= eviction_round {
1066                    self.recent_blocks.remove(block_ref);
1067                    self.recent_refs_by_authority[authority_index].pop_first();
1068                } else {
1069                    break;
1070                }
1071            }
1072            self.evicted_rounds[authority_index] = eviction_round;
1073        }
1074
1075        let metrics = &self.context.metrics.node_metrics;
1076        metrics
1077            .dag_state_recent_blocks
1078            .set(self.recent_blocks.len() as i64);
1079        metrics.dag_state_recent_refs.set(
1080            self.recent_refs_by_authority
1081                .iter()
1082                .map(BTreeSet::len)
1083                .sum::<usize>() as i64,
1084        );
1085    }
1086
1087    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1088        self.store
1089            .read_last_commit_info()
1090            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
1091    }
1092
1093    // TODO: Remove four methods below this when DistributedVoteScoring is enabled.
1094    pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
1095        self.unscored_committed_subdags.len() as u64
1096    }
1097
1098    #[cfg(test)]
1099    pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
1100        self.unscored_committed_subdags.clone()
1101    }
1102
1103    pub(crate) fn add_unscored_committed_subdags(
1104        &mut self,
1105        committed_subdags: Vec<CommittedSubDag>,
1106    ) {
1107        self.unscored_committed_subdags.extend(committed_subdags);
1108    }
1109
1110    pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
1111        std::mem::take(&mut self.unscored_committed_subdags)
1112    }
1113
1114    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1115        self.scoring_subdag.add_subdags(scoring_subdags);
1116    }
1117
1118    pub(crate) fn clear_scoring_subdag(&mut self) {
1119        self.scoring_subdag.clear();
1120    }
1121
1122    pub(crate) fn scoring_subdags_count(&self) -> usize {
1123        self.scoring_subdag.scored_subdags_count()
1124    }
1125
1126    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1127        self.scoring_subdag.is_empty()
1128    }
1129
1130    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1131        self.scoring_subdag.calculate_distributed_vote_scores()
1132    }
1133
1134    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1135        self.scoring_subdag
1136            .commit_range
1137            .as_ref()
1138            .expect("commit range should exist for scoring subdag")
1139            .end()
1140    }
1141
1142    /// The last round that should get evicted after a cache clean up operation.
1143    /// After this round we are guaranteed to have all the produced blocks
1144    /// from that authority. For any round that is <= `last_evicted_round`
1145    /// we don't have such guarantees as out of order blocks might exist.
1146    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1147        if self.gc_enabled() {
1148            let last_round = self.recent_refs_by_authority[authority_index]
1149                .last()
1150                .map(|block_ref| block_ref.round)
1151                .unwrap_or(GENESIS_ROUND);
1152
1153            Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1154        } else {
1155            let commit_round = self.last_committed_rounds[authority_index];
1156            Self::eviction_round(commit_round, self.cached_rounds)
1157        }
1158    }
1159
1160    /// Calculates the last eviction round based on the provided `commit_round`.
1161    /// Any blocks with round <= the evict round have been cleaned up.
1162    fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
1163        commit_round.saturating_sub(cached_rounds)
1164    }
1165
1166    /// Calculates the eviction round for the given authority. The goal is to
1167    /// keep at least `cached_rounds` of the latest blocks in the cache (if
1168    /// enough data is available), while evicting blocks with rounds <=
1169    /// `gc_round` when possible.
1170    fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1171        gc_round.min(last_round.saturating_sub(cached_rounds))
1172    }
1173
1174    /// Detects and returns the blocks of the round that forms the last quorum.
1175    /// The method will return the quorum even if that's genesis.
1176    #[cfg(test)]
1177    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1178        // the quorum should exist either on the highest accepted round or the one
1179        // before. If we fail to detect a quorum then it means that our DAG has
1180        // advanced with missing causal history.
1181        for round in
1182            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1183        {
1184            if round == GENESIS_ROUND {
1185                return self.genesis_blocks();
1186            }
1187            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1188            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1189
1190            // Since the minimum wave length is 3 we expect to find a quorum in the
1191            // uncommitted rounds.
1192            let blocks = self.get_uncommitted_blocks_at_round(round);
1193            for block in &blocks {
1194                if quorum.add(block.author(), &self.context.committee) {
1195                    return blocks;
1196                }
1197            }
1198        }
1199
1200        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1201    }
1202
1203    #[cfg(test)]
1204    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1205        self.genesis.values().cloned().collect()
1206    }
1207
1208    #[cfg(test)]
1209    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1210        self.last_commit = Some(commit);
1211    }
1212}
1213
1214struct BlockInfo {
1215    block: VerifiedBlock,
1216    // Whether the block has been committed
1217    committed: bool,
1218}
1219
1220impl BlockInfo {
1221    fn new(block: VerifiedBlock) -> Self {
1222        Self {
1223            block,
1224            committed: false,
1225        }
1226    }
1227}
1228
1229#[cfg(test)]
1230mod test {
1231    use std::vec;
1232
1233    use parking_lot::RwLock;
1234    use rstest::rstest;
1235
1236    use super::*;
1237    use crate::{
1238        block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1239        storage::{WriteBatch, mem_store::MemStore},
1240        test_dag_builder::DagBuilder,
1241        test_dag_parser::parse_dag,
1242    };
1243
1244    #[tokio::test]
1245    async fn test_get_blocks() {
1246        let (context, _) = Context::new_for_test(4);
1247        let context = Arc::new(context);
1248        let store = Arc::new(MemStore::new());
1249        let mut dag_state = DagState::new(context.clone(), store.clone());
1250        let own_index = AuthorityIndex::new_for_test(0);
1251
1252        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1253        let num_rounds: u32 = 10;
1254        let non_existent_round: u32 = 100;
1255        let num_authorities: u32 = 3;
1256        let num_blocks_per_slot: usize = 3;
1257        let mut blocks = BTreeMap::new();
1258        for round in 1..=num_rounds {
1259            for author in 0..num_authorities {
1260                // Create 3 blocks per slot, with different timestamps and digests.
1261                let base_ts = round as BlockTimestampMs * 1000;
1262                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1263                    let block = VerifiedBlock::new_for_test(
1264                        TestBlock::new(round, author)
1265                            .set_timestamp_ms(timestamp)
1266                            .build(),
1267                    );
1268                    dag_state.accept_block(block.clone());
1269                    blocks.insert(block.reference(), block);
1270
1271                    // Only write one block per slot for own index
1272                    if AuthorityIndex::new_for_test(author) == own_index {
1273                        break;
1274                    }
1275                }
1276            }
1277        }
1278
1279        // Check uncommitted blocks that exist.
1280        for (r, block) in &blocks {
1281            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1282        }
1283
1284        // Check uncommitted blocks that do not exist.
1285        let last_ref = blocks.keys().last().unwrap();
1286        assert!(
1287            dag_state
1288                .get_block(&BlockRef::new(
1289                    last_ref.round,
1290                    last_ref.author,
1291                    BlockDigest::MIN
1292                ))
1293                .is_none()
1294        );
1295
1296        // Check slots with uncommitted blocks.
1297        for round in 1..=num_rounds {
1298            for author in 0..num_authorities {
1299                let slot = Slot::new(
1300                    round,
1301                    context
1302                        .committee
1303                        .to_authority_index(author as usize)
1304                        .unwrap(),
1305                );
1306                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1307
1308                // We only write one block per slot for own index
1309                if AuthorityIndex::new_for_test(author) == own_index {
1310                    assert_eq!(blocks.len(), 1);
1311                } else {
1312                    assert_eq!(blocks.len(), num_blocks_per_slot);
1313                }
1314
1315                for b in blocks {
1316                    assert_eq!(b.round(), round);
1317                    assert_eq!(
1318                        b.author(),
1319                        context
1320                            .committee
1321                            .to_authority_index(author as usize)
1322                            .unwrap()
1323                    );
1324                }
1325            }
1326        }
1327
1328        // Check slots without uncommitted blocks.
1329        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1330        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1331
1332        // Check rounds with uncommitted blocks.
1333        for round in 1..=num_rounds {
1334            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1335            // Expect 3 blocks per authority except for own authority which should
1336            // have 1 block.
1337            assert_eq!(
1338                blocks.len(),
1339                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1340            );
1341            for b in blocks {
1342                assert_eq!(b.round(), round);
1343            }
1344        }
1345
1346        // Check rounds without uncommitted blocks.
1347        assert!(
1348            dag_state
1349                .get_uncommitted_blocks_at_round(non_existent_round)
1350                .is_empty()
1351        );
1352    }
1353
1354    #[tokio::test]
1355    async fn test_ancestors_at_uncommitted_round() {
1356        // Initialize DagState.
1357        let (context, _) = Context::new_for_test(4);
1358        let context = Arc::new(context);
1359        let store = Arc::new(MemStore::new());
1360        let mut dag_state = DagState::new(context.clone(), store.clone());
1361
1362        // Populate DagState.
1363
1364        // Round 10 refs will not have their blocks in DagState.
1365        let round_10_refs: Vec<_> = (0..4)
1366            .map(|a| {
1367                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1368                    .reference()
1369            })
1370            .collect();
1371
1372        // Round 11 blocks.
1373        let round_11 = vec![
1374            // This will connect to round 12.
1375            VerifiedBlock::new_for_test(
1376                TestBlock::new(11, 0)
1377                    .set_timestamp_ms(1100)
1378                    .set_ancestors(round_10_refs.clone())
1379                    .build(),
1380            ),
1381            // Slot(11, 1) has 3 blocks.
1382            // This will connect to round 12.
1383            VerifiedBlock::new_for_test(
1384                TestBlock::new(11, 1)
1385                    .set_timestamp_ms(1110)
1386                    .set_ancestors(round_10_refs.clone())
1387                    .build(),
1388            ),
1389            // This will connect to round 13.
1390            VerifiedBlock::new_for_test(
1391                TestBlock::new(11, 1)
1392                    .set_timestamp_ms(1111)
1393                    .set_ancestors(round_10_refs.clone())
1394                    .build(),
1395            ),
1396            // This will not connect to any block.
1397            VerifiedBlock::new_for_test(
1398                TestBlock::new(11, 1)
1399                    .set_timestamp_ms(1112)
1400                    .set_ancestors(round_10_refs.clone())
1401                    .build(),
1402            ),
1403            // This will not connect to any block.
1404            VerifiedBlock::new_for_test(
1405                TestBlock::new(11, 2)
1406                    .set_timestamp_ms(1120)
1407                    .set_ancestors(round_10_refs.clone())
1408                    .build(),
1409            ),
1410            // This will connect to round 12.
1411            VerifiedBlock::new_for_test(
1412                TestBlock::new(11, 3)
1413                    .set_timestamp_ms(1130)
1414                    .set_ancestors(round_10_refs.clone())
1415                    .build(),
1416            ),
1417        ];
1418
1419        // Round 12 blocks.
1420        let ancestors_for_round_12 = vec![
1421            round_11[0].reference(),
1422            round_11[1].reference(),
1423            round_11[5].reference(),
1424        ];
1425        let round_12 = vec![
1426            VerifiedBlock::new_for_test(
1427                TestBlock::new(12, 0)
1428                    .set_timestamp_ms(1200)
1429                    .set_ancestors(ancestors_for_round_12.clone())
1430                    .build(),
1431            ),
1432            VerifiedBlock::new_for_test(
1433                TestBlock::new(12, 2)
1434                    .set_timestamp_ms(1220)
1435                    .set_ancestors(ancestors_for_round_12.clone())
1436                    .build(),
1437            ),
1438            VerifiedBlock::new_for_test(
1439                TestBlock::new(12, 3)
1440                    .set_timestamp_ms(1230)
1441                    .set_ancestors(ancestors_for_round_12.clone())
1442                    .build(),
1443            ),
1444        ];
1445
1446        // Round 13 blocks.
1447        let ancestors_for_round_13 = vec![
1448            round_12[0].reference(),
1449            round_12[1].reference(),
1450            round_12[2].reference(),
1451            round_11[2].reference(),
1452        ];
1453        let round_13 = vec![
1454            VerifiedBlock::new_for_test(
1455                TestBlock::new(12, 1)
1456                    .set_timestamp_ms(1300)
1457                    .set_ancestors(ancestors_for_round_13.clone())
1458                    .build(),
1459            ),
1460            VerifiedBlock::new_for_test(
1461                TestBlock::new(12, 2)
1462                    .set_timestamp_ms(1320)
1463                    .set_ancestors(ancestors_for_round_13.clone())
1464                    .build(),
1465            ),
1466            VerifiedBlock::new_for_test(
1467                TestBlock::new(12, 3)
1468                    .set_timestamp_ms(1330)
1469                    .set_ancestors(ancestors_for_round_13.clone())
1470                    .build(),
1471            ),
1472        ];
1473
1474        // Round 14 anchor block.
1475        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1476        let anchor = VerifiedBlock::new_for_test(
1477            TestBlock::new(14, 1)
1478                .set_timestamp_ms(1410)
1479                .set_ancestors(ancestors_for_round_14)
1480                .build(),
1481        );
1482
1483        // Add all blocks (at and above round 11) to DagState.
1484        for b in round_11
1485            .iter()
1486            .chain(round_12.iter())
1487            .chain(round_13.iter())
1488            .chain([anchor.clone()].iter())
1489        {
1490            dag_state.accept_block(b.clone());
1491        }
1492
1493        // Check ancestors connected to anchor.
1494        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1495        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1496        ancestors_refs.sort();
1497        let mut expected_refs = vec![
1498            round_11[0].reference(),
1499            round_11[1].reference(),
1500            round_11[2].reference(),
1501            round_11[5].reference(),
1502        ];
1503        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1
1504        // & 2) might not be in right lexicographical order.
1505        assert_eq!(
1506            ancestors_refs, expected_refs,
1507            "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1508        );
1509    }
1510
1511    #[tokio::test]
1512    async fn test_contains_blocks_in_cache_or_store() {
1513        /// Only keep elements up to 2 rounds before the last committed round
1514        const CACHED_ROUNDS: Round = 2;
1515
1516        let (mut context, _) = Context::new_for_test(4);
1517        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1518
1519        let context = Arc::new(context);
1520        let store = Arc::new(MemStore::new());
1521        let mut dag_state = DagState::new(context.clone(), store.clone());
1522
1523        // Create test blocks for round 1 ~ 10
1524        let num_rounds: u32 = 10;
1525        let num_authorities: u32 = 4;
1526        let mut blocks = Vec::new();
1527
1528        for round in 1..=num_rounds {
1529            for author in 0..num_authorities {
1530                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1531                blocks.push(block);
1532            }
1533        }
1534
1535        // Now write in store the blocks from first 4 rounds and the rest to the dag
1536        // state
1537        blocks.clone().into_iter().for_each(|block| {
1538            if block.round() <= 4 {
1539                store
1540                    .write(WriteBatch::default().blocks(vec![block]))
1541                    .unwrap();
1542            } else {
1543                dag_state.accept_blocks(vec![block]);
1544            }
1545        });
1546
1547        // Now when trying to query whether we have all the blocks, we should
1548        // successfully retrieve a positive answer where the blocks of first 4
1549        // round should be found in DagState and the rest in store.
1550        let mut block_refs = blocks
1551            .iter()
1552            .map(|block| block.reference())
1553            .collect::<Vec<_>>();
1554        let result = dag_state.contains_blocks(block_refs.clone());
1555
1556        // Ensure everything is found
1557        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1558        assert_eq!(result, expected);
1559
1560        // Now try to ask also for one block ref that is neither in cache nor in store
1561        block_refs.insert(
1562            3,
1563            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1564        );
1565        let result = dag_state.contains_blocks(block_refs.clone());
1566
1567        // Then all should be found apart from the last one
1568        expected.insert(3, false);
1569        assert_eq!(result, expected.clone());
1570    }
1571
1572    #[tokio::test]
1573    async fn test_contains_cached_block_at_slot() {
1574        /// Only keep elements up to 2 rounds before the last committed round
1575        const CACHED_ROUNDS: Round = 2;
1576
1577        let num_authorities: u32 = 4;
1578        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1579        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1580
1581        let context = Arc::new(context);
1582        let store = Arc::new(MemStore::new());
1583        let mut dag_state = DagState::new(context.clone(), store.clone());
1584
1585        // Create test blocks for round 1 ~ 10
1586        let num_rounds: u32 = 10;
1587        let mut blocks = Vec::new();
1588
1589        for round in 1..=num_rounds {
1590            for author in 0..num_authorities {
1591                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1592                blocks.push(block.clone());
1593                dag_state.accept_block(block);
1594            }
1595        }
1596
1597        // Query for genesis round 0, genesis blocks should be returned
1598        for (author, _) in context.committee.authorities() {
1599            assert!(
1600                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1601                "Genesis should always be found"
1602            );
1603        }
1604
1605        // Now when trying to query whether we have all the blocks, we should
1606        // successfully retrieve a positive answer where the blocks of first 4
1607        // round should be found in DagState and the rest in store.
1608        let mut block_refs = blocks
1609            .iter()
1610            .map(|block| block.reference())
1611            .collect::<Vec<_>>();
1612
1613        for block_ref in block_refs.clone() {
1614            let slot = block_ref.into();
1615            let found = dag_state.contains_cached_block_at_slot(slot);
1616            assert!(found, "A block should be found at slot {slot}");
1617        }
1618
1619        // Now try to ask also for one block ref that is not in cache
1620        // Then all should be found apart from the last one
1621        block_refs.insert(
1622            3,
1623            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1624        );
1625        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1626        expected.insert(3, false);
1627
1628        // Attempt to check the same for via the contains slot method
1629        for block_ref in block_refs {
1630            let slot = block_ref.into();
1631            let found = dag_state.contains_cached_block_at_slot(slot);
1632
1633            assert_eq!(expected.remove(0), found);
1634        }
1635    }
1636
1637    #[tokio::test]
1638    #[should_panic(
1639        expected = "Attempted to check for slot S8[0] that is <= the last evicted round 8"
1640    )]
1641    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1642        /// Only keep elements up to 2 rounds before the last committed round
1643        const CACHED_ROUNDS: Round = 2;
1644
1645        let (mut context, _) = Context::new_for_test(4);
1646        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1647        context
1648            .protocol_config
1649            .set_consensus_gc_depth_for_testing(0);
1650
1651        let context = Arc::new(context);
1652        let store = Arc::new(MemStore::new());
1653        let mut dag_state = DagState::new(context.clone(), store.clone());
1654
1655        // Create test blocks for round 1 ~ 10 for authority 0
1656        let mut blocks = Vec::new();
1657        for round in 1..=10 {
1658            let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1659            blocks.push(block.clone());
1660            dag_state.accept_block(block);
1661        }
1662
1663        // Now add a commit to trigger an eviction
1664        dag_state.add_commit(TrustedCommit::new_for_test(
1665            1 as CommitIndex,
1666            CommitDigest::MIN,
1667            0,
1668            blocks.last().unwrap().reference(),
1669            blocks
1670                .into_iter()
1671                .map(|block| block.reference())
1672                .collect::<Vec<_>>(),
1673        ));
1674
1675        dag_state.flush();
1676
1677        // When trying to request for authority 0 at block slot 8 it should panic, as
1678        // anything that is <= commit_round - cached_rounds = 10 - 2 = 8 should
1679        // be evicted
1680        let _ =
1681            dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1682    }
1683
1684    #[tokio::test]
1685    #[should_panic(
1686        expected = "Attempted to check for slot S3[1] that is <= the last gc evicted round 3"
1687    )]
1688    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1689        /// Keep 2 rounds from the highest committed round. This is considered
1690        /// universal and minimum necessary blocks to hold
1691        /// for the correct node operation.
1692        const GC_DEPTH: u32 = 2;
1693        /// Keep at least 3 rounds in cache for each authority.
1694        const CACHED_ROUNDS: Round = 3;
1695
1696        let (mut context, _) = Context::new_for_test(4);
1697        context
1698            .protocol_config
1699            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1700        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1701
1702        let context = Arc::new(context);
1703        let store = Arc::new(MemStore::new());
1704        let mut dag_state = DagState::new(context.clone(), store.clone());
1705
1706        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 -
1707        // 6.
1708        let mut dag_builder = DagBuilder::new(context.clone());
1709        dag_builder.layers(1..=3).build();
1710        dag_builder
1711            .layers(4..=6)
1712            .authorities(vec![AuthorityIndex::new_for_test(0)])
1713            .skip_block()
1714            .build();
1715
1716        // Accept all blocks
1717        dag_builder
1718            .all_blocks()
1719            .into_iter()
1720            .for_each(|block| dag_state.accept_block(block));
1721
1722        // Now add a commit for leader round 5 to trigger an eviction
1723        dag_state.add_commit(TrustedCommit::new_for_test(
1724            1 as CommitIndex,
1725            CommitDigest::MIN,
1726            0,
1727            dag_builder.leader_block(5).unwrap().reference(),
1728            vec![],
1729        ));
1730
1731        dag_state.flush();
1732
1733        // Ensure that gc round has been updated
1734        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1735
1736        // Now what we expect to happen is for:
1737        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1738        // * Node 0 should have in cache blocks from it's latest round, 3, up to round
1739        //   1, which is the number of cached_rounds.
1740        for authority_index in 1..=3 {
1741            for round in 4..=6 {
1742                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1743                    round,
1744                    AuthorityIndex::new_for_test(authority_index)
1745                )));
1746            }
1747        }
1748
1749        for round in 1..=3 {
1750            assert!(
1751                dag_state.contains_cached_block_at_slot(Slot::new(
1752                    round,
1753                    AuthorityIndex::new_for_test(0)
1754                ))
1755            );
1756        }
1757
1758        // When trying to request for authority 1 at block slot 3 it should panic, as
1759        // anything that is <= 3 should be evicted
1760        let _ =
1761            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1762    }
1763
1764    #[tokio::test]
1765    async fn test_get_blocks_in_cache_or_store() {
1766        let (context, _) = Context::new_for_test(4);
1767        let context = Arc::new(context);
1768        let store = Arc::new(MemStore::new());
1769        let mut dag_state = DagState::new(context.clone(), store.clone());
1770
1771        // Create test blocks for round 1 ~ 10
1772        let num_rounds: u32 = 10;
1773        let num_authorities: u32 = 4;
1774        let mut blocks = Vec::new();
1775
1776        for round in 1..=num_rounds {
1777            for author in 0..num_authorities {
1778                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1779                blocks.push(block);
1780            }
1781        }
1782
1783        // Now write in store the blocks from first 4 rounds and the rest to the dag
1784        // state
1785        blocks.clone().into_iter().for_each(|block| {
1786            if block.round() <= 4 {
1787                store
1788                    .write(WriteBatch::default().blocks(vec![block]))
1789                    .unwrap();
1790            } else {
1791                dag_state.accept_blocks(vec![block]);
1792            }
1793        });
1794
1795        // Now when trying to query whether we have all the blocks, we should
1796        // successfully retrieve a positive answer where the blocks of first 4
1797        // round should be found in DagState and the rest in store.
1798        let mut block_refs = blocks
1799            .iter()
1800            .map(|block| block.reference())
1801            .collect::<Vec<_>>();
1802        let result = dag_state.get_blocks(&block_refs);
1803
1804        let mut expected = blocks
1805            .into_iter()
1806            .map(Some)
1807            .collect::<Vec<Option<VerifiedBlock>>>();
1808
1809        // Ensure everything is found
1810        assert_eq!(result, expected.clone());
1811
1812        // Now try to ask also for one block ref that is neither in cache nor in store
1813        block_refs.insert(
1814            3,
1815            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1816        );
1817        let result = dag_state.get_blocks(&block_refs);
1818
1819        // Then all should be found apart from the last one
1820        expected.insert(3, None);
1821        assert_eq!(result, expected);
1822    }
1823
1824    // TODO: Remove when DistributedVoteScoring is enabled.
1825    #[rstest]
1826    #[tokio::test]
1827    async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1828        telemetry_subscribers::init_for_testing();
1829        let num_authorities: u32 = 4;
1830        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1831        context
1832            .protocol_config
1833            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1834
1835        if gc_depth > 0 {
1836            context
1837                .protocol_config
1838                .set_consensus_gc_depth_for_testing(gc_depth);
1839        }
1840
1841        let context = Arc::new(context);
1842        let store = Arc::new(MemStore::new());
1843        let mut dag_state = DagState::new(context.clone(), store.clone());
1844
1845        // Create test blocks and commits for round 1 ~ 10
1846        let num_rounds: u32 = 10;
1847        let mut dag_builder = DagBuilder::new(context.clone());
1848        dag_builder.layers(1..=num_rounds).build();
1849        let mut commits = vec![];
1850
1851        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1852            commits.push(commit);
1853        }
1854
1855        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1856        let temp_commits = commits.split_off(5);
1857        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1858        for commit in commits.clone() {
1859            dag_state.add_commit(commit);
1860        }
1861
1862        // Flush the dag state
1863        dag_state.flush();
1864
1865        // Add the rest of the blocks and commits to the dag state
1866        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1867        for commit in temp_commits.clone() {
1868            dag_state.add_commit(commit);
1869        }
1870
1871        // All blocks should be found in DagState.
1872        let all_blocks = dag_builder.blocks(6..=num_rounds);
1873        let block_refs = all_blocks
1874            .iter()
1875            .map(|block| block.reference())
1876            .collect::<Vec<_>>();
1877
1878        let result = dag_state
1879            .get_blocks(&block_refs)
1880            .into_iter()
1881            .map(|b| b.unwrap())
1882            .collect::<Vec<_>>();
1883        assert_eq!(result, all_blocks);
1884
1885        // Last commit index should be 10.
1886        assert_eq!(dag_state.last_commit_index(), 10);
1887        assert_eq!(
1888            dag_state.last_committed_rounds(),
1889            dag_builder.last_committed_rounds.clone()
1890        );
1891
1892        // Destroy the dag state.
1893        drop(dag_state);
1894
1895        // Recover the state from the store
1896        let dag_state = DagState::new(context.clone(), store.clone());
1897
1898        // Blocks of first 5 rounds should be found in DagState.
1899        let blocks = dag_builder.blocks(1..=5);
1900        let block_refs = blocks
1901            .iter()
1902            .map(|block| block.reference())
1903            .collect::<Vec<_>>();
1904        let result = dag_state
1905            .get_blocks(&block_refs)
1906            .into_iter()
1907            .map(|b| b.unwrap())
1908            .collect::<Vec<_>>();
1909        assert_eq!(result, blocks);
1910
1911        // Blocks above round 5 should not be in DagState, because they are not flushed.
1912        let missing_blocks = dag_builder.blocks(6..=num_rounds);
1913        let block_refs = missing_blocks
1914            .iter()
1915            .map(|block| block.reference())
1916            .collect::<Vec<_>>();
1917        let retrieved_blocks = dag_state
1918            .get_blocks(&block_refs)
1919            .into_iter()
1920            .flatten()
1921            .collect::<Vec<_>>();
1922        assert!(retrieved_blocks.is_empty());
1923
1924        // Last commit index should be 5.
1925        assert_eq!(dag_state.last_commit_index(), 5);
1926
1927        // This is the last_commit_rounds of the first 5 commits that were flushed
1928        let expected_last_committed_rounds = vec![4, 5, 4, 4];
1929        assert_eq!(
1930            dag_state.last_committed_rounds(),
1931            expected_last_committed_rounds
1932        );
1933
1934        // Unscored subdags will be recovered based on the flushed commits and no commit
1935        // info
1936        assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1937    }
1938
1939    #[tokio::test]
1940    async fn test_flush_and_recovery() {
1941        telemetry_subscribers::init_for_testing();
1942        let num_authorities: u32 = 4;
1943        let (context, _) = Context::new_for_test(num_authorities as usize);
1944        let context = Arc::new(context);
1945        let store = Arc::new(MemStore::new());
1946        let mut dag_state = DagState::new(context.clone(), store.clone());
1947
1948        // Create test blocks and commits for round 1 ~ 10
1949        let num_rounds: u32 = 10;
1950        let mut dag_builder = DagBuilder::new(context.clone());
1951        dag_builder.layers(1..=num_rounds).build();
1952        let mut commits = vec![];
1953        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1954            commits.push(commit);
1955        }
1956
1957        // Add the blocks from first 5 rounds and first 5 commits to the dag state
1958        let temp_commits = commits.split_off(5);
1959        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1960        for commit in commits.clone() {
1961            dag_state.add_commit(commit);
1962        }
1963
1964        // Flush the dag state
1965        dag_state.flush();
1966
1967        // Add the rest of the blocks and commits to the dag state
1968        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1969        for commit in temp_commits.clone() {
1970            dag_state.add_commit(commit);
1971        }
1972
1973        // All blocks should be found in DagState.
1974        let all_blocks = dag_builder.blocks(6..=num_rounds);
1975        let block_refs = all_blocks
1976            .iter()
1977            .map(|block| block.reference())
1978            .collect::<Vec<_>>();
1979        let result = dag_state
1980            .get_blocks(&block_refs)
1981            .into_iter()
1982            .map(|b| b.unwrap())
1983            .collect::<Vec<_>>();
1984        assert_eq!(result, all_blocks);
1985
1986        // Last commit index should be 10.
1987        assert_eq!(dag_state.last_commit_index(), 10);
1988        assert_eq!(
1989            dag_state.last_committed_rounds(),
1990            dag_builder.last_committed_rounds.clone()
1991        );
1992
1993        // Destroy the dag state.
1994        drop(dag_state);
1995
1996        // Recover the state from the store
1997        let dag_state = DagState::new(context.clone(), store.clone());
1998
1999        // Blocks of first 5 rounds should be found in DagState.
2000        let blocks = dag_builder.blocks(1..=5);
2001        let block_refs = blocks
2002            .iter()
2003            .map(|block| block.reference())
2004            .collect::<Vec<_>>();
2005        let result = dag_state
2006            .get_blocks(&block_refs)
2007            .into_iter()
2008            .map(|b| b.unwrap())
2009            .collect::<Vec<_>>();
2010        assert_eq!(result, blocks);
2011
2012        // Blocks above round 5 should not be in DagState, because they are not flushed.
2013        let missing_blocks = dag_builder.blocks(6..=num_rounds);
2014        let block_refs = missing_blocks
2015            .iter()
2016            .map(|block| block.reference())
2017            .collect::<Vec<_>>();
2018        let retrieved_blocks = dag_state
2019            .get_blocks(&block_refs)
2020            .into_iter()
2021            .flatten()
2022            .collect::<Vec<_>>();
2023        assert!(retrieved_blocks.is_empty());
2024
2025        // Last commit index should be 5.
2026        assert_eq!(dag_state.last_commit_index(), 5);
2027
2028        // This is the last_commit_rounds of the first 5 commits that were flushed
2029        let expected_last_committed_rounds = vec![4, 5, 4, 4];
2030        assert_eq!(
2031            dag_state.last_committed_rounds(),
2032            expected_last_committed_rounds
2033        );
2034        // Unscored subdags will be recovered based on the flushed commits and no commit
2035        // info
2036        assert_eq!(dag_state.scoring_subdags_count(), 5);
2037    }
2038
2039    #[tokio::test]
2040    async fn test_flush_and_recovery_gc_enabled() {
2041        telemetry_subscribers::init_for_testing();
2042
2043        const GC_DEPTH: u32 = 3;
2044        const CACHED_ROUNDS: u32 = 4;
2045
2046        let num_authorities: u32 = 4;
2047        let (mut context, _) = Context::new_for_test(num_authorities as usize);
2048        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2049        context
2050            .protocol_config
2051            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2052        context
2053            .protocol_config
2054            .set_consensus_linearize_subdag_v2_for_testing(true);
2055
2056        let context = Arc::new(context);
2057
2058        let store = Arc::new(MemStore::new());
2059        let mut dag_state = DagState::new(context.clone(), store.clone());
2060
2061        let num_rounds: u32 = 10;
2062        let mut dag_builder = DagBuilder::new(context.clone());
2063        dag_builder.layers(1..=5).build();
2064        dag_builder
2065            .layers(6..=8)
2066            .authorities(vec![AuthorityIndex::new_for_test(0)])
2067            .skip_block()
2068            .build();
2069        dag_builder.layers(9..=num_rounds).build();
2070
2071        let mut commits = dag_builder
2072            .get_sub_dag_and_commits(1..=num_rounds)
2073            .into_iter()
2074            .map(|(_subdag, commit)| commit)
2075            .collect::<Vec<_>>();
2076
2077        // Add the blocks from first 8 rounds and first 7 commits to the dag state
2078        // It's 7 commits because we missing the commit of round 8 where authority 0 is
2079        // the leader, but produced no block
2080        let temp_commits = commits.split_off(7);
2081        dag_state.accept_blocks(dag_builder.blocks(1..=8));
2082        for commit in commits.clone() {
2083            dag_state.add_commit(commit);
2084        }
2085
2086        // Holds all the committed blocks from the commits that ended up being persisted
2087        // (flushed). Any commits that not flushed will not be considered.
2088        let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2089        for commit in commits.iter() {
2090            all_committed_blocks.extend(commit.blocks());
2091        }
2092        // Flush the dag state
2093        dag_state.flush();
2094
2095        // Add the rest of the blocks and commits to the dag state
2096        dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2097        for commit in temp_commits.clone() {
2098            dag_state.add_commit(commit);
2099        }
2100
2101        // All blocks should be found in DagState.
2102        let all_blocks = dag_builder.blocks(1..=num_rounds);
2103        let block_refs = all_blocks
2104            .iter()
2105            .map(|block| block.reference())
2106            .collect::<Vec<_>>();
2107        let result = dag_state
2108            .get_blocks(&block_refs)
2109            .into_iter()
2110            .map(|b| b.unwrap())
2111            .collect::<Vec<_>>();
2112        assert_eq!(result, all_blocks);
2113
2114        // Last commit index should be 9
2115        assert_eq!(dag_state.last_commit_index(), 9);
2116        assert_eq!(
2117            dag_state.last_committed_rounds(),
2118            dag_builder.last_committed_rounds.clone()
2119        );
2120
2121        // Destroy the dag state.
2122        drop(dag_state);
2123
2124        // Recover the state from the store
2125        let dag_state = DagState::new(context.clone(), store.clone());
2126
2127        // Blocks of first 5 rounds should be found in DagState.
2128        let blocks = dag_builder.blocks(1..=5);
2129        let block_refs = blocks
2130            .iter()
2131            .map(|block| block.reference())
2132            .collect::<Vec<_>>();
2133        let result = dag_state
2134            .get_blocks(&block_refs)
2135            .into_iter()
2136            .map(|b| b.unwrap())
2137            .collect::<Vec<_>>();
2138        assert_eq!(result, blocks);
2139
2140        // Blocks above round 9 should not be in DagState, because they are not flushed.
2141        let missing_blocks = dag_builder.blocks(9..=num_rounds);
2142        let block_refs = missing_blocks
2143            .iter()
2144            .map(|block| block.reference())
2145            .collect::<Vec<_>>();
2146        let retrieved_blocks = dag_state
2147            .get_blocks(&block_refs)
2148            .into_iter()
2149            .flatten()
2150            .collect::<Vec<_>>();
2151        assert!(retrieved_blocks.is_empty());
2152
2153        // Last commit index should be 7.
2154        assert_eq!(dag_state.last_commit_index(), 7);
2155
2156        // This is the last_commit_rounds of the first 7 commits that were flushed
2157        let expected_last_committed_rounds = vec![5, 6, 6, 7];
2158        assert_eq!(
2159            dag_state.last_committed_rounds(),
2160            expected_last_committed_rounds
2161        );
2162        // Unscored subdags will be recoverd based on the flushed commits and no commit
2163        // info
2164        assert_eq!(dag_state.scoring_subdags_count(), 7);
2165        // Ensure that cached blocks exist only for specific rounds per authority
2166        for (authority_index, _) in context.committee.authorities() {
2167            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2168
2169            // Ensure that eviction rounds have been properly recovered
2170            // DagState should hold cached blocks for authority 0 for rounds [2..=5] as no
2171            // higher blocks exist and due to CACHED_ROUNDS = 4 we want at max
2172            // to hold blocks for 4 rounds in cache.
2173            if authority_index == AuthorityIndex::new_for_test(0) {
2174                assert_eq!(blocks.len(), 4);
2175                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2176                assert!(
2177                    blocks
2178                        .into_iter()
2179                        .all(|block| block.round() >= 2 && block.round() <= 5)
2180                );
2181            } else {
2182                assert_eq!(blocks.len(), 4);
2183                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2184                assert!(
2185                    blocks
2186                        .into_iter()
2187                        .all(|block| block.round() >= 5 && block.round() <= 8)
2188                );
2189            }
2190        }
2191        // Ensure that committed blocks from > gc_round have been correctly marked as
2192        // committed according to committed sub dags
2193        let gc_round = dag_state.gc_round();
2194        assert_eq!(gc_round, 4);
2195        dag_state
2196            .recent_blocks
2197            .iter()
2198            .for_each(|(block_ref, block_info)| {
2199                if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2200                    assert!(
2201                        block_info.committed,
2202                        "Block {block_ref:?} should be committed"
2203                    );
2204                };
2205            });
2206    }
2207
2208    #[tokio::test]
2209    async fn test_block_info_as_committed() {
2210        let num_authorities: u32 = 4;
2211        let (context, _) = Context::new_for_test(num_authorities as usize);
2212        let context = Arc::new(context);
2213
2214        let store = Arc::new(MemStore::new());
2215        let mut dag_state = DagState::new(context.clone(), store.clone());
2216
2217        // Accept a block
2218        let block = VerifiedBlock::new_for_test(
2219            TestBlock::new(1, 0)
2220                .set_timestamp_ms(1000)
2221                .set_ancestors(vec![])
2222                .build(),
2223        );
2224
2225        dag_state.accept_block(block.clone());
2226
2227        // Query is committed
2228        assert!(!dag_state.is_committed(&block.reference()));
2229
2230        // Set block as committed for first time should return true
2231        assert!(
2232            dag_state.set_committed(&block.reference()),
2233            "Block should be successfully set as committed for first time"
2234        );
2235
2236        // Now it should appear as committed
2237        assert!(dag_state.is_committed(&block.reference()));
2238
2239        // Trying to set the block as committed again, it should return false.
2240        assert!(
2241            !dag_state.set_committed(&block.reference()),
2242            "Block should not be successfully set as committed"
2243        );
2244    }
2245
2246    #[tokio::test]
2247    async fn test_get_cached_blocks() {
2248        let (mut context, _) = Context::new_for_test(4);
2249        context.parameters.dag_state_cached_rounds = 5;
2250
2251        let context = Arc::new(context);
2252        let store = Arc::new(MemStore::new());
2253        let mut dag_state = DagState::new(context.clone(), store.clone());
2254
2255        // Create no blocks for authority 0
2256        // Create one block (round 10) for authority 1
2257        // Create two blocks (rounds 10,11) for authority 2
2258        // Create three blocks (rounds 10,11,12) for authority 3
2259        let mut all_blocks = Vec::new();
2260        for author in 1..=3 {
2261            for round in 10..(10 + author) {
2262                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2263                all_blocks.push(block.clone());
2264                dag_state.accept_block(block);
2265            }
2266        }
2267
2268        let cached_blocks =
2269            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2270        assert!(cached_blocks.is_empty());
2271
2272        let cached_blocks =
2273            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2274        assert_eq!(cached_blocks.len(), 1);
2275        assert_eq!(cached_blocks[0].round(), 10);
2276
2277        let cached_blocks =
2278            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2279        assert_eq!(cached_blocks.len(), 2);
2280        assert_eq!(cached_blocks[0].round(), 10);
2281        assert_eq!(cached_blocks[1].round(), 11);
2282
2283        let cached_blocks =
2284            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2285        assert_eq!(cached_blocks.len(), 1);
2286        assert_eq!(cached_blocks[0].round(), 11);
2287
2288        let cached_blocks =
2289            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2290        assert_eq!(cached_blocks.len(), 3);
2291        assert_eq!(cached_blocks[0].round(), 10);
2292        assert_eq!(cached_blocks[1].round(), 11);
2293        assert_eq!(cached_blocks[2].round(), 12);
2294
2295        let cached_blocks =
2296            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2297        assert_eq!(cached_blocks.len(), 1);
2298        assert_eq!(cached_blocks[0].round(), 12);
2299
2300        // Test get_cached_blocks_in_range()
2301
2302        // Start == end
2303        let cached_blocks = dag_state.get_cached_blocks_in_range(
2304            context.committee.to_authority_index(3).unwrap(),
2305            10,
2306            10,
2307            1,
2308        );
2309        assert!(cached_blocks.is_empty());
2310
2311        // Start > end
2312        let cached_blocks = dag_state.get_cached_blocks_in_range(
2313            context.committee.to_authority_index(3).unwrap(),
2314            11,
2315            10,
2316            1,
2317        );
2318        assert!(cached_blocks.is_empty());
2319
2320        // Empty result.
2321        let cached_blocks = dag_state.get_cached_blocks_in_range(
2322            context.committee.to_authority_index(0).unwrap(),
2323            9,
2324            10,
2325            1,
2326        );
2327        assert!(cached_blocks.is_empty());
2328
2329        // Single block, one round before the end.
2330        let cached_blocks = dag_state.get_cached_blocks_in_range(
2331            context.committee.to_authority_index(1).unwrap(),
2332            9,
2333            11,
2334            1,
2335        );
2336        assert_eq!(cached_blocks.len(), 1);
2337        assert_eq!(cached_blocks[0].round(), 10);
2338
2339        // Respect end round.
2340        let cached_blocks = dag_state.get_cached_blocks_in_range(
2341            context.committee.to_authority_index(2).unwrap(),
2342            9,
2343            12,
2344            5,
2345        );
2346        assert_eq!(cached_blocks.len(), 2);
2347        assert_eq!(cached_blocks[0].round(), 10);
2348        assert_eq!(cached_blocks[1].round(), 11);
2349
2350        // Respect start round.
2351        let cached_blocks = dag_state.get_cached_blocks_in_range(
2352            context.committee.to_authority_index(3).unwrap(),
2353            11,
2354            20,
2355            5,
2356        );
2357        assert_eq!(cached_blocks.len(), 2);
2358        assert_eq!(cached_blocks[0].round(), 11);
2359        assert_eq!(cached_blocks[1].round(), 12);
2360
2361        // Respect limit
2362        let cached_blocks = dag_state.get_cached_blocks_in_range(
2363            context.committee.to_authority_index(3).unwrap(),
2364            10,
2365            20,
2366            1,
2367        );
2368        assert_eq!(cached_blocks.len(), 1);
2369        assert_eq!(cached_blocks[0].round(), 10);
2370    }
2371
2372    #[rstest]
2373    #[tokio::test]
2374    async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2375        // GIVEN
2376        const CACHED_ROUNDS: Round = 2;
2377        let (mut context, _) = Context::new_for_test(4);
2378        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2379
2380        if gc_depth > 0 {
2381            context
2382                .protocol_config
2383                .set_consensus_gc_depth_for_testing(gc_depth);
2384        }
2385
2386        let context = Arc::new(context);
2387        let store = Arc::new(MemStore::new());
2388        let mut dag_state = DagState::new(context.clone(), store.clone());
2389
2390        // Create no blocks for authority 0
2391        // Create one block (round 1) for authority 1
2392        // Create two blocks (rounds 1,2) for authority 2
2393        // Create three blocks (rounds 1,2,3) for authority 3
2394        let dag_str = "DAG {
2395            Round 0 : { 4 },
2396            Round 1 : {
2397                B -> [*],
2398                C -> [*],
2399                D -> [*],
2400            },
2401            Round 2 : {
2402                C -> [*],
2403                D -> [*],
2404            },
2405            Round 3 : {
2406                D -> [*],
2407            },
2408        }";
2409
2410        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2411
2412        // Add equivocating block for round 2 authority 3
2413        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2414
2415        // Accept all blocks
2416        for block in dag_builder
2417            .all_blocks()
2418            .into_iter()
2419            .chain(std::iter::once(block))
2420        {
2421            dag_state.accept_block(block);
2422        }
2423
2424        dag_state.add_commit(TrustedCommit::new_for_test(
2425            1 as CommitIndex,
2426            CommitDigest::MIN,
2427            context.clock.timestamp_utc_ms(),
2428            dag_builder.leader_block(3).unwrap().reference(),
2429            vec![],
2430        ));
2431
2432        // WHEN search for the latest blocks
2433        let end_round = 4;
2434        let expected_rounds = vec![0, 1, 2, 3];
2435        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2436        // THEN
2437        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2438        assert_eq!(
2439            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2440            expected_rounds
2441        );
2442        assert_eq!(
2443            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2444            expected_excluded_and_equivocating_blocks
2445        );
2446
2447        // THEN
2448        for (i, expected_round) in expected_rounds.iter().enumerate() {
2449            let round = dag_state
2450                .get_last_cached_block_in_range(
2451                    context.committee.to_authority_index(i).unwrap(),
2452                    0,
2453                    end_round,
2454                )
2455                .map(|b| b.round())
2456                .unwrap_or_default();
2457            assert_eq!(round, *expected_round, "Authority {i}");
2458        }
2459
2460        // WHEN starting from round 2
2461        let start_round = 2;
2462        let expected_rounds = [0, 0, 2, 3];
2463
2464        // THEN
2465        for (i, expected_round) in expected_rounds.iter().enumerate() {
2466            let round = dag_state
2467                .get_last_cached_block_in_range(
2468                    context.committee.to_authority_index(i).unwrap(),
2469                    start_round,
2470                    end_round,
2471                )
2472                .map(|b| b.round())
2473                .unwrap_or_default();
2474            assert_eq!(round, *expected_round, "Authority {i}");
2475        }
2476
2477        // WHEN we flush the DagState - after adding a
2478        // commit with all the blocks, we expect this to trigger a clean up in
2479        // the internal cache. That will keep the all the blocks with rounds >=
2480        // authority_commit_round - CACHED_ROUND.
2481        //
2482        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and
2483        // for those who don't have blocks > gc_round, we'll keep
2484        // all their highest round blocks for CACHED_ROUNDS.
2485        dag_state.flush();
2486
2487        // AND we request before round 3
2488        let end_round = 3;
2489        let expected_rounds = vec![0, 1, 2, 2];
2490
2491        // THEN
2492        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2493        assert_eq!(
2494            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2495            expected_rounds
2496        );
2497
2498        // THEN
2499        for (i, expected_round) in expected_rounds.iter().enumerate() {
2500            let round = dag_state
2501                .get_last_cached_block_in_range(
2502                    context.committee.to_authority_index(i).unwrap(),
2503                    0,
2504                    end_round,
2505                )
2506                .map(|b| b.round())
2507                .unwrap_or_default();
2508            assert_eq!(round, *expected_round, "Authority {i}");
2509        }
2510    }
2511
2512    #[tokio::test]
2513    #[should_panic(
2514        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2515    )]
2516    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2517        // GIVEN
2518        const CACHED_ROUNDS: Round = 1;
2519        let (mut context, _) = Context::new_for_test(4);
2520        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2521        context
2522            .protocol_config
2523            .set_consensus_gc_depth_for_testing(0);
2524
2525        let context = Arc::new(context);
2526        let store = Arc::new(MemStore::new());
2527        let mut dag_state = DagState::new(context.clone(), store.clone());
2528
2529        // Create no blocks for authority 0
2530        // Create one block (round 1) for authority 1
2531        // Create two blocks (rounds 1,2) for authority 2
2532        // Create three blocks (rounds 1,2,3) for authority 3
2533        let mut all_blocks = Vec::new();
2534        for author in 1..=3 {
2535            for round in 1..=author {
2536                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2537                all_blocks.push(block.clone());
2538                dag_state.accept_block(block);
2539            }
2540        }
2541
2542        dag_state.add_commit(TrustedCommit::new_for_test(
2543            1 as CommitIndex,
2544            CommitDigest::MIN,
2545            0,
2546            all_blocks.last().unwrap().reference(),
2547            all_blocks
2548                .into_iter()
2549                .map(|block| block.reference())
2550                .collect::<Vec<_>>(),
2551        ));
2552
2553        // Flush the store so we keep in memory only the last 1 round from the last
2554        // commit for each authority.
2555        dag_state.flush();
2556
2557        // THEN the method should panic, as some authorities have already evicted rounds
2558        // <= round 2
2559        let end_round = 2;
2560        dag_state.get_last_cached_block_per_authority(end_round);
2561    }
2562
2563    #[tokio::test]
2564    #[should_panic(
2565        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2566    )]
2567    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
2568        // GIVEN
2569        const CACHED_ROUNDS: Round = 1;
2570        const GC_DEPTH: u32 = 1;
2571        let (mut context, _) = Context::new_for_test(4);
2572        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2573        context
2574            .protocol_config
2575            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2576
2577        let context = Arc::new(context);
2578        let store = Arc::new(MemStore::new());
2579        let mut dag_state = DagState::new(context.clone(), store.clone());
2580
2581        // Create no blocks for authority 0
2582        // Create one block (round 1) for authority 1
2583        // Create two blocks (rounds 1,2) for authority 2
2584        // Create three blocks (rounds 1,2,3) for authority 3
2585        let mut dag_builder = DagBuilder::new(context.clone());
2586        dag_builder
2587            .layers(1..=1)
2588            .authorities(vec![AuthorityIndex::new_for_test(0)])
2589            .skip_block()
2590            .build();
2591        dag_builder
2592            .layers(2..=2)
2593            .authorities(vec![
2594                AuthorityIndex::new_for_test(0),
2595                AuthorityIndex::new_for_test(1),
2596            ])
2597            .skip_block()
2598            .build();
2599        dag_builder
2600            .layers(3..=3)
2601            .authorities(vec![
2602                AuthorityIndex::new_for_test(0),
2603                AuthorityIndex::new_for_test(1),
2604                AuthorityIndex::new_for_test(2),
2605            ])
2606            .skip_block()
2607            .build();
2608
2609        // Accept all blocks
2610        for block in dag_builder.all_blocks() {
2611            dag_state.accept_block(block);
2612        }
2613
2614        dag_state.add_commit(TrustedCommit::new_for_test(
2615            1 as CommitIndex,
2616            CommitDigest::MIN,
2617            0,
2618            dag_builder.leader_block(3).unwrap().reference(),
2619            vec![],
2620        ));
2621
2622        // Flush the store so we update the evict rounds
2623        dag_state.flush();
2624
2625        // THEN the method should panic, as some authorities have already evicted rounds
2626        // <= round 2
2627        dag_state.get_last_cached_block_per_authority(2);
2628    }
2629
2630    #[tokio::test]
2631    async fn test_last_quorum() {
2632        // GIVEN
2633        let (context, _) = Context::new_for_test(4);
2634        let context = Arc::new(context);
2635        let store = Arc::new(MemStore::new());
2636        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2637
2638        // WHEN no blocks exist then genesis should be returned
2639        {
2640            let genesis = genesis_blocks(context.clone());
2641
2642            assert_eq!(dag_state.read().last_quorum(), genesis);
2643        }
2644
2645        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks
2646        // should be returned as quorum
2647        {
2648            let mut dag_builder = DagBuilder::new(context.clone());
2649            dag_builder
2650                .layers(1..=4)
2651                .build()
2652                .persist_layers(dag_state.clone());
2653            let round_4_blocks: Vec<_> = dag_builder
2654                .blocks(4..=4)
2655                .into_iter()
2656                .map(|block| block.reference())
2657                .collect();
2658
2659            let last_quorum = dag_state.read().last_quorum();
2660
2661            assert_eq!(
2662                last_quorum
2663                    .into_iter()
2664                    .map(|block| block.reference())
2665                    .collect::<Vec<_>>(),
2666                round_4_blocks
2667            );
2668        }
2669
2670        // WHEN adding one more block at round 5, still round 4 should be returned as
2671        // quorum
2672        {
2673            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2674            dag_state.write().accept_block(block);
2675
2676            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2677
2678            let last_quorum = dag_state.read().last_quorum();
2679
2680            assert_eq!(last_quorum, round_4_blocks);
2681        }
2682    }
2683
2684    #[tokio::test]
2685    async fn test_last_block_for_authority() {
2686        // GIVEN
2687        let (context, _) = Context::new_for_test(4);
2688        let context = Arc::new(context);
2689        let store = Arc::new(MemStore::new());
2690        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2691
2692        // WHEN no blocks exist then genesis should be returned
2693        {
2694            let genesis = genesis_blocks(context.clone());
2695            let my_genesis = genesis
2696                .into_iter()
2697                .find(|block| block.author() == context.own_index)
2698                .unwrap();
2699
2700            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2701        }
2702
2703        // WHEN adding some blocks for authorities, only the last ones should be
2704        // returned
2705        {
2706            // add blocks up to round 4
2707            let mut dag_builder = DagBuilder::new(context.clone());
2708            dag_builder
2709                .layers(1..=4)
2710                .build()
2711                .persist_layers(dag_state.clone());
2712
2713            // add block 5 for authority 0
2714            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2715            dag_state.write().accept_block(block);
2716
2717            let block = dag_state
2718                .read()
2719                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2720            assert_eq!(block.round(), 5);
2721
2722            for (authority_index, _) in context.committee.authorities() {
2723                let block = dag_state
2724                    .read()
2725                    .get_last_block_for_authority(authority_index);
2726
2727                if authority_index.value() == 0 {
2728                    assert_eq!(block.round(), 5);
2729                } else {
2730                    assert_eq!(block.round(), 4);
2731                }
2732            }
2733        }
2734    }
2735}