1use std::{
6 cmp::max,
7 collections::{BTreeMap, BTreeSet, VecDeque},
8 ops::Bound::{Excluded, Included, Unbounded},
9 panic,
10 sync::Arc,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info};
18
19use crate::{
20 CommittedSubDag,
21 block::{
22 BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23 VerifiedBlock, genesis_blocks,
24 },
25 commit::{
26 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28 },
29 context::Context,
30 leader_scoring::{ReputationScores, ScoringSubdag},
31 storage::{Store, WriteBatch},
32 threshold_clock::ThresholdClock,
33};
34
/// In-memory view of the DAG: cached recent blocks, the latest commit state,
/// and the buffers of blocks/commits waiting to be written to the `Store`.
pub(crate) struct DagState {
    /// Shared node context; the committee, parameters, protocol config, clock
    /// and metrics are all reached through it.
    context: Arc<Context>,

    /// Genesis blocks keyed by their reference. Round-0 lookups are served
    /// from here rather than from `recent_blocks`.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    /// Cached blocks keyed by reference, each paired with its committed flag.
    /// Entries at or below an authority's eviction round are dropped in `flush`.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    /// Per-authority ordered set of the references held in `recent_blocks`.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    /// Receives every block reference that enters the cache; source of the
    /// threshold clock round and quorum timestamp exposed by this type.
    threshold_clock: ThresholdClock,

    /// Per-authority eviction round last applied to the cache (set during
    /// recovery in `new` and after every eviction pass in `flush`).
    evicted_rounds: Vec<Round>,

    /// Highest round among all blocks added to the cache so far.
    highest_accepted_round: Round,

    /// Most recent commit, recovered from storage or added via `add_commit`.
    last_commit: Option<TrustedCommit>,

    /// Instant at which a commit last had a higher round than its predecessor;
    /// used to observe the commit round advancement interval metric.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    /// Per-authority highest round among committed block references.
    last_committed_rounds: Vec<Round>,

    /// Accumulates committed subdags for distributed vote scoring.
    scoring_subdag: ScoringSubdag,
    /// Committed subdags that have not been scored yet.
    unscored_committed_subdags: Vec<CommittedSubDag>,

    /// Commit references queued by `add_commit` and handed out, oldest first,
    /// by `take_commit_votes`.
    pending_commit_votes: VecDeque<CommitVote>,

    /// Accepted blocks not yet persisted; drained by `flush`.
    blocks_to_write: Vec<VerifiedBlock>,
    /// Added commits not yet persisted; drained by `flush`.
    commits_to_write: Vec<TrustedCommit>,

    /// Commit info entries not yet persisted; drained by `flush`.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    /// Persistent storage backing the cache.
    store: Arc<dyn Store>,

