1use std::{
6 cmp::max,
7 collections::{BTreeMap, BTreeSet, VecDeque},
8 ops::Bound::{Excluded, Included, Unbounded},
9 panic,
10 sync::Arc,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info};
18
19use crate::{
20 CommittedSubDag,
21 block::{
22 BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23 VerifiedBlock, genesis_blocks,
24 },
25 commit::{
26 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28 },
29 context::Context,
30 leader_scoring::{ReputationScores, ScoringSubdag},
31 storage::{Store, WriteBatch},
32 threshold_clock::ThresholdClock,
33};
34
/// In-memory view of the consensus DAG, backed by a persistent `Store`.
///
/// Accepted blocks and commits are buffered in memory and written out by `flush`,
/// which also evicts old blocks from the per-authority caches.
pub(crate) struct DagState {
    /// Shared node context (committee, parameters, protocol config, clock, metrics).
    context: Arc<Context>,

    /// Genesis blocks keyed by their reference; built once in `new` from `genesis_blocks`.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    /// Cached blocks with their committed flag, keyed by block reference.
    /// Entries are inserted by `update_block_metadata` and removed by `flush`.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    /// References of the cached blocks, one ordered set per authority.
    /// Kept in sync with `recent_blocks` (same insert and eviction points).
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    /// Threshold clock that is fed the reference of every block added to the cache.
    threshold_clock: ThresholdClock,

    /// Per-authority eviction round: `flush` removes cached refs whose round is
    /// `<=` this value. Set during recovery in `new` and refreshed by `flush`.
    evicted_rounds: Vec<Round>,

    /// Highest round among all blocks added to the cache so far.
    highest_accepted_round: Round,

    /// Most recent commit, or `None` when nothing has been committed yet.
    last_commit: Option<TrustedCommit>,

    /// Wall-clock instant at which the commit round last advanced; used by `add_commit`
    /// to report the interval between advancements.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    /// Per-authority maximum round among the blocks referenced by commits.
    last_committed_rounds: Vec<Round>,

    /// Accumulator of committed sub-dags used to compute reputation scores.
    scoring_subdag: ScoringSubdag,
    /// Committed sub-dags that have not been scored yet.
    unscored_committed_subdags: Vec<CommittedSubDag>,

    /// References of added commits, queued until handed out by `take_commit_votes`.
    pending_commit_votes: VecDeque<CommitVote>,

    /// Accepted blocks waiting to be persisted by `flush`.
    blocks_to_write: Vec<VerifiedBlock>,
    /// Added commits waiting to be persisted by `flush`.
    commits_to_write: Vec<TrustedCommit>,

    /// Commit info records waiting to be persisted by `flush`.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    /// Persistent storage for blocks, commits and commit info.
    store: Arc<dyn Store>,

