1use std::{
6 cmp::max,
7 collections::{BTreeMap, BTreeSet, VecDeque},
8 ops::Bound::{Excluded, Included, Unbounded},
9 panic,
10 sync::Arc,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use itertools::Itertools as _;
16use tokio::time::Instant;
17use tracing::{debug, info, trace, warn};
18
19use crate::{
20 CommittedSubDag,
21 block::{
22 BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, GENESIS_ROUND, Round, Slot,
23 VerifiedBlock, genesis_blocks,
24 },
25 commit::{
26 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
27 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
28 },
29 context::Context,
30 leader_scoring::{ReputationScores, ScoringSubdag},
31 storage::{Store, WriteBatch},
32 threshold_clock::ThresholdClock,
33};
34
/// In-memory view of the DAG (blocks and commits) backed by a persistent `Store`.
///
/// Accepted blocks, commits and commit info are buffered in memory and only
/// written to the store when `flush()` is called.
pub(crate) struct DagState {
    /// Shared node context; this file reads the committee, protocol config,
    /// parameters, clock, metrics and scoring metrics store from it.
    context: Arc<Context>,

    /// Genesis blocks keyed by their reference, built once in `new()`.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    /// Cache of recently accepted (or recovered) blocks together with their
    /// committed flag. Entries at or below the eviction round are removed in `flush()`.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    /// Per-authority index over the references held in `recent_blocks`.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    /// Threshold clock that is fed every block reference added to the cache.
    threshold_clock: ThresholdClock,

    /// Per-authority round up to which blocks have been evicted from the cache.
    evicted_rounds: Vec<Round>,

    /// Highest round among all blocks added to the cache.
    highest_accepted_round: Round,

    /// Last commit recovered from the store or added via `add_commit()`.
    last_commit: Option<TrustedCommit>,

    /// Local instant at which the commit round last advanced; used for metrics only.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    /// Per-authority highest round of any committed block.
    last_committed_rounds: Vec<Round>,

    /// Committed subdags accumulated for distributed vote scoring.
    scoring_subdag: ScoringSubdag,
    /// Committed subdags that have not been scored yet.
    unscored_committed_subdags: Vec<CommittedSubDag>,

    /// References of added commits, waiting to be handed out by `take_commit_votes()`.
    pending_commit_votes: VecDeque<CommitVote>,

    /// Blocks buffered for the next `flush()`.
    blocks_to_write: Vec<VerifiedBlock>,
    /// Commits buffered for the next `flush()`.
    commits_to_write: Vec<TrustedCommit>,

    /// Commit info buffered for the next `flush()`.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    /// Persistent storage for blocks, commits and commit info.
    store: Arc<dyn Store>,

    /// Number of rounds to keep cached, taken from `dag_state_cached_rounds`.
    cached_rounds: Round,
}
114
115impl DagState {
/// Builds a `DagState` by recovering the last commit, committed rounds,
/// cached blocks, scoring metrics and (when GC is enabled) the committed
/// status of cached blocks from `store`.
///
/// # Panics
/// Panics on any storage read error, and if a recovered commit references a
/// block that is not cached or was already marked as committed.
pub(crate) fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
    let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
    let num_authorities = context.committee.size();

    let genesis = genesis_blocks(&context)
        .into_iter()
        .map(|block| (block.reference(), block))
        .collect();

    let threshold_clock = ThresholdClock::new(1, context.clone());

    let last_commit = store
        .read_last_commit()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));

    // Start from the last persisted CommitInfo if there is one; otherwise
    // start from zeroed rounds and replay every commit after genesis.
    let commit_info = store
        .read_last_commit_info()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
    let (mut last_committed_rounds, commit_recovery_start_index) =
        if let Some((commit_ref, commit_info)) = commit_info {
            tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
            (commit_info.committed_rounds, commit_ref.index + 1)
        } else {
            tracing::info!("Found no stored CommitInfo to recover from");
            (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
        };

    let mut unscored_committed_subdags = Vec::new();
    let mut scoring_subdag = ScoringSubdag::new(context.clone());

    // Replay the commits after the recovered CommitInfo to bring the
    // committed rounds up to date and to rebuild the unscored subdags.
    if let Some(last_commit) = last_commit.as_ref() {
        store
            .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
            .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
            .iter()
            .for_each(|commit| {
                for block_ref in commit.blocks() {
                    last_committed_rounds[block_ref.author] =
                        max(last_committed_rounds[block_ref.author], block_ref.round);
                }

                let committed_subdag =
                    load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
                unscored_committed_subdags.push(committed_subdag);
            });
    }

    tracing::info!(
        "DagState was initialized with the following state: \
        {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
        unscored_committed_subdags.len()
    );

    // With the distributed vote scoring strategy the recovered subdags are
    // moved into the scoring subdag, leaving the unscored list empty.
    if context
        .protocol_config
        .consensus_distributed_vote_scoring_strategy()
    {
        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
    }

    let mut state = Self {
        context,
        genesis,
        recent_blocks: BTreeMap::new(),
        recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
        threshold_clock,
        highest_accepted_round: 0,
        last_commit: last_commit.clone(),
        last_commit_round_advancement_time: None,
        last_committed_rounds: last_committed_rounds.clone(),
        pending_commit_votes: VecDeque::new(),
        blocks_to_write: vec![],
        commits_to_write: vec![],
        commit_info_to_write: vec![],
        scoring_subdag,
        unscored_committed_subdags,
        store: store.clone(),
        cached_rounds,
        evicted_rounds: vec![0; num_authorities],
    };

    // Reload into the cache, per authority, every stored block above that
    // authority's eviction round.
    for (i, round) in last_committed_rounds.into_iter().enumerate() {
        let authority_index = state.context.committee.to_authority_index(i).unwrap();
        let (blocks, eviction_round) = if state.gc_enabled() {
            // With GC the eviction round is derived from the authority's
            // last stored block and the current GC round.
            let last_block = state
                .store
                .scan_last_blocks_by_author(authority_index, 1, None)
                .expect("Database error");
            let last_block_round = last_block
                .last()
                .map(|b| b.round())
                .unwrap_or(GENESIS_ROUND);

            let eviction_round = Self::gc_eviction_round(
                last_block_round,
                state.gc_round(),
                state.cached_rounds,
            );
            let blocks = state
                .store
                .scan_blocks_by_author(authority_index, eviction_round + 1)
                .expect("Database error");
            (blocks, eviction_round)
        } else {
            // Without GC the eviction round is derived from the authority's
            // last committed round.
            let eviction_round = Self::eviction_round(round, cached_rounds);
            let blocks = state
                .store
                .scan_blocks_by_author(authority_index, eviction_round + 1)
                .expect("Database error");
            (blocks, eviction_round)
        };

        state.evicted_rounds[authority_index] = eviction_round;

        // Only the in-memory metadata is updated; recovered blocks are not
        // queued in `blocks_to_write`.
        for block in &blocks {
            state.update_block_metadata(block);
        }

        info!(
            "Recovered blocks {}: {:?}",
            authority_index,
            blocks
                .iter()
                .map(|b| b.reference())
                .collect::<Vec<BlockRef>>()
        );
    }

    let recovered_scoring_metrics = state.store.scan_scoring_metrics().expect("Database error");
    state
        .context
        .scoring_metrics_store
        .initialize_scoring_metrics(
            recovered_scoring_metrics,
            &state.recent_refs_by_authority,
            state.threshold_clock_round(),
            &state.evicted_rounds,
            state.context.clone(),
        );

    // With GC enabled, walk the commits backwards from the last one and mark
    // their blocks above the GC round as committed in the cache.
    if state.gc_enabled() {
        if let Some(last_commit) = last_commit {
            let mut index = last_commit.index();
            let gc_round = state.gc_round();
            info!(
                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
                index, gc_round
            );

            loop {
                let commits = store
                    .scan_commits((index..=index).into())
                    .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
                let Some(commit) = commits.first() else {
                    info!(
                        "Recovering finished up to index {index}, no more commits to recover"
                    );
                    break;
                };

                // Stop once the commit leader is at or below the GC round
                // (the check is skipped while gc_round is still 0).
                if gc_round > 0 && commit.leader().round <= gc_round {
                    info!(
                        "Recovering finished, reached commit leader round {} <= gc_round {}",
                        commit.leader().round,
                        gc_round
                    );
                    break;
                }

                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
                    debug!(
                        "Setting block {:?} as committed based on commit {:?}",
                        block_ref,
                        commit.index()
                    );
                    assert!(state.set_committed(block_ref), "Attempted to set again a block {block_ref:?} as committed when recovering commit {commit:?}");
                });

                // Index 0 is never scanned: the loop ends when it is reached.
                index = index.saturating_sub(1);
                if index == 0 {
                    break;
                }
            }
        }
    }

    state
}
314
315 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
317 assert_ne!(
318 block.round(),
319 0,
320 "Genesis block should not be accepted into DAG."
321 );
322
323 let block_ref = block.reference();
324 if self.contains_block(&block_ref) {
325 return;
326 }
327
328 let now = self.context.clock.timestamp_utc_ms();
329 if block.timestamp_ms() > now {
330 if self
331 .context
332 .protocol_config
333 .consensus_median_timestamp_with_checkpoint_enforcement()
334 {
335 trace!(
336 "Block {:?} with timestamp {} is greater than local timestamp {}.",
337 block,
338 block.timestamp_ms(),
339 now,
340 );
341 } else {
342 panic!(
343 "Block {:?} cannot be accepted! Block timestamp {} is greater than local timestamp {}.",
344 block,
345 block.timestamp_ms(),
346 now,
347 );
348 }
349 }
350 let hostname = &self.context.committee.authority(block_ref.author).hostname;
351 self.context
352 .metrics
353 .node_metrics
354 .accepted_block_time_drift_ms
355 .with_label_values(&[hostname])
356 .inc_by(block.timestamp_ms().saturating_sub(now));
357
358 if block_ref.author == self.context.own_index {
361 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
362 assert!(
363 existing_blocks.is_empty(),
364 "Block Rejected! Attempted to add block {block:#?} to own slot where \
365 block(s) {existing_blocks:#?} already exists."
366 );
367 }
368 self.update_block_metadata(&block);
369 self.blocks_to_write.push(block);
370 let source = if self.context.own_index == block_ref.author {
371 "own"
372 } else {
373 "others"
374 };
375 self.context
376 .metrics
377 .node_metrics
378 .accepted_blocks
379 .with_label_values(&[source])
380 .inc();
381 }
382
383 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
385 let block_ref = block.reference();
386 self.recent_blocks
387 .insert(block_ref, BlockInfo::new(block.clone()));
388 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
389 self.threshold_clock.add_block(block_ref);
390 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
391 self.context
392 .metrics
393 .node_metrics
394 .highest_accepted_round
395 .set(self.highest_accepted_round as i64);
396
397 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
398 .last()
399 .map(|block_ref| block_ref.round)
400 .expect("There should be by now at least one block ref");
401 let hostname = &self.context.committee.authority(block_ref.author).hostname;
402 self.context
403 .metrics
404 .node_metrics
405 .highest_accepted_authority_round
406 .with_label_values(&[hostname])
407 .set(highest_accepted_round_for_author as i64);
408 }
409
410 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
412 debug!(
413 "Accepting blocks: {}",
414 blocks.iter().map(|b| b.reference().to_string()).join(",")
415 );
416 for block in blocks {
417 self.accept_block(block);
418 }
419 }
420
421 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
424 self.get_blocks(&[*reference])
425 .pop()
426 .expect("Exactly one element should be returned")
427 }
428
429 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
433 let mut blocks = vec![None; block_refs.len()];
434 let mut missing = Vec::new();
435
436 for (index, block_ref) in block_refs.iter().enumerate() {
437 if block_ref.round == GENESIS_ROUND {
438 if let Some(block) = self.genesis.get(block_ref) {
440 blocks[index] = Some(block.clone());
441 }
442 continue;
443 }
444 if let Some(block_info) = self.recent_blocks.get(block_ref) {
445 blocks[index] = Some(block_info.block.clone());
446 continue;
447 }
448 missing.push((index, block_ref));
449 }
450
451 if missing.is_empty() {
452 return blocks;
453 }
454
455 let missing_refs = missing
456 .iter()
457 .map(|(_, block_ref)| **block_ref)
458 .collect::<Vec<_>>();
459 let store_results = self
460 .store
461 .read_blocks(&missing_refs)
462 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
463 self.context
464 .metrics
465 .node_metrics
466 .dag_state_store_read_count
467 .with_label_values(&["get_blocks"])
468 .inc();
469
470 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
471 blocks[index] = result;
472 }
473
474 blocks
475 }
476
477 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
481 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
482 if !block_info.committed {
483 block_info.committed = true;
484 return true;
485 }
486 false
487 } else {
488 panic!("Block {block_ref:?} not found in cache to set as committed.");
489 }
490 }
491
492 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
493 self.recent_blocks
494 .get(block_ref)
495 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
496 .committed
497 }
498
499 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
503 let mut blocks = vec![];
508 for (_block_ref, block_info) in self.recent_blocks.range((
509 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
510 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
511 )) {
512 blocks.push(block_info.block.clone())
513 }
514 blocks
515 }
516
517 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
521 if round <= self.last_commit_round() {
522 panic!("Round {round} have committed blocks!");
523 }
524
525 let mut blocks = vec![];
526 for (_block_ref, block_info) in self.recent_blocks.range((
527 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
528 Excluded(BlockRef::new(
529 round + 1,
530 AuthorityIndex::ZERO,
531 BlockDigest::MIN,
532 )),
533 )) {
534 blocks.push(block_info.block.clone())
535 }
536 blocks
537 }
538
539 pub(crate) fn ancestors_at_round(
541 &self,
542 later_block: &VerifiedBlock,
543 earlier_round: Round,
544 ) -> Vec<VerifiedBlock> {
545 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
547 while !linked.is_empty() {
548 let round = linked.last().unwrap().round;
549 if round <= earlier_round {
551 break;
552 }
553 let block_ref = linked.pop_last().unwrap();
554 let Some(block) = self.get_block(&block_ref) else {
555 panic!("Block {block_ref:?} should exist in DAG!");
556 };
557 linked.extend(block.ancestors().iter().cloned());
558 }
559 linked
560 .range((
561 Included(BlockRef::new(
562 earlier_round,
563 AuthorityIndex::ZERO,
564 BlockDigest::MIN,
565 )),
566 Unbounded,
567 ))
568 .map(|r| {
569 self.get_block(r)
570 .unwrap_or_else(|| panic!("Block {r:?} should exist in DAG!"))
571 .clone()
572 })
573 .collect()
574 }
575
/// Returns the last block of this node's own authority (`context.own_index`),
/// as resolved by `get_last_block_for_authority()`.
pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
    self.get_last_block_for_authority(self.context.own_index)
}
581
582 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
586 if let Some(last) = self.recent_refs_by_authority[authority].last() {
587 return self
588 .recent_blocks
589 .get(last)
590 .expect("Block should be found in recent blocks")
591 .block
592 .clone();
593 }
594
595 let (_, genesis_block) = self
597 .genesis
598 .iter()
599 .find(|(block_ref, _)| block_ref.author == authority)
600 .expect("Genesis should be found for authority {authority_index}");
601 genesis_block.clone()
602 }
603
/// Returns all cached blocks of `authority` with round >= `start`, in
/// ascending reference order.
///
/// Delegates to `get_cached_blocks_in_range()` with `Round::MAX` as the
/// exclusive end round and no limit on the number of blocks.
pub(crate) fn get_cached_blocks(
    &self,
    authority: AuthorityIndex,
    start: Round,
) -> Vec<VerifiedBlock> {
    self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
}
616
617 pub(crate) fn get_cached_blocks_in_range(
620 &self,
621 authority: AuthorityIndex,
622 start_round: Round,
623 end_round: Round,
624 limit: usize,
625 ) -> Vec<VerifiedBlock> {
626 if start_round >= end_round || limit == 0 {
627 return vec![];
628 }
629
630 let mut blocks = vec![];
631 for block_ref in self.recent_refs_by_authority[authority].range((
632 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
633 Excluded(BlockRef::new(
634 end_round,
635 AuthorityIndex::MIN,
636 BlockDigest::MIN,
637 )),
638 )) {
639 let block_info = self
640 .recent_blocks
641 .get(block_ref)
642 .expect("Block should exist in recent blocks");
643 blocks.push(block_info.block.clone());
644 if blocks.len() >= limit {
645 break;
646 }
647 }
648 blocks
649 }
650
651 pub(crate) fn get_last_cached_block_in_range(
654 &self,
655 authority: AuthorityIndex,
656 start_round: Round,
657 end_round: Round,
658 ) -> Option<VerifiedBlock> {
659 if start_round >= end_round {
660 return None;
661 }
662
663 let block_ref = self.recent_refs_by_authority[authority]
664 .range((
665 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
666 Excluded(BlockRef::new(
667 end_round,
668 AuthorityIndex::MIN,
669 BlockDigest::MIN,
670 )),
671 ))
672 .last()?;
673
674 self.recent_blocks
675 .get(block_ref)
676 .map(|block_info| block_info.block.clone())
677 }
678
679 pub(crate) fn get_last_cached_block_per_authority(
688 &self,
689 end_round: Round,
690 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
691 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
693 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
694
695 if end_round == GENESIS_ROUND {
696 panic!(
697 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
698 );
699 }
700
701 if end_round == GENESIS_ROUND + 1 {
702 return blocks.into_iter().map(|b| (b, vec![])).collect();
703 }
704
705 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
706 let authority_index = self
707 .context
708 .committee
709 .to_authority_index(authority_index)
710 .unwrap();
711
712 let last_evicted_round = self.evicted_rounds[authority_index];
713 if end_round.saturating_sub(1) <= last_evicted_round {
714 panic!(
715 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
716 );
717 }
718
719 let block_ref_iter = block_refs
720 .range((
721 Included(BlockRef::new(
722 last_evicted_round + 1,
723 authority_index,
724 BlockDigest::MIN,
725 )),
726 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
727 ))
728 .rev();
729
730 let mut last_round = 0;
731 for block_ref in block_ref_iter {
732 if last_round == 0 {
733 last_round = block_ref.round;
734 let block_info = self
735 .recent_blocks
736 .get(block_ref)
737 .expect("Block should exist in recent blocks");
738 blocks[authority_index] = block_info.block.clone();
739 continue;
740 }
741 if block_ref.round < last_round {
742 break;
743 }
744 equivocating_blocks[authority_index].push(*block_ref);
745 }
746 }
747
748 blocks.into_iter().zip(equivocating_blocks).collect()
749 }
750
751 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
755 if slot.round == GENESIS_ROUND {
757 return true;
758 }
759
760 let eviction_round = self.evicted_rounds[slot.authority];
761 if slot.round <= eviction_round {
762 panic!(
763 "{}",
764 format!(
765 "Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}",
766 if self.gc_enabled() { " gc " } else { " " }
767 )
768 );
769 }
770
771 let mut result = self.recent_refs_by_authority[slot.authority].range((
772 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
773 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
774 ));
775 result.next().is_some()
776 }
777
778 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
782 let mut exist = vec![false; block_refs.len()];
783 let mut missing = Vec::new();
784
785 for (index, block_ref) in block_refs.into_iter().enumerate() {
786 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
787 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
788 exist[index] = true;
789 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
790 {
791 exist[index] = false;
796 } else {
797 missing.push((index, block_ref));
798 }
799 }
800
801 if missing.is_empty() {
802 return exist;
803 }
804
805 let missing_refs = missing
806 .iter()
807 .map(|(_, block_ref)| *block_ref)
808 .collect::<Vec<_>>();
809 let store_results = self
810 .store
811 .contains_blocks(&missing_refs)
812 .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"));
813 self.context
814 .metrics
815 .node_metrics
816 .dag_state_store_read_count
817 .with_label_values(&["contains_blocks"])
818 .inc();
819
820 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
821 exist[index] = result;
822 }
823
824 exist
825 }
826
827 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
828 let blocks = self.contains_blocks(vec![*block_ref]);
829 blocks.first().cloned().unwrap()
830 }
831
/// Returns the current round reported by the threshold clock.
pub(crate) fn threshold_clock_round(&self) -> Round {
    self.threshold_clock.get_round()
}
835
/// Returns the quorum timestamp (`get_quorum_ts()`) reported by the threshold clock.
pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
    self.threshold_clock.get_quorum_ts()
}
839
/// Returns the highest round among all blocks added to the cache so far.
pub(crate) fn highest_accepted_round(&self) -> Round {
    self.highest_accepted_round
}
843
844 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
847 let time_diff = if let Some(last_commit) = &self.last_commit {
848 if commit.index() <= last_commit.index() {
849 warn!(
850 "New commit index {} <= last commit index {}!",
851 commit.index(),
852 last_commit.index()
853 ); return;
855 }
856 assert_eq!(commit.index(), last_commit.index() + 1);
857
858 if commit.timestamp_ms() < last_commit.timestamp_ms() {
859 panic!(
860 "Commit timestamps do not monotonically increment, prev commit {last_commit:?}, new commit {commit:?}"
861 );
862 }
863 commit
864 .timestamp_ms()
865 .saturating_sub(last_commit.timestamp_ms())
866 } else {
867 assert_eq!(commit.index(), 1);
868 0
869 };
870
871 self.context
872 .metrics
873 .node_metrics
874 .last_commit_time_diff
875 .observe(time_diff as f64);
876
877 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
878 previous_commit.round() < commit.round()
879 } else {
880 true
881 };
882
883 self.last_commit = Some(commit.clone());
884
885 if commit_round_advanced {
886 let now = std::time::Instant::now();
887 if let Some(previous_time) = self.last_commit_round_advancement_time {
888 self.context
889 .metrics
890 .node_metrics
891 .commit_round_advancement_interval
892 .observe(now.duration_since(previous_time).as_secs_f64())
893 }
894 self.last_commit_round_advancement_time = Some(now);
895 }
896
897 for block_ref in commit.blocks().iter() {
898 self.last_committed_rounds[block_ref.author] = max(
899 self.last_committed_rounds[block_ref.author],
900 block_ref.round,
901 );
902 }
903
904 for (i, round) in self.last_committed_rounds.iter().enumerate() {
905 let index = self.context.committee.to_authority_index(i).unwrap();
906 let hostname = &self.context.committee.authority(index).hostname;
907 self.context
908 .metrics
909 .node_metrics
910 .last_committed_authority_round
911 .with_label_values(&[hostname])
912 .set((*round).into());
913 }
914
915 self.pending_commit_votes.push_back(commit.reference());
916 self.commits_to_write.push(commit);
917 }
918
919 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
920 assert!(self.unscored_committed_subdags.is_empty());
922
923 assert!(self.scoring_subdag.is_empty());
927
928 let commit_info = CommitInfo {
929 committed_rounds: self.last_committed_rounds.clone(),
930 reputation_scores,
931 };
932 let last_commit = self
933 .last_commit
934 .as_ref()
935 .expect("Last commit should already be set.");
936 self.commit_info_to_write
937 .push((last_commit.reference(), commit_info));
938 }
939
940 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
941 let mut votes = Vec::new();
942 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
943 votes.push(self.pending_commit_votes.pop_front().unwrap());
944 }
945 votes
946 }
947
948 pub(crate) fn last_commit_index(&self) -> CommitIndex {
950 match &self.last_commit {
951 Some(commit) => commit.index(),
952 None => 0,
953 }
954 }
955
956 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
958 match &self.last_commit {
959 Some(commit) => commit.digest(),
960 None => CommitDigest::MIN,
961 }
962 }
963
964 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
966 match &self.last_commit {
967 Some(commit) => commit.timestamp_ms(),
968 None => 0,
969 }
970 }
971
972 pub(crate) fn last_commit_leader(&self) -> Slot {
974 match &self.last_commit {
975 Some(commit) => commit.leader().into(),
976 None => self
977 .genesis
978 .iter()
979 .next()
980 .map(|(genesis_ref, _)| *genesis_ref)
981 .expect("Genesis blocks should always be available.")
982 .into(),
983 }
984 }
985
986 pub(crate) fn last_commit_round(&self) -> Round {
989 match &self.last_commit {
990 Some(commit) => commit.leader().round,
991 None => 0,
992 }
993 }
994
/// Returns a copy of the per-authority last committed rounds.
pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
    self.last_committed_rounds.clone()
}
999
/// Returns the GC round computed from the last commit round
/// (see `calculate_gc_round()`).
pub(crate) fn gc_round(&self) -> Round {
    self.calculate_gc_round(self.last_commit_round())
}
1009
1010 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1011 let gc_depth = self.context.protocol_config.gc_depth();
1012 if gc_depth > 0 {
1013 commit_round.saturating_sub(gc_depth)
1015 } else {
1016 GENESIS_ROUND
1019 }
1020 }
1021
/// Returns whether garbage collection is enabled, i.e. the protocol's
/// `gc_depth` is greater than zero.
pub(crate) fn gc_enabled(&self) -> bool {
    self.context.protocol_config.gc_depth() > 0
}
1025
1026 pub(crate) fn flush(&mut self) {
1029 let _s = self
1030 .context
1031 .metrics
1032 .node_metrics
1033 .scope_processing_time
1034 .with_label_values(&["DagState::flush"])
1035 .start_timer();
1036 let blocks = std::mem::take(&mut self.blocks_to_write);
1038 let commits = std::mem::take(&mut self.commits_to_write);
1039 let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
1040
1041 if blocks.is_empty() && commits.is_empty() {
1042 return;
1043 }
1044 debug!(
1045 "Flushing {} blocks ({}), {} commits ({}) and {} commit info ({}) to storage.",
1046 blocks.len(),
1047 blocks.iter().map(|b| b.reference().to_string()).join(","),
1048 commits.len(),
1049 commits.iter().map(|c| c.reference().to_string()).join(","),
1050 commit_info_to_write.len(),
1051 commit_info_to_write
1052 .iter()
1053 .map(|(commit_ref, _)| commit_ref.to_string())
1054 .join(","),
1055 );
1056
1057 let mut metrics_to_write = vec![];
1059 let threshold_clock_round = self.threshold_clock_round();
1060 for (authority_index, authority) in self.context.committee.authorities() {
1061 let last_eviction_round = self.evicted_rounds[authority_index];
1062 let current_eviction_round = self.calculate_authority_eviction_round(authority_index);
1063 let metrics_to_write_from_authority = self
1064 .context
1065 .scoring_metrics_store
1066 .update_scoring_metrics_on_eviction(
1067 authority_index,
1068 authority.hostname.as_str(),
1069 &self.recent_refs_by_authority[authority_index],
1070 current_eviction_round,
1071 last_eviction_round,
1072 threshold_clock_round,
1073 &self.context.metrics.node_metrics,
1074 );
1075 if let Some(metrics_to_write_from_authority) = metrics_to_write_from_authority {
1076 metrics_to_write.push((authority_index, metrics_to_write_from_authority));
1077 }
1078 }
1079
1080 self.store
1081 .write(WriteBatch::new(
1082 blocks,
1083 commits,
1084 commit_info_to_write,
1085 metrics_to_write,
1086 ))
1087 .unwrap_or_else(|e| panic!("Failed to write to storage: {e:?}"));
1088 self.context
1089 .metrics
1090 .node_metrics
1091 .dag_state_store_write_count
1092 .inc();
1093
1094 for (authority_index, _) in self.context.committee.authorities() {
1098 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1099 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1100 let block_round = block_ref.round;
1101 if block_round <= eviction_round {
1102 self.recent_blocks.remove(block_ref);
1103 self.recent_refs_by_authority[authority_index].pop_first();
1104 } else {
1105 break;
1106 }
1107 }
1108 self.evicted_rounds[authority_index] = eviction_round;
1109 }
1110
1111 let metrics = &self.context.metrics.node_metrics;
1112 metrics
1113 .dag_state_recent_blocks
1114 .set(self.recent_blocks.len() as i64);
1115 metrics.dag_state_recent_refs.set(
1116 self.recent_refs_by_authority
1117 .iter()
1118 .map(BTreeSet::len)
1119 .sum::<usize>() as i64,
1120 );
1121 }
1122
/// Reads the last persisted commit info directly from storage.
///
/// # Panics
/// Panics if the storage read fails.
pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
    self.store
        .read_last_commit_info()
        .unwrap_or_else(|e| panic!("Failed to read from storage: {e:?}"))
}
1128
/// Returns the number of committed subdags that have not been scored yet.
pub(crate) fn unscored_committed_subdags_count(&self) -> u64 {
    self.unscored_committed_subdags.len() as u64
}
1133
/// Test-only: returns a copy of the unscored committed subdags.
#[cfg(test)]
pub(crate) fn unscored_committed_subdags(&self) -> Vec<CommittedSubDag> {
    self.unscored_committed_subdags.clone()
}
1138
/// Appends `committed_subdags` to the list of unscored committed subdags.
pub(crate) fn add_unscored_committed_subdags(
    &mut self,
    committed_subdags: Vec<CommittedSubDag>,
) {
    self.unscored_committed_subdags.extend(committed_subdags);
}
1145
/// Returns all unscored committed subdags, leaving the internal list empty.
pub(crate) fn take_unscored_committed_subdags(&mut self) -> Vec<CommittedSubDag> {
    std::mem::take(&mut self.unscored_committed_subdags)
}
1149
/// Adds `scoring_subdags` to the scoring subdag.
pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
    self.scoring_subdag.add_subdags(scoring_subdags);
}
1153
/// Clears the scoring subdag.
pub(crate) fn clear_scoring_subdag(&mut self) {
    self.scoring_subdag.clear();
}
1157
/// Returns the scored subdag count reported by the scoring subdag.
pub(crate) fn scoring_subdags_count(&self) -> usize {
    self.scoring_subdag.scored_subdags_count()
}
1161
/// Returns whether the scoring subdag is empty.
pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
    self.scoring_subdag.is_empty()
}
1165
/// Computes reputation scores from the scoring subdag using its
/// distributed vote scoring calculation.
pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
    self.scoring_subdag.calculate_distributed_vote_scores()
}
1169
/// Returns the end of the scoring subdag's commit range.
///
/// # Panics
/// Panics if the scoring subdag has no commit range.
pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
    self.scoring_subdag
        .commit_range
        .as_ref()
        .expect("commit range should exist for scoring subdag")
        .end()
}
1177
1178 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1183 if self.gc_enabled() {
1184 let last_round = self.recent_refs_by_authority[authority_index]
1185 .last()
1186 .map(|block_ref| block_ref.round)
1187 .unwrap_or(GENESIS_ROUND);
1188
1189 Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds)
1190 } else {
1191 let commit_round = self.last_committed_rounds[authority_index];
1192 Self::eviction_round(commit_round, self.cached_rounds)
1193 }
1194 }
1195
/// Eviction round when GC is disabled: `commit_round - cached_rounds`,
/// saturating at zero.
fn eviction_round(commit_round: Round, cached_rounds: Round) -> Round {
    commit_round.saturating_sub(cached_rounds)
}
1201
/// Eviction round when GC is enabled: the smaller of `gc_round` and
/// `last_round - cached_rounds` (saturating at zero).
fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
    gc_round.min(last_round.saturating_sub(cached_rounds))
}
1209
/// Test-only: returns the cached blocks of the most recent round, among the
/// highest accepted round and the one before it, whose authors form a quorum.
/// Returns the genesis blocks if the search reaches the genesis round.
///
/// # Panics
/// Panics if neither of the two rounds holds a quorum of blocks.
#[cfg(test)]
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
    use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};

    let newest_round = self.highest_accepted_round;
    for round in (newest_round.saturating_sub(1)..=newest_round).rev() {
        if round == GENESIS_ROUND {
            return self.genesis_blocks();
        }

        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        let blocks = self.get_uncommitted_blocks_at_round(round);
        // Stops adding stake as soon as the quorum threshold is reached.
        let quorum_reached = blocks
            .iter()
            .any(|block| quorum.add(block.author(), &self.context.committee));
        if quorum_reached {
            return blocks;
        }
    }

    panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}
