1use std::{
6 cmp::max,
7 collections::{BTreeMap, BTreeSet, VecDeque},
8 ops::Bound::{Excluded, Included, Unbounded},
9 panic,
10 sync::Arc,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info};
18
19use crate::{
20 CommittedSubDag,
21 block::{
22 BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23 VerifiedBlock, genesis_blocks,
24 },
25 commit::{
26 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28 },
29 context::Context,
30 leader_scoring::{ReputationScores, ScoringSubdag},
31 storage::{Store, WriteBatch},
32 threshold_clock::ThresholdClock,
33};
34
35pub(crate) struct DagState {
44 context: Arc<Context>,
45
46 genesis: BTreeMap<BlockRef, VerifiedBlock>,
48
49 recent_blocks: BTreeMap<BlockRef, BlockInfo>,
60
61 recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
64
65 threshold_clock: ThresholdClock,
67
68 evicted_rounds: Vec<Round>,
73
74 highest_accepted_round: Round,
76
77 last_commit: Option<TrustedCommit>,
79
80 last_commit_round_advancement_time: Option<std::time::Instant>,
82
83 last_committed_rounds: Vec<Round>,
85
86 scoring_subdag: ScoringSubdag,
89 unscored_committed_subdags: Vec<CommittedSubDag>,
94
95 pending_commit_votes: VecDeque<CommitVote>,
98
99 blocks_to_write: Vec<VerifiedBlock>,
101 commits_to_write: Vec<TrustedCommit>,
102
103 commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
107
108 store: Arc<dyn Store>,
110
111 cached_rounds: Round,
113}
114
115impl DagState {
116 pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
118 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
119 let num_authorities = context.committee.size();
120
121 let genesis = genesis_blocks(context.clone())
122 .into_iter()
123 .map(|block| (block.reference(), block))
124 .collect();
125
126 let threshold_clock = ThresholdClock::new(1, context.clone());
127
128 let last_commit = store
129 .read_last_commit()
130 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
131
132 let commit_info = store
133 .read_last_commit_info()
134 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
135 let (mut last_committed_rounds, commit_recovery_start_index) =
136 if let Some((commit_ref, commit_info)) = commit_info {
137 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
138 (commit_info.committed_rounds, commit_ref.index + 1)
139 } else {
140 tracing::info!("Found no stored CommitInfo to recover from");
141 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
142 };
143
144 let mut unscored_committed_subdags = Vec::new();
145 let mut scoring_subdag = ScoringSubdag::new(context.clone());
146
147 if let Some(last_commit) = last_commit.as_ref() {
148 store
149 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
150 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
151 .iter()
152 .for_each(|commit| {
153 for block_ref in commit.blocks() {
154 last_committed_rounds[block_ref.author] =
155 max(last_committed_rounds[block_ref.author], block_ref.round);
156 }
157
158 let committed_subdag =
159 load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]); unscored_committed_subdags.push(committed_subdag);
161 });
162 }
163
164 tracing::info!(
165 "DagState was initialized with the following state: \
166 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
167 unscored_committed_subdags.len()
168 );
169
170 if context
171 .protocol_config
172 .consensus_distributed_vote_scoring_strategy()
173 {
174 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
175 }
176
177 let mut state = Self {
178 context,
179 genesis,
180 recent_blocks: BTreeMap::new(),
181 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
182 threshold_clock,
183 highest_accepted_round: 0,
184 last_commit: last_commit.clone(),
185 last_commit_round_advancement_time: None,
186 last_committed_rounds: last_committed_rounds.clone(),
187 pending_commit_votes: VecDeque::new(),
188 blocks_to_write: vec![],
189 commits_to_write: vec![],
190 commit_info_to_write: vec![],
191 scoring_subdag,
192 unscored_committed_subdags,
193 store: store.clone(),
194 cached_rounds,
195 evicted_rounds: vec![0; num_authorities],
196 };
197
198 for (i, round) in last_committed_rounds.into_iter().enumerate() {
199 let authority_index = state.context.committee.to_authority_index(i).unwrap();
200 let (blocks, eviction_round) = if state.gc_enabled() {
201 let last_block = state
205 .store
206 .scan_last_blocks_by_author(authority_index, 1, None)
207 .expect("Database error");
208 let last_block_round = last_block
209 .last()
210 .map(|b| b.round())
211 .unwrap_or(GENESIS_ROUND);
212
213 let eviction_round = Self::gc_eviction_round(
214 last_block_round,
215 state.gc_round(),
216 state.cached_rounds,
217 );
218 let blocks = state
219 .store
220 .scan_blocks_by_author(authority_index, eviction_round + 1)
221 .expect("Database error");
222 (blocks, eviction_round)
223 } else {
224 let eviction_round = Self::eviction_round(round, cached_rounds);
225 let blocks = state
226 .store
227 .scan_blocks_by_author(authority_index, eviction_round + 1)
228 .expect("Database error");
229 (blocks, eviction_round)
230 };
231
232 state.evicted_rounds[authority_index] = eviction_round;
233
234 for block in &blocks {
236 state.update_block_metadata(block);
237 }
238
239 info!(
240 "Recovered blocks {}: {:?}",
241 authority_index,
242 blocks
243 .iter()
244 .map(|b| b.reference())
245 .collect::<Vec<BlockRef>>()
246 );
247 }
248
249 let recovered_scoring_metrics = state.store.scan_metrics().expect("Database error");
252 state.context.metrics.initialize_scoring_metrics(
253 recovered_scoring_metrics,
254 &state.recent_refs_by_authority,
255 state.threshold_clock_round(),
256 &state.evicted_rounds,
257 state.context.clone(),
258 );
259
260 if state.gc_enabled() {
261 if let Some(last_commit) = last_commit {
262 let mut index = last_commit.index();
263 let gc_round = state.gc_round();
264 info!(
265 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
266 index, gc_round
267 );
268
269 loop {
270 let commits = store
271 .scan_commits((index..=index).into())
272 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
273 let Some(commit) = commits.first() else {
274 info!(
275 "Recovering finished up to index {index}, no more commits to recover"
276 );
277 break;
278 };
279
280 if gc_round > 0 && commit.leader().round <= gc_round {
283 info!(
284 "Recovering finished, reached commit leader round {} <= gc_round {}",
285 commit.leader().round,
286 gc_round
287 );
288 break;
289 }
290
291 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
292 debug!(
293 "Setting block {:?} as committed based on commit {:?}",
294 block_ref,
295 commit.index()
296 );
297 assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
298 });
299
300 index = index.saturating_sub(1);
302 if index == 0 {
303 break;
304 }
305 }
306 }
307 }
308
309 state
310 }
311
312 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
314 assert_ne!(
315 block.round(),
316 0,
317 "Genesis block should not be accepted into DAG."
318 );
319
320 let block_ref = block.reference();
321 if self.contains_block(&block_ref) {
322 return;
323 }
324
325 let now = self.context.clock.timestamp_utc_ms();
326 if block.timestamp_ms() > now {
327 panic!(
328 "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
329 block,
330 block.timestamp_ms(),
331 now,
332 );
333 }
334
335 if block_ref.author == self.context.own_index {
338 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
339 assert!(
340 existing_blocks.is_empty(),
341 "Block Rejected! Attempted to add block {block:#?} to own slot where \
342 block(s) {existing_blocks:#?} already exists."
343 );
344 }
345 self.update_block_metadata(&block);
346 self.blocks_to_write.push(block);
347 let source = if self.context.own_index == block_ref.author {
348 "own"
349 } else {
350 "others"
351 };
352 self.context
353 .metrics
354 .node_metrics
355 .accepted_blocks
356 .with_label_values(&[source])
357 .inc();
358 }
359
360 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
362 let block_ref = block.reference();
363 self.recent_blocks
364 .insert(block_ref, BlockInfo::new(block.clone()));
365 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
366 self.threshold_clock.add_block(block_ref);
367 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
368 self.context
369 .metrics
370 .node_metrics
371 .highest_accepted_round
372 .set(self.highest_accepted_round as i64);
373
374 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
375 .last()
376 .map(|block_ref| block_ref.round)
377 .expect("There should be by now at least one block ref");
378 let hostname = &self.context.committee.authority(block_ref.author).hostname;
379 self.context
380 .metrics
381 .node_metrics
382 .highest_accepted_authority_round
383 .with_label_values(&[hostname])
384 .set(highest_accepted_round_for_author as i64);
385 }
386
387 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
389 debug!(
390 "Accepting blocks: {}",
391 blocks.iter().map(|b| b.reference().to_string()).join(",")
392 );
393 for block in blocks {
394 self.accept_block(block);
395 }
396 }
397
398 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
401 self.get_blocks(&[*reference])
402 .pop()
403 .expect("Exactly one element should be returned")
404 }
405
406 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
410 let mut blocks = vec![None; block_refs.len()];
411 let mut missing = Vec::new();
412
413 for (index, block_ref) in block_refs.iter().enumerate() {
414 if block_ref.round == GENESIS_ROUND {
415 if let Some(block) = self.genesis.get(block_ref) {
417 blocks[index] = Some(block.clone());
418 }
419 continue;
420 }
421 if let Some(block_info) = self.recent_blocks.get(block_ref) {
422 blocks[index] = Some(block_info.block.clone());
423 continue;
424 }
425 missing.push((index, block_ref));
426 }
427
428 if missing.is_empty() {
429 return blocks;
430 }
431
432 let missing_refs = missing
433 .iter()
434 .map(|(_, block_ref)| **block_ref)
435 .collect::<Vec<_>>();
436 let store_results = self
437 .store
438 .read_blocks(&missing_refs)
439 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
440 self.context
441 .metrics
442 .node_metrics
443 .dag_state_store_read_count
444 .with_label_values(&["get_blocks"])
445 .inc();
446
447 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
448 blocks[index] = result;
449 }
450
451 blocks
452 }
453
454 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
458 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
459 if !block_info.committed {
460 block_info.committed = true;
461 return true;
462 }
463 false
464 } else {
465 panic!("Block {block_ref:?} not found in cache to set as committed.");
466 }
467 }
468
469 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
470 self.recent_blocks
471 .get(block_ref)
472 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
473 .committed
474 }
475
476 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
480 let mut blocks = vec![];
485 for (_block_ref, block_info) in self.recent_blocks.range((
486 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
487 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
488 )) {
489 blocks.push(block_info.block.clone())
490 }
491 blocks
492 }
493
494 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
498 if round <= self.last_commit_round() {
499 panic!("Round {round} have committed blocks!");
500 }
501
502 let mut blocks = vec![];
503 for (_block_ref, block_info) in self.recent_blocks.range((
504 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
505 Excluded(BlockRef::new(
506 round + 1,
507 AuthorityIndex::ZERO,
508 BlockDigest::MIN,
509 )),
510 )) {
511 blocks.push(block_info.block.clone())
512 }
513 blocks
514 }
515
516 pub(crate) fn ancestors_at_round(
518 &self,
519 later_block: &VerifiedBlock,
520 earlier_round: Round,
521 ) -> Vec<VerifiedBlock> {
522 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
524 while !linked.is_empty() {
525 let round = linked.last().unwrap().round;
526 if round <= earlier_round {
528 break;
529 }
530 let block_ref = linked.pop_last().unwrap();
531 let Some(block) = self.get_block(&block_ref) else {
532 panic!("Block {block_ref:?} should exist in DAG!");
533 };
534 linked.extend(block.ancestors().iter().cloned());
535 }
536 linked
537 .range((
538 Included(BlockRef::new(
539 earlier_round,
540 AuthorityIndex::ZERO,
541 BlockDigest::MIN,
542 )),
543 Unbounded,
544 ))
545 .map(|r| {
546 self.get_block(r)
547 .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
548 .clone()
549 })
550 .collect()
551 }
552
553 pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
556 self.get_last_block_for_authority(self.context.own_index)
557 }
558
559 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
563 if let Some(last) = self.recent_refs_by_authority[authority].last() {
564 return self
565 .recent_blocks
566 .get(last)
567 .expect("Block should be found in recent blocks")
568 .block
569 .clone();
570 }
571
572 let (_, genesis_block) = self
574 .genesis
575 .iter()
576 .find(|(block_ref, _)| block_ref.author == authority)
577 .expect("Genesis should be found for authority {authority_index}");
578 genesis_block.clone()
579 }
580
581 pub(crate) fn get_cached_blocks(
587 &self,
588 authority: AuthorityIndex,
589 start: Round,
590 ) -> Vec<VerifiedBlock> {
591 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
592 }
593
594 pub(crate) fn get_cached_blocks_in_range(
597 &self,
598 authority: AuthorityIndex,
599 start_round: Round,
600 end_round: Round,
601 limit: usize,
602 ) -> Vec<VerifiedBlock> {
603 if start_round >= end_round || limit == 0 {
604 return vec![];
605 }
606
607 let mut blocks = vec![];
608 for block_ref in self.recent_refs_by_authority[authority].range((
609 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
610 Excluded(BlockRef::new(
611 end_round,
612 AuthorityIndex::MIN,
613 BlockDigest::MIN,
614 )),
615 )) {
616 let block_info = self
617 .recent_blocks
618 .get(block_ref)
619 .expect("Block should exist in recent blocks");
620 blocks.push(block_info.block.clone());
621 if blocks.len() >= limit {
622 break;
623 }
624 }
625 blocks
626 }
627
628 pub(crate) fn get_last_cached_block_in_range(
631 &self,
632 authority: AuthorityIndex,
633 start_round: Round,
634 end_round: Round,
635 ) -> Option<VerifiedBlock> {
636 if start_round >= end_round {
637 return None;
638 }
639
640 let block_ref = self.recent_refs_by_authority[authority]
641 .range((
642 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
643 Excluded(BlockRef::new(
644 end_round,
645 AuthorityIndex::MIN,
646 BlockDigest::MIN,
647 )),
648 ))
649 .last()?;
650
651 self.recent_blocks
652 .get(block_ref)
653 .map(|block_info| block_info.block.clone())
654 }
655
656 pub(crate) fn get_last_cached_block_per_authority(
665 &self,
666 end_round: Round,
667 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
668 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
670 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
671
672 if end_round == GENESIS_ROUND {
673 panic!(
674 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
675 );
676 }
677
678 if end_round == GENESIS_ROUND + 1 {
679 return blocks.into_iter().map(|b| (b, vec![])).collect();
680 }
681
682 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
683 let authority_index = self
684 .context
685 .committee
686 .to_authority_index(authority_index)
687 .unwrap();
688
689 let last_evicted_round = self.evicted_rounds[authority_index];
690 if end_round.saturating_sub(1) <= last_evicted_round {
691 panic!(
692 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
693 );
694 }
695
696 let block_ref_iter = block_refs
697 .range((
698 Included(BlockRef::new(
699 last_evicted_round + 1,
700 authority_index,
701 BlockDigest::MIN,
702 )),
703 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
704 ))
705 .rev();
706
707 let mut last_round = 0;
708 for block_ref in block_ref_iter {
709 if last_round == 0 {
710 last_round = block_ref.round;
711 let block_info = self
712 .recent_blocks
713 .get(block_ref)
714 .expect("Block should exist in recent blocks");
715 blocks[authority_index] = block_info.block.clone();
716 continue;
717 }
718 if block_ref.round < last_round {
719 break;
720 }
721 equivocating_blocks[authority_index].push(*block_ref);
722 }
723 }
724
725 blocks.into_iter().zip(equivocating_blocks).collect()
726 }
727
728 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
732 if slot.round == GENESIS_ROUND {
734 return true;
735 }
736
737 let eviction_round = self.evicted_rounds[slot.authority];
738 if slot.round <= eviction_round {
739 panic!(
740 "{}",
741 format!(
742 "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
743 if self.gc_enabled() { " gc " } else { " " }
744 )
745 );
746 }
747
748 let mut result = self.recent_refs_by_authority[slot.authority].range((
749 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
750 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
751 ));
752 result.next().is_some()
753 }
754
755 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
759 let mut exist = vec![false; block_refs.len()];
760 let mut missing = Vec::new();
761
762 for (index, block_ref) in block_refs.into_iter().enumerate() {
763 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
764 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
765 exist[index] = true;
766 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
767 {
768 exist[index] = false;
773 } else {
774 missing.push((index, block_ref));
775 }
776 }
777
778 if missing.is_empty() {
779 return exist;
780 }
781
782 let missing_refs = missing
783 .iter()
784 .map(|(_, block_ref)| *block_ref)
785 .collect::<Vec<_>>();
786 let store_results = self
787 .store
788 .contains_blocks(&missing_refs)
789 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
790 self.context
791 .metrics
792 .node_metrics
793 .dag_state_store_read_count
794 .with_label_values(&["contains_blocks"])
795 .inc();
796
797 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
798 exist[index] = result;
799 }
800
801 exist
802 }
803
804 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
805 let blocks = self.contains_blocks(vec![*block_ref]);
806 blocks.first().cloned().unwrap()
807 }
808
809 pub(crate) fn threshold_clock_round(&self) -> Round {
810 self.threshold_clock.get_round()
811 }
812
813 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
814 self.threshold_clock.get_quorum_ts()
815 }
816
817 pub(crate) fn highest_accepted_round(&self) -> Round {
818 self.highest_accepted_round
819 }
820
821 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
824 if let Some(last_commit) = &self.last_commit {
825 if commit.index() <= last_commit.index() {
826 error!(
827 "New commit index {} <= last commit index {}!",
828 commit.index(),
829 last_commit.index()
830 );
831 return;
832 }
833 assert_eq!(commit.index(), last_commit.index() + 1);
834
835 if commit.timestamp_ms() < last_commit.timestamp_ms() {
836 panic!(
837 "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
838 );
839 }
840 } else {
841 assert_eq!(commit.index(), 1);
842 }
843
844 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
845 previous_commit.round() < commit.round()
846 } else {
847 true
848 };
849
850 self.last_commit = Some(commit.clone());
851
852 if commit_round_advanced {
853 let now = std::time::Instant::now();
854 if let Some(previous_time) = self.last_commit_round_advancement_time {
855 self.context
856 .metrics
857 .node_metrics
858 .commit_round_advancement_interval
859 .observe(now.duration_since(previous_time).as_secs_f64())
860 }
861 self.last_commit_round_advancement_time = Some(now);
862 }
863
864 for block_ref in commit.blocks().iter() {
865 self.last_committed_rounds[block_ref.author] = max(
866 self.last_committed_rounds[block_ref.author],
867 block_ref.round,
868 );
869 }
870
871 for (i, round) in self.last_committed_rounds.iter().enumerate() {
872 let index = self.context.committee.to_authority_index(i).unwrap();
873 let hostname = &self.context.committee.authority(index).hostname;
874 self.context
875 .metrics
876 .node_metrics
877 .last_committed_authority_round
878 .with_label_values(&[hostname])
879 .set((*round).into());
880 }
881
882 self.pending_commit_votes.push_back(commit.reference());
883 self.commits_to_write.push(commit);
884 }
885
886 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
887 assert!(self.unscored_committed_subdags.is_empty());
889
890 assert!(self.scoring_subdag.is_empty());
894
895 let commit_info = CommitInfo {
896 committed_rounds: self.last_committed_rounds.clone(),
897 reputation_scores,
898 };
899 let last_commit = self
900 .last_commit
901 .as_ref()
902 .expect("Last commit should already be set.");
903 self.commit_info_to_write
904 .push((last_commit.reference(), commit_info));
905 }
906
907 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
908 let mut votes = Vec::new();
909 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
910 votes.push(self.pending_commit_votes.pop_front().unwrap());
911 }
912 votes
913 }
914
915 pub(crate) fn last_commit_index(&self) -> CommitIndex {
917 match &self.last_commit {
918 Some(commit) => commit.index(),
919 None => 0,
920 }
921 }
922
923 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
925 match &self.last_commit {
926 Some(commit) => commit.digest(),
927 None => CommitDigest::MIN,
928 }
929 }
930
931 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
933 match &self.last_commit {
934 Some(commit) => commit.timestamp_ms(),
935 None => 0,
936 }
937 }
938
939 pub(crate) fn last_commit_leader(&self) -> Slot {
941 match &self.last_commit {
942 Some(commit) => commit.leader().into(),
943 None => self
944 .genesis
945 .iter()
946 .next()
947 .map(|(genesis_ref, _)| *genesis_ref)
948 .expect("Genesis blocks should always be available.")
949 .into(),
950 }
951 }
952
953 pub(crate) fn last_commit_round(&self) -> Round {
956 match &self.last_commit {
957 Some(commit) => commit.leader().round,
958 None => 0,
959 }
960 }
961
962 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
964 self.last_committed_rounds.clone()
965 }
966
967 pub(crate) fn gc_round(&self) -> Round {
974 self.calculate_gc_round(self.last_commit_round())
975 }
976
977 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
978 let gc_depth = self.context.protocol_config.gc_depth();
979 if gc_depth > 0 {
980 commit_round.saturating_sub(gc_depth)
982 } else {
983 GENESIS_ROUND
986 }
987 }
988
989 pub(crate) fn gc_enabled(&self) -> bool {
990 self.context.protocol_config.gc_depth() > 0
991 }
992
993 pub(crate) fn flush(&mut self) {
996 let _s = self
997 .context
998 .metrics
999 .node_metrics
1000 .scope_processing_time
1001 .with_label_values(&["DagState::flush"])
1002 .start_timer();
1003 let blocks = std::mem::take(&mut self.blocks_to_write);
1005 let commits = std::mem::take(&mut self.commits_to_write);
1006 let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
1007
1008 if blocks.is_empty() && commits.is_empty() {
1009 return;
1010 }
1011 debug!(
1012 "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
1013 blocks.len(),
1014 blocks.iter().map(|b| b.reference().to_string()).join(","),
1015 commits.len(),
1016 commits.iter().map(|c| c.reference().to_string()).join(","),
1017 commit_info_to_write.len(),
1018 commit_info_to_write
1019 .iter()
1020 .map(|(commit_ref, _)| commit_ref.to_string())
1021 .join(","),
1022 );
1023
1024 let mut metrics_to_write = vec![];
1026 let threshold_clock_round = self.threshold_clock_round();
1027 for (authority_index, authority) in self.context.committee.authorities() {
1028 let last_eviction_round = self.evicted_rounds[authority_index];
1029 let current_eviction_round = self.calculate_authority_eviction_round(authority_index);
1030 let metrics_to_write_from_authority =
1031 self.context.metrics.update_scoring_metrics_on_eviction(
1032 authority_index,
1033 authority.hostname.as_str(),
1034 &self.recent_refs_by_authority[authority_index],
1035 current_eviction_round,
1036 last_eviction_round,
1037 threshold_clock_round,
1038 );
1039 if let Some(metrics_to_write_from_authority) = metrics_to_write_from_authority {
1040 metrics_to_write.push((authority_index, metrics_to_write_from_authority));
1041 }
1042 }
1043
1044 self.store
1045 .write(WriteBatch::new(
1046 blocks,
1047 commits,
1048 commit_info_to_write,
1049 metrics_to_write,
1050 ))
1051 .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
1052 self.context
1053 .metrics
1054 .node_metrics
1055 .dag_state_store_write_count
1056 .inc();
1057
1058 for (authority_index, _) in self.context.committee.authorities() {
1062 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1063 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1064 let block_round = block_ref.round;
1065 if block_round <= eviction_round {
1066 self.recent_blocks.remove(block_ref);
1067 self.recent_refs_by_authority[authority_index].pop_first();
1068 } else {
1069 break;
1070 }
1071 }
1072 self.evicted_rounds[authority_index] = eviction_round;
1073 }
1074
1075 let metrics = &self.context.metrics.node_metrics;
1076 metrics
1077 .dag_state_recent_blocks
1078 .set(self.recent_blocks.len() as i64);
1079 metrics.dag_state_recent_refs.set(
1080 self.recent_refs_by_authority
1081 .iter()
1082 .map(BTreeSet::len)
1083 .sum::<usize>() as i64,
1084 );
1085 }
1086
1087 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1088 self.store
1089 .read_last_commit_info()
1090 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
1091 }
1092
1093 pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
1095 self.unscored_committed_subdags.len() as u64
1096 }
1097
1098 #[cfg(test)]
1099 pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
1100 self.unscored_committed_subdags.clone()
1101 }
1102
1103 pub(crate) fn add_unscored_committed_subdags(
1104 &mut self,
1105 committed_subdags: Vec<CommittedSubDag>,
1106 ) {
1107 self.unscored_committed_subdags.extend(committed_subdags);
1108 }
1109
1110 pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
1111 std::mem::take(&mut self.unscored_committed_subdags)
1112 }
1113
1114 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1115 self.scoring_subdag.add_subdags(scoring_subdags);
1116 }
1117
1118 pub(crate) fn clear_scoring_subdag(&mut self) {
1119 self.scoring_subdag.clear();
1120 }
1121
1122 pub(crate) fn scoring_subdags_count(&self) -> usize {
1123 self.scoring_subdag.scored_subdags_count()
1124 }
1125
1126 pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1127 self.scoring_subdag.is_empty()
1128 }
1129
1130 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1131 self.scoring_subdag.calculate_distributed_vote_scores()
1132 }
1133
1134 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1135 self.scoring_subdag
1136 .commit_range
1137 .as_ref()
1138 .expect("commit range should exist for scoring subdag")
1139 .end()
1140 }
1141
1142 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1147 if self.gc_enabled() {
1148 let last_round = self.recent_refs_by_authority[authority_index]
1149 .last()
1150 .map(|block_ref| block_ref.round)
1151 .unwrap_or(GENESIS_ROUND);
1152
1153 Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1154 } else {
1155 let commit_round = self.last_committed_rounds[authority_index];
1156 Self::eviction_round(commit_round, self.cached_rounds)
1157 }
1158 }
1159
1160 fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
1163 commit_round.saturating_sub(cached_rounds)
1164 }
1165
1166 fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1171 gc_round.min(last_round.saturating_sub(cached_rounds))
1172 }
1173
1174 #[cfg(test)]
1177 pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1178 for round in
1182 (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1183 {
1184 if round == GENESIS_ROUND {
1185 return self.genesis_blocks();
1186 }
1187 use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1188 let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1189
1190 let blocks = self.get_uncommitted_blocks_at_round(round);
1193 for block in &blocks {
1194 if quorum.add(block.author(), &self.context.committee) {
1195 return blocks;
1196 }
1197 }
1198 }
1199
1200 panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1201 }
1202
1203 #[cfg(test)]
1204 pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1205 self.genesis.values().cloned().collect()
1206 }
1207
1208 #[cfg(test)]
1209 pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1210 self.last_commit = Some(commit);
1211 }
1212}
1213
1214struct BlockInfo {
1215 block: VerifiedBlock,
1216 committed: bool,
1218}
1219
1220impl BlockInfo {
1221 fn new(block: VerifiedBlock) -> Self {
1222 Self {
1223 block,
1224 committed: false,
1225 }
1226 }
1227}
1228
1229#[cfg(test)]
1230mod test {
1231 use std::vec;
1232
1233 use parking_lot::RwLock;
1234 use rstest::rstest;
1235
1236 use super::*;
1237 use crate::{
1238 block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1239 storage::{WriteBatch, mem_store::MemStore},
1240 test_dag_builder::DagBuilder,
1241 test_dag_parser::parse_dag,
1242 };
1243
1244 #[tokio::test]
1245 async fn test_get_blocks() {
1246 let (context, _) = Context::new_for_test(4);
1247 let context = Arc::new(context);
1248 let store = Arc::new(MemStore::new());
1249 let mut dag_state = DagState::new(context.clone(), store.clone());
1250 let own_index = AuthorityIndex::new_for_test(0);
1251
1252 let num_rounds: u32 = 10;
1254 let non_existent_round: u32 = 100;
1255 let num_authorities: u32 = 3;
1256 let num_blocks_per_slot: usize = 3;
1257 let mut blocks = BTreeMap::new();
1258 for round in 1..=num_rounds {
1259 for author in 0..num_authorities {
1260 let base_ts = round as BlockTimestampMs * 1000;
1262 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1263 let block = VerifiedBlock::new_for_test(
1264 TestBlock::new(round, author)
1265 .set_timestamp_ms(timestamp)
1266 .build(),
1267 );
1268 dag_state.accept_block(block.clone());
1269 blocks.insert(block.reference(), block);
1270
1271 if AuthorityIndex::new_for_test(author) == own_index {
1273 break;
1274 }
1275 }
1276 }
1277 }
1278
1279 for (r, block) in &blocks {
1281 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1282 }
1283
1284 let last_ref = blocks.keys().last().unwrap();
1286 assert!(
1287 dag_state
1288 .get_block(&BlockRef::new(
1289 last_ref.round,
1290 last_ref.author,
1291 BlockDigest::MIN
1292 ))
1293 .is_none()
1294 );
1295
1296 for round in 1..=num_rounds {
1298 for author in 0..num_authorities {
1299 let slot = Slot::new(
1300 round,
1301 context
1302 .committee
1303 .to_authority_index(author as usize)
1304 .unwrap(),
1305 );
1306 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1307
1308 if AuthorityIndex::new_for_test(author) == own_index {
1310 assert_eq!(blocks.len(), 1);
1311 } else {
1312 assert_eq!(blocks.len(), num_blocks_per_slot);
1313 }
1314
1315 for b in blocks {
1316 assert_eq!(b.round(), round);
1317 assert_eq!(
1318 b.author(),
1319 context
1320 .committee
1321 .to_authority_index(author as usize)
1322 .unwrap()
1323 );
1324 }
1325 }
1326 }
1327
1328 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1330 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1331
1332 for round in 1..=num_rounds {
1334 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1335 assert_eq!(
1338 blocks.len(),
1339 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1340 );
1341 for b in blocks {
1342 assert_eq!(b.round(), round);
1343 }
1344 }
1345
1346 assert!(
1348 dag_state
1349 .get_uncommitted_blocks_at_round(non_existent_round)
1350 .is_empty()
1351 );
1352 }
1353
1354 #[tokio::test]
1355 async fn test_ancestors_at_uncommitted_round() {
1356 let (context, _) = Context::new_for_test(4);
1358 let context = Arc::new(context);
1359 let store = Arc::new(MemStore::new());
1360 let mut dag_state = DagState::new(context.clone(), store.clone());
1361
1362 let round_10_refs: Vec<_> = (0..4)
1366 .map(|a| {
1367 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1368 .reference()
1369 })
1370 .collect();
1371
1372 let round_11 = vec![
1374 VerifiedBlock::new_for_test(
1376 TestBlock::new(11, 0)
1377 .set_timestamp_ms(1100)
1378 .set_ancestors(round_10_refs.clone())
1379 .build(),
1380 ),
1381 VerifiedBlock::new_for_test(
1384 TestBlock::new(11, 1)
1385 .set_timestamp_ms(1110)
1386 .set_ancestors(round_10_refs.clone())
1387 .build(),
1388 ),
1389 VerifiedBlock::new_for_test(
1391 TestBlock::new(11, 1)
1392 .set_timestamp_ms(1111)
1393 .set_ancestors(round_10_refs.clone())
1394 .build(),
1395 ),
1396 VerifiedBlock::new_for_test(
1398 TestBlock::new(11, 1)
1399 .set_timestamp_ms(1112)
1400 .set_ancestors(round_10_refs.clone())
1401 .build(),
1402 ),
1403 VerifiedBlock::new_for_test(
1405 TestBlock::new(11, 2)
1406 .set_timestamp_ms(1120)
1407 .set_ancestors(round_10_refs.clone())
1408 .build(),
1409 ),
1410 VerifiedBlock::new_for_test(
1412 TestBlock::new(11, 3)
1413 .set_timestamp_ms(1130)
1414 .set_ancestors(round_10_refs.clone())
1415 .build(),
1416 ),
1417 ];
1418
1419 let ancestors_for_round_12 = vec![
1421 round_11[0].reference(),
1422 round_11[1].reference(),
1423 round_11[5].reference(),
1424 ];
1425 let round_12 = vec![
1426 VerifiedBlock::new_for_test(
1427 TestBlock::new(12, 0)
1428 .set_timestamp_ms(1200)
1429 .set_ancestors(ancestors_for_round_12.clone())
1430 .build(),
1431 ),
1432 VerifiedBlock::new_for_test(
1433 TestBlock::new(12, 2)
1434 .set_timestamp_ms(1220)
1435 .set_ancestors(ancestors_for_round_12.clone())
1436 .build(),
1437 ),
1438 VerifiedBlock::new_for_test(
1439 TestBlock::new(12, 3)
1440 .set_timestamp_ms(1230)
1441 .set_ancestors(ancestors_for_round_12.clone())
1442 .build(),
1443 ),
1444 ];
1445
1446 let ancestors_for_round_13 = vec![
1448 round_12[0].reference(),
1449 round_12[1].reference(),
1450 round_12[2].reference(),
1451 round_11[2].reference(),
1452 ];
1453 let round_13 = vec![
1454 VerifiedBlock::new_for_test(
1455 TestBlock::new(12, 1)
1456 .set_timestamp_ms(1300)
1457 .set_ancestors(ancestors_for_round_13.clone())
1458 .build(),
1459 ),
1460 VerifiedBlock::new_for_test(
1461 TestBlock::new(12, 2)
1462 .set_timestamp_ms(1320)
1463 .set_ancestors(ancestors_for_round_13.clone())
1464 .build(),
1465 ),
1466 VerifiedBlock::new_for_test(
1467 TestBlock::new(12, 3)
1468 .set_timestamp_ms(1330)
1469 .set_ancestors(ancestors_for_round_13.clone())
1470 .build(),
1471 ),
1472 ];
1473
1474 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1476 let anchor = VerifiedBlock::new_for_test(
1477 TestBlock::new(14, 1)
1478 .set_timestamp_ms(1410)
1479 .set_ancestors(ancestors_for_round_14)
1480 .build(),
1481 );
1482
1483 for b in round_11
1485 .iter()
1486 .chain(round_12.iter())
1487 .chain(round_13.iter())
1488 .chain([anchor.clone()].iter())
1489 {
1490 dag_state.accept_block(b.clone());
1491 }
1492
1493 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1495 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1496 ancestors_refs.sort();
1497 let mut expected_refs = vec![
1498 round_11[0].reference(),
1499 round_11[1].reference(),
1500 round_11[2].reference(),
1501 round_11[5].reference(),
1502 ];
1503 expected_refs.sort(); assert_eq!(
1506 ancestors_refs, expected_refs,
1507 "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1508 );
1509 }
1510
1511 #[tokio::test]
1512 async fn test_contains_blocks_in_cache_or_store() {
1513 const CACHED_ROUNDS: Round = 2;
1515
1516 let (mut context, _) = Context::new_for_test(4);
1517 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1518
1519 let context = Arc::new(context);
1520 let store = Arc::new(MemStore::new());
1521 let mut dag_state = DagState::new(context.clone(), store.clone());
1522
1523 let num_rounds: u32 = 10;
1525 let num_authorities: u32 = 4;
1526 let mut blocks = Vec::new();
1527
1528 for round in 1..=num_rounds {
1529 for author in 0..num_authorities {
1530 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1531 blocks.push(block);
1532 }
1533 }
1534
1535 blocks.clone().into_iter().for_each(|block| {
1538 if block.round() <= 4 {
1539 store
1540 .write(WriteBatch::default().blocks(vec![block]))
1541 .unwrap();
1542 } else {
1543 dag_state.accept_blocks(vec![block]);
1544 }
1545 });
1546
1547 let mut block_refs = blocks
1551 .iter()
1552 .map(|block| block.reference())
1553 .collect::<Vec<_>>();
1554 let result = dag_state.contains_blocks(block_refs.clone());
1555
1556 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1558 assert_eq!(result, expected);
1559
1560 block_refs.insert(
1562 3,
1563 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1564 );
1565 let result = dag_state.contains_blocks(block_refs.clone());
1566
1567 expected.insert(3, false);
1569 assert_eq!(result, expected.clone());
1570 }
1571
1572 #[tokio::test]
1573 async fn test_contains_cached_block_at_slot() {
1574 const CACHED_ROUNDS: Round = 2;
1576
1577 let num_authorities: u32 = 4;
1578 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1579 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1580
1581 let context = Arc::new(context);
1582 let store = Arc::new(MemStore::new());
1583 let mut dag_state = DagState::new(context.clone(), store.clone());
1584
1585 let num_rounds: u32 = 10;
1587 let mut blocks = Vec::new();
1588
1589 for round in 1..=num_rounds {
1590 for author in 0..num_authorities {
1591 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1592 blocks.push(block.clone());
1593 dag_state.accept_block(block);
1594 }
1595 }
1596
1597 for (author, _) in context.committee.authorities() {
1599 assert!(
1600 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1601 "Genesis should always be found"
1602 );
1603 }
1604
1605 let mut block_refs = blocks
1609 .iter()
1610 .map(|block| block.reference())
1611 .collect::<Vec<_>>();
1612
1613 for block_ref in block_refs.clone() {
1614 let slot = block_ref.into();
1615 let found = dag_state.contains_cached_block_at_slot(slot);
1616 assert!(found, "A block should be found at slot {slot}");
1617 }
1618
1619 block_refs.insert(
1622 3,
1623 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1624 );
1625 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1626 expected.insert(3, false);
1627
1628 for block_ref in block_refs {
1630 let slot = block_ref.into();
1631 let found = dag_state.contains_cached_block_at_slot(slot);
1632
1633 assert_eq!(expected.remove(0), found);
1634 }
1635 }
1636
1637 #[tokio::test]
1638 #[should_panic(
1639 expected = "Attempted to check for slot S8[0] that is <= the last evicted round 8"
1640 )]
1641 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1642 const CACHED_ROUNDS: Round = 2;
1644
1645 let (mut context, _) = Context::new_for_test(4);
1646 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1647 context
1648 .protocol_config
1649 .set_consensus_gc_depth_for_testing(0);
1650
1651 let context = Arc::new(context);
1652 let store = Arc::new(MemStore::new());
1653 let mut dag_state = DagState::new(context.clone(), store.clone());
1654
1655 let mut blocks = Vec::new();
1657 for round in 1..=10 {
1658 let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1659 blocks.push(block.clone());
1660 dag_state.accept_block(block);
1661 }
1662
1663 dag_state.add_commit(TrustedCommit::new_for_test(
1665 1 as CommitIndex,
1666 CommitDigest::MIN,
1667 0,
1668 blocks.last().unwrap().reference(),
1669 blocks
1670 .into_iter()
1671 .map(|block| block.reference())
1672 .collect::<Vec<_>>(),
1673 ));
1674
1675 dag_state.flush();
1676
1677 let _ =
1681 dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1682 }
1683
1684 #[tokio::test]
1685 #[should_panic(
1686 expected = "Attempted to check for slot S3[1] that is <= the last gc evicted round 3"
1687 )]
1688 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1689 const GC_DEPTH: u32 = 2;
1693 const CACHED_ROUNDS: Round = 3;
1695
1696 let (mut context, _) = Context::new_for_test(4);
1697 context
1698 .protocol_config
1699 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1700 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1701
1702 let context = Arc::new(context);
1703 let store = Arc::new(MemStore::new());
1704 let mut dag_state = DagState::new(context.clone(), store.clone());
1705
1706 let mut dag_builder = DagBuilder::new(context.clone());
1709 dag_builder.layers(1..=3).build();
1710 dag_builder
1711 .layers(4..=6)
1712 .authorities(vec![AuthorityIndex::new_for_test(0)])
1713 .skip_block()
1714 .build();
1715
1716 dag_builder
1718 .all_blocks()
1719 .into_iter()
1720 .for_each(|block| dag_state.accept_block(block));
1721
1722 dag_state.add_commit(TrustedCommit::new_for_test(
1724 1 as CommitIndex,
1725 CommitDigest::MIN,
1726 0,
1727 dag_builder.leader_block(5).unwrap().reference(),
1728 vec![],
1729 ));
1730
1731 dag_state.flush();
1732
1733 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1735
1736 for authority_index in 1..=3 {
1741 for round in 4..=6 {
1742 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1743 round,
1744 AuthorityIndex::new_for_test(authority_index)
1745 )));
1746 }
1747 }
1748
1749 for round in 1..=3 {
1750 assert!(
1751 dag_state.contains_cached_block_at_slot(Slot::new(
1752 round,
1753 AuthorityIndex::new_for_test(0)
1754 ))
1755 );
1756 }
1757
1758 let _ =
1761 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1762 }
1763
1764 #[tokio::test]
1765 async fn test_get_blocks_in_cache_or_store() {
1766 let (context, _) = Context::new_for_test(4);
1767 let context = Arc::new(context);
1768 let store = Arc::new(MemStore::new());
1769 let mut dag_state = DagState::new(context.clone(), store.clone());
1770
1771 let num_rounds: u32 = 10;
1773 let num_authorities: u32 = 4;
1774 let mut blocks = Vec::new();
1775
1776 for round in 1..=num_rounds {
1777 for author in 0..num_authorities {
1778 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1779 blocks.push(block);
1780 }
1781 }
1782
1783 blocks.clone().into_iter().for_each(|block| {
1786 if block.round() <= 4 {
1787 store
1788 .write(WriteBatch::default().blocks(vec![block]))
1789 .unwrap();
1790 } else {
1791 dag_state.accept_blocks(vec![block]);
1792 }
1793 });
1794
1795 let mut block_refs = blocks
1799 .iter()
1800 .map(|block| block.reference())
1801 .collect::<Vec<_>>();
1802 let result = dag_state.get_blocks(&block_refs);
1803
1804 let mut expected = blocks
1805 .into_iter()
1806 .map(Some)
1807 .collect::<Vec<Option<VerifiedBlock>>>();
1808
1809 assert_eq!(result, expected.clone());
1811
1812 block_refs.insert(
1814 3,
1815 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1816 );
1817 let result = dag_state.get_blocks(&block_refs);
1818
1819 expected.insert(3, None);
1821 assert_eq!(result, expected);
1822 }
1823
1824 #[rstest]
1826 #[tokio::test]
1827 async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1828 telemetry_subscribers::init_for_testing();
1829 let num_authorities: u32 = 4;
1830 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1831 context
1832 .protocol_config
1833 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1834
1835 if gc_depth > 0 {
1836 context
1837 .protocol_config
1838 .set_consensus_gc_depth_for_testing(gc_depth);
1839 }
1840
1841 let context = Arc::new(context);
1842 let store = Arc::new(MemStore::new());
1843 let mut dag_state = DagState::new(context.clone(), store.clone());
1844
1845 let num_rounds: u32 = 10;
1847 let mut dag_builder = DagBuilder::new(context.clone());
1848 dag_builder.layers(1..=num_rounds).build();
1849 let mut commits = vec![];
1850
1851 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1852 commits.push(commit);
1853 }
1854
1855 let temp_commits = commits.split_off(5);
1857 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1858 for commit in commits.clone() {
1859 dag_state.add_commit(commit);
1860 }
1861
1862 dag_state.flush();
1864
1865 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1867 for commit in temp_commits.clone() {
1868 dag_state.add_commit(commit);
1869 }
1870
1871 let all_blocks = dag_builder.blocks(6..=num_rounds);
1873 let block_refs = all_blocks
1874 .iter()
1875 .map(|block| block.reference())
1876 .collect::<Vec<_>>();
1877
1878 let result = dag_state
1879 .get_blocks(&block_refs)
1880 .into_iter()
1881 .map(|b| b.unwrap())
1882 .collect::<Vec<_>>();
1883 assert_eq!(result, all_blocks);
1884
1885 assert_eq!(dag_state.last_commit_index(), 10);
1887 assert_eq!(
1888 dag_state.last_committed_rounds(),
1889 dag_builder.last_committed_rounds.clone()
1890 );
1891
1892 drop(dag_state);
1894
1895 let dag_state = DagState::new(context.clone(), store.clone());
1897
1898 let blocks = dag_builder.blocks(1..=5);
1900 let block_refs = blocks
1901 .iter()
1902 .map(|block| block.reference())
1903 .collect::<Vec<_>>();
1904 let result = dag_state
1905 .get_blocks(&block_refs)
1906 .into_iter()
1907 .map(|b| b.unwrap())
1908 .collect::<Vec<_>>();
1909 assert_eq!(result, blocks);
1910
1911 let missing_blocks = dag_builder.blocks(6..=num_rounds);
1913 let block_refs = missing_blocks
1914 .iter()
1915 .map(|block| block.reference())
1916 .collect::<Vec<_>>();
1917 let retrieved_blocks = dag_state
1918 .get_blocks(&block_refs)
1919 .into_iter()
1920 .flatten()
1921 .collect::<Vec<_>>();
1922 assert!(retrieved_blocks.is_empty());
1923
1924 assert_eq!(dag_state.last_commit_index(), 5);
1926
1927 let expected_last_committed_rounds = vec![4, 5, 4, 4];
1929 assert_eq!(
1930 dag_state.last_committed_rounds(),
1931 expected_last_committed_rounds
1932 );
1933
1934 assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1937 }
1938
1939 #[tokio::test]
1940 async fn test_flush_and_recovery() {
1941 telemetry_subscribers::init_for_testing();
1942 let num_authorities: u32 = 4;
1943 let (context, _) = Context::new_for_test(num_authorities as usize);
1944 let context = Arc::new(context);
1945 let store = Arc::new(MemStore::new());
1946 let mut dag_state = DagState::new(context.clone(), store.clone());
1947
1948 let num_rounds: u32 = 10;
1950 let mut dag_builder = DagBuilder::new(context.clone());
1951 dag_builder.layers(1..=num_rounds).build();
1952 let mut commits = vec![];
1953 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1954 commits.push(commit);
1955 }
1956
1957 let temp_commits = commits.split_off(5);
1959 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1960 for commit in commits.clone() {
1961 dag_state.add_commit(commit);
1962 }
1963
1964 dag_state.flush();
1966
1967 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1969 for commit in temp_commits.clone() {
1970 dag_state.add_commit(commit);
1971 }
1972
1973 let all_blocks = dag_builder.blocks(6..=num_rounds);
1975 let block_refs = all_blocks
1976 .iter()
1977 .map(|block| block.reference())
1978 .collect::<Vec<_>>();
1979 let result = dag_state
1980 .get_blocks(&block_refs)
1981 .into_iter()
1982 .map(|b| b.unwrap())
1983 .collect::<Vec<_>>();
1984 assert_eq!(result, all_blocks);
1985
1986 assert_eq!(dag_state.last_commit_index(), 10);
1988 assert_eq!(
1989 dag_state.last_committed_rounds(),
1990 dag_builder.last_committed_rounds.clone()
1991 );
1992
1993 drop(dag_state);
1995
1996 let dag_state = DagState::new(context.clone(), store.clone());
1998
1999 let blocks = dag_builder.blocks(1..=5);
2001 let block_refs = blocks
2002 .iter()
2003 .map(|block| block.reference())
2004 .collect::<Vec<_>>();
2005 let result = dag_state
2006 .get_blocks(&block_refs)
2007 .into_iter()
2008 .map(|b| b.unwrap())
2009 .collect::<Vec<_>>();
2010 assert_eq!(result, blocks);
2011
2012 let missing_blocks = dag_builder.blocks(6..=num_rounds);
2014 let block_refs = missing_blocks
2015 .iter()
2016 .map(|block| block.reference())
2017 .collect::<Vec<_>>();
2018 let retrieved_blocks = dag_state
2019 .get_blocks(&block_refs)
2020 .into_iter()
2021 .flatten()
2022 .collect::<Vec<_>>();
2023 assert!(retrieved_blocks.is_empty());
2024
2025 assert_eq!(dag_state.last_commit_index(), 5);
2027
2028 let expected_last_committed_rounds = vec![4, 5, 4, 4];
2030 assert_eq!(
2031 dag_state.last_committed_rounds(),
2032 expected_last_committed_rounds
2033 );
2034 assert_eq!(dag_state.scoring_subdags_count(), 5);
2037 }
2038
2039 #[tokio::test]
2040 async fn test_flush_and_recovery_gc_enabled() {
2041 telemetry_subscribers::init_for_testing();
2042
2043 const GC_DEPTH: u32 = 3;
2044 const CACHED_ROUNDS: u32 = 4;
2045
2046 let num_authorities: u32 = 4;
2047 let (mut context, _) = Context::new_for_test(num_authorities as usize);
2048 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2049 context
2050 .protocol_config
2051 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2052 context
2053 .protocol_config
2054 .set_consensus_linearize_subdag_v2_for_testing(true);
2055
2056 let context = Arc::new(context);
2057
2058 let store = Arc::new(MemStore::new());
2059 let mut dag_state = DagState::new(context.clone(), store.clone());
2060
2061 let num_rounds: u32 = 10;
2062 let mut dag_builder = DagBuilder::new(context.clone());
2063 dag_builder.layers(1..=5).build();
2064 dag_builder
2065 .layers(6..=8)
2066 .authorities(vec![AuthorityIndex::new_for_test(0)])
2067 .skip_block()
2068 .build();
2069 dag_builder.layers(9..=num_rounds).build();
2070
2071 let mut commits = dag_builder
2072 .get_sub_dag_and_commits(1..=num_rounds)
2073 .into_iter()
2074 .map(|(_subdag, commit)| commit)
2075 .collect::<Vec<_>>();
2076
2077 let temp_commits = commits.split_off(7);
2081 dag_state.accept_blocks(dag_builder.blocks(1..=8));
2082 for commit in commits.clone() {
2083 dag_state.add_commit(commit);
2084 }
2085
2086 let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2089 for commit in commits.iter() {
2090 all_committed_blocks.extend(commit.blocks());
2091 }
2092 dag_state.flush();
2094
2095 dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2097 for commit in temp_commits.clone() {
2098 dag_state.add_commit(commit);
2099 }
2100
2101 let all_blocks = dag_builder.blocks(1..=num_rounds);
2103 let block_refs = all_blocks
2104 .iter()
2105 .map(|block| block.reference())
2106 .collect::<Vec<_>>();
2107 let result = dag_state
2108 .get_blocks(&block_refs)
2109 .into_iter()
2110 .map(|b| b.unwrap())
2111 .collect::<Vec<_>>();
2112 assert_eq!(result, all_blocks);
2113
2114 assert_eq!(dag_state.last_commit_index(), 9);
2116 assert_eq!(
2117 dag_state.last_committed_rounds(),
2118 dag_builder.last_committed_rounds.clone()
2119 );
2120
2121 drop(dag_state);
2123
2124 let dag_state = DagState::new(context.clone(), store.clone());
2126
2127 let blocks = dag_builder.blocks(1..=5);
2129 let block_refs = blocks
2130 .iter()
2131 .map(|block| block.reference())
2132 .collect::<Vec<_>>();
2133 let result = dag_state
2134 .get_blocks(&block_refs)
2135 .into_iter()
2136 .map(|b| b.unwrap())
2137 .collect::<Vec<_>>();
2138 assert_eq!(result, blocks);
2139
2140 let missing_blocks = dag_builder.blocks(9..=num_rounds);
2142 let block_refs = missing_blocks
2143 .iter()
2144 .map(|block| block.reference())
2145 .collect::<Vec<_>>();
2146 let retrieved_blocks = dag_state
2147 .get_blocks(&block_refs)
2148 .into_iter()
2149 .flatten()
2150 .collect::<Vec<_>>();
2151 assert!(retrieved_blocks.is_empty());
2152
2153 assert_eq!(dag_state.last_commit_index(), 7);
2155
2156 let expected_last_committed_rounds = vec![5, 6, 6, 7];
2158 assert_eq!(
2159 dag_state.last_committed_rounds(),
2160 expected_last_committed_rounds
2161 );
2162 assert_eq!(dag_state.scoring_subdags_count(), 7);
2165 for (authority_index, _) in context.committee.authorities() {
2167 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2168
2169 if authority_index == AuthorityIndex::new_for_test(0) {
2174 assert_eq!(blocks.len(), 4);
2175 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2176 assert!(
2177 blocks
2178 .into_iter()
2179 .all(|block| block.round() >= 2 && block.round() <= 5)
2180 );
2181 } else {
2182 assert_eq!(blocks.len(), 4);
2183 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2184 assert!(
2185 blocks
2186 .into_iter()
2187 .all(|block| block.round() >= 5 && block.round() <= 8)
2188 );
2189 }
2190 }
2191 let gc_round = dag_state.gc_round();
2194 assert_eq!(gc_round, 4);
2195 dag_state
2196 .recent_blocks
2197 .iter()
2198 .for_each(|(block_ref, block_info)| {
2199 if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2200 assert!(
2201 block_info.committed,
2202 "Block {block_ref:?} should be committed"
2203 );
2204 };
2205 });
2206 }
2207
2208 #[tokio::test]
2209 async fn test_block_info_as_committed() {
2210 let num_authorities: u32 = 4;
2211 let (context, _) = Context::new_for_test(num_authorities as usize);
2212 let context = Arc::new(context);
2213
2214 let store = Arc::new(MemStore::new());
2215 let mut dag_state = DagState::new(context.clone(), store.clone());
2216
2217 let block = VerifiedBlock::new_for_test(
2219 TestBlock::new(1, 0)
2220 .set_timestamp_ms(1000)
2221 .set_ancestors(vec![])
2222 .build(),
2223 );
2224
2225 dag_state.accept_block(block.clone());
2226
2227 assert!(!dag_state.is_committed(&block.reference()));
2229
2230 assert!(
2232 dag_state.set_committed(&block.reference()),
2233 "Block should be successfully set as committed for first time"
2234 );
2235
2236 assert!(dag_state.is_committed(&block.reference()));
2238
2239 assert!(
2241 !dag_state.set_committed(&block.reference()),
2242 "Block should not be successfully set as committed"
2243 );
2244 }
2245
2246 #[tokio::test]
2247 async fn test_get_cached_blocks() {
2248 let (mut context, _) = Context::new_for_test(4);
2249 context.parameters.dag_state_cached_rounds = 5;
2250
2251 let context = Arc::new(context);
2252 let store = Arc::new(MemStore::new());
2253 let mut dag_state = DagState::new(context.clone(), store.clone());
2254
2255 let mut all_blocks = Vec::new();
2260 for author in 1..=3 {
2261 for round in 10..(10 + author) {
2262 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2263 all_blocks.push(block.clone());
2264 dag_state.accept_block(block);
2265 }
2266 }
2267
2268 let cached_blocks =
2269 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2270 assert!(cached_blocks.is_empty());
2271
2272 let cached_blocks =
2273 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2274 assert_eq!(cached_blocks.len(), 1);
2275 assert_eq!(cached_blocks[0].round(), 10);
2276
2277 let cached_blocks =
2278 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2279 assert_eq!(cached_blocks.len(), 2);
2280 assert_eq!(cached_blocks[0].round(), 10);
2281 assert_eq!(cached_blocks[1].round(), 11);
2282
2283 let cached_blocks =
2284 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2285 assert_eq!(cached_blocks.len(), 1);
2286 assert_eq!(cached_blocks[0].round(), 11);
2287
2288 let cached_blocks =
2289 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2290 assert_eq!(cached_blocks.len(), 3);
2291 assert_eq!(cached_blocks[0].round(), 10);
2292 assert_eq!(cached_blocks[1].round(), 11);
2293 assert_eq!(cached_blocks[2].round(), 12);
2294
2295 let cached_blocks =
2296 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2297 assert_eq!(cached_blocks.len(), 1);
2298 assert_eq!(cached_blocks[0].round(), 12);
2299
2300 let cached_blocks = dag_state.get_cached_blocks_in_range(
2304 context.committee.to_authority_index(3).unwrap(),
2305 10,
2306 10,
2307 1,
2308 );
2309 assert!(cached_blocks.is_empty());
2310
2311 let cached_blocks = dag_state.get_cached_blocks_in_range(
2313 context.committee.to_authority_index(3).unwrap(),
2314 11,
2315 10,
2316 1,
2317 );
2318 assert!(cached_blocks.is_empty());
2319
2320 let cached_blocks = dag_state.get_cached_blocks_in_range(
2322 context.committee.to_authority_index(0).unwrap(),
2323 9,
2324 10,
2325 1,
2326 );
2327 assert!(cached_blocks.is_empty());
2328
2329 let cached_blocks = dag_state.get_cached_blocks_in_range(
2331 context.committee.to_authority_index(1).unwrap(),
2332 9,
2333 11,
2334 1,
2335 );
2336 assert_eq!(cached_blocks.len(), 1);
2337 assert_eq!(cached_blocks[0].round(), 10);
2338
2339 let cached_blocks = dag_state.get_cached_blocks_in_range(
2341 context.committee.to_authority_index(2).unwrap(),
2342 9,
2343 12,
2344 5,
2345 );
2346 assert_eq!(cached_blocks.len(), 2);
2347 assert_eq!(cached_blocks[0].round(), 10);
2348 assert_eq!(cached_blocks[1].round(), 11);
2349
2350 let cached_blocks = dag_state.get_cached_blocks_in_range(
2352 context.committee.to_authority_index(3).unwrap(),
2353 11,
2354 20,
2355 5,
2356 );
2357 assert_eq!(cached_blocks.len(), 2);
2358 assert_eq!(cached_blocks[0].round(), 11);
2359 assert_eq!(cached_blocks[1].round(), 12);
2360
2361 let cached_blocks = dag_state.get_cached_blocks_in_range(
2363 context.committee.to_authority_index(3).unwrap(),
2364 10,
2365 20,
2366 1,
2367 );
2368 assert_eq!(cached_blocks.len(), 1);
2369 assert_eq!(cached_blocks[0].round(), 10);
2370 }
2371
2372 #[rstest]
2373 #[tokio::test]
2374 async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2375 const CACHED_ROUNDS: Round = 2;
2377 let (mut context, _) = Context::new_for_test(4);
2378 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2379
2380 if gc_depth > 0 {
2381 context
2382 .protocol_config
2383 .set_consensus_gc_depth_for_testing(gc_depth);
2384 }
2385
2386 let context = Arc::new(context);
2387 let store = Arc::new(MemStore::new());
2388 let mut dag_state = DagState::new(context.clone(), store.clone());
2389
2390 let dag_str = "DAG {
2395 Round 0 : { 4 },
2396 Round 1 : {
2397 B -> [*],
2398 C -> [*],
2399 D -> [*],
2400 },
2401 Round 2 : {
2402 C -> [*],
2403 D -> [*],
2404 },
2405 Round 3 : {
2406 D -> [*],
2407 },
2408 }";
2409
2410 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2411
2412 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2414
2415 for block in dag_builder
2417 .all_blocks()
2418 .into_iter()
2419 .chain(std::iter::once(block))
2420 {
2421 dag_state.accept_block(block);
2422 }
2423
2424 dag_state.add_commit(TrustedCommit::new_for_test(
2425 1 as CommitIndex,
2426 CommitDigest::MIN,
2427 context.clock.timestamp_utc_ms(),
2428 dag_builder.leader_block(3).unwrap().reference(),
2429 vec![],
2430 ));
2431
2432 let end_round = 4;
2434 let expected_rounds = vec![0, 1, 2, 3];
2435 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2436 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2438 assert_eq!(
2439 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2440 expected_rounds
2441 );
2442 assert_eq!(
2443 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2444 expected_excluded_and_equivocating_blocks
2445 );
2446
2447 for (i, expected_round) in expected_rounds.iter().enumerate() {
2449 let round = dag_state
2450 .get_last_cached_block_in_range(
2451 context.committee.to_authority_index(i).unwrap(),
2452 0,
2453 end_round,
2454 )
2455 .map(|b| b.round())
2456 .unwrap_or_default();
2457 assert_eq!(round, *expected_round, "Authority {i}");
2458 }
2459
2460 let start_round = 2;
2462 let expected_rounds = [0, 0, 2, 3];
2463
2464 for (i, expected_round) in expected_rounds.iter().enumerate() {
2466 let round = dag_state
2467 .get_last_cached_block_in_range(
2468 context.committee.to_authority_index(i).unwrap(),
2469 start_round,
2470 end_round,
2471 )
2472 .map(|b| b.round())
2473 .unwrap_or_default();
2474 assert_eq!(round, *expected_round, "Authority {i}");
2475 }
2476
2477 dag_state.flush();
2486
2487 let end_round = 3;
2489 let expected_rounds = vec![0, 1, 2, 2];
2490
2491 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2493 assert_eq!(
2494 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2495 expected_rounds
2496 );
2497
2498 for (i, expected_round) in expected_rounds.iter().enumerate() {
2500 let round = dag_state
2501 .get_last_cached_block_in_range(
2502 context.committee.to_authority_index(i).unwrap(),
2503 0,
2504 end_round,
2505 )
2506 .map(|b| b.round())
2507 .unwrap_or_default();
2508 assert_eq!(round, *expected_round, "Authority {i}");
2509 }
2510 }
2511
2512 #[tokio::test]
2513 #[should_panic(
2514 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2515 )]
2516 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2517 const CACHED_ROUNDS: Round = 1;
2519 let (mut context, _) = Context::new_for_test(4);
2520 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2521 context
2522 .protocol_config
2523 .set_consensus_gc_depth_for_testing(0);
2524
2525 let context = Arc::new(context);
2526 let store = Arc::new(MemStore::new());
2527 let mut dag_state = DagState::new(context.clone(), store.clone());
2528
2529 let mut all_blocks = Vec::new();
2534 for author in 1..=3 {
2535 for round in 1..=author {
2536 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2537 all_blocks.push(block.clone());
2538 dag_state.accept_block(block);
2539 }
2540 }
2541
2542 dag_state.add_commit(TrustedCommit::new_for_test(
2543 1 as CommitIndex,
2544 CommitDigest::MIN,
2545 0,
2546 all_blocks.last().unwrap().reference(),
2547 all_blocks
2548 .into_iter()
2549 .map(|block| block.reference())
2550 .collect::<Vec<_>>(),
2551 ));
2552
2553 dag_state.flush();
2556
2557 let end_round = 2;
2560 dag_state.get_last_cached_block_per_authority(end_round);
2561 }
2562
2563 #[tokio::test]
2564 #[should_panic(
2565 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2566 )]
2567 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
2568 const CACHED_ROUNDS: Round = 1;
2570 const GC_DEPTH: u32 = 1;
2571 let (mut context, _) = Context::new_for_test(4);
2572 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2573 context
2574 .protocol_config
2575 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2576
2577 let context = Arc::new(context);
2578 let store = Arc::new(MemStore::new());
2579 let mut dag_state = DagState::new(context.clone(), store.clone());
2580
2581 let mut dag_builder = DagBuilder::new(context.clone());
2586 dag_builder
2587 .layers(1..=1)
2588 .authorities(vec![AuthorityIndex::new_for_test(0)])
2589 .skip_block()
2590 .build();
2591 dag_builder
2592 .layers(2..=2)
2593 .authorities(vec![
2594 AuthorityIndex::new_for_test(0),
2595 AuthorityIndex::new_for_test(1),
2596 ])
2597 .skip_block()
2598 .build();
2599 dag_builder
2600 .layers(3..=3)
2601 .authorities(vec![
2602 AuthorityIndex::new_for_test(0),
2603 AuthorityIndex::new_for_test(1),
2604 AuthorityIndex::new_for_test(2),
2605 ])
2606 .skip_block()
2607 .build();
2608
2609 for block in dag_builder.all_blocks() {
2611 dag_state.accept_block(block);
2612 }
2613
2614 dag_state.add_commit(TrustedCommit::new_for_test(
2615 1 as CommitIndex,
2616 CommitDigest::MIN,
2617 0,
2618 dag_builder.leader_block(3).unwrap().reference(),
2619 vec![],
2620 ));
2621
2622 dag_state.flush();
2624
2625 dag_state.get_last_cached_block_per_authority(2);
2628 }
2629
2630 #[tokio::test]
2631 async fn test_last_quorum() {
2632 let (context, _) = Context::new_for_test(4);
2634 let context = Arc::new(context);
2635 let store = Arc::new(MemStore::new());
2636 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2637
2638 {
2640 let genesis = genesis_blocks(context.clone());
2641
2642 assert_eq!(dag_state.read().last_quorum(), genesis);
2643 }
2644
2645 {
2648 let mut dag_builder = DagBuilder::new(context.clone());
2649 dag_builder
2650 .layers(1..=4)
2651 .build()
2652 .persist_layers(dag_state.clone());
2653 let round_4_blocks: Vec<_> = dag_builder
2654 .blocks(4..=4)
2655 .into_iter()
2656 .map(|block| block.reference())
2657 .collect();
2658
2659 let last_quorum = dag_state.read().last_quorum();
2660
2661 assert_eq!(
2662 last_quorum
2663 .into_iter()
2664 .map(|block| block.reference())
2665 .collect::<Vec<_>>(),
2666 round_4_blocks
2667 );
2668 }
2669
2670 {
2673 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2674 dag_state.write().accept_block(block);
2675
2676 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2677
2678 let last_quorum = dag_state.read().last_quorum();
2679
2680 assert_eq!(last_quorum, round_4_blocks);
2681 }
2682 }
2683
2684 #[tokio::test]
2685 async fn test_last_block_for_authority() {
2686 let (context, _) = Context::new_for_test(4);
2688 let context = Arc::new(context);
2689 let store = Arc::new(MemStore::new());
2690 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2691
2692 {
2694 let genesis = genesis_blocks(context.clone());
2695 let my_genesis = genesis
2696 .into_iter()
2697 .find(|block| block.author() == context.own_index)
2698 .unwrap();
2699
2700 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2701 }
2702
2703 {
2706 let mut dag_builder = DagBuilder::new(context.clone());
2708 dag_builder
2709 .layers(1..=4)
2710 .build()
2711 .persist_layers(dag_state.clone());
2712
2713 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2715 dag_state.write().accept_block(block);
2716
2717 let block = dag_state
2718 .read()
2719 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2720 assert_eq!(block.round(), 5);
2721
2722 for (authority_index, _) in context.committee.authorities() {
2723 let block = dag_state
2724 .read()
2725 .get_last_block_for_authority(authority_index);
2726
2727 if authority_index.value() == 0 {
2728 assert_eq!(block.round(), 5);
2729 } else {
2730 assert_eq!(block.round(), 4);
2731 }
2732 }
2733 }
2734 }
2735}