    /// Value of `parameters.dag_state_cached_rounds`; how many rounds are kept
    /// in the cache when computing eviction rounds.
    cached_rounds: Round,
}
114
115impl DagState {
/// Builds a `DagState` and recovers its cached state from `store`.
///
/// Recovery happens in this order:
/// 1. The last commit and the last stored `CommitInfo` are read.
/// 2. Commits after the stored `CommitInfo` (or from the first commit when
///    none is stored) are replayed to rebuild `last_committed_rounds` and
///    the list of unscored committed subdags.
/// 3. For every authority, blocks above its eviction round are loaded into
///    the cache.
/// 4. When GC is enabled, commits are walked backwards from the last one to
///    restore the committed flag of cached blocks.
///
/// # Panics
/// Panics when a storage read fails, or when a block referenced by a
/// recovered commit cannot be flagged as committed exactly once.
pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
    let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
    let num_authorities = context.committee.size();

    let genesis = genesis_blocks(context.clone())
        .into_iter()
        .map(|block| (block.reference(), block))
        .collect();

    let threshold_clock = ThresholdClock::new(1, context.clone());

    let last_commit = store
        .read_last_commit()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));

    let commit_info = store
        .read_last_commit_info()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
    // Start from the committed rounds captured in the stored CommitInfo and
    // replay only the commits that follow it; without a stored CommitInfo,
    // replay everything from the first commit.
    let (mut last_committed_rounds, commit_recovery_start_index) =
        if let Some((commit_ref, commit_info)) = commit_info {
            tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
            (commit_info.committed_rounds, commit_ref.index + 1)
        } else {
            tracing::info!("Found no stored CommitInfo to recover from");
            (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
        };

    let mut unscored_committed_subdags = Vec::new();
    let mut scoring_subdag = ScoringSubdag::new(context.clone());

    if let Some(last_commit) = last_commit.as_ref() {
        store
            .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
            .iter()
            .for_each(|commit| {
                // Bring the per-authority committed rounds up to date.
                for block_ref in commit.blocks() {
                    last_committed_rounds[block_ref.author] =
                        max(last_committed_rounds[block_ref.author], block_ref.round);
                }

                // The subdag is loaded with an empty third argument (`vec![]`).
                let committed_subdag =
                    load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
                unscored_committed_subdags.push(committed_subdag);
            });
    }

    tracing::info!(
        "DagState was initialized with the following state: \
        {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
        unscored_committed_subdags.len()
    );

    // With the distributed vote scoring strategy the recovered subdags go
    // straight into the scoring subdag, leaving the unscored list empty.
    if context
        .protocol_config
        .consensus_distributed_vote_scoring_strategy()
    {
        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
    }

    let mut state = Self {
        context,
        genesis,
        recent_blocks: BTreeMap::new(),
        recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
        threshold_clock,
        highest_accepted_round: 0,
        last_commit: last_commit.clone(),
        last_commit_round_advancement_time: None,
        last_committed_rounds: last_committed_rounds.clone(),
        pending_commit_votes: VecDeque::new(),
        blocks_to_write: vec![],
        commits_to_write: vec![],
        commit_info_to_write: vec![],
        scoring_subdag,
        unscored_committed_subdags,
        store: store.clone(),
        cached_rounds,
        evicted_rounds: vec![0; num_authorities],
    };

    // Reload the cached blocks of every authority.
    for (i, round) in last_committed_rounds.into_iter().enumerate() {
        let authority_index = state.context.committee.to_authority_index(i).unwrap();
        let (blocks, eviction_round) = if state.gc_enabled() {
            // With GC the eviction round is derived from the authority's
            // latest stored block and the current gc round.
            let last_block = state
                .store
                .scan_last_blocks_by_author(authority_index, 1, None)
                .expect("Database error");
            let last_block_round = last_block
                .last()
                .map(|b| b.round())
                .unwrap_or(GENESIS_ROUND);

            let eviction_round = Self::gc_eviction_round(
                last_block_round,
                state.gc_round(),
                state.cached_rounds,
            );
            let blocks = state
                .store
                .scan_blocks_by_author(authority_index, eviction_round + 1)
                .expect("Database error");
            (blocks, eviction_round)
        } else {
            // Without GC the eviction round is derived from the authority's
            // last committed round.
            let eviction_round = Self::eviction_round(round, cached_rounds);
            let blocks = state
                .store
                .scan_blocks_by_author(authority_index, eviction_round + 1)
                .expect("Database error");
            (blocks, eviction_round)
        };

        state.evicted_rounds[authority_index] = eviction_round;

        // Recovered blocks only update the in-memory metadata; they are not
        // queued in `blocks_to_write`.
        for block in &blocks {
            state.update_block_metadata(block);
        }

        info!(
            "Recovered blocks {}: {:?}",
            authority_index,
            blocks
                .iter()
                .map(|b| b.reference())
                .collect::<Vec<BlockRef>>()
        );
    }

    // Restore the committed flag of cached blocks by walking the commits
    // backwards, one index at a time, starting at the last commit.
    if state.gc_enabled() {
        if let Some(last_commit) = last_commit {
            let mut index = last_commit.index();
            let gc_round = state.gc_round();
            info!(
                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
                index, gc_round
            );

            loop {
                let commits = store
                    .scan_commits((index..=index).into())
                    .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
                let Some(commit) = commits.first() else {
                    info!(
                        "Recovering finished up to index {index}, no more commits to recover"
                    );
                    break;
                };

                // Stop once the commit leader is at or below the gc round.
                if gc_round > 0 && commit.leader().round <= gc_round {
                    info!(
                        "Recovering finished, reached commit leader round {} <= gc_round {}",
                        commit.leader().round,
                        gc_round
                    );
                    break;
                }

                // Only blocks above the gc round are flagged; `set_committed`
                // panics for a reference that is not in the cache.
                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
                    debug!(
                        "Setting block {:?} as committed based on commit {:?}",
                        block_ref,
                        commit.index()
                    );
                    assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
                });

                // Stop after processing down to index 1 (index 0 is never scanned).
                index = index.saturating_sub(1);
                if index == 0 {
                    break;
                }
            }
        }
    }

    state
}
300
301 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
303 assert_ne!(
304 block.round(),
305 0,
306 "Genesis block should not be accepted into DAG."
307 );
308
309 let block_ref = block.reference();
310 if self.contains_block(&block_ref) {
311 return;
312 }
313
314 let now = self.context.clock.timestamp_utc_ms();
315 if block.timestamp_ms() > now {
316 panic!(
317 "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
318 block,
319 block.timestamp_ms(),
320 now,
321 );
322 }
323
324 if block_ref.author == self.context.own_index {
327 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
328 assert!(
329 existing_blocks.is_empty(),
330 "Block Rejected! Attempted to add block {block:#?} to own slot where \
331 block(s) {existing_blocks:#?} already exists."
332 );
333 }
334 self.update_block_metadata(&block);
335 self.blocks_to_write.push(block);
336 let source = if self.context.own_index == block_ref.author {
337 "own"
338 } else {
339 "others"
340 };
341 self.context
342 .metrics
343 .node_metrics
344 .accepted_blocks
345 .with_label_values(&[source])
346 .inc();
347 }
348
349 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
351 let block_ref = block.reference();
352 self.recent_blocks
353 .insert(block_ref, BlockInfo::new(block.clone()));
354 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
355 self.threshold_clock.add_block(block_ref);
356 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
357 self.context
358 .metrics
359 .node_metrics
360 .highest_accepted_round
361 .set(self.highest_accepted_round as i64);
362
363 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
364 .last()
365 .map(|block_ref| block_ref.round)
366 .expect("There should be by now at least one block ref");
367 let hostname = &self.context.committee.authority(block_ref.author).hostname;
368 self.context
369 .metrics
370 .node_metrics
371 .highest_accepted_authority_round
372 .with_label_values(&[hostname])
373 .set(highest_accepted_round_for_author as i64);
374 }
375
376 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
378 debug!(
379 "Accepting blocks: {}",
380 blocks.iter().map(|b| b.reference().to_string()).join(",")
381 );
382 for block in blocks {
383 self.accept_block(block);
384 }
385 }
386
387 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
390 self.get_blocks(&[*reference])
391 .pop()
392 .expect("Exactly one element should be returned")
393 }
394
395 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
399 let mut blocks = vec![None; block_refs.len()];
400 let mut missing = Vec::new();
401
402 for (index, block_ref) in block_refs.iter().enumerate() {
403 if block_ref.round == GENESIS_ROUND {
404 if let Some(block) = self.genesis.get(block_ref) {
406 blocks[index] = Some(block.clone());
407 }
408 continue;
409 }
410 if let Some(block_info) = self.recent_blocks.get(block_ref) {
411 blocks[index] = Some(block_info.block.clone());
412 continue;
413 }
414 missing.push((index, block_ref));
415 }
416
417 if missing.is_empty() {
418 return blocks;
419 }
420
421 let missing_refs = missing
422 .iter()
423 .map(|(_, block_ref)| **block_ref)
424 .collect::<Vec<_>>();
425 let store_results = self
426 .store
427 .read_blocks(&missing_refs)
428 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
429 self.context
430 .metrics
431 .node_metrics
432 .dag_state_store_read_count
433 .with_label_values(&["get_blocks"])
434 .inc();
435
436 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
437 blocks[index] = result;
438 }
439
440 blocks
441 }
442
443 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
447 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
448 if !block_info.committed {
449 block_info.committed = true;
450 return true;
451 }
452 false
453 } else {
454 panic!("Block {block_ref:?} not found in cache to set as committed.");
455 }
456 }
457
458 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
459 self.recent_blocks
460 .get(block_ref)
461 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
462 .committed
463 }
464
465 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
469 let mut blocks = vec![];
474 for (_block_ref, block_info) in self.recent_blocks.range((
475 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
476 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
477 )) {
478 blocks.push(block_info.block.clone())
479 }
480 blocks
481 }
482
483 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
487 if round <= self.last_commit_round() {
488 panic!("Round {round} have committed blocks!");
489 }
490
491 let mut blocks = vec![];
492 for (_block_ref, block_info) in self.recent_blocks.range((
493 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
494 Excluded(BlockRef::new(
495 round + 1,
496 AuthorityIndex::ZERO,
497 BlockDigest::MIN,
498 )),
499 )) {
500 blocks.push(block_info.block.clone())
501 }
502 blocks
503 }
504
505 pub(crate) fn ancestors_at_round(
507 &self,
508 later_block: &VerifiedBlock,
509 earlier_round: Round,
510 ) -> Vec<VerifiedBlock> {
511 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
513 while !linked.is_empty() {
514 let round = linked.last().unwrap().round;
515 if round <= earlier_round {
517 break;
518 }
519 let block_ref = linked.pop_last().unwrap();
520 let Some(block) = self.get_block(&block_ref) else {
521 panic!("Block {block_ref:?} should exist in DAG!");
522 };
523 linked.extend(block.ancestors().iter().cloned());
524 }
525 linked
526 .range((
527 Included(BlockRef::new(
528 earlier_round,
529 AuthorityIndex::ZERO,
530 BlockDigest::MIN,
531 )),
532 Unbounded,
533 ))
534 .map(|r| {
535 self.get_block(r)
536 .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
537 .clone()
538 })
539 .collect()
540 }
541
/// Returns the last cached block authored by this node (`context.own_index`),
/// as resolved by `get_last_block_for_authority`.
pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
    self.get_last_block_for_authority(self.context.own_index)
}
547
548 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
552 if let Some(last) = self.recent_refs_by_authority[authority].last() {
553 return self
554 .recent_blocks
555 .get(last)
556 .expect("Block should be found in recent blocks")
557 .block
558 .clone();
559 }
560
561 let (_, genesis_block) = self
563 .genesis
564 .iter()
565 .find(|(block_ref, _)| block_ref.author == authority)
566 .expect("Genesis should be found for authority {authority_index}");
567 genesis_block.clone()
568 }
569
570 pub(crate) fn get_cached_blocks(
576 &self,
577 authority: AuthorityIndex,
578 start: Round,
579 ) -> Vec<VerifiedBlock> {
580 let mut blocks = vec![];
581 for block_ref in self.recent_refs_by_authority[authority].range((
582 Included(BlockRef::new(start, authority, BlockDigest::MIN)),
583 Unbounded,
584 )) {
585 let block_info = self
586 .recent_blocks
587 .get(block_ref)
588 .expect("Block should exist in recent blocks");
589 blocks.push(block_info.block.clone());
590 }
591 blocks
592 }
593
594 pub(crate) fn get_last_cached_block_in_range(
597 &self,
598 authority: AuthorityIndex,
599 start_round: Round,
600 end_round: Round,
601 ) -> Option<VerifiedBlock> {
602 if end_round == GENESIS_ROUND {
603 panic!(
604 "Attempted to retrieve blocks earlier than the genesis round which is impossible"
605 );
606 }
607
608 let block_ref = self.recent_refs_by_authority[authority]
609 .range((
610 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
611 Excluded(BlockRef::new(
612 end_round,
613 AuthorityIndex::MIN,
614 BlockDigest::MIN,
615 )),
616 ))
617 .last()?;
618
619 self.recent_blocks
620 .get(block_ref)
621 .map(|block_info| block_info.block.clone())
622 }
623
624 pub(crate) fn get_last_cached_block_per_authority(
633 &self,
634 end_round: Round,
635 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
636 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
638 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
639
640 if end_round == GENESIS_ROUND {
641 panic!(
642 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
643 );
644 }
645
646 if end_round == GENESIS_ROUND + 1 {
647 return blocks.into_iter().map(|b| (b, vec![])).collect();
648 }
649
650 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
651 let authority_index = self
652 .context
653 .committee
654 .to_authority_index(authority_index)
655 .unwrap();
656
657 let last_evicted_round = self.evicted_rounds[authority_index];
658 if end_round.saturating_sub(1) <= last_evicted_round {
659 panic!(
660 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
661 );
662 }
663
664 let block_ref_iter = block_refs
665 .range((
666 Included(BlockRef::new(
667 last_evicted_round + 1,
668 authority_index,
669 BlockDigest::MIN,
670 )),
671 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
672 ))
673 .rev();
674
675 let mut last_round = 0;
676 for block_ref in block_ref_iter {
677 if last_round == 0 {
678 last_round = block_ref.round;
679 let block_info = self
680 .recent_blocks
681 .get(block_ref)
682 .expect("Block should exist in recent blocks");
683 blocks[authority_index] = block_info.block.clone();
684 continue;
685 }
686 if block_ref.round < last_round {
687 break;
688 }
689 equivocating_blocks[authority_index].push(*block_ref);
690 }
691 }
692
693 blocks.into_iter().zip(equivocating_blocks).collect()
694 }
695
696 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
700 if slot.round == GENESIS_ROUND {
702 return true;
703 }
704
705 let eviction_round = self.evicted_rounds[slot.authority];
706 if slot.round <= eviction_round {
707 panic!(
708 "{}",
709 format!(
710 "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
711 if self.gc_enabled() { " gc " } else { " " }
712 )
713 );
714 }
715
716 let mut result = self.recent_refs_by_authority[slot.authority].range((
717 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
718 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
719 ));
720 result.next().is_some()
721 }
722
723 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
727 let mut exist = vec![false; block_refs.len()];
728 let mut missing = Vec::new();
729
730 for (index, block_ref) in block_refs.into_iter().enumerate() {
731 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
732 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
733 exist[index] = true;
734 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
735 {
736 exist[index] = false;
741 } else {
742 missing.push((index, block_ref));
743 }
744 }
745
746 if missing.is_empty() {
747 return exist;
748 }
749
750 let missing_refs = missing
751 .iter()
752 .map(|(_, block_ref)| *block_ref)
753 .collect::<Vec<_>>();
754 let store_results = self
755 .store
756 .contains_blocks(&missing_refs)
757 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
758 self.context
759 .metrics
760 .node_metrics
761 .dag_state_store_read_count
762 .with_label_values(&["contains_blocks"])
763 .inc();
764
765 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
766 exist[index] = result;
767 }
768
769 exist
770 }
771
772 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
773 let blocks = self.contains_blocks(vec![*block_ref]);
774 blocks.first().cloned().unwrap()
775 }
776
/// Current round reported by the threshold clock.
pub(crate) fn threshold_clock_round(&self) -> Round {
    self.threshold_clock.get_round()
}