    /// Number of rounds of blocks to keep cached, read from
    /// `context.parameters.dag_state_cached_rounds`.
    cached_rounds: Round,
}
114
115impl DagState {
/// Builds the DAG state for `context`, recovering prior state from `store`.
///
/// Recovery happens in this order:
/// 1. read the last commit and the last persisted `CommitInfo`;
/// 2. scan the commits that follow that `CommitInfo` to bring `last_committed_rounds`
///    up to date and to load their committed sub-dags;
/// 3. for every authority, reload blocks above the computed eviction round into the cache;
/// 4. when GC is enabled, walk commits backwards from the last one and flag the cached
///    blocks they reference as committed.
///
/// # Panics
/// Panics if any storage read fails, if an index cannot be mapped to an authority, or if a
/// block is flagged as committed twice while recovering.
pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
    let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
    let num_authorities = context.committee.size();

    let genesis = genesis_blocks(context.clone())
        .into_iter()
        .map(|block| (block.reference(), block))
        .collect();

    // NOTE(review): the first argument is presumably the initial clock round — confirm.
    let threshold_clock = ThresholdClock::new(1, context.clone());

    let last_commit = store
        .read_last_commit()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));

    let commit_info = store
        .read_last_commit_info()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
    // Start from the persisted committed rounds when available; commits after that
    // record still have to be replayed, starting at `commit_recovery_start_index`.
    let (mut last_committed_rounds, commit_recovery_start_index) =
        if let Some((commit_ref, commit_info)) = commit_info {
            tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
            (commit_info.committed_rounds, commit_ref.index + 1)
        } else {
            tracing::info!("Found no stored CommitInfo to recover from");
            (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
        };

    let mut unscored_committed_subdags = Vec::new();
    let mut scoring_subdag = ScoringSubdag::new(context.clone());

    if let Some(last_commit) = last_commit.as_ref() {
        store
            .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
            .iter()
            .for_each(|commit| {
                // Raise each author's committed round to the highest round seen in a commit.
                for block_ref in commit.blocks() {
                    last_committed_rounds[block_ref.author] =
                        max(last_committed_rounds[block_ref.author], block_ref.round);
                }

                let committed_subdag =
                    load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
                unscored_committed_subdags.push(committed_subdag);
            });
    }

    tracing::info!(
        "DagState was initialized with the following state: \
        {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
        unscored_committed_subdags.len()
    );

    // With the distributed vote scoring strategy the recovered sub-dags are moved into the
    // scoring sub-dag, which leaves `unscored_committed_subdags` empty.
    if context
        .protocol_config
        .consensus_distributed_vote_scoring_strategy()
    {
        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
    }

    let mut state = Self {
        context,
        genesis,
        recent_blocks: BTreeMap::new(),
        recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
        threshold_clock,
        highest_accepted_round: 0,
        last_commit: last_commit.clone(),
        last_commit_round_advancement_time: None,
        last_committed_rounds: last_committed_rounds.clone(),
        pending_commit_votes: VecDeque::new(),
        blocks_to_write: vec![],
        commits_to_write: vec![],
        commit_info_to_write: vec![],
        scoring_subdag,
        unscored_committed_subdags,
        store: store.clone(),
        cached_rounds,
        evicted_rounds: vec![0; num_authorities],
    };

    // Reload the cached window of blocks for every authority.
    for (i, round) in last_committed_rounds.into_iter().enumerate() {
        let authority_index = state.context.committee.to_authority_index(i).unwrap();
        let (blocks, eviction_round) = if state.gc_enabled() {
            // With GC the eviction round is derived from the authority's last stored
            // block and the GC round, not from its last committed round.
            let last_block = state
                .store
                .scan_last_blocks_by_author(authority_index, 1, None)
                .expect("Database error");
            let last_block_round = last_block
                .last()
                .map(|b| b.round())
                .unwrap_or(GENESIS_ROUND);

            let eviction_round = Self::gc_eviction_round(
                last_block_round,
                state.gc_round(),
                state.cached_rounds,
            );
            let blocks = state
                .store
                .scan_blocks_by_author(authority_index, eviction_round + 1)
                .expect("Database error");
            (blocks, eviction_round)
        } else {
            // Without GC the eviction round trails the authority's last committed round.
            let eviction_round = Self::eviction_round(round, cached_rounds);
            let blocks = state
                .store
                .scan_blocks_by_author(authority_index, eviction_round + 1)
                .expect("Database error");
            (blocks, eviction_round)
        };

        state.evicted_rounds[authority_index] = eviction_round;

        // Recovered blocks only update the in-memory metadata; they are not queued
        // in `blocks_to_write` again.
        for block in &blocks {
            state.update_block_metadata(block);
        }

        info!(
            "Recovered blocks {}: {:?}",
            authority_index,
            blocks
                .iter()
                .map(|b| b.reference())
                .collect::<Vec<BlockRef>>()
        );
    }

    if state.gc_enabled() {
        if let Some(last_commit) = last_commit {
            let mut index = last_commit.index();
            let gc_round = state.gc_round();
            info!(
                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
                index, gc_round
            );

            loop {
                // Commits are read one at a time, newest first.
                let commits = store
                    .scan_commits((index..=index).into())
                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
                let Some(commit) = commits.first() else {
                    info!(
                        "Recovering finished up to index {index}, no more commits to recover"
                    );
                    break;
                };

                // Stop once the commit leader is at or below the GC round.
                if gc_round > 0 && commit.leader().round <= gc_round {
                    info!(
                        "Recovering finished, reached commit leader round {} <= gc_round {}",
                        commit.leader().round,
                        gc_round
                    );
                    break;
                }

                // Only blocks above the GC round are flagged; `set_committed` panics for
                // refs missing from the cache and returns false for repeated flags.
                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
                    debug!(
                        "Setting block {:?} as committed based on commit {:?}",
                        block_ref,
                        commit.index()
                    );
                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
                });

                // Index 0 is never scanned: the walk stops as soon as it is reached.
                index = index.saturating_sub(1);
                if index == 0 {
                    break;
                }
            }
        }
    }

    state
}
300
301 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
303 assert_ne!(
304 block.round(),
305 0,
306 "Genesis block should not be accepted into DAG."
307 );
308
309 let block_ref = block.reference();
310 if self.contains_block(&block_ref) {
311 return;
312 }
313
314 let now = self.context.clock.timestamp_utc_ms();
315 if block.timestamp_ms() > now {
316 panic!(
317 "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
318 block,
319 block.timestamp_ms(),
320 now,
321 );
322 }
323
324 if block_ref.author == self.context.own_index {
327 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
328 assert!(
329 existing_blocks.is_empty(),
330 "Block Rejected! Attempted to add block {block:#?} to own slot where \
331 block(s) {existing_blocks:#?} already exists."
332 );
333 }
334 self.update_block_metadata(&block);
335 self.blocks_to_write.push(block);
336 let source = if self.context.own_index == block_ref.author {
337 "own"
338 } else {
339 "others"
340 };
341 self.context
342 .metrics
343 .node_metrics
344 .accepted_blocks
345 .with_label_values(&[source])
346 .inc();
347 }
348
349 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
351 let block_ref = block.reference();
352 self.recent_blocks
353 .insert(block_ref, BlockInfo::new(block.clone()));
354 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
355 self.threshold_clock.add_block(block_ref);
356 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
357 self.context
358 .metrics
359 .node_metrics
360 .highest_accepted_round
361 .set(self.highest_accepted_round as i64);
362
363 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
364 .last()
365 .map(|block_ref| block_ref.round)
366 .expect("There should be by now at least one block ref");
367 let hostname = &self.context.committee.authority(block_ref.author).hostname;
368 self.context
369 .metrics
370 .node_metrics
371 .highest_accepted_authority_round
372 .with_label_values(&[hostname])
373 .set(highest_accepted_round_for_author as i64);
374 }
375
376 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
378 debug!(
379 "Accepting blocks: {}",
380 blocks.iter().map(|b| b.reference().to_string()).join(",")
381 );
382 for block in blocks {
383 self.accept_block(block);
384 }
385 }
386
387 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
390 self.get_blocks(&[*reference])
391 .pop()
392 .expect("Exactly one element should be returned")
393 }
394
395 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
399 let mut blocks = vec![None; block_refs.len()];
400 let mut missing = Vec::new();
401
402 for (index, block_ref) in block_refs.iter().enumerate() {
403 if block_ref.round == GENESIS_ROUND {
404 if let Some(block) = self.genesis.get(block_ref) {
406 blocks[index] = Some(block.clone());
407 }
408 continue;
409 }
410 if let Some(block_info) = self.recent_blocks.get(block_ref) {
411 blocks[index] = Some(block_info.block.clone());
412 continue;
413 }
414 missing.push((index, block_ref));
415 }
416
417 if missing.is_empty() {
418 return blocks;
419 }
420
421 let missing_refs = missing
422 .iter()
423 .map(|(_, block_ref)| **block_ref)
424 .collect::<Vec<_>>();
425 let store_results = self
426 .store
427 .read_blocks(&missing_refs)
428 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
429 self.context
430 .metrics
431 .node_metrics
432 .dag_state_store_read_count
433 .with_label_values(&["get_blocks"])
434 .inc();
435
436 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
437 blocks[index] = result;
438 }
439
440 blocks
441 }
442
443 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
447 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
448 if !block_info.committed {
449 block_info.committed = true;
450 return true;
451 }
452 false
453 } else {
454 panic!(
455 "Block {:?} not found in cache to set as committed.",
456 block_ref
457 );
458 }
459 }
460
461 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
462 self.recent_blocks
463 .get(block_ref)
464 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
465 .committed
466 }
467
468 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
472 let mut blocks = vec![];
477 for (_block_ref, block_info) in self.recent_blocks.range((
478 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
479 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
480 )) {
481 blocks.push(block_info.block.clone())
482 }
483 blocks
484 }
485
486 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
490 if round <= self.last_commit_round() {
491 panic!("Round {} have committed blocks!", round);
492 }
493
494 let mut blocks = vec![];
495 for (_block_ref, block_info) in self.recent_blocks.range((
496 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
497 Excluded(BlockRef::new(
498 round + 1,
499 AuthorityIndex::ZERO,
500 BlockDigest::MIN,
501 )),
502 )) {
503 blocks.push(block_info.block.clone())
504 }
505 blocks
506 }
507
508 pub(crate) fn ancestors_at_round(
510 &self,
511 later_block: &VerifiedBlock,
512 earlier_round: Round,
513 ) -> Vec<VerifiedBlock> {
514 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
516 while !linked.is_empty() {
517 let round = linked.last().unwrap().round;
518 if round <= earlier_round {
520 break;
521 }
522 let block_ref = linked.pop_last().unwrap();
523 let Some(block) = self.get_block(&block_ref) else {
524 panic!("Block {:?} should exist in DAG!", block_ref);
525 };
526 linked.extend(block.ancestors().iter().cloned());
527 }
528 linked
529 .range((
530 Included(BlockRef::new(
531 earlier_round,
532 AuthorityIndex::ZERO,
533 BlockDigest::MIN,
534 )),
535 Unbounded,
536 ))
537 .map(|r| {
538 self.get_block(r)
539 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
540 .clone()
541 })
542 .collect()
543 }
544
/// Returns the last block of this node's own authority (`context.own_index`), as resolved by
/// `get_last_block_for_authority`.
pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
    self.get_last_block_for_authority(self.context.own_index)
}
550
551 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
555 if let Some(last) = self.recent_refs_by_authority[authority].last() {
556 return self
557 .recent_blocks
558 .get(last)
559 .expect("Block should be found in recent blocks")
560 .block
561 .clone();
562 }
563
564 let (_, genesis_block) = self
566 .genesis
567 .iter()
568 .find(|(block_ref, _)| block_ref.author == authority)
569 .expect("Genesis should be found for authority {authority_index}");
570 genesis_block.clone()
571 }
572
573 pub(crate) fn get_cached_blocks(
579 &self,
580 authority: AuthorityIndex,
581 start: Round,
582 ) -> Vec<VerifiedBlock> {
583 let mut blocks = vec![];
584 for block_ref in self.recent_refs_by_authority[authority].range((
585 Included(BlockRef::new(start, authority, BlockDigest::MIN)),
586 Unbounded,
587 )) {
588 let block_info = self
589 .recent_blocks
590 .get(block_ref)
591 .expect("Block should exist in recent blocks");
592 blocks.push(block_info.block.clone());
593 }
594 blocks
595 }
596
597 pub(crate) fn get_last_cached_block_in_range(
600 &self,
601 authority: AuthorityIndex,
602 start_round: Round,
603 end_round: Round,
604 ) -> Option<VerifiedBlock> {
605 if end_round == GENESIS_ROUND {
606 panic!(
607 "Attempted to retrieve blocks earlier than the genesis round which is impossible"
608 );
609 }
610
611 let block_ref = self.recent_refs_by_authority[authority]
612 .range((
613 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
614 Excluded(BlockRef::new(
615 end_round,
616 AuthorityIndex::MIN,
617 BlockDigest::MIN,
618 )),
619 ))
620 .last()?;
621
622 self.recent_blocks
623 .get(block_ref)
624 .map(|block_info| block_info.block.clone())
625 }
626
627 pub(crate) fn get_last_cached_block_per_authority(
636 &self,
637 end_round: Round,
638 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
639 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
641 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
642
643 if end_round == GENESIS_ROUND {
644 panic!(
645 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
646 );
647 }
648
649 if end_round == GENESIS_ROUND + 1 {
650 return blocks.into_iter().map(|b| (b, vec![])).collect();
651 }
652
653 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
654 let authority_index = self
655 .context
656 .committee
657 .to_authority_index(authority_index)
658 .unwrap();
659
660 let last_evicted_round = self.evicted_rounds[authority_index];
661 if end_round.saturating_sub(1) <= last_evicted_round {
662 panic!(
663 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
664 );
665 }
666
667 let block_ref_iter = block_refs
668 .range((
669 Included(BlockRef::new(
670 last_evicted_round + 1,
671 authority_index,
672 BlockDigest::MIN,
673 )),
674 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
675 ))
676 .rev();
677
678 let mut last_round = 0;
679 for block_ref in block_ref_iter {
680 if last_round == 0 {
681 last_round = block_ref.round;
682 let block_info = self
683 .recent_blocks
684 .get(block_ref)
685 .expect("Block should exist in recent blocks");
686 blocks[authority_index] = block_info.block.clone();
687 continue;
688 }
689 if block_ref.round < last_round {
690 break;
691 }
692 equivocating_blocks[authority_index].push(*block_ref);
693 }
694 }
695
696 blocks.into_iter().zip(equivocating_blocks).collect()
697 }
698
699 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
703 if slot.round == GENESIS_ROUND {
705 return true;
706 }
707
708 let eviction_round = self.evicted_rounds[slot.authority];
709 if slot.round <= eviction_round {
710 panic!(
711 "{}",
712 format!(
713 "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
714 if self.gc_enabled() { " gc " } else { " " }
715 )
716 );
717 }
718
719 let mut result = self.recent_refs_by_authority[slot.authority].range((
720 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
721 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
722 ));
723 result.next().is_some()
724 }
725
726 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
730 let mut exist = vec![false; block_refs.len()];
731 let mut missing = Vec::new();
732
733 for (index, block_ref) in block_refs.into_iter().enumerate() {
734 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
735 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
736 exist[index] = true;
737 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
738 {
739 exist[index] = false;
744 } else {
745 missing.push((index, block_ref));
746 }
747 }
748
749 if missing.is_empty() {
750 return exist;
751 }
752
753 let missing_refs = missing
754 .iter()
755 .map(|(_, block_ref)| *block_ref)
756 .collect::<Vec<_>>();
757 let store_results = self
758 .store
759 .contains_blocks(&missing_refs)
760 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
761 self.context
762 .metrics
763 .node_metrics
764 .dag_state_store_read_count
765 .with_label_values(&["contains_blocks"])
766 .inc();
767
768 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
769 exist[index] = result;
770 }
771
772 exist
773 }
774
775 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
776 let blocks = self.contains_blocks(vec![*block_ref]);
777 blocks.first().cloned().unwrap()
778 }
779
/// Returns the round currently reported by the threshold clock.
pub(crate) fn threshold_clock_round(&self) -> Round {
    self.threshold_clock.get_round()
}
783
/// Returns the quorum timestamp reported by the threshold clock.
pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
    self.threshold_clock.get_quorum_ts()
}
787
/// Returns the highest round among the blocks added to the cache so far.
pub(crate) fn highest_accepted_round(&self) -> Round {
    self.highest_accepted_round
}
791
792 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
795 if let Some(last_commit) = &self.last_commit {
796 if commit.index() <= last_commit.index() {
797 error!(
798 "New commit index {} <= last commit index {}!",
799 commit.index(),
800 last_commit.index()
801 );
802 return;
803 }
804 assert_eq!(commit.index(), last_commit.index() + 1);
805
806 if commit.timestamp_ms() < last_commit.timestamp_ms() {
807 panic!(
808 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
809 last_commit, commit
810 );
811 }
812 } else {
813 assert_eq!(commit.index(), 1);
814 }
815
816 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
817 previous_commit.round() < commit.round()
818 } else {
819 true
820 };
821
822 self.last_commit = Some(commit.clone());
823
824 if commit_round_advanced {
825 let now = std::time::Instant::now();
826 if let Some(previous_time) = self.last_commit_round_advancement_time {
827 self.context
828 .metrics
829 .node_metrics
830 .commit_round_advancement_interval
831 .observe(now.duration_since(previous_time).as_secs_f64())
832 }
833 self.last_commit_round_advancement_time = Some(now);
834 }
835
836 for block_ref in commit.blocks().iter() {
837 self.last_committed_rounds[block_ref.author] = max(
838 self.last_committed_rounds[block_ref.author],
839 block_ref.round,
840 );
841 }
842
843 for (i, round) in self.last_committed_rounds.iter().enumerate() {
844 let index = self.context.committee.to_authority_index(i).unwrap();
845 let hostname = &self.context.committee.authority(index).hostname;
846 self.context
847 .metrics
848 .node_metrics
849 .last_committed_authority_round
850 .with_label_values(&[hostname])
851 .set((*round).into());
852 }
853
854 self.pending_commit_votes.push_back(commit.reference());
855 self.commits_to_write.push(commit);
856 }
857
858 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
859 assert!(self.unscored_committed_subdags.is_empty());
861
862 assert!(self.scoring_subdag.is_empty());
866
867 let commit_info = CommitInfo {
868 committed_rounds: self.last_committed_rounds.clone(),
869 reputation_scores,
870 };
871 let last_commit = self
872 .last_commit
873 .as_ref()
874 .expect("Last commit should already be set.");
875 self.commit_info_to_write
876 .push((last_commit.reference(), commit_info));
877 }
878
879 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
880 let mut votes = Vec::new();
881 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
882 votes.push(self.pending_commit_votes.pop_front().unwrap());
883 }
884 votes
885 }
886
887 pub(crate) fn last_commit_index(&self) -> CommitIndex {
889 match &self.last_commit {
890 Some(commit) => commit.index(),
891 None => 0,
892 }
893 }
894
895 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
897 match &self.last_commit {
898 Some(commit) => commit.digest(),
899 None => CommitDigest::MIN,
900 }
901 }
902
903 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
905 match &self.last_commit {
906 Some(commit) => commit.timestamp_ms(),
907 None => 0,
908 }
909 }
910
911 pub(crate) fn last_commit_leader(&self) -> Slot {
913 match &self.last_commit {
914 Some(commit) => commit.leader().into(),
915 None => self
916 .genesis
917 .iter()
918 .next()
919 .map(|(genesis_ref, _)| *genesis_ref)
920 .expect("Genesis blocks should always be available.")
921 .into(),
922 }
923 }
924
925 pub(crate) fn last_commit_round(&self) -> Round {
928 match &self.last_commit {
929 Some(commit) => commit.leader().round,
930 None => 0,
931 }
932 }
933
/// Returns a copy of the per-authority last committed rounds.
pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
    self.last_committed_rounds.clone()
}
938
/// Returns the GC round derived from the last commit round (see `calculate_gc_round`).
pub(crate) fn gc_round(&self) -> Round {
    self.calculate_gc_round(self.last_commit_round())
}
948
949 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
950 let gc_depth = self.context.protocol_config.gc_depth();
951 if gc_depth > 0 {
952 commit_round.saturating_sub(gc_depth)
954 } else {
955 GENESIS_ROUND
958 }
959 }
960
/// Reports whether garbage collection is enabled, i.e. the protocol's GC depth is non-zero.
pub(crate) fn gc_enabled(&self) -> bool {
    self.context.protocol_config.gc_depth() > 0
}
964
/// Writes the buffered blocks, commits and commit info to the store in one batch, then
/// evicts old blocks from the caches and refreshes the cache-size metrics.
///
/// Returns early, without writing or evicting, when no blocks and no commits are buffered.
///
/// # Panics
/// Panics if the store write fails.
pub(crate) fn flush(&mut self) {
    // Timer guard held for the whole call; presumably records the elapsed time on drop.
    let _s = self
        .context
        .metrics
        .node_metrics
        .scope_processing_time
        .with_label_values(&["DagState::flush"])
        .start_timer();
    // Take ownership of the buffers, leaving them empty.
    let blocks = std::mem::take(&mut self.blocks_to_write);
    let commits = std::mem::take(&mut self.commits_to_write);
    let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);

    // NOTE(review): the commit info buffer has already been taken above, so if it were
    // non-empty while blocks and commits are both empty, it would be dropped unwritten here.
    if blocks.is_empty() && commits.is_empty() {
        return;
    }
    debug!(
        "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
        blocks.len(),
        blocks.iter().map(|b| b.reference().to_string()).join(","),
        commits.len(),
        commits.iter().map(|c| c.reference().to_string()).join(","),
        commit_info_to_write.len(),
        commit_info_to_write
            .iter()
            .map(|(commit_ref, _)| commit_ref.to_string())
            .join(","),
    );
    self.store
        .write(WriteBatch::new(blocks, commits, commit_info_to_write))
        .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
    self.context
        .metrics
        .node_metrics
        .dag_state_store_write_count
        .inc();

    // Eviction runs only after the write succeeded: for each authority, drop cached refs
    // (and their blocks) from the low end until one is above the eviction round.
    for (authority_index, _) in self.context.committee.authorities() {
        let eviction_round = self.calculate_authority_eviction_round(authority_index);
        while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
            if block_ref.round <= eviction_round {
                self.recent_blocks.remove(block_ref);
                self.recent_refs_by_authority[authority_index].pop_first();
            } else {
                break;
            }
        }
        self.evicted_rounds[authority_index] = eviction_round;
    }

    let metrics = &self.context.metrics.node_metrics;
    metrics
        .dag_state_recent_blocks
        .set(self.recent_blocks.len() as i64);
    metrics.dag_state_recent_refs.set(
        self.recent_refs_by_authority
            .iter()
            .map(BTreeSet::len)
            .sum::<usize>() as i64,
    );
}
1030
1031 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1032 self.store
1033 .read_last_commit_info()
1034 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1035 }
1036
/// Returns how many committed sub-dags are waiting to be scored.
pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
    self.unscored_committed_subdags.len() as u64
}
1041
/// Test-only: returns a copy of the committed sub-dags waiting to be scored.
#[cfg(test)]
pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
    self.unscored_committed_subdags.clone()
}
1046
/// Appends `committed_subdags` to the list of sub-dags waiting to be scored.
pub(crate) fn add_unscored_committed_subdags(
    &mut self,
    committed_subdags: Vec<CommittedSubDag>,
) {
    self.unscored_committed_subdags.extend(committed_subdags);
}
1053
/// Returns the sub-dags waiting to be scored and leaves the internal list empty.
pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
    std::mem::take(&mut self.unscored_committed_subdags)
}
1057
/// Feeds `scoring_subdags` into the scoring sub-dag accumulator.
pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
    self.scoring_subdag.add_subdags(scoring_subdags);
}
1061
/// Clears the scoring sub-dag accumulator.
pub(crate) fn clear_scoring_subdag(&mut self) {
    self.scoring_subdag.clear();
}
1065
/// Returns the scored sub-dag count reported by the scoring sub-dag accumulator.
pub(crate) fn scoring_subdags_count(&self) -> usize {
    self.scoring_subdag.scored_subdags_count()
}
1069
/// Reports whether the scoring sub-dag accumulator is empty.
pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
    self.scoring_subdag.is_empty()
}
1073
/// Computes reputation scores from the scoring sub-dag using its distributed vote scoring.
pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
    self.scoring_subdag.calculate_distributed_vote_scores()
}
1077
1078 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1079 self.scoring_subdag
1080 .commit_range
1081 .as_ref()
1082 .expect("commit range should exist for scoring subdag")
1083 .end()
1084 }
1085
1086 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1091 if self.gc_enabled() {
1092 let last_round = self.recent_refs_by_authority[authority_index]
1093 .last()
1094 .map(|block_ref| block_ref.round)
1095 .unwrap_or(GENESIS_ROUND);
1096
1097 Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1098 } else {
1099 let commit_round = self.last_committed_rounds[authority_index];
1100 Self::eviction_round(commit_round, self.cached_rounds)
1101 }
1102 }
1103
/// Eviction round without GC: `commit_round - cached_rounds`, saturating at 0.
fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
    commit_round.saturating_sub(cached_rounds)
}
1109
/// Eviction round with GC: the smaller of `gc_round` and `last_round - cached_rounds`
/// (the subtraction saturates at 0).
fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
    gc_round.min(last_round.saturating_sub(cached_rounds))
}
1117
/// Test-only: returns the blocks of the most recent round that holds a quorum, checking
/// the highest accepted round first and then the round before it. Reaching the genesis
/// round returns the genesis blocks.
///
/// # Panics
/// Panics if neither of the two rounds contains a quorum of authors.
#[cfg(test)]
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
    use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};

    let newest_round = self.highest_accepted_round;
    for round in (newest_round.saturating_sub(1)..=newest_round).rev() {
        if round == GENESIS_ROUND {
            return self.genesis_blocks();
        }

        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        let blocks = self.get_uncommitted_blocks_at_round(round);
        // Stop adding authors as soon as the aggregator reports a quorum.
        let quorum_reached = blocks
            .iter()
            .any(|block| quorum.add(block.author(), &self.context.committee));
        if quorum_reached {
            return blocks;
        }
    }

    panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}
