use std::{
    collections::{BTreeMap, BTreeSet},
    iter, mem,
    sync::Arc,
    time::Duration,
    vec,
};

use consensus_config::{AuthorityIndex, ProtocolKeyPair};
#[cfg(test)]
use consensus_config::{Stake, local_committee_and_keys};
use iota_macros::fail_point;
#[cfg(test)]
use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
use iota_metrics::monitored_scope;
use itertools::Itertools as _;
use parking_lot::RwLock;
use tokio::{
    sync::{broadcast, watch},
    time::Instant,
};
use tracing::{debug, info, trace, warn};

#[cfg(test)]
use crate::{
    CommitConsumer, TransactionClient, block_verifier::NoopBlockVerifier,
    storage::mem_store::MemStore,
};
use crate::{
    ancestor::{AncestorState, AncestorStateManager},
    block::{
        Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, GENESIS_ROUND, Round,
        SignedBlock, Slot, VerifiedBlock,
    },
    block_manager::BlockManager,
    commit::{
        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
    },
    commit_observer::CommitObserver,
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    round_prober::QuorumRound,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    transaction::TransactionConsumer,
    universal_committer::{
        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
    },
};

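/// Maximum number of commit votes to include in a proposed block.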
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;

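/// Core runs the consensus protocol for this authority: it accepts blocks from
/// peers via the `BlockManager`, proposes new blocks, and runs the commit rule
/// to sequence committed sub-dags.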
pub(crate) struct Core {
    context: Arc<Context>,
    transaction_consumer: TransactionConsumer,
    block_manager: BlockManager,
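    /// Whether a quorum of authorities is subscribed to receive this node's
    /// blocks. Proposals are skipped while this is false.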
    quorum_subscribers_exists: bool,
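    /// Estimated delay, in rounds, for this node's proposed blocks to reach a
    /// quorum. Proposals stop when it exceeds the configured threshold.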
    propagation_delay: Round,

    committer: UniversalCommitter,
    last_signaled_round: Round,
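    /// The most recent block included in a proposal, per authority, used to
    /// avoid re-including older blocks as ancestors.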
    last_included_ancestors: Vec<Option<BlockRef>>,
    last_decided_leader: Slot,
    leader_schedule: Arc<LeaderSchedule>,
    commit_observer: CommitObserver,
    signals: CoreSignals,
    block_signer: ProtocolKeyPair,
    dag_state: Arc<RwLock<DagState>>,
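    /// The highest round this authority is known to have proposed before
    /// (re)starting. `None` until synced from peers when sync is enabled;
    /// proposals are skipped while it is unknown.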
    last_known_proposed_round: Option<Round>,
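    /// Tracks per-authority propagation state for smart ancestor selection.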
    ancestor_state_manager: AncestorStateManager,
}

impl Core {
    pub(crate) fn new(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        transaction_consumer: TransactionConsumer,
        block_manager: BlockManager,
        subscriber_exists: bool,
        commit_observer: CommitObserver,
        signals: CoreSignals,
        block_signer: ProtocolKeyPair,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Self {
        let last_decided_leader = dag_state.read().last_commit_leader();
        let committer = UniversalCommitterBuilder::new(
            context.clone(),
            leader_schedule.clone(),
            dag_state.clone(),
        )
        .with_number_of_leaders(1)
        .with_pipeline(true)
        .build();

        let last_proposed_block = dag_state.read().get_last_proposed_block();

        let last_signaled_round = last_proposed_block.round();

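        // Recover the last included ancestor per authority from the ancestors
        // of the last proposed block.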
        let mut last_included_ancestors = vec![None; context.committee.size()];
        for ancestor in last_proposed_block.ancestors() {
            last_included_ancestors[ancestor.author] = Some(*ancestor);
        }

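        // When the last known own block does not need to be synced from peers,
        // proposing can safely start from round 0.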
        let min_propose_round = if sync_last_known_own_block {
            None
        } else {
            Some(0)
        };

        let propagation_scores = leader_schedule
            .leader_swap_table
            .read()
            .reputation_scores
            .clone();
        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
        ancestor_state_manager.set_propagation_scores(propagation_scores);

        Self {
            context,
            last_signaled_round,
            last_included_ancestors,
            last_decided_leader,
            leader_schedule,
            transaction_consumer,
            block_manager,
            quorum_subscribers_exists: subscriber_exists,
            propagation_delay: 0,
            committer,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            last_known_proposed_round: min_propose_round,
            ancestor_state_manager,
        }
        .recover()
    }

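    /// Recovers Core state after a restart: waits out any clock drift relative
    /// to stored blocks, replays the commit rule, and re-proposes or
    /// re-broadcasts the last proposed block.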
    fn recover(mut self) -> Self {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::recover"])
            .start_timer();
        let ancestor_blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let max_ancestor_timestamp = ancestor_blocks
            .iter()
            .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms()));
        let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms());
        if wait_ms > 0 {
            warn!(
                "Waiting for {} ms while recovering ancestors from storage",
                wait_ms
            );
            std::thread::sleep(Duration::from_millis(wait_ms));
        }

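        // Run the commit rule on the recovered DAG and attempt to propose.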
        self.try_commit(vec![]).unwrap();
        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
        {
            last_proposed_block
        } else {
            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
            if self.should_propose() {
                assert!(
                    last_proposed_block.round() > GENESIS_ROUND,
                    "At minimum a block of round higher than genesis should have been produced during recovery"
                );
            }

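            // No new block was proposed, so re-broadcast the last proposed
            // block to make sure subscribers receive it.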
            self.signals
                .new_block(ExtendedBlock {
                    block: last_proposed_block.clone(),
                    excluded_ancestors: vec![],
                })
                .unwrap();
            last_proposed_block
        };

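        // Notify subscribers of the current threshold clock round.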
        self.try_signal_new_round();

        info!(
            "Core recovery completed with last proposed block {:?}",
            last_proposed_block
        );

        self
    }

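    /// Processes blocks received from the network or via synchronization.
    /// Accepted blocks can advance the threshold clock, trigger commits, and
    /// lead to a new proposal. Returns references of ancestors that are
    /// missing and should be fetched.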
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_blocks");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::add_blocks"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_add_blocks_batch_size
            .observe(blocks.len() as f64);

        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);

        if !accepted_blocks.is_empty() {
            debug!(
                "Accepted blocks: {}",
                accepted_blocks
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            self.try_commit(vec![])?;

            self.try_propose(false)?;

            self.try_signal_new_round();
        }

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

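    /// Checks which of the provided block references are still unknown to the
    /// block manager and returns them so they can be fetched.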
    pub(crate) fn check_block_refs(
        &mut self,
        block_refs: Vec<BlockRef>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::check_block_refs");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::check_block_refs"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_check_block_refs_batch_size
            .observe(block_refs.len() as f64);

        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

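    /// Adds commits that have been certified via commit sync. With GC enabled
    /// the commits are applied through the direct commit path; otherwise only
    /// their blocks are processed through the regular block path.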
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_certified_commits(
        &mut self,
        certified_commits: CertifiedCommits,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_certified_commits");

        if self.dag_state.read().gc_enabled() {
            let votes = certified_commits.votes().to_vec();
            let commits = self
                .validate_certified_commits(certified_commits.commits().to_vec())
                .expect("Certified commits validation failed");

            self.block_manager.try_accept_blocks(votes);

            self.try_commit(commits)?;

            self.try_propose(false)?;

            self.try_signal_new_round();

            return Ok(BTreeSet::new());
        }

        let blocks = certified_commits
            .commits()
            .iter()
            .flat_map(|commit| commit.blocks())
            .cloned()
            .collect::<Vec<_>>();

        self.add_blocks(blocks)
    }

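    /// Emits a new-round signal when the threshold clock has advanced past the
    /// last signaled round.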
    fn try_signal_new_round(&mut self) {
        let new_clock_round = self.dag_state.read().threshold_clock_round();
        if new_clock_round <= self.last_signaled_round {
            return;
        }
        self.signals.new_round(new_clock_round);
        self.last_signaled_round = new_clock_round;

        self.context
            .metrics
            .node_metrics
            .threshold_clock_round
            .set(new_clock_round as i64);
    }

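    /// Attempts to propose for `round`, typically invoked on leader timeout.
    /// With `force` set, the proposal is attempted even if the round leaders
    /// have not been observed yet.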
    pub(crate) fn new_block(
        &mut self,
        round: Round,
        force: bool,
    ) -> ConsensusResult<Option<VerifiedBlock>> {
        let _scope = monitored_scope("Core::new_block");
        if self.last_proposed_round() < round {
            self.context
                .metrics
                .node_metrics
                .leader_timeout_total
                .with_label_values(&[&format!("{force}")])
                .inc();
            let result = self.try_propose(force);
            self.try_signal_new_round();
            return result;
        }
        Ok(None)
    }

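    /// Drops certified commits that are already committed locally and verifies
    /// that the remaining ones start right after the last commit index.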
    fn validate_certified_commits(
        &mut self,
        commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CertifiedCommit>> {
        let last_commit_index = self.dag_state.read().last_commit_index();
        let commits = commits
            .iter()
            .filter(|commit| {
                if commit.index() > last_commit_index {
                    true
                } else {
                    tracing::debug!(
                        "Skip commit for index {} as it is already committed with last commit index {}",
                        commit.index(),
                        last_commit_index
                    );
                    false
                }
            })
            .cloned()
            .collect::<Vec<_>>();

        if let Some(commit) = commits.first() {
            if commit.index() != last_commit_index + 1 {
                return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
                    expected_commit_index: last_commit_index + 1,
                    commit_index: commit.index(),
                });
            }
        }

        Ok(commits)
    }

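    /// Attempts to create and broadcast a new block, then runs the commit rule
    /// on the extended DAG. Returns the proposed block, if any.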
    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
        if !self.should_propose() {
            return Ok(None);
        }
        if let Some(extended_block) = self.try_new_block(force) {
            self.signals.new_block(extended_block.clone())?;

            fail_point!("consensus-after-propose");

            self.try_commit(vec![])?;
            return Ok(Some(extended_block.block));
        }
        Ok(None)
    }

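    /// Builds, signs, and accepts a new block for the current threshold clock
    /// round. Unless `force` is set, proposing waits for the round leaders and
    /// the minimum round delay.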
    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_new_block"])
            .start_timer();

        let clock_round = {
            let dag_state = self.dag_state.read();
            let clock_round = dag_state.threshold_clock_round();
            if clock_round <= dag_state.get_last_proposed_block().round() {
                return None;
            }
            clock_round
        };

        let quorum_round = clock_round.saturating_sub(1);

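        // Unless forced by a timeout, only propose when the leaders of the
        // previous round exist and the minimum round delay has elapsed.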
        if !force {
            if !self.leaders_exist(quorum_round) {
                return None;
            }

            if Duration::from_millis(
                self.context
                    .clock
                    .timestamp_utc_ms()
                    .saturating_sub(self.last_proposed_timestamp_ms()),
            ) < self.context.parameters.min_round_delay
            {
                return None;
            }
        }

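        // Determine the ancestors to include, using smart ancestor selection
        // when enabled in the protocol config.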
        let (ancestors, excluded_ancestors) = if self
            .context
            .protocol_config
            .consensus_distributed_vote_scoring_strategy()
            && self
                .context
                .protocol_config
                .consensus_smart_ancestor_selection()
        {
            let (ancestors, excluded_and_equivocating_ancestors) =
                self.smart_ancestors_to_propose(clock_round, !force);

            if ancestors.is_empty() {
                assert!(
                    !force,
                    "Ancestors should have been returned if force is true!"
                );
                return None;
            }

            let excluded_ancestors_limit = self.context.committee.size() * 2;
            if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
                debug!(
                    "Dropping {} excluded ancestor(s) during proposal due to size limit",
                    excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
                );
            }
            let excluded_ancestors = excluded_and_equivocating_ancestors
                .into_iter()
                .take(excluded_ancestors_limit)
                .collect();

            (ancestors, excluded_ancestors)
        } else {
            (self.ancestors_to_propose(clock_round), vec![])
        };

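        // Record the included ancestors so they are not re-included later.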
        for ancestor in &ancestors {
            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
        }

        let leader_authority = &self
            .context
            .committee
            .authority(self.first_leader(quorum_round))
            .hostname;
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_ms
            .with_label_values(&[leader_authority])
            .inc_by(
                Instant::now()
                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
                    .as_millis() as u64,
            );
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_count
            .with_label_values(&[leader_authority])
            .inc();

        self.context
            .metrics
            .node_metrics
            .proposed_block_ancestors
            .observe(ancestors.len() as f64);
        for ancestor in &ancestors {
            let authority = &self.context.committee.authority(ancestor.author()).hostname;
            self.context
                .metrics
                .node_metrics
                .proposed_block_ancestors_depth
                .with_label_values(&[authority])
                .observe(clock_round.saturating_sub(ancestor.round()).into());
        }

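        // No included ancestor should have a timestamp ahead of the local
        // clock that stamps the new block.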
        let now = self.context.clock.timestamp_utc_ms();
        ancestors.iter().for_each(|block| {
            assert!(
                block.timestamp_ms() <= now,
                "Violation: ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.",
                block, block.timestamp_ms(), clock_round
            );
        });

        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
        self.context
            .metrics
            .node_metrics
            .proposed_block_transactions
            .observe(transactions.len() as f64);

        let commit_votes = self
            .dag_state
            .write()
            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);

        let block = Block::V1(BlockV1::new(
            self.context.committee.epoch(),
            clock_round,
            self.context.own_index,
            now,
            ancestors.iter().map(|b| b.reference()).collect(),
            transactions,
            commit_votes,
            vec![],
        ));
        let signed_block =
            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
        let serialized = signed_block
            .serialize()
            .expect("Block serialization failed.");
        self.context
            .metrics
            .node_metrics
            .proposed_block_size
            .observe(serialized.len() as f64);
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

        let last_proposed_block = self.last_proposed_block();
        if last_proposed_block.round() > 0 {
            self.context
                .metrics
                .node_metrics
                .block_proposal_interval
                .observe(
                    Duration::from_millis(
                        verified_block
                            .timestamp_ms()
                            .saturating_sub(last_proposed_block.timestamp_ms()),
                    )
                    .as_secs_f64(),
                );
        }

        let (accepted_blocks, missing) = self
            .block_manager
            .try_accept_blocks(vec![verified_block.clone()]);
        assert_eq!(accepted_blocks.len(), 1);
        assert!(missing.is_empty());

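        // Flush DAG state so the new block is durable before transactions are
        // acknowledged and the block is broadcast.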
        self.dag_state.write().flush();

        ack_transactions(verified_block.reference());

        debug!("Created block {verified_block:?} for round {clock_round}");

        self.context
            .metrics
            .node_metrics
            .proposed_blocks
            .with_label_values(&[&force.to_string()])
            .inc();

        Some(ExtendedBlock {
            block: verified_block,
            excluded_ancestors,
        })
    }

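    /// Runs the commit rule, applying any provided certified commits first,
    /// and refreshes the leader schedule whenever enough commits have been
    /// made. Returns the newly committed sub-dags.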
    fn try_commit(
        &mut self,
        mut certified_commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_commit"])
            .start_timer();

        let mut certified_commits_map = BTreeMap::new();
        for c in &certified_commits {
            certified_commits_map.insert(c.index(), c.reference());
        }

        if !certified_commits.is_empty() {
            info!(
                "Will try to commit synced commits first: {:?}",
                certified_commits
                    .iter()
                    .map(|c| (c.index(), c.leader()))
                    .collect::<Vec<_>>()
            );
        }

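        // Commit in batches, re-checking between batches whether the leader
        // schedule must be updated.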
        let mut committed_sub_dags = Vec::new();
        loop {
            let mut commits_until_update = self
                .leader_schedule
                .commits_until_leader_schedule_update(self.dag_state.clone());

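            // A leader schedule update is due: recompute the schedule (and the
            // propagation scores) before deciding more leaders.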
            if commits_until_update == 0 {
                let last_commit_index = self.dag_state.read().last_commit_index();

                tracing::info!(
                    "Leader schedule change triggered at commit index {last_commit_index}"
                );
                if self
                    .context
                    .protocol_config
                    .consensus_distributed_vote_scoring_strategy()
                {
                    self.leader_schedule
                        .update_leader_schedule_v2(&self.dag_state);

                    let propagation_scores = self
                        .leader_schedule
                        .leader_swap_table
                        .read()
                        .reputation_scores
                        .clone();
                    self.ancestor_state_manager
                        .set_propagation_scores(propagation_scores);
                } else {
                    self.leader_schedule
                        .update_leader_schedule_v1(&self.dag_state);
                }
                commits_until_update = self
                    .leader_schedule
                    .commits_until_leader_schedule_update(self.dag_state.clone());

                fail_point!("consensus-after-leader-schedule-change");
            }
            assert!(commits_until_update > 0);

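            // Prefer leaders from the certified commits; fall back to the
            // local commit rule when none are available.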
            let (mut decided_leaders, decided_certified_commits): (
                Vec<DecidedLeader>,
                Vec<CertifiedCommit>,
            ) = self
                .try_decide_certified(&mut certified_commits, commits_until_update)
                .into_iter()
                .unzip();

            let blocks = decided_certified_commits
                .iter()
                .flat_map(|c| c.blocks())
                .cloned()
                .collect::<Vec<_>>();
            self.block_manager.try_accept_committed_blocks(blocks);

            if decided_leaders.is_empty() {
                decided_leaders = self.committer.try_decide(self.last_decided_leader);

                if decided_leaders.len() >= commits_until_update {
                    let _ = decided_leaders.split_off(commits_until_update);
                }
            }

            let Some(last_decided) = decided_leaders.last().cloned() else {
                break;
            };

            self.last_decided_leader = last_decided.slot();

            let sequenced_leaders = decided_leaders
                .into_iter()
                .filter_map(|leader| leader.into_committed_block())
                .collect::<Vec<_>>();

            tracing::debug!(
                "Decided {} leaders and {commits_until_update} commits can be made before the next leader schedule change",
                sequenced_leaders.len()
            );

            self.context
                .metrics
                .node_metrics
                .last_decided_leader_round
                .set(self.last_decided_leader.round as i64);

            if sequenced_leaders.is_empty() {
                break;
            }

            tracing::info!(
                "Committing {} leaders: {}",
                sequenced_leaders.len(),
                sequenced_leaders
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
            if self
                .context
                .protocol_config
                .consensus_distributed_vote_scoring_strategy()
            {
                self.dag_state.write().add_scoring_subdags(subdags.clone());
            } else {
                self.dag_state
                    .write()
                    .add_unscored_committed_subdags(subdags.clone());
            }

            self.block_manager
                .try_unsuspend_blocks_for_latest_gc_round();
            committed_sub_dags.extend(subdags);

            fail_point!("consensus-after-handle-commit");
        }

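        // Sanity check: every applied certified commit must match the
        // corresponding locally committed sub-dag.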
        for sub_dag in &committed_sub_dags {
            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
                assert_eq!(
                    commit_ref, sub_dag.commit_ref,
                    "Certified commit has different reference than the committed sub dag"
                );
            }
        }

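        // Report our own committed blocks (and the GC round) so block status
        // subscribers learn whether their blocks were sequenced.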
        let committed_block_refs = committed_sub_dags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter())
            .filter_map(|block| {
                (block.author() == self.context.own_index).then_some(block.reference())
            })
            .collect::<Vec<_>>();
        self.transaction_consumer
            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());

        Ok(committed_sub_dags)
    }

    pub(crate) fn get_missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
        let _scope = monitored_scope("Core::get_missing_blocks");
        self.block_manager.missing_blocks()
    }

    pub(crate) fn set_quorum_subscribers_exists(&mut self, exists: bool) {
        info!("A quorum of block subscribers exists: {exists}");
        self.quorum_subscribers_exists = exists;
    }

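    /// Sets the propagation delay and the latest received/accepted quorum
    /// rounds per authority, as reported by the round prober.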
    pub(crate) fn set_propagation_delay_and_quorum_rounds(
        &mut self,
        delay: Round,
        received_quorum_rounds: Vec<QuorumRound>,
        accepted_quorum_rounds: Vec<QuorumRound>,
    ) {
        info!(
            "Received quorum round per authority in ancestor state manager set to: {}",
            self.context
                .committee
                .authorities()
                .zip(received_quorum_rounds.iter())
                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
                .join(", ")
        );
        info!(
            "Accepted quorum round per authority in ancestor state manager set to: {}",
            self.context
                .committee
                .authorities()
                .zip(accepted_quorum_rounds.iter())
                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
                .join(", ")
        );
        self.ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        info!("Propagation round delay set to: {delay}");
        self.propagation_delay = delay;
    }

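    /// Sets the last known proposed round once it has been synced from peers.
    /// Panics if it was already set.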
    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
        if self.last_known_proposed_round.is_some() {
            panic!(
                "Should not attempt to set the last known proposed round if it has already been set"
            );
        }
        self.last_known_proposed_round = Some(round);
        info!("Last known proposed round set to {round}");
    }

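    /// Whether this node should propose: requires a quorum of subscribers, an
    /// acceptable propagation delay, and a synced last known proposed round
    /// below the current threshold clock round.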
    pub(crate) fn should_propose(&self) -> bool {
        let clock_round = self.dag_state.read().threshold_clock_round();
        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;

        if !self.quorum_subscribers_exists {
            debug!("Skip proposing for round {clock_round}, don't have a quorum of subscribers.");
            core_skipped_proposals
                .with_label_values(&["no_quorum_subscriber"])
                .inc();
            return false;
        }

        if self.propagation_delay
            > self
                .context
                .parameters
                .propagation_delay_stop_proposal_threshold
        {
            debug!(
                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
                self.propagation_delay,
                self.context
                    .parameters
                    .propagation_delay_stop_proposal_threshold
            );
            core_skipped_proposals
                .with_label_values(&["high_propagation_delay"])
                .inc();
            return false;
        }

        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
            debug!(
                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
            );
            core_skipped_proposals
                .with_label_values(&["no_last_known_proposed_round"])
                .inc();
            return false;
        };
        if clock_round <= last_known_proposed_round {
            debug!(
                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
            );
            core_skipped_proposals
                .with_label_values(&["higher_last_known_proposed_round"])
                .inc();
            return false;
        }

        true
    }

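    /// Pops up to `limit` certified commits and maps their leaders to decided
    /// leaders. Returns nothing when GC is disabled, since certified commits
    /// are only applied directly with GC enabled.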
    #[tracing::instrument(skip_all)]
    fn try_decide_certified(
        &mut self,
        certified_commits: &mut Vec<CertifiedCommit>,
        limit: usize,
    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
        if !self.dag_state.read().gc_enabled() {
            return Vec::new();
        }

        assert!(limit > 0, "limit should be greater than 0");

        let to_commit = if certified_commits.len() >= limit {
            certified_commits.drain(..limit).collect::<Vec<_>>()
        } else {
            mem::take(certified_commits)
        };

        tracing::debug!(
            "Decided {} certified leaders: {}",
            to_commit.len(),
            to_commit.iter().map(|c| c.leader().to_string()).join(",")
        );

        let sequenced_leaders = to_commit
            .into_iter()
            .map(|commit| {
                let leader = commit
                    .blocks()
                    .last()
                    .expect("Certified commit should have at least one block");
                assert_eq!(
                    leader.reference(),
                    commit.leader(),
                    "Last block of the committed sub dag should have the same digest as the leader of the commit"
                );
                let leader = DecidedLeader::Commit(leader.clone());
                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
                (leader, commit)
            })
            .collect::<Vec<_>>();

        sequenced_leaders
    }

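    /// Selects the ancestors for the next proposal: this node's last proposed
    /// block plus the newest not-yet-included block of every other authority
    /// (respecting the GC round), asserting a quorum over the parent round.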
    fn ancestors_to_propose(&mut self, clock_round: Round) -> Vec<VerifiedBlock> {
        let (ancestors, gc_enabled, gc_round) = {
            let dag_state = self.dag_state.read();
            (
                dag_state.get_last_cached_block_per_authority(clock_round),
                dag_state.gc_enabled(),
                dag_state.gc_round(),
            )
        };

        assert_eq!(
            ancestors.len(),
            self.context.committee.size(),
            "Fatal error, the number of returned ancestors doesn't match the committee size."
        );

        let (last_proposed_block, _) = ancestors[self.context.own_index].clone();
        assert_eq!(last_proposed_block.author(), self.context.own_index);
        let ancestors = iter::once(last_proposed_block)
            .chain(
                ancestors
                    .into_iter()
                    .filter(|(block, _)| block.author() != self.context.own_index)
                    .filter(|(block, _)| {
                        if gc_enabled && gc_round > GENESIS_ROUND {
                            return block.round() > gc_round;
                        }
                        true
                    })
                    .flat_map(|(block, _)| {
                        if let Some(last_block_ref) = self.last_included_ancestors[block.author()] {
                            return (last_block_ref.round < block.round()).then_some(block);
                        }
                        Some(block)
                    }),
            )
            .collect::<Vec<_>>();

        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        for ancestor in ancestors
            .iter()
            .filter(|block| block.round() == clock_round - 1)
        {
            quorum.add(ancestor.author(), &self.context.committee);
        }
        assert!(
            quorum.reached_threshold(&self.context.committee),
            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
        );

        ancestors
    }

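    /// Selects ancestors based on per-authority propagation state: authorities
    /// in the INCLUDE state are taken directly, while EXCLUDE-state ones are
    /// only added when needed for a parent-round quorum or via an earlier
    /// block a quorum has seen. Returns the included ancestors and the
    /// references excluded from this proposal.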
    fn smart_ancestors_to_propose(
        &mut self,
        clock_round: Round,
        smart_select: bool,
    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
        let node_metrics = &self.context.metrics.node_metrics;
        let _s = node_metrics
            .scope_processing_time
            .with_label_values(&["Core::smart_ancestors_to_propose"])
            .start_timer();

        let all_ancestors = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(clock_round);

        assert_eq!(
            all_ancestors.len(),
            self.context.committee.size(),
            "Fatal error, the number of returned ancestors doesn't match the committee size."
        );

        self.ancestor_state_manager.update_all_ancestors_state();
        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

        let quorum_round = clock_round.saturating_sub(1);

        let mut score_and_pending_excluded_ancestors = Vec::new();
        let mut excluded_and_equivocating_ancestors = BTreeSet::new();

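        // Build the initial set of included ancestors: our own last proposed
        // block plus newer blocks from INCLUDE-state authorities. EXCLUDE-state
        // ancestors are set aside with their scores for later consideration.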
        let included_ancestors = iter::once(self.last_proposed_block().clone())
            .chain(
                all_ancestors
                    .into_iter()
                    .flat_map(|(ancestor, equivocating_ancestors)| {
                        if ancestor.author() == self.context.own_index {
                            return None;
                        }
                        if let Some(last_block_ref) =
                            self.last_included_ancestors[ancestor.author()]
                        {
                            if last_block_ref.round >= ancestor.round() {
                                return None;
                            }
                        }

                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);

                        let ancestor_state = ancestor_state_map[ancestor.author()];
                        match ancestor_state {
                            AncestorState::Include => {
                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
                            }
                            AncestorState::Exclude(score) => {
                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
                                score_and_pending_excluded_ancestors.push((score, ancestor));
                                return None;
                            }
                        }

                        Some(ancestor)
                    }),
            )
            .collect::<Vec<_>>();

        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();

        for ancestor in included_ancestors
            .iter()
            .filter(|a| a.round() == quorum_round)
        {
            parent_round_quorum.add(ancestor.author(), &self.context.committee);
        }

        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
            node_metrics.smart_selection_wait.inc();
            debug!(
                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
                parent_round_quorum.stake()
            );
            return (vec![], BTreeSet::new());
        }

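        // Consider the highest scoring excluded ancestors first.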
        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));

        let mut ancestors_to_propose = included_ancestors;
        let mut excluded_ancestors = Vec::new();
        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
            if !parent_round_quorum.reached_threshold(&self.context.committee)
                && ancestor.round() == quorum_round
            {
                debug!(
                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
                );
                parent_round_quorum.add(ancestor.author(), &self.context.committee);
                ancestors_to_propose.push(ancestor);
                node_metrics
                    .included_excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname.as_str(), "timeout"])
                    .inc();
            } else {
                excluded_ancestors.push((score, ancestor));
            }
        }

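        // For each remaining excluded ancestor, either include an earlier
        // block that a quorum has already seen or leave it excluded from this
        // proposal.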
        for (score, ancestor) in excluded_ancestors.iter() {
            let excluded_author = ancestor.author();
            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
            let mut accepted_low_quorum_round = self
                .ancestor_state_manager
                .accepted_quorum_round_per_authority[excluded_author]
                .0;
            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

            let last_included_round = self.last_included_ancestors[excluded_author]
                .map(|block_ref| block_ref.round)
                .unwrap_or(GENESIS_ROUND);
            if ancestor.round() <= last_included_round {
                continue;
            }

            if last_included_round >= accepted_low_quorum_round {
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
                    ancestor.reference()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();
                continue;
            }

            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
                ancestor.clone()
            } else {
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round}",
                    ancestor.reference(),
                    ancestor.round()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();

                match self.dag_state.read().get_last_cached_block_in_range(
                    excluded_author,
                    last_included_round + 1,
                    accepted_low_quorum_round + 1,
                ) {
                    Some(earlier_ancestor) => earlier_ancestor,
                    None => continue,
                }
            };
            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
            ancestors_to_propose.push(ancestor.clone());
            trace!(
                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
                ancestor.reference()
            );
            node_metrics
                .included_excluded_proposal_ancestors_count_by_authority
                .with_label_values(&[block_hostname.as_str(), "quorum"])
                .inc();
        }

        assert!(
            parent_round_quorum.reached_threshold(&self.context.committee),
            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
        );

        info!(
            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
            ancestors_to_propose.len(),
            excluded_and_equivocating_ancestors.len()
        );

        (ancestors_to_propose, excluded_and_equivocating_ancestors)
    }

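    /// Whether all leaders of `round` have been received and cached locally.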
    fn leaders_exist(&self, round: Round) -> bool {
        let dag_state = self.dag_state.read();
        for leader in self.leaders(round) {
            if !dag_state.contains_cached_block_at_slot(leader) {
                return false;
            }
        }

        true
    }

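    /// The leader slots of `round`, in the order the committer checks them.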
    fn leaders(&self, round: Round) -> Vec<Slot> {
        self.committer
            .get_leaders(round)
            .into_iter()
            .map(|authority_index| Slot::new(round, authority_index))
            .collect()
    }

    fn first_leader(&self, round: Round) -> AuthorityIndex {
        self.leaders(round).first().unwrap().authority
    }

    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
        self.last_proposed_block().timestamp_ms()
    }

    fn last_proposed_round(&self) -> Round {
        self.last_proposed_block().round()
    }

    fn last_proposed_block(&self) -> VerifiedBlock {
        self.dag_state.read().get_last_proposed_block()
    }
}

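/// Signals emitted by Core: newly proposed blocks (broadcast channel) and
/// threshold clock round changes (watch channel).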
pub(crate) struct CoreSignals {
    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
    new_round_sender: watch::Sender<Round>,
    context: Arc<Context>,
}

impl CoreSignals {
    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
            context.parameters.dag_state_cached_rounds as usize,
        );
        let (new_round_sender, new_round_receiver) = watch::channel(0);

        let me = Self {
            tx_block_broadcast,
            new_round_sender,
            context,
        };

        let receivers = CoreSignalsReceivers {
            rx_block_broadcast,
            new_round_receiver,
        };

        (me, receivers)
    }

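    /// Sends a newly proposed block to subscribers. Genesis blocks are never
    /// broadcast, and a failed send is treated as shutdown.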
    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
        if self.context.committee.size() > 1 {
            if extended_block.block.round() == GENESIS_ROUND {
                debug!("Ignoring broadcasting genesis block to peers");
                return Ok(());
            }

            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
                warn!("Couldn't broadcast the block to any receiver: {err}");
                return Err(ConsensusError::Shutdown);
            }
        } else {
            debug!(
                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
            );
        }
        Ok(())
    }

    pub(crate) fn new_round(&mut self, round_number: Round) {
        let _ = self.new_round_sender.send_replace(round_number);
    }
}

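/// Receiving side of the Core signals, for components consuming proposed
/// blocks and round changes.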
pub(crate) struct CoreSignalsReceivers {
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    new_round_receiver: watch::Receiver<Round>,
}

impl CoreSignalsReceivers {
    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
        self.rx_block_broadcast.resubscribe()
    }

    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
        self.new_round_receiver.clone()
    }
}

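/// Creates one Core fixture per authority with the given stakes; the authority
/// index of each core matches the position of its stake in the vector.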
#[cfg(test)]
pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<CoreTextFixture> {
    let mut cores = Vec::new();

    for index in 0..authorities.len() {
        let own_index = AuthorityIndex::new_for_test(index as u32);
        let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false);
        cores.push(core);
    }
    cores
}

#[cfg(test)]
pub(crate) struct CoreTextFixture {
    pub core: Core,
    pub signal_receivers: CoreSignalsReceivers,
    pub block_receiver: broadcast::Receiver<ExtendedBlock>,
    #[expect(unused)]
    pub commit_receiver: UnboundedReceiver<CommittedSubDag>,
    pub store: Arc<MemStore>,
}

#[cfg(test)]
impl CoreTextFixture {
    fn new(
        context: Context,
        authorities: Vec<Stake>,
        own_index: AuthorityIndex,
        sync_last_known_own_block: bool,
    ) -> Self {
        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
        let mut context = context.clone();
        context = context
            .with_committee(committee)
            .with_authority_index(own_index);
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(
            LeaderSchedule::from_store(context.clone(), dag_state.clone())
                .with_num_commits_per_schedule(10),
        );
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let block_receiver = signal_receivers.block_broadcast_receiver();

        let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(commit_sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let block_signer = signers.remove(own_index.value()).1;

        let core = Core::new(
            context,
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            sync_last_known_own_block,
        );

        Self {
            core,
            signal_receivers,
            block_receiver,
            commit_receiver,
            store,
        }
    }
}

#[cfg(test)]
mod test {
    use std::{collections::BTreeSet, time::Duration};

    use consensus_config::{AuthorityIndex, Parameters};
    use futures::{StreamExt, stream::FuturesUnordered};
    use iota_metrics::monitored_mpsc::unbounded_channel;
    use iota_protocol_config::ProtocolConfig;
    use rstest::rstest;
    use tokio::time::sleep;

    use super::*;
    use crate::{
        CommitConsumer, CommitIndex,
        block::{TestBlock, genesis_blocks},
        block_verifier::NoopBlockVerifier,
        commit::CommitAPI,
        leader_scoring::ReputationScores,
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
        test_dag_parser::parse_dag,
        transaction::{BlockStatus, TransactionClient},
    };

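    // Recovers Core from a store that contains 4 full rounds of blocks and
    // verifies that a round 5 block is proposed and recovered commits are
    // processed.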
    #[tokio::test]
    async fn test_core_recover_from_store_for_full_round() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut block_status_subscriptions = FuturesUnordered::new();

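        // Create blocks for rounds 1..=4 from all authorities and subscribe to
        // the block status of this authority's round 1 block.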
        let mut last_round_blocks = genesis_blocks(context.clone());
        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();
            for (index, _authority) in context.committee.authorities() {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, index.value() as u32)
                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                        .build(),
                );

                if round == 1 && index == context.own_index {
                    let subscription =
                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
                    block_status_subscriptions.push(subscription);
                }

                this_round_blocks.push(block);
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }
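        // Persist the blocks so Core can recover them from the store.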
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let _core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

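        // After recovery, a new round signal for round 5 should have been
        // emitted and a round 5 block proposed.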
        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 5);
        let ancestors = proposed_block.block.ancestors();

        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            assert_eq!(ancestor.round, 4);
        }

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);

        while let Some(result) = block_status_subscriptions.next().await {
            let status = result.unwrap();
            assert!(matches!(status, BlockStatus::Sequenced(_)));
        }
    }

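    // Recovers Core from a store whose last round is only partially populated
    // and verifies that Core proposes for that partial round.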
    #[tokio::test]
    async fn test_core_recover_from_store_for_partial_round() {
        telemetry_subscribers::init_for_testing();

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let mut last_round_blocks = genesis_blocks(context.clone());
        let mut all_blocks = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();

            let authorities_to_skip = if round == 4 {
                context.committee.validity_threshold() as usize
            } else {
                1
            };

            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
                let block = TestBlock::new(round, index.value() as u32)
                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                    .build();
                this_round_blocks.push(VerifiedBlock::new_for_test(block));
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }

        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 4);
        let ancestors = proposed_block.block.ancestors();

        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            if ancestor.author == context.own_index {
                assert_eq!(ancestor.round, 0);
            } else {
                assert_eq!(ancestor.round, 3);
            }
        }

        core.try_commit(vec![]).ok();
        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);
    }

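    // Verifies that after genesis Core proposes a round 1 block that carries
    // the submitted transactions and all genesis blocks as ancestors.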
    #[tokio::test]
    async fn test_core_propose_after_genesis() {
        telemetry_subscribers::init_for_testing();
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config
        });

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut total = 0;
        let mut index = 0;
        loop {
            let transaction =
                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
            total += transaction.len();
            index += 1;
            let _w = transaction_client
                .submit_no_wait(vec![transaction])
                .await
                .unwrap();

            if total >= 1_000 {
                break;
            }
        }

        let extended_block = block_receiver
            .recv()
            .await
            .expect("A new block should have been created");

        assert_eq!(extended_block.block.round(), 1);
        assert_eq!(extended_block.block.author().value(), 0);
        assert_eq!(extended_block.block.ancestors().len(), 4);

        let mut total = 0;
        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
            total += transaction.data().len() as u64;
            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
            assert_eq!(format!("Transaction {i}"), transaction);
        }
        assert!(
            total
                <= context
                    .protocol_config
                    .consensus_max_transactions_in_block_bytes()
        );

        let all_genesis = genesis_blocks(context);

        for ancestor in extended_block.block.ancestors() {
            all_genesis
                .iter()
                .find(|block| block.reference() == *ancestor)
                .expect("Block should be found amongst genesis blocks");
        }

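        // Trying to propose again, forced or not, should yield nothing since
        // no new blocks were received.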
        assert!(core.try_propose(false).unwrap().is_none());
        assert!(core.try_propose(true).unwrap().is_none());

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);
    }

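    // Verifies that Core only proposes a new round once a quorum of blocks
    // for the previous round has been received.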
    #[tokio::test]
    async fn test_core_propose_once_receiving_a_quorum() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut expected_ancestors = BTreeSet::new();

        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
        expected_ancestors.insert(block_1.reference());
        sleep(context.parameters.min_round_delay).await;
        _ = core.add_blocks(vec![block_1]);

        assert_eq!(core.last_proposed_round(), 1);
        expected_ancestors.insert(core.last_proposed_block().reference());
        assert!(core.try_propose(false).unwrap().is_none());

        let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
        expected_ancestors.insert(block_3.reference());
        sleep(context.parameters.min_round_delay).await;
        _ = core.add_blocks(vec![block_3]);

        assert_eq!(core.last_proposed_round(), 2);

        let proposed_block = core.last_proposed_block();
        assert_eq!(proposed_block.round(), 2);
        assert_eq!(proposed_block.author(), context.own_index);
        assert_eq!(proposed_block.ancestors().len(), 3);
        let ancestors = proposed_block.ancestors();
        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
        assert_eq!(ancestors, expected_ancestors);

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);
    }

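    // Builds a DAG where this authority's blocks are skipped by others for a
    // few rounds, commits it on recovery, and checks the block status
    // notifications (sequenced vs garbage collected, depending on gc_depth).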
    #[rstest]
    #[tokio::test]
    async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let (mut context, mut key_pairs) = Context::new_for_test(4);

        if gc_depth > 0 {
            context
                .protocol_config
                .set_consensus_gc_depth_for_testing(gc_depth);
        }

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut block_status_subscriptions = FuturesUnordered::new();

        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : { * },
            Round 2 : { * },
            Round 3 : {
                A -> [*],
                B -> [-A2],
                C -> [-A2],
                D -> [-A2],
            },
            Round 4 : {
                B -> [-A3],
                C -> [-A3],
                D -> [-A3],
            },
            Round 5 : {
                A -> [A3, B4, C4, D4]
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 6 : { * },
            Round 7 : { * },
            Round 8 : { * },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
        dag_builder.print();

        for block in dag_builder.blocks(1..=5) {
            if block.author() == context.own_index {
                let subscription =
                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
                block_status_subscriptions.push(subscription);
            }
        }

        store
            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
            .expect("Storage error");

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let _core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        assert_eq!(last_commit.index(), 5);

        while let Some(result) = block_status_subscriptions.next().await {
            let status = result.unwrap();

            if gc_depth > 0 {
                match status {
                    BlockStatus::Sequenced(block_ref) => {
                        assert!(block_ref.round == 1 || block_ref.round == 5);
                    }
                    BlockStatus::GarbageCollected(block_ref) => {
                        assert!(block_ref.round == 2 || block_ref.round == 3);
                    }
                }
            } else {
                assert!(matches!(status, BlockStatus::Sequenced(_)));
            }
        }
    }

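    // Feeds many rounds of blocks (with one round 1 block withheld) and
    // verifies that commits advance the threshold clock far enough for Core
    // to propose at round 12.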
    #[tokio::test]
    async fn test_multiple_commits_advance_threshold_clock() {
        telemetry_subscribers::init_for_testing();
        let (mut context, mut key_pairs) = Context::new_for_test(4);
        const GC_DEPTH: u32 = 2;

        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());

2234 let dag_str = "DAG {
2240 Round 0 : { 4 },
2241 Round 1 : { * },
2242 Round 2 : {
2243 B -> [-D1],
2244 C -> [-D1],
2245 D -> [-D1],
2246 },
2247 Round 3 : {
2248 B -> [*],
2249 C -> [*],
2250 D -> [*],
2251 },
2252 Round 4 : {
2253 A -> [*],
2254 B -> [*],
2255 C -> [*],
2256 D -> [*],
2257 },
2258 Round 5 : {
2259 B -> [*],
2260 C -> [*],
2261 D -> [*],
2262 },
2263 Round 6 : {
2264 B -> [A6, B6, C6, D1],
2265 C -> [A6, B6, C6, D1],
2266 D -> [A6, B6, C6, D1],
2267 },
2268 Round 7 : {
2269 B -> [*],
2270 C -> [*],
2271 D -> [*],
2272 },
2273 Round 8 : {
2274 B -> [*],
2275 C -> [*],
2276 D -> [*],
2277 },
2278 Round 9 : {
2279 B -> [*],
2280 C -> [*],
2281 D -> [*],
2282 },
2283 Round 10 : {
2284 B -> [*],
2285 C -> [*],
2286 D -> [*],
2287 },
2288 Round 11 : {
2289 B -> [*],
2290 C -> [*],
2291 D -> [*],
2292 },
2293 }";
2294
2295 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2296 dag_builder.print();
2297
2298 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2300 let block_manager = BlockManager::new(
2301 context.clone(),
2302 dag_state.clone(),
2303 Arc::new(NoopBlockVerifier),
2304 );
2305 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2306 context.clone(),
2307 dag_state.clone(),
2308 ));
2309 let (sender, _receiver) = unbounded_channel("consensus_output");
2310 let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2311 let commit_observer = CommitObserver::new(
2312 context.clone(),
2313 commit_consumer,
2314 dag_state.clone(),
2315 store.clone(),
2316 leader_schedule.clone(),
2317 );
2318
2319 let last_commit = store.read_last_commit().unwrap();
2321 assert!(last_commit.is_none());
2322 assert_eq!(dag_state.read().last_commit_index(), 0);
2323
2324 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2326 let _block_receiver = signal_receivers.block_broadcast_receiver();
2328 let mut core = Core::new(
2329 context.clone(),
2330 leader_schedule,
2331 transaction_consumer,
2332 block_manager,
2333 true,
2334 commit_observer,
2335 signals,
2336 key_pairs.remove(context.own_index.value()).1,
2337 dag_state.clone(),
2338 true,
2339 );
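// Mark round 4 as this authority's last known proposed round before feeding
// in the DAG, so proposing is unblocked.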
2340 core.set_last_known_proposed_round(4);
2344
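// Add every block except D1 (authority D, round 1). The round 6 blocks that
// reference D1 stay suspended until commits advance the GC round past round 1,
// after which the threshold clock jumps ahead and Core proposes at round 12.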
2345 core.add_blocks(
2357 dag_builder
2358 .blocks(1..=11)
2359 .into_iter()
2360 .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2361 .collect(),
2362 )
2363 .expect("Should not fail");
2364
2365 assert_eq!(core.last_proposed_round(), 12);
2366 }
2367
2368 #[tokio::test]
2369 async fn test_core_set_min_propose_round() {
2370 telemetry_subscribers::init_for_testing();
2371 let (context, mut key_pairs) = Context::new_for_test(4);
2372 let context = Arc::new(context.with_parameters(Parameters {
2373 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2374 ..Default::default()
2375 }));
2376
2377 let store = Arc::new(MemStore::new());
2378 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2379
2380 let block_manager = BlockManager::new(
2381 context.clone(),
2382 dag_state.clone(),
2383 Arc::new(NoopBlockVerifier),
2384 );
2385 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2386 context.clone(),
2387 dag_state.clone(),
2388 ));
2389
2390 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2391 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2392 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2393 let _block_receiver = signal_receivers.block_broadcast_receiver();
2395
2396 let (sender, _receiver) = unbounded_channel("consensus_output");
2397 let commit_observer = CommitObserver::new(
2398 context.clone(),
2399 CommitConsumer::new(sender.clone(), 0),
2400 dag_state.clone(),
2401 store.clone(),
2402 leader_schedule.clone(),
2403 );
2404
2405 let mut core = Core::new(
2406 context.clone(),
2407 leader_schedule,
2408 transaction_consumer,
2409 block_manager,
2410 true,
2411 commit_observer,
2412 signals,
2413 key_pairs.remove(context.own_index.value()).1,
2414 dag_state.clone(),
2415 true,
2416 );
2417
2418 assert_eq!(
2420 core.last_proposed_round(),
2421 GENESIS_ROUND,
2422 "No block should have been created other than genesis"
2423 );
2424
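// Proposing must stay blocked until the last known proposed round is learned.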
2425 assert!(core.try_propose(true).unwrap().is_none());
2427
2428 let mut builder = DagBuilder::new(context.clone());
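// Build a DAG with blocks for rounds 1..=10, including this authority's own
// past proposals.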
2431 builder.layers(1..=10).build();
2432
2433 let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2434
2435 assert!(core.add_blocks(blocks).unwrap().is_empty());
2437
2438 assert!(core.try_propose(true).unwrap().is_none());
2440
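// Learn that we already proposed up to round 10; the next proposal must build
// on top of that block.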
2441 core.set_last_known_proposed_round(10);
2444
2445 let block = core.try_propose(true).expect("No error").unwrap();
2446 assert_eq!(block.round(), 11);
2447 assert_eq!(block.ancestors().len(), 4);
2448
2449 let our_ancestor_included = block.ancestors()[0];
2450 assert_eq!(our_ancestor_included.author, context.own_index);
2451 assert_eq!(our_ancestor_included.round, 10);
2452 }
2453
2454 #[tokio::test(flavor = "current_thread", start_paused = true)]
2455 async fn test_core_try_new_block_leader_timeout() {
2456 telemetry_subscribers::init_for_testing();
2457
2458 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
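// Wait until the local clock passes the highest timestamp among `blocks`;
// blocks with timestamps in the future would otherwise not be accepted.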
2467 let now = context.clock.timestamp_utc_ms();
2470 let max_timestamp = blocks
2471 .iter()
2472 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2473 .map(|block| block.timestamp_ms())
2474 .unwrap_or(0);
2475
2476 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2477 sleep(wait_time).await;
2478 }
2479
2480 let (context, _) = Context::new_for_test(4);
2481 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2483
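// Use only the first three cores; the fourth authority never produces blocks,
// so rounds where it is the leader can only advance via the leader timeout.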
2484 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2490
2491 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2494 for round in 1..=3 {
2495 let mut this_round_blocks = Vec::new();
2496
2497 for core_fixture in cores.iter_mut() {
2498 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2499
2500 core_fixture
2501 .core
2502 .add_blocks(last_round_blocks.clone())
2503 .unwrap();
2504
2505 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2507 assert_eq!(round - 1, r);
2508 if core_fixture.core.last_proposed_round() == r {
2509 core_fixture
2511 .core
2512 .try_propose(true)
2513 .unwrap()
2514 .unwrap_or_else(|| {
2515 panic!("Block should have been proposed for round {round}")
2516 });
2517 }
2518 }
2519
2520 assert_eq!(core_fixture.core.last_proposed_round(), round);
2521
2522 this_round_blocks.push(core_fixture.core.last_proposed_block());
2523 }
2524
2525 last_round_blocks = this_round_blocks;
2526 }
2527
2528 for core_fixture in cores.iter_mut() {
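// Even with a quorum for round 3 accepted, an un-forced proposal for round 4
// must wait for the leader, which is the withheld authority here.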
2532 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2533
2534 core_fixture
2535 .core
2536 .add_blocks(last_round_blocks.clone())
2537 .unwrap();
2538 assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2539 }
2540
2541 for core_fixture in cores.iter_mut() {
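// Forcing block creation (as the leader timeout would) proposes round 4 and
// triggers the first commit.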
2544 assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2545 assert_eq!(core_fixture.core.last_proposed_round(), 4);
2546
2547 let last_commit = core_fixture
2549 .store
2550 .read_last_commit()
2551 .unwrap()
2552 .expect("last commit should be set");
2553 assert_eq!(last_commit.index(), 1);
2556 let all_stored_commits = core_fixture
2557 .store
2558 .scan_commits((0..=CommitIndex::MAX).into())
2559 .unwrap();
2560 assert_eq!(all_stored_commits.len(), 1);
2561 }
2562 }
2563
2564 #[tokio::test(flavor = "current_thread", start_paused = true)]
2565 async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2566 telemetry_subscribers::init_for_testing();
2567
2568 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2577 let now = context.clock.timestamp_utc_ms();
2580 let max_timestamp = blocks
2581 .iter()
2582 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2583 .map(|block| block.timestamp_ms())
2584 .unwrap_or(0);
2585
2586 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2587 sleep(wait_time).await;
2588 }
2589
2590 let (mut context, _) = Context::new_for_test(4);
2591 context
2592 .protocol_config
2593 .set_consensus_smart_ancestor_selection_for_testing(true);
2594 context
2595 .protocol_config
2596 .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2597
2598 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2600 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2601
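// Run rounds 1..=30 with only the first three cores, so the fourth authority's
// propagation score drops and it becomes a candidate for exclusion.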
2602 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2605 for round in 1..=30 {
2606 let mut this_round_blocks = Vec::new();
2607
2608 for core_fixture in cores.iter_mut() {
2609 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2610
2611 core_fixture
2612 .core
2613 .add_blocks(last_round_blocks.clone())
2614 .unwrap();
2615
2616 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2618 assert_eq!(round - 1, r);
2619 if core_fixture.core.last_proposed_round() == r {
2620 core_fixture
2622 .core
2623 .try_propose(true)
2624 .unwrap()
2625 .unwrap_or_else(|| {
2626 panic!("Block should have been proposed for round {round}")
2627 });
2628 }
2629 }
2630
2631 assert_eq!(core_fixture.core.last_proposed_round(), round);
2632
2633 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2634 }
2635
2636 last_round_blocks = this_round_blocks;
2637 }
2638
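// From round 31 all four cores run. The recovered authority keeps proposing,
// but the others should now exclude its blocks from their ancestors because
// of its low score.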
2639 for round in 31..=40 {
2641 let mut this_round_blocks = Vec::new();
2642
2643 for core_fixture in all_cores.iter_mut() {
2644 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2645
2646 core_fixture
2647 .core
2648 .add_blocks(last_round_blocks.clone())
2649 .unwrap();
2650
2651 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2653 assert_eq!(round - 1, r);
2654 if core_fixture.core.last_proposed_round() == r {
2655 core_fixture
2657 .core
2658 .try_propose(true)
2659 .unwrap()
2660 .unwrap_or_else(|| {
2661 panic!("Block should have been proposed for round {round}")
2662 });
2663 }
2664 }
2665
2666 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2667
2668 for block in this_round_blocks.iter() {
2669 if block.author() != AuthorityIndex::new_for_test(3) {
2670 assert_eq!(block.ancestors().len(), 3);
2673 } else {
2674 assert_eq!(block.ancestors().len(), 4);
2677 }
2678 }
2679 }
2680
2681 last_round_blocks = this_round_blocks;
2682 }
2683 }
2684
2685 #[tokio::test]
2686 async fn test_smart_ancestor_selection() {
2687 telemetry_subscribers::init_for_testing();
2688 let (mut context, mut key_pairs) = Context::new_for_test(7);
2689 context
2690 .protocol_config
2691 .set_consensus_smart_ancestor_selection_for_testing(true);
2692 context
2693 .protocol_config
2694 .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2695 let context = Arc::new(context.with_parameters(Parameters {
2696 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2697 ..Default::default()
2698 }));
2699
2700 let store = Arc::new(MemStore::new());
2701 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2702
2703 let block_manager = BlockManager::new(
2704 context.clone(),
2705 dag_state.clone(),
2706 Arc::new(NoopBlockVerifier),
2707 );
2708 let leader_schedule = Arc::new(
2709 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2710 .with_num_commits_per_schedule(10),
2711 );
2712
2713 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2714 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2715 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2716 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2718
2719 let (sender, _receiver) = unbounded_channel("consensus_output");
2720 let commit_consumer = CommitConsumer::new(sender, 0);
2721 let commit_observer = CommitObserver::new(
2722 context.clone(),
2723 commit_consumer,
2724 dag_state.clone(),
2725 store.clone(),
2726 leader_schedule.clone(),
2727 );
2728
2729 let mut core = Core::new(
2730 context.clone(),
2731 leader_schedule,
2732 transaction_consumer,
2733 block_manager,
2734 true,
2735 commit_observer,
2736 signals,
2737 key_pairs.remove(context.own_index.value()).1,
2738 dag_state.clone(),
2739 true,
2740 );
2741
2742 assert_eq!(
2744 core.last_proposed_round(),
2745 GENESIS_ROUND,
2746 "No block should have been created other than genesis"
2747 );
2748
2749 assert!(core.try_propose(true).unwrap().is_none());
2751
2752 let mut builder = DagBuilder::new(context.clone());
2754 builder
2755 .layers(1..=12)
2756 .authorities(vec![AuthorityIndex::new_for_test(1)])
2757 .skip_block()
2758 .build();
2759 let blocks = builder.blocks(1..=12);
2760 assert!(core.add_blocks(blocks).unwrap().is_empty());
2762 core.set_last_known_proposed_round(12);
2763
2764 let block = core.try_propose(true).expect("No error").unwrap();
2765 assert_eq!(block.round(), 13);
2766 assert_eq!(block.ancestors().len(), 7);
2767
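// Rounds 13-14 are received without our own blocks; the next proposal lands
// on round 15 with 6 ancestors, authority 1 still being excluded.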
2768 builder
2770 .layers(13..=14)
2771 .authorities(vec![AuthorityIndex::new_for_test(0)])
2772 .skip_block()
2773 .build();
2774 let blocks = builder.blocks(13..=14);
2775 assert!(core.add_blocks(blocks).unwrap().is_empty());
2776
2777 let block = core.try_propose(true).expect("No error").unwrap();
2781 assert_eq!(block.round(), 15);
2782 assert_eq!(block.ancestors().len(), 6);
2783
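// Only authorities 1-4 produce round 15. Authority 1's block should appear in
// the excluded ancestors of our eventual round 16 proposal.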
2784 builder
2787 .layer(15)
2788 .authorities(vec![
2789 AuthorityIndex::new_for_test(0),
2790 AuthorityIndex::new_for_test(5),
2791 AuthorityIndex::new_for_test(6),
2792 ])
2793 .skip_block()
2794 .build();
2795 let blocks = builder.blocks(15..=15);
2796 let authority_1_excluded_block_reference = blocks
2797 .iter()
2798 .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2799 .unwrap()
2800 .reference();
2801 sleep(context.parameters.min_round_delay).await;
2803 assert!(core.add_blocks(blocks).unwrap().is_empty());
2805 assert_eq!(core.last_proposed_block().round(), 15);
2806
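// The rest of round 15 (authorities 5 and 6) arrives, completing a quorum;
// Core should propose round 16 on its own, excluding authority 1's block.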
2807 builder
2808 .layer(15)
2809 .authorities(vec![
2810 AuthorityIndex::new_for_test(0),
2811 AuthorityIndex::new_for_test(1),
2812 AuthorityIndex::new_for_test(2),
2813 AuthorityIndex::new_for_test(3),
2814 AuthorityIndex::new_for_test(4),
2815 ])
2816 .skip_block()
2817 .build();
2818 let blocks = builder.blocks(15..=15);
2819 let included_block_references = iter::once(&core.last_proposed_block())
2820 .chain(blocks.iter())
2821 .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2822 .map(|block| block.reference())
2823 .collect::<Vec<_>>();
2824
2825 assert!(core.add_blocks(blocks).unwrap().is_empty());
2827 assert_eq!(core.last_proposed_block().round(), 16);
2828
2829 let extended_block = loop {
2831 let extended_block =
2832 tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2833 .await
2834 .unwrap()
2835 .unwrap();
2836 if extended_block.block.round() == 16 {
2837 break extended_block;
2838 }
2839 };
2840 assert_eq!(extended_block.block.round(), 16);
2841 assert_eq!(extended_block.block.author(), core.context.own_index);
2842 assert_eq!(extended_block.block.ancestors().len(), 6);
2843 assert_eq!(extended_block.block.ancestors(), included_block_references);
2844 assert_eq!(extended_block.excluded_ancestors.len(), 1);
2845 assert_eq!(
2846 extended_block.excluded_ancestors[0],
2847 authority_1_excluded_block_reference
2848 );
2849
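// Round 16 arrives from authorities 1-4 only; the forced proposal for round 17
// should include just the five fresh ancestors and exclude nothing new.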
2850 builder
2855 .layer(16)
2856 .authorities(vec![
2857 AuthorityIndex::new_for_test(0),
2858 AuthorityIndex::new_for_test(5),
2859 AuthorityIndex::new_for_test(6),
2860 ])
2861 .skip_block()
2862 .build();
2863 let blocks = builder.blocks(16..=16);
2864 sleep(context.parameters.min_round_delay).await;
2866 assert!(core.add_blocks(blocks).unwrap().is_empty());
2868 assert_eq!(core.last_proposed_block().round(), 16);
2869
2870 let block = core.try_propose(true).expect("No error").unwrap();
2873 assert_eq!(block.round(), 17);
2874 assert_eq!(block.ancestors().len(), 5);
2875
2876 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2878 .await
2879 .unwrap()
2880 .unwrap();
2881 assert_eq!(extended_block.block.round(), 17);
2882 assert_eq!(extended_block.block.author(), core.context.own_index);
2883 assert_eq!(extended_block.block.ancestors().len(), 5);
2884 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2885
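// Report zero propagation delay and quorum rounds of 16 for every authority;
// previously excluded ancestors become includable again, so the round 18
// proposal should reference all 7 authorities.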
2886 core.set_propagation_delay_and_quorum_rounds(
2890 0,
2891 vec![
2892 (16, 16),
2893 (16, 16),
2894 (16, 16),
2895 (16, 16),
2896 (16, 16),
2897 (16, 16),
2898 (16, 16),
2899 ],
2900 vec![
2901 (16, 16),
2902 (16, 16),
2903 (16, 16),
2904 (16, 16),
2905 (16, 16),
2906 (16, 16),
2907 (16, 16),
2908 ],
2909 );
2910
2911 builder
2912 .layer(17)
2913 .authorities(vec![AuthorityIndex::new_for_test(0)])
2914 .skip_block()
2915 .build();
2916 let blocks = builder.blocks(17..=17);
2917 let included_block_references = iter::once(&core.last_proposed_block())
2918 .chain(blocks.iter())
2919 .map(|block| block.reference())
2920 .collect::<Vec<_>>();
2921
2922 sleep(context.parameters.min_round_delay).await;
2924 assert!(core.add_blocks(blocks).unwrap().is_empty());
2925 assert_eq!(core.last_proposed_block().round(), 18);
2926
2927 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2929 .await
2930 .unwrap()
2931 .unwrap();
2932 assert_eq!(extended_block.block.round(), 18);
2933 assert_eq!(extended_block.block.author(), core.context.own_index);
2934 assert_eq!(extended_block.block.ancestors().len(), 7);
2935 assert_eq!(extended_block.block.ancestors(), included_block_references);
2936 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2937 }
2938
2939 #[tokio::test]
2940 async fn test_excluded_ancestor_limit() {
2941 telemetry_subscribers::init_for_testing();
2942 let (mut context, mut key_pairs) = Context::new_for_test(4);
2943 context
2944 .protocol_config
2945 .set_consensus_smart_ancestor_selection_for_testing(true);
2946 context
2947 .protocol_config
2948 .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2949 let context = Arc::new(context.with_parameters(Parameters {
2950 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2951 ..Default::default()
2952 }));
2953
2954 let store = Arc::new(MemStore::new());
2955 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2956
2957 let block_manager = BlockManager::new(
2958 context.clone(),
2959 dag_state.clone(),
2960 Arc::new(NoopBlockVerifier),
2961 );
2962 let leader_schedule = Arc::new(
2963 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2964 .with_num_commits_per_schedule(10),
2965 );
2966
2967 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2968 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2969 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2970 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2972
2973 let (sender, _receiver) = unbounded_channel("consensus_output");
2974 let commit_consumer = CommitConsumer::new(sender, 0);
2975 let commit_observer = CommitObserver::new(
2976 context.clone(),
2977 commit_consumer,
2978 dag_state.clone(),
2979 store.clone(),
2980 leader_schedule.clone(),
2981 );
2982
2983 let mut core = Core::new(
2984 context.clone(),
2985 leader_schedule,
2986 transaction_consumer,
2987 block_manager,
2988 true,
2989 commit_observer,
2990 signals,
2991 key_pairs.remove(context.own_index.value()).1,
2992 dag_state.clone(),
2993 true,
2994 );
2995
2996 assert_eq!(
2998 core.last_proposed_round(),
2999 GENESIS_ROUND,
3000 "No block should have been created other than genesis"
3001 );
3002
3003 let mut builder = DagBuilder::new(context.clone());
3005 builder.layers(1..=3).build();
3006
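// At round 4 authority 1 equivocates, producing 9 extra blocks; only one can
// be included as an ancestor, and the excluded-ancestor list of the proposal
// is capped below the number of equivocations.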
3007 builder
3011 .layer(4)
3012 .authorities(vec![AuthorityIndex::new_for_test(1)])
3013 .equivocate(9)
3014 .build();
3015 let blocks = builder.blocks(1..=4);
3016
3017 assert!(core.add_blocks(blocks).unwrap().is_empty());
3019 core.set_last_known_proposed_round(3);
3020
3021 let block = core.try_propose(true).expect("No error").unwrap();
3022 assert_eq!(block.round(), 5);
3023 assert_eq!(block.ancestors().len(), 4);
3024
3025 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3027 .await
3028 .unwrap()
3029 .unwrap();
3030 assert_eq!(extended_block.block.round(), 5);
3031 assert_eq!(extended_block.block.author(), core.context.own_index);
3032 assert_eq!(extended_block.block.ancestors().len(), 4);
3033 assert_eq!(extended_block.excluded_ancestors.len(), 8);
3034 }
3035
3036 #[tokio::test]
3037 async fn test_core_set_subscriber_exists() {
3038 telemetry_subscribers::init_for_testing();
3039 let (context, mut key_pairs) = Context::new_for_test(4);
3040 let context = Arc::new(context);
3041 let store = Arc::new(MemStore::new());
3042 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3043
3044 let block_manager = BlockManager::new(
3045 context.clone(),
3046 dag_state.clone(),
3047 Arc::new(NoopBlockVerifier),
3048 );
3049 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3050 context.clone(),
3051 dag_state.clone(),
3052 ));
3053
3054 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3055 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3056 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3057 let _block_receiver = signal_receivers.block_broadcast_receiver();
3059
3060 let (sender, _receiver) = unbounded_channel("consensus_output");
3061 let commit_observer = CommitObserver::new(
3062 context.clone(),
3063 CommitConsumer::new(sender.clone(), 0),
3064 dag_state.clone(),
3065 store.clone(),
3066 leader_schedule.clone(),
3067 );
3068
3069 let mut core = Core::new(
3070 context.clone(),
3071 leader_schedule,
3072 transaction_consumer,
3073 block_manager,
3074 false,
3076 commit_observer,
3077 signals,
3078 key_pairs.remove(context.own_index.value()).1,
3079 dag_state.clone(),
3080 false,
3081 );
3082
3083 assert_eq!(
3085 core.last_proposed_round(),
3086 GENESIS_ROUND,
3087 "No block should have been created other than genesis"
3088 );
3089
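// No quorum of subscribers yet, so Core must not propose.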
3090 assert!(core.try_propose(true).unwrap().is_none());
3092
3093 core.set_quorum_subscribers_exists(true);
3095
3096 assert!(core.try_propose(true).unwrap().is_some());
3098 }
3099
3100 #[tokio::test]
3101 async fn test_core_set_propagation_delay_per_authority() {
3102 telemetry_subscribers::init_for_testing();
3104 let (context, mut key_pairs) = Context::new_for_test(4);
3105 let context = Arc::new(context);
3106 let store = Arc::new(MemStore::new());
3107 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3108
3109 let block_manager = BlockManager::new(
3110 context.clone(),
3111 dag_state.clone(),
3112 Arc::new(NoopBlockVerifier),
3113 );
3114 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3115 context.clone(),
3116 dag_state.clone(),
3117 ));
3118
3119 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3120 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3121 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3122 let _block_receiver = signal_receivers.block_broadcast_receiver();
3124
3125 let (sender, _receiver) = unbounded_channel("consensus_output");
3126 let commit_observer = CommitObserver::new(
3127 context.clone(),
3128 CommitConsumer::new(sender.clone(), 0),
3129 dag_state.clone(),
3130 store.clone(),
3131 leader_schedule.clone(),
3132 );
3133
3134 let mut core = Core::new(
3135 context.clone(),
3136 leader_schedule,
3137 transaction_consumer,
3138 block_manager,
3139 false,
3141 commit_observer,
3142 signals,
3143 key_pairs.remove(context.own_index.value()).1,
3144 dag_state.clone(),
3145 false,
3146 );
3147
3148 assert_eq!(
3150 core.last_proposed_round(),
3151 GENESIS_ROUND,
3152 "No block should have been created other than genesis"
3153 );
3154
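// An excessive propagation delay should block proposing even once subscribers
// exist; resetting it to zero unblocks proposing again.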
3155 core.set_propagation_delay_and_quorum_rounds(1000, vec![], vec![]);
3157
3158 core.set_quorum_subscribers_exists(true);
3160
3161 assert!(core.try_propose(true).unwrap().is_none());
3163
3164 core.set_propagation_delay_and_quorum_rounds(0, vec![], vec![]);
3166
3167 assert!(core.try_propose(true).unwrap().is_some());
3169 }
3170
3171 #[tokio::test(flavor = "current_thread", start_paused = true)]
3172 async fn test_leader_schedule_change() {
3173 telemetry_subscribers::init_for_testing();
3174 let default_params = Parameters::default();
3175
3176 let (context, _) = Context::new_for_test(4);
3177 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3179
3180 let mut last_round_blocks = Vec::new();
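// Drive 30 rounds of block production on every core; enough commits accumulate
// along the way to trigger a leader schedule change.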
3183 for round in 1..=30 {
3184 let mut this_round_blocks = Vec::new();
3185
3186 sleep(default_params.min_round_delay).await;
3188
3189 for core_fixture in &mut cores {
3190 core_fixture
3194 .core
3195 .add_blocks(last_round_blocks.clone())
3196 .unwrap();
3197
3198 let new_round = receive(
3201 Duration::from_secs(1),
3202 core_fixture.signal_receivers.new_round_receiver(),
3203 )
3204 .await;
3205 assert_eq!(new_round, round);
3206
3207 let extended_block = tokio::time::timeout(
3209 Duration::from_secs(1),
3210 core_fixture.block_receiver.recv(),
3211 )
3212 .await
3213 .unwrap()
3214 .unwrap();
3215 assert_eq!(extended_block.block.round(), round);
3216 assert_eq!(
3217 extended_block.block.author(),
3218 core_fixture.core.context.own_index
3219 );
3220
3221 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3223
3224 let block = core_fixture.core.last_proposed_block();
3225
3226 assert_eq!(
3228 block.ancestors().len(),
3229 core_fixture.core.context.committee.size()
3230 );
3231 for ancestor in block.ancestors() {
3232 if block.round() > 1 {
3233 assert!(
3235 last_round_blocks
3236 .iter()
3237 .any(|block| block.reference() == *ancestor),
3238 "Reference from previous round should be added"
3239 );
3240 }
3241 }
3242 }
3243
3244 last_round_blocks = this_round_blocks;
3245 }
3246
3247 for core_fixture in cores {
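// All cores should have reached the same outcome: 27 commits, one good and one
// bad node in the swap table, and matching reputation scores for commits 11..=20.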
3248 let last_commit = core_fixture
3250 .store
3251 .read_last_commit()
3252 .unwrap()
3253 .expect("last commit should be set");
3254 assert_eq!(last_commit.index(), 27);
3258 let all_stored_commits = core_fixture
3259 .store
3260 .scan_commits((0..=CommitIndex::MAX).into())
3261 .unwrap();
3262 assert_eq!(all_stored_commits.len(), 27);
3263 assert_eq!(
3264 core_fixture
3265 .core
3266 .leader_schedule
3267 .leader_swap_table
3268 .read()
3269 .bad_nodes
3270 .len(),
3271 1
3272 );
3273 assert_eq!(
3274 core_fixture
3275 .core
3276 .leader_schedule
3277 .leader_swap_table
3278 .read()
3279 .good_nodes
3280 .len(),
3281 1
3282 );
3283 let expected_reputation_scores =
3284 ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3285 assert_eq!(
3286 core_fixture
3287 .core
3288 .leader_schedule
3289 .leader_swap_table
3290 .read()
3291 .reputation_scores,
3292 expected_reputation_scores
3293 );
3294 }
3295 }
3296
3297 #[tokio::test(flavor = "current_thread", start_paused = true)]
3299 async fn test_leader_schedule_change_with_vote_scoring() {
3300 telemetry_subscribers::init_for_testing();
3301 let default_params = Parameters::default();
3302 let (mut context, _) = Context::new_for_test(4);
3303 context
3304 .protocol_config
3305 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3306 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3308 let mut last_round_blocks = Vec::new();
3311 for round in 1..=30 {
3312 let mut this_round_blocks = Vec::new();
3313 sleep(default_params.min_round_delay).await;
3315 for core_fixture in &mut cores {
3316 core_fixture
3320 .core
3321 .add_blocks(last_round_blocks.clone())
3322 .unwrap();
3323 let new_round = receive(
3326 Duration::from_secs(1),
3327 core_fixture.signal_receivers.new_round_receiver(),
3328 )
3329 .await;
3330 assert_eq!(new_round, round);
3331 let extended_block = tokio::time::timeout(
3333 Duration::from_secs(1),
3334 core_fixture.block_receiver.recv(),
3335 )
3336 .await
3337 .unwrap()
3338 .unwrap();
3339 assert_eq!(extended_block.block.round(), round);
3340 assert_eq!(
3341 extended_block.block.author(),
3342 core_fixture.core.context.own_index
3343 );
3344
3345 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3347 let block = core_fixture.core.last_proposed_block();
3348 assert_eq!(
3350 block.ancestors().len(),
3351 core_fixture.core.context.committee.size()
3352 );
3353 for ancestor in block.ancestors() {
3354 if block.round() > 1 {
3355 assert!(
3357 last_round_blocks
3358 .iter()
3359 .any(|block| block.reference() == *ancestor),
3360 "Reference from previous round should be added"
3361 );
3362 }
3363 }
3364 }
3365 last_round_blocks = this_round_blocks;
3366 }
3367 for core_fixture in cores {
3368 let last_commit = core_fixture
3370 .store
3371 .read_last_commit()
3372 .unwrap()
3373 .expect("last commit should be set");
3374 assert_eq!(last_commit.index(), 27);
3378 let all_stored_commits = core_fixture
3379 .store
3380 .scan_commits((0..=CommitIndex::MAX).into())
3381 .unwrap();
3382 assert_eq!(all_stored_commits.len(), 27);
3383 assert_eq!(
3384 core_fixture
3385 .core
3386 .leader_schedule
3387 .leader_swap_table
3388 .read()
3389 .bad_nodes
3390 .len(),
3391 1
3392 );
3393 assert_eq!(
3394 core_fixture
3395 .core
3396 .leader_schedule
3397 .leader_swap_table
3398 .read()
3399 .good_nodes
3400 .len(),
3401 1
3402 );
3403 let expected_reputation_scores =
3404 ReputationScores::new((11..=20).into(), vec![9, 8, 8, 8]);
3405 assert_eq!(
3406 core_fixture
3407 .core
3408 .leader_schedule
3409 .leader_swap_table
3410 .read()
3411 .reputation_scores,
3412 expected_reputation_scores
3413 );
3414 }
3415 }
3416
3417 #[tokio::test]
3418 async fn test_validate_certified_commits() {
3419 telemetry_subscribers::init_for_testing();
3420
3421 let (context, _key_pairs) = Context::new_for_test(4);
3422 let context = context.with_parameters(Parameters {
3423 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3424 ..Default::default()
3425 });
3426
3427 let authority_index = AuthorityIndex::new_for_test(0);
3428 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3429 let mut core = core.core;
3430
3431 assert_eq!(
3433 core.last_proposed_round(),
3434 GENESIS_ROUND,
3435 "No block should have been created other than genesis"
3436 );
3437
3438 let mut dag_builder = DagBuilder::new(core.context.clone());
3440 dag_builder.layers(1..=12).build();
3441
3442 dag_builder.print();
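// Accept only the blocks of rounds 1..=6 into the local DAG.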
3445 let blocks = dag_builder.blocks(1..=6);
3446
3447 for block in blocks {
3448 core.dag_state.write().accept_block(block);
3449 }
3450
3451 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3453
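// Without certified commits, try_commit should decide the first 4 leaders from
// the local DAG.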
3454 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3457
3458 assert_eq!(committed_sub_dags.len(), 4);
3460
3461 println!("Case 1. Provide certified commits that are all before the last committed round.");
3463
3464 let certified_commits = sub_dags_and_commits
3466 .iter()
3467 .take(4)
3468 .map(|(_, c)| c)
3469 .cloned()
3470 .collect::<Vec<_>>();
3471 assert!(
3472 certified_commits.last().unwrap().index()
3473 <= committed_sub_dags.last().unwrap().commit_ref.index,
3474 "Highest certified commit should older than the highest committed index."
3475 );
3476
3477 let certified_commits = core.validate_certified_commits(certified_commits).unwrap();
3478
3479 assert!(certified_commits.is_empty());
3481
3482 println!("Case 2. Provide certified commits that extend past the last committed round.");
3483
3484 let certified_commits = sub_dags_and_commits
3486 .iter()
3487 .take(5)
3488 .map(|(_, c)| c.clone())
3489 .collect::<Vec<_>>();
3490
3491 let certified_commits = core
3492 .validate_certified_commits(certified_commits.clone())
3493 .unwrap();
3494
3495 assert_eq!(certified_commits.len(), 1);
3497 assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3498
3499 println!(
3500 "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
3501 );
3502
3503 let certified_commits = sub_dags_and_commits
3505 .iter()
3506 .skip(5)
3507 .take(1)
3508 .map(|(_, c)| c.clone())
3509 .collect::<Vec<_>>();
3510
3511 let err = core
3512 .validate_certified_commits(certified_commits.clone())
3513 .unwrap_err();
3514 match err {
3515 ConsensusError::UnexpectedCertifiedCommitIndex {
3516 expected_commit_index: 5,
3517 commit_index: 6,
3518 } => (),
3519 _ => panic!("Unexpected error: {err:?}"),
3520 }
3521 }
3522
3523 #[tokio::test]
3524 async fn test_add_certified_commits() {
3525 telemetry_subscribers::init_for_testing();
3526
3527 let (context, _key_pairs) = Context::new_for_test(4);
3528 let context = context.with_parameters(Parameters {
3529 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3530 ..Default::default()
3531 });
3532
3533 let authority_index = AuthorityIndex::new_for_test(0);
3534 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3535 let store = core.store.clone();
3536 let mut core = core.core;
3537
3538 assert_eq!(
3540 core.last_proposed_round(),
3541 GENESIS_ROUND,
3542 "No block should have been created other than genesis"
3543 );
3544
3545 let mut dag_builder = DagBuilder::new(core.context.clone());
3547 dag_builder.layers(1..=12).build();
3548
3549 dag_builder.print();
3552 let blocks = dag_builder.blocks(1..=6);
3553
3554 for block in blocks {
3555 core.dag_state.write().accept_block(block);
3556 }
3557
3558 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3560
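// Commit the locally decidable leaders first; this yields 4 committed sub-dags.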
3561 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3564
3565 assert_eq!(committed_sub_dags.len(), 4);
3567
3568 let last_commit = store
3569 .read_last_commit()
3570 .unwrap()
3571 .expect("Last commit should be set");
3572 assert_eq!(last_commit.reference().index, 4);
3573
3574 println!("Case 1. Provide no certified commits. No commit should happen.");
3575
3576 let last_commit = store
3577 .read_last_commit()
3578 .unwrap()
3579 .expect("Last commit should be set");
3580 assert_eq!(last_commit.reference().index, 4);
3581
3582 println!(
3583 "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3584 );
3585
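// Take certified commits with indices 4..=8. Index 4 is already committed
// locally and gets skipped; 5..=8 are applied, and the extra blocks let the
// direct decide rule commit up to index 10.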
3586 let certified_commits = sub_dags_and_commits
3589 .iter()
3590 .skip(3)
3591 .take(5)
3592 .map(|(_, c)| c.clone())
3593 .collect::<Vec<_>>();
3594
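// Accept blocks for rounds 8..=12 so the direct decide rule can also make
// progress past the certified commits.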
3595 let blocks = dag_builder.blocks(8..=12);
3598 for block in blocks {
3599 core.dag_state.write().accept_block(block);
3600 }
3601
3602 core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3605 .expect("Should not fail");
3606
3607 let commits = store.scan_commits((6..=10).into()).unwrap();
3608
3609 assert_eq!(commits.len(), 5);
3611
3612 for i in 6..=10 {
3613 let commit = &commits[i - 6];
3614 assert_eq!(commit.reference().index, i as u32);
3615 }
3616 }
3617
3618 #[tokio::test]
3619 async fn try_commit_with_certified_commits_gced_blocks() {
3620 const GC_DEPTH: u32 = 3;
3621 telemetry_subscribers::init_for_testing();
3622
3623 let (mut context, mut key_pairs) = Context::new_for_test(5);
3624 context
3625 .protocol_config
3626 .set_consensus_gc_depth_for_testing(GC_DEPTH);
3627 let context = Arc::new(context.with_parameters(Parameters {
3630 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3631 ..Default::default()
3632 }));
3633
3634 let store = Arc::new(MemStore::new());
3635 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3636
3637 let block_manager = BlockManager::new(
3638 context.clone(),
3639 dag_state.clone(),
3640 Arc::new(NoopBlockVerifier),
3641 );
3642 let leader_schedule = Arc::new(
3643 LeaderSchedule::from_store(context.clone(), dag_state.clone())
3644 .with_num_commits_per_schedule(10),
3645 );
3646
3647 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3648 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3649 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3650 let _block_receiver = signal_receivers.block_broadcast_receiver();
3652
3653 let (sender, _receiver) = unbounded_channel("consensus_output");
3654 let commit_consumer = CommitConsumer::new(sender.clone(), 0);
3655 let commit_observer = CommitObserver::new(
3656 context.clone(),
3657 commit_consumer,
3658 dag_state.clone(),
3659 store.clone(),
3660 leader_schedule.clone(),
3661 );
3662
3663 let mut core = Core::new(
3664 context.clone(),
3665 leader_schedule,
3666 transaction_consumer,
3667 block_manager,
3668 true,
3669 commit_observer,
3670 signals,
3671 key_pairs.remove(context.own_index.value()).1,
3672 dag_state.clone(),
3673 true,
3674 );
3675
3676 assert_eq!(
3678 core.last_proposed_round(),
3679 GENESIS_ROUND,
3680 "No block should have been created other than genesis"
3681 );
3682
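// DAG where only E1 and E5 come from authority E: rounds 2+ skip E1, and E5
// links it back in at round 5.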
3683 let dag_str = "DAG {
3684 Round 0 : { 5 },
3685 Round 1 : { * },
3686 Round 2 : {
3687 A -> [-E1],
3688 B -> [-E1],
3689 C -> [-E1],
3690 D -> [-E1],
3691 },
3692 Round 3 : {
3693 A -> [*],
3694 B -> [*],
3695 C -> [*],
3696 D -> [*],
3697 },
3698 Round 4 : {
3699 A -> [*],
3700 B -> [*],
3701 C -> [*],
3702 D -> [*],
3703 },
3704 Round 5 : {
3705 A -> [*],
3706 B -> [*],
3707 C -> [*],
3708 D -> [*],
3709 E -> [A4, B4, C4, D4, E1],
3710 },
3711 Round 6 : { * },
3712 Round 7 : { * },
3713 }";
3714
3715 let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3716 dag_builder.print();
3717
3718 let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3720 .get_sub_dag_and_certified_commits(1..=5)
3721 .into_iter()
3722 .unzip();
3723
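// Commit via the certified commits. By the time the later commits are
// processed, E1 is below the GC round (depth 3) and must not appear in any
// committed sub-dag.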
3724 let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3728
3729 assert_eq!(committed_sub_dags.len(), 4);
3731 for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3732 assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3733
3734 for block in committed_sub_dag.blocks.iter() {
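// E1 (authority E, round 1) must have been garbage collected, not committed.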
3736 if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(4) {
3737 panic!("Did not expect to commit block E1");
3738 }
3739 }
3740 }
3741 }
3742
3743 #[tokio::test(flavor = "current_thread", start_paused = true)]
3744 async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3745 telemetry_subscribers::init_for_testing();
3746 let default_params = Parameters::default();
3747
3748 let (context, _) = Context::new_for_test(6);
3749
3750 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3752
3753 let mut last_round_blocks = Vec::new();
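// Produce blocks for rounds 1..=33; with six authorities this drives the
// commit sequence across a leader schedule change boundary.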
3756 for round in 1..=33 {
3757 let mut this_round_blocks = Vec::new();
3758 sleep(default_params.min_round_delay).await;
3760 for core_fixture in &mut cores {
3761 core_fixture
3765 .core
3766 .add_blocks(last_round_blocks.clone())
3767 .unwrap();
3768 let new_round = receive(
3771 Duration::from_secs(1),
3772 core_fixture.signal_receivers.new_round_receiver(),
3773 )
3774 .await;
3775 assert_eq!(new_round, round);
3776 let extended_block = tokio::time::timeout(
3778 Duration::from_secs(1),
3779 core_fixture.block_receiver.recv(),
3780 )
3781 .await
3782 .unwrap()
3783 .unwrap();
3784 assert_eq!(extended_block.block.round(), round);
3785 assert_eq!(
3786 extended_block.block.author(),
3787 core_fixture.core.context.own_index
3788 );
3789
3790 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3792 let block = core_fixture.core.last_proposed_block();
3793 assert_eq!(
3795 block.ancestors().len(),
3796 core_fixture.core.context.committee.size()
3797 );
3798 for ancestor in block.ancestors() {
3799 if block.round() > 1 {
3800 assert!(
3802 last_round_blocks
3803 .iter()
3804 .any(|block| block.reference() == *ancestor),
3805 "Reference from previous round should be added"
3806 );
3807 }
3808 }
3809 }
3810 last_round_blocks = this_round_blocks;
3811 }
3812 for core_fixture in cores {
3813 let last_commit = core_fixture
3815 .store
3816 .read_last_commit()
3817 .unwrap()
3818 .expect("last commit should be set");
3819 let expected_commit_count = 30;
3831 assert_eq!(last_commit.index(), expected_commit_count);
3837 let all_stored_commits = core_fixture
3838 .store
3839 .scan_commits((0..=CommitIndex::MAX).into())
3840 .unwrap();
3841 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3842 assert_eq!(
3843 core_fixture
3844 .core
3845 .leader_schedule
3846 .leader_swap_table
3847 .read()
3848 .bad_nodes
3849 .len(),
3850 1
3851 );
3852 assert_eq!(
3853 core_fixture
3854 .core
3855 .leader_schedule
3856 .leader_swap_table
3857 .read()
3858 .good_nodes
3859 .len(),
3860 1
3861 );
3862 let expected_reputation_scores =
3863 ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3864 assert_eq!(
3865 core_fixture
3866 .core
3867 .leader_schedule
3868 .leader_swap_table
3869 .read()
3870 .reputation_scores,
3871 expected_reputation_scores
3872 );
3873 }
3874 }
3875
3876 #[tokio::test(flavor = "current_thread", start_paused = true)]
3878 async fn test_commit_on_leader_schedule_change_boundary_without_multileader_with_vote_scoring()
3879 {
3880 telemetry_subscribers::init_for_testing();
3881 let default_params = Parameters::default();
3882
3883 let (mut context, _) = Context::new_for_test(6);
3884 context
3885 .protocol_config
3886 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3887
3888 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3890 let mut last_round_blocks = Vec::new();
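// Same boundary scenario with the legacy vote scoring strategy, run for 63
// rounds; 60 commits are expected.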
3893 for round in 1..=63 {
3894 let mut this_round_blocks = Vec::new();
3895
3896 sleep(default_params.min_round_delay).await;
3898
3899 for core_fixture in &mut cores {
3900 core_fixture
3904 .core
3905 .add_blocks(last_round_blocks.clone())
3906 .unwrap();
3907
3908 let new_round = receive(
3911 Duration::from_secs(1),
3912 core_fixture.signal_receivers.new_round_receiver(),
3913 )
3914 .await;
3915 assert_eq!(new_round, round);
3916
3917 let extended_block = tokio::time::timeout(
3919 Duration::from_secs(1),
3920 core_fixture.block_receiver.recv(),
3921 )
3922 .await
3923 .unwrap()
3924 .unwrap();
3925 assert_eq!(extended_block.block.round(), round);
3926 assert_eq!(
3927 extended_block.block.author(),
3928 core_fixture.core.context.own_index
3929 );
3930
3931 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3933
3934 let block = core_fixture.core.last_proposed_block();
3935
3936 assert_eq!(
3938 block.ancestors().len(),
3939 core_fixture.core.context.committee.size()
3940 );
3941 for ancestor in block.ancestors() {
3942 if block.round() > 1 {
3943 assert!(
3945 last_round_blocks
3946 .iter()
3947 .any(|block| block.reference() == *ancestor),
3948 "Reference from previous round should be added"
3949 );
3950 }
3951 }
3952 }
3953
3954 last_round_blocks = this_round_blocks;
3955 }
3956
3957 for core_fixture in cores {
3958 let last_commit = core_fixture
3960 .store
3961 .read_last_commit()
3962 .unwrap()
3963 .expect("last commit should be set");
3964 let expected_commit_count = 60;
3976 assert_eq!(last_commit.index(), expected_commit_count);
3982 let all_stored_commits = core_fixture
3983 .store
3984 .scan_commits((0..=CommitIndex::MAX).into())
3985 .unwrap();
3986 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3987 assert_eq!(
3988 core_fixture
3989 .core
3990 .leader_schedule
3991 .leader_swap_table
3992 .read()
3993 .bad_nodes
3994 .len(),
3995 1
3996 );
3997 assert_eq!(
3998 core_fixture
3999 .core
4000 .leader_schedule
4001 .leader_swap_table
4002 .read()
4003 .good_nodes
4004 .len(),
4005 1
4006 );
4007 let expected_reputation_scores =
4008 ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
4009 assert_eq!(
4010 core_fixture
4011 .core
4012 .leader_schedule
4013 .leader_swap_table
4014 .read()
4015 .reputation_scores,
4016 expected_reputation_scores
4017 );
4018 }
4019 }
4020
4021 #[tokio::test]
4022 async fn test_core_signals() {
4023 telemetry_subscribers::init_for_testing();
4024 let default_params = Parameters::default();
4025
4026 let (context, _) = Context::new_for_test(4);
4027 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4029
4030 let mut last_round_blocks = Vec::new();
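// For each round: feed the previous round's blocks, expect the new-round
// signal, then expect our own proposal on the block broadcast channel.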
4033 for round in 1..=10 {
4034 let mut this_round_blocks = Vec::new();
4035
4036 sleep(default_params.min_round_delay).await;
4038
4039 for core_fixture in &mut cores {
4040 core_fixture
4044 .core
4045 .add_blocks(last_round_blocks.clone())
4046 .unwrap();
4047
4048 let new_round = receive(
4051 Duration::from_secs(1),
4052 core_fixture.signal_receivers.new_round_receiver(),
4053 )
4054 .await;
4055 assert_eq!(new_round, round);
4056
4057 let extended_block = tokio::time::timeout(
4059 Duration::from_secs(1),
4060 core_fixture.block_receiver.recv(),
4061 )
4062 .await
4063 .unwrap()
4064 .unwrap();
4065 assert_eq!(extended_block.block.round(), round);
4066 assert_eq!(
4067 extended_block.block.author(),
4068 core_fixture.core.context.own_index
4069 );
4070
4071 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
4073
4074 let block = core_fixture.core.last_proposed_block();
4075
4076 assert_eq!(
4078 block.ancestors().len(),
4079 core_fixture.core.context.committee.size()
4080 );
4081 for ancestor in block.ancestors() {
4082 if block.round() > 1 {
4083 assert!(
4085 last_round_blocks
4086 .iter()
4087 .any(|block| block.reference() == *ancestor),
4088 "Reference from previous round should be added"
4089 );
4090 }
4091 }
4092 }
4093
4094 last_round_blocks = this_round_blocks;
4095 }
4096
4097 for core_fixture in cores {
4098 let last_commit = core_fixture
4100 .store
4101 .read_last_commit()
4102 .unwrap()
4103 .expect("last commit should be set");
4104 assert_eq!(last_commit.index(), 7);
4108 let all_stored_commits = core_fixture
4109 .store
4110 .scan_commits((0..=CommitIndex::MAX).into())
4111 .unwrap();
4112 assert_eq!(all_stored_commits.len(), 7);
4113 }
4114 }
4115
4116 #[tokio::test]
4117 async fn test_core_compress_proposal_references() {
4118 telemetry_subscribers::init_for_testing();
4119 let default_params = Parameters::default();
4120
4121 let (context, _) = Context::new_for_test(4);
4122 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4124
4125 let mut last_round_blocks = Vec::new();
4126 let mut all_blocks = Vec::new();
4127
4128 let excluded_authority = AuthorityIndex::new_for_test(3);
4129
4130 for round in 1..=10 {
4131 let mut this_round_blocks = Vec::new();
4132
4133 for core_fixture in &mut cores {
4134 if core_fixture.core.context.own_index == excluded_authority {
4136 continue;
4137 }
4138
4139 core_fixture
4142 .core
4143 .add_blocks(last_round_blocks.clone())
4144 .unwrap();
4145 core_fixture.core.new_block(round, true).unwrap();
4146
4147 let block = core_fixture.core.last_proposed_block();
4148 assert_eq!(block.round(), round);
4149
4150 this_round_blocks.push(block.clone());
4152 }
4153
4154 last_round_blocks = this_round_blocks.clone();
4155 all_blocks.extend(this_round_blocks);
4156 }
4157
4158 let core_fixture = &mut cores[excluded_authority];
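// The excluded authority now catches up on all rounds at once. Its round 11
// proposal should reference its own round 1 block plus only the round 10
// blocks of the others, compressing away the intermediate references.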
4164 sleep(default_params.min_round_delay).await;
4166 core_fixture.core.add_blocks(all_blocks).unwrap();
4168
4169 let block = core_fixture.core.last_proposed_block();
4173 assert_eq!(block.round(), 11);
4174 assert_eq!(block.ancestors().len(), 4);
4175 for block_ref in block.ancestors() {
4176 if block_ref.author == excluded_authority {
4177 assert_eq!(block_ref.round, 1);
4178 } else {
4179 assert_eq!(block_ref.round, 10);
4180 }
4181 }
4182
4183 let last_commit = core_fixture
4185 .store
4186 .read_last_commit()
4187 .unwrap()
4188 .expect("last commit should be set");
4189 assert_eq!(last_commit.index(), 6);
4193 let all_stored_commits = core_fixture
4194 .store
4195 .scan_commits((0..=CommitIndex::MAX).into())
4196 .unwrap();
4197 assert_eq!(all_stored_commits.len(), 6);
4198 }
4199
4200 #[tokio::test]
4201 async fn try_decide_certified() {
4202 telemetry_subscribers::init_for_testing();
4204
4205 let (context, _) = Context::new_for_test(4);
4206
4207 let authority_index = AuthorityIndex::new_for_test(0);
4208 let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
4209 let mut core = core.core;
4210
4211 let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
4212 dag_builder.layers(1..=12).build();
4213
4214 let limit = 2;
4215
4216 let blocks = dag_builder.blocks(1..=12);
4217
4218 for block in blocks {
4219 core.dag_state.write().accept_block(block);
4220 }
4221
4222 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4224 let mut certified_commits = sub_dags_and_commits
4225 .into_iter()
4226 .map(|(_, commit)| commit)
4227 .collect::<Vec<_>>();
4228
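// With limit = 2, only the first two certified commits should be decided; the
// remaining two stay in the queue.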
4229 let leaders = core.try_decide_certified(&mut certified_commits, limit);
4230
4231 assert_eq!(leaders.len(), 2);
4233 assert_eq!(certified_commits.len(), 2);
4234 }
4235
4236 pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4237 tokio::time::timeout(timeout, receiver.changed())
4238 .await
4239 .expect("Timeout while waiting to read from receiver")
4240 .expect("Signal receive channel shouldn't be closed");
4241 *receiver.borrow_and_update()
4242 }
4243}