/// Quorum timestamp reported by the threshold clock.
pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
    self.threshold_clock.get_quorum_ts()
}

/// Highest round among all blocks added to the cache so far.
pub(crate) fn highest_accepted_round(&self) -> Round {
    self.highest_accepted_round
}
788
789 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
792 if let Some(last_commit) = &self.last_commit {
793 if commit.index() <= last_commit.index() {
794 error!(
795 "New commit index {} <= last commit index {}!",
796 commit.index(),
797 last_commit.index()
798 );
799 return;
800 }
801 assert_eq!(commit.index(), last_commit.index() + 1);
802
803 if commit.timestamp_ms() < last_commit.timestamp_ms() {
804 panic!(
805 "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
806 );
807 }
808 } else {
809 assert_eq!(commit.index(), 1);
810 }
811
812 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
813 previous_commit.round() < commit.round()
814 } else {
815 true
816 };
817
818 self.last_commit = Some(commit.clone());
819
820 if commit_round_advanced {
821 let now = std::time::Instant::now();
822 if let Some(previous_time) = self.last_commit_round_advancement_time {
823 self.context
824 .metrics
825 .node_metrics
826 .commit_round_advancement_interval
827 .observe(now.duration_since(previous_time).as_secs_f64())
828 }
829 self.last_commit_round_advancement_time = Some(now);
830 }
831
832 for block_ref in commit.blocks().iter() {
833 self.last_committed_rounds[block_ref.author] = max(
834 self.last_committed_rounds[block_ref.author],
835 block_ref.round,
836 );
837 }
838
839 for (i, round) in self.last_committed_rounds.iter().enumerate() {
840 let index = self.context.committee.to_authority_index(i).unwrap();
841 let hostname = &self.context.committee.authority(index).hostname;
842 self.context
843 .metrics
844 .node_metrics
845 .last_committed_authority_round
846 .with_label_values(&[hostname])
847 .set((*round).into());
848 }
849
850 self.pending_commit_votes.push_back(commit.reference());
851 self.commits_to_write.push(commit);
852 }
853
854 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
855 assert!(self.unscored_committed_subdags.is_empty());
857
858 assert!(self.scoring_subdag.is_empty());
862
863 let commit_info = CommitInfo {
864 committed_rounds: self.last_committed_rounds.clone(),
865 reputation_scores,
866 };
867 let last_commit = self
868 .last_commit
869 .as_ref()
870 .expect("Last commit should already be set.");
871 self.commit_info_to_write
872 .push((last_commit.reference(), commit_info));
873 }
874
875 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
876 let mut votes = Vec::new();
877 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
878 votes.push(self.pending_commit_votes.pop_front().unwrap());
879 }
880 votes
881 }
882
883 pub(crate) fn last_commit_index(&self) -> CommitIndex {
885 match &self.last_commit {
886 Some(commit) => commit.index(),
887 None => 0,
888 }
889 }
890
891 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
893 match &self.last_commit {
894 Some(commit) => commit.digest(),
895 None => CommitDigest::MIN,
896 }
897 }
898
899 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
901 match &self.last_commit {
902 Some(commit) => commit.timestamp_ms(),
903 None => 0,
904 }
905 }
906
907 pub(crate) fn last_commit_leader(&self) -> Slot {
909 match &self.last_commit {
910 Some(commit) => commit.leader().into(),
911 None => self
912 .genesis
913 .iter()
914 .next()
915 .map(|(genesis_ref, _)| *genesis_ref)
916 .expect("Genesis blocks should always be available.")
917 .into(),
918 }
919 }
920
921 pub(crate) fn last_commit_round(&self) -> Round {
924 match &self.last_commit {
925 Some(commit) => commit.leader().round,
926 None => 0,
927 }
928 }
929
/// Returns a copy of the per-authority last committed rounds.
pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
    self.last_committed_rounds.clone()
}

/// GC round derived from the last commit round; see `calculate_gc_round`.
pub(crate) fn gc_round(&self) -> Round {
    self.calculate_gc_round(self.last_commit_round())
}
944
945 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
946 let gc_depth = self.context.protocol_config.gc_depth();
947 if gc_depth > 0 {
948 commit_round.saturating_sub(gc_depth)
950 } else {
951 GENESIS_ROUND
954 }
955 }
956
/// Whether garbage collection is on, i.e. the configured gc depth is above 0.
pub(crate) fn gc_enabled(&self) -> bool {
    self.context.protocol_config.gc_depth() > 0
}
960
961 pub(crate) fn flush(&mut self) {
964 let _s = self
965 .context
966 .metrics
967 .node_metrics
968 .scope_processing_time
969 .with_label_values(&["DagState::flush"])
970 .start_timer();
971 let blocks = std::mem::take(&mut self.blocks_to_write);
973 let commits = std::mem::take(&mut self.commits_to_write);
974 let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
975
976 if blocks.is_empty() && commits.is_empty() {
977 return;
978 }
979 debug!(
980 "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
981 blocks.len(),
982 blocks.iter().map(|b| b.reference().to_string()).join(","),
983 commits.len(),
984 commits.iter().map(|c| c.reference().to_string()).join(","),
985 commit_info_to_write.len(),
986 commit_info_to_write
987 .iter()
988 .map(|(commit_ref, _)| commit_ref.to_string())
989 .join(","),
990 );
991 self.store
992 .write(WriteBatch::new(blocks, commits, commit_info_to_write))
993 .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
994 self.context
995 .metrics
996 .node_metrics
997 .dag_state_store_write_count
998 .inc();
999
1000 for (authority_index, _) in self.context.committee.authorities() {
1003 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1004 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1005 if block_ref.round <= eviction_round {
1006 self.recent_blocks.remove(block_ref);
1007 self.recent_refs_by_authority[authority_index].pop_first();
1008 } else {
1009 break;
1010 }
1011 }
1012 self.evicted_rounds[authority_index] = eviction_round;
1013 }
1014
1015 let metrics = &self.context.metrics.node_metrics;
1016 metrics
1017 .dag_state_recent_blocks
1018 .set(self.recent_blocks.len() as i64);
1019 metrics.dag_state_recent_refs.set(
1020 self.recent_refs_by_authority
1021 .iter()
1022 .map(BTreeSet::len)
1023 .sum::<usize>() as i64,
1024 );
1025 }
1026
/// Reads the last persisted commit info directly from the store.
///
/// # Panics
/// Panics when the store read fails.
pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
    self.store
        .read_last_commit_info()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
}

