1use std::{
6 cmp::max,
7 collections::{BTreeMap, BTreeSet, VecDeque},
8 ops::Bound::{Excluded, Included, Unbounded},
9 panic,
10 sync::Arc,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, error, info, trace};
18
19use crate::{
20 CommittedSubDag,
21 block::{
22 BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23 VerifiedBlock, genesis_blocks,
24 },
25 commit::{
26 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28 },
29 context::Context,
30 leader_scoring::{ReputationScores, ScoringSubdag},
31 storage::{Store, WriteBatch},
32 threshold_clock::ThresholdClock,
33};
34
35pub(crate) struct DagState {
44 context: Arc<Context>,
45
46 genesis: BTreeMap<BlockRef, VerifiedBlock>,
48
49 recent_blocks: BTreeMap<BlockRef, BlockInfo>,
60
61 recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
64
65 threshold_clock: ThresholdClock,
67
68 evicted_rounds: Vec<Round>,
73
74 highest_accepted_round: Round,
76
77 last_commit: Option<TrustedCommit>,
79
80 last_commit_round_advancement_time: Option<std::time::Instant>,
82
83 last_committed_rounds: Vec<Round>,
85
86 scoring_subdag: ScoringSubdag,
89 unscored_committed_subdags: Vec<CommittedSubDag>,
94
95 pending_commit_votes: VecDeque<CommitVote>,
98
99 blocks_to_write: Vec<VerifiedBlock>,
101 commits_to_write: Vec<TrustedCommit>,
102
103 commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
107
108 store: Arc<dyn Store>,
110
111 cached_rounds: Round,
113}
114
115impl DagState {
116 pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
118 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
119 let num_authorities = context.committee.size();
120
121 let genesis = genesis_blocks(&context)
122 .into_iter()
123 .map(|block| (block.reference(), block))
124 .collect();
125
126 let threshold_clock = ThresholdClock::new(1, context.clone());
127
128 let last_commit = store
129 .read_last_commit()
130 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
131
132 let commit_info = store
133 .read_last_commit_info()
134 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
135 let (mut last_committed_rounds, commit_recovery_start_index) =
136 if let Some((commit_ref, commit_info)) = commit_info {
137 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
138 (commit_info.committed_rounds, commit_ref.index + 1)
139 } else {
140 tracing::info!("Found no stored CommitInfo to recover from");
141 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
142 };
143
144 let mut unscored_committed_subdags = Vec::new();
145 let mut scoring_subdag = ScoringSubdag::new(context.clone());
146
147 if let Some(last_commit) = last_commit.as_ref() {
148 store
149 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
150 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
151 .iter()
152 .for_each(|commit| {
153 for block_ref in commit.blocks() {
154 last_committed_rounds[block_ref.author] =
155 max(last_committed_rounds[block_ref.author], block_ref.round);
156 }
157
158 let committed_subdag =
159 load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]); unscored_committed_subdags.push(committed_subdag);
161 });
162 }
163
164 tracing::info!(
165 "DagState was initialized with the following state: \
166 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
167 unscored_committed_subdags.len()
168 );
169
170 if context
171 .protocol_config
172 .consensus_distributed_vote_scoring_strategy()
173 {
174 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
175 }
176
177 let mut state = Self {
178 context,
179 genesis,
180 recent_blocks: BTreeMap::new(),
181 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
182 threshold_clock,
183 highest_accepted_round: 0,
184 last_commit: last_commit.clone(),
185 last_commit_round_advancement_time: None,
186 last_committed_rounds: last_committed_rounds.clone(),
187 pending_commit_votes: VecDeque::new(),
188 blocks_to_write: vec![],
189 commits_to_write: vec![],
190 commit_info_to_write: vec![],
191 scoring_subdag,
192 unscored_committed_subdags,
193 store: store.clone(),
194 cached_rounds,
195 evicted_rounds: vec![0; num_authorities],
196 };
197
198 for (i, round) in last_committed_rounds.into_iter().enumerate() {
199 let authority_index = state.context.committee.to_authority_index(i).unwrap();
200 let (blocks, eviction_round) = if state.gc_enabled() {
201 let last_block = state
205 .store
206 .scan_last_blocks_by_author(authority_index, 1, None)
207 .expect("Database error");
208 let last_block_round = last_block
209 .last()
210 .map(|b| b.round())
211 .unwrap_or(GENESIS_ROUND);
212
213 let eviction_round = Self::gc_eviction_round(
214 last_block_round,
215 state.gc_round(),
216 state.cached_rounds,
217 );
218 let blocks = state
219 .store
220 .scan_blocks_by_author(authority_index, eviction_round + 1)
221 .expect("Database error");
222 (blocks, eviction_round)
223 } else {
224 let eviction_round = Self::eviction_round(round, cached_rounds);
225 let blocks = state
226 .store
227 .scan_blocks_by_author(authority_index, eviction_round + 1)
228 .expect("Database error");
229 (blocks, eviction_round)
230 };
231
232 state.evicted_rounds[authority_index] = eviction_round;
233
234 for block in &blocks {
236 state.update_block_metadata(block);
237 }
238
239 info!(
240 "Recovered blocks {}: {:?}",
241 authority_index,
242 blocks
243 .iter()
244 .map(|b| b.reference())
245 .collect::<Vec<BlockRef>>()
246 );
247 }
248
249 let recovered_scoring_metrics = state.store.scan_metrics().expect("Database error");
252 state.context.metrics.initialize_scoring_metrics(
253 recovered_scoring_metrics,
254 &state.recent_refs_by_authority,
255 state.threshold_clock_round(),
256 &state.evicted_rounds,
257 state.context.clone(),
258 );
259
260 if state.gc_enabled() {
261 if let Some(last_commit) = last_commit {
262 let mut index = last_commit.index();
263 let gc_round = state.gc_round();
264 info!(
265 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
266 index, gc_round
267 );
268
269 loop {
270 let commits = store
271 .scan_commits((index..=index).into())
272 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
273 let Some(commit) = commits.first() else {
274 info!(
275 "Recovering finished up to index {index}, no more commits to recover"
276 );
277 break;
278 };
279
280 if gc_round > 0 && commit.leader().round <= gc_round {
283 info!(
284 "Recovering finished, reached commit leader round {} <= gc_round {}",
285 commit.leader().round,
286 gc_round
287 );
288 break;
289 }
290
291 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
292 debug!(
293 "Setting block {:?} as committed based on commit {:?}",
294 block_ref,
295 commit.index()
296 );
297 assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
298 });
299
300 index = index.saturating_sub(1);
302 if index == 0 {
303 break;
304 }
305 }
306 }
307 }
308
309 state
310 }
311
312 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
314 assert_ne!(
315 block.round(),
316 0,
317 "Genesis block should not be accepted into DAG."
318 );
319
320 let block_ref = block.reference();
321 if self.contains_block(&block_ref) {
322 return;
323 }
324
325 let now = self.context.clock.timestamp_utc_ms();
326 if block.timestamp_ms() > now {
327 if self
328 .context
329 .protocol_config
330 .consensus_median_timestamp_with_checkpoint_enforcement()
331 {
332 trace!(
333 "Block {:?} with timestamp {} is greater than local timestamp {}.",
334 block,
335 block.timestamp_ms(),
336 now,
337 );
338 } else {
339 panic!(
340 "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
341 block,
342 block.timestamp_ms(),
343 now,
344 );
345 }
346 }
347 let hostname = &self.context.committee.authority(block_ref.author).hostname;
348 self.context
349 .metrics
350 .node_metrics
351 .accepted_block_time_drift_ms
352 .with_label_values(&[hostname])
353 .inc_by(block.timestamp_ms().saturating_sub(now));
354
355 if block_ref.author == self.context.own_index {
358 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
359 assert!(
360 existing_blocks.is_empty(),
361 "Block Rejected! Attempted to add block {block:#?} to own slot where \
362 block(s) {existing_blocks:#?} already exists."
363 );
364 }
365 self.update_block_metadata(&block);
366 self.blocks_to_write.push(block);
367 let source = if self.context.own_index == block_ref.author {
368 "own"
369 } else {
370 "others"
371 };
372 self.context
373 .metrics
374 .node_metrics
375 .accepted_blocks
376 .with_label_values(&[source])
377 .inc();
378 }
379
380 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
382 let block_ref = block.reference();
383 self.recent_blocks
384 .insert(block_ref, BlockInfo::new(block.clone()));
385 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
386 self.threshold_clock.add_block(block_ref);
387 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
388 self.context
389 .metrics
390 .node_metrics
391 .highest_accepted_round
392 .set(self.highest_accepted_round as i64);
393
394 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
395 .last()
396 .map(|block_ref| block_ref.round)
397 .expect("There should be by now at least one block ref");
398 let hostname = &self.context.committee.authority(block_ref.author).hostname;
399 self.context
400 .metrics
401 .node_metrics
402 .highest_accepted_authority_round
403 .with_label_values(&[hostname])
404 .set(highest_accepted_round_for_author as i64);
405 }
406
407 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
409 debug!(
410 "Accepting blocks: {}",
411 blocks.iter().map(|b| b.reference().to_string()).join(",")
412 );
413 for block in blocks {
414 self.accept_block(block);
415 }
416 }
417
418 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
421 self.get_blocks(&[*reference])
422 .pop()
423 .expect("Exactly one element should be returned")
424 }
425
426 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
430 let mut blocks = vec![None; block_refs.len()];
431 let mut missing = Vec::new();
432
433 for (index, block_ref) in block_refs.iter().enumerate() {
434 if block_ref.round == GENESIS_ROUND {
435 if let Some(block) = self.genesis.get(block_ref) {
437 blocks[index] = Some(block.clone());
438 }
439 continue;
440 }
441 if let Some(block_info) = self.recent_blocks.get(block_ref) {
442 blocks[index] = Some(block_info.block.clone());
443 continue;
444 }
445 missing.push((index, block_ref));
446 }
447
448 if missing.is_empty() {
449 return blocks;
450 }
451
452 let missing_refs = missing
453 .iter()
454 .map(|(_, block_ref)| **block_ref)
455 .collect::<Vec<_>>();
456 let store_results = self
457 .store
458 .read_blocks(&missing_refs)
459 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
460 self.context
461 .metrics
462 .node_metrics
463 .dag_state_store_read_count
464 .with_label_values(&["get_blocks"])
465 .inc();
466
467 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
468 blocks[index] = result;
469 }
470
471 blocks
472 }
473
474 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
478 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
479 if !block_info.committed {
480 block_info.committed = true;
481 return true;
482 }
483 false
484 } else {
485 panic!("Block {block_ref:?} not found in cache to set as committed.");
486 }
487 }
488
489 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
490 self.recent_blocks
491 .get(block_ref)
492 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
493 .committed
494 }
495
496 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
500 let mut blocks = vec![];
505 for (_block_ref, block_info) in self.recent_blocks.range((
506 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
507 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
508 )) {
509 blocks.push(block_info.block.clone())
510 }
511 blocks
512 }
513
514 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
518 if round <= self.last_commit_round() {
519 panic!("Round {round} have committed blocks!");
520 }
521
522 let mut blocks = vec![];
523 for (_block_ref, block_info) in self.recent_blocks.range((
524 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
525 Excluded(BlockRef::new(
526 round + 1,
527 AuthorityIndex::ZERO,
528 BlockDigest::MIN,
529 )),
530 )) {
531 blocks.push(block_info.block.clone())
532 }
533 blocks
534 }
535
536 pub(crate) fn ancestors_at_round(
538 &self,
539 later_block: &VerifiedBlock,
540 earlier_round: Round,
541 ) -> Vec<VerifiedBlock> {
542 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
544 while !linked.is_empty() {
545 let round = linked.last().unwrap().round;
546 if round <= earlier_round {
548 break;
549 }
550 let block_ref = linked.pop_last().unwrap();
551 let Some(block) = self.get_block(&block_ref) else {
552 panic!("Block {block_ref:?} should exist in DAG!");
553 };
554 linked.extend(block.ancestors().iter().cloned());
555 }
556 linked
557 .range((
558 Included(BlockRef::new(
559 earlier_round,
560 AuthorityIndex::ZERO,
561 BlockDigest::MIN,
562 )),
563 Unbounded,
564 ))
565 .map(|r| {
566 self.get_block(r)
567 .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
568 .clone()
569 })
570 .collect()
571 }
572
573 pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
576 self.get_last_block_for_authority(self.context.own_index)
577 }
578
579 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
583 if let Some(last) = self.recent_refs_by_authority[authority].last() {
584 return self
585 .recent_blocks
586 .get(last)
587 .expect("Block should be found in recent blocks")
588 .block
589 .clone();
590 }
591
592 let (_, genesis_block) = self
594 .genesis
595 .iter()
596 .find(|(block_ref, _)| block_ref.author == authority)
597 .expect("Genesis should be found for authority {authority_index}");
598 genesis_block.clone()
599 }
600
601 pub(crate) fn get_cached_blocks(
607 &self,
608 authority: AuthorityIndex,
609 start: Round,
610 ) -> Vec<VerifiedBlock> {
611 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
612 }
613
614 pub(crate) fn get_cached_blocks_in_range(
617 &self,
618 authority: AuthorityIndex,
619 start_round: Round,
620 end_round: Round,
621 limit: usize,
622 ) -> Vec<VerifiedBlock> {
623 if start_round >= end_round || limit == 0 {
624 return vec![];
625 }
626
627 let mut blocks = vec![];
628 for block_ref in self.recent_refs_by_authority[authority].range((
629 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
630 Excluded(BlockRef::new(
631 end_round,
632 AuthorityIndex::MIN,
633 BlockDigest::MIN,
634 )),
635 )) {
636 let block_info = self
637 .recent_blocks
638 .get(block_ref)
639 .expect("Block should exist in recent blocks");
640 blocks.push(block_info.block.clone());
641 if blocks.len() >= limit {
642 break;
643 }
644 }
645 blocks
646 }
647
648 pub(crate) fn get_last_cached_block_in_range(
651 &self,
652 authority: AuthorityIndex,
653 start_round: Round,
654 end_round: Round,
655 ) -> Option<VerifiedBlock> {
656 if start_round >= end_round {
657 return None;
658 }
659
660 let block_ref = self.recent_refs_by_authority[authority]
661 .range((
662 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
663 Excluded(BlockRef::new(
664 end_round,
665 AuthorityIndex::MIN,
666 BlockDigest::MIN,
667 )),
668 ))
669 .last()?;
670
671 self.recent_blocks
672 .get(block_ref)
673 .map(|block_info| block_info.block.clone())
674 }
675
676 pub(crate) fn get_last_cached_block_per_authority(
685 &self,
686 end_round: Round,
687 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
688 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
690 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
691
692 if end_round == GENESIS_ROUND {
693 panic!(
694 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
695 );
696 }
697
698 if end_round == GENESIS_ROUND + 1 {
699 return blocks.into_iter().map(|b| (b, vec![])).collect();
700 }
701
702 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
703 let authority_index = self
704 .context
705 .committee
706 .to_authority_index(authority_index)
707 .unwrap();
708
709 let last_evicted_round = self.evicted_rounds[authority_index];
710 if end_round.saturating_sub(1) <= last_evicted_round {
711 panic!(
712 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
713 );
714 }
715
716 let block_ref_iter = block_refs
717 .range((
718 Included(BlockRef::new(
719 last_evicted_round + 1,
720 authority_index,
721 BlockDigest::MIN,
722 )),
723 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
724 ))
725 .rev();
726
727 let mut last_round = 0;
728 for block_ref in block_ref_iter {
729 if last_round == 0 {
730 last_round = block_ref.round;
731 let block_info = self
732 .recent_blocks
733 .get(block_ref)
734 .expect("Block should exist in recent blocks");
735 blocks[authority_index] = block_info.block.clone();
736 continue;
737 }
738 if block_ref.round < last_round {
739 break;
740 }
741 equivocating_blocks[authority_index].push(*block_ref);
742 }
743 }
744
745 blocks.into_iter().zip(equivocating_blocks).collect()
746 }
747
748 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
752 if slot.round == GENESIS_ROUND {
754 return true;
755 }
756
757 let eviction_round = self.evicted_rounds[slot.authority];
758 if slot.round <= eviction_round {
759 panic!(
760 "{}",
761 format!(
762 "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
763 if self.gc_enabled() { " gc " } else { " " }
764 )
765 );
766 }
767
768 let mut result = self.recent_refs_by_authority[slot.authority].range((
769 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
770 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
771 ));
772 result.next().is_some()
773 }
774
775 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
779 let mut exist = vec![false; block_refs.len()];
780 let mut missing = Vec::new();
781
782 for (index, block_ref) in block_refs.into_iter().enumerate() {
783 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
784 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
785 exist[index] = true;
786 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
787 {
788 exist[index] = false;
793 } else {
794 missing.push((index, block_ref));
795 }
796 }
797
798 if missing.is_empty() {
799 return exist;
800 }
801
802 let missing_refs = missing
803 .iter()
804 .map(|(_, block_ref)| *block_ref)
805 .collect::<Vec<_>>();
806 let store_results = self
807 .store
808 .contains_blocks(&missing_refs)
809 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
810 self.context
811 .metrics
812 .node_metrics
813 .dag_state_store_read_count
814 .with_label_values(&["contains_blocks"])
815 .inc();
816
817 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
818 exist[index] = result;
819 }
820
821 exist
822 }
823
824 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
825 let blocks = self.contains_blocks(vec![*block_ref]);
826 blocks.first().cloned().unwrap()
827 }
828
829 pub(crate) fn threshold_clock_round(&self) -> Round {
830 self.threshold_clock.get_round()
831 }
832
833 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
834 self.threshold_clock.get_quorum_ts()
835 }
836
837 pub(crate) fn highest_accepted_round(&self) -> Round {
838 self.highest_accepted_round
839 }
840
841 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
844 let time_diff = if let Some(last_commit) = &self.last_commit {
845 if commit.index() <= last_commit.index() {
846 error!(
847 "New commit index {} <= last commit index {}!",
848 commit.index(),
849 last_commit.index()
850 );
851 return;
852 }
853 assert_eq!(commit.index(), last_commit.index() + 1);
854
855 if commit.timestamp_ms() < last_commit.timestamp_ms() {
856 panic!(
857 "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
858 );
859 }
860 commit
861 .timestamp_ms()
862 .saturating_sub(last_commit.timestamp_ms())
863 } else {
864 assert_eq!(commit.index(), 1);
865 0
866 };
867
868 self.context
869 .metrics
870 .node_metrics
871 .last_commit_time_diff
872 .observe(time_diff as f64);
873
874 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
875 previous_commit.round() < commit.round()
876 } else {
877 true
878 };
879
880 self.last_commit = Some(commit.clone());
881
882 if commit_round_advanced {
883 let now = std::time::Instant::now();
884 if let Some(previous_time) = self.last_commit_round_advancement_time {
885 self.context
886 .metrics
887 .node_metrics
888 .commit_round_advancement_interval
889 .observe(now.duration_since(previous_time).as_secs_f64())
890 }
891 self.last_commit_round_advancement_time = Some(now);
892 }
893
894 for block_ref in commit.blocks().iter() {
895 self.last_committed_rounds[block_ref.author] = max(
896 self.last_committed_rounds[block_ref.author],
897 block_ref.round,
898 );
899 }
900
901 for (i, round) in self.last_committed_rounds.iter().enumerate() {
902 let index = self.context.committee.to_authority_index(i).unwrap();
903 let hostname = &self.context.committee.authority(index).hostname;
904 self.context
905 .metrics
906 .node_metrics
907 .last_committed_authority_round
908 .with_label_values(&[hostname])
909 .set((*round).into());
910 }
911
912 self.pending_commit_votes.push_back(commit.reference());
913 self.commits_to_write.push(commit);
914 }
915
916 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
917 assert!(self.unscored_committed_subdags.is_empty());
919
920 assert!(self.scoring_subdag.is_empty());
924
925 let commit_info = CommitInfo {
926 committed_rounds: self.last_committed_rounds.clone(),
927 reputation_scores,
928 };
929 let last_commit = self
930 .last_commit
931 .as_ref()
932 .expect("Last commit should already be set.");
933 self.commit_info_to_write
934 .push((last_commit.reference(), commit_info));
935 }
936
937 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
938 let mut votes = Vec::new();
939 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
940 votes.push(self.pending_commit_votes.pop_front().unwrap());
941 }
942 votes
943 }
944
945 pub(crate) fn last_commit_index(&self) -> CommitIndex {
947 match &self.last_commit {
948 Some(commit) => commit.index(),
949 None => 0,
950 }
951 }
952
953 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
955 match &self.last_commit {
956 Some(commit) => commit.digest(),
957 None => CommitDigest::MIN,
958 }
959 }
960
961 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
963 match &self.last_commit {
964 Some(commit) => commit.timestamp_ms(),
965 None => 0,
966 }
967 }
968
969 pub(crate) fn last_commit_leader(&self) -> Slot {
971 match &self.last_commit {
972 Some(commit) => commit.leader().into(),
973 None => self
974 .genesis
975 .iter()
976 .next()
977 .map(|(genesis_ref, _)| *genesis_ref)
978 .expect("Genesis blocks should always be available.")
979 .into(),
980 }
981 }
982
983 pub(crate) fn last_commit_round(&self) -> Round {
986 match &self.last_commit {
987 Some(commit) => commit.leader().round,
988 None => 0,
989 }
990 }
991
992 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
994 self.last_committed_rounds.clone()
995 }
996
997 pub(crate) fn gc_round(&self) -> Round {
1004 self.calculate_gc_round(self.last_commit_round())
1005 }
1006
1007 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1008 let gc_depth = self.context.protocol_config.gc_depth();
1009 if gc_depth > 0 {
1010 commit_round.saturating_sub(gc_depth)
1012 } else {
1013 GENESIS_ROUND
1016 }
1017 }
1018
1019 pub(crate) fn gc_enabled(&self) -> bool {
1020 self.context.protocol_config.gc_depth() > 0
1021 }
1022
1023 pub(crate) fn flush(&mut self) {
1026 let _s = self
1027 .context
1028 .metrics
1029 .node_metrics
1030 .scope_processing_time
1031 .with_label_values(&["DagState::flush"])
1032 .start_timer();
1033 let blocks = std::mem::take(&mut self.blocks_to_write);
1035 let commits = std::mem::take(&mut self.commits_to_write);
1036 let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
1037
1038 if blocks.is_empty() && commits.is_empty() {
1039 return;
1040 }
1041 debug!(
1042 "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
1043 blocks.len(),
1044 blocks.iter().map(|b| b.reference().to_string()).join(","),
1045 commits.len(),
1046 commits.iter().map(|c| c.reference().to_string()).join(","),
1047 commit_info_to_write.len(),
1048 commit_info_to_write
1049 .iter()
1050 .map(|(commit_ref, _)| commit_ref.to_string())
1051 .join(","),
1052 );
1053
1054 let mut metrics_to_write = vec![];
1056 let threshold_clock_round = self.threshold_clock_round();
1057 for (authority_index, authority) in self.context.committee.authorities() {
1058 let last_eviction_round = self.evicted_rounds[authority_index];
1059 let current_eviction_round = self.calculate_authority_eviction_round(authority_index);
1060 let metrics_to_write_from_authority =
1061 self.context.metrics.update_scoring_metrics_on_eviction(
1062 authority_index,
1063 authority.hostname.as_str(),
1064 &self.recent_refs_by_authority[authority_index],
1065 current_eviction_round,
1066 last_eviction_round,
1067 threshold_clock_round,
1068 );
1069 if let Some(metrics_to_write_from_authority) = metrics_to_write_from_authority {
1070 metrics_to_write.push((authority_index, metrics_to_write_from_authority));
1071 }
1072 }
1073
1074 self.store
1075 .write(WriteBatch::new(
1076 blocks,
1077 commits,
1078 commit_info_to_write,
1079 metrics_to_write,
1080 ))
1081 .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
1082 self.context
1083 .metrics
1084 .node_metrics
1085 .dag_state_store_write_count
1086 .inc();
1087
1088 for (authority_index, _) in self.context.committee.authorities() {
1092 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1093 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1094 let block_round = block_ref.round;
1095 if block_round <= eviction_round {
1096 self.recent_blocks.remove(block_ref);
1097 self.recent_refs_by_authority[authority_index].pop_first();
1098 } else {
1099 break;
1100 }
1101 }
1102 self.evicted_rounds[authority_index] = eviction_round;
1103 }
1104
1105 let metrics = &self.context.metrics.node_metrics;
1106 metrics
1107 .dag_state_recent_blocks
1108 .set(self.recent_blocks.len() as i64);
1109 metrics.dag_state_recent_refs.set(
1110 self.recent_refs_by_authority
1111 .iter()
1112 .map(BTreeSet::len)
1113 .sum::<usize>() as i64,
1114 );
1115 }
1116
1117 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1118 self.store
1119 .read_last_commit_info()
1120 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
1121 }
1122
1123 pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
1125 self.unscored_committed_subdags.len() as u64
1126 }
1127
1128 #[cfg(test)]
1129 pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
1130 self.unscored_committed_subdags.clone()
1131 }
1132
1133 pub(crate) fn add_unscored_committed_subdags(
1134 &mut self,
1135 committed_subdags: Vec<CommittedSubDag>,
1136 ) {
1137 self.unscored_committed_subdags.extend(committed_subdags);
1138 }
1139
1140 pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
1141 std::mem::take(&mut self.unscored_committed_subdags)
1142 }
1143
1144 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1145 self.scoring_subdag.add_subdags(scoring_subdags);
1146 }
1147
1148 pub(crate) fn clear_scoring_subdag(&mut self) {
1149 self.scoring_subdag.clear();
1150 }
1151
1152 pub(crate) fn scoring_subdags_count(&self) -> usize {
1153 self.scoring_subdag.scored_subdags_count()
1154 }
1155
1156 pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1157 self.scoring_subdag.is_empty()
1158 }
1159
1160 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1161 self.scoring_subdag.calculate_distributed_vote_scores()
1162 }
1163
1164 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1165 self.scoring_subdag
1166 .commit_range
1167 .as_ref()
1168 .expect("commit range should exist for scoring subdag")
1169 .end()
1170 }
1171
1172 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1177 if self.gc_enabled() {
1178 let last_round = self.recent_refs_by_authority[authority_index]
1179 .last()
1180 .map(|block_ref| block_ref.round)
1181 .unwrap_or(GENESIS_ROUND);
1182
1183 Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1184 } else {
1185 let commit_round = self.last_committed_rounds[authority_index];
1186 Self::eviction_round(commit_round, self.cached_rounds)
1187 }
1188 }
1189
1190 fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
1193 commit_round.saturating_sub(cached_rounds)
1194 }
1195
1196 fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1201 gc_round.min(last_round.saturating_sub(cached_rounds))
1202 }
1203
1204 #[cfg(test)]
1207 pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1208 for round in
1212 (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1213 {
1214 if round == GENESIS_ROUND {
1215 return self.genesis_blocks();
1216 }
1217 use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1218 let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1219
1220 let blocks = self.get_uncommitted_blocks_at_round(round);
1223 for block in &blocks {
1224 if quorum.add(block.author(), &self.context.committee) {
1225 return blocks;
1226 }
1227 }
1228 }
1229
1230 panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1231 }
1232
1233 #[cfg(test)]
1234 pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1235 self.genesis.values().cloned().collect()
1236 }
1237
1238 #[cfg(test)]
1239 pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1240 self.last_commit = Some(commit);
1241 }
1242}
1243
1244struct BlockInfo {
1245 block: VerifiedBlock,
1246 committed: bool,
1248}
1249
1250impl BlockInfo {
1251 fn new(block: VerifiedBlock) -> Self {
1252 Self {
1253 block,
1254 committed: false,
1255 }
1256 }
1257}
1258
1259#[cfg(test)]
1260mod test {
1261 use std::vec;
1262
1263 use parking_lot::RwLock;
1264 use rstest::rstest;
1265
1266 use super::*;
1267 use crate::{
1268 block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1269 storage::{WriteBatch, mem_store::MemStore},
1270 test_dag_builder::DagBuilder,
1271 test_dag_parser::parse_dag,
1272 };
1273
1274 #[tokio::test]
1275 async fn test_get_blocks() {
1276 let (context, _) = Context::new_for_test(4);
1277 let context = Arc::new(context);
1278 let store = Arc::new(MemStore::new());
1279 let mut dag_state = DagState::new(context.clone(), store.clone());
1280 let own_index = AuthorityIndex::new_for_test(0);
1281
1282 let num_rounds: u32 = 10;
1284 let non_existent_round: u32 = 100;
1285 let num_authorities: u32 = 3;
1286 let num_blocks_per_slot: usize = 3;
1287 let mut blocks = BTreeMap::new();
1288 for round in 1..=num_rounds {
1289 for author in 0..num_authorities {
1290 let base_ts = round as BlockTimestampMs * 1000;
1292 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1293 let block = VerifiedBlock::new_for_test(
1294 TestBlock::new(round, author)
1295 .set_timestamp_ms(timestamp)
1296 .build(),
1297 );
1298 dag_state.accept_block(block.clone());
1299 blocks.insert(block.reference(), block);
1300
1301 if AuthorityIndex::new_for_test(author) == own_index {
1303 break;
1304 }
1305 }
1306 }
1307 }
1308
1309 for (r, block) in &blocks {
1311 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1312 }
1313
1314 let last_ref = blocks.keys().last().unwrap();
1316 assert!(
1317 dag_state
1318 .get_block(&BlockRef::new(
1319 last_ref.round,
1320 last_ref.author,
1321 BlockDigest::MIN
1322 ))
1323 .is_none()
1324 );
1325
1326 for round in 1..=num_rounds {
1328 for author in 0..num_authorities {
1329 let slot = Slot::new(
1330 round,
1331 context
1332 .committee
1333 .to_authority_index(author as usize)
1334 .unwrap(),
1335 );
1336 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1337
1338 if AuthorityIndex::new_for_test(author) == own_index {
1340 assert_eq!(blocks.len(), 1);
1341 } else {
1342 assert_eq!(blocks.len(), num_blocks_per_slot);
1343 }
1344
1345 for b in blocks {
1346 assert_eq!(b.round(), round);
1347 assert_eq!(
1348 b.author(),
1349 context
1350 .committee
1351 .to_authority_index(author as usize)
1352 .unwrap()
1353 );
1354 }
1355 }
1356 }
1357
1358 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1360 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1361
1362 for round in 1..=num_rounds {
1364 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1365 assert_eq!(
1368 blocks.len(),
1369 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1370 );
1371 for b in blocks {
1372 assert_eq!(b.round(), round);
1373 }
1374 }
1375
1376 assert!(
1378 dag_state
1379 .get_uncommitted_blocks_at_round(non_existent_round)
1380 .is_empty()
1381 );
1382 }
1383
1384 #[tokio::test]
1385 async fn test_ancestors_at_uncommitted_round() {
1386 let (context, _) = Context::new_for_test(4);
1388 let context = Arc::new(context);
1389 let store = Arc::new(MemStore::new());
1390 let mut dag_state = DagState::new(context.clone(), store.clone());
1391
1392 let round_10_refs: Vec<_> = (0..4)
1396 .map(|a| {
1397 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1398 .reference()
1399 })
1400 .collect();
1401
1402 let round_11 = vec![
1404 VerifiedBlock::new_for_test(
1406 TestBlock::new(11, 0)
1407 .set_timestamp_ms(1100)
1408 .set_ancestors(round_10_refs.clone())
1409 .build(),
1410 ),
1411 VerifiedBlock::new_for_test(
1414 TestBlock::new(11, 1)
1415 .set_timestamp_ms(1110)
1416 .set_ancestors(round_10_refs.clone())
1417 .build(),
1418 ),
1419 VerifiedBlock::new_for_test(
1421 TestBlock::new(11, 1)
1422 .set_timestamp_ms(1111)
1423 .set_ancestors(round_10_refs.clone())
1424 .build(),
1425 ),
1426 VerifiedBlock::new_for_test(
1428 TestBlock::new(11, 1)
1429 .set_timestamp_ms(1112)
1430 .set_ancestors(round_10_refs.clone())
1431 .build(),
1432 ),
1433 VerifiedBlock::new_for_test(
1435 TestBlock::new(11, 2)
1436 .set_timestamp_ms(1120)
1437 .set_ancestors(round_10_refs.clone())
1438 .build(),
1439 ),
1440 VerifiedBlock::new_for_test(
1442 TestBlock::new(11, 3)
1443 .set_timestamp_ms(1130)
1444 .set_ancestors(round_10_refs.clone())
1445 .build(),
1446 ),
1447 ];
1448
1449 let ancestors_for_round_12 = vec![
1451 round_11[0].reference(),
1452 round_11[1].reference(),
1453 round_11[5].reference(),
1454 ];
1455 let round_12 = vec![
1456 VerifiedBlock::new_for_test(
1457 TestBlock::new(12, 0)
1458 .set_timestamp_ms(1200)
1459 .set_ancestors(ancestors_for_round_12.clone())
1460 .build(),
1461 ),
1462 VerifiedBlock::new_for_test(
1463 TestBlock::new(12, 2)
1464 .set_timestamp_ms(1220)
1465 .set_ancestors(ancestors_for_round_12.clone())
1466 .build(),
1467 ),
1468 VerifiedBlock::new_for_test(
1469 TestBlock::new(12, 3)
1470 .set_timestamp_ms(1230)
1471 .set_ancestors(ancestors_for_round_12.clone())
1472 .build(),
1473 ),
1474 ];
1475
1476 let ancestors_for_round_13 = vec![
1478 round_12[0].reference(),
1479 round_12[1].reference(),
1480 round_12[2].reference(),
1481 round_11[2].reference(),
1482 ];
1483 let round_13 = vec![
1484 VerifiedBlock::new_for_test(
1485 TestBlock::new(12, 1)
1486 .set_timestamp_ms(1300)
1487 .set_ancestors(ancestors_for_round_13.clone())
1488 .build(),
1489 ),
1490 VerifiedBlock::new_for_test(
1491 TestBlock::new(12, 2)
1492 .set_timestamp_ms(1320)
1493 .set_ancestors(ancestors_for_round_13.clone())
1494 .build(),
1495 ),
1496 VerifiedBlock::new_for_test(
1497 TestBlock::new(12, 3)
1498 .set_timestamp_ms(1330)
1499 .set_ancestors(ancestors_for_round_13.clone())
1500 .build(),
1501 ),
1502 ];
1503
1504 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1506 let anchor = VerifiedBlock::new_for_test(
1507 TestBlock::new(14, 1)
1508 .set_timestamp_ms(1410)
1509 .set_ancestors(ancestors_for_round_14)
1510 .build(),
1511 );
1512
1513 for b in round_11
1515 .iter()
1516 .chain(round_12.iter())
1517 .chain(round_13.iter())
1518 .chain([anchor.clone()].iter())
1519 {
1520 dag_state.accept_block(b.clone());
1521 }
1522
1523 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1525 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1526 ancestors_refs.sort();
1527 let mut expected_refs = vec![
1528 round_11[0].reference(),
1529 round_11[1].reference(),
1530 round_11[2].reference(),
1531 round_11[5].reference(),
1532 ];
1533 expected_refs.sort(); assert_eq!(
1536 ancestors_refs, expected_refs,
1537 "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1538 );
1539 }
1540
1541 #[tokio::test]
1542 async fn test_contains_blocks_in_cache_or_store() {
1543 const CACHED_ROUNDS: Round = 2;
1545
1546 let (mut context, _) = Context::new_for_test(4);
1547 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1548
1549 let context = Arc::new(context);
1550 let store = Arc::new(MemStore::new());
1551 let mut dag_state = DagState::new(context.clone(), store.clone());
1552
1553 let num_rounds: u32 = 10;
1555 let num_authorities: u32 = 4;
1556 let mut blocks = Vec::new();
1557
1558 for round in 1..=num_rounds {
1559 for author in 0..num_authorities {
1560 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1561 blocks.push(block);
1562 }
1563 }
1564
1565 blocks.clone().into_iter().for_each(|block| {
1568 if block.round() <= 4 {
1569 store
1570 .write(WriteBatch::default().blocks(vec![block]))
1571 .unwrap();
1572 } else {
1573 dag_state.accept_blocks(vec![block]);
1574 }
1575 });
1576
1577 let mut block_refs = blocks
1581 .iter()
1582 .map(|block| block.reference())
1583 .collect::<Vec<_>>();
1584 let result = dag_state.contains_blocks(block_refs.clone());
1585
1586 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1588 assert_eq!(result, expected);
1589
1590 block_refs.insert(
1592 3,
1593 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1594 );
1595 let result = dag_state.contains_blocks(block_refs.clone());
1596
1597 expected.insert(3, false);
1599 assert_eq!(result, expected.clone());
1600 }
1601
1602 #[tokio::test]
1603 async fn test_contains_cached_block_at_slot() {
1604 const CACHED_ROUNDS: Round = 2;
1606
1607 let num_authorities: u32 = 4;
1608 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1609 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1610
1611 let context = Arc::new(context);
1612 let store = Arc::new(MemStore::new());
1613 let mut dag_state = DagState::new(context.clone(), store.clone());
1614
1615 let num_rounds: u32 = 10;
1617 let mut blocks = Vec::new();
1618
1619 for round in 1..=num_rounds {
1620 for author in 0..num_authorities {
1621 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1622 blocks.push(block.clone());
1623 dag_state.accept_block(block);
1624 }
1625 }
1626
1627 for (author, _) in context.committee.authorities() {
1629 assert!(
1630 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1631 "Genesis should always be found"
1632 );
1633 }
1634
1635 let mut block_refs = blocks
1639 .iter()
1640 .map(|block| block.reference())
1641 .collect::<Vec<_>>();
1642
1643 for block_ref in block_refs.clone() {
1644 let slot = block_ref.into();
1645 let found = dag_state.contains_cached_block_at_slot(slot);
1646 assert!(found, "A block should be found at slot {slot}");
1647 }
1648
1649 block_refs.insert(
1652 3,
1653 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1654 );
1655 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1656 expected.insert(3, false);
1657
1658 for block_ref in block_refs {
1660 let slot = block_ref.into();
1661 let found = dag_state.contains_cached_block_at_slot(slot);
1662
1663 assert_eq!(expected.remove(0), found);
1664 }
1665 }
1666
1667 #[tokio::test]
1668 #[should_panic(
1669 expected = "Attempted to check for slot S8[0] that is <= the last gc evicted round 8"
1670 )]
1671 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1672 const CACHED_ROUNDS: Round = 2;
1674 const GC_DEPTH: u32 = 1;
1675 let (mut context, _) = Context::new_for_test(4);
1676 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1677 context
1678 .protocol_config
1679 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1680
1681 let context = Arc::new(context);
1682 let store = Arc::new(MemStore::new());
1683 let mut dag_state = DagState::new(context.clone(), store.clone());
1684
1685 let mut blocks = Vec::new();
1687 for round in 1..=10 {
1688 let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1689 blocks.push(block.clone());
1690 dag_state.accept_block(block);
1691 }
1692
1693 dag_state.add_commit(TrustedCommit::new_for_test(
1695 1 as CommitIndex,
1696 CommitDigest::MIN,
1697 0,
1698 blocks.last().unwrap().reference(),
1699 blocks
1700 .into_iter()
1701 .map(|block| block.reference())
1702 .collect::<Vec<_>>(),
1703 ));
1704
1705 dag_state.flush();
1706
1707 let _ =
1711 dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1712 }
1713
1714 #[tokio::test]
1715 #[should_panic(
1716 expected = "Attempted to check for slot S3[1] that is <= the last gc evicted round 3"
1717 )]
1718 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1719 const GC_DEPTH: u32 = 2;
1723 const CACHED_ROUNDS: Round = 3;
1725
1726 let (mut context, _) = Context::new_for_test(4);
1727 context
1728 .protocol_config
1729 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1730 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1731
1732 let context = Arc::new(context);
1733 let store = Arc::new(MemStore::new());
1734 let mut dag_state = DagState::new(context.clone(), store.clone());
1735
1736 let mut dag_builder = DagBuilder::new(context.clone());
1739 dag_builder.layers(1..=3).build();
1740 dag_builder
1741 .layers(4..=6)
1742 .authorities(vec![AuthorityIndex::new_for_test(0)])
1743 .skip_block()
1744 .build();
1745
1746 dag_builder
1748 .all_blocks()
1749 .into_iter()
1750 .for_each(|block| dag_state.accept_block(block));
1751
1752 dag_state.add_commit(TrustedCommit::new_for_test(
1754 1 as CommitIndex,
1755 CommitDigest::MIN,
1756 0,
1757 dag_builder.leader_block(5).unwrap().reference(),
1758 vec![],
1759 ));
1760
1761 dag_state.flush();
1762
1763 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1765
1766 for authority_index in 1..=3 {
1771 for round in 4..=6 {
1772 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1773 round,
1774 AuthorityIndex::new_for_test(authority_index)
1775 )));
1776 }
1777 }
1778
1779 for round in 1..=3 {
1780 assert!(
1781 dag_state.contains_cached_block_at_slot(Slot::new(
1782 round,
1783 AuthorityIndex::new_for_test(0)
1784 ))
1785 );
1786 }
1787
1788 let _ =
1791 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1792 }
1793
1794 #[tokio::test]
1795 async fn test_get_blocks_in_cache_or_store() {
1796 let (context, _) = Context::new_for_test(4);
1797 let context = Arc::new(context);
1798 let store = Arc::new(MemStore::new());
1799 let mut dag_state = DagState::new(context.clone(), store.clone());
1800
1801 let num_rounds: u32 = 10;
1803 let num_authorities: u32 = 4;
1804 let mut blocks = Vec::new();
1805
1806 for round in 1..=num_rounds {
1807 for author in 0..num_authorities {
1808 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1809 blocks.push(block);
1810 }
1811 }
1812
1813 blocks.clone().into_iter().for_each(|block| {
1816 if block.round() <= 4 {
1817 store
1818 .write(WriteBatch::default().blocks(vec![block]))
1819 .unwrap();
1820 } else {
1821 dag_state.accept_blocks(vec![block]);
1822 }
1823 });
1824
1825 let mut block_refs = blocks
1829 .iter()
1830 .map(|block| block.reference())
1831 .collect::<Vec<_>>();
1832 let result = dag_state.get_blocks(&block_refs);
1833
1834 let mut expected = blocks
1835 .into_iter()
1836 .map(Some)
1837 .collect::<Vec<Option<VerifiedBlock>>>();
1838
1839 assert_eq!(result, expected.clone());
1841
1842 block_refs.insert(
1844 3,
1845 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1846 );
1847 let result = dag_state.get_blocks(&block_refs);
1848
1849 expected.insert(3, None);
1851 assert_eq!(result, expected);
1852 }
1853
1854 #[rstest]
1856 #[tokio::test]
1857 async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1858 telemetry_subscribers::init_for_testing();
1859 let num_authorities: u32 = 4;
1860 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1861 context
1862 .protocol_config
1863 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1864
1865 if gc_depth > 0 {
1866 context
1867 .protocol_config
1868 .set_consensus_gc_depth_for_testing(gc_depth);
1869 }
1870
1871 let context = Arc::new(context);
1872 let store = Arc::new(MemStore::new());
1873 let mut dag_state = DagState::new(context.clone(), store.clone());
1874
1875 let num_rounds: u32 = 10;
1877 let mut dag_builder = DagBuilder::new(context.clone());
1878 dag_builder.layers(1..=num_rounds).build();
1879 let mut commits = vec![];
1880
1881 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1882 commits.push(commit);
1883 }
1884
1885 let temp_commits = commits.split_off(5);
1887 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1888 for commit in commits.clone() {
1889 dag_state.add_commit(commit);
1890 }
1891
1892 dag_state.flush();
1894
1895 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1897 for commit in temp_commits.clone() {
1898 dag_state.add_commit(commit);
1899 }
1900
1901 let all_blocks = dag_builder.blocks(6..=num_rounds);
1903 let block_refs = all_blocks
1904 .iter()
1905 .map(|block| block.reference())
1906 .collect::<Vec<_>>();
1907
1908 let result = dag_state
1909 .get_blocks(&block_refs)
1910 .into_iter()
1911 .map(|b| b.unwrap())
1912 .collect::<Vec<_>>();
1913 assert_eq!(result, all_blocks);
1914
1915 assert_eq!(dag_state.last_commit_index(), 10);
1917 assert_eq!(
1918 dag_state.last_committed_rounds(),
1919 dag_builder.last_committed_rounds.clone()
1920 );
1921
1922 drop(dag_state);
1924
1925 let dag_state = DagState::new(context.clone(), store.clone());
1927
1928 let blocks = dag_builder.blocks(1..=5);
1930 let block_refs = blocks
1931 .iter()
1932 .map(|block| block.reference())
1933 .collect::<Vec<_>>();
1934 let result = dag_state
1935 .get_blocks(&block_refs)
1936 .into_iter()
1937 .map(|b| b.unwrap())
1938 .collect::<Vec<_>>();
1939 assert_eq!(result, blocks);
1940
1941 let missing_blocks = dag_builder.blocks(6..=num_rounds);
1943 let block_refs = missing_blocks
1944 .iter()
1945 .map(|block| block.reference())
1946 .collect::<Vec<_>>();
1947 let retrieved_blocks = dag_state
1948 .get_blocks(&block_refs)
1949 .into_iter()
1950 .flatten()
1951 .collect::<Vec<_>>();
1952 assert!(retrieved_blocks.is_empty());
1953
1954 assert_eq!(dag_state.last_commit_index(), 5);
1956
1957 let expected_last_committed_rounds = vec![4, 5, 4, 4];
1959 assert_eq!(
1960 dag_state.last_committed_rounds(),
1961 expected_last_committed_rounds
1962 );
1963
1964 assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1967 }
1968
1969 #[tokio::test]
1970 async fn test_flush_and_recovery() {
1971 telemetry_subscribers::init_for_testing();
1972 let num_authorities: u32 = 4;
1973 let (context, _) = Context::new_for_test(num_authorities as usize);
1974 let context = Arc::new(context);
1975 let store = Arc::new(MemStore::new());
1976 let mut dag_state = DagState::new(context.clone(), store.clone());
1977
1978 let num_rounds: u32 = 10;
1980 let mut dag_builder = DagBuilder::new(context.clone());
1981 dag_builder.layers(1..=num_rounds).build();
1982 let mut commits = vec![];
1983 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1984 commits.push(commit);
1985 }
1986
1987 let temp_commits = commits.split_off(5);
1989 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1990 for commit in commits.clone() {
1991 dag_state.add_commit(commit);
1992 }
1993
1994 dag_state.flush();
1996
1997 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1999 for commit in temp_commits.clone() {
2000 dag_state.add_commit(commit);
2001 }
2002
2003 let all_blocks = dag_builder.blocks(6..=num_rounds);
2005 let block_refs = all_blocks
2006 .iter()
2007 .map(|block| block.reference())
2008 .collect::<Vec<_>>();
2009 let result = dag_state
2010 .get_blocks(&block_refs)
2011 .into_iter()
2012 .map(|b| b.unwrap())
2013 .collect::<Vec<_>>();
2014 assert_eq!(result, all_blocks);
2015
2016 assert_eq!(dag_state.last_commit_index(), 10);
2018 assert_eq!(
2019 dag_state.last_committed_rounds(),
2020 dag_builder.last_committed_rounds.clone()
2021 );
2022
2023 drop(dag_state);
2025
2026 let dag_state = DagState::new(context.clone(), store.clone());
2028
2029 let blocks = dag_builder.blocks(1..=5);
2031 let block_refs = blocks
2032 .iter()
2033 .map(|block| block.reference())
2034 .collect::<Vec<_>>();
2035 let result = dag_state
2036 .get_blocks(&block_refs)
2037 .into_iter()
2038 .map(|b| b.unwrap())
2039 .collect::<Vec<_>>();
2040 assert_eq!(result, blocks);
2041
2042 let missing_blocks = dag_builder.blocks(6..=num_rounds);
2044 let block_refs = missing_blocks
2045 .iter()
2046 .map(|block| block.reference())
2047 .collect::<Vec<_>>();
2048 let retrieved_blocks = dag_state
2049 .get_blocks(&block_refs)
2050 .into_iter()
2051 .flatten()
2052 .collect::<Vec<_>>();
2053 assert!(retrieved_blocks.is_empty());
2054
2055 assert_eq!(dag_state.last_commit_index(), 5);
2057
2058 let expected_last_committed_rounds = vec![4, 5, 4, 4];
2060 assert_eq!(
2061 dag_state.last_committed_rounds(),
2062 expected_last_committed_rounds
2063 );
2064 assert_eq!(dag_state.scoring_subdags_count(), 5);
2067 }
2068
2069 #[tokio::test]
2070 async fn test_flush_and_recovery_gc_enabled() {
2071 telemetry_subscribers::init_for_testing();
2072
2073 const GC_DEPTH: u32 = 3;
2074 const CACHED_ROUNDS: u32 = 4;
2075
2076 let num_authorities: u32 = 4;
2077 let (mut context, _) = Context::new_for_test(num_authorities as usize);
2078 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2079 context
2080 .protocol_config
2081 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2082 context
2083 .protocol_config
2084 .set_consensus_linearize_subdag_v2_for_testing(true);
2085
2086 let context = Arc::new(context);
2087
2088 let store = Arc::new(MemStore::new());
2089 let mut dag_state = DagState::new(context.clone(), store.clone());
2090
2091 let num_rounds: u32 = 10;
2092 let mut dag_builder = DagBuilder::new(context.clone());
2093 dag_builder.layers(1..=5).build();
2094 dag_builder
2095 .layers(6..=8)
2096 .authorities(vec![AuthorityIndex::new_for_test(0)])
2097 .skip_block()
2098 .build();
2099 dag_builder.layers(9..=num_rounds).build();
2100
2101 let mut commits = dag_builder
2102 .get_sub_dag_and_commits(1..=num_rounds)
2103 .into_iter()
2104 .map(|(_subdag, commit)| commit)
2105 .collect::<Vec<_>>();
2106
2107 let temp_commits = commits.split_off(7);
2111 dag_state.accept_blocks(dag_builder.blocks(1..=8));
2112 for commit in commits.clone() {
2113 dag_state.add_commit(commit);
2114 }
2115
2116 let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2119 for commit in commits.iter() {
2120 all_committed_blocks.extend(commit.blocks());
2121 }
2122 dag_state.flush();
2124
2125 dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2127 for commit in temp_commits.clone() {
2128 dag_state.add_commit(commit);
2129 }
2130
2131 let all_blocks = dag_builder.blocks(1..=num_rounds);
2133 let block_refs = all_blocks
2134 .iter()
2135 .map(|block| block.reference())
2136 .collect::<Vec<_>>();
2137 let result = dag_state
2138 .get_blocks(&block_refs)
2139 .into_iter()
2140 .map(|b| b.unwrap())
2141 .collect::<Vec<_>>();
2142 assert_eq!(result, all_blocks);
2143
2144 assert_eq!(dag_state.last_commit_index(), 9);
2146 assert_eq!(
2147 dag_state.last_committed_rounds(),
2148 dag_builder.last_committed_rounds.clone()
2149 );
2150
2151 drop(dag_state);
2153
2154 let dag_state = DagState::new(context.clone(), store.clone());
2156
2157 let blocks = dag_builder.blocks(1..=5);
2159 let block_refs = blocks
2160 .iter()
2161 .map(|block| block.reference())
2162 .collect::<Vec<_>>();
2163 let result = dag_state
2164 .get_blocks(&block_refs)
2165 .into_iter()
2166 .map(|b| b.unwrap())
2167 .collect::<Vec<_>>();
2168 assert_eq!(result, blocks);
2169
2170 let missing_blocks = dag_builder.blocks(9..=num_rounds);
2172 let block_refs = missing_blocks
2173 .iter()
2174 .map(|block| block.reference())
2175 .collect::<Vec<_>>();
2176 let retrieved_blocks = dag_state
2177 .get_blocks(&block_refs)
2178 .into_iter()
2179 .flatten()
2180 .collect::<Vec<_>>();
2181 assert!(retrieved_blocks.is_empty());
2182
2183 assert_eq!(dag_state.last_commit_index(), 7);
2185
2186 let expected_last_committed_rounds = vec![5, 6, 6, 7];
2188 assert_eq!(
2189 dag_state.last_committed_rounds(),
2190 expected_last_committed_rounds
2191 );
2192 assert_eq!(dag_state.scoring_subdags_count(), 7);
2195 for (authority_index, _) in context.committee.authorities() {
2197 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2198
2199 if authority_index == AuthorityIndex::new_for_test(0) {
2204 assert_eq!(blocks.len(), 4);
2205 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2206 assert!(
2207 blocks
2208 .into_iter()
2209 .all(|block| block.round() >= 2 && block.round() <= 5)
2210 );
2211 } else {
2212 assert_eq!(blocks.len(), 4);
2213 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2214 assert!(
2215 blocks
2216 .into_iter()
2217 .all(|block| block.round() >= 5 && block.round() <= 8)
2218 );
2219 }
2220 }
2221 let gc_round = dag_state.gc_round();
2224 assert_eq!(gc_round, 4);
2225 dag_state
2226 .recent_blocks
2227 .iter()
2228 .for_each(|(block_ref, block_info)| {
2229 if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2230 assert!(
2231 block_info.committed,
2232 "Block {block_ref:?} should be committed"
2233 );
2234 };
2235 });
2236 }
2237
2238 #[tokio::test]
2239 async fn test_block_info_as_committed() {
2240 let num_authorities: u32 = 4;
2241 let (context, _) = Context::new_for_test(num_authorities as usize);
2242 let context = Arc::new(context);
2243
2244 let store = Arc::new(MemStore::new());
2245 let mut dag_state = DagState::new(context.clone(), store.clone());
2246
2247 let block = VerifiedBlock::new_for_test(
2249 TestBlock::new(1, 0)
2250 .set_timestamp_ms(1000)
2251 .set_ancestors(vec![])
2252 .build(),
2253 );
2254
2255 dag_state.accept_block(block.clone());
2256
2257 assert!(!dag_state.is_committed(&block.reference()));
2259
2260 assert!(
2262 dag_state.set_committed(&block.reference()),
2263 "Block should be successfully set as committed for first time"
2264 );
2265
2266 assert!(dag_state.is_committed(&block.reference()));
2268
2269 assert!(
2271 !dag_state.set_committed(&block.reference()),
2272 "Block should not be successfully set as committed"
2273 );
2274 }
2275
2276 #[tokio::test]
2277 async fn test_get_cached_blocks() {
2278 let (mut context, _) = Context::new_for_test(4);
2279 context.parameters.dag_state_cached_rounds = 5;
2280
2281 let context = Arc::new(context);
2282 let store = Arc::new(MemStore::new());
2283 let mut dag_state = DagState::new(context.clone(), store.clone());
2284
2285 let mut all_blocks = Vec::new();
2290 for author in 1..=3 {
2291 for round in 10..(10 + author) {
2292 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2293 all_blocks.push(block.clone());
2294 dag_state.accept_block(block);
2295 }
2296 }
2297
2298 let cached_blocks =
2299 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2300 assert!(cached_blocks.is_empty());
2301
2302 let cached_blocks =
2303 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2304 assert_eq!(cached_blocks.len(), 1);
2305 assert_eq!(cached_blocks[0].round(), 10);
2306
2307 let cached_blocks =
2308 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2309 assert_eq!(cached_blocks.len(), 2);
2310 assert_eq!(cached_blocks[0].round(), 10);
2311 assert_eq!(cached_blocks[1].round(), 11);
2312
2313 let cached_blocks =
2314 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2315 assert_eq!(cached_blocks.len(), 1);
2316 assert_eq!(cached_blocks[0].round(), 11);
2317
2318 let cached_blocks =
2319 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2320 assert_eq!(cached_blocks.len(), 3);
2321 assert_eq!(cached_blocks[0].round(), 10);
2322 assert_eq!(cached_blocks[1].round(), 11);
2323 assert_eq!(cached_blocks[2].round(), 12);
2324
2325 let cached_blocks =
2326 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2327 assert_eq!(cached_blocks.len(), 1);
2328 assert_eq!(cached_blocks[0].round(), 12);
2329
2330 let cached_blocks = dag_state.get_cached_blocks_in_range(
2334 context.committee.to_authority_index(3).unwrap(),
2335 10,
2336 10,
2337 1,
2338 );
2339 assert!(cached_blocks.is_empty());
2340
2341 let cached_blocks = dag_state.get_cached_blocks_in_range(
2343 context.committee.to_authority_index(3).unwrap(),
2344 11,
2345 10,
2346 1,
2347 );
2348 assert!(cached_blocks.is_empty());
2349
2350 let cached_blocks = dag_state.get_cached_blocks_in_range(
2352 context.committee.to_authority_index(0).unwrap(),
2353 9,
2354 10,
2355 1,
2356 );
2357 assert!(cached_blocks.is_empty());
2358
2359 let cached_blocks = dag_state.get_cached_blocks_in_range(
2361 context.committee.to_authority_index(1).unwrap(),
2362 9,
2363 11,
2364 1,
2365 );
2366 assert_eq!(cached_blocks.len(), 1);
2367 assert_eq!(cached_blocks[0].round(), 10);
2368
2369 let cached_blocks = dag_state.get_cached_blocks_in_range(
2371 context.committee.to_authority_index(2).unwrap(),
2372 9,
2373 12,
2374 5,
2375 );
2376 assert_eq!(cached_blocks.len(), 2);
2377 assert_eq!(cached_blocks[0].round(), 10);
2378 assert_eq!(cached_blocks[1].round(), 11);
2379
2380 let cached_blocks = dag_state.get_cached_blocks_in_range(
2382 context.committee.to_authority_index(3).unwrap(),
2383 11,
2384 20,
2385 5,
2386 );
2387 assert_eq!(cached_blocks.len(), 2);
2388 assert_eq!(cached_blocks[0].round(), 11);
2389 assert_eq!(cached_blocks[1].round(), 12);
2390
2391 let cached_blocks = dag_state.get_cached_blocks_in_range(
2393 context.committee.to_authority_index(3).unwrap(),
2394 10,
2395 20,
2396 1,
2397 );
2398 assert_eq!(cached_blocks.len(), 1);
2399 assert_eq!(cached_blocks[0].round(), 10);
2400 }
2401
2402 #[rstest]
2403 #[tokio::test]
2404 async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2405 const CACHED_ROUNDS: Round = 2;
2407 let (mut context, _) = Context::new_for_test(4);
2408 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2409
2410 if gc_depth > 0 {
2411 context
2412 .protocol_config
2413 .set_consensus_gc_depth_for_testing(gc_depth);
2414 }
2415
2416 let context = Arc::new(context);
2417 let store = Arc::new(MemStore::new());
2418 let mut dag_state = DagState::new(context.clone(), store.clone());
2419
2420 let dag_str = "DAG {
2425 Round 0 : { 4 },
2426 Round 1 : {
2427 B -> [*],
2428 C -> [*],
2429 D -> [*],
2430 },
2431 Round 2 : {
2432 C -> [*],
2433 D -> [*],
2434 },
2435 Round 3 : {
2436 D -> [*],
2437 },
2438 }";
2439
2440 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2441
2442 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2444
2445 for block in dag_builder
2447 .all_blocks()
2448 .into_iter()
2449 .chain(std::iter::once(block))
2450 {
2451 dag_state.accept_block(block);
2452 }
2453
2454 dag_state.add_commit(TrustedCommit::new_for_test(
2455 1 as CommitIndex,
2456 CommitDigest::MIN,
2457 context.clock.timestamp_utc_ms(),
2458 dag_builder.leader_block(3).unwrap().reference(),
2459 vec![],
2460 ));
2461
2462 let end_round = 4;
2464 let expected_rounds = vec![0, 1, 2, 3];
2465 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2466 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2468 assert_eq!(
2469 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2470 expected_rounds
2471 );
2472 assert_eq!(
2473 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2474 expected_excluded_and_equivocating_blocks
2475 );
2476
2477 for (i, expected_round) in expected_rounds.iter().enumerate() {
2479 let round = dag_state
2480 .get_last_cached_block_in_range(
2481 context.committee.to_authority_index(i).unwrap(),
2482 0,
2483 end_round,
2484 )
2485 .map(|b| b.round())
2486 .unwrap_or_default();
2487 assert_eq!(round, *expected_round, "Authority {i}");
2488 }
2489
2490 let start_round = 2;
2492 let expected_rounds = [0, 0, 2, 3];
2493
2494 for (i, expected_round) in expected_rounds.iter().enumerate() {
2496 let round = dag_state
2497 .get_last_cached_block_in_range(
2498 context.committee.to_authority_index(i).unwrap(),
2499 start_round,
2500 end_round,
2501 )
2502 .map(|b| b.round())
2503 .unwrap_or_default();
2504 assert_eq!(round, *expected_round, "Authority {i}");
2505 }
2506
2507 dag_state.flush();
2516
2517 let end_round = 3;
2519 let expected_rounds = vec![0, 1, 2, 2];
2520
2521 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2523 assert_eq!(
2524 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2525 expected_rounds
2526 );
2527
2528 for (i, expected_round) in expected_rounds.iter().enumerate() {
2530 let round = dag_state
2531 .get_last_cached_block_in_range(
2532 context.committee.to_authority_index(i).unwrap(),
2533 0,
2534 end_round,
2535 )
2536 .map(|b| b.round())
2537 .unwrap_or_default();
2538 assert_eq!(round, *expected_round, "Authority {i}");
2539 }
2540 }
2541
2542 #[tokio::test]
2543 #[should_panic(
2544 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2545 )]
2546 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2547 const CACHED_ROUNDS: Round = 1;
2549 const GC_DEPTH: u32 = 1;
2550 let (mut context, _) = Context::new_for_test(4);
2551 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2552 context
2553 .protocol_config
2554 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2555
2556 let context = Arc::new(context);
2557 let store = Arc::new(MemStore::new());
2558 let mut dag_state = DagState::new(context.clone(), store.clone());
2559
2560 let mut dag_builder = DagBuilder::new(context.clone());
2565 dag_builder
2566 .layers(1..=1)
2567 .authorities(vec![AuthorityIndex::new_for_test(0)])
2568 .skip_block()
2569 .build();
2570 dag_builder
2571 .layers(2..=2)
2572 .authorities(vec![
2573 AuthorityIndex::new_for_test(0),
2574 AuthorityIndex::new_for_test(1),
2575 ])
2576 .skip_block()
2577 .build();
2578 dag_builder
2579 .layers(3..=3)
2580 .authorities(vec![
2581 AuthorityIndex::new_for_test(0),
2582 AuthorityIndex::new_for_test(1),
2583 AuthorityIndex::new_for_test(2),
2584 ])
2585 .skip_block()
2586 .build();
2587
2588 for block in dag_builder.all_blocks() {
2590 dag_state.accept_block(block);
2591 }
2592
2593 dag_state.add_commit(TrustedCommit::new_for_test(
2594 1 as CommitIndex,
2595 CommitDigest::MIN,
2596 0,
2597 dag_builder.leader_block(3).unwrap().reference(),
2598 vec![],
2599 ));
2600
2601 dag_state.flush();
2603
2604 dag_state.get_last_cached_block_per_authority(2);
2607 }
2608
2609 #[tokio::test]
2610 async fn test_last_quorum() {
2611 let (context, _) = Context::new_for_test(4);
2613 let context = Arc::new(context);
2614 let store = Arc::new(MemStore::new());
2615 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2616
2617 {
2619 let genesis = genesis_blocks(&context);
2620
2621 assert_eq!(dag_state.read().last_quorum(), genesis);
2622 }
2623
2624 {
2627 let mut dag_builder = DagBuilder::new(context.clone());
2628 dag_builder
2629 .layers(1..=4)
2630 .build()
2631 .persist_layers(dag_state.clone());
2632 let round_4_blocks: Vec<_> = dag_builder
2633 .blocks(4..=4)
2634 .into_iter()
2635 .map(|block| block.reference())
2636 .collect();
2637
2638 let last_quorum = dag_state.read().last_quorum();
2639
2640 assert_eq!(
2641 last_quorum
2642 .into_iter()
2643 .map(|block| block.reference())
2644 .collect::<Vec<_>>(),
2645 round_4_blocks
2646 );
2647 }
2648
2649 {
2652 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2653 dag_state.write().accept_block(block);
2654
2655 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2656
2657 let last_quorum = dag_state.read().last_quorum();
2658
2659 assert_eq!(last_quorum, round_4_blocks);
2660 }
2661 }
2662
2663 #[tokio::test]
2664 async fn test_last_block_for_authority() {
2665 let (context, _) = Context::new_for_test(4);
2667 let context = Arc::new(context);
2668 let store = Arc::new(MemStore::new());
2669 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2670
2671 {
2673 let genesis = genesis_blocks(&context);
2674 let my_genesis = genesis
2675 .into_iter()
2676 .find(|block| block.author() == context.own_index)
2677 .unwrap();
2678
2679 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2680 }
2681
2682 {
2685 let mut dag_builder = DagBuilder::new(context.clone());
2687 dag_builder
2688 .layers(1..=4)
2689 .build()
2690 .persist_layers(dag_state.clone());
2691
2692 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2694 dag_state.write().accept_block(block);
2695
2696 let block = dag_state
2697 .read()
2698 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2699 assert_eq!(block.round(), 5);
2700
2701 for (authority_index, _) in context.committee.authorities() {
2702 let block = dag_state
2703 .read()
2704 .get_last_block_for_authority(authority_index);
2705
2706 if authority_index.value() == 0 {
2707 assert_eq!(block.round(), 5);
2708 } else {
2709 assert_eq!(block.round(), 4);
2710 }
2711 }
2712 }
2713 }
2714
2715 #[tokio::test]
2716 #[should_panic]
2717 async fn test_accept_block_panics_when_timestamp_is_ahead() {
2718 let (mut context, _) = Context::new_for_test(4);
2720 context
2721 .protocol_config
2722 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
2723 let context = Arc::new(context);
2724 let store = Arc::new(MemStore::new());
2725 let mut dag_state = DagState::new(context.clone(), store.clone());
2726
2727 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2729
2730 let block = VerifiedBlock::new_for_test(
2731 TestBlock::new(10, 0)
2732 .set_timestamp_ms(block_timestamp)
2733 .build(),
2734 );
2735
2736 dag_state.accept_block(block);
2739 }
2740
2741 #[tokio::test]
2742 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2743 let (mut context, _) = Context::new_for_test(4);
2745 context
2746 .protocol_config
2747 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(true);
2748
2749 let context = Arc::new(context);
2750 let store = Arc::new(MemStore::new());
2751 let mut dag_state = DagState::new(context.clone(), store.clone());
2752
2753 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2755
2756 let block = VerifiedBlock::new_for_test(
2757 TestBlock::new(10, 0)
2758 .set_timestamp_ms(block_timestamp)
2759 .build(),
2760 );
2761
2762 dag_state.accept_block(block);
2764 }
2765}