1use std::{
6    cmp::max,
7    collections::{BTreeMap, BTreeSet, VecDeque},
8    ops::Bound::{Excluded, Included, Unbounded},
9    panic,
10    sync::Arc,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info, trace};
18
19use crate::{
20    CommittedSubDag,
21    block::{
22        BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23        VerifiedBlock, genesis_blocks,
24    },
25    commit::{
26        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28    },
29    context::Context,
30    leader_scoring::{ReputationScores, ScoringSubdag},
31    storage::{Store, WriteBatch},
32    threshold_clock::ThresholdClock,
33};
34
/// In-memory view of the consensus DAG, plus buffers of blocks, commits and commit info
/// that are waiting to be written to the backing store.
pub(crate) struct DagState {
    /// Shared node context; this type reads its committee, parameters, protocol config,
    /// metrics, clock and own authority index.
    context: Arc<Context>,

    /// Genesis blocks keyed by their reference, built once in `new` from `genesis_blocks`.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    /// Cached blocks keyed by reference, each stored with its committed flag.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    /// For every authority, the ordered set of references of its blocks held in
    /// `recent_blocks`. Indexed by authority.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    /// Threshold clock that is fed every block added to the cache.
    threshold_clock: ThresholdClock,

    /// Per-authority eviction round computed in `new`. Cache lookups treat rounds at or
    /// below this value as evicted.
    evicted_rounds: Vec<Round>,

    /// Highest round seen among all blocks added to the cache.
    highest_accepted_round: Round,

    /// Most recent commit: read from the store in `new`, then replaced by `add_commit`.
    last_commit: Option<TrustedCommit>,

    /// Local instant at which `add_commit` last observed the commit round increasing;
    /// `None` until that first happens.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    /// Per-authority highest round of any block included in a commit.
    last_committed_rounds: Vec<Round>,

    /// Scoring subdag; `new` feeds it the recovered subdags when the distributed vote
    /// scoring strategy is enabled.
    scoring_subdag: ScoringSubdag,
    /// Committed subdags recovered in `new` that were not handed to `scoring_subdag`.
    unscored_committed_subdags: Vec<CommittedSubDag>,

    /// Votes for commits added through `add_commit`, handed out in FIFO order by
    /// `take_commit_votes`.
    pending_commit_votes: VecDeque<CommitVote>,

    /// Accepted blocks buffered until the next `flush`.
    blocks_to_write: Vec<VerifiedBlock>,
    /// Added commits buffered until the next `flush`.
    commits_to_write: Vec<TrustedCommit>,

    /// Commit info entries (keyed by the commit they belong to) buffered until the next
    /// `flush`.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    /// Persistent storage used for recovery and for lookups that miss the cache.
    store: Arc<dyn Store>,