/// Number of committed subdags that are still unscored.
pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
    self.unscored_committed_subdags.len() as u64
}

/// Test-only: returns a copy of the unscored committed subdags.
#[cfg(test)]
pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
    self.unscored_committed_subdags.clone()
}

/// Appends `committed_subdags` to the unscored list.
pub(crate) fn add_unscored_committed_subdags(
    &mut self,
    committed_subdags: Vec<CommittedSubDag>,
) {
    self.unscored_committed_subdags.extend(committed_subdags);
}

/// Moves the unscored committed subdags out, leaving the list empty.
pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
    std::mem::take(&mut self.unscored_committed_subdags)
}

/// Adds `scoring_subdags` to the scoring subdag.
pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
    self.scoring_subdag.add_subdags(scoring_subdags);
}

/// Clears the scoring subdag.
pub(crate) fn clear_scoring_subdag(&mut self) {
    self.scoring_subdag.clear();
}

/// Number of subdags the scoring subdag reports as scored.
pub(crate) fn scoring_subdags_count(&self) -> usize {
    self.scoring_subdag.scored_subdags_count()
}

/// Whether the scoring subdag is empty.
pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
    self.scoring_subdag.is_empty()
}

/// Computes reputation scores from the scoring subdag using the distributed
/// vote scoring calculation.
pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
    self.scoring_subdag.calculate_distributed_vote_scores()
}

/// End of the scoring subdag's commit range.
///
/// # Panics
/// Panics when the scoring subdag has no commit range.
pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
    self.scoring_subdag
        .commit_range
        .as_ref()
        .expect("commit range should exist for scoring subdag")
        .end()
}
1081
1082 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1087 if self.gc_enabled() {
1088 let last_round = self.recent_refs_by_authority[authority_index]
1089 .last()
1090 .map(|block_ref| block_ref.round)
1091 .unwrap_or(GENESIS_ROUND);
1092
1093 Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1094 } else {
1095 let commit_round = self.last_committed_rounds[authority_index];
1096 Self::eviction_round(commit_round, self.cached_rounds)
1097 }
1098 }
1099
1100 fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
1103 commit_round.saturating_sub(cached_rounds)
1104 }
1105
1106 fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1111 gc_round.min(last_round.saturating_sub(cached_rounds))
1112 }
1113
/// Test-only: returns the blocks of the most recent round that reaches a
/// quorum, checking the highest accepted round first and then the round
/// before it. Reaching the genesis round returns the genesis blocks.
///
/// # Panics
/// Panics when neither round reaches a quorum.
#[cfg(test)]
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
    use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};

    let highest = self.highest_accepted_round;
    for round in (highest.saturating_sub(1)..=highest).rev() {
        if round == GENESIS_ROUND {
            return self.genesis_blocks();
        }

        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        let blocks = self.get_uncommitted_blocks_at_round(round);
        // `any` stops at the first author whose stake completes the quorum.
        let reached_quorum = blocks
            .iter()
            .any(|block| quorum.add(block.author(), &self.context.committee));
        if reached_quorum {
            return blocks;
        }
    }

    panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}
1142
/// Test-only: returns a copy of all genesis blocks, in reference order.
#[cfg(test)]
pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
    self.genesis.values().cloned().collect()
}