1238
/// Test-only: returns a copy of all genesis blocks, in key order.
#[cfg(test)]
pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
    self.genesis.values().cloned().collect()
}
1243
/// Test-only: overwrites the last commit without touching any other state.
#[cfg(test)]
pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
    self.last_commit = Some(commit);
}
1248}
1249
/// A cached block together with its committed flag.
struct BlockInfo {
    /// The cached block.
    block: VerifiedBlock,
    /// Whether the block has been marked as committed via `set_committed()`.
    committed: bool,
}
1255
1256impl BlockInfo {
1257 fn new(block: VerifiedBlock) -> Self {
1258 Self {
1259 block,
1260 committed: false,
1261 }
1262 }
1263}
1264
1265#[cfg(test)]
1266mod test {
1267 use std::vec;
1268
1269 use parking_lot::RwLock;
1270 use rstest::rstest;
1271
1272 use super::*;
1273 use crate::{
1274 block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
1275 storage::{WriteBatch, mem_store::MemStore},
1276 test_dag_builder::DagBuilder,
1277 test_dag_parser::parse_dag,
1278 };
1279
1280 #[tokio::test]
1281 async fn test_get_blocks() {
1282 let (context, _) = Context::new_for_test(4);
1283 let context = Arc::new(context);
1284 let store = Arc::new(MemStore::new());
1285 let mut dag_state = DagState::new(context.clone(), store.clone());
1286 let own_index = AuthorityIndex::new_for_test(0);
1287
1288 let num_rounds: u32 = 10;
1290 let non_existent_round: u32 = 100;
1291 let num_authorities: u32 = 3;
1292 let num_blocks_per_slot: usize = 3;
1293 let mut blocks = BTreeMap::new();
1294 for round in 1..=num_rounds {
1295 for author in 0..num_authorities {
1296 let base_ts = round as BlockTimestampMs * 1000;
1298 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1299 let block = VerifiedBlock::new_for_test(
1300 TestBlock::new(round, author)
1301 .set_timestamp_ms(timestamp)
1302 .build(),
1303 );
1304 dag_state.accept_block(block.clone());
1305 blocks.insert(block.reference(), block);
1306
1307 if AuthorityIndex::new_for_test(author) == own_index {
1309 break;
1310 }
1311 }
1312 }
1313 }
1314
1315 for (r, block) in &blocks {
1317 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1318 }
1319
1320 let last_ref = blocks.keys().last().unwrap();
1322 assert!(
1323 dag_state
1324 .get_block(&BlockRef::new(
1325 last_ref.round,
1326 last_ref.author,
1327 BlockDigest::MIN
1328 ))
1329 .is_none()
1330 );
1331
1332 for round in 1..=num_rounds {
1334 for author in 0..num_authorities {
1335 let slot = Slot::new(
1336 round,
1337 context
1338 .committee
1339 .to_authority_index(author as usize)
1340 .unwrap(),
1341 );
1342 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1343
1344 if AuthorityIndex::new_for_test(author) == own_index {
1346 assert_eq!(blocks.len(), 1);
1347 } else {
1348 assert_eq!(blocks.len(), num_blocks_per_slot);
1349 }
1350
1351 for b in blocks {
1352 assert_eq!(b.round(), round);
1353 assert_eq!(
1354 b.author(),
1355 context
1356 .committee
1357 .to_authority_index(author as usize)
1358 .unwrap()
1359 );
1360 }
1361 }
1362 }
1363
1364 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1366 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1367
1368 for round in 1..=num_rounds {
1370 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1371 assert_eq!(
1374 blocks.len(),
1375 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1376 );
1377 for b in blocks {
1378 assert_eq!(b.round(), round);
1379 }
1380 }
1381
1382 assert!(
1384 dag_state
1385 .get_uncommitted_blocks_at_round(non_existent_round)
1386 .is_empty()
1387 );
1388 }
1389
1390 #[tokio::test]
1391 async fn test_ancestors_at_uncommitted_round() {
1392 let (context, _) = Context::new_for_test(4);
1394 let context = Arc::new(context);
1395 let store = Arc::new(MemStore::new());
1396 let mut dag_state = DagState::new(context.clone(), store.clone());
1397
1398 let round_10_refs: Vec<_> = (0..4)
1402 .map(|a| {
1403 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1404 .reference()
1405 })
1406 .collect();
1407
1408 let round_11 = [
1410 VerifiedBlock::new_for_test(
1412 TestBlock::new(11, 0)
1413 .set_timestamp_ms(1100)
1414 .set_ancestors(round_10_refs.clone())
1415 .build(),
1416 ),
1417 VerifiedBlock::new_for_test(
1420 TestBlock::new(11, 1)
1421 .set_timestamp_ms(1110)
1422 .set_ancestors(round_10_refs.clone())
1423 .build(),
1424 ),
1425 VerifiedBlock::new_for_test(
1427 TestBlock::new(11, 1)
1428 .set_timestamp_ms(1111)
1429 .set_ancestors(round_10_refs.clone())
1430 .build(),
1431 ),
1432 VerifiedBlock::new_for_test(
1434 TestBlock::new(11, 1)
1435 .set_timestamp_ms(1112)
1436 .set_ancestors(round_10_refs.clone())
1437 .build(),
1438 ),
1439 VerifiedBlock::new_for_test(
1441 TestBlock::new(11, 2)
1442 .set_timestamp_ms(1120)
1443 .set_ancestors(round_10_refs.clone())
1444 .build(),
1445 ),
1446 VerifiedBlock::new_for_test(
1448 TestBlock::new(11, 3)
1449 .set_timestamp_ms(1130)
1450 .set_ancestors(round_10_refs.clone())
1451 .build(),
1452 ),
1453 ];
1454
1455 let ancestors_for_round_12 = vec![
1457 round_11[0].reference(),
1458 round_11[1].reference(),
1459 round_11[5].reference(),
1460 ];
1461 let round_12 = [
1462 VerifiedBlock::new_for_test(
1463 TestBlock::new(12, 0)
1464 .set_timestamp_ms(1200)
1465 .set_ancestors(ancestors_for_round_12.clone())
1466 .build(),
1467 ),
1468 VerifiedBlock::new_for_test(
1469 TestBlock::new(12, 2)
1470 .set_timestamp_ms(1220)
1471 .set_ancestors(ancestors_for_round_12.clone())
1472 .build(),
1473 ),
1474 VerifiedBlock::new_for_test(
1475 TestBlock::new(12, 3)
1476 .set_timestamp_ms(1230)
1477 .set_ancestors(ancestors_for_round_12.clone())
1478 .build(),
1479 ),
1480 ];
1481
1482 let ancestors_for_round_13 = vec![
1484 round_12[0].reference(),
1485 round_12[1].reference(),
1486 round_12[2].reference(),
1487 round_11[2].reference(),
1488 ];
1489 let round_13 = [
1490 VerifiedBlock::new_for_test(
1491 TestBlock::new(12, 1)
1492 .set_timestamp_ms(1300)
1493 .set_ancestors(ancestors_for_round_13.clone())
1494 .build(),
1495 ),
1496 VerifiedBlock::new_for_test(
1497 TestBlock::new(12, 2)
1498 .set_timestamp_ms(1320)
1499 .set_ancestors(ancestors_for_round_13.clone())
1500 .build(),
1501 ),
1502 VerifiedBlock::new_for_test(
1503 TestBlock::new(12, 3)
1504 .set_timestamp_ms(1330)
1505 .set_ancestors(ancestors_for_round_13.clone())
1506 .build(),
1507 ),
1508 ];
1509
1510 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1512 let anchor = VerifiedBlock::new_for_test(
1513 TestBlock::new(14, 1)
1514 .set_timestamp_ms(1410)
1515 .set_ancestors(ancestors_for_round_14)
1516 .build(),
1517 );
1518
1519 for b in round_11
1521 .iter()
1522 .chain(round_12.iter())
1523 .chain(round_13.iter())
1524 .chain([anchor.clone()].iter())
1525 {
1526 dag_state.accept_block(b.clone());
1527 }
1528
1529 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1531 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1532 ancestors_refs.sort();
1533 let mut expected_refs = vec![
1534 round_11[0].reference(),
1535 round_11[1].reference(),
1536 round_11[2].reference(),
1537 round_11[5].reference(),
1538 ];
1539 expected_refs.sort(); assert_eq!(
1542 ancestors_refs, expected_refs,
1543 "Expected round 11 ancestors: {expected_refs:?}. Got: {ancestors_refs:?}"
1544 );
1545 }
1546
1547 #[tokio::test]
1548 async fn test_contains_blocks_in_cache_or_store() {
1549 const CACHED_ROUNDS: Round = 2;
1551
1552 let (mut context, _) = Context::new_for_test(4);
1553 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1554
1555 let context = Arc::new(context);
1556 let store = Arc::new(MemStore::new());
1557 let mut dag_state = DagState::new(context.clone(), store.clone());
1558
1559 let num_rounds: u32 = 10;
1561 let num_authorities: u32 = 4;
1562 let mut blocks = Vec::new();
1563
1564 for round in 1..=num_rounds {
1565 for author in 0..num_authorities {
1566 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1567 blocks.push(block);
1568 }
1569 }
1570
1571 blocks.clone().into_iter().for_each(|block| {
1574 if block.round() <= 4 {
1575 store
1576 .write(WriteBatch::default().blocks(vec![block]))
1577 .unwrap();
1578 } else {
1579 dag_state.accept_blocks(vec![block]);
1580 }
1581 });
1582
1583 let mut block_refs = blocks
1587 .iter()
1588 .map(|block| block.reference())
1589 .collect::<Vec<_>>();
1590 let result = dag_state.contains_blocks(block_refs.clone());
1591
1592 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1594 assert_eq!(result, expected);
1595
1596 block_refs.insert(
1598 3,
1599 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1600 );
1601 let result = dag_state.contains_blocks(block_refs.clone());
1602
1603 expected.insert(3, false);
1605 assert_eq!(result, expected.clone());
1606 }
1607
1608 #[tokio::test]
1609 async fn test_contains_cached_block_at_slot() {
1610 const CACHED_ROUNDS: Round = 2;
1612
1613 let num_authorities: u32 = 4;
1614 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1615 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1616
1617 let context = Arc::new(context);
1618 let store = Arc::new(MemStore::new());
1619 let mut dag_state = DagState::new(context.clone(), store.clone());
1620
1621 let num_rounds: u32 = 10;
1623 let mut blocks = Vec::new();
1624
1625 for round in 1..=num_rounds {
1626 for author in 0..num_authorities {
1627 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1628 blocks.push(block.clone());
1629 dag_state.accept_block(block);
1630 }
1631 }
1632
1633 for (author, _) in context.committee.authorities() {
1635 assert!(
1636 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1637 "Genesis should always be found"
1638 );
1639 }
1640
1641 let mut block_refs = blocks
1645 .iter()
1646 .map(|block| block.reference())
1647 .collect::<Vec<_>>();
1648
1649 for block_ref in block_refs.clone() {
1650 let slot = block_ref.into();
1651 let found = dag_state.contains_cached_block_at_slot(slot);
1652 assert!(found, "A block should be found at slot {slot}");
1653 }
1654
1655 block_refs.insert(
1658 3,
1659 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1660 );
1661 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1662 expected.insert(3, false);
1663
1664 for block_ref in block_refs {
1666 let slot = block_ref.into();
1667 let found = dag_state.contains_cached_block_at_slot(slot);
1668
1669 assert_eq!(expected.remove(0), found);
1670 }
1671 }
1672
1673 #[tokio::test]
1674 #[should_panic(
1675 expected = "Attempted to check for slot S8[0] that is <= the last gc evicted round 8"
1676 )]
1677 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1678 const CACHED_ROUNDS: Round = 2;
1680 const GC_DEPTH: u32 = 1;
1681 let (mut context, _) = Context::new_for_test(4);
1682 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1683 context
1684 .protocol_config
1685 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1686
1687 let context = Arc::new(context);
1688 let store = Arc::new(MemStore::new());
1689 let mut dag_state = DagState::new(context.clone(), store.clone());
1690
1691 let mut blocks = Vec::new();
1693 for round in 1..=10 {
1694 let block = VerifiedBlock::new_for_test(TestBlock::new(round, 0).build());
1695 blocks.push(block.clone());
1696 dag_state.accept_block(block);
1697 }
1698
1699 dag_state.add_commit(TrustedCommit::new_for_test(
1701 1 as CommitIndex,
1702 CommitDigest::MIN,
1703 0,
1704 blocks.last().unwrap().reference(),
1705 blocks
1706 .into_iter()
1707 .map(|block| block.reference())
1708 .collect::<Vec<_>>(),
1709 ));
1710
1711 dag_state.flush();
1712
1713 let _ =
1717 dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0)));
1718 }
1719
1720 #[tokio::test]
1721 #[should_panic(
1722 expected = "Attempted to check for slot S3[1] that is <= the last gc evicted round 3"
1723 )]
1724 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
1725 const GC_DEPTH: u32 = 2;
1729 const CACHED_ROUNDS: Round = 3;
1731
1732 let (mut context, _) = Context::new_for_test(4);
1733 context
1734 .protocol_config
1735 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1736 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1737
1738 let context = Arc::new(context);
1739 let store = Arc::new(MemStore::new());
1740 let mut dag_state = DagState::new(context.clone(), store.clone());
1741
1742 let mut dag_builder = DagBuilder::new(context.clone());
1745 dag_builder.layers(1..=3).build();
1746 dag_builder
1747 .layers(4..=6)
1748 .authorities(vec![AuthorityIndex::new_for_test(0)])
1749 .skip_block()
1750 .build();
1751
1752 dag_builder
1754 .all_blocks()
1755 .into_iter()
1756 .for_each(|block| dag_state.accept_block(block));
1757
1758 dag_state.add_commit(TrustedCommit::new_for_test(
1760 1 as CommitIndex,
1761 CommitDigest::MIN,
1762 0,
1763 dag_builder.leader_block(5).unwrap().reference(),
1764 vec![],
1765 ));
1766
1767 dag_state.flush();
1768
1769 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1771
1772 for authority_index in 1..=3 {
1777 for round in 4..=6 {
1778 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1779 round,
1780 AuthorityIndex::new_for_test(authority_index)
1781 )));
1782 }
1783 }
1784
1785 for round in 1..=3 {
1786 assert!(
1787 dag_state.contains_cached_block_at_slot(Slot::new(
1788 round,
1789 AuthorityIndex::new_for_test(0)
1790 ))
1791 );
1792 }
1793
1794 let _ =
1797 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1798 }
1799
1800 #[tokio::test]
1801 async fn test_get_blocks_in_cache_or_store() {
1802 let (context, _) = Context::new_for_test(4);
1803 let context = Arc::new(context);
1804 let store = Arc::new(MemStore::new());
1805 let mut dag_state = DagState::new(context.clone(), store.clone());
1806
1807 let num_rounds: u32 = 10;
1809 let num_authorities: u32 = 4;
1810 let mut blocks = Vec::new();
1811
1812 for round in 1..=num_rounds {
1813 for author in 0..num_authorities {
1814 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1815 blocks.push(block);
1816 }
1817 }
1818
1819 blocks.clone().into_iter().for_each(|block| {
1822 if block.round() <= 4 {
1823 store
1824 .write(WriteBatch::default().blocks(vec![block]))
1825 .unwrap();
1826 } else {
1827 dag_state.accept_blocks(vec![block]);
1828 }
1829 });
1830
1831 let mut block_refs = blocks
1835 .iter()
1836 .map(|block| block.reference())
1837 .collect::<Vec<_>>();
1838 let result = dag_state.get_blocks(&block_refs);
1839
1840 let mut expected = blocks
1841 .into_iter()
1842 .map(Some)
1843 .collect::<Vec<Option<VerifiedBlock>>>();
1844
1845 assert_eq!(result, expected.clone());
1847
1848 block_refs.insert(
1850 3,
1851 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1852 );
1853 let result = dag_state.get_blocks(&block_refs);
1854
1855 expected.insert(3, None);
1857 assert_eq!(result, expected);
1858 }
1859
1860 #[rstest]
1862 #[tokio::test]
1863 async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) {
1864 telemetry_subscribers::init_for_testing();
1865 let num_authorities: u32 = 4;
1866 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1867 context
1868 .protocol_config
1869 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
1870
1871 if gc_depth > 0 {
1872 context
1873 .protocol_config
1874 .set_consensus_gc_depth_for_testing(gc_depth);
1875 }
1876
1877 let context = Arc::new(context);
1878 let store = Arc::new(MemStore::new());
1879 let mut dag_state = DagState::new(context.clone(), store.clone());
1880
1881 let num_rounds: u32 = 10;
1883 let mut dag_builder = DagBuilder::new(context.clone());
1884 dag_builder.layers(1..=num_rounds).build();
1885 let mut commits = vec![];
1886
1887 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1888 commits.push(commit);
1889 }
1890
1891 let temp_commits = commits.split_off(5);
1893 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1894 for commit in commits.clone() {
1895 dag_state.add_commit(commit);
1896 }
1897
1898 dag_state.flush();
1900
1901 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
1903 for commit in temp_commits.clone() {
1904 dag_state.add_commit(commit);
1905 }
1906
1907 let all_blocks = dag_builder.blocks(6..=num_rounds);
1909 let block_refs = all_blocks
1910 .iter()
1911 .map(|block| block.reference())
1912 .collect::<Vec<_>>();
1913
1914 let result = dag_state
1915 .get_blocks(&block_refs)
1916 .into_iter()
1917 .map(|b| b.unwrap())
1918 .collect::<Vec<_>>();
1919 assert_eq!(result, all_blocks);
1920
1921 assert_eq!(dag_state.last_commit_index(), 10);
1923 assert_eq!(
1924 dag_state.last_committed_rounds(),
1925 dag_builder.last_committed_rounds.clone()
1926 );
1927
1928 drop(dag_state);
1930
1931 let dag_state = DagState::new(context.clone(), store.clone());
1933
1934 let blocks = dag_builder.blocks(1..=5);
1936 let block_refs = blocks
1937 .iter()
1938 .map(|block| block.reference())
1939 .collect::<Vec<_>>();
1940 let result = dag_state
1941 .get_blocks(&block_refs)
1942 .into_iter()
1943 .map(|b| b.unwrap())
1944 .collect::<Vec<_>>();
1945 assert_eq!(result, blocks);
1946
1947 let missing_blocks = dag_builder.blocks(6..=num_rounds);
1949 let block_refs = missing_blocks
1950 .iter()
1951 .map(|block| block.reference())
1952 .collect::<Vec<_>>();
1953 let retrieved_blocks = dag_state
1954 .get_blocks(&block_refs)
1955 .into_iter()
1956 .flatten()
1957 .collect::<Vec<_>>();
1958 assert!(retrieved_blocks.is_empty());
1959
1960 assert_eq!(dag_state.last_commit_index(), 5);
1962
1963 let expected_last_committed_rounds = vec![4, 5, 4, 4];
1965 assert_eq!(
1966 dag_state.last_committed_rounds(),
1967 expected_last_committed_rounds
1968 );
1969
1970 assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
1973 }
1974
1975 #[tokio::test]
1976 async fn test_flush_and_recovery() {
1977 telemetry_subscribers::init_for_testing();
1978 let num_authorities: u32 = 4;
1979 let (context, _) = Context::new_for_test(num_authorities as usize);
1980 let context = Arc::new(context);
1981 let store = Arc::new(MemStore::new());
1982 let mut dag_state = DagState::new(context.clone(), store.clone());
1983
1984 let num_rounds: u32 = 10;
1986 let mut dag_builder = DagBuilder::new(context.clone());
1987 dag_builder.layers(1..=num_rounds).build();
1988 let mut commits = vec![];
1989 for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) {
1990 commits.push(commit);
1991 }
1992
1993 let temp_commits = commits.split_off(5);
1995 dag_state.accept_blocks(dag_builder.blocks(1..=5));
1996 for commit in commits.clone() {
1997 dag_state.add_commit(commit);
1998 }
1999
2000 dag_state.flush();
2002
2003 dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
2005 for commit in temp_commits.clone() {
2006 dag_state.add_commit(commit);
2007 }
2008
2009 let all_blocks = dag_builder.blocks(6..=num_rounds);
2011 let block_refs = all_blocks
2012 .iter()
2013 .map(|block| block.reference())
2014 .collect::<Vec<_>>();
2015 let result = dag_state
2016 .get_blocks(&block_refs)
2017 .into_iter()
2018 .map(|b| b.unwrap())
2019 .collect::<Vec<_>>();
2020 assert_eq!(result, all_blocks);
2021
2022 assert_eq!(dag_state.last_commit_index(), 10);
2024 assert_eq!(
2025 dag_state.last_committed_rounds(),
2026 dag_builder.last_committed_rounds.clone()
2027 );
2028
2029 drop(dag_state);
2031
2032 let dag_state = DagState::new(context.clone(), store.clone());
2034
2035 let blocks = dag_builder.blocks(1..=5);
2037 let block_refs = blocks
2038 .iter()
2039 .map(|block| block.reference())
2040 .collect::<Vec<_>>();
2041 let result = dag_state
2042 .get_blocks(&block_refs)
2043 .into_iter()
2044 .map(|b| b.unwrap())
2045 .collect::<Vec<_>>();
2046 assert_eq!(result, blocks);
2047
2048 let missing_blocks = dag_builder.blocks(6..=num_rounds);
2050 let block_refs = missing_blocks
2051 .iter()
2052 .map(|block| block.reference())
2053 .collect::<Vec<_>>();
2054 let retrieved_blocks = dag_state
2055 .get_blocks(&block_refs)
2056 .into_iter()
2057 .flatten()
2058 .collect::<Vec<_>>();
2059 assert!(retrieved_blocks.is_empty());
2060
2061 assert_eq!(dag_state.last_commit_index(), 5);
2063
2064 let expected_last_committed_rounds = vec![4, 5, 4, 4];
2066 assert_eq!(
2067 dag_state.last_committed_rounds(),
2068 expected_last_committed_rounds
2069 );
2070 assert_eq!(dag_state.scoring_subdags_count(), 5);
2073 }
2074
2075 #[tokio::test]
2076 async fn test_flush_and_recovery_gc_enabled() {
2077 telemetry_subscribers::init_for_testing();
2078
2079 const GC_DEPTH: u32 = 3;
2080 const CACHED_ROUNDS: u32 = 4;
2081
2082 let num_authorities: u32 = 4;
2083 let (mut context, _) = Context::new_for_test(num_authorities as usize);
2084 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2085 context
2086 .protocol_config
2087 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2088 context
2089 .protocol_config
2090 .set_consensus_linearize_subdag_v2_for_testing(true);
2091
2092 let context = Arc::new(context);
2093
2094 let store = Arc::new(MemStore::new());
2095 let mut dag_state = DagState::new(context.clone(), store.clone());
2096
2097 let num_rounds: u32 = 10;
2098 let mut dag_builder = DagBuilder::new(context.clone());
2099 dag_builder.layers(1..=5).build();
2100 dag_builder
2101 .layers(6..=8)
2102 .authorities(vec![AuthorityIndex::new_for_test(0)])
2103 .skip_block()
2104 .build();
2105 dag_builder.layers(9..=num_rounds).build();
2106
2107 let mut commits = dag_builder
2108 .get_sub_dag_and_commits(1..=num_rounds)
2109 .into_iter()
2110 .map(|(_subdag, commit)| commit)
2111 .collect::<Vec<_>>();
2112
2113 let temp_commits = commits.split_off(7);
2117 dag_state.accept_blocks(dag_builder.blocks(1..=8));
2118 for commit in commits.clone() {
2119 dag_state.add_commit(commit);
2120 }
2121
2122 let mut all_committed_blocks = BTreeSet::<BlockRef>::new();
2125 for commit in commits.iter() {
2126 all_committed_blocks.extend(commit.blocks());
2127 }
2128 dag_state.flush();
2130
2131 dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds));
2133 for commit in temp_commits.clone() {
2134 dag_state.add_commit(commit);
2135 }
2136
2137 let all_blocks = dag_builder.blocks(1..=num_rounds);
2139 let block_refs = all_blocks
2140 .iter()
2141 .map(|block| block.reference())
2142 .collect::<Vec<_>>();
2143 let result = dag_state
2144 .get_blocks(&block_refs)
2145 .into_iter()
2146 .map(|b| b.unwrap())
2147 .collect::<Vec<_>>();
2148 assert_eq!(result, all_blocks);
2149
2150 assert_eq!(dag_state.last_commit_index(), 9);
2152 assert_eq!(
2153 dag_state.last_committed_rounds(),
2154 dag_builder.last_committed_rounds.clone()
2155 );
2156
2157 drop(dag_state);
2159
2160 let dag_state = DagState::new(context.clone(), store.clone());
2162
2163 let blocks = dag_builder.blocks(1..=5);
2165 let block_refs = blocks
2166 .iter()
2167 .map(|block| block.reference())
2168 .collect::<Vec<_>>();
2169 let result = dag_state
2170 .get_blocks(&block_refs)
2171 .into_iter()
2172 .map(|b| b.unwrap())
2173 .collect::<Vec<_>>();
2174 assert_eq!(result, blocks);
2175
2176 let missing_blocks = dag_builder.blocks(9..=num_rounds);
2178 let block_refs = missing_blocks
2179 .iter()
2180 .map(|block| block.reference())
2181 .collect::<Vec<_>>();
2182 let retrieved_blocks = dag_state
2183 .get_blocks(&block_refs)
2184 .into_iter()
2185 .flatten()
2186 .collect::<Vec<_>>();
2187 assert!(retrieved_blocks.is_empty());
2188
2189 assert_eq!(dag_state.last_commit_index(), 7);
2191
2192 let expected_last_committed_rounds = vec![5, 6, 6, 7];
2194 assert_eq!(
2195 dag_state.last_committed_rounds(),
2196 expected_last_committed_rounds
2197 );
2198 assert_eq!(dag_state.scoring_subdags_count(), 7);
2201 for (authority_index, _) in context.committee.authorities() {
2203 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2204
2205 if authority_index == AuthorityIndex::new_for_test(0) {
2210 assert_eq!(blocks.len(), 4);
2211 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1);
2212 assert!(
2213 blocks
2214 .into_iter()
2215 .all(|block| block.round() >= 2 && block.round() <= 5)
2216 );
2217 } else {
2218 assert_eq!(blocks.len(), 4);
2219 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4);
2220 assert!(
2221 blocks
2222 .into_iter()
2223 .all(|block| block.round() >= 5 && block.round() <= 8)
2224 );
2225 }
2226 }
2227 let gc_round = dag_state.gc_round();
2230 assert_eq!(gc_round, 4);
2231 dag_state
2232 .recent_blocks
2233 .iter()
2234 .for_each(|(block_ref, block_info)| {
2235 if block_ref.round > gc_round && all_committed_blocks.contains(block_ref) {
2236 assert!(
2237 block_info.committed,
2238 "Block {block_ref:?} should be committed"
2239 );
2240 };
2241 });
2242 }
2243
2244 #[tokio::test]
2245 async fn test_block_info_as_committed() {
2246 let num_authorities: u32 = 4;
2247 let (context, _) = Context::new_for_test(num_authorities as usize);
2248 let context = Arc::new(context);
2249
2250 let store = Arc::new(MemStore::new());
2251 let mut dag_state = DagState::new(context.clone(), store.clone());
2252
2253 let block = VerifiedBlock::new_for_test(
2255 TestBlock::new(1, 0)
2256 .set_timestamp_ms(1000)
2257 .set_ancestors(vec![])
2258 .build(),
2259 );
2260
2261 dag_state.accept_block(block.clone());
2262
2263 assert!(!dag_state.is_committed(&block.reference()));
2265
2266 assert!(
2268 dag_state.set_committed(&block.reference()),
2269 "Block should be successfully set as committed for first time"
2270 );
2271
2272 assert!(dag_state.is_committed(&block.reference()));
2274
2275 assert!(
2277 !dag_state.set_committed(&block.reference()),
2278 "Block should not be successfully set as committed"
2279 );
2280 }
2281
2282 #[tokio::test]
2283 async fn test_get_cached_blocks() {
2284 let (mut context, _) = Context::new_for_test(4);
2285 context.parameters.dag_state_cached_rounds = 5;
2286
2287 let context = Arc::new(context);
2288 let store = Arc::new(MemStore::new());
2289 let mut dag_state = DagState::new(context.clone(), store.clone());
2290
2291 let mut all_blocks = Vec::new();
2296 for author in 1..=3 {
2297 for round in 10..(10 + author) {
2298 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2299 all_blocks.push(block.clone());
2300 dag_state.accept_block(block);
2301 }
2302 }
2303
2304 let cached_blocks =
2305 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2306 assert!(cached_blocks.is_empty());
2307
2308 let cached_blocks =
2309 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2310 assert_eq!(cached_blocks.len(), 1);
2311 assert_eq!(cached_blocks[0].round(), 10);
2312
2313 let cached_blocks =
2314 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2315 assert_eq!(cached_blocks.len(), 2);
2316 assert_eq!(cached_blocks[0].round(), 10);
2317 assert_eq!(cached_blocks[1].round(), 11);
2318
2319 let cached_blocks =
2320 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2321 assert_eq!(cached_blocks.len(), 1);
2322 assert_eq!(cached_blocks[0].round(), 11);
2323
2324 let cached_blocks =
2325 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2326 assert_eq!(cached_blocks.len(), 3);
2327 assert_eq!(cached_blocks[0].round(), 10);
2328 assert_eq!(cached_blocks[1].round(), 11);
2329 assert_eq!(cached_blocks[2].round(), 12);
2330
2331 let cached_blocks =
2332 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2333 assert_eq!(cached_blocks.len(), 1);
2334 assert_eq!(cached_blocks[0].round(), 12);
2335
2336 let cached_blocks = dag_state.get_cached_blocks_in_range(
2340 context.committee.to_authority_index(3).unwrap(),
2341 10,
2342 10,
2343 1,
2344 );
2345 assert!(cached_blocks.is_empty());
2346
2347 let cached_blocks = dag_state.get_cached_blocks_in_range(
2349 context.committee.to_authority_index(3).unwrap(),
2350 11,
2351 10,
2352 1,
2353 );
2354 assert!(cached_blocks.is_empty());
2355
2356 let cached_blocks = dag_state.get_cached_blocks_in_range(
2358 context.committee.to_authority_index(0).unwrap(),
2359 9,
2360 10,
2361 1,
2362 );
2363 assert!(cached_blocks.is_empty());
2364
2365 let cached_blocks = dag_state.get_cached_blocks_in_range(
2367 context.committee.to_authority_index(1).unwrap(),
2368 9,
2369 11,
2370 1,
2371 );
2372 assert_eq!(cached_blocks.len(), 1);
2373 assert_eq!(cached_blocks[0].round(), 10);
2374
2375 let cached_blocks = dag_state.get_cached_blocks_in_range(
2377 context.committee.to_authority_index(2).unwrap(),
2378 9,
2379 12,
2380 5,
2381 );
2382 assert_eq!(cached_blocks.len(), 2);
2383 assert_eq!(cached_blocks[0].round(), 10);
2384 assert_eq!(cached_blocks[1].round(), 11);
2385
2386 let cached_blocks = dag_state.get_cached_blocks_in_range(
2388 context.committee.to_authority_index(3).unwrap(),
2389 11,
2390 20,
2391 5,
2392 );
2393 assert_eq!(cached_blocks.len(), 2);
2394 assert_eq!(cached_blocks[0].round(), 11);
2395 assert_eq!(cached_blocks[1].round(), 12);
2396
2397 let cached_blocks = dag_state.get_cached_blocks_in_range(
2399 context.committee.to_authority_index(3).unwrap(),
2400 10,
2401 20,
2402 1,
2403 );
2404 assert_eq!(cached_blocks.len(), 1);
2405 assert_eq!(cached_blocks[0].round(), 10);
2406 }
2407
2408 #[rstest]
2409 #[tokio::test]
2410 async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
2411 const CACHED_ROUNDS: Round = 2;
2413 let (mut context, _) = Context::new_for_test(4);
2414 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2415
2416 if gc_depth > 0 {
2417 context
2418 .protocol_config
2419 .set_consensus_gc_depth_for_testing(gc_depth);
2420 }
2421
2422 let context = Arc::new(context);
2423 let store = Arc::new(MemStore::new());
2424 let mut dag_state = DagState::new(context.clone(), store.clone());
2425
2426 let dag_str = "DAG {
2431 Round 0 : { 4 },
2432 Round 1 : {
2433 B -> [*],
2434 C -> [*],
2435 D -> [*],
2436 },
2437 Round 2 : {
2438 C -> [*],
2439 D -> [*],
2440 },
2441 Round 3 : {
2442 D -> [*],
2443 },
2444 }";
2445
2446 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2447
2448 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2450
2451 for block in dag_builder
2453 .all_blocks()
2454 .into_iter()
2455 .chain(std::iter::once(block))
2456 {
2457 dag_state.accept_block(block);
2458 }
2459
2460 dag_state.add_commit(TrustedCommit::new_for_test(
2461 1 as CommitIndex,
2462 CommitDigest::MIN,
2463 context.clock.timestamp_utc_ms(),
2464 dag_builder.leader_block(3).unwrap().reference(),
2465 vec![],
2466 ));
2467
2468 let end_round = 4;
2470 let expected_rounds = vec![0, 1, 2, 3];
2471 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2472 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2474 assert_eq!(
2475 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2476 expected_rounds
2477 );
2478 assert_eq!(
2479 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2480 expected_excluded_and_equivocating_blocks
2481 );
2482
2483 for (i, expected_round) in expected_rounds.iter().enumerate() {
2485 let round = dag_state
2486 .get_last_cached_block_in_range(
2487 context.committee.to_authority_index(i).unwrap(),
2488 0,
2489 end_round,
2490 )
2491 .map(|b| b.round())
2492 .unwrap_or_default();
2493 assert_eq!(round, *expected_round, "Authority {i}");
2494 }
2495
2496 let start_round = 2;
2498 let expected_rounds = [0, 0, 2, 3];
2499
2500 for (i, expected_round) in expected_rounds.iter().enumerate() {
2502 let round = dag_state
2503 .get_last_cached_block_in_range(
2504 context.committee.to_authority_index(i).unwrap(),
2505 start_round,
2506 end_round,
2507 )
2508 .map(|b| b.round())
2509 .unwrap_or_default();
2510 assert_eq!(round, *expected_round, "Authority {i}");
2511 }
2512
2513 dag_state.flush();
2522
2523 let end_round = 3;
2525 let expected_rounds = vec![0, 1, 2, 2];
2526
2527 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2529 assert_eq!(
2530 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2531 expected_rounds
2532 );
2533
2534 for (i, expected_round) in expected_rounds.iter().enumerate() {
2536 let round = dag_state
2537 .get_last_cached_block_in_range(
2538 context.committee.to_authority_index(i).unwrap(),
2539 0,
2540 end_round,
2541 )
2542 .map(|b| b.round())
2543 .unwrap_or_default();
2544 assert_eq!(round, *expected_round, "Authority {i}");
2545 }
2546 }
2547
2548 #[tokio::test]
2549 #[should_panic(
2550 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2551 )]
2552 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2553 const CACHED_ROUNDS: Round = 1;
2555 const GC_DEPTH: u32 = 1;
2556 let (mut context, _) = Context::new_for_test(4);
2557 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2558 context
2559 .protocol_config
2560 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2561
2562 let context = Arc::new(context);
2563 let store = Arc::new(MemStore::new());
2564 let mut dag_state = DagState::new(context.clone(), store.clone());
2565
2566 let mut dag_builder = DagBuilder::new(context.clone());
2571 dag_builder
2572 .layers(1..=1)
2573 .authorities(vec![AuthorityIndex::new_for_test(0)])
2574 .skip_block()
2575 .build();
2576 dag_builder
2577 .layers(2..=2)
2578 .authorities(vec![
2579 AuthorityIndex::new_for_test(0),
2580 AuthorityIndex::new_for_test(1),
2581 ])
2582 .skip_block()
2583 .build();
2584 dag_builder
2585 .layers(3..=3)
2586 .authorities(vec![
2587 AuthorityIndex::new_for_test(0),
2588 AuthorityIndex::new_for_test(1),
2589 AuthorityIndex::new_for_test(2),
2590 ])
2591 .skip_block()
2592 .build();
2593
2594 for block in dag_builder.all_blocks() {
2596 dag_state.accept_block(block);
2597 }
2598
2599 dag_state.add_commit(TrustedCommit::new_for_test(
2600 1 as CommitIndex,
2601 CommitDigest::MIN,
2602 0,
2603 dag_builder.leader_block(3).unwrap().reference(),
2604 vec![],
2605 ));
2606
2607 dag_state.flush();
2609
2610 dag_state.get_last_cached_block_per_authority(2);
2613 }
2614
2615 #[tokio::test]
2616 async fn test_last_quorum() {
2617 let (context, _) = Context::new_for_test(4);
2619 let context = Arc::new(context);
2620 let store = Arc::new(MemStore::new());
2621 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2622
2623 {
2625 let genesis = genesis_blocks(&context);
2626
2627 assert_eq!(dag_state.read().last_quorum(), genesis);
2628 }
2629
2630 {
2633 let mut dag_builder = DagBuilder::new(context.clone());
2634 dag_builder
2635 .layers(1..=4)
2636 .build()
2637 .persist_layers(dag_state.clone());
2638 let round_4_blocks: Vec<_> = dag_builder
2639 .blocks(4..=4)
2640 .into_iter()
2641 .map(|block| block.reference())
2642 .collect();
2643
2644 let last_quorum = dag_state.read().last_quorum();
2645
2646 assert_eq!(
2647 last_quorum
2648 .into_iter()
2649 .map(|block| block.reference())
2650 .collect::<Vec<_>>(),
2651 round_4_blocks
2652 );
2653 }
2654
2655 {
2658 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2659 dag_state.write().accept_block(block);
2660
2661 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2662
2663 let last_quorum = dag_state.read().last_quorum();
2664
2665 assert_eq!(last_quorum, round_4_blocks);
2666 }
2667 }
2668
2669 #[tokio::test]
2670 async fn test_last_block_for_authority() {
2671 let (context, _) = Context::new_for_test(4);
2673 let context = Arc::new(context);
2674 let store = Arc::new(MemStore::new());
2675 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2676
2677 {
2679 let genesis = genesis_blocks(&context);
2680 let my_genesis = genesis
2681 .into_iter()
2682 .find(|block| block.author() == context.own_index)
2683 .unwrap();
2684
2685 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2686 }
2687
2688 {
2691 let mut dag_builder = DagBuilder::new(context.clone());
2693 dag_builder
2694 .layers(1..=4)
2695 .build()
2696 .persist_layers(dag_state.clone());
2697
2698 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2700 dag_state.write().accept_block(block);
2701
2702 let block = dag_state
2703 .read()
2704 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2705 assert_eq!(block.round(), 5);
2706
2707 for (authority_index, _) in context.committee.authorities() {
2708 let block = dag_state
2709 .read()
2710 .get_last_block_for_authority(authority_index);
2711
2712 if authority_index.value() == 0 {
2713 assert_eq!(block.round(), 5);
2714 } else {
2715 assert_eq!(block.round(), 4);
2716 }
2717 }
2718 }
2719 }
2720
2721 #[tokio::test]
2722 #[should_panic]
2723 async fn test_accept_block_panics_when_timestamp_is_ahead() {
2724 let (mut context, _) = Context::new_for_test(4);
2726 context
2727 .protocol_config
2728 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
2729 let context = Arc::new(context);
2730 let store = Arc::new(MemStore::new());
2731 let mut dag_state = DagState::new(context.clone(), store.clone());
2732
2733 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2735
2736 let block = VerifiedBlock::new_for_test(
2737 TestBlock::new(10, 0)
2738 .set_timestamp_ms(block_timestamp)
2739 .build(),
2740 );
2741
2742 dag_state.accept_block(block);
2745 }
2746
2747 #[tokio::test]
2748 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2749 let (mut context, _) = Context::new_for_test(4);
2751 context
2752 .protocol_config
2753 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(true);
2754
2755 let context = Arc::new(context);
2756 let store = Arc::new(MemStore::new());
2757 let mut dag_state = DagState::new(context.clone(), store.clone());
2758
2759 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2761
2762 let block = VerifiedBlock::new_for_test(
2763 TestBlock::new(10, 0)
2764 .set_timestamp_ms(block_timestamp)
2765 .build(),
2766 );
2767
2768 dag_state.accept_block(block);
2770 }
2771}