    /// Value of `parameters.dag_state_cached_rounds`; an input to the eviction-round
    /// calculations.
    cached_rounds: Round,
}
114
115impl DagState {
116    pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
118        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
119        let num_authorities = context.committee.size();
120
121        let genesis = genesis_blocks(&context)
122            .into_iter()
123            .map(|block| (block.reference(), block))
124            .collect();
125
126        let threshold_clock = ThresholdClock::new(1, context.clone());
127
128        let last_commit = store
129            .read_last_commit()
130            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
131
132        let commit_info = store
133            .read_last_commit_info()
134            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
135        let (mut last_committed_rounds, commit_recovery_start_index) =
136            if let Some((commit_ref, commit_info)) = commit_info {
137                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
138                (commit_info.committed_rounds, commit_ref.index + 1)
139            } else {
140                tracing::info!("Found no stored CommitInfo to recover from");
141                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
142            };
143
144        let mut unscored_committed_subdags = Vec::new();
145        let mut scoring_subdag = ScoringSubdag::new(context.clone());
146
147        if let Some(last_commit) = last_commit.as_ref() {
148            store
149                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
150                .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
151                .iter()
152                .for_each(|commit| {
153                    for block_ref in commit.blocks() {
154                        last_committed_rounds[block_ref.author] =
155                            max(last_committed_rounds[block_ref.author], block_ref.round);
156                    }
157
158                    let committed_subdag =
159                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]); unscored_committed_subdags.push(committed_subdag);
161                });
162        }
163
164        tracing::info!(
165            "DagState was initialized with the following state: \
166            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
167            unscored_committed_subdags.len()
168        );
169
170        if context
171            .protocol_config
172            .consensus_distributed_vote_scoring_strategy()
173        {
174            scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
175        }
176
177        let mut state = Self {
178            context,
179            genesis,
180            recent_blocks: BTreeMap::new(),
181            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
182            threshold_clock,
183            highest_accepted_round: 0,
184            last_commit: last_commit.clone(),
185            last_commit_round_advancement_time: None,
186            last_committed_rounds: last_committed_rounds.clone(),
187            pending_commit_votes: VecDeque::new(),
188            blocks_to_write: vec![],
189            commits_to_write: vec![],
190            commit_info_to_write: vec![],
191            scoring_subdag,
192            unscored_committed_subdags,
193            store: store.clone(),
194            cached_rounds,
195            evicted_rounds: vec![0; num_authorities],
196        };
197
198        for (i, round) in last_committed_rounds.into_iter().enumerate() {
199            let authority_index = state.context.committee.to_authority_index(i).unwrap();
200            let (blocks, eviction_round) = if state.gc_enabled() {
201                let last_block = state
205                    .store
206                    .scan_last_blocks_by_author(authority_index, 1, None)
207                    .expect("Database error");
208                let last_block_round = last_block
209                    .last()
210                    .map(|b| b.round())
211                    .unwrap_or(GENESIS_ROUND);
212
213                let eviction_round = Self::gc_eviction_round(
214                    last_block_round,
215                    state.gc_round(),
216                    state.cached_rounds,
217                );
218                let blocks = state
219                    .store
220                    .scan_blocks_by_author(authority_index, eviction_round + 1)
221                    .expect("Database error");
222                (blocks, eviction_round)
223            } else {
224                let eviction_round = Self::eviction_round(round, cached_rounds);
225                let blocks = state
226                    .store
227                    .scan_blocks_by_author(authority_index, eviction_round + 1)
228                    .expect("Database error");
229                (blocks, eviction_round)
230            };
231
232            state.evicted_rounds[authority_index] = eviction_round;
233
234            for block in &blocks {
236                state.update_block_metadata(block);
237            }
238
239            info!(
240                "Recovered blocks {}: {:?}",
241                authority_index,
242                blocks
243                    .iter()
244                    .map(|b| b.reference())
245                    .collect::<Vec<BlockRef>>()
246            );
247        }
248
249        let recovered_scoring_metrics = state.store.scan_metrics().expect("Database error");
252        state.context.metrics.initialize_scoring_metrics(
253            recovered_scoring_metrics,
254            &state.recent_refs_by_authority,
255            state.threshold_clock_round(),
256            &state.evicted_rounds,
257            state.context.clone(),
258        );
259
260        if state.gc_enabled() {
261            if let Some(last_commit) = last_commit {
262                let mut index = last_commit.index();
263                let gc_round = state.gc_round();
264                info!(
265                    "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
266                    index, gc_round
267                );
268
269                loop {
270                    let commits = store
271                        .scan_commits((index..=index).into())
272                        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
273                    let Some(commit) = commits.first() else {
274                        info!(
275                            "Recovering finished up to index {index}, no more commits to recover"
276                        );
277                        break;
278                    };
279
280                    if gc_round > 0 && commit.leader().round <= gc_round {
283                        info!(
284                            "Recovering finished, reached commit leader round {} <= gc_round {}",
285                            commit.leader().round,
286                            gc_round
287                        );
288                        break;
289                    }
290
291                    commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
292                        debug!(
293                            "Setting block {:?} as committed based on commit {:?}",
294                            block_ref,
295                            commit.index()
296                        );
297                        assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
298                    });
299
300                    index = index.saturating_sub(1);
302                    if index == 0 {
303                        break;
304                    }
305                }
306            }
307        }
308
309        state
310    }
311
312    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
314        assert_ne!(
315            block.round(),
316            0,
317            "Genesis block should not be accepted into DAG."
318        );
319
320        let block_ref = block.reference();
321        if self.contains_block(&block_ref) {
322            return;
323        }
324
325        let now = self.context.clock.timestamp_utc_ms();
326        if block.timestamp_ms() > now {
327            if self
328                .context
329                .protocol_config
330                .consensus_median_timestamp_with_checkpoint_enforcement()
331            {
332                trace!(
333                    "Block {:?} with timestamp {} is greater than local timestamp {}.",
334                    block,
335                    block.timestamp_ms(),
336                    now,
337                );
338            } else {
339                panic!(
340                    "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
341                    block,
342                    block.timestamp_ms(),
343                    now,
344                );
345            }
346        }
347        let hostname = &self.context.committee.authority(block_ref.author).hostname;
348        self.context
349            .metrics
350            .node_metrics
351            .accepted_block_time_drift_ms
352            .with_label_values(&[hostname])
353            .inc_by(block.timestamp_ms().saturating_sub(now));
354
355        if block_ref.author == self.context.own_index {
358            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
359            assert!(
360                existing_blocks.is_empty(),
361                "Block Rejected! Attempted to add block {block:#?} to own slot where \
362                block(s) {existing_blocks:#?} already exists."
363            );
364        }
365        self.update_block_metadata(&block);
366        self.blocks_to_write.push(block);
367        let source = if self.context.own_index == block_ref.author {
368            "own"
369        } else {
370            "others"
371        };
372        self.context
373            .metrics
374            .node_metrics
375            .accepted_blocks
376            .with_label_values(&[source])
377            .inc();
378    }
379
380    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
382        let block_ref = block.reference();
383        self.recent_blocks
384            .insert(block_ref, BlockInfo::new(block.clone()));
385        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
386        self.threshold_clock.add_block(block_ref);
387        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
388        self.context
389            .metrics
390            .node_metrics
391            .highest_accepted_round
392            .set(self.highest_accepted_round as i64);
393
394        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
395            .last()
396            .map(|block_ref| block_ref.round)
397            .expect("There should be by now at least one block ref");
398        let hostname = &self.context.committee.authority(block_ref.author).hostname;
399        self.context
400            .metrics
401            .node_metrics
402            .highest_accepted_authority_round
403            .with_label_values(&[hostname])
404            .set(highest_accepted_round_for_author as i64);
405    }
406
407    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
409        debug!(
410            "Accepting blocks: {}",
411            blocks.iter().map(|b| b.reference().to_string()).join(",")
412        );
413        for block in blocks {
414            self.accept_block(block);
415        }
416    }
417
418    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
421        self.get_blocks(&[*reference])
422            .pop()
423            .expect("Exactly one element should be returned")
424    }
425
426    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
430        let mut blocks = vec![None; block_refs.len()];
431        let mut missing = Vec::new();
432
433        for (index, block_ref) in block_refs.iter().enumerate() {
434            if block_ref.round == GENESIS_ROUND {
435                if let Some(block) = self.genesis.get(block_ref) {
437                    blocks[index] = Some(block.clone());
438                }
439                continue;
440            }
441            if let Some(block_info) = self.recent_blocks.get(block_ref) {
442                blocks[index] = Some(block_info.block.clone());
443                continue;
444            }
445            missing.push((index, block_ref));
446        }
447
448        if missing.is_empty() {
449            return blocks;
450        }
451
452        let missing_refs = missing
453            .iter()
454            .map(|(_, block_ref)| **block_ref)
455            .collect::<Vec<_>>();
456        let store_results = self
457            .store
458            .read_blocks(&missing_refs)
459            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
460        self.context
461            .metrics
462            .node_metrics
463            .dag_state_store_read_count
464            .with_label_values(&["get_blocks"])
465            .inc();
466
467        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
468            blocks[index] = result;
469        }
470
471        blocks
472    }
473
474    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
478        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
479            if !block_info.committed {
480                block_info.committed = true;
481                return true;
482            }
483            false
484        } else {
485            panic!("Block {block_ref:?} not found in cache to set as committed.");
486        }
487    }
488
489    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
490        self.recent_blocks
491            .get(block_ref)
492            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
493            .committed
494    }
495
496    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
500        let mut blocks = vec![];
505        for (_block_ref, block_info) in self.recent_blocks.range((
506            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
507            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
508        )) {
509            blocks.push(block_info.block.clone())
510        }
511        blocks
512    }
513
514    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
518        if round <= self.last_commit_round() {
519            panic!("Round {round} have committed blocks!");
520        }
521
522        let mut blocks = vec![];
523        for (_block_ref, block_info) in self.recent_blocks.range((
524            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
525            Excluded(BlockRef::new(
526                round + 1,
527                AuthorityIndex::ZERO,
528                BlockDigest::MIN,
529            )),
530        )) {
531            blocks.push(block_info.block.clone())
532        }
533        blocks
534    }
535
536    pub(crate) fn ancestors_at_round(
538        &self,
539        later_block: &VerifiedBlock,
540        earlier_round: Round,
541    ) -> Vec<VerifiedBlock> {
542        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
544        while !linked.is_empty() {
545            let round = linked.last().unwrap().round;
546            if round <= earlier_round {
548                break;
549            }
550            let block_ref = linked.pop_last().unwrap();
551            let Some(block) = self.get_block(&block_ref) else {
552                panic!("Block {block_ref:?} should exist in DAG!");
553            };
554            linked.extend(block.ancestors().iter().cloned());
555        }
556        linked
557            .range((
558                Included(BlockRef::new(
559                    earlier_round,
560                    AuthorityIndex::ZERO,
561                    BlockDigest::MIN,
562                )),
563                Unbounded,
564            ))
565            .map(|r| {
566                self.get_block(r)
567                    .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
568                    .clone()
569            })
570            .collect()
571    }
572
573    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
576        self.get_last_block_for_authority(self.context.own_index)
577    }
578
579    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
583        if let Some(last) = self.recent_refs_by_authority[authority].last() {
584            return self
585                .recent_blocks
586                .get(last)
587                .expect("Block should be found in recent blocks")
588                .block
589                .clone();
590        }
591
592        let (_, genesis_block) = self
594            .genesis
595            .iter()
596            .find(|(block_ref, _)| block_ref.author == authority)
597            .expect("Genesis should be found for authority {authority_index}");
598        genesis_block.clone()
599    }
600
601    pub(crate) fn get_cached_blocks(
607        &self,
608        authority: AuthorityIndex,
609        start: Round,
610    ) -> Vec<VerifiedBlock> {
611        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
612    }
613
614    pub(crate) fn get_cached_blocks_in_range(
617        &self,
618        authority: AuthorityIndex,
619        start_round: Round,
620        end_round: Round,
621        limit: usize,
622    ) -> Vec<VerifiedBlock> {
623        if start_round >= end_round || limit == 0 {
624            return vec![];
625        }
626
627        let mut blocks = vec![];
628        for block_ref in self.recent_refs_by_authority[authority].range((
629            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
630            Excluded(BlockRef::new(
631                end_round,
632                AuthorityIndex::MIN,
633                BlockDigest::MIN,
634            )),
635        )) {
636            let block_info = self
637                .recent_blocks
638                .get(block_ref)
639                .expect("Block should exist in recent blocks");
640            blocks.push(block_info.block.clone());
641            if blocks.len() >= limit {
642                break;
643            }
644        }
645        blocks
646    }
647
648    pub(crate) fn get_last_cached_block_in_range(
651        &self,
652        authority: AuthorityIndex,
653        start_round: Round,
654        end_round: Round,
655    ) -> Option<VerifiedBlock> {
656        if start_round >= end_round {
657            return None;
658        }
659
660        let block_ref = self.recent_refs_by_authority[authority]
661            .range((
662                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
663                Excluded(BlockRef::new(
664                    end_round,
665                    AuthorityIndex::MIN,
666                    BlockDigest::MIN,
667                )),
668            ))
669            .last()?;
670
671        self.recent_blocks
672            .get(block_ref)
673            .map(|block_info| block_info.block.clone())
674    }
675
676    pub(crate) fn get_last_cached_block_per_authority(
685        &self,
686        end_round: Round,
687    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
688        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
690        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
691
692        if end_round == GENESIS_ROUND {
693            panic!(
694                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
695            );
696        }
697
698        if end_round == GENESIS_ROUND + 1 {
699            return blocks.into_iter().map(|b| (b, vec![])).collect();
700        }
701
702        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
703            let authority_index = self
704                .context
705                .committee
706                .to_authority_index(authority_index)
707                .unwrap();
708
709            let last_evicted_round = self.evicted_rounds[authority_index];
710            if end_round.saturating_sub(1) <= last_evicted_round {
711                panic!(
712                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
713                );
714            }
715
716            let block_ref_iter = block_refs
717                .range((
718                    Included(BlockRef::new(
719                        last_evicted_round + 1,
720                        authority_index,
721                        BlockDigest::MIN,
722                    )),
723                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
724                ))
725                .rev();
726
727            let mut last_round = 0;
728            for block_ref in block_ref_iter {
729                if last_round == 0 {
730                    last_round = block_ref.round;
731                    let block_info = self
732                        .recent_blocks
733                        .get(block_ref)
734                        .expect("Block should exist in recent blocks");
735                    blocks[authority_index] = block_info.block.clone();
736                    continue;
737                }
738                if block_ref.round < last_round {
739                    break;
740                }
741                equivocating_blocks[authority_index].push(*block_ref);
742            }
743        }
744
745        blocks.into_iter().zip(equivocating_blocks).collect()
746    }
747
748    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
752        if slot.round == GENESIS_ROUND {
754            return true;
755        }
756
757        let eviction_round = self.evicted_rounds[slot.authority];
758        if slot.round <= eviction_round {
759            panic!(
760                "{}",
761                format!(
762                    "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
763                    if self.gc_enabled() { " gc " } else { " " }
764                )
765            );
766        }
767
768        let mut result = self.recent_refs_by_authority[slot.authority].range((
769            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
770            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
771        ));
772        result.next().is_some()
773    }
774
775    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
779        let mut exist = vec![false; block_refs.len()];
780        let mut missing = Vec::new();
781
782        for (index, block_ref) in block_refs.into_iter().enumerate() {
783            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
784            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
785                exist[index] = true;
786            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
787            {
788                exist[index] = false;
793            } else {
794                missing.push((index, block_ref));
795            }
796        }
797
798        if missing.is_empty() {
799            return exist;
800        }
801
802        let missing_refs = missing
803            .iter()
804            .map(|(_, block_ref)| *block_ref)
805            .collect::<Vec<_>>();
806        let store_results = self
807            .store
808            .contains_blocks(&missing_refs)
809            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
810        self.context
811            .metrics
812            .node_metrics
813            .dag_state_store_read_count
814            .with_label_values(&["contains_blocks"])
815            .inc();
816
817        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
818            exist[index] = result;
819        }
820
821        exist
822    }
823
824    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
825        let blocks = self.contains_blocks(vec![*block_ref]);
826        blocks.first().cloned().unwrap()
827    }
828
829    pub(crate) fn threshold_clock_round(&self) -> Round {
830        self.threshold_clock.get_round()
831    }
832
833    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
834        self.threshold_clock.get_quorum_ts()
835    }
836
    /// Returns the highest round among the blocks added to the cache so far.
    pub(crate) fn highest_accepted_round(&self) -> Round {
        self.highest_accepted_round
    }