/// Test-only: overwrites the last commit without any of the validation or
/// bookkeeping done by `add_commit`.
#[cfg(test)]
pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
    self.last_commit = Some(commit);
}
1152}
1153
1154struct BlockInfo {
1155 block: VerifiedBlock,
1156 committed: bool,
1158}
1159
1160impl BlockInfo {
1161 fn new(block: VerifiedBlock) -> Self {
1162 Self {
1163 block,
1164 committed: false,
1165 }
1166 }
1167}
1168
1169#[cfg(test)]
1170mod test {
1171 use std::vec;
1172
1173 use parking_lot::RwLock;
1174 use rstest::rstest;
1175
1176 use super::*;
1177 use crate::{
1178 block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1179 storage::{WriteBatch, mem_store::MemStore},
1180 test_dag_builder::DagBuilder,
1181 test_dag_parser::parse_dag,
1182 };
1183
1184 #[tokio::test]
1185 async fn test_get_blocks() {
1186 let (context, _) = Context::new_for_test(4);
1187 let context = Arc::new(context);
1188 let store = Arc::new(MemStore::new());
1189 let mut dag_state = DagState::new(context.clone(), store.clone());
1190 let own_index = AuthorityIndex::new_for_test(0);
1191
1192 let num_rounds: u32 = 10;
1194 let non_existent_round: u32 = 100;
1195 let num_authorities: u32 = 3;
1196 let num_blocks_per_slot: usize = 3;
1197 let mut blocks = BTreeMap::new();
1198 for round in 1..=num_rounds {
1199 for author in 0..num_authorities {
1200 let base_ts = round as BlockTimestampMs * 1000;
1202 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1203 let block = VerifiedBlock::new_for_test(
1204 TestBlock::new(round, author)
1205 .set_timestamp_ms(timestamp)
1206 .build(),
1207 );
1208 dag_state.accept_block(block.clone());
1209 blocks.insert(block.reference(), block);
1210
1211 if AuthorityIndex::new_for_test(author) == own_index {
1213 break;
1214 }
1215 }
1216 }
1217 }
1218
1219 for (r, block) in &blocks {
1221 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1222 }
1223
1224 let last_ref = blocks.keys().last().unwrap();
1226 assert!(
1227 dag_state
1228 .get_block(&BlockRef::new(
1229 last_ref.round,
1230 last_ref.author,
1231 BlockDigest::MIN
1232 ))
1233 .is_none()
1234 );
1235
1236 for round in 1..=num_rounds {
1238 for author in 0..num_authorities {
1239 let slot = Slot::new(
1240 round,
1241 context
1242 .committee
1243 .to_authority_index(author as usize)
1244 .unwrap(),
1245 );
1246 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1247
1248 if AuthorityIndex::new_for_test(author) == own_index {
1250 assert_eq!(blocks.len(), 1);
1251 } else {
1252 assert_eq!(blocks.len(), num_blocks_per_slot);
1253 }
1254
1255 for b in blocks {
1256 assert_eq!(b.round(), round);
1257 assert_eq!(
1258 b.author(),
1259 context
1260 .committee
1261 .to_authority_index(author as usize)
1262 .unwrap()
1263 );
1264 }
1265 }
1266 }
1267
1268 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1270 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1271
1272 for round in 1..=num_rounds {
1274 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1275 assert_eq!(
1278 blocks.len(),
1279 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1280 );
1281 for b in blocks {
1282 assert_eq!(b.round(), round);
1283 }
1284 }
1285
1286 assert!(
1288 dag_state
1289 .get_uncommitted_blocks_at_round(non_existent_round)
1290 .is_empty()
1291 );
1292 }
1293
1294 #[tokio::test]
1295 async fn test_ancestors_at_uncommitted_round() {
1296 let (context, _) = Context::new_for_test(4);
1298 let context = Arc::new(context);
1299 let store = Arc::new(MemStore::new());
1300 let mut dag_state = DagState::new(context.clone(), store.clone());
1301
1302 let round_10_refs: Vec<_> = (0..4)
1306 .map(|a| {
1307 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1308 .reference()
1309 })
1310 .collect();
1311
1312 let round_11 = vec![
1314 VerifiedBlock::new_for_test(
1316 TestBlock::new(11, 0)
1317 .set_timestamp_ms(1100)
1318 .set_ancestors(round_10_refs.clone())
1319 .build(),
1320 ),
1321 VerifiedBlock::new_for_test(
1324 TestBlock::new(11, 1)
1325 .set_timestamp_ms(1110)
1326 .set_ancestors(round_10_refs.clone())
1327 .build(),
1328 ),
1329 VerifiedBlock::new_for_test(
1331 TestBlock::new(11, 1)
1332 .set_timestamp_ms(1111)
1333 .set_ancestors(round_10_refs.clone())
1334 .build(),
1335 ),
1336 VerifiedBlock::new_for_test(
1338 TestBlock::new(11, 1)
1339 .set_timestamp_ms(1112)
1340 .set_ancestors(round_10_refs.clone())
1341 .build(),
1342 ),
1343 VerifiedBlock::new_for_test(
1345 TestBlock::new(11, 2)
1346 .set_timestamp_ms(1120)
1347 .set_ancestors(round_10_refs.clone())
1348 .build(),
1349 ),
1350 VerifiedBlock::new_for_test(
1352 TestBlock::new(11, 3)
1353 .set_timestamp_ms(1130)
1354 .set_ancestors(round_10_refs.clone())
1355 .build(),
1356 ),
1357 ];
1358
1359 let ancestors_for_round_12 = vec![
1361 round_11[0].reference(),
1362 round_11[1].reference(),
1363 round_11[5].reference(),
1364 ];
1365 let round_12 = vec![
1366 VerifiedBlock::new_for_test(
1367 TestBlock::new(12, 0)
1368 .set_timestamp_ms(1200)
1369 .set_ancestors(ancestors_for_round_12.clone())
1370 .build(),
1371 ),
1372 VerifiedBlock::new_for_test(
1373 TestBlock::new(12, 2)
1374 .set_timestamp_ms(1220)
1375 .set_ancestors(ancestors_for_round_12.clone())
1376 .build(),
1377 ),
1378 VerifiedBlock::new_for_test(
1379 TestBlock::new(12, 3)
1380 .set_timestamp_ms(1230)
1381 .set_ancestors(ancestors_for_round_12.clone())
1382 .build(),
1383 ),
1384 ];
1385
1386 let ancestors_for_round_13 = vec![
1388 round_12[0].reference(),
1389 round_12[1].reference(),
1390 round_12[2].reference(),
1391 round_11[2].reference(),
1392 ];
1393 let round_13 = vec![
1394 VerifiedBlock::new_for_test(
1395 TestBlock::new(12, 1)
1396 .set_timestamp_ms(1300)
1397 .set_ancestors(ancestors_for_round_13.clone())
1398 .build(),
1399 ),
1400 VerifiedBlock::new_for_test(
1401 TestBlock::new(12, 2)
1402 .set_timestamp_ms(1320)
1403 .set_ancestors(ancestors_for_round_13.clone())
1404 .build(),
1405 ),
1406 VerifiedBlock::new_for_test(
1407 TestBlock::new(12, 3)
1408 .set_timestamp_ms(1330)
1409 .set_ancestors(ancestors_for_round_13.clone())
1410 .build(),
1411 ),
1412 ];
1413
1414 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1416 let anchor = VerifiedBlock::new_for_test(
1417 TestBlock::new(14, 1)
1418 .set_timestamp_ms(1410)
1419 .set_ancestors(ancestors_for_round_14)
1420 .build(),
1421 );
1422
1423 for b in round_11
1425 .iter()
1426 .chain(round_12.iter())
1427 .chain(round_13.iter())
1428 .chain([anchor.clone()].iter())
1429 {
1430 dag_state.accept_block(b.clone());
1431 }
1432
1433 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1435 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1436 ancestors_refs.sort();
1437 let mut expected_refs = vec![
1438 round_11[0].reference(),
1439 round_11[1].reference(),
1440 round_11[2].reference(),
1441 round_11[5].reference(),
1442 ];
1443 expected_refs.sort(); assert_eq!(
1446 ancestors_refs, expected_refs,
1447 "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1448 );
1449 }
1450
1451 #[tokio::test]
1452 async fn test_contains_blocks_in_cache_or_store() {
1453 const CACHED_ROUNDS: Round = 2;
1455
1456 let (mut context, _) = Context::new_for_test(4);
1457 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1458
1459 let context = Arc::new(context);
1460 let store = Arc::new(MemStore::new());
1461 let mut dag_state = DagState::new(context.clone(), store.clone());
1462
1463 let num_rounds: u32 = 10;
1465 let num_authorities: u32 = 4;
1466 let mut blocks = Vec::new();
1467
1468 for round in 1..=num_rounds {
1469 for author in 0..num_authorities {
1470 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1471 blocks.push(block);
1472 }
1473 }
1474
1475 blocks.clone().into_iter().for_each(|block| {
1478 if block.round() <= 4 {
1479 store
1480 .write(WriteBatch::default().blocks(vec![block]))
1481 .unwrap();
1482 } else {
1483 dag_state.accept_blocks(vec![block]);
1484 }
1485 });
1486
1487 let mut block_refs = blocks
1491 .iter()
1492 .map(|block| block.reference())
1493 .collect::<Vec<_>>();
1494 let result = dag_state.contains_blocks(block_refs.clone());
1495
1496 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1498 assert_eq!(result, expected);
1499
1500 block_refs.insert(
1502 3,
1503 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1504 );
1505 let result = dag_state.contains_blocks(block_refs.clone());
1506
1507 expected.insert(3, false);
1509 assert_eq!(result, expected.clone());
1510 }
1511
1512 #[tokio::test]
1513 async fn test_contains_cached_block_at_slot() {
1514 const CACHED_ROUNDS: Round = 2;
1516
1517 let num_authorities: u32 = 4;
1518 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1519 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1520
1521 let context = Arc::new(context);
1522 let store = Arc::new(MemStore::new());
1523 let mut dag_state = DagState::new(context.clone(), store.clone());
1524
1525 let num_rounds: u32 = 10;
1527 let mut blocks = Vec::new();
1528
1529 for round in 1..=num_rounds {
1530 for author in 0..num_authorities {
1531 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1532 blocks.push(block.clone());
1533 dag_state.accept_block(block);
1534 }
1535 }
1536
1537 for (author, _) in context.committee.authorities() {
1539 assert!(
1540 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1541 "Genesis should always be found"
1542 );
1543 }
1544
1545 let mut block_refs = blocks
1549 .iter()
1550 .map(|block| block.reference())
1551 .collect::<Vec<_>>();
1552
1553 for block_ref in block_refs.clone() {
1554 let slot = block_ref.into();
1555 let found = dag_state.contains_cached_block_at_slot(slot);
1556 assert!(found, "A block should be found at slot {slot}");
1557 }
1558
1559 block_refs.insert(
1562 3,
1563 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1564 );
1565 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1566 expected.insert(3, false);
1567
1568 for block_ref in block_refs {
1570 let slot = block_ref.into();
1571 let found = dag_state.contains_cached_block_at_slot(slot);
1572
1573 assert_eq!(expected.remove(0), found);
1574 }
1575 }
1576
1577 #[tokio::test]
1578 #[should_panic(
1579 expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
1580 )]
1581 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1582 const CACHED_ROUNDS: Round = 2;
1584
1585 let (mut context, _) = Context::new_for_test(4);
1586 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1587 context
1588 .protocol_config
1589 .set_consensus_gc_depth_for_testing(0);
1590
1591 let context = Arc::new(context);
1592 let store = Arc::new(MemStore::new());
1593 let mut dag_state = DagState::new(context.clone(), store.clone());
1594
1595 let mut blocks = Vec::new();
1597 for round in 1..=10 {
1598 let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1599 blocks.push(block.clone());
1600 dag_state.accept_block(block);
1601 }
1602
1603 dag_state.add_commit(TrustedCommit::new_for_test(
1605 1 as CommitIndex,
1606 CommitDigest::MIN,
1607 0,
1608 blocks.last().unwrap().reference(),
1609 blocks
1610 .into_iter()
1611 .map(|block| block.reference())
1612 .collect::<Vec<_>>(),
1613 ));
1614
1615 dag_state.flush();
1616
1617 let _ =
1621 dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1622 }
1623
1624 #[tokio::test]
1625 #[should_panic(
1626 expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1627 )]
1628 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1629 const GC_DEPTH: u32 = 2;
1633 const CACHED_ROUNDS: Round = 3;
1635
1636 let (mut context, _) = Context::new_for_test(4);
1637 context
1638 .protocol_config
1639 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1640 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1641
1642 let context = Arc::new(context);
1643 let store = Arc::new(MemStore::new());
1644 let mut dag_state = DagState::new(context.clone(), store.clone());
1645
1646 let mut dag_builder = DagBuilder::new(context.clone());
1649 dag_builder.layers(1..=3).build();
1650 dag_builder
1651 .layers(4..=6)
1652 .authorities(vec![AuthorityIndex::new_for_test(0)])
1653 .skip_block()
1654 .build();
1655
1656 dag_builder
1658 .all_blocks()
1659 .into_iter()
1660 .for_each(|block| dag_state.accept_block(block));
1661
1662 dag_state.add_commit(TrustedCommit::new_for_test(
1664 1 as CommitIndex,
1665 CommitDigest::MIN,
1666 0,
1667 dag_builder.leader_block(5).unwrap().reference(),
1668 vec![],
1669 ));
1670
1671 dag_state.flush();
1672
1673 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1675
1676 for authority_index in 1..=3 {
1681 for round in 4..=6 {
1682 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1683 round,
1684 AuthorityIndex::new_for_test(authority_index)
1685 )));
1686 }
1687 }
1688
1689 for round in 1..=3 {
1690 assert!(
1691 dag_state.contains_cached_block_at_slot(Slot::new(
1692 round,
1693 AuthorityIndex::new_for_test(0)
1694 ))
1695 );
1696 }
1697
1698 let _ =
1701 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1702 }
1703
1704 #[tokio::test]
1705 async fn test_get_blocks_in_cache_or_store() {
1706 let (context, _) = Context::new_for_test(4);
1707 let context = Arc::new(context);
1708 let store = Arc::new(MemStore::new());
1709 let mut dag_state = DagState::new(context.clone(), store.clone());
1710
1711 let num_rounds: u32 = 10;
1713 let num_authorities: u32 = 4;
1714 let mut blocks = Vec::new();
1715
1716 for round in 1..=num_rounds {
1717 for author in 0..num_authorities {
1718 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1719 blocks.push(block);
1720 }
1721 }
1722
1723 blocks.clone().into_iter().for_each(|block| {
1726 if block.round() <= 4 {
1727 store
1728 .write(WriteBatch::default().blocks(vec![block]))
1729 .unwrap();
1730 } else {
1731 dag_state.accept_blocks(vec![block]);
1732 }
1733 });
1734
1735 let mut block_refs = blocks
1739 .iter()
1740 .map(|block| block.reference())
1741 .collect::<Vec<_>>();
1742 let result = dag_state.get_blocks(&block_refs);
1743
1744 let mut expected = blocks
1745 .into_iter()
1746 .map(Some)
1747 .collect::<Vec<Option<VerifiedBlock>>>();
1748
1749 assert_eq!(result, expected.clone());
1751
1752 block_refs.insert(
1754 3,
1755 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1756 );
1757 let result = dag_state.get_blocks(&block_refs);
1758
1759 expected.insert(3, None);
1761 assert_eq!(result, expected);
1762 }
1763
1764 #[rstest]
1766 #[tokio::test]
1767 async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1768 telemetry_subscribers::init_for_testing();
1769 let num_authorities: u32 = 4;
1770 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1771 context
1772 .protocol_config
1773 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1774
1775 if gc_depth > 0 {
1776 context
1777 .protocol_config
1778 .set_consensus_gc_depth_for_testing(gc_depth);
1779 }
1780
1781 let context = Arc::new(context);
1782 let store = Arc::new(MemStore::new());
1783 let mut dag_state = DagState::new(context.clone(), store.clone());
1784
1785 let num_rounds: u32 = 10;
1787 let mut dag_builder = DagBuilder::new(context.clone());
1788 dag_builder.layers(1..=num_rounds).build();
1789 let mut commits = vec![];
1790
1791 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1792 commits.push(commit);
1793 }
1794
1795 let temp_commits = commits.split_off(5);
1797 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1798 for commit in commits.clone() {
1799 dag_state.add_commit(commit);
1800 }
1801
1802 dag_state.flush();
1804
1805 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1807 for commit in temp_commits.clone() {
1808 dag_state.add_commit(commit);
1809 }
1810
1811 let all_blocks = dag_builder.blocks(6..=num_rounds);
1813 let block_refs = all_blocks
1814 .iter()
1815 .map(|block| block.reference())
1816 .collect::<Vec<_>>();
1817
1818 let result = dag_state
1819 .get_blocks(&block_refs)
1820 .into_iter()
1821 .map(|b| b.unwrap())
1822 .collect::<Vec<_>>();
1823 assert_eq!(result, all_blocks);
1824
1825 assert_eq!(dag_state.last_commit_index(), 10);
1827 assert_eq!(
1828 dag_state.last_committed_rounds(),
1829 dag_builder.last_committed_rounds.clone()
1830 );
1831
1832 drop(dag_state);
1834
1835 let dag_state = DagState::new(context.clone(), store.clone());
1837
1838 let blocks = dag_builder.blocks(1..=5);
1840 let block_refs = blocks
1841 .iter()
1842 .map(|block| block.reference())
1843 .collect::<Vec<_>>();
1844 let result = dag_state
1845 .get_blocks(&block_refs)
1846 .into_iter()
1847 .map(|b| b.unwrap())
1848 .collect::<Vec<_>>();
1849 assert_eq!(result, blocks);
1850
1851 let missing_blocks = dag_builder.blocks(6..=num_rounds);
1853 let block_refs = missing_blocks
1854 .iter()
1855 .map(|block| block.reference())
1856 .collect::<Vec<_>>();
1857 let retrieved_blocks = dag_state
1858 .get_blocks(&block_refs)
1859 .into_iter()
1860 .flatten()
1861 .collect::<Vec<_>>();
1862 assert!(retrieved_blocks.is_empty());
1863
1864 assert_eq!(dag_state.last_commit_index(), 5);
1866
1867 let expected_last_committed_rounds = vec![4, 5, 4, 4];
1869 assert_eq!(
1870 dag_state.last_committed_rounds(),
1871 expected_last_committed_rounds
1872 );
1873
1874 assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1877 }
1878
1879 #[tokio::test]
1880 async fn test_flush_and_recovery() {
1881 telemetry_subscribers::init_for_testing();
1882 let num_authorities: u32 = 4;
1883 let (context, _) = Context::new_for_test(num_authorities as usize);
1884 let context = Arc::new(context);
1885 let store = Arc::new(MemStore::new());
1886 let mut dag_state = DagState::new(context.clone(), store.clone());
1887
1888 let num_rounds: u32 = 10;
1890 let mut dag_builder = DagBuilder::new(context.clone());
1891 dag_builder.layers(1..=num_rounds).build();
1892 let mut commits = vec![];
1893 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1894 commits.push(commit);
1895 }
1896
1897 let temp_commits = commits.split_off(5);
1899 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1900 for commit in commits.clone() {
1901 dag_state.add_commit(commit);
1902 }
1903
1904 dag_state.flush();
1906
1907 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1909 for commit in temp_commits.clone() {
1910 dag_state.add_commit(commit);
1911 }
1912
1913 let all_blocks = dag_builder.blocks(6..=num_rounds);
1915 let block_refs = all_blocks
1916 .iter()
1917 .map(|block| block.reference())
1918 .collect::<Vec<_>>();
1919 let result = dag_state
1920 .get_blocks(&block_refs)
1921 .into_iter()
1922 .map(|b| b.unwrap())
1923 .collect::<Vec<_>>();
1924 assert_eq!(result, all_blocks);
1925
1926 assert_eq!(dag_state.last_commit_index(), 10);
1928 assert_eq!(
1929 dag_state.last_committed_rounds(),
1930 dag_builder.last_committed_rounds.clone()
1931 );
1932
1933 drop(dag_state);
1935
1936 let dag_state = DagState::new(context.clone(), store.clone());
1938
1939 let blocks = dag_builder.blocks(1..=5);
1941 let block_refs = blocks
1942 .iter()
1943 .map(|block| block.reference())
1944 .collect::<Vec<_>>();
1945 let result = dag_state
1946 .get_blocks(&block_refs)
1947 .into_iter()
1948 .map(|b| b.unwrap())
1949 .collect::<Vec<_>>();
1950 assert_eq!(result, blocks);
1951
1952 let missing_blocks = dag_builder.blocks(6..=num_rounds);
1954 let block_refs = missing_blocks
1955 .iter()
1956 .map(|block| block.reference())
1957 .collect::<Vec<_>>();
1958 let retrieved_blocks = dag_state
1959 .get_blocks(&block_refs)
1960 .into_iter()
1961 .flatten()
1962 .collect::<Vec<_>>();
1963 assert!(retrieved_blocks.is_empty());
1964
1965 assert_eq!(dag_state.last_commit_index(), 5);
1967
1968 let expected_last_committed_rounds = vec![4, 5, 4, 4];
1970 assert_eq!(
1971 dag_state.last_committed_rounds(),
1972 expected_last_committed_rounds
1973 );
1974 assert_eq!(dag_state.scoring_subdags_count(), 5);
1977 }
1978
1979 #[tokio::test]
1980 async fn test_flush_and_recovery_gc_enabled() {
1981 telemetry_subscribers::init_for_testing();
1982
1983 const GC_DEPTH: u32 = 3;
1984 const CACHED_ROUNDS: u32 = 4;
1985
1986 let num_authorities: u32 = 4;
1987 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1988 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1989 context
1990 .protocol_config
1991 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1992 context
1993 .protocol_config
1994 .set_consensus_linearize_subdag_v2_for_testing(true);
1995
1996 let context = Arc::new(context);
1997
1998 let store = Arc::new(MemStore::new());
1999 let mut dag_state = DagState::new(context.clone(), store.clone());
2000
2001 let num_rounds: u32 = 10;
2002 let mut dag_builder = DagBuilder::new(context.clone());
2003 dag_builder.layers(1..=5).build();
2004 dag_builder
2005 .layers(6..=8)
2006 .authorities(vec![AuthorityIndex::new_for_test(0)])
2007 .skip_block()
2008 .build();
2009 dag_builder.layers(9..=num_rounds).build();
2010
2011 let mut commits = dag_builder
2012 .get_sub_dag_and_commits(1..=num_rounds)
2013 .into_iter()
2014 .map(|(_subdag, commit)| commit)
2015 .collect::<Vec<_>>();
2016
2017 let temp_commits = commits.split_off(7);
2021 dag_state.accept_blocks(dag_builder.blocks(1..=8));
2022 for commit in commits.clone() {
2023 dag_state.add_commit(commit);
2024 }
2025
2026 let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2029 for commit in commits.iter() {
2030 all_committed_blocks.extend(commit.blocks());
2031 }
2032 dag_state.flush();
2034
2035 dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2037 for commit in temp_commits.clone() {
2038 dag_state.add_commit(commit);
2039 }
2040
2041 let all_blocks = dag_builder.blocks(1..=num_rounds);
2043 let block_refs = all_blocks
2044 .iter()
2045 .map(|block| block.reference())
2046 .collect::<Vec<_>>();
2047 let result = dag_state
2048 .get_blocks(&block_refs)
2049 .into_iter()
2050 .map(|b| b.unwrap())
2051 .collect::<Vec<_>>();
2052 assert_eq!(result, all_blocks);
2053
2054 assert_eq!(dag_state.last_commit_index(), 9);
2056 assert_eq!(
2057 dag_state.last_committed_rounds(),
2058 dag_builder.last_committed_rounds.clone()
2059 );
2060
2061 drop(dag_state);
2063
2064 let dag_state = DagState::new(context.clone(), store.clone());
2066
2067 let blocks = dag_builder.blocks(1..=5);
2069 let block_refs = blocks
2070 .iter()
2071 .map(|block| block.reference())
2072 .collect::<Vec<_>>();
2073 let result = dag_state
2074 .get_blocks(&block_refs)
2075 .into_iter()
2076 .map(|b| b.unwrap())
2077 .collect::<Vec<_>>();
2078 assert_eq!(result, blocks);
2079
2080 let missing_blocks = dag_builder.blocks(9..=num_rounds);
2082 let block_refs = missing_blocks
2083 .iter()
2084 .map(|block| block.reference())
2085 .collect::<Vec<_>>();
2086 let retrieved_blocks = dag_state
2087 .get_blocks(&block_refs)
2088 .into_iter()
2089 .flatten()
2090 .collect::<Vec<_>>();
2091 assert!(retrieved_blocks.is_empty());
2092
2093 assert_eq!(dag_state.last_commit_index(), 7);
2095
2096 let expected_last_committed_rounds = vec![5, 6, 6, 7];
2098 assert_eq!(
2099 dag_state.last_committed_rounds(),
2100 expected_last_committed_rounds
2101 );
2102 assert_eq!(dag_state.scoring_subdags_count(), 7);
2105 for (authority_index, _) in context.committee.authorities() {
2107 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2108
2109 if authority_index == AuthorityIndex::new_for_test(0) {
2114 assert_eq!(blocks.len(), 4);
2115 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2116 assert!(
2117 blocks
2118 .into_iter()
2119 .all(|block| block.round() >= 2 && block.round() <= 5)
2120 );
2121 } else {
2122 assert_eq!(blocks.len(), 4);
2123 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2124 assert!(
2125 blocks
2126 .into_iter()
2127 .all(|block| block.round() >= 5 && block.round() <= 8)
2128 );
2129 }
2130 }
2131 let gc_round = dag_state.gc_round();
2134 assert_eq!(gc_round, 4);
2135 dag_state
2136 .recent_blocks
2137 .iter()
2138 .for_each(|(block_ref, block_info)| {
2139 if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2140 assert!(
2141 block_info.committed,
2142 "Block {block_ref:?} should be committed"
2143 );
2144 };
2145 });
2146 }
2147
2148 #[tokio::test]
2149 async fn test_block_info_as_committed() {
2150 let num_authorities: u32 = 4;
2151 let (context, _) = Context::new_for_test(num_authorities as usize);
2152 let context = Arc::new(context);
2153
2154 let store = Arc::new(MemStore::new());
2155 let mut dag_state = DagState::new(context.clone(), store.clone());
2156
2157 let block = VerifiedBlock::new_for_test(
2159 TestBlock::new(1, 0)
2160 .set_timestamp_ms(1000)
2161 .set_ancestors(vec![])
2162 .build(),
2163 );
2164
2165 dag_state.accept_block(block.clone());
2166
2167 assert!(!dag_state.is_committed(&block.reference()));
2169
2170 assert!(
2172 dag_state.set_committed(&block.reference()),
2173 "Block should be successfully set as committed for first time"
2174 );
2175
2176 assert!(dag_state.is_committed(&block.reference()));
2178
2179 assert!(
2181 !dag_state.set_committed(&block.reference()),
2182 "Block should not be successfully set as committed"
2183 );
2184 }
2185
2186 #[tokio::test]
2187 async fn test_get_cached_blocks() {
2188 let (mut context, _) = Context::new_for_test(4);
2189 context.parameters.dag_state_cached_rounds = 5;
2190
2191 let context = Arc::new(context);
2192 let store = Arc::new(MemStore::new());
2193 let mut dag_state = DagState::new(context.clone(), store.clone());
2194
2195 let mut all_blocks = Vec::new();
2200 for author in 1..=3 {
2201 for round in 10..(10 + author) {
2202 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2203 all_blocks.push(block.clone());
2204 dag_state.accept_block(block);
2205 }
2206 }
2207
2208 let cached_blocks =
2209 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2210 assert!(cached_blocks.is_empty());
2211
2212 let cached_blocks =
2213 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2214 assert_eq!(cached_blocks.len(), 1);
2215 assert_eq!(cached_blocks[0].round(), 10);
2216
2217 let cached_blocks =
2218 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2219 assert_eq!(cached_blocks.len(), 2);
2220 assert_eq!(cached_blocks[0].round(), 10);
2221 assert_eq!(cached_blocks[1].round(), 11);
2222
2223 let cached_blocks =
2224 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2225 assert_eq!(cached_blocks.len(), 1);
2226 assert_eq!(cached_blocks[0].round(), 11);
2227
2228 let cached_blocks =
2229 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2230 assert_eq!(cached_blocks.len(), 3);
2231 assert_eq!(cached_blocks[0].round(), 10);
2232 assert_eq!(cached_blocks[1].round(), 11);
2233 assert_eq!(cached_blocks[2].round(), 12);
2234
2235 let cached_blocks =
2236 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2237 assert_eq!(cached_blocks.len(), 1);
2238 assert_eq!(cached_blocks[0].round(), 12);
2239 }
2240
    /// Exercises `get_last_cached_block_per_authority` and `get_last_cached_block_in_range`
    /// before and after a flush, with 2 cached rounds, once with the default GC depth and once
    /// with GC depth 1.
    #[rstest]
    #[tokio::test]
    async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
        const CACHED_ROUNDS: Round = 2;
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;

        // A `gc_depth` of 0 leaves the protocol config's GC depth untouched.
        if gc_depth > 0 {
            context
                .protocol_config
                .set_consensus_gc_depth_for_testing(gc_depth);
        }

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // DAG description: B, C and D propose in round 1, C and D in round 2, and only D in
        // round 3. A never proposes. NOTE(review): presumably A..D map to authorities 0..3
        // and `[*]` links to all blocks of the previous round; confirm against `parse_dag`.
        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 2 : {
                C -> [*],
                D -> [*],
            },
            Round 3 : {
                D -> [*],
            },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");

        // An extra block for authority 2 at round 2, built outside the parsed DAG.
        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());

        // Accept all parsed blocks, then the extra block.
        for block in dag_builder
            .all_blocks()
            .into_iter()
            .chain(std::iter::once(block))
        {
            dag_state.accept_block(block);
        }

        // Commit the leader of round 3 with no other committed blocks.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            dag_builder.leader_block(3).unwrap().reference(),
            vec![],
        ));

        // Before flushing: query with end round 4. The expected last rounds per authority
        // are 0, 1, 2, 3, and only authority 2 is expected to have one entry in the second
        // tuple element (see `expected_excluded_and_equivocating_blocks`).
        let end_round = 4;
        let expected_rounds = vec![0, 1, 2, 3];
        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );
        assert_eq!(
            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
            expected_excluded_and_equivocating_blocks
        );

        // The range query starting at round 0 must agree with the per-authority query.
        // A `None` result is mapped to round 0 by `unwrap_or_default`.
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // With start round 2, authorities 0 and 1 have no block in range and fall back to
        // the default round 0.
        let start_round = 2;
        let expected_rounds = [0, 0, 2, 3];

        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    start_round,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // Flush, then repeat the queries with end round 3.
        dag_state.flush();

        // After flushing: authority 3's round 3 block falls outside the queried range, so
        // its last block is expected at round 2.
        let end_round = 3;
        let expected_rounds = vec![0, 1, 2, 2];

        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );

        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }
    }