1146
/// Test-only: returns copies of all genesis blocks, in reference order.
#[cfg(test)]
pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
    self.genesis.values().cloned().collect()
}
1151
/// Test-only: overwrites the last commit directly, without the checks and bookkeeping
/// performed by `add_commit`.
#[cfg(test)]
pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
    self.last_commit = Some(commit);
}
1156}
1157
/// A cached block together with its commit status.
struct BlockInfo {
    /// The cached block itself.
    block: VerifiedBlock,
    /// Whether the block has been flagged as committed (see `DagState::set_committed`).
    committed: bool,
}
1163
impl BlockInfo {
    /// Wraps `block` with its committed flag initially unset.
    fn new(block: VerifiedBlock) -> Self {
        Self {
            block,
            committed: false,
        }
    }
}
1172
1173#[cfg(test)]
1174mod test {
1175 use std::vec;
1176
1177 use parking_lot::RwLock;
1178 use rstest::rstest;
1179
1180 use super::*;
1181 use crate::{
1182 block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1183 storage::{WriteBatch, mem_store::MemStore},
1184 test_dag_builder::DagBuilder,
1185 test_dag_parser::parse_dag,
1186 };
1187
/// Accepts several blocks per slot and checks lookups by reference, by slot and by round.
#[tokio::test]
async fn test_get_blocks() {
    let (context, _) = Context::new_for_test(4);
    let context = Arc::new(context);
    let store = Arc::new(MemStore::new());
    let mut dag_state = DagState::new(context.clone(), store.clone());
    let own_index = AuthorityIndex::new_for_test(0);

    // Populate rounds 1..=10 for authorities 0..3 with up to 3 blocks per slot, each with a
    // distinct timestamp.
    let num_rounds: u32 = 10;
    let non_existent_round: u32 = 100;
    let num_authorities: u32 = 3;
    let num_blocks_per_slot: usize = 3;
    let mut blocks = BTreeMap::new();
    for round in 1..=num_rounds {
        for author in 0..num_authorities {
            let base_ts = round as BlockTimestampMs * 1000;
            for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(timestamp)
                        .build(),
                );
                dag_state.accept_block(block.clone());
                blocks.insert(block.reference(), block);

                // Only one block per slot for the own authority: `accept_block` asserts
                // that own slots hold no existing block.
                if AuthorityIndex::new_for_test(author) == own_index {
                    break;
                }
            }
        }
    }

    // Every accepted block can be fetched by its exact reference.
    for (r, block) in &blocks {
        assert_eq!(&dag_state.get_block(r).unwrap(), block);
    }

    // A reference with a known round and author but the minimum digest is not found.
    let last_ref = blocks.keys().last().unwrap();
    assert!(
        dag_state
            .get_block(&BlockRef::new(
                last_ref.round,
                last_ref.author,
                BlockDigest::MIN
            ))
            .is_none()
    );

    // Slot lookups return all blocks of the slot, and only those.
    for round in 1..=num_rounds {
        for author in 0..num_authorities {
            let slot = Slot::new(
                round,
                context
                    .committee
                    .to_authority_index(author as usize)
                    .unwrap(),
            );
            let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);

            if AuthorityIndex::new_for_test(author) == own_index {
                assert_eq!(blocks.len(), 1);
            } else {
                assert_eq!(blocks.len(), num_blocks_per_slot);
            }

            for b in blocks {
                assert_eq!(b.round(), round);
                assert_eq!(
                    b.author(),
                    context
                        .committee
                        .to_authority_index(author as usize)
                        .unwrap()
                );
            }
        }
    }

    // A slot in a round that was never populated is empty.
    let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
    assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());

    // Round lookups return one block for the own authority plus
    // `num_blocks_per_slot` blocks for each other authority.
    for round in 1..=num_rounds {
        let blocks = dag_state.get_uncommitted_blocks_at_round(round);
        assert_eq!(
            blocks.len(),
            (num_authorities - 1) as usize * num_blocks_per_slot + 1
        );
        for b in blocks {
            assert_eq!(b.round(), round);
        }
    }

    // A round that was never populated is empty.
    assert!(
        dag_state
            .get_uncommitted_blocks_at_round(non_existent_round)
            .is_empty()
    );
}
1297
1298 #[tokio::test]
1299 async fn test_ancestors_at_uncommitted_round() {
1300 let (context, _) = Context::new_for_test(4);
1302 let context = Arc::new(context);
1303 let store = Arc::new(MemStore::new());
1304 let mut dag_state = DagState::new(context.clone(), store.clone());
1305
1306 let round_10_refs: Vec<_> = (0..4)
1310 .map(|a| {
1311 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1312 .reference()
1313 })
1314 .collect();
1315
1316 let round_11 = vec![
1318 VerifiedBlock::new_for_test(
1320 TestBlock::new(11, 0)
1321 .set_timestamp_ms(1100)
1322 .set_ancestors(round_10_refs.clone())
1323 .build(),
1324 ),
1325 VerifiedBlock::new_for_test(
1328 TestBlock::new(11, 1)
1329 .set_timestamp_ms(1110)
1330 .set_ancestors(round_10_refs.clone())
1331 .build(),
1332 ),
1333 VerifiedBlock::new_for_test(
1335 TestBlock::new(11, 1)
1336 .set_timestamp_ms(1111)
1337 .set_ancestors(round_10_refs.clone())
1338 .build(),
1339 ),
1340 VerifiedBlock::new_for_test(
1342 TestBlock::new(11, 1)
1343 .set_timestamp_ms(1112)
1344 .set_ancestors(round_10_refs.clone())
1345 .build(),
1346 ),
1347 VerifiedBlock::new_for_test(
1349 TestBlock::new(11, 2)
1350 .set_timestamp_ms(1120)
1351 .set_ancestors(round_10_refs.clone())
1352 .build(),
1353 ),
1354 VerifiedBlock::new_for_test(
1356 TestBlock::new(11, 3)
1357 .set_timestamp_ms(1130)
1358 .set_ancestors(round_10_refs.clone())
1359 .build(),
1360 ),
1361 ];
1362
1363 let ancestors_for_round_12 = vec![
1365 round_11[0].reference(),
1366 round_11[1].reference(),
1367 round_11[5].reference(),
1368 ];
1369 let round_12 = vec![
1370 VerifiedBlock::new_for_test(
1371 TestBlock::new(12, 0)
1372 .set_timestamp_ms(1200)
1373 .set_ancestors(ancestors_for_round_12.clone())
1374 .build(),
1375 ),
1376 VerifiedBlock::new_for_test(
1377 TestBlock::new(12, 2)
1378 .set_timestamp_ms(1220)
1379 .set_ancestors(ancestors_for_round_12.clone())
1380 .build(),
1381 ),
1382 VerifiedBlock::new_for_test(
1383 TestBlock::new(12, 3)
1384 .set_timestamp_ms(1230)
1385 .set_ancestors(ancestors_for_round_12.clone())
1386 .build(),
1387 ),
1388 ];
1389
1390 let ancestors_for_round_13 = vec![
1392 round_12[0].reference(),
1393 round_12[1].reference(),
1394 round_12[2].reference(),
1395 round_11[2].reference(),
1396 ];
1397 let round_13 = vec![
1398 VerifiedBlock::new_for_test(
1399 TestBlock::new(12, 1)
1400 .set_timestamp_ms(1300)
1401 .set_ancestors(ancestors_for_round_13.clone())
1402 .build(),
1403 ),
1404 VerifiedBlock::new_for_test(
1405 TestBlock::new(12, 2)
1406 .set_timestamp_ms(1320)
1407 .set_ancestors(ancestors_for_round_13.clone())
1408 .build(),
1409 ),
1410 VerifiedBlock::new_for_test(
1411 TestBlock::new(12, 3)
1412 .set_timestamp_ms(1330)
1413 .set_ancestors(ancestors_for_round_13.clone())
1414 .build(),
1415 ),
1416 ];
1417
1418 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1420 let anchor = VerifiedBlock::new_for_test(
1421 TestBlock::new(14, 1)
1422 .set_timestamp_ms(1410)
1423 .set_ancestors(ancestors_for_round_14)
1424 .build(),
1425 );
1426
1427 for b in round_11
1429 .iter()
1430 .chain(round_12.iter())
1431 .chain(round_13.iter())
1432 .chain([anchor.clone()].iter())
1433 {
1434 dag_state.accept_block(b.clone());
1435 }
1436
1437 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1439 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1440 ancestors_refs.sort();
1441 let mut expected_refs = vec![
1442 round_11[0].reference(),
1443 round_11[1].reference(),
1444 round_11[2].reference(),
1445 round_11[5].reference(),
1446 ];
1447 expected_refs.sort(); assert_eq!(
1450 ancestors_refs, expected_refs,
1451 "Expected round 11 ancestors: {:?}. Got: {:?}",
1452 expected_refs, ancestors_refs
1453 );
1454 }
1455
1456 #[tokio::test]
1457 async fn test_contains_blocks_in_cache_or_store() {
1458 const CACHED_ROUNDS: Round = 2;
1460
1461 let (mut context, _) = Context::new_for_test(4);
1462 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1463
1464 let context = Arc::new(context);
1465 let store = Arc::new(MemStore::new());
1466 let mut dag_state = DagState::new(context.clone(), store.clone());
1467
1468 let num_rounds: u32 = 10;
1470 let num_authorities: u32 = 4;
1471 let mut blocks = Vec::new();
1472
1473 for round in 1..=num_rounds {
1474 for author in 0..num_authorities {
1475 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1476 blocks.push(block);
1477 }
1478 }
1479
1480 blocks.clone().into_iter().for_each(|block| {
1483 if block.round() <= 4 {
1484 store
1485 .write(WriteBatch::default().blocks(vec![block]))
1486 .unwrap();
1487 } else {
1488 dag_state.accept_blocks(vec![block]);
1489 }
1490 });
1491
1492 let mut block_refs = blocks
1496 .iter()
1497 .map(|block| block.reference())
1498 .collect::<Vec<_>>();
1499 let result = dag_state.contains_blocks(block_refs.clone());
1500
1501 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1503 assert_eq!(result, expected);
1504
1505 block_refs.insert(
1507 3,
1508 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1509 );
1510 let result = dag_state.contains_blocks(block_refs.clone());
1511
1512 expected.insert(3, false);
1514 assert_eq!(result, expected.clone());
1515 }
1516
1517 #[tokio::test]
1518 async fn test_contains_cached_block_at_slot() {
1519 const CACHED_ROUNDS: Round = 2;
1521
1522 let num_authorities: u32 = 4;
1523 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1524 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1525
1526 let context = Arc::new(context);
1527 let store = Arc::new(MemStore::new());
1528 let mut dag_state = DagState::new(context.clone(), store.clone());
1529
1530 let num_rounds: u32 = 10;
1532 let mut blocks = Vec::new();
1533
1534 for round in 1..=num_rounds {
1535 for author in 0..num_authorities {
1536 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1537 blocks.push(block.clone());
1538 dag_state.accept_block(block);
1539 }
1540 }
1541
1542 for (author, _) in context.committee.authorities() {
1544 assert!(
1545 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1546 "Genesis should always be found"
1547 );
1548 }
1549
1550 let mut block_refs = blocks
1554 .iter()
1555 .map(|block| block.reference())
1556 .collect::<Vec<_>>();
1557
1558 for block_ref in block_refs.clone() {
1559 let slot = block_ref.into();
1560 let found = dag_state.contains_cached_block_at_slot(slot);
1561 assert!(found, "A block should be found at slot {}", slot);
1562 }
1563
1564 block_refs.insert(
1567 3,
1568 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1569 );
1570 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1571 expected.insert(3, false);
1572
1573 for block_ref in block_refs {
1575 let slot = block_ref.into();
1576 let found = dag_state.contains_cached_block_at_slot(slot);
1577
1578 assert_eq!(expected.remove(0), found);
1579 }
1580 }
1581
1582 #[tokio::test]
1583 #[should_panic(
1584 expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
1585 )]
1586 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1587 const CACHED_ROUNDS: Round = 2;
1589
1590 let (mut context, _) = Context::new_for_test(4);
1591 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1592 context
1593 .protocol_config
1594 .set_consensus_gc_depth_for_testing(0);
1595
1596 let context = Arc::new(context);
1597 let store = Arc::new(MemStore::new());
1598 let mut dag_state = DagState::new(context.clone(), store.clone());
1599
1600 let mut blocks = Vec::new();
1602 for round in 1..=10 {
1603 let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1604 blocks.push(block.clone());
1605 dag_state.accept_block(block);
1606 }
1607
1608 dag_state.add_commit(TrustedCommit::new_for_test(
1610 1 as CommitIndex,
1611 CommitDigest::MIN,
1612 0,
1613 blocks.last().unwrap().reference(),
1614 blocks
1615 .into_iter()
1616 .map(|block| block.reference())
1617 .collect::<Vec<_>>(),
1618 ));
1619
1620 dag_state.flush();
1621
1622 let _ =
1626 dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1627 }
1628
1629 #[tokio::test]
1630 #[should_panic(
1631 expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1632 )]
1633 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1634 const GC_DEPTH: u32 = 2;
1638 const CACHED_ROUNDS: Round = 3;
1640
1641 let (mut context, _) = Context::new_for_test(4);
1642 context
1643 .protocol_config
1644 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1645 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1646
1647 let context = Arc::new(context);
1648 let store = Arc::new(MemStore::new());
1649 let mut dag_state = DagState::new(context.clone(), store.clone());
1650
1651 let mut dag_builder = DagBuilder::new(context.clone());
1654 dag_builder.layers(1..=3).build();
1655 dag_builder
1656 .layers(4..=6)
1657 .authorities(vec![AuthorityIndex::new_for_test(0)])
1658 .skip_block()
1659 .build();
1660
1661 dag_builder
1663 .all_blocks()
1664 .into_iter()
1665 .for_each(|block| dag_state.accept_block(block));
1666
1667 dag_state.add_commit(TrustedCommit::new_for_test(
1669 1 as CommitIndex,
1670 CommitDigest::MIN,
1671 0,
1672 dag_builder.leader_block(5).unwrap().reference(),
1673 vec![],
1674 ));
1675
1676 dag_state.flush();
1677
1678 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1680
1681 for authority_index in 1..=3 {
1686 for round in 4..=6 {
1687 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1688 round,
1689 AuthorityIndex::new_for_test(authority_index)
1690 )));
1691 }
1692 }
1693
1694 for round in 1..=3 {
1695 assert!(
1696 dag_state.contains_cached_block_at_slot(Slot::new(
1697 round,
1698 AuthorityIndex::new_for_test(0)
1699 ))
1700 );
1701 }
1702
1703 let _ =
1706 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1707 }
1708
1709 #[tokio::test]
1710 async fn test_get_blocks_in_cache_or_store() {
1711 let (context, _) = Context::new_for_test(4);
1712 let context = Arc::new(context);
1713 let store = Arc::new(MemStore::new());
1714 let mut dag_state = DagState::new(context.clone(), store.clone());
1715
1716 let num_rounds: u32 = 10;
1718 let num_authorities: u32 = 4;
1719 let mut blocks = Vec::new();
1720
1721 for round in 1..=num_rounds {
1722 for author in 0..num_authorities {
1723 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1724 blocks.push(block);
1725 }
1726 }
1727
1728 blocks.clone().into_iter().for_each(|block| {
1731 if block.round() <= 4 {
1732 store
1733 .write(WriteBatch::default().blocks(vec![block]))
1734 .unwrap();
1735 } else {
1736 dag_state.accept_blocks(vec![block]);
1737 }
1738 });
1739
1740 let mut block_refs = blocks
1744 .iter()
1745 .map(|block| block.reference())
1746 .collect::<Vec<_>>();
1747 let result = dag_state.get_blocks(&block_refs);
1748
1749 let mut expected = blocks
1750 .into_iter()
1751 .map(Some)
1752 .collect::<Vec<Option<VerifiedBlock>>>();
1753
1754 assert_eq!(result, expected.clone());
1756
1757 block_refs.insert(
1759 3,
1760 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1761 );
1762 let result = dag_state.get_blocks(&block_refs);
1763
1764 expected.insert(3, None);
1766 assert_eq!(result, expected);
1767 }
1768
1769 #[rstest]
1771 #[tokio::test]
1772 async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1773 telemetry_subscribers::init_for_testing();
1774 let num_authorities: u32 = 4;
1775 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1776 context
1777 .protocol_config
1778 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1779
1780 if gc_depth > 0 {
1781 context
1782 .protocol_config
1783 .set_consensus_gc_depth_for_testing(gc_depth);
1784 }
1785
1786 let context = Arc::new(context);
1787 let store = Arc::new(MemStore::new());
1788 let mut dag_state = DagState::new(context.clone(), store.clone());
1789
1790 let num_rounds: u32 = 10;
1792 let mut dag_builder = DagBuilder::new(context.clone());
1793 dag_builder.layers(1..=num_rounds).build();
1794 let mut commits = vec![];
1795
1796 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1797 commits.push(commit);
1798 }
1799
1800 let temp_commits = commits.split_off(5);
1802 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1803 for commit in commits.clone() {
1804 dag_state.add_commit(commit);
1805 }
1806
1807 dag_state.flush();
1809
1810 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1812 for commit in temp_commits.clone() {
1813 dag_state.add_commit(commit);
1814 }
1815
1816 let all_blocks = dag_builder.blocks(6..=num_rounds);
1818 let block_refs = all_blocks
1819 .iter()
1820 .map(|block| block.reference())
1821 .collect::<Vec<_>>();
1822
1823 let result = dag_state
1824 .get_blocks(&block_refs)
1825 .into_iter()
1826 .map(|b| b.unwrap())
1827 .collect::<Vec<_>>();
1828 assert_eq!(result, all_blocks);
1829
1830 assert_eq!(dag_state.last_commit_index(), 10);
1832 assert_eq!(
1833 dag_state.last_committed_rounds(),
1834 dag_builder.last_committed_rounds.clone()
1835 );
1836
1837 drop(dag_state);
1839
1840 let dag_state = DagState::new(context.clone(), store.clone());
1842
1843 let blocks = dag_builder.blocks(1..=5);
1845 let block_refs = blocks
1846 .iter()
1847 .map(|block| block.reference())
1848 .collect::<Vec<_>>();
1849 let result = dag_state
1850 .get_blocks(&block_refs)
1851 .into_iter()
1852 .map(|b| b.unwrap())
1853 .collect::<Vec<_>>();
1854 assert_eq!(result, blocks);
1855
1856 let missing_blocks = dag_builder.blocks(6..=num_rounds);
1858 let block_refs = missing_blocks
1859 .iter()
1860 .map(|block| block.reference())
1861 .collect::<Vec<_>>();
1862 let retrieved_blocks = dag_state
1863 .get_blocks(&block_refs)
1864 .into_iter()
1865 .flatten()
1866 .collect::<Vec<_>>();
1867 assert!(retrieved_blocks.is_empty());
1868
1869 assert_eq!(dag_state.last_commit_index(), 5);
1871
1872 let expected_last_committed_rounds = vec![4, 5, 4, 4];
1874 assert_eq!(
1875 dag_state.last_committed_rounds(),
1876 expected_last_committed_rounds
1877 );
1878
1879 assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1882 }
1883
1884 #[tokio::test]
1885 async fn test_flush_and_recovery() {
1886 telemetry_subscribers::init_for_testing();
1887 let num_authorities: u32 = 4;
1888 let (context, _) = Context::new_for_test(num_authorities as usize);
1889 let context = Arc::new(context);
1890 let store = Arc::new(MemStore::new());
1891 let mut dag_state = DagState::new(context.clone(), store.clone());
1892
1893 let num_rounds: u32 = 10;
1895 let mut dag_builder = DagBuilder::new(context.clone());
1896 dag_builder.layers(1..=num_rounds).build();
1897 let mut commits = vec![];
1898 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1899 commits.push(commit);
1900 }
1901
1902 let temp_commits = commits.split_off(5);
1904 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1905 for commit in commits.clone() {
1906 dag_state.add_commit(commit);
1907 }
1908
1909 dag_state.flush();
1911
1912 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1914 for commit in temp_commits.clone() {
1915 dag_state.add_commit(commit);
1916 }
1917
1918 let all_blocks = dag_builder.blocks(6..=num_rounds);
1920 let block_refs = all_blocks
1921 .iter()
1922 .map(|block| block.reference())
1923 .collect::<Vec<_>>();
1924 let result = dag_state
1925 .get_blocks(&block_refs)
1926 .into_iter()
1927 .map(|b| b.unwrap())
1928 .collect::<Vec<_>>();
1929 assert_eq!(result, all_blocks);
1930
1931 assert_eq!(dag_state.last_commit_index(), 10);
1933 assert_eq!(
1934 dag_state.last_committed_rounds(),
1935 dag_builder.last_committed_rounds.clone()
1936 );
1937
1938 drop(dag_state);
1940
1941 let dag_state = DagState::new(context.clone(), store.clone());
1943
1944 let blocks = dag_builder.blocks(1..=5);
1946 let block_refs = blocks
1947 .iter()
1948 .map(|block| block.reference())
1949 .collect::<Vec<_>>();
1950 let result = dag_state
1951 .get_blocks(&block_refs)
1952 .into_iter()
1953 .map(|b| b.unwrap())
1954 .collect::<Vec<_>>();
1955 assert_eq!(result, blocks);
1956
1957 let missing_blocks = dag_builder.blocks(6..=num_rounds);
1959 let block_refs = missing_blocks
1960 .iter()
1961 .map(|block| block.reference())
1962 .collect::<Vec<_>>();
1963 let retrieved_blocks = dag_state
1964 .get_blocks(&block_refs)
1965 .into_iter()
1966 .flatten()
1967 .collect::<Vec<_>>();
1968 assert!(retrieved_blocks.is_empty());
1969
1970 assert_eq!(dag_state.last_commit_index(), 5);
1972
1973 let expected_last_committed_rounds = vec![4, 5, 4, 4];
1975 assert_eq!(
1976 dag_state.last_committed_rounds(),
1977 expected_last_committed_rounds
1978 );
1979 assert_eq!(dag_state.scoring_subdags_count(), 5);
1982 }
1983
1984 #[tokio::test]
1985 async fn test_flush_and_recovery_gc_enabled() {
1986 telemetry_subscribers::init_for_testing();
1987
1988 const GC_DEPTH: u32 = 3;
1989 const CACHED_ROUNDS: u32 = 4;
1990
1991 let num_authorities: u32 = 4;
1992 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1993 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1994 context
1995 .protocol_config
1996 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1997 context
1998 .protocol_config
1999 .set_consensus_linearize_subdag_v2_for_testing(true);
2000
2001 let context = Arc::new(context);
2002
2003 let store = Arc::new(MemStore::new());
2004 let mut dag_state = DagState::new(context.clone(), store.clone());
2005
2006 let num_rounds: u32 = 10;
2007 let mut dag_builder = DagBuilder::new(context.clone());
2008 dag_builder.layers(1..=5).build();
2009 dag_builder
2010 .layers(6..=8)
2011 .authorities(vec![AuthorityIndex::new_for_test(0)])
2012 .skip_block()
2013 .build();
2014 dag_builder.layers(9..=num_rounds).build();
2015
2016 let mut commits = dag_builder
2017 .get_sub_dag_and_commits(1..=num_rounds)
2018 .into_iter()
2019 .map(|(_subdag, commit)| commit)
2020 .collect::<Vec<_>>();
2021
2022 let temp_commits = commits.split_off(7);
2026 dag_state.accept_blocks(dag_builder.blocks(1..=8));
2027 for commit in commits.clone() {
2028 dag_state.add_commit(commit);
2029 }
2030
2031 let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2034 for commit in commits.iter() {
2035 all_committed_blocks.extend(commit.blocks());
2036 }
2037 dag_state.flush();
2039
2040 dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2042 for commit in temp_commits.clone() {
2043 dag_state.add_commit(commit);
2044 }
2045
2046 let all_blocks = dag_builder.blocks(1..=num_rounds);
2048 let block_refs = all_blocks
2049 .iter()
2050 .map(|block| block.reference())
2051 .collect::<Vec<_>>();
2052 let result = dag_state
2053 .get_blocks(&block_refs)
2054 .into_iter()
2055 .map(|b| b.unwrap())
2056 .collect::<Vec<_>>();
2057 assert_eq!(result, all_blocks);
2058
2059 assert_eq!(dag_state.last_commit_index(), 9);
2061 assert_eq!(
2062 dag_state.last_committed_rounds(),
2063 dag_builder.last_committed_rounds.clone()
2064 );
2065
2066 drop(dag_state);
2068
2069 let dag_state = DagState::new(context.clone(), store.clone());
2071
2072 let blocks = dag_builder.blocks(1..=5);
2074 let block_refs = blocks
2075 .iter()
2076 .map(|block| block.reference())
2077 .collect::<Vec<_>>();
2078 let result = dag_state
2079 .get_blocks(&block_refs)
2080 .into_iter()
2081 .map(|b| b.unwrap())
2082 .collect::<Vec<_>>();
2083 assert_eq!(result, blocks);
2084
2085 let missing_blocks = dag_builder.blocks(9..=num_rounds);
2087 let block_refs = missing_blocks
2088 .iter()
2089 .map(|block| block.reference())
2090 .collect::<Vec<_>>();
2091 let retrieved_blocks = dag_state
2092 .get_blocks(&block_refs)
2093 .into_iter()
2094 .flatten()
2095 .collect::<Vec<_>>();
2096 assert!(retrieved_blocks.is_empty());
2097
2098 assert_eq!(dag_state.last_commit_index(), 7);
2100
2101 let expected_last_committed_rounds = vec![5, 6, 6, 7];
2103 assert_eq!(
2104 dag_state.last_committed_rounds(),
2105 expected_last_committed_rounds
2106 );
2107 assert_eq!(dag_state.scoring_subdags_count(), 7);
2110 for (authority_index, _) in context.committee.authorities() {
2112 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2113
2114 if authority_index == AuthorityIndex::new_for_test(0) {
2119 assert_eq!(blocks.len(), 4);
2120 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2121 assert!(
2122 blocks
2123 .into_iter()
2124 .all(|block| block.round() >= 2 && block.round() <= 5)
2125 );
2126 } else {
2127 assert_eq!(blocks.len(), 4);
2128 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2129 assert!(
2130 blocks
2131 .into_iter()
2132 .all(|block| block.round() >= 5 && block.round() <= 8)
2133 );
2134 }
2135 }
2136 let gc_round = dag_state.gc_round();
2139 assert_eq!(gc_round, 4);
2140 dag_state
2141 .recent_blocks
2142 .iter()
2143 .for_each(|(block_ref, block_info)| {
2144 if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2145 assert!(
2146 block_info.committed,
2147 "Block {:?} should be committed",
2148 block_ref
2149 );
2150 };
2151 });
2152 }
2153
2154 #[tokio::test]
2155 async fn test_block_info_as_committed() {
2156 let num_authorities: u32 = 4;
2157 let (context, _) = Context::new_for_test(num_authorities as usize);
2158 let context = Arc::new(context);
2159
2160 let store = Arc::new(MemStore::new());
2161 let mut dag_state = DagState::new(context.clone(), store.clone());
2162
2163 let block = VerifiedBlock::new_for_test(
2165 TestBlock::new(1, 0)
2166 .set_timestamp_ms(1000)
2167 .set_ancestors(vec![])
2168 .build(),
2169 );
2170
2171 dag_state.accept_block(block.clone());
2172
2173 assert!(!dag_state.is_committed(&block.reference()));
2175
2176 assert!(
2178 dag_state.set_committed(&block.reference()),
2179 "Block should be successfully set as committed for first time"
2180 );
2181
2182 assert!(dag_state.is_committed(&block.reference()));
2184
2185 assert!(
2187 !dag_state.set_committed(&block.reference()),
2188 "Block should not be successfully set as committed"
2189 );
2190 }
2191
2192 #[tokio::test]
2193 async fn test_get_cached_blocks() {
2194 let (mut context, _) = Context::new_for_test(4);
2195 context.parameters.dag_state_cached_rounds = 5;
2196
2197 let context = Arc::new(context);
2198 let store = Arc::new(MemStore::new());
2199 let mut dag_state = DagState::new(context.clone(), store.clone());
2200
2201 let mut all_blocks = Vec::new();
2206 for author in 1..=3 {
2207 for round in 10..(10 + author) {
2208 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2209 all_blocks.push(block.clone());
2210 dag_state.accept_block(block);
2211 }
2212 }
2213
2214 let cached_blocks =
2215 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2216 assert!(cached_blocks.is_empty());
2217
2218 let cached_blocks =
2219 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2220 assert_eq!(cached_blocks.len(), 1);
2221 assert_eq!(cached_blocks[0].round(), 10);
2222
2223 let cached_blocks =
2224 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2225 assert_eq!(cached_blocks.len(), 2);
2226 assert_eq!(cached_blocks[0].round(), 10);
2227 assert_eq!(cached_blocks[1].round(), 11);
2228
2229 let cached_blocks =
2230 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2231 assert_eq!(cached_blocks.len(), 1);
2232 assert_eq!(cached_blocks[0].round(), 11);
2233
2234 let cached_blocks =
2235 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2236 assert_eq!(cached_blocks.len(), 3);
2237 assert_eq!(cached_blocks[0].round(), 10);
2238 assert_eq!(cached_blocks[1].round(), 11);
2239 assert_eq!(cached_blocks[2].round(), 12);
2240
2241 let cached_blocks =
2242 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2243 assert_eq!(cached_blocks.len(), 1);
2244 assert_eq!(cached_blocks[0].round(), 12);
2245 }
2246
2247 #[rstest]
2248 #[tokio::test]
2249 async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2250 const CACHED_ROUNDS: Round = 2;
2252 let (mut context, _) = Context::new_for_test(4);
2253 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2254
2255 if gc_depth > 0 {
2256 context
2257 .protocol_config
2258 .set_consensus_gc_depth_for_testing(gc_depth);
2259 }
2260
2261 let context = Arc::new(context);
2262 let store = Arc::new(MemStore::new());
2263 let mut dag_state = DagState::new(context.clone(), store.clone());
2264
2265 let dag_str = "DAG {
2270 Round 0 : { 4 },
2271 Round 1 : {
2272 B -> [*],
2273 C -> [*],
2274 D -> [*],
2275 },
2276 Round 2 : {
2277 C -> [*],
2278 D -> [*],
2279 },
2280 Round 3 : {
2281 D -> [*],
2282 },
2283 }";
2284
2285 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2286
2287 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2289
2290 for block in dag_builder
2292 .all_blocks()
2293 .into_iter()
2294 .chain(std::iter::once(block))
2295 {
2296 dag_state.accept_block(block);
2297 }
2298
2299 dag_state.add_commit(TrustedCommit::new_for_test(
2300 1 as CommitIndex,
2301 CommitDigest::MIN,
2302 context.clock.timestamp_utc_ms(),
2303 dag_builder.leader_block(3).unwrap().reference(),
2304 vec![],
2305 ));
2306
2307 let end_round = 4;
2309 let expected_rounds = vec![0, 1, 2, 3];
2310 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2311 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2313 assert_eq!(
2314 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2315 expected_rounds
2316 );
2317 assert_eq!(
2318 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2319 expected_excluded_and_equivocating_blocks
2320 );
2321
2322 for (i, expected_round) in expected_rounds.iter().enumerate() {
2324 let round = dag_state
2325 .get_last_cached_block_in_range(
2326 context.committee.to_authority_index(i).unwrap(),
2327 0,
2328 end_round,
2329 )
2330 .map(|b| b.round())
2331 .unwrap_or_default();
2332 assert_eq!(round, *expected_round, "Authority {i}");
2333 }
2334
2335 let start_round = 2;
2337 let expected_rounds = [0, 0, 2, 3];
2338
2339 for (i, expected_round) in expected_rounds.iter().enumerate() {
2341 let round = dag_state
2342 .get_last_cached_block_in_range(
2343 context.committee.to_authority_index(i).unwrap(),
2344 start_round,
2345 end_round,
2346 )
2347 .map(|b| b.round())
2348 .unwrap_or_default();
2349 assert_eq!(round, *expected_round, "Authority {i}");
2350 }
2351
2352 dag_state.flush();
2361
2362 let end_round = 3;
2364 let expected_rounds = vec![0, 1, 2, 2];
2365
2366 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2368 assert_eq!(
2369 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2370 expected_rounds
2371 );
2372
2373 for (i, expected_round) in expected_rounds.iter().enumerate() {
2375 let round = dag_state
2376 .get_last_cached_block_in_range(
2377 context.committee.to_authority_index(i).unwrap(),
2378 0,
2379 end_round,
2380 )
2381 .map(|b| b.round())
2382 .unwrap_or_default();
2383 assert_eq!(round, *expected_round, "Authority {i}");
2384 }
2385 }
2386
2387 #[tokio::test]
2388 #[should_panic(
2389 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2390 )]
2391 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2392 const CACHED_ROUNDS: Round = 1;
2394 let (mut context, _) = Context::new_for_test(4);
2395 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2396 context
2397 .protocol_config
2398 .set_consensus_gc_depth_for_testing(0);
2399
2400 let context = Arc::new(context);
2401 let store = Arc::new(MemStore::new());
2402 let mut dag_state = DagState::new(context.clone(), store.clone());
2403
2404 let mut all_blocks = Vec::new();
2409 for author in 1..=3 {
2410 for round in 1..=author {
2411 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2412 all_blocks.push(block.clone());
2413 dag_state.accept_block(block);
2414 }
2415 }
2416
2417 dag_state.add_commit(TrustedCommit::new_for_test(
2418 1 as CommitIndex,
2419 CommitDigest::MIN,
2420 0,
2421 all_blocks.last().unwrap().reference(),
2422 all_blocks
2423 .into_iter()
2424 .map(|block| block.reference())
2425 .collect::<Vec<_>>(),
2426 ));
2427
2428 dag_state.flush();
2431
2432 let end_round = 2;
2435 dag_state.get_last_cached_block_per_authority(end_round);
2436 }
2437
2438 #[tokio::test]
2439 #[should_panic(
2440 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2441 )]
2442 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
2443 const CACHED_ROUNDS: Round = 1;
2445 const GC_DEPTH: u32 = 1;
2446 let (mut context, _) = Context::new_for_test(4);
2447 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2448 context
2449 .protocol_config
2450 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2451
2452 let context = Arc::new(context);
2453 let store = Arc::new(MemStore::new());
2454 let mut dag_state = DagState::new(context.clone(), store.clone());
2455
2456 let mut dag_builder = DagBuilder::new(context.clone());
2461 dag_builder
2462 .layers(1..=1)
2463 .authorities(vec![AuthorityIndex::new_for_test(0)])
2464 .skip_block()
2465 .build();
2466 dag_builder
2467 .layers(2..=2)
2468 .authorities(vec![
2469 AuthorityIndex::new_for_test(0),
2470 AuthorityIndex::new_for_test(1),
2471 ])
2472 .skip_block()
2473 .build();
2474 dag_builder
2475 .layers(3..=3)
2476 .authorities(vec![
2477 AuthorityIndex::new_for_test(0),
2478 AuthorityIndex::new_for_test(1),
2479 AuthorityIndex::new_for_test(2),
2480 ])
2481 .skip_block()
2482 .build();
2483
2484 for block in dag_builder.all_blocks() {
2486 dag_state.accept_block(block);
2487 }
2488
2489 dag_state.add_commit(TrustedCommit::new_for_test(
2490 1 as CommitIndex,
2491 CommitDigest::MIN,
2492 0,
2493 dag_builder.leader_block(3).unwrap().reference(),
2494 vec![],
2495 ));
2496
2497 dag_state.flush();
2499
2500 dag_state.get_last_cached_block_per_authority(2);
2503 }
2504
2505 #[tokio::test]
2506 async fn test_last_quorum() {
2507 let (context, _) = Context::new_for_test(4);
2509 let context = Arc::new(context);
2510 let store = Arc::new(MemStore::new());
2511 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2512
2513 {
2515 let genesis = genesis_blocks(context.clone());
2516
2517 assert_eq!(dag_state.read().last_quorum(), genesis);
2518 }
2519
2520 {
2523 let mut dag_builder = DagBuilder::new(context.clone());
2524 dag_builder
2525 .layers(1..=4)
2526 .build()
2527 .persist_layers(dag_state.clone());
2528 let round_4_blocks: Vec<_> = dag_builder
2529 .blocks(4..=4)
2530 .into_iter()
2531 .map(|block| block.reference())
2532 .collect();
2533
2534 let last_quorum = dag_state.read().last_quorum();
2535
2536 assert_eq!(
2537 last_quorum
2538 .into_iter()
2539 .map(|block| block.reference())
2540 .collect::<Vec<_>>(),
2541 round_4_blocks
2542 );
2543 }
2544
2545 {
2548 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2549 dag_state.write().accept_block(block);
2550
2551 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2552
2553 let last_quorum = dag_state.read().last_quorum();
2554
2555 assert_eq!(last_quorum, round_4_blocks);
2556 }
2557 }
2558
2559 #[tokio::test]
2560 async fn test_last_block_for_authority() {
2561 let (context, _) = Context::new_for_test(4);
2563 let context = Arc::new(context);
2564 let store = Arc::new(MemStore::new());
2565 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2566
2567 {
2569 let genesis = genesis_blocks(context.clone());
2570 let my_genesis = genesis
2571 .into_iter()
2572 .find(|block| block.author() == context.own_index)
2573 .unwrap();
2574
2575 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2576 }
2577
2578 {
2581 let mut dag_builder = DagBuilder::new(context.clone());
2583 dag_builder
2584 .layers(1..=4)
2585 .build()
2586 .persist_layers(dag_state.clone());
2587
2588 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2590 dag_state.write().accept_block(block);
2591
2592 let block = dag_state
2593 .read()
2594 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2595 assert_eq!(block.round(), 5);
2596
2597 for (authority_index, _) in context.committee.authorities() {
2598 let block = dag_state
2599 .read()
2600 .get_last_block_for_authority(authority_index);
2601
2602 if authority_index.value() == 0 {
2603 assert_eq!(block.round(), 5);
2604 } else {
2605 assert_eq!(block.round(), 4);
2606 }
2607 }
2608 }
2609 }
2610}