840
841    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
844        let time_diff = if let Some(last_commit) = &self.last_commit {
845            if commit.index() <= last_commit.index() {
846                error!(
847                    "New commit index {} <= last commit index {}!",
848                    commit.index(),
849                    last_commit.index()
850                );
851                return;
852            }
853            assert_eq!(commit.index(), last_commit.index() + 1);
854
855            if commit.timestamp_ms() < last_commit.timestamp_ms() {
856                panic!(
857                    "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
858                );
859            }
860            commit
861                .timestamp_ms()
862                .saturating_sub(last_commit.timestamp_ms())
863        } else {
864            assert_eq!(commit.index(), 1);
865            0
866        };
867
868        self.context
869            .metrics
870            .node_metrics
871            .last_commit_time_diff
872            .observe(time_diff as f64);
873
874        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
875            previous_commit.round() < commit.round()
876        } else {
877            true
878        };
879
880        self.last_commit = Some(commit.clone());
881
882        if commit_round_advanced {
883            let now = std::time::Instant::now();
884            if let Some(previous_time) = self.last_commit_round_advancement_time {
885                self.context
886                    .metrics
887                    .node_metrics
888                    .commit_round_advancement_interval
889                    .observe(now.duration_since(previous_time).as_secs_f64())
890            }
891            self.last_commit_round_advancement_time = Some(now);
892        }
893
894        for block_ref in commit.blocks().iter() {
895            self.last_committed_rounds[block_ref.author] = max(
896                self.last_committed_rounds[block_ref.author],
897                block_ref.round,
898            );
899        }
900
901        for (i, round) in self.last_committed_rounds.iter().enumerate() {
902            let index = self.context.committee.to_authority_index(i).unwrap();
903            let hostname = &self.context.committee.authority(index).hostname;
904            self.context
905                .metrics
906                .node_metrics
907                .last_committed_authority_round
908                .with_label_values(&[hostname])
909                .set((*round).into());
910        }
911
912        self.pending_commit_votes.push_back(commit.reference());
913        self.commits_to_write.push(commit);
914    }
915
916    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
917        assert!(self.unscored_committed_subdags.is_empty());
919
920        assert!(self.scoring_subdag.is_empty());
924
925        let commit_info = CommitInfo {
926            committed_rounds: self.last_committed_rounds.clone(),
927            reputation_scores,
928        };
929        let last_commit = self
930            .last_commit
931            .as_ref()
932            .expect("Last commit should already be set.");
933        self.commit_info_to_write
934            .push((last_commit.reference(), commit_info));
935    }
936
937    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
938        let mut votes = Vec::new();
939        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
940            votes.push(self.pending_commit_votes.pop_front().unwrap());
941        }
942        votes
943    }
944
945    pub(crate) fn last_commit_index(&self) -> CommitIndex {
947        match &self.last_commit {
948            Some(commit) => commit.index(),
949            None => 0,
950        }
951    }
952
953    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
955        match &self.last_commit {
956            Some(commit) => commit.digest(),
957            None => CommitDigest::MIN,
958        }
959    }
960
961    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
963        match &self.last_commit {
964            Some(commit) => commit.timestamp_ms(),
965            None => 0,
966        }
967    }
968
969    pub(crate) fn last_commit_leader(&self) -> Slot {
971        match &self.last_commit {
972            Some(commit) => commit.leader().into(),
973            None => self
974                .genesis
975                .iter()
976                .next()
977                .map(|(genesis_ref, _)| *genesis_ref)
978                .expect("Genesis blocks should always be available.")
979                .into(),
980        }
981    }
982
983    pub(crate) fn last_commit_round(&self) -> Round {
986        match &self.last_commit {
987            Some(commit) => commit.leader().round,
988            None => 0,
989        }
990    }
991
992    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
994        self.last_committed_rounds.clone()
995    }
996
997    pub(crate) fn gc_round(&self) -> Round {
1004        self.calculate_gc_round(self.last_commit_round())
1005    }
1006
1007    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1008        let gc_depth = self.context.protocol_config.gc_depth();
1009        if gc_depth > 0 {
1010            commit_round.saturating_sub(gc_depth)
1012        } else {
1013            GENESIS_ROUND
1016        }
1017    }
1018
1019    pub(crate) fn gc_enabled(&self) -> bool {
1020        self.context.protocol_config.gc_depth() > 0
1021    }
1022
1023    pub(crate) fn flush(&mut self) {
1026        let _s = self
1027            .context
1028            .metrics
1029            .node_metrics
1030            .scope_processing_time
1031            .with_label_values(&["DagState::flush"])
1032            .start_timer();
1033        let blocks = std::mem::take(&mut self.blocks_to_write);
1035        let commits = std::mem::take(&mut self.commits_to_write);
1036        let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
1037
1038        if blocks.is_empty() && commits.is_empty() {
1039            return;
1040        }
1041        debug!(
1042            "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
1043            blocks.len(),
1044            blocks.iter().map(|b| b.reference().to_string()).join(","),
1045            commits.len(),
1046            commits.iter().map(|c| c.reference().to_string()).join(","),
1047            commit_info_to_write.len(),
1048            commit_info_to_write
1049                .iter()
1050                .map(|(commit_ref, _)| commit_ref.to_string())
1051                .join(","),
1052        );
1053
1054        let mut metrics_to_write = vec![];
1056        let threshold_clock_round = self.threshold_clock_round();
1057        for (authority_index, authority) in self.context.committee.authorities() {
1058            let last_eviction_round = self.evicted_rounds[authority_index];
1059            let current_eviction_round = self.calculate_authority_eviction_round(authority_index);
1060            let metrics_to_write_from_authority =
1061                self.context.metrics.update_scoring_metrics_on_eviction(
1062                    authority_index,
1063                    authority.hostname.as_str(),
1064                    &self.recent_refs_by_authority[authority_index],
1065                    current_eviction_round,
1066                    last_eviction_round,
1067                    threshold_clock_round,
1068                );
1069            if let Some(metrics_to_write_from_authority) = metrics_to_write_from_authority {
1070                metrics_to_write.push((authority_index, metrics_to_write_from_authority));
1071            }
1072        }
1073
1074        self.store
1075            .write(WriteBatch::new(
1076                blocks,
1077                commits,
1078                commit_info_to_write,
1079                metrics_to_write,
1080            ))
1081            .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
1082        self.context
1083            .metrics
1084            .node_metrics
1085            .dag_state_store_write_count
1086            .inc();
1087
1088        for (authority_index, _) in self.context.committee.authorities() {
1092            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1093            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1094                let block_round = block_ref.round;
1095                if block_round <= eviction_round {
1096                    self.recent_blocks.remove(block_ref);
1097                    self.recent_refs_by_authority[authority_index].pop_first();
1098                } else {
1099                    break;
1100                }
1101            }
1102            self.evicted_rounds[authority_index] = eviction_round;
1103        }
1104
1105        let metrics = &self.context.metrics.node_metrics;
1106        metrics
1107            .dag_state_recent_blocks
1108            .set(self.recent_blocks.len() as i64);
1109        metrics.dag_state_recent_refs.set(
1110            self.recent_refs_by_authority
1111                .iter()
1112                .map(BTreeSet::len)
1113                .sum::<usize>() as i64,
1114        );
1115    }
1116
1117    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1118        self.store
1119            .read_last_commit_info()
1120            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
1121    }
1122
1123    pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
1125        self.unscored_committed_subdags.len() as u64
1126    }
1127
1128    #[cfg(test)]
1129    pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
1130        self.unscored_committed_subdags.clone()
1131    }
1132
1133    pub(crate) fn add_unscored_committed_subdags(
1134        &mut self,
1135        committed_subdags: Vec<CommittedSubDag>,
1136    ) {
1137        self.unscored_committed_subdags.extend(committed_subdags);
1138    }
1139
1140    pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
1141        std::mem::take(&mut self.unscored_committed_subdags)
1142    }
1143
1144    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1145        self.scoring_subdag.add_subdags(scoring_subdags);
1146    }
1147
1148    pub(crate) fn clear_scoring_subdag(&mut self) {
1149        self.scoring_subdag.clear();
1150    }
1151
1152    pub(crate) fn scoring_subdags_count(&self) -> usize {
1153        self.scoring_subdag.scored_subdags_count()
1154    }
1155
1156    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1157        self.scoring_subdag.is_empty()
1158    }
1159
1160    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1161        self.scoring_subdag.calculate_distributed_vote_scores()
1162    }
1163
1164    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1165        self.scoring_subdag
1166            .commit_range
1167            .as_ref()
1168            .expect("commit range should exist for scoring subdag")
1169            .end()
1170    }
1171
1172    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1177        if self.gc_enabled() {
1178            let last_round = self.recent_refs_by_authority[authority_index]
1179                .last()
1180                .map(|block_ref| block_ref.round)
1181                .unwrap_or(GENESIS_ROUND);
1182
1183            Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1184        } else {
1185            let commit_round = self.last_committed_rounds[authority_index];
1186            Self::eviction_round(commit_round, self.cached_rounds)
1187        }
1188    }
1189
1190    fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
1193        commit_round.saturating_sub(cached_rounds)
1194    }
1195
1196    fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1201        gc_round.min(last_round.saturating_sub(cached_rounds))
1202    }
1203
1204    #[cfg(test)]
1207    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1208        for round in
1212            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1213        {
1214            if round == GENESIS_ROUND {
1215                return self.genesis_blocks();
1216            }
1217            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1218            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1219
1220            let blocks = self.get_uncommitted_blocks_at_round(round);
1223            for block in &blocks {
1224                if quorum.add(block.author(), &self.context.committee) {
1225                    return blocks;
1226                }
1227            }
1228        }
1229
1230        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1231    }
1232
1233    #[cfg(test)]
1234    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1235        self.genesis.values().cloned().collect()
1236    }
1237
1238    #[cfg(test)]
1239    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1240        self.last_commit = Some(commit);
1241    }
1242}
1243
1244struct BlockInfo {
1245    block: VerifiedBlock,
1246    committed: bool,
1248}
1249
1250impl BlockInfo {
1251    fn new(block: VerifiedBlock) -> Self {
1252        Self {
1253            block,
1254            committed: false,
1255        }
1256    }
1257}
1258
1259#[cfg(test)]
1260mod test {
1261    use std::vec;
1262
1263    use parking_lot::RwLock;
1264    use rstest::rstest;
1265
1266    use super::*;
1267    use crate::{
1268        block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1269        storage::{WriteBatch, mem_store::MemStore},
1270        test_dag_builder::DagBuilder,
1271        test_dag_parser::parse_dag,
1272    };
1273
1274    #[tokio::test]
1275    async fn test_get_blocks() {
1276        let (context, _) = Context::new_for_test(4);
1277        let context = Arc::new(context);
1278        let store = Arc::new(MemStore::new());
1279        let mut dag_state = DagState::new(context.clone(), store.clone());
1280        let own_index = AuthorityIndex::new_for_test(0);
1281
1282        let num_rounds: u32 = 10;
1284        let non_existent_round: u32 = 100;
1285        let num_authorities: u32 = 3;
1286        let num_blocks_per_slot: usize = 3;
1287        let mut blocks = BTreeMap::new();
1288        for round in 1..=num_rounds {
1289            for author in 0..num_authorities {
1290                let base_ts = round as BlockTimestampMs * 1000;
1292                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1293                    let block = VerifiedBlock::new_for_test(
1294                        TestBlock::new(round, author)
1295                            .set_timestamp_ms(timestamp)
1296                            .build(),
1297                    );
1298                    dag_state.accept_block(block.clone());
1299                    blocks.insert(block.reference(), block);
1300
1301                    if AuthorityIndex::new_for_test(author) == own_index {
1303                        break;
1304                    }
1305                }
1306            }
1307        }
1308
1309        for (r, block) in &blocks {
1311            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1312        }
1313
1314        let last_ref = blocks.keys().last().unwrap();
1316        assert!(
1317            dag_state
1318                .get_block(&BlockRef::new(
1319                    last_ref.round,
1320                    last_ref.author,
1321                    BlockDigest::MIN
1322                ))
1323                .is_none()
1324        );
1325
1326        for round in 1..=num_rounds {
1328            for author in 0..num_authorities {
1329                let slot = Slot::new(
1330                    round,
1331                    context
1332                        .committee
1333                        .to_authority_index(author as usize)
1334                        .unwrap(),
1335                );
1336                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1337
1338                if AuthorityIndex::new_for_test(author) == own_index {
1340                    assert_eq!(blocks.len(), 1);
1341                } else {
1342                    assert_eq!(blocks.len(), num_blocks_per_slot);
1343                }
1344
1345                for b in blocks {
1346                    assert_eq!(b.round(), round);
1347                    assert_eq!(
1348                        b.author(),
1349                        context
1350                            .committee
1351                            .to_authority_index(author as usize)
1352                            .unwrap()
1353                    );
1354                }
1355            }
1356        }
1357
1358        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1360        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1361
1362        for round in 1..=num_rounds {
1364            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1365            assert_eq!(
1368                blocks.len(),
1369                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1370            );
1371            for b in blocks {
1372                assert_eq!(b.round(), round);
1373            }
1374        }
1375
1376        assert!(
1378            dag_state
1379                .get_uncommitted_blocks_at_round(non_existent_round)
1380                .is_empty()
1381        );
1382    }
1383
1384    #[tokio::test]
1385    async fn test_ancestors_at_uncommitted_round() {
1386        let (context, _) = Context::new_for_test(4);
1388        let context = Arc::new(context);
1389        let store = Arc::new(MemStore::new());
1390        let mut dag_state = DagState::new(context.clone(), store.clone());
1391
1392        let round_10_refs: Vec<_> = (0..4)
1396            .map(|a| {
1397                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1398                    .reference()
1399            })
1400            .collect();
1401
1402        let round_11 = vec![
1404            VerifiedBlock::new_for_test(
1406                TestBlock::new(11, 0)
1407                    .set_timestamp_ms(1100)
1408                    .set_ancestors(round_10_refs.clone())
1409                    .build(),
1410            ),
1411            VerifiedBlock::new_for_test(
1414                TestBlock::new(11, 1)
1415                    .set_timestamp_ms(1110)
1416                    .set_ancestors(round_10_refs.clone())
1417                    .build(),
1418            ),
1419            VerifiedBlock::new_for_test(
1421                TestBlock::new(11, 1)
1422                    .set_timestamp_ms(1111)
1423                    .set_ancestors(round_10_refs.clone())
1424                    .build(),
1425            ),
1426            VerifiedBlock::new_for_test(
1428                TestBlock::new(11, 1)
1429                    .set_timestamp_ms(1112)
1430                    .set_ancestors(round_10_refs.clone())
1431                    .build(),
1432            ),
1433            VerifiedBlock::new_for_test(
1435                TestBlock::new(11, 2)
1436                    .set_timestamp_ms(1120)
1437                    .set_ancestors(round_10_refs.clone())
1438                    .build(),
1439            ),
1440            VerifiedBlock::new_for_test(
1442                TestBlock::new(11, 3)
1443                    .set_timestamp_ms(1130)
1444                    .set_ancestors(round_10_refs.clone())
1445                    .build(),
1446            ),
1447        ];
1448
1449        let ancestors_for_round_12 = vec![
1451            round_11[0].reference(),
1452            round_11[1].reference(),
1453            round_11[5].reference(),
1454        ];
1455        let round_12 = vec![
1456            VerifiedBlock::new_for_test(
1457                TestBlock::new(12, 0)
1458                    .set_timestamp_ms(1200)
1459                    .set_ancestors(ancestors_for_round_12.clone())
1460                    .build(),
1461            ),
1462            VerifiedBlock::new_for_test(
1463                TestBlock::new(12, 2)
1464                    .set_timestamp_ms(1220)
1465                    .set_ancestors(ancestors_for_round_12.clone())
1466                    .build(),
1467            ),
1468            VerifiedBlock::new_for_test(
1469                TestBlock::new(12, 3)
1470                    .set_timestamp_ms(1230)
1471                    .set_ancestors(ancestors_for_round_12.clone())
1472                    .build(),
1473            ),
1474        ];
1475
1476        let ancestors_for_round_13 = vec![
1478            round_12[0].reference(),
1479            round_12[1].reference(),
1480            round_12[2].reference(),
1481            round_11[2].reference(),
1482        ];
1483        let round_13 = vec![
1484            VerifiedBlock::new_for_test(
1485                TestBlock::new(12, 1)
1486                    .set_timestamp_ms(1300)
1487                    .set_ancestors(ancestors_for_round_13.clone())
1488                    .build(),
1489            ),
1490            VerifiedBlock::new_for_test(
1491                TestBlock::new(12, 2)
1492                    .set_timestamp_ms(1320)
1493                    .set_ancestors(ancestors_for_round_13.clone())
1494                    .build(),
1495            ),
1496            VerifiedBlock::new_for_test(
1497                TestBlock::new(12, 3)
1498                    .set_timestamp_ms(1330)
1499                    .set_ancestors(ancestors_for_round_13.clone())
1500                    .build(),
1501            ),
1502        ];
1503
1504        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1506        let anchor = VerifiedBlock::new_for_test(
1507            TestBlock::new(14, 1)
1508                .set_timestamp_ms(1410)
1509                .set_ancestors(ancestors_for_round_14)
1510                .build(),
1511        );
1512
1513        for b in round_11
1515            .iter()
1516            .chain(round_12.iter())
1517            .chain(round_13.iter())
1518            .chain([anchor.clone()].iter())
1519        {
1520            dag_state.accept_block(b.clone());
1521        }
1522
1523        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1525        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1526        ancestors_refs.sort();
1527        let mut expected_refs = vec![
1528            round_11[0].reference(),
1529            round_11[1].reference(),
1530            round_11[2].reference(),
1531            round_11[5].reference(),
1532        ];
1533        expected_refs.sort(); assert_eq!(
1536            ancestors_refs, expected_refs,
1537            "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1538        );
1539    }
1540
1541    #[tokio::test]
1542    async fn test_contains_blocks_in_cache_or_store() {
1543        const CACHED_ROUNDS: Round = 2;
1545
1546        let (mut context, _) = Context::new_for_test(4);
1547        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1548
1549        let context = Arc::new(context);
1550        let store = Arc::new(MemStore::new());
1551        let mut dag_state = DagState::new(context.clone(), store.clone());
1552
1553        let num_rounds: u32 = 10;
1555        let num_authorities: u32 = 4;
1556        let mut blocks = Vec::new();
1557
1558        for round in 1..=num_rounds {
1559            for author in 0..num_authorities {
1560                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1561                blocks.push(block);
1562            }
1563        }
1564
1565        blocks.clone().into_iter().for_each(|block| {
1568            if block.round() <= 4 {
1569                store
1570                    .write(WriteBatch::default().blocks(vec![block]))
1571                    .unwrap();
1572            } else {
1573                dag_state.accept_blocks(vec![block]);
1574            }
1575        });
1576
1577        let mut block_refs = blocks
1581            .iter()
1582            .map(|block| block.reference())
1583            .collect::<Vec<_>>();
1584        let result = dag_state.contains_blocks(block_refs.clone());
1585
1586        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1588        assert_eq!(result, expected);
1589
1590        block_refs.insert(
1592            3,
1593            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1594        );
1595        let result = dag_state.contains_blocks(block_refs.clone());
1596
1597        expected.insert(3, false);
1599        assert_eq!(result, expected.clone());
1600    }
1601
1602    #[tokio::test]
1603    async fn test_contains_cached_block_at_slot() {
1604        const CACHED_ROUNDS: Round = 2;
1606
1607        let num_authorities: u32 = 4;
1608        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1609        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1610
1611        let context = Arc::new(context);
1612        let store = Arc::new(MemStore::new());
1613        let mut dag_state = DagState::new(context.clone(), store.clone());
1614
1615        let num_rounds: u32 = 10;
1617        let mut blocks = Vec::new();
1618
1619        for round in 1..=num_rounds {
1620            for author in 0..num_authorities {
1621                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1622                blocks.push(block.clone());
1623                dag_state.accept_block(block);
1624            }
1625        }
1626
1627        for (author, _) in context.committee.authorities() {
1629            assert!(
1630                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1631                "Genesis should always be found"
1632            );
1633        }
1634
1635        let mut block_refs = blocks
1639            .iter()
1640            .map(|block| block.reference())
1641            .collect::<Vec<_>>();
1642
1643        for block_ref in block_refs.clone() {
1644            let slot = block_ref.into();
1645            let found = dag_state.contains_cached_block_at_slot(slot);
1646            assert!(found, "A block should be found at slot {slot}");
1647        }
1648
1649        block_refs.insert(
1652            3,
1653            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1654        );
1655        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1656        expected.insert(3, false);
1657
1658        for block_ref in block_refs {
1660            let slot = block_ref.into();
1661            let found = dag_state.contains_cached_block_at_slot(slot);
1662
1663            assert_eq!(expected.remove(0), found);
1664        }
1665    }
1666
1667    #[tokio::test]
1668    #[should_panic(
1669        expected = "Attempted to check for slot S8[0] that is <= the last gc evicted round 8"
1670    )]
1671    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1672        const CACHED_ROUNDS: Round = 2;
1674        const GC_DEPTH: u32 = 1;
1675        let (mut context, _) = Context::new_for_test(4);
1676        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1677        context
1678            .protocol_config
1679            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1680
1681        let context = Arc::new(context);
1682        let store = Arc::new(MemStore::new());
1683        let mut dag_state = DagState::new(context.clone(), store.clone());
1684
1685        let mut blocks = Vec::new();
1687        for round in 1..=10 {
1688            let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1689            blocks.push(block.clone());
1690            dag_state.accept_block(block);
1691        }
1692
1693        dag_state.add_commit(TrustedCommit::new_for_test(
1695            1 as CommitIndex,
1696            CommitDigest::MIN,
1697            0,
1698            blocks.last().unwrap().reference(),
1699            blocks
1700                .into_iter()
1701                .map(|block| block.reference())
1702                .collect::<Vec<_>>(),
1703        ));
1704
1705        dag_state.flush();
1706
1707        let _ =
1711            dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1712    }
1713
1714    #[tokio::test]
1715    #[should_panic(
1716        expected = "Attempted to check for slot S3[1] that is <= the last gc evicted round 3"
1717    )]
1718    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1719        const GC_DEPTH: u32 = 2;
1723        const CACHED_ROUNDS: Round = 3;
1725
1726        let (mut context, _) = Context::new_for_test(4);
1727        context
1728            .protocol_config
1729            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1730        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1731
1732        let context = Arc::new(context);
1733        let store = Arc::new(MemStore::new());
1734        let mut dag_state = DagState::new(context.clone(), store.clone());
1735
1736        let mut dag_builder = DagBuilder::new(context.clone());
1739        dag_builder.layers(1..=3).build();
1740        dag_builder
1741            .layers(4..=6)
1742            .authorities(vec![AuthorityIndex::new_for_test(0)])
1743            .skip_block()
1744            .build();
1745
1746        dag_builder
1748            .all_blocks()
1749            .into_iter()
1750            .for_each(|block| dag_state.accept_block(block));
1751
1752        dag_state.add_commit(TrustedCommit::new_for_test(
1754            1 as CommitIndex,
1755            CommitDigest::MIN,
1756            0,
1757            dag_builder.leader_block(5).unwrap().reference(),
1758            vec![],
1759        ));
1760
1761        dag_state.flush();
1762
1763        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1765
1766        for authority_index in 1..=3 {
1771            for round in 4..=6 {
1772                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1773                    round,
1774                    AuthorityIndex::new_for_test(authority_index)
1775                )));
1776            }
1777        }
1778
1779        for round in 1..=3 {
1780            assert!(
1781                dag_state.contains_cached_block_at_slot(Slot::new(
1782                    round,
1783                    AuthorityIndex::new_for_test(0)
1784                ))
1785            );
1786        }
1787
1788        let _ =
1791            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1792    }
1793
1794    #[tokio::test]
1795    async fn test_get_blocks_in_cache_or_store() {
1796        let (context, _) = Context::new_for_test(4);
1797        let context = Arc::new(context);
1798        let store = Arc::new(MemStore::new());
1799        let mut dag_state = DagState::new(context.clone(), store.clone());
1800
1801        let num_rounds: u32 = 10;
1803        let num_authorities: u32 = 4;
1804        let mut blocks = Vec::new();
1805
1806        for round in 1..=num_rounds {
1807            for author in 0..num_authorities {
1808                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1809                blocks.push(block);
1810            }
1811        }
1812
1813        blocks.clone().into_iter().for_each(|block| {
1816            if block.round() <= 4 {
1817                store
1818                    .write(WriteBatch::default().blocks(vec![block]))
1819                    .unwrap();
1820            } else {
1821                dag_state.accept_blocks(vec![block]);
1822            }
1823        });
1824
1825        let mut block_refs = blocks
1829            .iter()
1830            .map(|block| block.reference())
1831            .collect::<Vec<_>>();
1832        let result = dag_state.get_blocks(&block_refs);
1833
1834        let mut expected = blocks
1835            .into_iter()
1836            .map(Some)
1837            .collect::<Vec<Option<VerifiedBlock>>>();
1838
1839        assert_eq!(result, expected.clone());
1841
1842        block_refs.insert(
1844            3,
1845            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1846        );
1847        let result = dag_state.get_blocks(&block_refs);
1848
1849        expected.insert(3, None);
1851        assert_eq!(result, expected);
1852    }
1853
1854    #[rstest]
1856    #[tokio::test]
1857    async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1858        telemetry_subscribers::init_for_testing();
1859        let num_authorities: u32 = 4;
1860        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1861        context
1862            .protocol_config
1863            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1864
1865        if gc_depth > 0 {
1866            context
1867                .protocol_config
1868                .set_consensus_gc_depth_for_testing(gc_depth);
1869        }
1870
1871        let context = Arc::new(context);
1872        let store = Arc::new(MemStore::new());
1873        let mut dag_state = DagState::new(context.clone(), store.clone());
1874
1875        let num_rounds: u32 = 10;
1877        let mut dag_builder = DagBuilder::new(context.clone());
1878        dag_builder.layers(1..=num_rounds).build();
1879        let mut commits = vec![];
1880
1881        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1882            commits.push(commit);
1883        }
1884
1885        let temp_commits = commits.split_off(5);
1887        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1888        for commit in commits.clone() {
1889            dag_state.add_commit(commit);
1890        }
1891
1892        dag_state.flush();
1894
1895        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1897        for commit in temp_commits.clone() {
1898            dag_state.add_commit(commit);
1899        }
1900
1901        let all_blocks = dag_builder.blocks(6..=num_rounds);
1903        let block_refs = all_blocks
1904            .iter()
1905            .map(|block| block.reference())
1906            .collect::<Vec<_>>();
1907
1908        let result = dag_state
1909            .get_blocks(&block_refs)
1910            .into_iter()
1911            .map(|b| b.unwrap())
1912            .collect::<Vec<_>>();
1913        assert_eq!(result, all_blocks);
1914
1915        assert_eq!(dag_state.last_commit_index(), 10);
1917        assert_eq!(
1918            dag_state.last_committed_rounds(),
1919            dag_builder.last_committed_rounds.clone()
1920        );
1921
1922        drop(dag_state);
1924
1925        let dag_state = DagState::new(context.clone(), store.clone());
1927
1928        let blocks = dag_builder.blocks(1..=5);
1930        let block_refs = blocks
1931            .iter()
1932            .map(|block| block.reference())
1933            .collect::<Vec<_>>();
1934        let result = dag_state
1935            .get_blocks(&block_refs)
1936            .into_iter()
1937            .map(|b| b.unwrap())
1938            .collect::<Vec<_>>();
1939        assert_eq!(result, blocks);
1940
1941        let missing_blocks = dag_builder.blocks(6..=num_rounds);
1943        let block_refs = missing_blocks
1944            .iter()
1945            .map(|block| block.reference())
1946            .collect::<Vec<_>>();
1947        let retrieved_blocks = dag_state
1948            .get_blocks(&block_refs)
1949            .into_iter()
1950            .flatten()
1951            .collect::<Vec<_>>();
1952        assert!(retrieved_blocks.is_empty());
1953
1954        assert_eq!(dag_state.last_commit_index(), 5);
1956
1957        let expected_last_committed_rounds = vec![4, 5, 4, 4];
1959        assert_eq!(
1960            dag_state.last_committed_rounds(),
1961            expected_last_committed_rounds
1962        );
1963
1964        assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1967    }
1968
1969    #[tokio::test]
1970    async fn test_flush_and_recovery() {
1971        telemetry_subscribers::init_for_testing();
1972        let num_authorities: u32 = 4;
1973        let (context, _) = Context::new_for_test(num_authorities as usize);
1974        let context = Arc::new(context);
1975        let store = Arc::new(MemStore::new());
1976        let mut dag_state = DagState::new(context.clone(), store.clone());
1977
1978        let num_rounds: u32 = 10;
1980        let mut dag_builder = DagBuilder::new(context.clone());
1981        dag_builder.layers(1..=num_rounds).build();
1982        let mut commits = vec![];
1983        for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1984            commits.push(commit);
1985        }
1986
1987        let temp_commits = commits.split_off(5);
1989        dag_state.accept_blocks(dag_builder.blocks(1..=5));
1990        for commit in commits.clone() {
1991            dag_state.add_commit(commit);
1992        }
1993
1994        dag_state.flush();
1996
1997        dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1999        for commit in temp_commits.clone() {
2000            dag_state.add_commit(commit);
2001        }
2002
2003        let all_blocks = dag_builder.blocks(6..=num_rounds);
2005        let block_refs = all_blocks
2006            .iter()
2007            .map(|block| block.reference())
2008            .collect::<Vec<_>>();
2009        let result = dag_state
2010            .get_blocks(&block_refs)
2011            .into_iter()
2012            .map(|b| b.unwrap())
2013            .collect::<Vec<_>>();
2014        assert_eq!(result, all_blocks);
2015
2016        assert_eq!(dag_state.last_commit_index(), 10);
2018        assert_eq!(
2019            dag_state.last_committed_rounds(),
2020            dag_builder.last_committed_rounds.clone()
2021        );
2022
2023        drop(dag_state);
2025
2026        let dag_state = DagState::new(context.clone(), store.clone());
2028
2029        let blocks = dag_builder.blocks(1..=5);
2031        let block_refs = blocks
2032            .iter()
2033            .map(|block| block.reference())
2034            .collect::<Vec<_>>();
2035        let result = dag_state
2036            .get_blocks(&block_refs)
2037            .into_iter()
2038            .map(|b| b.unwrap())
2039            .collect::<Vec<_>>();
2040        assert_eq!(result, blocks);
2041
2042        let missing_blocks = dag_builder.blocks(6..=num_rounds);
2044        let block_refs = missing_blocks
2045            .iter()
2046            .map(|block| block.reference())
2047            .collect::<Vec<_>>();
2048        let retrieved_blocks = dag_state
2049            .get_blocks(&block_refs)
2050            .into_iter()
2051            .flatten()
2052            .collect::<Vec<_>>();
2053        assert!(retrieved_blocks.is_empty());
2054
2055        assert_eq!(dag_state.last_commit_index(), 5);
2057
2058        let expected_last_committed_rounds = vec![4, 5, 4, 4];
2060        assert_eq!(
2061            dag_state.last_committed_rounds(),
2062            expected_last_committed_rounds
2063        );
2064        assert_eq!(dag_state.scoring_subdags_count(), 5);
2067    }
2068
2069    #[tokio::test]
2070    async fn test_flush_and_recovery_gc_enabled() {
2071        telemetry_subscribers::init_for_testing();
2072
2073        const GC_DEPTH: u32 = 3;
2074        const CACHED_ROUNDS: u32 = 4;
2075
2076        let num_authorities: u32 = 4;
2077        let (mut context, _) = Context::new_for_test(num_authorities as usize);
2078        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2079        context
2080            .protocol_config
2081            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2082        context
2083            .protocol_config
2084            .set_consensus_linearize_subdag_v2_for_testing(true);
2085
2086        let context = Arc::new(context);
2087
2088        let store = Arc::new(MemStore::new());
2089        let mut dag_state = DagState::new(context.clone(), store.clone());
2090
2091        let num_rounds: u32 = 10;
2092        let mut dag_builder = DagBuilder::new(context.clone());
2093        dag_builder.layers(1..=5).build();
2094        dag_builder
2095            .layers(6..=8)
2096            .authorities(vec![AuthorityIndex::new_for_test(0)])
2097            .skip_block()
2098            .build();
2099        dag_builder.layers(9..=num_rounds).build();
2100
2101        let mut commits = dag_builder
2102            .get_sub_dag_and_commits(1..=num_rounds)
2103            .into_iter()
2104            .map(|(_subdag, commit)| commit)
2105            .collect::<Vec<_>>();
2106
2107        let temp_commits = commits.split_off(7);
2111        dag_state.accept_blocks(dag_builder.blocks(1..=8));
2112        for commit in commits.clone() {
2113            dag_state.add_commit(commit);
2114        }
2115
2116        let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2119        for commit in commits.iter() {
2120            all_committed_blocks.extend(commit.blocks());
2121        }
2122        dag_state.flush();
2124
2125        dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2127        for commit in temp_commits.clone() {
2128            dag_state.add_commit(commit);
2129        }
2130
2131        let all_blocks = dag_builder.blocks(1..=num_rounds);
2133        let block_refs = all_blocks
2134            .iter()
2135            .map(|block| block.reference())
2136            .collect::<Vec<_>>();
2137        let result = dag_state
2138            .get_blocks(&block_refs)
2139            .into_iter()
2140            .map(|b| b.unwrap())
2141            .collect::<Vec<_>>();
2142        assert_eq!(result, all_blocks);
2143
2144        assert_eq!(dag_state.last_commit_index(), 9);
2146        assert_eq!(
2147            dag_state.last_committed_rounds(),
2148            dag_builder.last_committed_rounds.clone()
2149        );
2150
2151        drop(dag_state);
2153
2154        let dag_state = DagState::new(context.clone(), store.clone());
2156
2157        let blocks = dag_builder.blocks(1..=5);
2159        let block_refs = blocks
2160            .iter()
2161            .map(|block| block.reference())
2162            .collect::<Vec<_>>();
2163        let result = dag_state
2164            .get_blocks(&block_refs)
2165            .into_iter()
2166            .map(|b| b.unwrap())
2167            .collect::<Vec<_>>();
2168        assert_eq!(result, blocks);
2169
2170        let missing_blocks = dag_builder.blocks(9..=num_rounds);
2172        let block_refs = missing_blocks
2173            .iter()
2174            .map(|block| block.reference())
2175            .collect::<Vec<_>>();
2176        let retrieved_blocks = dag_state
2177            .get_blocks(&block_refs)
2178            .into_iter()
2179            .flatten()
2180            .collect::<Vec<_>>();
2181        assert!(retrieved_blocks.is_empty());
2182
2183        assert_eq!(dag_state.last_commit_index(), 7);
2185
2186        let expected_last_committed_rounds = vec![5, 6, 6, 7];
2188        assert_eq!(
2189            dag_state.last_committed_rounds(),
2190            expected_last_committed_rounds
2191        );
2192        assert_eq!(dag_state.scoring_subdags_count(), 7);
2195        for (authority_index, _) in context.committee.authorities() {
2197            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2198
2199            if authority_index == AuthorityIndex::new_for_test(0) {
2204                assert_eq!(blocks.len(), 4);
2205                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2206                assert!(
2207                    blocks
2208                        .into_iter()
2209                        .all(|block| block.round() >= 2 && block.round() <= 5)
2210                );
2211            } else {
2212                assert_eq!(blocks.len(), 4);
2213                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2214                assert!(
2215                    blocks
2216                        .into_iter()
2217                        .all(|block| block.round() >= 5 && block.round() <= 8)
2218                );
2219            }
2220        }
2221        let gc_round = dag_state.gc_round();
2224        assert_eq!(gc_round, 4);
2225        dag_state
2226            .recent_blocks
2227            .iter()
2228            .for_each(|(block_ref, block_info)| {
2229                if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2230                    assert!(
2231                        block_info.committed,
2232                        "Block {block_ref:?} should be committed"
2233                    );
2234                };
2235            });
2236    }
2237
2238    #[tokio::test]
2239    async fn test_block_info_as_committed() {
2240        let num_authorities: u32 = 4;
2241        let (context, _) = Context::new_for_test(num_authorities as usize);
2242        let context = Arc::new(context);
2243
2244        let store = Arc::new(MemStore::new());
2245        let mut dag_state = DagState::new(context.clone(), store.clone());
2246
2247        let block = VerifiedBlock::new_for_test(
2249            TestBlock::new(1, 0)
2250                .set_timestamp_ms(1000)
2251                .set_ancestors(vec![])
2252                .build(),
2253        );
2254
2255        dag_state.accept_block(block.clone());
2256
2257        assert!(!dag_state.is_committed(&block.reference()));
2259
2260        assert!(
2262            dag_state.set_committed(&block.reference()),
2263            "Block should be successfully set as committed for first time"
2264        );
2265
2266        assert!(dag_state.is_committed(&block.reference()));
2268
2269        assert!(
2271            !dag_state.set_committed(&block.reference()),
2272            "Block should not be successfully set as committed"
2273        );
2274    }
2275
2276    #[tokio::test]
2277    async fn test_get_cached_blocks() {
2278        let (mut context, _) = Context::new_for_test(4);
2279        context.parameters.dag_state_cached_rounds = 5;
2280
2281        let context = Arc::new(context);
2282        let store = Arc::new(MemStore::new());
2283        let mut dag_state = DagState::new(context.clone(), store.clone());
2284
2285        let mut all_blocks = Vec::new();
2290        for author in 1..=3 {
2291            for round in 10..(10 + author) {
2292                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2293                all_blocks.push(block.clone());
2294                dag_state.accept_block(block);
2295            }
2296        }
2297
2298        let cached_blocks =
2299            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2300        assert!(cached_blocks.is_empty());
2301
2302        let cached_blocks =
2303            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2304        assert_eq!(cached_blocks.len(), 1);
2305        assert_eq!(cached_blocks[0].round(), 10);
2306
2307        let cached_blocks =
2308            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2309        assert_eq!(cached_blocks.len(), 2);
2310        assert_eq!(cached_blocks[0].round(), 10);
2311        assert_eq!(cached_blocks[1].round(), 11);
2312
2313        let cached_blocks =
2314            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2315        assert_eq!(cached_blocks.len(), 1);
2316        assert_eq!(cached_blocks[0].round(), 11);
2317
2318        let cached_blocks =
2319            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2320        assert_eq!(cached_blocks.len(), 3);
2321        assert_eq!(cached_blocks[0].round(), 10);
2322        assert_eq!(cached_blocks[1].round(), 11);
2323        assert_eq!(cached_blocks[2].round(), 12);
2324
2325        let cached_blocks =
2326            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2327        assert_eq!(cached_blocks.len(), 1);
2328        assert_eq!(cached_blocks[0].round(), 12);
2329
2330        let cached_blocks = dag_state.get_cached_blocks_in_range(
2334            context.committee.to_authority_index(3).unwrap(),
2335            10,
2336            10,
2337            1,
2338        );
2339        assert!(cached_blocks.is_empty());
2340
2341        let cached_blocks = dag_state.get_cached_blocks_in_range(
2343            context.committee.to_authority_index(3).unwrap(),
2344            11,
2345            10,
2346            1,
2347        );
2348        assert!(cached_blocks.is_empty());
2349
2350        let cached_blocks = dag_state.get_cached_blocks_in_range(
2352            context.committee.to_authority_index(0).unwrap(),
2353            9,
2354            10,
2355            1,
2356        );
2357        assert!(cached_blocks.is_empty());
2358
2359        let cached_blocks = dag_state.get_cached_blocks_in_range(
2361            context.committee.to_authority_index(1).unwrap(),
2362            9,
2363            11,
2364            1,
2365        );
2366        assert_eq!(cached_blocks.len(), 1);
2367        assert_eq!(cached_blocks[0].round(), 10);
2368
2369        let cached_blocks = dag_state.get_cached_blocks_in_range(
2371            context.committee.to_authority_index(2).unwrap(),
2372            9,
2373            12,
2374            5,
2375        );
2376        assert_eq!(cached_blocks.len(), 2);
2377        assert_eq!(cached_blocks[0].round(), 10);
2378        assert_eq!(cached_blocks[1].round(), 11);
2379
2380        let cached_blocks = dag_state.get_cached_blocks_in_range(
2382            context.committee.to_authority_index(3).unwrap(),
2383            11,
2384            20,
2385            5,
2386        );
2387        assert_eq!(cached_blocks.len(), 2);
2388        assert_eq!(cached_blocks[0].round(), 11);
2389        assert_eq!(cached_blocks[1].round(), 12);
2390
2391        let cached_blocks = dag_state.get_cached_blocks_in_range(
2393            context.committee.to_authority_index(3).unwrap(),
2394            10,
2395            20,
2396            1,
2397        );
2398        assert_eq!(cached_blocks.len(), 1);
2399        assert_eq!(cached_blocks[0].round(), 10);
2400    }
2401
2402    #[rstest]
2403    #[tokio::test]
2404    async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2405        const CACHED_ROUNDS: Round = 2;
2407        let (mut context, _) = Context::new_for_test(4);
2408        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2409
2410        if gc_depth > 0 {
2411            context
2412                .protocol_config
2413                .set_consensus_gc_depth_for_testing(gc_depth);
2414        }
2415
2416        let context = Arc::new(context);
2417        let store = Arc::new(MemStore::new());
2418        let mut dag_state = DagState::new(context.clone(), store.clone());
2419
2420        let dag_str = "DAG {
2425            Round 0 : { 4 },
2426            Round 1 : {
2427                B -> [*],
2428                C -> [*],
2429                D -> [*],
2430            },
2431            Round 2 : {
2432                C -> [*],
2433                D -> [*],
2434            },
2435            Round 3 : {
2436                D -> [*],
2437            },
2438        }";
2439
2440        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2441
2442        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2444
2445        for block in dag_builder
2447            .all_blocks()
2448            .into_iter()
2449            .chain(std::iter::once(block))
2450        {
2451            dag_state.accept_block(block);
2452        }
2453
2454        dag_state.add_commit(TrustedCommit::new_for_test(
2455            1 as CommitIndex,
2456            CommitDigest::MIN,
2457            context.clock.timestamp_utc_ms(),
2458            dag_builder.leader_block(3).unwrap().reference(),
2459            vec![],
2460        ));
2461
2462        let end_round = 4;
2464        let expected_rounds = vec![0, 1, 2, 3];
2465        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2466        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2468        assert_eq!(
2469            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2470            expected_rounds
2471        );
2472        assert_eq!(
2473            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2474            expected_excluded_and_equivocating_blocks
2475        );
2476
2477        for (i, expected_round) in expected_rounds.iter().enumerate() {
2479            let round = dag_state
2480                .get_last_cached_block_in_range(
2481                    context.committee.to_authority_index(i).unwrap(),
2482                    0,
2483                    end_round,
2484                )
2485                .map(|b| b.round())
2486                .unwrap_or_default();
2487            assert_eq!(round, *expected_round, "Authority {i}");
2488        }
2489
2490        let start_round = 2;
2492        let expected_rounds = [0, 0, 2, 3];
2493
2494        for (i, expected_round) in expected_rounds.iter().enumerate() {
2496            let round = dag_state
2497                .get_last_cached_block_in_range(
2498                    context.committee.to_authority_index(i).unwrap(),
2499                    start_round,
2500                    end_round,
2501                )
2502                .map(|b| b.round())
2503                .unwrap_or_default();
2504            assert_eq!(round, *expected_round, "Authority {i}");
2505        }
2506
2507        dag_state.flush();
2516
2517        let end_round = 3;
2519        let expected_rounds = vec![0, 1, 2, 2];
2520
2521        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2523        assert_eq!(
2524            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2525            expected_rounds
2526        );
2527
2528        for (i, expected_round) in expected_rounds.iter().enumerate() {
2530            let round = dag_state
2531                .get_last_cached_block_in_range(
2532                    context.committee.to_authority_index(i).unwrap(),
2533                    0,
2534                    end_round,
2535                )
2536                .map(|b| b.round())
2537                .unwrap_or_default();
2538            assert_eq!(round, *expected_round, "Authority {i}");
2539        }
2540    }
2541
2542    #[tokio::test]
2543    #[should_panic(
2544        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2545    )]
2546    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2547        const CACHED_ROUNDS: Round = 1;
2549        const GC_DEPTH: u32 = 1;
2550        let (mut context, _) = Context::new_for_test(4);
2551        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2552        context
2553            .protocol_config
2554            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2555
2556        let context = Arc::new(context);
2557        let store = Arc::new(MemStore::new());
2558        let mut dag_state = DagState::new(context.clone(), store.clone());
2559
2560        let mut dag_builder = DagBuilder::new(context.clone());
2565        dag_builder
2566            .layers(1..=1)
2567            .authorities(vec![AuthorityIndex::new_for_test(0)])
2568            .skip_block()
2569            .build();
2570        dag_builder
2571            .layers(2..=2)
2572            .authorities(vec![
2573                AuthorityIndex::new_for_test(0),
2574                AuthorityIndex::new_for_test(1),
2575            ])
2576            .skip_block()
2577            .build();
2578        dag_builder
2579            .layers(3..=3)
2580            .authorities(vec![
2581                AuthorityIndex::new_for_test(0),
2582                AuthorityIndex::new_for_test(1),
2583                AuthorityIndex::new_for_test(2),
2584            ])
2585            .skip_block()
2586            .build();
2587
2588        for block in dag_builder.all_blocks() {
2590            dag_state.accept_block(block);
2591        }
2592
2593        dag_state.add_commit(TrustedCommit::new_for_test(
2594            1 as CommitIndex,
2595            CommitDigest::MIN,
2596            0,
2597            dag_builder.leader_block(3).unwrap().reference(),
2598            vec![],
2599        ));
2600
2601        dag_state.flush();
2603
2604        dag_state.get_last_cached_block_per_authority(2);
2607    }
2608
2609    #[tokio::test]
2610    async fn test_last_quorum() {
2611        let (context, _) = Context::new_for_test(4);
2613        let context = Arc::new(context);
2614        let store = Arc::new(MemStore::new());
2615        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2616
2617        {
2619            let genesis = genesis_blocks(&context);
2620
2621            assert_eq!(dag_state.read().last_quorum(), genesis);
2622        }
2623
2624        {
2627            let mut dag_builder = DagBuilder::new(context.clone());
2628            dag_builder
2629                .layers(1..=4)
2630                .build()
2631                .persist_layers(dag_state.clone());
2632            let round_4_blocks: Vec<_> = dag_builder
2633                .blocks(4..=4)
2634                .into_iter()
2635                .map(|block| block.reference())
2636                .collect();
2637
2638            let last_quorum = dag_state.read().last_quorum();
2639
2640            assert_eq!(
2641                last_quorum
2642                    .into_iter()
2643                    .map(|block| block.reference())
2644                    .collect::<Vec<_>>(),
2645                round_4_blocks
2646            );
2647        }
2648
2649        {
2652            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2653            dag_state.write().accept_block(block);
2654
2655            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2656
2657            let last_quorum = dag_state.read().last_quorum();
2658
2659            assert_eq!(last_quorum, round_4_blocks);
2660        }
2661    }
2662
2663    #[tokio::test]
2664    async fn test_last_block_for_authority() {
2665        let (context, _) = Context::new_for_test(4);
2667        let context = Arc::new(context);
2668        let store = Arc::new(MemStore::new());
2669        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2670
2671        {
2673            let genesis = genesis_blocks(&context);
2674            let my_genesis = genesis
2675                .into_iter()
2676                .find(|block| block.author() == context.own_index)
2677                .unwrap();
2678
2679            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2680        }
2681
2682        {
2685            let mut dag_builder = DagBuilder::new(context.clone());
2687            dag_builder
2688                .layers(1..=4)
2689                .build()
2690                .persist_layers(dag_state.clone());
2691
2692            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2694            dag_state.write().accept_block(block);
2695
2696            let block = dag_state
2697                .read()
2698                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2699            assert_eq!(block.round(), 5);
2700
2701            for (authority_index, _) in context.committee.authorities() {
2702                let block = dag_state
2703                    .read()
2704                    .get_last_block_for_authority(authority_index);
2705
2706                if authority_index.value() == 0 {
2707                    assert_eq!(block.round(), 5);
2708                } else {
2709                    assert_eq!(block.round(), 4);
2710                }
2711            }
2712        }
2713    }
2714
2715    #[tokio::test]
2716    #[should_panic]
2717    async fn test_accept_block_panics_when_timestamp_is_ahead() {
2718        let (mut context, _) = Context::new_for_test(4);
2720        context
2721            .protocol_config
2722            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
2723        let context = Arc::new(context);
2724        let store = Arc::new(MemStore::new());
2725        let mut dag_state = DagState::new(context.clone(), store.clone());
2726
2727        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2729
2730        let block = VerifiedBlock::new_for_test(
2731            TestBlock::new(10, 0)
2732                .set_timestamp_ms(block_timestamp)
2733                .build(),
2734        );
2735
2736        dag_state.accept_block(block);
2739    }
2740
2741    #[tokio::test]
2742    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2743        let (mut context, _) = Context::new_for_test(4);
2745        context
2746            .protocol_config
2747            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(true);
2748
2749        let context = Arc::new(context);
2750        let store = Arc::new(MemStore::new());
2751        let mut dag_state = DagState::new(context.clone(), store.clone());
2752
2753        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2755
2756        let block = VerifiedBlock::new_for_test(
2757            TestBlock::new(10, 0)
2758                .set_timestamp_ms(block_timestamp)
2759                .build(),
2760        );
2761
2762        dag_state.accept_block(block);
2764    }
2765}