use std::{
    collections::{BTreeMap, BTreeSet},
    iter, mem,
    sync::Arc,
    time::Duration,
    vec,
};

use consensus_config::{AuthorityIndex, ProtocolKeyPair};
#[cfg(test)]
use consensus_config::{Stake, local_committee_and_keys};
use iota_macros::fail_point;
#[cfg(test)]
use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
use iota_metrics::monitored_scope;
use itertools::Itertools as _;
use parking_lot::RwLock;
use tokio::{
    sync::{broadcast, watch},
    time::Instant,
};
use tracing::{debug, info, trace, warn};

#[cfg(test)]
use crate::{
    CommitConsumer, TransactionClient, block_verifier::NoopBlockVerifier,
    storage::mem_store::MemStore,
};
use crate::{
    ancestor::{AncestorState, AncestorStateManager},
    block::{
        Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, GENESIS_ROUND, Round,
        SignedBlock, Slot, VerifiedBlock,
    },
    block_manager::BlockManager,
    commit::{
        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
    },
    commit_observer::CommitObserver,
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    round_prober::QuorumRound,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    transaction::TransactionConsumer,
    universal_committer::{
        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
    },
};

/// The maximum number of commit votes to include in a proposed block.
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;

/// Core of the consensus logic: accepts blocks into the DAG, proposes new
/// blocks and runs the commit rule.
pub(crate) struct Core {
    context: Arc<Context>,
    /// The consumer to pull transactions from when proposing blocks.
    transaction_consumer: TransactionConsumer,
    /// Accepts blocks into the DAG once their causal history is complete.
    block_manager: BlockManager,
    /// Whether there is a subscriber to the block broadcast stream. Blocks are
    /// proposed only while a subscriber exists.
    subscriber_exists: bool,
    /// Estimated delay, in rounds, for this authority's blocks to be accepted
    /// by a quorum. Proposals stop when it exceeds the configured threshold.
    propagation_delay: Round,

    /// Decides leaders across the (pipelined) committers.
    committer: UniversalCommitter,
    /// The last round for which a new-round signal has been emitted.
    last_signaled_round: Round,
    /// The most recently included ancestor per authority, indexed by `AuthorityIndex`.
    last_included_ancestors: Vec<Option<BlockRef>>,
    /// The slot of the last decided leader.
    last_decided_leader: Slot,
    leader_schedule: Arc<LeaderSchedule>,
    commit_observer: CommitObserver,
    /// Signals to outside subscribers about new blocks and new rounds.
    signals: CoreSignals,
    /// The keypair used to sign proposed blocks.
    block_signer: ProtocolKeyPair,
    dag_state: Arc<RwLock<DagState>>,
    /// The last own proposed round known from peers; `None` until synced when
    /// `sync_last_known_own_block` is enabled.
    last_known_proposed_round: Option<Round>,
    /// Tracks per-authority include/exclude state for smart ancestor selection.
    ancestor_state_manager: AncestorStateManager,
}

impl Core {
    pub(crate) fn new(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        transaction_consumer: TransactionConsumer,
        block_manager: BlockManager,
        subscriber_exists: bool,
        commit_observer: CommitObserver,
        signals: CoreSignals,
        block_signer: ProtocolKeyPair,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Self {
        let last_decided_leader = dag_state.read().last_commit_leader();
        let committer = UniversalCommitterBuilder::new(
            context.clone(),
            leader_schedule.clone(),
            dag_state.clone(),
        )
        .with_number_of_leaders(1)
        .with_pipeline(true)
        .build();

        let last_proposed_block = dag_state.read().get_last_proposed_block();

        let last_signaled_round = last_proposed_block.round();

        // Recover the last included ancestor per authority from the last proposed block.
        let mut last_included_ancestors = vec![None; context.committee.size()];
        for ancestor in last_proposed_block.ancestors() {
            last_included_ancestors[ancestor.author] = Some(*ancestor);
        }

        let min_propose_round = if sync_last_known_own_block {
            None
        } else {
            // No syncing of the last known own block is requested, so proposing can
            // start from any round.
            Some(0)
        };

        let propagation_scores = leader_schedule
            .leader_swap_table
            .read()
            .reputation_scores
            .clone();
        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
        ancestor_state_manager.set_propagation_scores(propagation_scores);

        Self {
            context,
            last_signaled_round,
            last_included_ancestors,
            last_decided_leader,
            leader_schedule,
            transaction_consumer,
            block_manager,
            subscriber_exists,
            propagation_delay: 0,
            committer,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            last_known_proposed_round: min_propose_round,
            ancestor_state_manager,
        }
        .recover()
    }

    fn recover(mut self) -> Self {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::recover"])
            .start_timer();
        // Ensure local time has caught up with the max ancestor timestamp, so a
        // newly proposed block cannot have a timestamp smaller than an ancestor's.
        let ancestor_blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let max_ancestor_timestamp = ancestor_blocks
            .iter()
            .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms()));
        let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms());
        if wait_ms > 0 {
            warn!(
                "Waiting for {} ms while recovering ancestors from storage",
                wait_ms
            );
            std::thread::sleep(Duration::from_millis(wait_ms));
        }

        // Try to commit and propose, since they may not have run after the last storage write.
        self.try_commit(vec![]).unwrap();
        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
        {
            last_proposed_block
        } else {
            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
            if self.should_propose() {
                assert!(
                    last_proposed_block.round() > GENESIS_ROUND,
                    "At minimum a block of round higher than genesis should have been produced during recovery"
                );
            }

            // If no new block has been proposed, re-broadcast the last proposed block
            // so peers that missed it can receive it again.
            self.signals
                .new_block(ExtendedBlock {
                    block: last_proposed_block.clone(),
                    excluded_ancestors: vec![],
                })
                .unwrap();
            last_proposed_block
        };

        // Signal the current clock round, so subscribers are up to date after recovery.
        self.try_signal_new_round();

        info!(
            "Core recovery completed with last proposed block {:?}",
            last_proposed_block
        );

        self
    }

    /// Accepts the provided blocks into the DAG when possible and returns the
    /// references of any ancestors that are still missing.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_blocks");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::add_blocks"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_add_blocks_batch_size
            .observe(blocks.len() as f64);

        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);

        if !accepted_blocks.is_empty() {
            debug!(
                "Accepted blocks: {}",
                accepted_blocks
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            // Now that new blocks have been accepted, try to commit...
            self.try_commit(vec![])?;

            // ...and then propose.
            self.try_propose(false)?;

            // Signal the new clock round, if it advanced.
            self.try_signal_new_round();
        }

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

    /// Checks the provided block refs against the DAG and returns the subset
    /// that is still missing.
    pub(crate) fn check_block_refs(
        &mut self,
        block_refs: Vec<BlockRef>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::check_block_refs");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::check_block_refs"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_check_block_refs_batch_size
            .observe(block_refs.len() as f64);

        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

    /// Processes the certified commits. When GC is enabled the commits and
    /// their votes are processed directly; otherwise the blocks of the commits
    /// are fed through the regular `add_blocks` path.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_certified_commits(
        &mut self,
        certified_commits: CertifiedCommits,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_certified_commits");

        // If GC is enabled, process the certified commits directly.
        if self.dag_state.read().gc_enabled() {
            let votes = certified_commits.votes().to_vec();
            let commits = self
                .validate_certified_commits(certified_commits.commits().to_vec())
                .expect("Certified commits validation failed");

            // Accept the blocks that carry the commit votes.
            self.block_manager.try_accept_blocks(votes);

            // Try to commit the certified commits.
            self.try_commit(commits)?;

            // A proposal may now be possible.
            self.try_propose(false)?;

            // Signal the new clock round, if it advanced.
            self.try_signal_new_round();

            return Ok(BTreeSet::new());
        }

        // Without GC, fall back to adding the blocks of the commits to the DAG.
        let blocks = certified_commits
            .commits()
            .iter()
            .flat_map(|commit| commit.blocks())
            .cloned()
            .collect::<Vec<_>>();

        self.add_blocks(blocks)
    }

    fn try_signal_new_round(&mut self) {
        let new_clock_round = self.dag_state.read().threshold_clock_round();
        if new_clock_round <= self.last_signaled_round {
            return;
        }
        self.signals.new_round(new_clock_round);
        self.last_signaled_round = new_clock_round;

        self.context
            .metrics
            .node_metrics
            .threshold_clock_round
            .set(new_clock_round as i64);
    }

    /// Creates a new block for the given round, typically on leader timeout.
    /// When `force` is true the proposal checks (leader availability, min round
    /// delay) are bypassed.
    pub(crate) fn new_block(
        &mut self,
        round: Round,
        force: bool,
    ) -> ConsensusResult<Option<VerifiedBlock>> {
        let _scope = monitored_scope("Core::new_block");
        if self.last_proposed_round() < round {
            self.context
                .metrics
                .node_metrics
                .leader_timeout_total
                .with_label_values(&[&format!("{force}")])
                .inc();
            let result = self.try_propose(force);
            // The threshold clock round may have advanced while proposing.
            self.try_signal_new_round();
            return result;
        }
        Ok(None)
    }

    /// Keeps only the certified commits with an index higher than the last
    /// committed one, and verifies that they start right after it with no gap.
    fn validate_certified_commits(
        &mut self,
        commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CertifiedCommit>> {
        // Filter out commits that have already been committed.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let commits = commits
            .iter()
            .filter(|commit| {
                if commit.index() > last_commit_index {
                    true
                } else {
                    tracing::debug!(
                        "Skip commit for index {} as it is already committed with last commit index {}",
                        commit.index(),
                        last_commit_index
                    );
                    false
                }
            })
            .cloned()
            .collect::<Vec<_>>();

        // Make sure that the first remaining commit is the next one in line.
        if let Some(commit) = commits.first() {
            if commit.index() != last_commit_index + 1 {
                return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
                    expected_commit_index: last_commit_index + 1,
                    commit_index: commit.index(),
                });
            }
        }

        Ok(commits)
    }

    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
        if !self.should_propose() {
            return Ok(None);
        }
        if let Some(extended_block) = self.try_new_block(force) {
            self.signals.new_block(extended_block.clone())?;

            fail_point!("consensus-after-propose");

            // The new block may complete the causal history needed to commit the next leader.
            self.try_commit(vec![])?;
            return Ok(Some(extended_block.block));
        }
        Ok(None)
    }

    /// Attempts to create a new block for the current threshold clock round,
    /// returning `None` when the proposal preconditions are not met.
    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_new_block"])
            .start_timer();

        // The new block must have a higher round than the last proposed block.
        let clock_round = {
            let dag_state = self.dag_state.read();
            let clock_round = dag_state.threshold_clock_round();
            if clock_round <= dag_state.get_last_proposed_block().round() {
                return None;
            }
            clock_round
        };

        // Ancestors are drawn from the previous round.
        let quorum_round = clock_round.saturating_sub(1);

        // Unless forced, propose only when the previous round's leaders have been
        // received and the min round delay has elapsed since the last proposal.
        if !force {
            if !self.leaders_exist(quorum_round) {
                return None;
            }

            if Duration::from_millis(
                self.context
                    .clock
                    .timestamp_utc_ms()
                    .saturating_sub(self.last_proposed_timestamp_ms()),
            ) < self.context.parameters.min_round_delay
            {
                return None;
            }
        }

        // Select the ancestors to include, using smart selection unless forced.
        let (ancestors, excluded_and_equivocating_ancestors) =
            self.smart_ancestors_to_propose(clock_round, !force);

        // If we did not find enough good ancestors to propose, keep waiting.
        if ancestors.is_empty() {
            assert!(
                !force,
                "Ancestors should have been returned if force is true!"
            );
            return None;
        }

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) during proposal due to size limit",
                excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
            );
        }
        let excluded_ancestors = excluded_and_equivocating_ancestors
            .into_iter()
            .take(excluded_ancestors_limit)
            .collect();

        // Update the last included ancestor block refs.
        for ancestor in &ancestors {
            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
        }

        let leader_authority = &self
            .context
            .committee
            .authority(self.first_leader(quorum_round))
            .hostname;
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_ms
            .with_label_values(&[leader_authority])
            .inc_by(
                Instant::now()
                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
                    .as_millis() as u64,
            );
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_count
            .with_label_values(&[leader_authority])
            .inc();

        self.context
            .metrics
            .node_metrics
            .proposed_block_ancestors
            .observe(ancestors.len() as f64);
        for ancestor in &ancestors {
            let authority = &self.context.committee.authority(ancestor.author()).hostname;
            self.context
                .metrics
                .node_metrics
                .proposed_block_ancestors_depth
                .with_label_values(&[authority])
                .observe(clock_round.saturating_sub(ancestor.round()).into());
        }

        // A proposed block must have a timestamp >= all of its ancestors'.
        let now = self.context.clock.timestamp_utc_ms();
        ancestors.iter().for_each(|block| {
            assert!(
                block.timestamp_ms() <= now,
                "Violation: ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.",
                block,
                block.timestamp_ms(),
                clock_round
            );
        });

        // Consume the next transactions to include, together with the callback to
        // acknowledge them once the block is persisted.
        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
        self.context
            .metrics
            .node_metrics
            .proposed_block_transactions
            .observe(transactions.len() as f64);

        // Consume the commit votes to include.
        let commit_votes = self
            .dag_state
            .write()
            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);

        // Create, sign and serialize the block.
        let block = Block::V1(BlockV1::new(
            self.context.committee.epoch(),
            clock_round,
            self.context.own_index,
            now,
            ancestors.iter().map(|b| b.reference()).collect(),
            transactions,
            commit_votes,
            vec![],
        ));
        let signed_block =
            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
        let serialized = signed_block
            .serialize()
            .expect("Block serialization failed.");
        self.context
            .metrics
            .node_metrics
            .proposed_block_size
            .observe(serialized.len() as f64);
        // Own blocks are trusted, so no verification is needed.
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

        // Record the interval from the last proposal.
        let last_proposed_block = self.last_proposed_block();
        if last_proposed_block.round() > 0 {
            self.context
                .metrics
                .node_metrics
                .block_proposal_interval
                .observe(
                    Duration::from_millis(
                        verified_block
                            .timestamp_ms()
                            .saturating_sub(last_proposed_block.timestamp_ms()),
                    )
                    .as_secs_f64(),
                );
        }

        // Accept the new block into the block manager and DAG state.
        let (accepted_blocks, missing) = self
            .block_manager
            .try_accept_blocks(vec![verified_block.clone()]);
        assert_eq!(accepted_blocks.len(), 1);
        assert!(missing.is_empty());

        // Ensure the new block and the rest of the DAG state are persisted.
        self.dag_state.write().flush();

        // Now acknowledge the transactions for their inclusion in the block.
        ack_transactions(verified_block.reference());

        debug!("Created block {verified_block:?} for round {clock_round}");

        self.context
            .metrics
            .node_metrics
            .proposed_blocks
            .with_label_values(&[&force.to_string()])
            .inc();

        Some(ExtendedBlock {
            block: verified_block,
            excluded_ancestors,
        })
    }

    /// Runs the commit rule: decides leaders, preferring the provided certified
    /// commits when available, and sequences them, refreshing the leader
    /// schedule whenever enough commits have happened.
    fn try_commit(
        &mut self,
        mut certified_commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_commit"])
            .start_timer();

        let mut certified_commits_map = BTreeMap::new();
        for c in &certified_commits {
            certified_commits_map.insert(c.index(), c.reference());
        }

        if !certified_commits.is_empty() {
            info!(
                "Will try to commit synced commits first : {:?}",
                certified_commits
                    .iter()
                    .map(|c| (c.index(), c.leader()))
                    .collect::<Vec<_>>()
            );
        }

        let mut committed_sub_dags = Vec::new();
        // Commit in batches bounded by the number of commits left until the next
        // leader schedule update, so the schedule can be refreshed in between.
        loop {
            let mut commits_until_update = self
                .leader_schedule
                .commits_until_leader_schedule_update(self.dag_state.clone());

            if commits_until_update == 0 {
                let last_commit_index = self.dag_state.read().last_commit_index();

                tracing::info!(
                    "Leader schedule change triggered at commit index {last_commit_index}"
                );
                if self
                    .context
                    .protocol_config
                    .consensus_distributed_vote_scoring_strategy()
                {
                    self.leader_schedule
                        .update_leader_schedule_v2(&self.dag_state);

                    let propagation_scores = self
                        .leader_schedule
                        .leader_swap_table
                        .read()
                        .reputation_scores
                        .clone();
                    self.ancestor_state_manager
                        .set_propagation_scores(propagation_scores);
                } else {
                    self.leader_schedule
                        .update_leader_schedule_v1(&self.dag_state);
                }
                commits_until_update = self
                    .leader_schedule
                    .commits_until_leader_schedule_update(self.dag_state.clone());

                fail_point!("consensus-after-leader-schedule-change");
            }
            assert!(commits_until_update > 0);

            // Decide leaders from the certified commits first, to avoid deciding
            // them again via the committer.
            let (mut decided_leaders, decided_certified_commits): (
                Vec<DecidedLeader>,
                Vec<CertifiedCommit>,
            ) = self
                .try_decide_certified(&mut certified_commits, commits_until_update)
                .into_iter()
                .unzip();

            // Accept the blocks of the certified commits, so everything that is
            // about to be committed exists in the DAG.
            let blocks = decided_certified_commits
                .iter()
                .flat_map(|c| c.blocks())
                .cloned()
                .collect::<Vec<_>>();
            self.block_manager.try_accept_committed_blocks(blocks);

            // If no certified leaders were decided, run the committer.
            if decided_leaders.is_empty() {
                decided_leaders = self.committer.try_decide(self.last_decided_leader);

                // Truncate to the number of commits allowed before the next schedule update.
                if decided_leaders.len() >= commits_until_update {
                    let _ = decided_leaders.split_off(commits_until_update);
                }
            }

            let Some(last_decided) = decided_leaders.last().cloned() else {
                break;
            };

            self.last_decided_leader = last_decided.slot();

            let sequenced_leaders = decided_leaders
                .into_iter()
                .filter_map(|leader| leader.into_committed_block())
                .collect::<Vec<_>>();

            tracing::debug!(
                "Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
                sequenced_leaders.len()
            );

            self.context
                .metrics
                .node_metrics
                .last_decided_leader_round
                .set(self.last_decided_leader.round as i64);

            // Stop if no leader was sequenced in this iteration.
            if sequenced_leaders.is_empty() {
                break;
            }

            tracing::info!(
                "Committing {} leaders: {}",
                sequenced_leaders.len(),
                sequenced_leaders
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            // Commit the sequenced leaders and record the resulting sub DAGs for scoring.
            let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
            if self
                .context
                .protocol_config
                .consensus_distributed_vote_scoring_strategy()
            {
                self.dag_state.write().add_scoring_subdags(subdags.clone());
            } else {
                self.dag_state
                    .write()
                    .add_unscored_committed_subdags(subdags.clone());
            }

            // Try to unsuspend blocks if the GC round has advanced.
            self.block_manager
                .try_unsuspend_blocks_for_latest_gc_round();
            committed_sub_dags.extend(subdags);

            fail_point!("consensus-after-handle-commit");
        }

        // Sanity check: committed sub dags must match the certified commits they came from.
        for sub_dag in &committed_sub_dags {
            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
                assert_eq!(
                    commit_ref, sub_dag.commit_ref,
                    "Certified commit has different reference than the committed sub dag"
                );
            }
        }

        // Notify the transaction consumer about the status of own committed blocks.
        let committed_block_refs = committed_sub_dags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter())
            .filter_map(|block| {
                (block.author() == self.context.own_index).then_some(block.reference())
            })
            .collect::<Vec<_>>();
        self.transaction_consumer
            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());

        Ok(committed_sub_dags)
    }

    pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
        let _scope = monitored_scope("Core::get_missing_blocks");
        self.block_manager.missing_blocks()
    }

    /// Sets whether a subscriber to the block broadcast stream exists. Blocks
    /// are proposed only while one does.
    pub(crate) fn set_subscriber_exists(&mut self, exists: bool) {
        info!("Block subscriber exists: {exists}");
        self.subscriber_exists = exists;
    }

    /// Sets the propagation delay and the per-authority quorum rounds reported
    /// by the round prober, feeding the ancestor state manager.
    pub(crate) fn set_propagation_delay_and_quorum_rounds(
        &mut self,
        delay: Round,
        received_quorum_rounds: Vec<QuorumRound>,
        accepted_quorum_rounds: Vec<QuorumRound>,
    ) {
        info!(
            "Received quorum round per authority in ancestor state manager set to: {}",
            self.context
                .committee
                .authorities()
                .zip(received_quorum_rounds.iter())
                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
                .join(", ")
        );
        info!(
            "Accepted quorum round per authority in ancestor state manager set to: {}",
            self.context
                .committee
                .authorities()
                .zip(accepted_quorum_rounds.iter())
                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
                .join(", ")
        );
        self.ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        info!("Propagation round delay set to: {delay}");
        self.propagation_delay = delay;
    }

    /// Sets the last known own proposed round, as synced from peers. Can only
    /// be set once.
    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
        if self.last_known_proposed_round.is_some() {
            panic!(
                "Should not attempt to set the last known proposed round if that has been already set"
            );
        }
        self.last_known_proposed_round = Some(round);
        info!("Last known proposed round set to {round}");
    }

    /// Whether the core should propose new blocks at the moment.
    pub(crate) fn should_propose(&self) -> bool {
        let clock_round = self.dag_state.read().threshold_clock_round();
        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;

        if !self.subscriber_exists {
            debug!("Skip proposing for round {clock_round}, no subscriber exists.");
            core_skipped_proposals
                .with_label_values(&["no_subscriber"])
                .inc();
            return false;
        }

        if self.propagation_delay
            > self
                .context
                .parameters
                .propagation_delay_stop_proposal_threshold
        {
            debug!(
                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
                self.propagation_delay,
                self.context
                    .parameters
                    .propagation_delay_stop_proposal_threshold
            );
            core_skipped_proposals
                .with_label_values(&["high_propagation_delay"])
                .inc();
            return false;
        }

        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
            debug!(
                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
            );
            core_skipped_proposals
                .with_label_values(&["no_last_known_proposed_round"])
                .inc();
            return false;
        };
        if clock_round <= last_known_proposed_round {
            debug!(
                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
            );
            core_skipped_proposals
                .with_label_values(&["higher_last_known_proposed_round"])
                .inc();
            return false;
        }

        true
    }

    /// Pops up to `limit` certified commits from the front of the queue and
    /// converts them into decided leaders. Only operates when GC is enabled.
    #[tracing::instrument(skip_all)]
    fn try_decide_certified(
        &mut self,
        certified_commits: &mut Vec<CertifiedCommit>,
        limit: usize,
    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
        // If GC is disabled, certified commits cannot be processed directly.
        if !self.dag_state.read().gc_enabled() {
            return Vec::new();
        }

        assert!(limit > 0, "limit should be greater than 0");

        let to_commit = if certified_commits.len() >= limit {
            // Keep only as many commits as the limit dictates.
            certified_commits.drain(..limit).collect::<Vec<_>>()
        } else {
            // Otherwise take all of the certified commits.
            mem::take(certified_commits)
        };

        tracing::debug!(
            "Decided {} certified leaders: {}",
            to_commit.len(),
            to_commit.iter().map(|c| c.leader().to_string()).join(",")
        );

        let sequenced_leaders = to_commit
            .into_iter()
            .map(|commit| {
                let leader = commit
                    .blocks()
                    .last()
                    .expect("Certified commit should have at least one block");
                assert_eq!(
                    leader.reference(),
                    commit.leader(),
                    "Last block of the committed sub dag should have the same digest as the leader of the commit"
                );
                let leader = DecidedLeader::Commit(leader.clone());
                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
                (leader, commit)
            })
            .collect::<Vec<_>>();

        sequenced_leaders
    }

    /// Selects the ancestors to include in the next proposal. With `smart_select`,
    /// low-scoring ancestors are temporarily excluded as long as a quorum of
    /// parent-round ancestors can still be formed. Returns the ancestors to
    /// propose and the refs of excluded or equivocating ancestors.
    fn smart_ancestors_to_propose(
        &mut self,
        clock_round: Round,
        smart_select: bool,
    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
        let node_metrics = &self.context.metrics.node_metrics;
        let _s = node_metrics
            .scope_processing_time
            .with_label_values(&["Core::smart_ancestors_to_propose"])
            .start_timer();

        // Take the latest cached block per authority below clock_round (exclusive).
        let all_ancestors = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(clock_round);

        assert_eq!(
            all_ancestors.len(),
            self.context.committee.size(),
            "Fatal error, number of returned ancestors don't match committee size."
        );

        // Bring the ancestor state up to date before selecting.
        self.ancestor_state_manager.update_all_ancestors_state();
        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

        let quorum_round = clock_round.saturating_sub(1);

        let mut score_and_pending_excluded_ancestors = Vec::new();
        let mut excluded_and_equivocating_ancestors = BTreeSet::new();

        // Always include our own last proposed block first, then only ancestors of
        // higher rounds than what has already been included.
        let included_ancestors = iter::once(self.last_proposed_block().clone())
            .chain(
                all_ancestors
                    .into_iter()
                    .flat_map(|(ancestor, equivocating_ancestors)| {
                        if ancestor.author() == self.context.own_index {
                            return None;
                        }
                        if let Some(last_block_ref) =
                            self.last_included_ancestors[ancestor.author()]
                        {
                            if last_block_ref.round >= ancestor.round() {
                                return None;
                            }
                        }

                        // Keep track of equivocating ancestors so they are explicitly excluded.
                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);

                        let ancestor_state = ancestor_state_map[ancestor.author()];
                        match ancestor_state {
                            AncestorState::Include => {
                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
                            }
                            AncestorState::Exclude(score) => {
                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
                                score_and_pending_excluded_ancestors.push((score, ancestor));
                                return None;
                            }
                        }

                        Some(ancestor)
                    }),
            )
            .collect::<Vec<_>>();

        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();

        // Aggregate the stake of high-scoring parent-round ancestors.
        for ancestor in included_ancestors
            .iter()
            .filter(|a| a.round() == quorum_round)
        {
            parent_round_quorum.add(ancestor.author(), &self.context.committee);
        }

        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
            node_metrics.smart_selection_wait.inc();
            debug!(
                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
                parent_round_quorum.stake()
            );
            return (vec![], BTreeSet::new());
        }

        // Sort scores descending, so the best of the temporarily excluded
        // ancestors are considered first until quorum is reached.
        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));

        let mut ancestors_to_propose = included_ancestors;
        let mut excluded_ancestors = Vec::new();
        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
            if !parent_round_quorum.reached_threshold(&self.context.committee)
                && ancestor.round() == quorum_round
            {
                debug!(
                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
                );
                parent_round_quorum.add(ancestor.author(), &self.context.committee);
                ancestors_to_propose.push(ancestor);
                node_metrics
                    .included_excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname.as_str(), "timeout"])
                    .inc();
            } else {
                excluded_ancestors.push((score, ancestor));
            }
        }

        // For each remaining excluded ancestor, either mark it as excluded or
        // include an earlier block of the same authority that a quorum has seen.
        for (score, ancestor) in excluded_ancestors.iter() {
            let excluded_author = ancestor.author();
            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
            // The low round up to which a quorum reports having accepted blocks
            // of the excluded author.
            let mut accepted_low_quorum_round = self
                .ancestor_state_manager
                .accepted_quorum_round_per_authority[excluded_author]
                .0;
            // Cap at the quorum round; blocks above it cannot be included anyway.
            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

            let last_included_round = self.last_included_ancestors[excluded_author]
                .map(|block_ref| block_ref.round)
                .unwrap_or(GENESIS_ROUND);
            if ancestor.round() <= last_included_round {
                // Already included a block of this round or later; nothing to do.
                continue;
            }

            if last_included_round >= accepted_low_quorum_round {
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
                    ancestor.reference()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();
                continue;
            }

            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
                ancestor.clone()
            } else {
                // Exclude this ancestor, but try to include an earlier block of the
                // same authority that a quorum should already have accepted.
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
                    ancestor.reference(),
                    ancestor.round()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();

                // Search the cached blocks of the excluded author in the range
                // (last_included_round, accepted_low_quorum_round].
                match self.dag_state.read().get_last_cached_block_in_range(
                    excluded_author,
                    last_included_round + 1,
                    accepted_low_quorum_round + 1,
                ) {
                    Some(earlier_ancestor) => {
                        // Found an earlier block to include instead.
                        earlier_ancestor
                    }
                    None => {
                        // No earlier block to include from this authority.
                        continue;
                    }
                }
            };
            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
            ancestors_to_propose.push(ancestor.clone());
            trace!(
                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
                ancestor.reference()
            );
            node_metrics
                .included_excluded_proposal_ancestors_count_by_authority
                .with_label_values(&[block_hostname.as_str(), "quorum"])
                .inc();
        }

        assert!(
            parent_round_quorum.reached_threshold(&self.context.committee),
            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
        );

        info!(
            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
            ancestors_to_propose.len(),
            excluded_and_equivocating_ancestors.len()
        );

        (ancestors_to_propose, excluded_and_equivocating_ancestors)
    }

    fn leaders_exist(&self, round: Round) -> bool {
        let dag_state = self.dag_state.read();
        for leader in self.leaders(round) {
            if !dag_state.contains_cached_block_at_slot(leader) {
                return false;
            }
        }

        true
    }

    fn leaders(&self, round: Round) -> Vec<Slot> {
        self.committer
            .get_leaders(round)
            .into_iter()
            .map(|authority_index| Slot::new(round, authority_index))
            .collect()
    }

    fn first_leader(&self, round: Round) -> AuthorityIndex {
        self.leaders(round).first().unwrap().authority
    }

    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
        self.last_proposed_block().timestamp_ms()
    }

    fn last_proposed_round(&self) -> Round {
        self.last_proposed_block().round()
    }

    fn last_proposed_block(&self) -> VerifiedBlock {
        self.dag_state.read().get_last_proposed_block()
    }
}

/// Senders of signals from Core about its events, e.g. newly produced blocks
/// and new rounds.
pub(crate) struct CoreSignals {
    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
    new_round_sender: watch::Sender<Round>,
    context: Arc<Context>,
}

impl CoreSignals {
    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
        // The buffer of the block broadcast channel is sized to roughly match the
        // number of rounds cached in DAG state.
        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
            context.parameters.dag_state_cached_rounds as usize,
        );
        let (new_round_sender, new_round_receiver) = watch::channel(0);

        let me = Self {
            tx_block_broadcast,
            new_round_sender,
            context,
        };

        let receivers = CoreSignalsReceivers {
            rx_block_broadcast,
            new_round_receiver,
        };

        (me, receivers)
    }

    /// Broadcasts a newly produced block to subscribers. Genesis blocks are
    /// never broadcast, and a committee of size 1 has no peers to send to.
    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
        if self.context.committee.size() > 1 {
            if extended_block.block.round() == GENESIS_ROUND {
                debug!("Ignoring broadcasting genesis block to peers");
                return Ok(());
            }

            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
                warn!("Couldn't broadcast the block to any receiver: {err}");
                return Err(ConsensusError::Shutdown);
            }
        } else {
            debug!(
                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
            );
        }
        Ok(())
    }

    /// Signals that the threshold clock has advanced to a new round; receivers
    /// only observe the most recent value.
    pub(crate) fn new_round(&mut self, round_number: Round) {
        let _ = self.new_round_sender.send_replace(round_number);
    }
}

pub(crate) struct CoreSignalsReceivers {
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    new_round_receiver: watch::Receiver<Round>,
}

impl CoreSignalsReceivers {
    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
        self.rx_block_broadcast.resubscribe()
    }

    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
        self.new_round_receiver.clone()
    }
}

/// Creates one core fixture per authority for the given stakes, assigning each
/// an authority index from 0..authorities.len().
#[cfg(test)]
pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<CoreTextFixture> {
    let mut cores = Vec::new();

    for index in 0..authorities.len() {
        let own_index = AuthorityIndex::new_for_test(index as u32);
        let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false);
        cores.push(core);
    }
    cores
}

#[cfg(test)]
pub(crate) struct CoreTextFixture {
    pub core: Core,
    pub signal_receivers: CoreSignalsReceivers,
    pub block_receiver: broadcast::Receiver<ExtendedBlock>,
    #[expect(unused)]
    pub commit_receiver: UnboundedReceiver<CommittedSubDag>,
    pub store: Arc<MemStore>,
}

#[cfg(test)]
impl CoreTextFixture {
    fn new(
        context: Context,
        authorities: Vec<Stake>,
        own_index: AuthorityIndex,
        sync_last_known_own_block: bool,
    ) -> Self {
        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
        let mut context = context.clone();
        context = context
            .with_committee(committee)
            .with_authority_index(own_index);
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(
            LeaderSchedule::from_store(context.clone(), dag_state.clone())
                .with_num_commits_per_schedule(10),
        );
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Need at least one subscriber to the block broadcast channel.
        let block_receiver = signal_receivers.block_broadcast_receiver();

        let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(commit_sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let block_signer = signers.remove(own_index.value()).1;

        let core = Core::new(
            context,
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            sync_last_known_own_block,
        );

        Self {
            core,
            signal_receivers,
            block_receiver,
            commit_receiver,
            store,
        }
    }
}

#[cfg(test)]
mod test {
    use std::{collections::BTreeSet, time::Duration};

    use consensus_config::{AuthorityIndex, Parameters};
    use futures::{StreamExt, stream::FuturesUnordered};
    use iota_metrics::monitored_mpsc::unbounded_channel;
    use iota_protocol_config::ProtocolConfig;
    use rstest::rstest;
    use tokio::time::sleep;

    use super::*;
    use crate::{
        CommitConsumer, CommitIndex,
        block::{TestBlock, genesis_blocks},
        block_verifier::NoopBlockVerifier,
        commit::CommitAPI,
        leader_scoring::ReputationScores,
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
        test_dag_parser::parse_dag,
        transaction::{BlockStatus, TransactionClient},
    };

    /// Recovers Core from storage where the last round is complete and verifies
    /// that it continues proposing from the next round.
    #[tokio::test]
    async fn test_core_recover_from_store_for_full_round() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut block_status_subscriptions = FuturesUnordered::new();

        // Create test blocks for all authorities across 4 rounds.
        let mut last_round_blocks = genesis_blocks(context.clone());
        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();
            for (index, _authority) in context.committee.authorities() {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, index.value() as u32)
                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                        .build(),
                );

                // For round 1, subscribe to the status of our own block so its
                // sequencing can be verified at the end.
                if round == 1 && index == context.own_index {
                    let subscription =
                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
                    block_status_subscriptions.push(subscription);
                }

                this_round_blocks.push(block);
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }
        // Write all blocks to store.
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        // Create DAG state after all blocks have been written to store.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        // No commits should have been persisted to DagState or store yet.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        // Now spin up Core.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Need at least one subscriber to the block broadcast channel.
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let _core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        // The new round should be 5.
        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        // A block for round 5 should have been proposed.
        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 5);
        let ancestors = proposed_block.block.ancestors();

        // All ancestors should be the blocks of round 4.
        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            assert_eq!(ancestor.round, 4);
        }

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        // With rounds 1-4 complete, the leaders of rounds 1 and 2 have enough
        // support in the following rounds to be committed, so 2 commits are expected.
        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);

        // All subscribed own blocks should be sequenced, not garbage collected.
        while let Some(result) = block_status_subscriptions.next().await {
            let status = result.unwrap();
            assert!(matches!(status, BlockStatus::Sequenced(_)));
        }
    }

    /// Recovers Core from storage where the last round is partial (no quorum)
    /// and we have not proposed for that round yet.
    #[tokio::test]
    async fn test_core_recover_from_store_for_partial_round() {
        telemetry_subscribers::init_for_testing();

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // Create test blocks for rounds 1..=4, leaving round 4 without a quorum.
        let mut last_round_blocks = genesis_blocks(context.clone());
        let mut all_blocks = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();

            // For round 4, skip enough authorities that no quorum is formed.
            let authorities_to_skip = if round == 4 {
                context.committee.validity_threshold() as usize
            } else {
                // Otherwise skip only our own authority, so it has not proposed yet.
                1
            };

            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
                let block = TestBlock::new(round, index.value() as u32)
                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                    .build();
                this_round_blocks.push(VerifiedBlock::new_for_test(block));
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }

        // Write all blocks to store.
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        // Create DAG state after all blocks have been written to store.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        // No commits should have been persisted to DagState or store yet.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        // Now spin up Core.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Need at least one subscriber to the block broadcast channel.
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        // During recovery the core proposes its own block for round 4, which
        // completes a quorum there, so the new round is 5.
        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        // A block for round 4 should have been proposed during recovery.
        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 4);
        let ancestors = proposed_block.block.ancestors();

        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            if ancestor.author == context.own_index {
                assert_eq!(ancestor.round, 0);
            } else {
                assert_eq!(ancestor.round, 3);
            }
        }

        // Run the commit rule.
        core.try_commit(vec![]).ok();
        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        // The leaders of rounds 1 and 2 have enough support to be committed,
        // so 2 commits are expected.
        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);
    }

    #[tokio::test]
    async fn test_core_propose_after_genesis() {
        telemetry_subscribers::init_for_testing();
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config
        });

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Need at least one subscriber to the block broadcast channel.
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        // Submit transactions until roughly 1KB has been accumulated.
        let mut total = 0;
        let mut index = 0;
        loop {
            let transaction =
                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
            total += transaction.len();
            index += 1;
            let _w = transaction_client
                .submit_no_wait(vec![transaction])
                .await
                .unwrap();

            // Stop submitting once a sizeable amount has been sent.
            if total >= 1_000 {
                break;
            }
        }

        // A new block should have been created and broadcast.
        let extended_block = block_receiver
            .recv()
            .await
            .expect("A new block should have been created");

        // The new block should be for round 1 from our own authority.
        assert_eq!(extended_block.block.round(), 1);
        assert_eq!(extended_block.block.author().value(), 0);
        assert_eq!(extended_block.block.ancestors().len(), 4);

        let mut total = 0;
        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
            total += transaction.data().len() as u64;
            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
            assert_eq!(format!("Transaction {i}"), transaction);
        }
        assert!(
            total
                <= context
                    .protocol_config
                    .consensus_max_transactions_in_block_bytes()
        );

        // All ancestors should be genesis blocks.
        let all_genesis = genesis_blocks(context);

        for ancestor in extended_block.block.ancestors() {
            all_genesis
                .iter()
                .find(|block| block.reference() == *ancestor)
                .expect("Block should be found amongst genesis blocks");
        }

        // Trying to propose again should not produce a new block, since no new
        // quorum has formed.
        assert!(core.try_propose(false).unwrap().is_none());
        assert!(core.try_propose(true).unwrap().is_none());

        // No commits should have been persisted to DagState and store.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);
    }

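    // A minimal sketch, not part of the original suite: exercises the
    // CoreSignals round-trip in isolation. `new_round` publishes through the
    // watch channel and a receiver observes the latest value.
    #[tokio::test]
    async fn test_core_signals_new_round_watch() {
        let (context, _) = Context::new_for_test(4);
        let (mut signals, signal_receivers) = CoreSignals::new(Arc::new(context));
        let mut new_round = signal_receivers.new_round_receiver();
        // The watch channel is initialized with round 0.
        assert_eq!(*new_round.borrow_and_update(), 0);
        signals.new_round(7);
        assert_eq!(*new_round.borrow_and_update(), 7);
    }
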
    #[tokio::test]
    async fn test_core_propose_once_receiving_a_quorum() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Need at least one subscriber to the block broadcast channel.
        let _block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut expected_ancestors = BTreeSet::new();

        // Add a block from authority 1 for round 1; this alone does not form a
        // quorum for round 1, so no round 2 proposal can be made yet.
        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
        expected_ancestors.insert(block_1.reference());
        // Wait for the min round delay so the next proposal is not blocked by it.
        sleep(context.parameters.min_round_delay).await;
        _ = core.add_blocks(vec![block_1]);

        assert_eq!(core.last_proposed_round(), 1);
        expected_ancestors.insert(core.last_proposed_block().reference());
        // Attempting to propose now should produce nothing.
        assert!(core.try_propose(false).unwrap().is_none());

        // Adding a block from authority 2 completes a quorum for round 1, so a
        // block for round 2 should be proposed.
        let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
        expected_ancestors.insert(block_3.reference());
        sleep(context.parameters.min_round_delay).await;
        _ = core.add_blocks(vec![block_3]);

        assert_eq!(core.last_proposed_round(), 2);

        let proposed_block = core.last_proposed_block();
        assert_eq!(proposed_block.round(), 2);
        assert_eq!(proposed_block.author(), context.own_index);
        assert_eq!(proposed_block.ancestors().len(), 3);
        let ancestors = proposed_block.ancestors();
        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
        assert_eq!(ancestors, expected_ancestors);

        // No commits should have been persisted to DagState and store.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);
    }

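    // A minimal sketch, not part of the original suite: `should_propose` is
    // expected to flip to false once the block subscriber disappears, since
    // proposed blocks could not reach any peer. Assumes the fixture defaults
    // (synced last known proposed round, zero propagation delay).
    #[tokio::test]
    async fn test_should_propose_requires_subscriber() {
        telemetry_subscribers::init_for_testing();
        let (context, _) = Context::new_for_test(4);
        let mut fixture = create_cores(context, vec![1, 1, 1, 1]).remove(0);

        assert!(fixture.core.should_propose());
        fixture.core.set_subscriber_exists(false);
        assert!(!fixture.core.should_propose());
    }
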
    #[rstest]
    #[tokio::test]
    async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let (mut context, mut key_pairs) = Context::new_for_test(4);

        if gc_depth > 0 {
            context
                .protocol_config
                .set_consensus_gc_depth_for_testing(gc_depth);
        }

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut block_status_subscriptions = FuturesUnordered::new();

        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : { * },
            Round 2 : { * },
            Round 3 : {
                A -> [*],
                B -> [-A2],
                C -> [-A2],
                D -> [-A2],
            },
            Round 4 : {
                B -> [-A3],
                C -> [-A3],
                D -> [-A3],
            },
            Round 5 : {
                A -> [A3, B4, C4, D4]
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 6 : { * },
            Round 7 : { * },
            Round 8 : { * },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
        dag_builder.print();

        // Subscribe to the status of our own blocks for rounds 1..=5.
        for block in dag_builder.blocks(1..=5) {
            if block.author() == context.own_index {
                let subscription =
                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
                block_status_subscriptions.push(subscription);
            }
        }

        // Write all blocks to store.
        store
            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
            .expect("Storage error");

        // Create DAG state after all blocks have been written to store.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        // No commits should have been persisted to DagState or store yet.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        // Now spin up Core.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Need at least one subscriber to the block broadcast channel.
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let _core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        assert_eq!(last_commit.index(), 5);

        while let Some(result) = block_status_subscriptions.next().await {
            let status = result.unwrap();

            // With GC enabled, our blocks of round 2 and 3 are garbage collected
            // (they are skipped by the other authorities), while rounds 1 and 5 are sequenced.
            if gc_depth > 0 {
                match status {
                    BlockStatus::Sequenced(block_ref) => {
                        assert!(block_ref.round == 1 || block_ref.round == 5);
                    }
                    BlockStatus::GarbageCollected(block_ref) => {
                        assert!(block_ref.round == 2 || block_ref.round == 3);
                    }
                }
            } else {
                // Without GC, all of our blocks get sequenced.
                assert!(matches!(status, BlockStatus::Sequenced(_)));
            }
        }
    }

2142 #[tokio::test]
2146 async fn test_multiple_commits_advance_threshold_clock() {
2147 telemetry_subscribers::init_for_testing();
2148 let (mut context, mut key_pairs) = Context::new_for_test(4);
2149 const GC_DEPTH: u32 = 2;
2150
2151 context
2152 .protocol_config
2153 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2154
2155 let context = Arc::new(context);
2156
2157 let store = Arc::new(MemStore::new());
2158 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2159 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2160
2161 let dag_str = "DAG {
2167 Round 0 : { 4 },
2168 Round 1 : { * },
2169 Round 2 : {
2170 B -> [-D1],
2171 C -> [-D1],
2172 D -> [-D1],
2173 },
2174 Round 3 : {
2175 B -> [*],
2176 C -> [*]
2177 D -> [*],
2178 },
2179 Round 4 : {
2180 A -> [*],
2181 B -> [*],
2182 C -> [*]
2183 D -> [*],
2184 },
2185 Round 5 : {
2186 B -> [*],
2187 C -> [*],
2188 D -> [*],
2189 },
2190 Round 6 : {
2191 B -> [A6, B6, C6, D1],
2192 C -> [A6, B6, C6, D1],
2193 D -> [A6, B6, C6, D1],
2194 },
2195 Round 7 : {
2196 B -> [*],
2197 C -> [*],
2198 D -> [*],
2199 },
2200 Round 8 : {
2201 B -> [*],
2202 C -> [*],
2203 D -> [*],
2204 },
2205 Round 9 : {
2206 B -> [*],
2207 C -> [*],
2208 D -> [*],
2209 },
2210 Round 10 : {
2211 B -> [*],
2212 C -> [*],
2213 D -> [*],
2214 },
2215 Round 11 : {
2216 B -> [*],
2217 C -> [*],
2218 D -> [*],
2219 },
2220 }";
2221
2222 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2223 dag_builder.print();
2224
2225 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2227 let block_manager = BlockManager::new(
2228 context.clone(),
2229 dag_state.clone(),
2230 Arc::new(NoopBlockVerifier),
2231 );
2232 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2233 context.clone(),
2234 dag_state.clone(),
2235 ));
2236 let (sender, _receiver) = unbounded_channel("consensus_output");
2237 let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2238 let commit_observer = CommitObserver::new(
2239 context.clone(),
2240 commit_consumer,
2241 dag_state.clone(),
2242 store.clone(),
2243 leader_schedule.clone(),
2244 );
2245
2246 let last_commit = store.read_last_commit().unwrap();
2248 assert!(last_commit.is_none());
2249 assert_eq!(dag_state.read().last_commit_index(), 0);
2250
2251 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2253 let _block_receiver = signal_receivers.block_broadcast_receiver();
2255 let mut core = Core::new(
2256 context.clone(),
2257 leader_schedule,
2258 transaction_consumer,
2259 block_manager,
2260 true,
2261 commit_observer,
2262 signals,
2263 key_pairs.remove(context.own_index.value()).1,
2264 dag_state.clone(),
2265 true,
2266 );
2267 core.set_last_known_proposed_round(4);
2271
2272 core.add_blocks(
2284 dag_builder
2285 .blocks(1..=11)
2286 .into_iter()
2287 .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2288 .collect(),
2289 )
2290 .expect("Should not fail");
2291
2292 assert_eq!(core.last_proposed_round(), 12);
2293 }

    #[tokio::test]
    async fn test_core_set_min_propose_round() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            true,
        );

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        // Proposing is blocked until the last known proposed round is set.
        assert!(core.try_propose(true).unwrap().is_none());

        let mut builder = DagBuilder::new(context.clone());
        builder.layers(1..=10).build();

        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();

        assert!(core.add_blocks(blocks).unwrap().is_empty());

        assert!(core.try_propose(true).unwrap().is_none());

        // Once the last known proposed round is learned, proposing resumes
        // above that round.
        core.set_last_known_proposed_round(10);

        let block = core.try_propose(true).expect("No error").unwrap();
        assert_eq!(block.round(), 11);
        assert_eq!(block.ancestors().len(), 4);

        let our_ancestor_included = block.ancestors()[0];
        assert_eq!(our_ancestor_included.author, context.own_index);
        assert_eq!(our_ancestor_included.round, 10);
    }
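
    // Editor's note: a consolidation sketch, not part of the original suite.
    // Several tests in this module repeat this exact `Core` plumbing; only
    // constructors already used above are called here. Assumptions: the
    // receiver half of `CoreSignals::new` is named `CoreSignalsReceivers`, and
    // the transaction client plus output receiver are deliberately leaked with
    // `mem::forget` so their channels stay open, mirroring the `_receiver` and
    // `_transaction_client` bindings the tests keep alive.
    #[allow(dead_code)]
    fn build_core_for_test(
        context: Arc<Context>,
        store: Arc<MemStore>,
        signer: ProtocolKeyPair,
        subscriber_exists: bool,
        sync_last_known_own_block: bool,
    ) -> (Core, CoreSignalsReceivers) {
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        // Keep the channel endpoints alive for the lifetime of the test.
        mem::forget(transaction_client);
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let (sender, receiver) = unbounded_channel("consensus_output");
        mem::forget(receiver);
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, 0),
            dag_state.clone(),
            store,
            leader_schedule.clone(),
        );
        let core = Core::new(
            context,
            leader_schedule,
            transaction_consumer,
            block_manager,
            subscriber_exists,
            commit_observer,
            signals,
            signer,
            dag_state,
            sync_last_known_own_block,
        );
        (core, signal_receivers)
    }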

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_core_try_new_block_leader_timeout() {
        telemetry_subscribers::init_for_testing();

        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
            // Wait until the local clock has caught up with the highest block
            // timestamp, so the blocks can be accepted.
            let now = context.clock.timestamp_utc_ms();
            let max_timestamp = blocks
                .iter()
                .map(|block| block.timestamp_ms())
                .max()
                .unwrap_or(0);

            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
            sleep(wait_time).await;
        }

        let (context, _) = Context::new_for_test(4);
        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);

        // Leave the last core out, so its leader blocks will be missing and
        // the leader timeout path gets exercised.
        let (_last_core, cores) = all_cores.split_last_mut().unwrap();

        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
        for round in 1..=3 {
            let mut this_round_blocks = Vec::new();

            for core_fixture in cores.iter_mut() {
                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;

                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();

                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
                    assert_eq!(round - 1, r);
                    if core_fixture.core.last_proposed_round() == r {
                        // Force proposing a block for the current round.
                        core_fixture
                            .core
                            .try_propose(true)
                            .unwrap()
                            .unwrap_or_else(|| {
                                panic!("Block should have been proposed for round {}", round)
                            });
                    }
                }

                assert_eq!(core_fixture.core.last_proposed_round(), round);

                this_round_blocks.push(core_fixture.core.last_proposed_block());
            }

            last_round_blocks = this_round_blocks;
        }

        for core_fixture in cores.iter_mut() {
            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;

            core_fixture
                .core
                .add_blocks(last_round_blocks.clone())
                .unwrap();
            // Without the leader block and without forcing, no proposal is made.
            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
        }

        for core_fixture in cores.iter_mut() {
            // Force a new block for round 4, as the leader timeout would.
            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
            assert_eq!(core_fixture.core.last_proposed_round(), 4);

            // A single commit should have been produced.
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            assert_eq!(last_commit.index(), 1);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), 1);
        }
    }
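
    // Editor's note: the per-round inner loop above recurs in several tests in
    // this module. This is a consolidation sketch, not part of the original
    // suite; it assumes `create_cores` yields the `CoreTextFixture` type used
    // elsewhere in this module, with the `core` field accessed above.
    #[allow(dead_code)]
    async fn propose_one_round(
        fixtures: &mut [CoreTextFixture],
        round: Round,
        last_round_blocks: &[VerifiedBlock],
    ) -> Vec<VerifiedBlock> {
        let mut this_round_blocks = Vec::new();
        for fixture in fixtures.iter_mut() {
            fixture.core.add_blocks(last_round_blocks.to_vec()).unwrap();
            // Force a proposal if adding the blocks alone did not advance us.
            if fixture.core.last_proposed_round() < round {
                fixture
                    .core
                    .try_propose(true)
                    .unwrap()
                    .unwrap_or_else(|| panic!("Block should have been proposed for round {round}"));
            }
            assert_eq!(fixture.core.last_proposed_round(), round);
            this_round_blocks.push(fixture.core.last_proposed_block().clone());
        }
        this_round_blocks
    }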

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
        telemetry_subscribers::init_for_testing();

        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
            // Wait until the local clock has caught up with the highest block
            // timestamp, so the blocks can be accepted.
            let now = context.clock.timestamp_utc_ms();
            let max_timestamp = blocks
                .iter()
                .map(|block| block.timestamp_ms())
                .max()
                .unwrap_or(0);

            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
            sleep(wait_time).await;
        }

        let (context, _) = Context::new_for_test(4);

        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
        let (_last_core, cores) = all_cores.split_last_mut().unwrap();

        // Run rounds 1..=30 with the last authority excluded, so that its
        // reputation score drops.
        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
        for round in 1..=30 {
            let mut this_round_blocks = Vec::new();

            for core_fixture in cores.iter_mut() {
                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;

                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();

                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
                    assert_eq!(round - 1, r);
                    if core_fixture.core.last_proposed_round() == r {
                        // Force proposing a block for the current round.
                        core_fixture
                            .core
                            .try_propose(true)
                            .unwrap()
                            .unwrap_or_else(|| {
                                panic!("Block should have been proposed for round {}", round)
                            });
                    }
                }

                assert_eq!(core_fixture.core.last_proposed_round(), round);

                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
            }

            last_round_blocks = this_round_blocks;
        }

        // Now run rounds 31..=40 with all cores, including the low-scoring one.
        for round in 31..=40 {
            let mut this_round_blocks = Vec::new();

            for core_fixture in all_cores.iter_mut() {
                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;

                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();

                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
                    assert_eq!(round - 1, r);
                    if core_fixture.core.last_proposed_round() == r {
                        // Force proposing a block for the current round.
                        core_fixture
                            .core
                            .try_propose(true)
                            .unwrap()
                            .unwrap_or_else(|| {
                                panic!("Block should have been proposed for round {}", round)
                            });
                    }
                }

                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());

                for block in this_round_blocks.iter() {
                    if block.author() != AuthorityIndex::new_for_test(3) {
                        // The low-scoring authority 3 is not included as an
                        // ancestor by the others.
                        assert_eq!(block.ancestors().len(), 3);
                    } else {
                        // Authority 3 itself still includes all authorities.
                        assert_eq!(block.ancestors().len(), 4);
                    }
                }
            }

            last_round_blocks = this_round_blocks;
        }
    }

    #[tokio::test]
    async fn test_smart_ancestor_selection() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(7);
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(
            LeaderSchedule::from_store(context.clone(), dag_state.clone())
                .with_num_commits_per_schedule(10),
        );

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            true,
        );

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        assert!(core.try_propose(true).unwrap().is_none());

        // Build a DAG of 12 rounds where authority 1 produces no blocks.
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=12)
            .authorities(vec![AuthorityIndex::new_for_test(1)])
            .skip_block()
            .build();
        let blocks = builder.blocks(1..=12);
        assert!(core.add_blocks(blocks).unwrap().is_empty());
        core.set_last_known_proposed_round(12);

        let block = core.try_propose(true).expect("No error").unwrap();
        assert_eq!(block.round(), 13);
        assert_eq!(block.ancestors().len(), 7);

        builder
            .layers(13..=14)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        let blocks = builder.blocks(13..=14);
        assert!(core.add_blocks(blocks).unwrap().is_empty());

        // Only 6 of the 7 ancestors are included: authority 1's block is left
        // out by smart ancestor selection.
        let block = core.try_propose(true).expect("No error").unwrap();
        assert_eq!(block.round(), 15);
        assert_eq!(block.ancestors().len(), 6);

        builder
            .layer(15)
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(5),
                AuthorityIndex::new_for_test(6),
            ])
            .skip_block()
            .build();
        let blocks = builder.blocks(15..=15);
        let authority_1_excluded_block_reference = blocks
            .iter()
            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
            .unwrap()
            .reference();
        // Wait for the min round delay to allow blocks to be proposed.
        sleep(context.parameters.min_round_delay).await;
        // Only four of the seven round 15 blocks exist, which is below the
        // quorum of five, so no new proposal is made yet.
        assert!(core.add_blocks(blocks).unwrap().is_empty());
        assert_eq!(core.last_proposed_block().round(), 15);

        builder
            .layer(15)
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1),
                AuthorityIndex::new_for_test(2),
                AuthorityIndex::new_for_test(3),
                AuthorityIndex::new_for_test(4),
            ])
            .skip_block()
            .build();
        let blocks = builder.blocks(15..=15);
        let included_block_references = iter::once(&core.last_proposed_block())
            .chain(blocks.iter())
            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
            .map(|block| block.reference())
            .collect::<Vec<_>>();

        // A quorum of round 15 blocks now exists, so a round 16 block is
        // proposed automatically.
        assert!(core.add_blocks(blocks).unwrap().is_empty());
        assert_eq!(core.last_proposed_block().round(), 16);

        // Drain the block broadcast channel until our round 16 block appears.
        let extended_block = loop {
            let extended_block =
                tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
                    .await
                    .unwrap()
                    .unwrap();
            if extended_block.block.round() == 16 {
                break extended_block;
            }
        };
        assert_eq!(extended_block.block.round(), 16);
        assert_eq!(extended_block.block.author(), core.context.own_index);
        assert_eq!(extended_block.block.ancestors().len(), 6);
        assert_eq!(extended_block.block.ancestors(), included_block_references);
        assert_eq!(extended_block.excluded_ancestors.len(), 1);
        assert_eq!(
            extended_block.excluded_ancestors[0],
            authority_1_excluded_block_reference
        );

        builder
            .layer(16)
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(5),
                AuthorityIndex::new_for_test(6),
            ])
            .skip_block()
            .build();
        let blocks = builder.blocks(16..=16);
        sleep(context.parameters.min_round_delay).await;
        assert!(core.add_blocks(blocks).unwrap().is_empty());
        assert_eq!(core.last_proposed_block().round(), 16);

        let block = core.try_propose(true).expect("No error").unwrap();
        assert_eq!(block.round(), 17);
        assert_eq!(block.ancestors().len(), 5);

        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extended_block.block.round(), 17);
        assert_eq!(extended_block.block.author(), core.context.own_index);
        assert_eq!(extended_block.block.ancestors().len(), 5);
        assert_eq!(extended_block.excluded_ancestors.len(), 0);

        // Report up-to-date quorum rounds for every authority with no
        // propagation delay, so the previously excluded authority 1 becomes
        // includable again.
        core.set_propagation_delay_and_quorum_rounds(
            0,
            vec![
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
            ],
            vec![
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
                (16, 16),
            ],
        );

        builder
            .layer(17)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        let blocks = builder.blocks(17..=17);
        let included_block_references = iter::once(&core.last_proposed_block())
            .chain(blocks.iter())
            .map(|block| block.reference())
            .collect::<Vec<_>>();

        sleep(context.parameters.min_round_delay).await;
        assert!(core.add_blocks(blocks).unwrap().is_empty());
        assert_eq!(core.last_proposed_block().round(), 18);

        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extended_block.block.round(), 18);
        assert_eq!(extended_block.block.author(), core.context.own_index);
        assert_eq!(extended_block.block.ancestors().len(), 7);
        assert_eq!(extended_block.block.ancestors(), included_block_references);
        assert_eq!(extended_block.excluded_ancestors.len(), 0);
    }
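
    // Editor's note: a small assertion helper sketch for the `ExtendedBlock`
    // checks repeated above (round, author, ancestor and exclusion counts).
    // Not part of the original suite; it touches only fields already read in
    // these tests.
    #[allow(dead_code)]
    fn assert_extended_block(
        extended_block: &ExtendedBlock,
        round: Round,
        author: AuthorityIndex,
        num_ancestors: usize,
        num_excluded: usize,
    ) {
        assert_eq!(extended_block.block.round(), round);
        assert_eq!(extended_block.block.author(), author);
        assert_eq!(extended_block.block.ancestors().len(), num_ancestors);
        assert_eq!(extended_block.excluded_ancestors.len(), num_excluded);
    }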

    #[tokio::test]
    async fn test_excluded_ancestor_limit() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(
            LeaderSchedule::from_store(context.clone(), dag_state.clone())
                .with_num_commits_per_schedule(10),
        );

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            true,
        );

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        let mut builder = DagBuilder::new(context.clone());
        builder.layers(1..=3).build();

        // At round 4, authority 1 equivocates: it produces multiple blocks in
        // the same slot.
        builder
            .layer(4)
            .authorities(vec![AuthorityIndex::new_for_test(1)])
            .equivocate(9)
            .build();
        let blocks = builder.blocks(1..=4);

        assert!(core.add_blocks(blocks).unwrap().is_empty());
        core.set_last_known_proposed_round(3);

        let block = core.try_propose(true).expect("No error").unwrap();
        assert_eq!(block.round(), 5);
        assert_eq!(block.ancestors().len(), 4);

        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extended_block.block.round(), 5);
        assert_eq!(extended_block.block.author(), core.context.own_index);
        assert_eq!(extended_block.block.ancestors().len(), 4);
        // Only one block per slot can be an ancestor; the remaining eight
        // equivocating round 4 blocks of authority 1 are reported as excluded.
        assert_eq!(extended_block.excluded_ancestors.len(), 8);
    }

    #[tokio::test]
    async fn test_core_set_subscriber_exists() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            // Set subscriber_exists to false initially, to block proposals.
            false,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        // There is no proposal while no subscriber exists.
        assert!(core.try_propose(true).unwrap().is_none());

        core.set_subscriber_exists(true);

        // Now that a subscriber exists, proposing should succeed.
        assert!(core.try_propose(true).unwrap().is_some());
    }

    #[tokio::test]
    async fn test_core_set_propagation_delay_per_authority() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            // Set subscriber_exists to false initially, to block proposals.
            false,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        // A high propagation delay blocks proposals even once a subscriber
        // exists.
        core.set_propagation_delay_and_quorum_rounds(1000, vec![], vec![]);

        core.set_subscriber_exists(true);

        assert!(core.try_propose(true).unwrap().is_none());

        // Clearing the propagation delay unblocks proposals.
        core.set_propagation_delay_and_quorum_rounds(0, vec![], vec![]);

        assert!(core.try_propose(true).unwrap().is_some());
    }
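
    // Editor's note: an illustration sketch, not part of the original suite.
    // The vectors passed to `set_propagation_delay_and_quorum_rounds` above
    // hold one quorum-round pair per authority; this builds the uniform case
    // used in `test_smart_ancestor_selection`, assuming `QuorumRound` is the
    // `(Round, Round)` low/high pair those literals suggest.
    #[allow(dead_code)]
    fn uniform_quorum_rounds(committee_size: usize, round: Round) -> Vec<QuorumRound> {
        vec![(round, round); committee_size]
    }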

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_leader_schedule_change() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(4);
        let mut cores = create_cores(context, vec![1, 1, 1, 1]);

        // Propose and exchange blocks for 30 rounds across all cores.
        let mut last_round_blocks = Vec::new();
        for round in 1..=30 {
            let mut this_round_blocks = Vec::new();

            // Wait for the min round delay to allow blocks to be proposed.
            sleep(default_params.min_round_delay).await;

            for core_fixture in &mut cores {
                // Add the blocks of the previous round; this should trigger a
                // new block proposal.
                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();

                let new_round = receive(
                    Duration::from_secs(1),
                    core_fixture.signal_receivers.new_round_receiver(),
                )
                .await;
                assert_eq!(new_round, round);

                let extended_block = tokio::time::timeout(
                    Duration::from_secs(1),
                    core_fixture.block_receiver.recv(),
                )
                .await
                .unwrap()
                .unwrap();
                assert_eq!(extended_block.block.round(), round);
                assert_eq!(
                    extended_block.block.author(),
                    core_fixture.core.context.own_index
                );

                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());

                let block = core_fixture.core.last_proposed_block();

                // All ancestors should come from the previous round.
                assert_eq!(
                    block.ancestors().len(),
                    core_fixture.core.context.committee.size()
                );
                for ancestor in block.ancestors() {
                    if block.round() > 1 {
                        assert!(
                            last_round_blocks
                                .iter()
                                .any(|block| block.reference() == *ancestor),
                            "Reference from previous round should be added"
                        );
                    }
                }
            }

            last_round_blocks = this_round_blocks;
        }

        for core_fixture in cores {
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            // With 30 rounds processed, leaders up to round 27 can be committed.
            assert_eq!(last_commit.index(), 27);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), 27);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .bad_nodes
                    .len(),
                1
            );
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .good_nodes
                    .len(),
                1
            );
            let expected_reputation_scores =
                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores,
                expected_reputation_scores
            );
        }
    }
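
    // Editor's note: a consolidation sketch for the swap-table assertions that
    // recur in the schedule-change tests above and below. Not part of the
    // original suite; it reads only the fields those assertions already use.
    #[allow(dead_code)]
    fn assert_leader_swap_table(core: &Core, expected_scores: &ReputationScores) {
        let swap_table = core.leader_schedule.leader_swap_table.read();
        assert_eq!(swap_table.bad_nodes.len(), 1);
        assert_eq!(swap_table.good_nodes.len(), 1);
        assert_eq!(&swap_table.reputation_scores, expected_scores);
    }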

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_leader_schedule_change_with_vote_scoring() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();
        let (mut context, _) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
        let mut last_round_blocks = Vec::new();
        for round in 1..=30 {
            let mut this_round_blocks = Vec::new();
            sleep(default_params.min_round_delay).await;
            for core_fixture in &mut cores {
                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();
                let new_round = receive(
                    Duration::from_secs(1),
                    core_fixture.signal_receivers.new_round_receiver(),
                )
                .await;
                assert_eq!(new_round, round);
                let extended_block = tokio::time::timeout(
                    Duration::from_secs(1),
                    core_fixture.block_receiver.recv(),
                )
                .await
                .unwrap()
                .unwrap();
                assert_eq!(extended_block.block.round(), round);
                assert_eq!(
                    extended_block.block.author(),
                    core_fixture.core.context.own_index
                );

                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
                let block = core_fixture.core.last_proposed_block();
                assert_eq!(
                    block.ancestors().len(),
                    core_fixture.core.context.committee.size()
                );
                for ancestor in block.ancestors() {
                    if block.round() > 1 {
                        assert!(
                            last_round_blocks
                                .iter()
                                .any(|block| block.reference() == *ancestor),
                            "Reference from previous round should be added"
                        );
                    }
                }
            }
            last_round_blocks = this_round_blocks;
        }
        for core_fixture in cores {
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            assert_eq!(last_commit.index(), 27);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), 27);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .bad_nodes
                    .len(),
                1
            );
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .good_nodes
                    .len(),
                1
            );
            let expected_reputation_scores =
                ReputationScores::new((11..=20).into(), vec![9, 8, 8, 8]);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores,
                expected_reputation_scores
            );
        }
    }

    #[tokio::test]
    async fn test_validate_certified_commits() {
        telemetry_subscribers::init_for_testing();

        let (context, _key_pairs) = Context::new_for_test(4);
        let context = context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        });

        let authority_index = AuthorityIndex::new_for_test(0);
        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
        let mut core = core.core;

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        // Create a DAG of 12 rounds.
        let mut dag_builder = DagBuilder::new(core.context.clone());
        dag_builder.layers(1..=12).build();

        // Accept only the blocks of the first 6 rounds.
        dag_builder.print();
        let blocks = dag_builder.blocks(1..=6);

        for block in blocks {
            core.dag_state.write().accept_block(block);
        }

        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);

        // Try to commit the accepted blocks directly.
        let committed_sub_dags = core.try_commit(vec![]).unwrap();

        assert_eq!(committed_sub_dags.len(), 4);

        println!("Case 1. Provide certified commits that are all before the last committed round.");

        let certified_commits = sub_dags_and_commits
            .iter()
            .take(4)
            .map(|(_, c)| c)
            .cloned()
            .collect::<Vec<_>>();
        assert!(
            certified_commits.last().unwrap().index()
                <= committed_sub_dags.last().unwrap().commit_ref.index,
            "Highest certified commit should not be newer than the highest committed index."
        );

        let certified_commits = core.validate_certified_commits(certified_commits).unwrap();

        // All provided commits are already committed, so none are returned.
        assert!(certified_commits.is_empty());

        println!("Case 2. Provide certified commits that extend beyond the last committed round.");

        let certified_commits = sub_dags_and_commits
            .iter()
            .take(5)
            .map(|(_, c)| c.clone())
            .collect::<Vec<_>>();

        let certified_commits = core
            .validate_certified_commits(certified_commits.clone())
            .unwrap();

        // Only the commit with index 5 survives validation.
        assert_eq!(certified_commits.len(), 1);
        assert_eq!(certified_commits.first().unwrap().reference().index, 5);

        println!(
            "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
        );

        let certified_commits = sub_dags_and_commits
            .iter()
            .skip(5)
            .take(1)
            .map(|(_, c)| c.clone())
            .collect::<Vec<_>>();

        let err = core
            .validate_certified_commits(certified_commits.clone())
            .unwrap_err();
        match err {
            ConsensusError::UnexpectedCertifiedCommitIndex {
                expected_commit_index: 5,
                commit_index: 6,
            } => (),
            _ => panic!("Unexpected error: {:?}", err),
        }
    }

    #[tokio::test]
    async fn test_add_certified_commits() {
        telemetry_subscribers::init_for_testing();

        let (context, _key_pairs) = Context::new_for_test(4);
        let context = context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        });

        let authority_index = AuthorityIndex::new_for_test(0);
        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
        let store = core.store.clone();
        let mut core = core.core;

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        // Create a DAG of 12 rounds.
        let mut dag_builder = DagBuilder::new(core.context.clone());
        dag_builder.layers(1..=12).build();

        // Accept only the blocks of the first 6 rounds.
        dag_builder.print();
        let blocks = dag_builder.blocks(1..=6);

        for block in blocks {
            core.dag_state.write().accept_block(block);
        }

        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);

        // Try to commit the accepted blocks directly.
        let committed_sub_dags = core.try_commit(vec![]).unwrap();

        assert_eq!(committed_sub_dags.len(), 4);

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("Last commit should be set");
        assert_eq!(last_commit.reference().index, 4);

        println!("Case 1. Provide no certified commits. No commit should happen.");

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("Last commit should be set");
        assert_eq!(last_commit.reference().index, 4);

        println!(
            "Case 2. Provide certified commits from before and after the last committed round; additional blocks are also present, so the direct decide rule can run as well."
        );

        // Provide the certified commits with indices 4..=8.
        let certified_commits = sub_dags_and_commits
            .iter()
            .skip(3)
            .take(5)
            .map(|(_, c)| c.clone())
            .collect::<Vec<_>>();

        // Accept the blocks of rounds 8..=12 so the direct decide rule can
        // commit past the certified commits.
        let blocks = dag_builder.blocks(8..=12);
        for block in blocks {
            core.dag_state.write().accept_block(block);
        }

        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
            .expect("Should not fail");

        let commits = store.scan_commits((6..=10).into()).unwrap();

        // Commits with indices 6..=10 should now be stored.
        assert_eq!(commits.len(), 5);

        for i in 6..=10 {
            let commit = &commits[i - 6];
            assert_eq!(commit.reference().index, i as u32);
        }
    }
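
    // Editor's note: a sketch generalizing the contiguous-index check at the
    // end of the test above. Not part of the original suite; it assumes the
    // commits expose `index()` via the `CommitAPI` trait, as the
    // `last_commit.index()` calls in this module suggest.
    #[allow(dead_code)]
    fn assert_contiguous_commit_indices<C: CommitAPI>(commits: &[C], start_index: CommitIndex) {
        for (offset, commit) in commits.iter().enumerate() {
            assert_eq!(commit.index(), start_index + offset as CommitIndex);
        }
    }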

    #[tokio::test]
    async fn try_commit_with_certified_commits_gced_blocks() {
        const GC_DEPTH: u32 = 3;
        telemetry_subscribers::init_for_testing();

        let (mut context, mut key_pairs) = Context::new_for_test(5);
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(
            LeaderSchedule::from_store(context.clone(), dag_state.clone())
                .with_num_commits_per_schedule(10),
        );

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            true,
        );

        assert_eq!(
            core.last_proposed_round(),
            GENESIS_ROUND,
            "No block should have been created other than genesis"
        );

        // Authority E produces only its round 1 block, which is first skipped
        // by everyone and only referenced again at round 5.
        let dag_str = "DAG {
            Round 0 : { 5 },
            Round 1 : { * },
            Round 2 : {
                A -> [-E1],
                B -> [-E1],
                C -> [-E1],
                D -> [-E1],
            },
            Round 3 : {
                A -> [*],
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 4 : {
                A -> [*],
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 5 : {
                A -> [*],
                B -> [*],
                C -> [*],
                D -> [*],
                E -> [A4, B4, C4, D4, E1],
            },
            Round 6 : { * },
            Round 7 : { * },
        }";

        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
        dag_builder.print();

        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
            .get_sub_dag_and_certified_commits(1..=5)
            .into_iter()
            .unzip();

        // With GC_DEPTH = 3, block E1 falls below the GC round by the time the
        // certified commits are processed, so it must never be committed.
        let committed_sub_dags = core.try_commit(certified_commits).unwrap();

        assert_eq!(committed_sub_dags.len(), 4);
        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);

            // Block E1 (authority E is index 4 in the committee of 5) should
            // never appear in a committed sub-dag, as it is garbage collected.
            for block in committed_sub_dag.blocks.iter() {
                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(4) {
                    panic!("Did not expect to commit block E1");
                }
            }
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(6);

        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);

        // Propose and exchange blocks for 33 rounds.
        let mut last_round_blocks = Vec::new();
        for round in 1..=33 {
            let mut this_round_blocks = Vec::new();
            // Wait for the min round delay to allow blocks to be proposed.
            sleep(default_params.min_round_delay).await;
            for core_fixture in &mut cores {
                // Add the blocks of the previous round; this should trigger a
                // new block proposal.
                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();
                let new_round = receive(
                    Duration::from_secs(1),
                    core_fixture.signal_receivers.new_round_receiver(),
                )
                .await;
                assert_eq!(new_round, round);
                let extended_block = tokio::time::timeout(
                    Duration::from_secs(1),
                    core_fixture.block_receiver.recv(),
                )
                .await
                .unwrap()
                .unwrap();
                assert_eq!(extended_block.block.round(), round);
                assert_eq!(
                    extended_block.block.author(),
                    core_fixture.core.context.own_index
                );

                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
                let block = core_fixture.core.last_proposed_block();
                // All ancestors should come from the previous round.
                assert_eq!(
                    block.ancestors().len(),
                    core_fixture.core.context.committee.size()
                );
                for ancestor in block.ancestors() {
                    if block.round() > 1 {
                        assert!(
                            last_round_blocks
                                .iter()
                                .any(|block| block.reference() == *ancestor),
                            "Reference from previous round should be added"
                        );
                    }
                }
            }
            last_round_blocks = this_round_blocks;
        }
        for core_fixture in cores {
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            // With 33 rounds processed, leaders up to round 30 can be committed.
            let expected_commit_count = 30;
            assert_eq!(last_commit.index(), expected_commit_count);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .bad_nodes
                    .len(),
                1
            );
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .good_nodes
                    .len(),
                1
            );
            let expected_reputation_scores =
                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores,
                expected_reputation_scores
            );
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_commit_on_leader_schedule_change_boundary_without_multileader_with_vote_scoring()
    {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (mut context, _) = Context::new_for_test(6);
        context
            .protocol_config
            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);

        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
        // Propose and exchange blocks for 63 rounds.
        let mut last_round_blocks = Vec::new();
        for round in 1..=63 {
            let mut this_round_blocks = Vec::new();

            sleep(default_params.min_round_delay).await;

            for core_fixture in &mut cores {
                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();

                let new_round = receive(
                    Duration::from_secs(1),
                    core_fixture.signal_receivers.new_round_receiver(),
                )
                .await;
                assert_eq!(new_round, round);

                let extended_block = tokio::time::timeout(
                    Duration::from_secs(1),
                    core_fixture.block_receiver.recv(),
                )
                .await
                .unwrap()
                .unwrap();
                assert_eq!(extended_block.block.round(), round);
                assert_eq!(
                    extended_block.block.author(),
                    core_fixture.core.context.own_index
                );

                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());

                let block = core_fixture.core.last_proposed_block();

                assert_eq!(
                    block.ancestors().len(),
                    core_fixture.core.context.committee.size()
                );
                for ancestor in block.ancestors() {
                    if block.round() > 1 {
                        assert!(
                            last_round_blocks
                                .iter()
                                .any(|block| block.reference() == *ancestor),
                            "Reference from previous round should be added"
                        );
                    }
                }
            }

            last_round_blocks = this_round_blocks;
        }

        for core_fixture in cores {
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            // With 63 rounds processed, leaders up to round 60 can be committed.
            let expected_commit_count = 60;
            assert_eq!(last_commit.index(), expected_commit_count);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .bad_nodes
                    .len(),
                1
            );
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .good_nodes
                    .len(),
                1
            );
            let expected_reputation_scores =
                ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores,
                expected_reputation_scores
            );
        }
    }

    #[tokio::test]
    async fn test_core_signals() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(4);
        let mut cores = create_cores(context, vec![1, 1, 1, 1]);

        // Propose and exchange blocks for 10 rounds.
        let mut last_round_blocks = Vec::new();
        for round in 1..=10 {
            let mut this_round_blocks = Vec::new();

            // Wait for the min round delay to allow blocks to be proposed.
            sleep(default_params.min_round_delay).await;

            for core_fixture in &mut cores {
                // On a new round, the round signal should fire and a new block
                // should be proposed and broadcast.
                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();

                let new_round = receive(
                    Duration::from_secs(1),
                    core_fixture.signal_receivers.new_round_receiver(),
                )
                .await;
                assert_eq!(new_round, round);

                let extended_block = tokio::time::timeout(
                    Duration::from_secs(1),
                    core_fixture.block_receiver.recv(),
                )
                .await
                .unwrap()
                .unwrap();
                assert_eq!(extended_block.block.round(), round);
                assert_eq!(
                    extended_block.block.author(),
                    core_fixture.core.context.own_index
                );

                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());

                let block = core_fixture.core.last_proposed_block();

                assert_eq!(
                    block.ancestors().len(),
                    core_fixture.core.context.committee.size()
                );
                for ancestor in block.ancestors() {
                    if block.round() > 1 {
                        assert!(
                            last_round_blocks
                                .iter()
                                .any(|block| block.reference() == *ancestor),
                            "Reference from previous round should be added"
                        );
                    }
                }
            }

            last_round_blocks = this_round_blocks;
        }

        for core_fixture in cores {
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            // With 10 rounds processed, leaders up to round 7 can be committed.
            assert_eq!(last_commit.index(), 7);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), 7);
        }
    }

    #[tokio::test]
    async fn test_core_compress_proposal_references() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(4);
        let mut cores = create_cores(context, vec![1, 1, 1, 1]);

        let mut last_round_blocks = Vec::new();
        let mut all_blocks = Vec::new();

        let excluded_authority = AuthorityIndex::new_for_test(3);

        for round in 1..=10 {
            let mut this_round_blocks = Vec::new();

            for core_fixture in &mut cores {
                // Do not propose any block for the excluded authority.
                if core_fixture.core.context.own_index == excluded_authority {
                    continue;
                }

                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();
                core_fixture.core.new_block(round, true).unwrap();

                let block = core_fixture.core.last_proposed_block();
                assert_eq!(block.round(), round);

                this_round_blocks.push(block.clone());
            }

            last_round_blocks = this_round_blocks.clone();
            all_blocks.extend(this_round_blocks);
        }

        // Feed all the blocks at once to the excluded authority, which has not
        // proposed anything since round 1.
        let core_fixture = &mut cores[excluded_authority];
        // Wait for the min round delay to allow blocks to be proposed.
        sleep(default_params.min_round_delay).await;
        core_fixture.core.add_blocks(all_blocks).unwrap();

        // A new block should have been proposed that references its own round 1
        // block and compresses everyone else's references to the latest round.
        let block = core_fixture.core.last_proposed_block();
        assert_eq!(block.round(), 11);
        assert_eq!(block.ancestors().len(), 4);
        for block_ref in block.ancestors() {
            if block_ref.author == excluded_authority {
                assert_eq!(block_ref.round, 1);
            } else {
                assert_eq!(block_ref.round, 10);
            }
        }

        let last_commit = core_fixture
            .store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");
        assert_eq!(last_commit.index(), 6);
        let all_stored_commits = core_fixture
            .store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), 6);
    }

    #[tokio::test]
    async fn try_decide_certified() {
        telemetry_subscribers::init_for_testing();

        let (context, _) = Context::new_for_test(4);

        let authority_index = AuthorityIndex::new_for_test(0);
        let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
        let mut core = core.core;

        // Create a DAG of 12 rounds.
        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
        dag_builder.layers(1..=12).build();

        let limit = 2;

        let blocks = dag_builder.blocks(1..=12);

        for block in blocks {
            core.dag_state.write().accept_block(block);
        }

        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
        let mut certified_commits = sub_dags_and_commits
            .into_iter()
            .map(|(_, commit)| commit)
            .collect::<Vec<_>>();

        let leaders = core.try_decide_certified(&mut certified_commits, limit);

        // With four certified commits and limit = 2, two leaders are decided
        // and drained, leaving two certified commits pending.
        assert_eq!(leaders.len(), 2);
        assert_eq!(certified_commits.len(), 2);
    }

    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
        tokio::time::timeout(timeout, receiver.changed())
            .await
            .expect("Timeout while waiting to read from receiver")
            .expect("Signal receive channel shouldn't be closed");
        *receiver.borrow_and_update()
    }
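
    // Editor's note: a non-panicking variant sketch of `receive` above,
    // returning `None` on timeout or channel closure instead of panicking. It
    // is not used by the original suite.
    #[allow(dead_code)]
    pub(crate) async fn try_receive<T: Copy>(
        timeout: Duration,
        mut receiver: watch::Receiver<T>,
    ) -> Option<T> {
        tokio::time::timeout(timeout, receiver.changed())
            .await
            .ok()?
            .ok()?;
        Some(*receiver.borrow_and_update())
    }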
4152}