2380
2381 #[tokio::test]
2382 #[should_panic(
2383 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2384 )]
2385 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2386 const CACHED_ROUNDS: Round = 1;
2388 let (mut context, _) = Context::new_for_test(4);
2389 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2390 context
2391 .protocol_config
2392 .set_consensus_gc_depth_for_testing(0);
2393
2394 let context = Arc::new(context);
2395 let store = Arc::new(MemStore::new());
2396 let mut dag_state = DagState::new(context.clone(), store.clone());
2397
2398 let mut all_blocks = Vec::new();
2403 for author in 1..=3 {
2404 for round in 1..=author {
2405 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2406 all_blocks.push(block.clone());
2407 dag_state.accept_block(block);
2408 }
2409 }
2410
2411 dag_state.add_commit(TrustedCommit::new_for_test(
2412 1 as CommitIndex,
2413 CommitDigest::MIN,
2414 0,
2415 all_blocks.last().unwrap().reference(),
2416 all_blocks
2417 .into_iter()
2418 .map(|block| block.reference())
2419 .collect::<Vec<_>>(),
2420 ));
2421
2422 dag_state.flush();
2425
2426 let end_round = 2;
2429 dag_state.get_last_cached_block_per_authority(end_round);
2430 }
2431
2432 #[tokio::test]
2433 #[should_panic(
2434 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2435 )]
2436 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
2437 const CACHED_ROUNDS: Round = 1;
2439 const GC_DEPTH: u32 = 1;
2440 let (mut context, _) = Context::new_for_test(4);
2441 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2442 context
2443 .protocol_config
2444 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2445
2446 let context = Arc::new(context);
2447 let store = Arc::new(MemStore::new());
2448 let mut dag_state = DagState::new(context.clone(), store.clone());
2449
2450 let mut dag_builder = DagBuilder::new(context.clone());
2455 dag_builder
2456 .layers(1..=1)
2457 .authorities(vec![AuthorityIndex::new_for_test(0)])
2458 .skip_block()
2459 .build();
2460 dag_builder
2461 .layers(2..=2)
2462 .authorities(vec![
2463 AuthorityIndex::new_for_test(0),
2464 AuthorityIndex::new_for_test(1),
2465 ])
2466 .skip_block()
2467 .build();
2468 dag_builder
2469 .layers(3..=3)
2470 .authorities(vec![
2471 AuthorityIndex::new_for_test(0),
2472 AuthorityIndex::new_for_test(1),
2473 AuthorityIndex::new_for_test(2),
2474 ])
2475 .skip_block()
2476 .build();
2477
2478 for block in dag_builder.all_blocks() {
2480 dag_state.accept_block(block);
2481 }
2482
2483 dag_state.add_commit(TrustedCommit::new_for_test(
2484 1 as CommitIndex,
2485 CommitDigest::MIN,
2486 0,
2487 dag_builder.leader_block(3).unwrap().reference(),
2488 vec![],
2489 ));
2490
2491 dag_state.flush();
2493
2494 dag_state.get_last_cached_block_per_authority(2);
2497 }
2498
2499 #[tokio::test]
2500 async fn test_last_quorum() {
2501 let (context, _) = Context::new_for_test(4);
2503 let context = Arc::new(context);
2504 let store = Arc::new(MemStore::new());
2505 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2506
2507 {
2509 let genesis = genesis_blocks(context.clone());
2510
2511 assert_eq!(dag_state.read().last_quorum(), genesis);
2512 }
2513
2514 {
2517 let mut dag_builder = DagBuilder::new(context.clone());
2518 dag_builder
2519 .layers(1..=4)
2520 .build()
2521 .persist_layers(dag_state.clone());
2522 let round_4_blocks: Vec<_> = dag_builder
2523 .blocks(4..=4)
2524 .into_iter()
2525 .map(|block| block.reference())
2526 .collect();
2527
2528 let last_quorum = dag_state.read().last_quorum();
2529
2530 assert_eq!(
2531 last_quorum
2532 .into_iter()
2533 .map(|block| block.reference())
2534 .collect::<Vec<_>>(),
2535 round_4_blocks
2536 );
2537 }
2538
2539 {
2542 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2543 dag_state.write().accept_block(block);
2544
2545 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2546
2547 let last_quorum = dag_state.read().last_quorum();
2548
2549 assert_eq!(last_quorum, round_4_blocks);
2550 }
2551 }
2552
2553 #[tokio::test]
2554 async fn test_last_block_for_authority() {
2555 let (context, _) = Context::new_for_test(4);
2557 let context = Arc::new(context);
2558 let store = Arc::new(MemStore::new());
2559 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2560
2561 {
2563 let genesis = genesis_blocks(context.clone());
2564 let my_genesis = genesis
2565 .into_iter()
2566 .find(|block| block.author() == context.own_index)
2567 .unwrap();
2568
2569 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2570 }
2571
2572 {
2575 let mut dag_builder = DagBuilder::new(context.clone());
2577 dag_builder
2578 .layers(1..=4)
2579 .build()
2580 .persist_layers(dag_state.clone());
2581
2582 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2584 dag_state.write().accept_block(block);
2585
2586 let block = dag_state
2587 .read()
2588 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2589 assert_eq!(block.round(), 5);
2590
2591 for (authority_index, _) in context.committee.authorities() {
2592 let block = dag_state
2593 .read()
2594 .get_last_block_for_authority(authority_index);
2595
2596 if authority_index.value() == 0 {
2597 assert_eq!(block.round(), 5);
2598 } else {
2599 assert_eq!(block.round(), 4);
2600 }
2601 }
2602 }
2603 }
2604}