use std::{
    collections::{BTreeMap, BTreeSet},
    iter, mem,
    sync::Arc,
    time::Duration,
    vec,
};

use consensus_config::{AuthorityIndex, ProtocolKeyPair};
#[cfg(test)]
use consensus_config::{Stake, local_committee_and_keys};
use iota_macros::fail_point;
#[cfg(test)]
use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
use iota_metrics::monitored_scope;
use itertools::Itertools as _;
use parking_lot::RwLock;
use tokio::{
    sync::{broadcast, watch},
    time::Instant,
};
use tracing::{debug, info, instrument, trace, warn};

#[cfg(test)]
use crate::{
    CommitConsumer, TransactionClient, block_verifier::NoopBlockVerifier,
    storage::mem_store::MemStore,
};
use crate::{
    ancestor::{AncestorState, AncestorStateManager},
    block::{
        Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, GENESIS_ROUND, Round,
        SignedBlock, Slot, VerifiedBlock,
    },
    block_manager::BlockManager,
    commit::{
        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
    },
    commit_observer::CommitObserver,
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    round_prober::QuorumRound,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    transaction::TransactionConsumer,
    universal_committer::{
        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
    },
};

const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;

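/// Core of the consensus authority: accepts verified blocks into the DAG, runs the
/// commit rule via the universal committer, and proposes new blocks for this authority.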
pub(crate) struct Core {
    context: Arc<Context>,
    transaction_consumer: TransactionConsumer,
    block_manager: BlockManager,
    quorum_subscribers_exists: bool,
    propagation_delay: Round,

    committer: UniversalCommitter,
    last_signaled_round: Round,
    last_included_ancestors: Vec<Option<BlockRef>>,
    last_decided_leader: Slot,
    leader_schedule: Arc<LeaderSchedule>,
    commit_observer: CommitObserver,
    signals: CoreSignals,
    block_signer: ProtocolKeyPair,
    dag_state: Arc<RwLock<DagState>>,
    last_known_proposed_round: Option<Round>,
    ancestor_state_manager: AncestorStateManager,
}

impl Core {
    pub(crate) fn new(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        transaction_consumer: TransactionConsumer,
        block_manager: BlockManager,
        subscriber_exists: bool,
        commit_observer: CommitObserver,
        signals: CoreSignals,
        block_signer: ProtocolKeyPair,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Self {
        let last_decided_leader = dag_state.read().last_commit_leader();
        let committer = UniversalCommitterBuilder::new(
            context.clone(),
            leader_schedule.clone(),
            dag_state.clone(),
        )
        .with_number_of_leaders(1)
        .with_pipeline(true)
        .build();

        let last_proposed_block = dag_state.read().get_last_proposed_block();

        let last_signaled_round = last_proposed_block.round();

        let mut last_included_ancestors = vec![None; context.committee.size()];
        for ancestor in last_proposed_block.ancestors() {
            last_included_ancestors[ancestor.author] = Some(*ancestor);
        }

        let min_propose_round = if sync_last_known_own_block {
            None
        } else {
            Some(0)
        };

        let propagation_scores = leader_schedule
            .leader_swap_table
            .read()
            .reputation_scores
            .clone();
        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
        ancestor_state_manager.set_propagation_scores(propagation_scores);

        Self {
            context,
            last_signaled_round,
            last_included_ancestors,
            last_decided_leader,
            leader_schedule,
            transaction_consumer,
            block_manager,
            quorum_subscribers_exists: subscriber_exists,
            propagation_delay: 0,
            committer,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            last_known_proposed_round: min_propose_round,
            ancestor_state_manager,
        }
        .recover()
    }

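    /// Recovers Core state from the DAG store on startup: replays the commit rule,
    /// re-proposes (or re-broadcasts) the last proposed block and signals the current
    /// threshold clock round.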
    fn recover(mut self) -> Self {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::recover"])
            .start_timer();
        let ancestor_blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let max_ancestor_timestamp = ancestor_blocks
            .iter()
            .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms()));
        let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms());

        if self
            .context
            .protocol_config
            .consensus_median_timestamp_with_checkpoint_enforcement()
        {
            info!(
                "Median based timestamp is enabled. Will not wait for {} ms while recovering ancestors from storage",
                wait_ms
            );
        } else if wait_ms > 0 {
            warn!(
                "Waiting for {} ms while recovering ancestors from storage",
                wait_ms
            );
            std::thread::sleep(Duration::from_millis(wait_ms));
        }

        self.try_commit(vec![]).unwrap();
        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
        {
            last_proposed_block
        } else {
            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
            if self.should_propose() {
                assert!(
                    last_proposed_block.round() > GENESIS_ROUND,
                    "At minimum a block of round higher than genesis should have been produced during recovery"
                );
            }

            self.signals
                .new_block(ExtendedBlock {
                    block: last_proposed_block.clone(),
                    excluded_ancestors: vec![],
                })
                .unwrap();
            last_proposed_block
        };

        self.try_signal_new_round();

        info!(
            "Core recovery completed with last proposed block {:?}",
            last_proposed_block
        );

        self
    }

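    /// Accepts the provided blocks into the block manager, then attempts to commit and
    /// propose. Returns the references of ancestors that are still missing.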
    #[tracing::instrument("consensus_add_blocks", skip_all)]
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_blocks");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::add_blocks"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_add_blocks_batch_size
            .observe(blocks.len() as f64);

        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);

        if !accepted_blocks.is_empty() {
            debug!(
                "Accepted blocks: {}",
                accepted_blocks
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            self.try_commit(vec![])?;

            self.try_propose(false)?;

            self.try_signal_new_round();
        }

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

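    /// Checks which of the provided block refs are still unknown to the block manager
    /// and returns them so they can be fetched from peers.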
    pub(crate) fn check_block_refs(
        &mut self,
        block_refs: Vec<BlockRef>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::check_block_refs");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::check_block_refs"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_check_block_refs_batch_size
            .observe(block_refs.len() as f64);

        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

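    /// Processes commits certified and synced from peers. With GC enabled the certified
    /// commits are applied directly; otherwise their blocks are added like regular blocks.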
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_certified_commits(
        &mut self,
        certified_commits: CertifiedCommits,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_certified_commits");

        if self.dag_state.read().gc_enabled() {
            let votes = certified_commits.votes().to_vec();
            let commits = self
                .validate_certified_commits(certified_commits.commits().to_vec())
                .expect("Certified commits validation failed");

            self.block_manager.try_accept_blocks(votes);

            self.try_commit(commits)?;

            self.try_propose(false)?;

            self.try_signal_new_round();

            return Ok(BTreeSet::new());
        }

        let blocks = certified_commits
            .commits()
            .iter()
            .flat_map(|commit| commit.blocks())
            .cloned()
            .collect::<Vec<_>>();

        self.add_blocks(blocks)
    }

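    /// Notifies subscribers when the threshold clock has advanced past the last signaled round.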
    fn try_signal_new_round(&mut self) {
        let new_clock_round = self.dag_state.read().threshold_clock_round();
        if new_clock_round <= self.last_signaled_round {
            return;
        }
        tracing::trace!(round = ?new_clock_round, "new_consensus_round_sent");
        self.signals.new_round(new_clock_round);
        self.last_signaled_round = new_clock_round;

        self.context
            .metrics
            .node_metrics
            .threshold_clock_round
            .set(new_clock_round as i64);
    }

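    /// Attempts to propose a block for `round`, typically invoked on leader timeout.
    /// Does nothing if a block for that round (or higher) has already been proposed.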
    pub(crate) fn new_block(
        &mut self,
        round: Round,
        force: bool,
    ) -> ConsensusResult<Option<VerifiedBlock>> {
        let _scope = monitored_scope("Core::new_block");
        if self.last_proposed_round() < round {
            self.context
                .metrics
                .node_metrics
                .leader_timeout_total
                .with_label_values(&[&format!("{force}")])
                .inc();
            let result = self.try_propose(force);
            self.try_signal_new_round();
            return result;
        }
        Ok(None)
    }

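    /// Filters out certified commits that have already been committed locally and verifies
    /// that the remaining ones start right after the last committed index.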
    fn validate_certified_commits(
        &mut self,
        commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CertifiedCommit>> {
        let last_commit_index = self.dag_state.read().last_commit_index();
        let commits = commits
            .iter()
            .filter(|commit| {
                if commit.index() > last_commit_index {
                    true
                } else {
                    tracing::debug!(
                        "Skip commit for index {} as it is already committed with last commit index {}",
                        commit.index(),
                        last_commit_index
                    );
                    false
                }
            })
            .cloned()
            .collect::<Vec<_>>();

        if let Some(commit) = commits.first() {
            if commit.index() != last_commit_index + 1 {
                return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
                    expected_commit_index: last_commit_index + 1,
                    commit_index: commit.index(),
                });
            }
        }

        Ok(commits)
    }

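    /// Attempts to create and broadcast a new block if this authority should propose,
    /// then runs the commit rule on the updated DAG.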
    #[instrument(level = "trace", skip_all)]
    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
        if !self.should_propose() {
            return Ok(None);
        }
        if let Some(extended_block) = self.try_new_block(force) {
            self.signals.new_block(extended_block.clone())?;

            fail_point!("consensus-after-propose");

            self.try_commit(vec![])?;
            return Ok(Some(extended_block.block));
        }
        Ok(None)
    }

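    /// Builds a new block for the current threshold clock round: selects ancestors, pulls
    /// pending transactions and commit votes, signs the block and accepts it locally.
    /// Returns None when the conditions to propose are not met (unless `force` is set).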
    #[instrument(level = "trace", skip_all)]
    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_new_block"])
            .start_timer();

        let clock_round = {
            let dag_state = self.dag_state.read();
            let clock_round = dag_state.threshold_clock_round();
            if clock_round <= dag_state.get_last_proposed_block().round() {
                return None;
            }
            clock_round
        };

        let quorum_round = clock_round.saturating_sub(1);

        if !force {
            if !self.leaders_exist(quorum_round) {
                return None;
            }

            if Duration::from_millis(
                self.context
                    .clock
                    .timestamp_utc_ms()
                    .saturating_sub(self.last_proposed_timestamp_ms()),
            ) < self.context.parameters.min_round_delay
            {
                return None;
            }
        }

        let (ancestors, excluded_ancestors) = if self
            .context
            .protocol_config
            .consensus_distributed_vote_scoring_strategy()
            && self
                .context
                .protocol_config
                .consensus_smart_ancestor_selection()
        {
            let (ancestors, excluded_and_equivocating_ancestors) =
                self.smart_ancestors_to_propose(clock_round, !force);

            if ancestors.is_empty() {
                assert!(
                    !force,
                    "Ancestors should have been returned if force is true!"
                );
                return None;
            }

            let excluded_ancestors_limit = self.context.committee.size() * 2;
            if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
                debug!(
                    "Dropping {} excluded ancestor(s) during proposal due to size limit",
                    excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
                );
            }
            let excluded_ancestors = excluded_and_equivocating_ancestors
                .into_iter()
                .take(excluded_ancestors_limit)
                .collect();

            (ancestors, excluded_ancestors)
        } else {
            (self.ancestors_to_propose(clock_round), vec![])
        };

        for ancestor in &ancestors {
            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
        }

        let leader_authority = &self
            .context
            .committee
            .authority(self.first_leader(quorum_round))
            .hostname;
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_ms
            .with_label_values(&[leader_authority])
            .inc_by(
                Instant::now()
                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
                    .as_millis() as u64,
            );
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_count
            .with_label_values(&[leader_authority])
            .inc();

        self.context
            .metrics
            .node_metrics
            .proposed_block_ancestors
            .observe(ancestors.len() as f64);
        for ancestor in &ancestors {
            let authority = &self.context.committee.authority(ancestor.author()).hostname;
            self.context
                .metrics
                .node_metrics
                .proposed_block_ancestors_depth
                .with_label_values(&[authority])
                .observe(clock_round.saturating_sub(ancestor.round()).into());
        }

        let now = self.context.clock.timestamp_utc_ms();
        ancestors.iter().for_each(|block| {
            if self.context.protocol_config.consensus_median_timestamp_with_checkpoint_enforcement() {
                if block.timestamp_ms() > now {
                    trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
                    let authority = &self.context.committee.authority(block.author()).hostname;
                    self.context
                        .metrics
                        .node_metrics
                        .proposed_block_ancestors_timestamp_drift_ms
                        .with_label_values(&[authority])
                        .inc_by(block.timestamp_ms().saturating_sub(now));
                }
            } else {
                assert!(
                    block.timestamp_ms() <= now,
                    "Violation: ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.",
                    block, block.timestamp_ms(), clock_round
                );
            }
        });

        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
        self.context
            .metrics
            .node_metrics
            .proposed_block_transactions
            .observe(transactions.len() as f64);

        let commit_votes = self
            .dag_state
            .write()
            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);

        let block = Block::V1(BlockV1::new(
            self.context.committee.epoch(),
            clock_round,
            self.context.own_index,
            now,
            ancestors.iter().map(|b| b.reference()).collect(),
            transactions,
            commit_votes,
            vec![],
        ));
        let signed_block =
            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
        let serialized = signed_block
            .serialize()
            .expect("Block serialization failed.");
        self.context
            .metrics
            .node_metrics
            .proposed_block_size
            .observe(serialized.len() as f64);
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

        let last_proposed_block = self.last_proposed_block();
        if last_proposed_block.round() > 0 {
            self.context
                .metrics
                .node_metrics
                .block_proposal_interval
                .observe(
                    Duration::from_millis(
                        verified_block
                            .timestamp_ms()
                            .saturating_sub(last_proposed_block.timestamp_ms()),
                    )
                    .as_secs_f64(),
                );
        }

        let (accepted_blocks, missing) = self
            .block_manager
            .try_accept_blocks(vec![verified_block.clone()]);
        assert_eq!(accepted_blocks.len(), 1);
        assert!(missing.is_empty());

        self.dag_state.write().flush();

        ack_transactions(verified_block.reference());

        info!("Created block {verified_block:?} for round {clock_round}");

        self.context
            .metrics
            .node_metrics
            .proposed_blocks
            .with_label_values(&[&force.to_string()])
            .inc();

        Some(ExtendedBlock {
            block: verified_block,
            excluded_ancestors,
        })
    }

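    /// Runs the commit rule (or applies the provided certified commits), updates the leader
    /// schedule when needed, and returns the newly committed sub-dags.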
    #[instrument(level = "trace", skip_all)]
    fn try_commit(
        &mut self,
        mut certified_commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_commit"])
            .start_timer();

        let mut certified_commits_map = BTreeMap::new();
        for c in &certified_commits {
            certified_commits_map.insert(c.index(), c.reference());
        }

        if !certified_commits.is_empty() {
            info!(
                "Will try to commit synced commits first : {:?}",
                certified_commits
                    .iter()
                    .map(|c| (c.index(), c.leader()))
                    .collect::<Vec<_>>()
            );
        }

        let mut committed_sub_dags = Vec::new();
        loop {
            let mut commits_until_update = self
                .leader_schedule
                .commits_until_leader_schedule_update(self.dag_state.clone());

            if commits_until_update == 0 {
                let last_commit_index = self.dag_state.read().last_commit_index();

                tracing::info!(
                    "Leader schedule change triggered at commit index {last_commit_index}"
                );
                if self
                    .context
                    .protocol_config
                    .consensus_distributed_vote_scoring_strategy()
                {
                    self.leader_schedule
                        .update_leader_schedule_v2(&self.dag_state);

                    let propagation_scores = self
                        .leader_schedule
                        .leader_swap_table
                        .read()
                        .reputation_scores
                        .clone();
                    self.ancestor_state_manager
                        .set_propagation_scores(propagation_scores);
                } else {
                    self.leader_schedule
                        .update_leader_schedule_v1(&self.dag_state);
                }
                commits_until_update = self
                    .leader_schedule
                    .commits_until_leader_schedule_update(self.dag_state.clone());

                fail_point!("consensus-after-leader-schedule-change");
            }
            assert!(commits_until_update > 0);

            let (mut decided_leaders, decided_certified_commits): (
                Vec<DecidedLeader>,
                Vec<CertifiedCommit>,
            ) = self
                .try_decide_certified(&mut certified_commits, commits_until_update)
                .into_iter()
                .unzip();

            let blocks = decided_certified_commits
                .iter()
                .flat_map(|c| c.blocks())
                .cloned()
                .collect::<Vec<_>>();
            self.block_manager.try_accept_committed_blocks(blocks);

            if decided_leaders.is_empty() {
                decided_leaders = self.committer.try_decide(self.last_decided_leader);

                if decided_leaders.len() >= commits_until_update {
                    let _ = decided_leaders.split_off(commits_until_update);
                }
            }

            let Some(last_decided) = decided_leaders.last().cloned() else {
                break;
            };

            self.last_decided_leader = last_decided.slot();

            let sequenced_leaders = decided_leaders
                .into_iter()
                .filter_map(|leader| leader.into_committed_block())
                .collect::<Vec<_>>();

            tracing::debug!(
                "Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
                sequenced_leaders.len()
            );

            self.context
                .metrics
                .node_metrics
                .last_decided_leader_round
                .set(self.last_decided_leader.round as i64);

            if sequenced_leaders.is_empty() {
                break;
            }

            tracing::info!(
                "Committing {} leaders: {}",
                sequenced_leaders.len(),
                sequenced_leaders
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
            if self
                .context
                .protocol_config
                .consensus_distributed_vote_scoring_strategy()
            {
                self.dag_state.write().add_scoring_subdags(subdags.clone());
            } else {
                self.dag_state
                    .write()
                    .add_unscored_committed_subdags(subdags.clone());
            }

            self.block_manager
                .try_unsuspend_blocks_for_latest_gc_round();
            committed_sub_dags.extend(subdags);

            fail_point!("consensus-after-handle-commit");
        }

        for sub_dag in &committed_sub_dags {
            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
                assert_eq!(
                    commit_ref, sub_dag.commit_ref,
                    "Certified commit has different reference than the committed sub dag"
                );
            }
        }

        let committed_block_refs = committed_sub_dags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter())
            .filter_map(|block| {
                (block.author() == self.context.own_index).then_some(block.reference())
            })
            .collect::<Vec<_>>();
        self.transaction_consumer
            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());

        Ok(committed_sub_dags)
    }

    pub(crate) fn get_missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
        let _scope = monitored_scope("Core::get_missing_blocks");
        self.block_manager.missing_blocks()
    }

    pub(crate) fn set_quorum_subscribers_exists(&mut self, exists: bool) {
        info!("A quorum of block subscribers exists: {exists}");
        self.quorum_subscribers_exists = exists;
    }

    pub(crate) fn set_propagation_delay_and_quorum_rounds(
        &mut self,
        delay: Round,
        received_quorum_rounds: Vec<QuorumRound>,
        accepted_quorum_rounds: Vec<QuorumRound>,
    ) {
        info!(
            "Received quorum round per authority in ancestor state manager set to: {}",
            self.context
                .committee
                .authorities()
                .zip(received_quorum_rounds.iter())
                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
                .join(", ")
        );
        info!(
            "Accepted quorum round per authority in ancestor state manager set to: {}",
            self.context
                .committee
                .authorities()
                .zip(accepted_quorum_rounds.iter())
                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
                .join(", ")
        );
        self.ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        info!("Propagation round delay set to: {delay}");
        self.propagation_delay = delay;
    }

    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
        if self.last_known_proposed_round.is_some() {
            panic!(
                "Should not attempt to set the last known proposed round if that has been already set"
            );
        }
        self.last_known_proposed_round = Some(round);
        info!("Last known proposed round set to {round}");
    }

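    /// Whether this authority should propose: requires a quorum of subscribers, a bounded
    /// propagation delay and a synced last known proposed round.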
    pub(crate) fn should_propose(&self) -> bool {
        let clock_round = self.dag_state.read().threshold_clock_round();
        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;

        if !self.quorum_subscribers_exists {
            debug!("Skip proposing for round {clock_round}, don't have a quorum of subscribers.");
            core_skipped_proposals
                .with_label_values(&["no_quorum_subscriber"])
                .inc();
            return false;
        }

        if self.propagation_delay
            > self
                .context
                .parameters
                .propagation_delay_stop_proposal_threshold
        {
            debug!(
                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
                self.propagation_delay,
                self.context
                    .parameters
                    .propagation_delay_stop_proposal_threshold
            );
            core_skipped_proposals
                .with_label_values(&["high_propagation_delay"])
                .inc();
            return false;
        }

        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
            debug!(
                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
            );
            core_skipped_proposals
                .with_label_values(&["no_last_known_proposed_round"])
                .inc();
            return false;
        };
        if clock_round <= last_known_proposed_round {
            debug!(
                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
            );
            core_skipped_proposals
                .with_label_values(&["higher_last_known_proposed_round"])
                .inc();
            return false;
        }

        true
    }

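    /// Converts up to `limit` certified commits into decided leaders. Only used when GC is
    /// enabled; otherwise certified commits are handled through the regular block path.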
    #[tracing::instrument(skip_all)]
    fn try_decide_certified(
        &mut self,
        certified_commits: &mut Vec<CertifiedCommit>,
        limit: usize,
    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
        if !self.dag_state.read().gc_enabled() {
            return Vec::new();
        }

        assert!(limit > 0, "limit should be greater than 0");

        let to_commit = if certified_commits.len() >= limit {
            certified_commits.drain(..limit).collect::<Vec<_>>()
        } else {
            mem::take(certified_commits)
        };

        tracing::debug!(
            "Decided {} certified leaders: {}",
            to_commit.len(),
            to_commit.iter().map(|c| c.leader().to_string()).join(",")
        );

        let sequenced_leaders = to_commit
            .into_iter()
            .map(|commit| {
                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
                let leader = DecidedLeader::Commit(leader.clone());
                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
                (leader, commit)
            })
            .collect::<Vec<_>>();

        sequenced_leaders
    }

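    /// Selects the ancestors to include in the next proposal: the latest cached block of
    /// each authority that advances on what was previously included.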
    fn ancestors_to_propose(&mut self, clock_round: Round) -> Vec<VerifiedBlock> {
        let (ancestors, gc_enabled, gc_round) = {
            let dag_state = self.dag_state.read();
            (
                dag_state.get_last_cached_block_per_authority(clock_round),
                dag_state.gc_enabled(),
                dag_state.gc_round(),
            )
        };

        assert_eq!(
            ancestors.len(),
            self.context.committee.size(),
            "Fatal error, number of returned ancestors don't match committee size."
        );

        let (last_proposed_block, _) = ancestors[self.context.own_index].clone();
        assert_eq!(last_proposed_block.author(), self.context.own_index);
        let ancestors = iter::once(last_proposed_block)
            .chain(
                ancestors
                    .into_iter()
                    .filter(|(block, _)| block.author() != self.context.own_index)
                    .filter(|(block, _)| {
                        if gc_enabled && gc_round > GENESIS_ROUND {
                            return block.round() > gc_round;
                        }
                        true
                    })
                    .flat_map(|(block, _)| {
                        if let Some(last_block_ref) = self.last_included_ancestors[block.author()] {
                            return (last_block_ref.round < block.round()).then_some(block);
                        }
                        Some(block)
                    }),
            )
            .collect::<Vec<_>>();

        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        for ancestor in ancestors
            .iter()
            .filter(|block| block.round() == clock_round - 1)
        {
            quorum.add(ancestor.author(), &self.context.committee);
        }
        assert!(
            quorum.reached_threshold(&self.context.committee),
            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
        );

        ancestors
    }

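    /// Ancestor selection driven by propagation scores: well propagating ancestors are
    /// included directly while low scoring ones are deferred or replaced by earlier blocks.
    /// Returns the chosen ancestors together with the excluded/equivocating references.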
    fn smart_ancestors_to_propose(
        &mut self,
        clock_round: Round,
        smart_select: bool,
    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
        let node_metrics = &self.context.metrics.node_metrics;
        let _s = node_metrics
            .scope_processing_time
            .with_label_values(&["Core::smart_ancestors_to_propose"])
            .start_timer();

        let all_ancestors = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(clock_round);

        assert_eq!(
            all_ancestors.len(),
            self.context.committee.size(),
            "Fatal error, number of returned ancestors don't match committee size."
        );

        self.ancestor_state_manager.update_all_ancestors_state();
        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

        let quorum_round = clock_round.saturating_sub(1);

        let mut score_and_pending_excluded_ancestors = Vec::new();
        let mut excluded_and_equivocating_ancestors = BTreeSet::new();

        let included_ancestors = iter::once(self.last_proposed_block().clone())
            .chain(
                all_ancestors
                    .into_iter()
                    .flat_map(|(ancestor, equivocating_ancestors)| {
                        if ancestor.author() == self.context.own_index {
                            return None;
                        }
                        if let Some(last_block_ref) =
                            self.last_included_ancestors[ancestor.author()]
                        {
                            if last_block_ref.round >= ancestor.round() {
                                return None;
                            }
                        }

                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);

                        let ancestor_state = ancestor_state_map[ancestor.author()];
                        match ancestor_state {
                            AncestorState::Include => {
                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
                            }
                            AncestorState::Exclude(score) => {
                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
                                score_and_pending_excluded_ancestors.push((score, ancestor));
                                return None;
                            }
                        }

                        Some(ancestor)
                    }),
            )
            .collect::<Vec<_>>();

        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();

        for ancestor in included_ancestors
            .iter()
            .filter(|a| a.round() == quorum_round)
        {
            parent_round_quorum.add(ancestor.author(), &self.context.committee);
        }

        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
            node_metrics.smart_selection_wait.inc();
            debug!(
                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
                parent_round_quorum.stake()
            );
            return (vec![], BTreeSet::new());
        }

        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));

        let mut ancestors_to_propose = included_ancestors;
        let mut excluded_ancestors = Vec::new();
        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
            if !parent_round_quorum.reached_threshold(&self.context.committee)
                && ancestor.round() == quorum_round
            {
                debug!(
                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
                );
                parent_round_quorum.add(ancestor.author(), &self.context.committee);
                ancestors_to_propose.push(ancestor);
                node_metrics
                    .included_excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname.as_str(), "timeout"])
                    .inc();
            } else {
                excluded_ancestors.push((score, ancestor));
            }
        }

        for (score, ancestor) in excluded_ancestors.iter() {
            let excluded_author = ancestor.author();
            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
            let mut accepted_low_quorum_round = self
                .ancestor_state_manager
                .accepted_quorum_round_per_authority[excluded_author]
                .0;
            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

            let last_included_round = self.last_included_ancestors[excluded_author]
                .map(|block_ref| block_ref.round)
                .unwrap_or(GENESIS_ROUND);
            if ancestor.round() <= last_included_round {
                continue;
            }

            if last_included_round >= accepted_low_quorum_round {
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
                    ancestor.reference()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();
                continue;
            }

            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
                ancestor.clone()
            } else {
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
                    ancestor.reference(),
                    ancestor.round()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();

                match self.dag_state.read().get_last_cached_block_in_range(
                    excluded_author,
                    last_included_round + 1,
                    accepted_low_quorum_round + 1,
                ) {
                    Some(earlier_ancestor) => earlier_ancestor,
                    None => {
                        continue;
                    }
                }
            };
            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
            ancestors_to_propose.push(ancestor.clone());
            trace!(
                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
                ancestor.reference()
            );
            node_metrics
                .included_excluded_proposal_ancestors_count_by_authority
                .with_label_values(&[block_hostname.as_str(), "quorum"])
                .inc();
        }

        assert!(
            parent_round_quorum.reached_threshold(&self.context.committee),
            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
        );

        info!(
            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
            ancestors_to_propose.len(),
            excluded_and_equivocating_ancestors.len()
        );

        (ancestors_to_propose, excluded_and_equivocating_ancestors)
    }

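    /// Returns true when blocks for all leader slots of `round` are present in the DAG.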
    fn leaders_exist(&self, round: Round) -> bool {
        let dag_state = self.dag_state.read();
        for leader in self.leaders(round) {
            if !dag_state.contains_cached_block_at_slot(leader) {
                return false;
            }
        }

        true
    }

    fn leaders(&self, round: Round) -> Vec<Slot> {
        self.committer
            .get_leaders(round)
            .into_iter()
            .map(|authority_index| Slot::new(round, authority_index))
            .collect()
    }

    fn first_leader(&self, round: Round) -> AuthorityIndex {
        self.leaders(round).first().unwrap().authority
    }

    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
        self.last_proposed_block().timestamp_ms()
    }

    fn last_proposed_round(&self) -> Round {
        self.last_proposed_block().round()
    }

    fn last_proposed_block(&self) -> VerifiedBlock {
        self.dag_state.read().get_last_proposed_block()
    }
}

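/// Signals emitted by Core towards the rest of the node: newly proposed blocks and
/// threshold clock round updates.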
pub(crate) struct CoreSignals {
    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
    new_round_sender: watch::Sender<Round>,
    context: Arc<Context>,
}

impl CoreSignals {
    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
            context.parameters.dag_state_cached_rounds as usize,
        );
        let (new_round_sender, new_round_receiver) = watch::channel(0);

        let me = Self {
            tx_block_broadcast,
            new_round_sender,
            context,
        };

        let receivers = CoreSignalsReceivers {
            rx_block_broadcast,
            new_round_receiver,
        };

        (me, receivers)
    }

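    /// Broadcasts a newly proposed block to subscribers; genesis blocks and single-authority
    /// committees are not broadcast.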
    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
        if self.context.committee.size() > 1 {
            if extended_block.block.round() == GENESIS_ROUND {
                debug!("Ignoring broadcasting genesis block to peers");
                return Ok(());
            }

            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
                warn!("Couldn't broadcast the block to any receiver: {err}");
                return Err(ConsensusError::Shutdown);
            }
        } else {
            debug!(
                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
            );
        }
        Ok(())
    }

    pub(crate) fn new_round(&mut self, round_number: Round) {
        let _ = self.new_round_sender.send_replace(round_number);
    }
}

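/// Receiver half of the Core signals, handed to the components that listen for new blocks
/// and round changes.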
pub(crate) struct CoreSignalsReceivers {
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    new_round_receiver: watch::Receiver<Round>,
}

impl CoreSignalsReceivers {
    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
        self.rx_block_broadcast.resubscribe()
    }

    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
        self.new_round_receiver.clone()
    }
}

#[cfg(test)]
pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<CoreTextFixture> {
    let mut cores = Vec::new();

    for index in 0..authorities.len() {
        let own_index = AuthorityIndex::new_for_test(index as u32);
        let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false);
        cores.push(core);
    }
    cores
}

#[cfg(test)]
pub(crate) struct CoreTextFixture {
    pub core: Core,
    pub signal_receivers: CoreSignalsReceivers,
    pub block_receiver: broadcast::Receiver<ExtendedBlock>,
    #[expect(unused)]
    pub commit_receiver: UnboundedReceiver<CommittedSubDag>,
    pub store: Arc<MemStore>,
}

#[cfg(test)]
impl CoreTextFixture {
    fn new(
        context: Context,
        authorities: Vec<Stake>,
        own_index: AuthorityIndex,
        sync_last_known_own_block: bool,
    ) -> Self {
        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
        let mut context = context.clone();
        context = context
            .with_committee(committee)
            .with_authority_index(own_index);
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(
            LeaderSchedule::from_store(context.clone(), dag_state.clone())
                .with_num_commits_per_schedule(10),
        );
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let block_receiver = signal_receivers.block_broadcast_receiver();

        let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(commit_sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let block_signer = signers.remove(own_index.value()).1;

        let core = Core::new(
            context,
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            sync_last_known_own_block,
        );

        Self {
            core,
            signal_receivers,
            block_receiver,
            commit_receiver,
            store,
        }
    }
}

#[cfg(test)]
mod test {
    use std::{collections::BTreeSet, time::Duration};

    use consensus_config::{AuthorityIndex, Parameters};
    use futures::{StreamExt, stream::FuturesUnordered};
    use iota_metrics::monitored_mpsc::unbounded_channel;
    use iota_protocol_config::ProtocolConfig;
    use rstest::rstest;
    use tokio::time::sleep;

    use super::*;
    use crate::{
        CommitConsumer, CommitIndex,
        block::{TestBlock, genesis_blocks},
        block_verifier::NoopBlockVerifier,
        commit::CommitAPI,
        leader_scoring::ReputationScores,
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
        test_dag_parser::parse_dag,
        transaction::{BlockStatus, TransactionClient},
    };

    #[tokio::test]
    async fn test_core_recover_from_store_for_full_round() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut block_status_subscriptions = FuturesUnordered::new();

        let mut last_round_blocks = genesis_blocks(&context);
        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();
            for (index, _authority) in context.committee.authorities() {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, index.value() as u32)
                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                        .build(),
                );

                if round == 1 && index == context.own_index {
                    let subscription =
                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
                    block_status_subscriptions.push(subscription);
                }

                this_round_blocks.push(block);
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let _core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 5);
        let ancestors = proposed_block.block.ancestors();

        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            assert_eq!(ancestor.round, 4);
        }

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);

        while let Some(result) = block_status_subscriptions.next().await {
            let status = result.unwrap();
            assert!(matches!(status, BlockStatus::Sequenced(_)));
        }
    }

    #[tokio::test]
    async fn test_core_recover_from_store_for_partial_round() {
        telemetry_subscribers::init_for_testing();

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let mut last_round_blocks = genesis_blocks(&context);
        let mut all_blocks = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();

            let authorities_to_skip = if round == 4 {
                context.committee.validity_threshold() as usize
            } else {
                1
            };

            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
                let block = TestBlock::new(round, index.value() as u32)
                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                    .build();
                this_round_blocks.push(VerifiedBlock::new_for_test(block));
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }

        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 4);
        let ancestors = proposed_block.block.ancestors();

        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            if ancestor.author == context.own_index {
                assert_eq!(ancestor.round, 0);
            } else {
                assert_eq!(ancestor.round, 3);
            }
        }

        core.try_commit(vec![]).ok();
        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);
    }

    #[tokio::test]
    async fn test_core_propose_after_genesis() {
        telemetry_subscribers::init_for_testing();
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config
        });

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut total = 0;
        let mut index = 0;
        loop {
            let transaction =
                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
            total += transaction.len();
            index += 1;
            let _w = transaction_client
                .submit_no_wait(vec![transaction])
                .await
                .unwrap();

            if total >= 1_000 {
                break;
            }
        }

        let extended_block = block_receiver
            .recv()
            .await
            .expect("A new block should have been created");

        assert_eq!(extended_block.block.round(), 1);
        assert_eq!(extended_block.block.author().value(), 0);
        assert_eq!(extended_block.block.ancestors().len(), 4);

        let mut total = 0;
        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
            total += transaction.data().len() as u64;
            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
            assert_eq!(format!("Transaction {i}"), transaction);
        }
        assert!(
            total
                <= context
                    .protocol_config
                    .consensus_max_transactions_in_block_bytes()
        );

        let all_genesis = genesis_blocks(&context);

        for ancestor in extended_block.block.ancestors() {
            all_genesis
                .iter()
                .find(|block| block.reference() == *ancestor)
                .expect("Block should be found amongst genesis blocks");
        }

        assert!(core.try_propose(false).unwrap().is_none());
        assert!(core.try_propose(true).unwrap().is_none());

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);
    }

    #[tokio::test]
    async fn test_core_propose_once_receiving_a_quorum() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(
            context.clone(),
            dag_state.clone(),
            Arc::new(NoopBlockVerifier),
        );
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), 0),
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            block_manager,
            true,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
        );

        let mut expected_ancestors = BTreeSet::new();

        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
        expected_ancestors.insert(block_1.reference());
        sleep(context.parameters.min_round_delay).await;
        _ = core.add_blocks(vec![block_1]);

        assert_eq!(core.last_proposed_round(), 1);
        expected_ancestors.insert(core.last_proposed_block().reference());
        assert!(core.try_propose(false).unwrap().is_none());

        let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
        expected_ancestors.insert(block_3.reference());
        sleep(context.parameters.min_round_delay).await;
        _ = core.add_blocks(vec![block_3]);

        assert_eq!(core.last_proposed_round(), 2);

        let proposed_block = core.last_proposed_block();
        assert_eq!(proposed_block.round(), 2);
        assert_eq!(proposed_block.author(), context.own_index);
        assert_eq!(proposed_block.ancestors().len(), 3);
        let ancestors = proposed_block.ancestors();
        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
        assert_eq!(ancestors, expected_ancestors);

        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);
    }

2108 #[rstest]
2109 #[tokio::test]
2110 async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
2111 telemetry_subscribers::init_for_testing();
2112 let (mut context, mut key_pairs) = Context::new_for_test(4);
2113
2114 if gc_depth > 0 {
2115 context
2116 .protocol_config
2117 .set_consensus_gc_depth_for_testing(gc_depth);
2118 }
2119
2120 let context = Arc::new(context);
2121
2122 let store = Arc::new(MemStore::new());
2123 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2124 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2125 let mut block_status_subscriptions = FuturesUnordered::new();
2126
2127 let dag_str = "DAG {
2128 Round 0 : { 4 },
2129 Round 1 : { * },
2130 Round 2 : { * },
2131 Round 3 : {
2132 A -> [*],
2133 B -> [-A2],
2134 C -> [-A2],
2135 D -> [-A2],
2136 },
2137 Round 4 : {
2138 B -> [-A3],
2139 C -> [-A3],
2140 D -> [-A3],
2141 },
2142 Round 5 : {
2143 A -> [A3, B4, C4, D4]
2144 B -> [*],
2145 C -> [*],
2146 D -> [*],
2147 },
2148 Round 6 : { * },
2149 Round 7 : { * },
2150 Round 8 : { * },
2151 }";
2152
2153 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2154 dag_builder.print();
2155
2156 for block in dag_builder.blocks(1..=5) {
2159 if block.author() == context.own_index {
2160 let subscription =
2161 transaction_consumer.subscribe_for_block_status_testing(block.reference());
2162 block_status_subscriptions.push(subscription);
2163 }
2164 }
2165
2166 store
2168 .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2169 .expect("Storage error");
2170
2171 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2173 let block_manager = BlockManager::new(
2174 context.clone(),
2175 dag_state.clone(),
2176 Arc::new(NoopBlockVerifier),
2177 );
2178 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2179 context.clone(),
2180 dag_state.clone(),
2181 ));
2182
2183 let (sender, _receiver) = unbounded_channel("consensus_output");
2184 let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2185 let commit_observer = CommitObserver::new(
2186 context.clone(),
2187 commit_consumer,
2188 dag_state.clone(),
2189 store.clone(),
2190 leader_schedule.clone(),
2191 );
2192
2193 let last_commit = store.read_last_commit().unwrap();
2195 assert!(last_commit.is_none());
2196 assert_eq!(dag_state.read().last_commit_index(), 0);
2197
2198 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2200 let _block_receiver = signal_receivers.block_broadcast_receiver();
2202 let _core = Core::new(
2203 context.clone(),
2204 leader_schedule,
2205 transaction_consumer,
2206 block_manager,
2207 true,
2208 commit_observer,
2209 signals,
2210 key_pairs.remove(context.own_index.value()).1,
2211 dag_state.clone(),
2212 false,
2213 );
2214
2215 let last_commit = store
2216 .read_last_commit()
2217 .unwrap()
2218 .expect("last commit should be set");
2219
2220 assert_eq!(last_commit.index(), 5);
2221
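// With GC enabled, own blocks at rounds 2-3 are garbage collected and those at rounds 1 and 5 are
// sequenced; without GC every subscribed block should eventually be sequenced.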
2222 while let Some(result) = block_status_subscriptions.next().await {
2223 let status = result.unwrap();
2224
2225 if gc_depth > 0 {
2227 match status {
2228 BlockStatus::Sequenced(block_ref) => {
2229 assert!(block_ref.round == 1 || block_ref.round == 5);
2230 }
2231 BlockStatus::GarbageCollected(block_ref) => {
2232 assert!(block_ref.round == 2 || block_ref.round == 3);
2233 }
2234 }
2235 } else {
2236 assert!(matches!(status, BlockStatus::Sequenced(_)));
2238 }
2239 }
2240 }
2241
2242 #[tokio::test]
2246 async fn test_multiple_commits_advance_threshold_clock() {
2247 telemetry_subscribers::init_for_testing();
2248 let (mut context, mut key_pairs) = Context::new_for_test(4);
2249 const GC_DEPTH: u32 = 2;
2250
2251 context
2252 .protocol_config
2253 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2254
2255 let context = Arc::new(context);
2256
2257 let store = Arc::new(MemStore::new());
2258 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2259 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2260
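// DAG where authority D's round 1 block is skipped at round 2 and only referenced again from
// round 6 onwards.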
2261 let dag_str = "DAG {
2267 Round 0 : { 4 },
2268 Round 1 : { * },
2269 Round 2 : {
2270 B -> [-D1],
2271 C -> [-D1],
2272 D -> [-D1],
2273 },
2274 Round 3 : {
2275 B -> [*],
2276 C -> [*],
2277 D -> [*],
2278 },
2279 Round 4 : {
2280 A -> [*],
2281 B -> [*],
2282 C -> [*],
2283 D -> [*],
2284 },
2285 Round 5 : {
2286 A -> [*],
2287 B -> [*],
2288 C -> [*],
2289 D -> [*],
2290 },
2291 Round 6 : {
2292 B -> [A5, B5, C5, D1],
2293 C -> [A5, B5, C5, D1],
2294 D -> [A5, B5, C5, D1],
2295 },
2296 Round 7 : {
2297 B -> [*],
2298 C -> [*],
2299 D -> [*],
2300 },
2301 Round 8 : {
2302 B -> [*],
2303 C -> [*],
2304 D -> [*],
2305 },
2306 Round 9 : {
2307 B -> [*],
2308 C -> [*],
2309 D -> [*],
2310 },
2311 Round 10 : {
2312 B -> [*],
2313 C -> [*],
2314 D -> [*],
2315 },
2316 Round 11 : {
2317 B -> [*],
2318 C -> [*],
2319 D -> [*],
2320 },
2321 }";
2322
2323 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2324 dag_builder.print();
2325
2326 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2328 let block_manager = BlockManager::new(
2329 context.clone(),
2330 dag_state.clone(),
2331 Arc::new(NoopBlockVerifier),
2332 );
2333 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2334 context.clone(),
2335 dag_state.clone(),
2336 ));
2337 let (sender, _receiver) = unbounded_channel("consensus_output");
2338 let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2339 let commit_observer = CommitObserver::new(
2340 context.clone(),
2341 commit_consumer,
2342 dag_state.clone(),
2343 store.clone(),
2344 leader_schedule.clone(),
2345 );
2346
2347 let last_commit = store.read_last_commit().unwrap();
2349 assert!(last_commit.is_none());
2350 assert_eq!(dag_state.read().last_commit_index(), 0);
2351
2352 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2354 let _block_receiver = signal_receivers.block_broadcast_receiver();
2356 let mut core = Core::new(
2357 context.clone(),
2358 leader_schedule,
2359 transaction_consumer,
2360 block_manager,
2361 true,
2362 commit_observer,
2363 signals,
2364 key_pairs.remove(context.own_index.value()).1,
2365 dag_state.clone(),
2366 true,
2367 );
2368 core.set_last_known_proposed_round(4);
2372
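// Feed in every block except D1; processing the resulting commits should still advance the
// threshold clock so that the core proposes at round 12.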
2373 core.add_blocks(
2385 dag_builder
2386 .blocks(1..=11)
2387 .into_iter()
2388 .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2389 .collect(),
2390 )
2391 .expect("Should not fail");
2392
2393 assert_eq!(core.last_proposed_round(), 12);
2394 }
2395
2396 #[tokio::test]
2397 async fn test_core_set_min_propose_round() {
2398 telemetry_subscribers::init_for_testing();
2399 let (context, mut key_pairs) = Context::new_for_test(4);
2400 let context = Arc::new(context.with_parameters(Parameters {
2401 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2402 ..Default::default()
2403 }));
2404
2405 let store = Arc::new(MemStore::new());
2406 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2407
2408 let block_manager = BlockManager::new(
2409 context.clone(),
2410 dag_state.clone(),
2411 Arc::new(NoopBlockVerifier),
2412 );
2413 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2414 context.clone(),
2415 dag_state.clone(),
2416 ));
2417
2418 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2419 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2420 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2421 let _block_receiver = signal_receivers.block_broadcast_receiver();
2423
2424 let (sender, _receiver) = unbounded_channel("consensus_output");
2425 let commit_observer = CommitObserver::new(
2426 context.clone(),
2427 CommitConsumer::new(sender.clone(), 0),
2428 dag_state.clone(),
2429 store.clone(),
2430 leader_schedule.clone(),
2431 );
2432
2433 let mut core = Core::new(
2434 context.clone(),
2435 leader_schedule,
2436 transaction_consumer,
2437 block_manager,
2438 true,
2439 commit_observer,
2440 signals,
2441 key_pairs.remove(context.own_index.value()).1,
2442 dag_state.clone(),
2443 true,
2444 );
2445
2446 assert_eq!(
2448 core.last_proposed_round(),
2449 GENESIS_ROUND,
2450 "No block should have been created other than genesis"
2451 );
2452
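// Proposing is blocked until the last known proposed round is learned (the core was created with
// sync_last_known_own_block enabled).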
2453 assert!(core.try_propose(true).unwrap().is_none());
2455
2456 let mut builder = DagBuilder::new(context.clone());
2459 builder.layers(1..=10).build();
2460
2461 let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2462
2463 assert!(core.add_blocks(blocks).unwrap().is_empty());
2465
2466 assert!(core.try_propose(true).unwrap().is_none());
2468
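// Once the last known proposed round is set, the next proposal is expected at round 11 and should
// include this authority's round 10 block as an ancestor.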
2469 core.set_last_known_proposed_round(10);
2472
2473 let block = core.try_propose(true).expect("No error").unwrap();
2474 assert_eq!(block.round(), 11);
2475 assert_eq!(block.ancestors().len(), 4);
2476
2477 let our_ancestor_included = block.ancestors()[0];
2478 assert_eq!(our_ancestor_included.author, context.own_index);
2479 assert_eq!(our_ancestor_included.round, 10);
2480 }
2481
2482 #[tokio::test(flavor = "current_thread", start_paused = true)]
2483 async fn test_core_try_new_block_leader_timeout() {
2484 telemetry_subscribers::init_for_testing();
2485
2486 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2495 let now = context.clock.timestamp_utc_ms();
2498 let max_timestamp = blocks
2499 .iter()
2500 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2501 .map(|block| block.timestamp_ms())
2502 .unwrap_or(0);
2503
2504 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2505 sleep(wait_time).await;
2506 }
2507
2508 let (context, _) = Context::new_for_test(4);
2509 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2511
2512 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2518
2519 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2522 for round in 1..=3 {
2523 let mut this_round_blocks = Vec::new();
2524
2525 for core_fixture in cores.iter_mut() {
2526 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2527
2528 core_fixture
2529 .core
2530 .add_blocks(last_round_blocks.clone())
2531 .unwrap();
2532
2533 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2535 assert_eq!(round - 1, r);
2536 if core_fixture.core.last_proposed_round() == r {
2537 core_fixture
2539 .core
2540 .try_propose(true)
2541 .unwrap()
2542 .unwrap_or_else(|| {
2543 panic!("Block should have been proposed for round {round}")
2544 });
2545 }
2546 }
2547
2548 assert_eq!(core_fixture.core.last_proposed_round(), round);
2549
2550 this_round_blocks.push(core_fixture.core.last_proposed_block());
2551 }
2552
2553 last_round_blocks = this_round_blocks;
2554 }
2555
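// Add the round 3 blocks without forcing a proposal: no new block should be produced yet.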
2556 for core_fixture in cores.iter_mut() {
2560 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2561
2562 core_fixture
2563 .core
2564 .add_blocks(last_round_blocks.clone())
2565 .unwrap();
2566 assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2567 }
2568
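// Force a round 4 proposal (leader timeout path); this should also produce the first commit.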
2569 for core_fixture in cores.iter_mut() {
2572 assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2573 assert_eq!(core_fixture.core.last_proposed_round(), 4);
2574
2575 let last_commit = core_fixture
2577 .store
2578 .read_last_commit()
2579 .unwrap()
2580 .expect("last commit should be set");
2581 assert_eq!(last_commit.index(), 1);
2584 let all_stored_commits = core_fixture
2585 .store
2586 .scan_commits((0..=CommitIndex::MAX).into())
2587 .unwrap();
2588 assert_eq!(all_stored_commits.len(), 1);
2589 }
2590 }
2591
2592 #[tokio::test(flavor = "current_thread", start_paused = true)]
2593 async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2594 telemetry_subscribers::init_for_testing();
2595
2596 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2605 let now = context.clock.timestamp_utc_ms();
2608 let max_timestamp = blocks
2609 .iter()
2610 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2611 .map(|block| block.timestamp_ms())
2612 .unwrap_or(0);
2613
2614 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2615 sleep(wait_time).await;
2616 }
2617
2618 let (mut context, _) = Context::new_for_test(4);
2619 context
2620 .protocol_config
2621 .set_consensus_smart_ancestor_selection_for_testing(true);
2622 context
2623 .protocol_config
2624 .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2625
2626 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2628 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2629
2630 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2633 for round in 1..=30 {
2634 let mut this_round_blocks = Vec::new();
2635
2636 for core_fixture in cores.iter_mut() {
2637 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2638
2639 core_fixture
2640 .core
2641 .add_blocks(last_round_blocks.clone())
2642 .unwrap();
2643
2644 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2646 assert_eq!(round - 1, r);
2647 if core_fixture.core.last_proposed_round() == r {
2648 core_fixture
2650 .core
2651 .try_propose(true)
2652 .unwrap()
2653 .unwrap_or_else(|| {
2654 panic!("Block should have been proposed for round {round}")
2655 });
2656 }
2657 }
2658
2659 assert_eq!(core_fixture.core.last_proposed_round(), round);
2660
2661 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2662 }
2663
2664 last_round_blocks = this_round_blocks;
2665 }
2666
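// From round 31 the low-scoring authority (index 3) participates again: the other authorities keep
// excluding its blocks from their ancestors (3 ancestors), while it still includes everyone (4).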
2667 for round in 31..=40 {
2669 let mut this_round_blocks = Vec::new();
2670
2671 for core_fixture in all_cores.iter_mut() {
2672 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2673
2674 core_fixture
2675 .core
2676 .add_blocks(last_round_blocks.clone())
2677 .unwrap();
2678
2679 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2681 assert_eq!(round - 1, r);
2682 if core_fixture.core.last_proposed_round() == r {
2683 core_fixture
2685 .core
2686 .try_propose(true)
2687 .unwrap()
2688 .unwrap_or_else(|| {
2689 panic!("Block should have been proposed for round {round}")
2690 });
2691 }
2692 }
2693
2694 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2695
2696 for block in this_round_blocks.iter() {
2697 if block.author() != AuthorityIndex::new_for_test(3) {
2698 assert_eq!(block.ancestors().len(), 3);
2701 } else {
2702 assert_eq!(block.ancestors().len(), 4);
2705 }
2706 }
2707 }
2708
2709 last_round_blocks = this_round_blocks;
2710 }
2711 }
2712
2713 #[tokio::test]
2714 async fn test_smart_ancestor_selection() {
2715 telemetry_subscribers::init_for_testing();
2716 let (mut context, mut key_pairs) = Context::new_for_test(7);
2717 context
2718 .protocol_config
2719 .set_consensus_smart_ancestor_selection_for_testing(true);
2720 context
2721 .protocol_config
2722 .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2723 let context = Arc::new(context.with_parameters(Parameters {
2724 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2725 ..Default::default()
2726 }));
2727
2728 let store = Arc::new(MemStore::new());
2729 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2730
2731 let block_manager = BlockManager::new(
2732 context.clone(),
2733 dag_state.clone(),
2734 Arc::new(NoopBlockVerifier),
2735 );
2736 let leader_schedule = Arc::new(
2737 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2738 .with_num_commits_per_schedule(10),
2739 );
2740
2741 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2742 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2743 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2744 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2746
2747 let (sender, _receiver) = unbounded_channel("consensus_output");
2748 let commit_consumer = CommitConsumer::new(sender, 0);
2749 let commit_observer = CommitObserver::new(
2750 context.clone(),
2751 commit_consumer,
2752 dag_state.clone(),
2753 store.clone(),
2754 leader_schedule.clone(),
2755 );
2756
2757 let mut core = Core::new(
2758 context.clone(),
2759 leader_schedule,
2760 transaction_consumer,
2761 block_manager,
2762 true,
2763 commit_observer,
2764 signals,
2765 key_pairs.remove(context.own_index.value()).1,
2766 dag_state.clone(),
2767 true,
2768 );
2769
2770 assert_eq!(
2772 core.last_proposed_round(),
2773 GENESIS_ROUND,
2774 "No block should have been created other than genesis"
2775 );
2776
2777 assert!(core.try_propose(true).unwrap().is_none());
2779
2780 let mut builder = DagBuilder::new(context.clone());
2782 builder
2783 .layers(1..=12)
2784 .authorities(vec![AuthorityIndex::new_for_test(1)])
2785 .skip_block()
2786 .build();
2787 let blocks = builder.blocks(1..=12);
2788 assert!(core.add_blocks(blocks).unwrap().is_empty());
2790 core.set_last_known_proposed_round(12);
2791
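// After learning the last known proposed round, the first proposal lands at round 13 and can
// include all 7 authorities as ancestors.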
2792 let block = core.try_propose(true).expect("No error").unwrap();
2793 assert_eq!(block.round(), 13);
2794 assert_eq!(block.ancestors().len(), 7);
2795
2796 builder
2798 .layers(13..=14)
2799 .authorities(vec![AuthorityIndex::new_for_test(0)])
2800 .skip_block()
2801 .build();
2802 let blocks = builder.blocks(13..=14);
2803 assert!(core.add_blocks(blocks).unwrap().is_empty());
2804
2805 let block = core.try_propose(true).expect("No error").unwrap();
2809 assert_eq!(block.round(), 15);
2810 assert_eq!(block.ancestors().len(), 6);
2811
2812 builder
2815 .layer(15)
2816 .authorities(vec![
2817 AuthorityIndex::new_for_test(0),
2818 AuthorityIndex::new_for_test(5),
2819 AuthorityIndex::new_for_test(6),
2820 ])
2821 .skip_block()
2822 .build();
2823 let blocks = builder.blocks(15..=15);
2824 let authority_1_excluded_block_reference = blocks
2825 .iter()
2826 .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2827 .unwrap()
2828 .reference();
2829 sleep(context.parameters.min_round_delay).await;
2831 assert!(core.add_blocks(blocks).unwrap().is_empty());
2833 assert_eq!(core.last_proposed_block().round(), 15);
2834
2835 builder
2836 .layer(15)
2837 .authorities(vec![
2838 AuthorityIndex::new_for_test(0),
2839 AuthorityIndex::new_for_test(1),
2840 AuthorityIndex::new_for_test(2),
2841 AuthorityIndex::new_for_test(3),
2842 AuthorityIndex::new_for_test(4),
2843 ])
2844 .skip_block()
2845 .build();
2846 let blocks = builder.blocks(15..=15);
2847 let included_block_references = iter::once(&core.last_proposed_block())
2848 .chain(blocks.iter())
2849 .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2850 .map(|block| block.reference())
2851 .collect::<Vec<_>>();
2852
2853 assert!(core.add_blocks(blocks).unwrap().is_empty());
2855 assert_eq!(core.last_proposed_block().round(), 16);
2856
2857 let extended_block = loop {
2859 let extended_block =
2860 tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2861 .await
2862 .unwrap()
2863 .unwrap();
2864 if extended_block.block.round() == 16 {
2865 break extended_block;
2866 }
2867 };
2868 assert_eq!(extended_block.block.round(), 16);
2869 assert_eq!(extended_block.block.author(), core.context.own_index);
2870 assert_eq!(extended_block.block.ancestors().len(), 6);
2871 assert_eq!(extended_block.block.ancestors(), included_block_references);
2872 assert_eq!(extended_block.excluded_ancestors.len(), 1);
2873 assert_eq!(
2874 extended_block.excluded_ancestors[0],
2875 authority_1_excluded_block_reference
2876 );
2877
2878 builder
2883 .layer(16)
2884 .authorities(vec![
2885 AuthorityIndex::new_for_test(0),
2886 AuthorityIndex::new_for_test(5),
2887 AuthorityIndex::new_for_test(6),
2888 ])
2889 .skip_block()
2890 .build();
2891 let blocks = builder.blocks(16..=16);
2892 sleep(context.parameters.min_round_delay).await;
2894 assert!(core.add_blocks(blocks).unwrap().is_empty());
2896 assert_eq!(core.last_proposed_block().round(), 16);
2897
2898 let block = core.try_propose(true).expect("No error").unwrap();
2901 assert_eq!(block.round(), 17);
2902 assert_eq!(block.ancestors().len(), 5);
2903
2904 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2906 .await
2907 .unwrap()
2908 .unwrap();
2909 assert_eq!(extended_block.block.round(), 17);
2910 assert_eq!(extended_block.block.author(), core.context.own_index);
2911 assert_eq!(extended_block.block.ancestors().len(), 5);
2912 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2913
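// Report quorum rounds of 16 for every authority so that previously excluded ancestors become
// eligible for inclusion again.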
2914 core.set_propagation_delay_and_quorum_rounds(
2918 0,
2919 vec![
2920 (16, 16),
2921 (16, 16),
2922 (16, 16),
2923 (16, 16),
2924 (16, 16),
2925 (16, 16),
2926 (16, 16),
2927 ],
2928 vec![
2929 (16, 16),
2930 (16, 16),
2931 (16, 16),
2932 (16, 16),
2933 (16, 16),
2934 (16, 16),
2935 (16, 16),
2936 ],
2937 );
2938
2939 builder
2940 .layer(17)
2941 .authorities(vec![AuthorityIndex::new_for_test(0)])
2942 .skip_block()
2943 .build();
2944 let blocks = builder.blocks(17..=17);
2945 let included_block_references = iter::once(&core.last_proposed_block())
2946 .chain(blocks.iter())
2947 .map(|block| block.reference())
2948 .collect::<Vec<_>>();
2949
2950 sleep(context.parameters.min_round_delay).await;
2952 assert!(core.add_blocks(blocks).unwrap().is_empty());
2953 assert_eq!(core.last_proposed_block().round(), 18);
2954
2955 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2957 .await
2958 .unwrap()
2959 .unwrap();
2960 assert_eq!(extended_block.block.round(), 18);
2961 assert_eq!(extended_block.block.author(), core.context.own_index);
2962 assert_eq!(extended_block.block.ancestors().len(), 7);
2963 assert_eq!(extended_block.block.ancestors(), included_block_references);
2964 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2965 }
2966
2967 #[tokio::test]
2968 async fn test_excluded_ancestor_limit() {
2969 telemetry_subscribers::init_for_testing();
2970 let (mut context, mut key_pairs) = Context::new_for_test(4);
2971 context
2972 .protocol_config
2973 .set_consensus_smart_ancestor_selection_for_testing(true);
2974 context
2975 .protocol_config
2976 .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2977 let context = Arc::new(context.with_parameters(Parameters {
2978 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2979 ..Default::default()
2980 }));
2981
2982 let store = Arc::new(MemStore::new());
2983 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2984
2985 let block_manager = BlockManager::new(
2986 context.clone(),
2987 dag_state.clone(),
2988 Arc::new(NoopBlockVerifier),
2989 );
2990 let leader_schedule = Arc::new(
2991 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2992 .with_num_commits_per_schedule(10),
2993 );
2994
2995 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2996 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2997 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2998 let mut block_receiver = signal_receivers.block_broadcast_receiver();
3000
3001 let (sender, _receiver) = unbounded_channel("consensus_output");
3002 let commit_consumer = CommitConsumer::new(sender, 0);
3003 let commit_observer = CommitObserver::new(
3004 context.clone(),
3005 commit_consumer,
3006 dag_state.clone(),
3007 store.clone(),
3008 leader_schedule.clone(),
3009 );
3010
3011 let mut core = Core::new(
3012 context.clone(),
3013 leader_schedule,
3014 transaction_consumer,
3015 block_manager,
3016 true,
3017 commit_observer,
3018 signals,
3019 key_pairs.remove(context.own_index.value()).1,
3020 dag_state.clone(),
3021 true,
3022 );
3023
3024 assert_eq!(
3026 core.last_proposed_round(),
3027 GENESIS_ROUND,
3028 "No block should have been created other than genesis"
3029 );
3030
3031 let mut builder = DagBuilder::new(context.clone());
3033 builder.layers(1..=3).build();
3034
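// Authority 1 equivocates at round 4, creating more excluded ancestors than can be reported in a
// single extended block.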
3035 builder
3039 .layer(4)
3040 .authorities(vec![AuthorityIndex::new_for_test(1)])
3041 .equivocate(9)
3042 .build();
3043 let blocks = builder.blocks(1..=4);
3044
3045 assert!(core.add_blocks(blocks).unwrap().is_empty());
3047 core.set_last_known_proposed_round(3);
3048
3049 let block = core.try_propose(true).expect("No error").unwrap();
3050 assert_eq!(block.round(), 5);
3051 assert_eq!(block.ancestors().len(), 4);
3052
3053 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3055 .await
3056 .unwrap()
3057 .unwrap();
3058 assert_eq!(extended_block.block.round(), 5);
3059 assert_eq!(extended_block.block.author(), core.context.own_index);
3060 assert_eq!(extended_block.block.ancestors().len(), 4);
3061 assert_eq!(extended_block.excluded_ancestors.len(), 8);
3062 }
3063
3064 #[tokio::test]
3065 async fn test_core_set_subscriber_exists() {
3066 telemetry_subscribers::init_for_testing();
3067 let (context, mut key_pairs) = Context::new_for_test(4);
3068 let context = Arc::new(context);
3069 let store = Arc::new(MemStore::new());
3070 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3071
3072 let block_manager = BlockManager::new(
3073 context.clone(),
3074 dag_state.clone(),
3075 Arc::new(NoopBlockVerifier),
3076 );
3077 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3078 context.clone(),
3079 dag_state.clone(),
3080 ));
3081
3082 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3083 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3084 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3085 let _block_receiver = signal_receivers.block_broadcast_receiver();
3087
3088 let (sender, _receiver) = unbounded_channel("consensus_output");
3089 let commit_observer = CommitObserver::new(
3090 context.clone(),
3091 CommitConsumer::new(sender.clone(), 0),
3092 dag_state.clone(),
3093 store.clone(),
3094 leader_schedule.clone(),
3095 );
3096
3097 let mut core = Core::new(
3098 context.clone(),
3099 leader_schedule,
3100 transaction_consumer,
3101 block_manager,
3102 false,
3104 commit_observer,
3105 signals,
3106 key_pairs.remove(context.own_index.value()).1,
3107 dag_state.clone(),
3108 false,
3109 );
3110
3111 assert_eq!(
3113 core.last_proposed_round(),
3114 GENESIS_ROUND,
3115 "No block should have been created other than genesis"
3116 );
3117
3118 assert!(core.try_propose(true).unwrap().is_none());
3120
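// Proposals are gated on a quorum of subscribers; enabling the flag unblocks proposing.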
3121 core.set_quorum_subscribers_exists(true);
3123
3124 assert!(core.try_propose(true).unwrap().is_some());
3126 }
3127
3128 #[tokio::test]
3129 async fn test_core_set_propagation_delay_per_authority() {
3130 telemetry_subscribers::init_for_testing();
3132 let (context, mut key_pairs) = Context::new_for_test(4);
3133 let context = Arc::new(context);
3134 let store = Arc::new(MemStore::new());
3135 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3136
3137 let block_manager = BlockManager::new(
3138 context.clone(),
3139 dag_state.clone(),
3140 Arc::new(NoopBlockVerifier),
3141 );
3142 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3143 context.clone(),
3144 dag_state.clone(),
3145 ));
3146
3147 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3148 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3149 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3150 let _block_receiver = signal_receivers.block_broadcast_receiver();
3152
3153 let (sender, _receiver) = unbounded_channel("consensus_output");
3154 let commit_observer = CommitObserver::new(
3155 context.clone(),
3156 CommitConsumer::new(sender.clone(), 0),
3157 dag_state.clone(),
3158 store.clone(),
3159 leader_schedule.clone(),
3160 );
3161
3162 let mut core = Core::new(
3163 context.clone(),
3164 leader_schedule,
3165 transaction_consumer,
3166 block_manager,
3167 false,
3169 commit_observer,
3170 signals,
3171 key_pairs.remove(context.own_index.value()).1,
3172 dag_state.clone(),
3173 false,
3174 );
3175
3176 assert_eq!(
3178 core.last_proposed_round(),
3179 GENESIS_ROUND,
3180 "No block should have been created other than genesis"
3181 );
3182
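// A large propagation delay should block proposing even with subscribers present; resetting it to
// 0 unblocks proposals.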
3183 core.set_propagation_delay_and_quorum_rounds(1000, vec![], vec![]);
3185
3186 core.set_quorum_subscribers_exists(true);
3188
3189 assert!(core.try_propose(true).unwrap().is_none());
3191
3192 core.set_propagation_delay_and_quorum_rounds(0, vec![], vec![]);
3194
3195 assert!(core.try_propose(true).unwrap().is_some());
3197 }
3198
3199 #[tokio::test(flavor = "current_thread", start_paused = true)]
3200 async fn test_leader_schedule_change() {
3201 telemetry_subscribers::init_for_testing();
3202 let default_params = Parameters::default();
3203
3204 let (context, _) = Context::new_for_test(4);
3205 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3207
3208 let mut last_round_blocks = Vec::new();
3211 for round in 1..=30 {
3212 let mut this_round_blocks = Vec::new();
3213
3214 sleep(default_params.min_round_delay).await;
3216
3217 for core_fixture in &mut cores {
3218 core_fixture
3222 .core
3223 .add_blocks(last_round_blocks.clone())
3224 .unwrap();
3225
3226 let new_round = receive(
3229 Duration::from_secs(1),
3230 core_fixture.signal_receivers.new_round_receiver(),
3231 )
3232 .await;
3233 assert_eq!(new_round, round);
3234
3235 let extended_block = tokio::time::timeout(
3237 Duration::from_secs(1),
3238 core_fixture.block_receiver.recv(),
3239 )
3240 .await
3241 .unwrap()
3242 .unwrap();
3243 assert_eq!(extended_block.block.round(), round);
3244 assert_eq!(
3245 extended_block.block.author(),
3246 core_fixture.core.context.own_index
3247 );
3248
3249 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3251
3252 let block = core_fixture.core.last_proposed_block();
3253
3254 assert_eq!(
3256 block.ancestors().len(),
3257 core_fixture.core.context.committee.size()
3258 );
3259 for ancestor in block.ancestors() {
3260 if block.round() > 1 {
3261 assert!(
3263 last_round_blocks
3264 .iter()
3265 .any(|block| block.reference() == *ancestor),
3266 "Reference from previous round should be added"
3267 );
3268 }
3269 }
3270 }
3271
3272 last_round_blocks = this_round_blocks;
3273 }
3274
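// After 30 rounds every core should have reached commit index 27 and derived the same reputation
// scores, with one good and one bad node in the leader swap table.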
3275 for core_fixture in cores {
3276 let last_commit = core_fixture
3278 .store
3279 .read_last_commit()
3280 .unwrap()
3281 .expect("last commit should be set");
3282 assert_eq!(last_commit.index(), 27);
3286 let all_stored_commits = core_fixture
3287 .store
3288 .scan_commits((0..=CommitIndex::MAX).into())
3289 .unwrap();
3290 assert_eq!(all_stored_commits.len(), 27);
3291 assert_eq!(
3292 core_fixture
3293 .core
3294 .leader_schedule
3295 .leader_swap_table
3296 .read()
3297 .bad_nodes
3298 .len(),
3299 1
3300 );
3301 assert_eq!(
3302 core_fixture
3303 .core
3304 .leader_schedule
3305 .leader_swap_table
3306 .read()
3307 .good_nodes
3308 .len(),
3309 1
3310 );
3311 let expected_reputation_scores =
3312 ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3313 assert_eq!(
3314 core_fixture
3315 .core
3316 .leader_schedule
3317 .leader_swap_table
3318 .read()
3319 .reputation_scores,
3320 expected_reputation_scores
3321 );
3322 }
3323 }
3324
3325 #[tokio::test(flavor = "current_thread", start_paused = true)]
3327 async fn test_leader_schedule_change_with_vote_scoring() {
3328 telemetry_subscribers::init_for_testing();
3329 let default_params = Parameters::default();
3330 let (mut context, _) = Context::new_for_test(4);
3331 context
3332 .protocol_config
3333 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3334 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3336 let mut last_round_blocks = Vec::new();
3339 for round in 1..=30 {
3340 let mut this_round_blocks = Vec::new();
3341 sleep(default_params.min_round_delay).await;
3343 for core_fixture in &mut cores {
3344 core_fixture
3348 .core
3349 .add_blocks(last_round_blocks.clone())
3350 .unwrap();
3351 let new_round = receive(
3354 Duration::from_secs(1),
3355 core_fixture.signal_receivers.new_round_receiver(),
3356 )
3357 .await;
3358 assert_eq!(new_round, round);
3359 let extended_block = tokio::time::timeout(
3361 Duration::from_secs(1),
3362 core_fixture.block_receiver.recv(),
3363 )
3364 .await
3365 .unwrap()
3366 .unwrap();
3367 assert_eq!(extended_block.block.round(), round);
3368 assert_eq!(
3369 extended_block.block.author(),
3370 core_fixture.core.context.own_index
3371 );
3372
3373 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3375 let block = core_fixture.core.last_proposed_block();
3376 assert_eq!(
3378 block.ancestors().len(),
3379 core_fixture.core.context.committee.size()
3380 );
3381 for ancestor in block.ancestors() {
3382 if block.round() > 1 {
3383 assert!(
3385 last_round_blocks
3386 .iter()
3387 .any(|block| block.reference() == *ancestor),
3388 "Reference from previous round should be added"
3389 );
3390 }
3391 }
3392 }
3393 last_round_blocks = this_round_blocks;
3394 }
3395 for core_fixture in cores {
3396 let last_commit = core_fixture
3398 .store
3399 .read_last_commit()
3400 .unwrap()
3401 .expect("last commit should be set");
3402 assert_eq!(last_commit.index(), 27);
3406 let all_stored_commits = core_fixture
3407 .store
3408 .scan_commits((0..=CommitIndex::MAX).into())
3409 .unwrap();
3410 assert_eq!(all_stored_commits.len(), 27);
3411 assert_eq!(
3412 core_fixture
3413 .core
3414 .leader_schedule
3415 .leader_swap_table
3416 .read()
3417 .bad_nodes
3418 .len(),
3419 1
3420 );
3421 assert_eq!(
3422 core_fixture
3423 .core
3424 .leader_schedule
3425 .leader_swap_table
3426 .read()
3427 .good_nodes
3428 .len(),
3429 1
3430 );
3431 let expected_reputation_scores =
3432 ReputationScores::new((11..=20).into(), vec![9, 8, 8, 8]);
3433 assert_eq!(
3434 core_fixture
3435 .core
3436 .leader_schedule
3437 .leader_swap_table
3438 .read()
3439 .reputation_scores,
3440 expected_reputation_scores
3441 );
3442 }
3443 }
3444
3445 #[tokio::test]
3446 async fn test_validate_certified_commits() {
3447 telemetry_subscribers::init_for_testing();
3448
3449 let (context, _key_pairs) = Context::new_for_test(4);
3450 let context = context.with_parameters(Parameters {
3451 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3452 ..Default::default()
3453 });
3454
3455 let authority_index = AuthorityIndex::new_for_test(0);
3456 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3457 let mut core = core.core;
3458
3459 assert_eq!(
3461 core.last_proposed_round(),
3462 GENESIS_ROUND,
3463 "No block should have been created other than genesis"
3464 );
3465
3466 let mut dag_builder = DagBuilder::new(core.context.clone());
3468 dag_builder.layers(1..=12).build();
3469
3470 dag_builder.print();
3473 let blocks = dag_builder.blocks(1..=6);
3474
3475 for block in blocks {
3476 core.dag_state.write().accept_block(block);
3477 }
3478
3479 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3481
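// Committing without certified commits should decide the first 4 leaders from the locally
// accepted blocks.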
3482 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3485
3486 assert_eq!(committed_sub_dags.len(), 4);
3488
3489 println!("Case 1. Provide certified commits that are all before the last committed round.");
3491
3492 let certified_commits = sub_dags_and_commits
3494 .iter()
3495 .take(4)
3496 .map(|(_, c)| c)
3497 .cloned()
3498 .collect::<Vec<_>>();
3499 assert!(
3500 certified_commits.last().unwrap().index()
3501 <= committed_sub_dags.last().unwrap().commit_ref.index,
3502 "Highest certified commit should older than the highest committed index."
3503 );
3504
3505 let certified_commits = core.validate_certified_commits(certified_commits).unwrap();
3506
3507 assert!(certified_commits.is_empty());
3509
3510 println!("Case 2. Provide certified commits that are all after the last committed round.");
3511
3512 let certified_commits = sub_dags_and_commits
3514 .iter()
3515 .take(5)
3516 .map(|(_, c)| c.clone())
3517 .collect::<Vec<_>>();
3518
3519 let certified_commits = core
3520 .validate_certified_commits(certified_commits.clone())
3521 .unwrap();
3522
3523 assert_eq!(certified_commits.len(), 1);
3525 assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3526
3527 println!(
3528 "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
3529 );
3530
3531 let certified_commits = sub_dags_and_commits
3533 .iter()
3534 .skip(5)
3535 .take(1)
3536 .map(|(_, c)| c.clone())
3537 .collect::<Vec<_>>();
3538
3539 let err = core
3540 .validate_certified_commits(certified_commits.clone())
3541 .unwrap_err();
3542 match err {
3543 ConsensusError::UnexpectedCertifiedCommitIndex {
3544 expected_commit_index: 5,
3545 commit_index: 6,
3546 } => (),
3547 _ => panic!("Unexpected error: {err:?}"),
3548 }
3549 }
3550
3551 #[tokio::test]
3552 async fn test_add_certified_commits() {
3553 telemetry_subscribers::init_for_testing();
3554
3555 let (context, _key_pairs) = Context::new_for_test(4);
3556 let context = context.with_parameters(Parameters {
3557 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3558 ..Default::default()
3559 });
3560
3561 let authority_index = AuthorityIndex::new_for_test(0);
3562 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3563 let store = core.store.clone();
3564 let mut core = core.core;
3565
3566 assert_eq!(
3568 core.last_proposed_round(),
3569 GENESIS_ROUND,
3570 "No block should have been created other than genesis"
3571 );
3572
3573 let mut dag_builder = DagBuilder::new(core.context.clone());
3575 dag_builder.layers(1..=12).build();
3576
3577 dag_builder.print();
3580 let blocks = dag_builder.blocks(1..=6);
3581
3582 for block in blocks {
3583 core.dag_state.write().accept_block(block);
3584 }
3585
3586 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3588
3589 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3592
3593 assert_eq!(committed_sub_dags.len(), 4);
3595
3596 let last_commit = store
3597 .read_last_commit()
3598 .unwrap()
3599 .expect("Last commit should be set");
3600 assert_eq!(last_commit.reference().index, 4);
3601
3602 println!("Case 1. Provide no certified commits. No commit should happen.");
3603
3604 let last_commit = store
3605 .read_last_commit()
3606 .unwrap()
3607 .expect("Last commit should be set");
3608 assert_eq!(last_commit.reference().index, 4);
3609
3610 println!(
3611 "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3612 );
3613
3614 let certified_commits = sub_dags_and_commits
3617 .iter()
3618 .skip(3)
3619 .take(5)
3620 .map(|(_, c)| c.clone())
3621 .collect::<Vec<_>>();
3622
3623 let blocks = dag_builder.blocks(8..=12);
3626 for block in blocks {
3627 core.dag_state.write().accept_block(block);
3628 }
3629
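// Certified commits at or below the last committed index are skipped; the rest are applied and,
// together with the extra blocks, the direct decide rule extends the committed sequence.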
3630 core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3633 .expect("Should not fail");
3634
3635 let commits = store.scan_commits((6..=10).into()).unwrap();
3636
3637 assert_eq!(commits.len(), 5);
3639
3640 for i in 6..=10 {
3641 let commit = &commits[i - 6];
3642 assert_eq!(commit.reference().index, i as u32);
3643 }
3644 }
3645
3646 #[tokio::test]
3647 async fn try_commit_with_certified_commits_gced_blocks() {
3648 const GC_DEPTH: u32 = 3;
3649 telemetry_subscribers::init_for_testing();
3650
3651 let (mut context, mut key_pairs) = Context::new_for_test(5);
3652 context
3653 .protocol_config
3654 .set_consensus_gc_depth_for_testing(GC_DEPTH);
3655 let context = Arc::new(context.with_parameters(Parameters {
3658 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3659 ..Default::default()
3660 }));
3661
3662 let store = Arc::new(MemStore::new());
3663 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3664
3665 let block_manager = BlockManager::new(
3666 context.clone(),
3667 dag_state.clone(),
3668 Arc::new(NoopBlockVerifier),
3669 );
3670 let leader_schedule = Arc::new(
3671 LeaderSchedule::from_store(context.clone(), dag_state.clone())
3672 .with_num_commits_per_schedule(10),
3673 );
3674
3675 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3676 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3677 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3678 let _block_receiver = signal_receivers.block_broadcast_receiver();
3680
3681 let (sender, _receiver) = unbounded_channel("consensus_output");
3682 let commit_consumer = CommitConsumer::new(sender.clone(), 0);
3683 let commit_observer = CommitObserver::new(
3684 context.clone(),
3685 commit_consumer,
3686 dag_state.clone(),
3687 store.clone(),
3688 leader_schedule.clone(),
3689 );
3690
3691 let mut core = Core::new(
3692 context.clone(),
3693 leader_schedule,
3694 transaction_consumer,
3695 block_manager,
3696 true,
3697 commit_observer,
3698 signals,
3699 key_pairs.remove(context.own_index.value()).1,
3700 dag_state.clone(),
3701 true,
3702 );
3703
3704 assert_eq!(
3706 core.last_proposed_round(),
3707 GENESIS_ROUND,
3708 "No block should have been created other than genesis"
3709 );
3710
3711 let dag_str = "DAG {
3712 Round 0 : { 5 },
3713 Round 1 : { * },
3714 Round 2 : {
3715 A -> [-E1],
3716 B -> [-E1],
3717 C -> [-E1],
3718 D -> [-E1],
3719 },
3720 Round 3 : {
3721 A -> [*],
3722 B -> [*],
3723 C -> [*],
3724 D -> [*],
3725 },
3726 Round 4 : {
3727 A -> [*],
3728 B -> [*],
3729 C -> [*],
3730 D -> [*],
3731 },
3732 Round 5 : {
3733 A -> [*],
3734 B -> [*],
3735 C -> [*],
3736 D -> [*],
3737 E -> [A4, B4, C4, D4, E1]
3738 },
3739 Round 6 : { * },
3740 Round 7 : { * },
3741 }";
3742
3743 let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3744 dag_builder.print();
3745
3746 let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3748 .get_sub_dag_and_certified_commits(1..=5)
3749 .into_iter()
3750 .unzip();
3751
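// Commit using the certified commits; block E1 falls outside the GC bound and must not appear in
// any committed sub-dag.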
3752 let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3756
3757 assert_eq!(committed_sub_dags.len(), 4);
3759 for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3760 assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3761
3762 for block in committed_sub_dag.blocks.iter() {
3764 if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(4) {
3765 panic!("Did not expect to commit block E1");
3766 }
3767 }
3768 }
3769 }
3770
3771 #[tokio::test(flavor = "current_thread", start_paused = true)]
3772 async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3773 telemetry_subscribers::init_for_testing();
3774 let default_params = Parameters::default();
3775
3776 let (context, _) = Context::new_for_test(6);
3777
3778 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3780
3781 let mut last_round_blocks = Vec::new();
3784 for round in 1..=33 {
3785 let mut this_round_blocks = Vec::new();
3786 sleep(default_params.min_round_delay).await;
3788 for core_fixture in &mut cores {
3789 core_fixture
3793 .core
3794 .add_blocks(last_round_blocks.clone())
3795 .unwrap();
3796 let new_round = receive(
3799 Duration::from_secs(1),
3800 core_fixture.signal_receivers.new_round_receiver(),
3801 )
3802 .await;
3803 assert_eq!(new_round, round);
3804 let extended_block = tokio::time::timeout(
3806 Duration::from_secs(1),
3807 core_fixture.block_receiver.recv(),
3808 )
3809 .await
3810 .unwrap()
3811 .unwrap();
3812 assert_eq!(extended_block.block.round(), round);
3813 assert_eq!(
3814 extended_block.block.author(),
3815 core_fixture.core.context.own_index
3816 );
3817
3818 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3820 let block = core_fixture.core.last_proposed_block();
3821 assert_eq!(
3823 block.ancestors().len(),
3824 core_fixture.core.context.committee.size()
3825 );
3826 for ancestor in block.ancestors() {
3827 if block.round() > 1 {
3828 assert!(
3830 last_round_blocks
3831 .iter()
3832 .any(|block| block.reference() == *ancestor),
3833 "Reference from previous round should be added"
3834 );
3835 }
3836 }
3837 }
3838 last_round_blocks = this_round_blocks;
3839 }
3840 for core_fixture in cores {
3841 let last_commit = core_fixture
3843 .store
3844 .read_last_commit()
3845 .unwrap()
3846 .expect("last commit should be set");
3847 let expected_commit_count = 30;
3859 assert_eq!(last_commit.index(), expected_commit_count);
3865 let all_stored_commits = core_fixture
3866 .store
3867 .scan_commits((0..=CommitIndex::MAX).into())
3868 .unwrap();
3869 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3870 assert_eq!(
3871 core_fixture
3872 .core
3873 .leader_schedule
3874 .leader_swap_table
3875 .read()
3876 .bad_nodes
3877 .len(),
3878 1
3879 );
3880 assert_eq!(
3881 core_fixture
3882 .core
3883 .leader_schedule
3884 .leader_swap_table
3885 .read()
3886 .good_nodes
3887 .len(),
3888 1
3889 );
3890 let expected_reputation_scores =
3891 ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3892 assert_eq!(
3893 core_fixture
3894 .core
3895 .leader_schedule
3896 .leader_swap_table
3897 .read()
3898 .reputation_scores,
3899 expected_reputation_scores
3900 );
3901 }
3902 }
3903
3904 #[tokio::test(flavor = "current_thread", start_paused = true)]
3906 async fn test_commit_on_leader_schedule_change_boundary_without_multileader_with_vote_scoring()
3907 {
3908 telemetry_subscribers::init_for_testing();
3909 let default_params = Parameters::default();
3910
3911 let (mut context, _) = Context::new_for_test(6);
3912 context
3913 .protocol_config
3914 .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3915
3916 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3918 let mut last_round_blocks = Vec::new();
3921 for round in 1..=63 {
3922 let mut this_round_blocks = Vec::new();
3923
3924 sleep(default_params.min_round_delay).await;
3926
3927 for core_fixture in &mut cores {
3928 core_fixture
3932 .core
3933 .add_blocks(last_round_blocks.clone())
3934 .unwrap();
3935
3936 let new_round = receive(
3939 Duration::from_secs(1),
3940 core_fixture.signal_receivers.new_round_receiver(),
3941 )
3942 .await;
3943 assert_eq!(new_round, round);
3944
3945 let extended_block = tokio::time::timeout(
3947 Duration::from_secs(1),
3948 core_fixture.block_receiver.recv(),
3949 )
3950 .await
3951 .unwrap()
3952 .unwrap();
3953 assert_eq!(extended_block.block.round(), round);
3954 assert_eq!(
3955 extended_block.block.author(),
3956 core_fixture.core.context.own_index
3957 );
3958
3959 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3961
3962 let block = core_fixture.core.last_proposed_block();
3963
3964 assert_eq!(
3966 block.ancestors().len(),
3967 core_fixture.core.context.committee.size()
3968 );
3969 for ancestor in block.ancestors() {
3970 if block.round() > 1 {
3971 assert!(
3973 last_round_blocks
3974 .iter()
3975 .any(|block| block.reference() == *ancestor),
3976 "Reference from previous round should be added"
3977 );
3978 }
3979 }
3980 }
3981
3982 last_round_blocks = this_round_blocks;
3983 }
3984
3985 for core_fixture in cores {
3986 let last_commit = core_fixture
3988 .store
3989 .read_last_commit()
3990 .unwrap()
3991 .expect("last commit should be set");
3992 let expected_commit_count = 60;
4004 assert_eq!(last_commit.index(), expected_commit_count);
4010 let all_stored_commits = core_fixture
4011 .store
4012 .scan_commits((0..=CommitIndex::MAX).into())
4013 .unwrap();
4014 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
4015 assert_eq!(
4016 core_fixture
4017 .core
4018 .leader_schedule
4019 .leader_swap_table
4020 .read()
4021 .bad_nodes
4022 .len(),
4023 1
4024 );
4025 assert_eq!(
4026 core_fixture
4027 .core
4028 .leader_schedule
4029 .leader_swap_table
4030 .read()
4031 .good_nodes
4032 .len(),
4033 1
4034 );
4035 let expected_reputation_scores =
4036 ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
4037 assert_eq!(
4038 core_fixture
4039 .core
4040 .leader_schedule
4041 .leader_swap_table
4042 .read()
4043 .reputation_scores,
4044 expected_reputation_scores
4045 );
4046 }
4047 }
4048
4049 #[tokio::test]
4050 async fn test_core_signals() {
4051 telemetry_subscribers::init_for_testing();
4052 let default_params = Parameters::default();
4053
4054 let (context, _) = Context::new_for_test(4);
4055 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4057
4058 let mut last_round_blocks = Vec::new();
4061 for round in 1..=10 {
4062 let mut this_round_blocks = Vec::new();
4063
4064 sleep(default_params.min_round_delay).await;
4066
4067 for core_fixture in &mut cores {
4068 core_fixture
4072 .core
4073 .add_blocks(last_round_blocks.clone())
4074 .unwrap();
4075
4076 let new_round = receive(
4079 Duration::from_secs(1),
4080 core_fixture.signal_receivers.new_round_receiver(),
4081 )
4082 .await;
4083 assert_eq!(new_round, round);
4084
4085 let extended_block = tokio::time::timeout(
4087 Duration::from_secs(1),
4088 core_fixture.block_receiver.recv(),
4089 )
4090 .await
4091 .unwrap()
4092 .unwrap();
4093 assert_eq!(extended_block.block.round(), round);
4094 assert_eq!(
4095 extended_block.block.author(),
4096 core_fixture.core.context.own_index
4097 );
4098
4099 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
4101
4102 let block = core_fixture.core.last_proposed_block();
4103
4104 assert_eq!(
4106 block.ancestors().len(),
4107 core_fixture.core.context.committee.size()
4108 );
4109 for ancestor in block.ancestors() {
4110 if block.round() > 1 {
4111 assert!(
4113 last_round_blocks
4114 .iter()
4115 .any(|block| block.reference() == *ancestor),
4116 "Reference from previous round should be added"
4117 );
4118 }
4119 }
4120 }
4121
4122 last_round_blocks = this_round_blocks;
4123 }
4124
4125 for core_fixture in cores {
4126 let last_commit = core_fixture
4128 .store
4129 .read_last_commit()
4130 .unwrap()
4131 .expect("last commit should be set");
4132 assert_eq!(last_commit.index(), 7);
4136 let all_stored_commits = core_fixture
4137 .store
4138 .scan_commits((0..=CommitIndex::MAX).into())
4139 .unwrap();
4140 assert_eq!(all_stored_commits.len(), 7);
4141 }
4142 }
4143
4144 #[tokio::test]
4145 async fn test_core_compress_proposal_references() {
4146 telemetry_subscribers::init_for_testing();
4147 let default_params = Parameters::default();
4148
4149 let (context, _) = Context::new_for_test(4);
4150 let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4152
4153 let mut last_round_blocks = Vec::new();
4154 let mut all_blocks = Vec::new();
4155
4156 let excluded_authority = AuthorityIndex::new_for_test(3);
4157
4158 for round in 1..=10 {
4159 let mut this_round_blocks = Vec::new();
4160
4161 for core_fixture in &mut cores {
4162 if core_fixture.core.context.own_index == excluded_authority {
4164 continue;
4165 }
4166
4167 core_fixture
4170 .core
4171 .add_blocks(last_round_blocks.clone())
4172 .unwrap();
4173 core_fixture.core.new_block(round, true).unwrap();
4174
4175 let block = core_fixture.core.last_proposed_block();
4176 assert_eq!(block.round(), round);
4177
4178 this_round_blocks.push(block.clone());
4180 }
4181
4182 last_round_blocks = this_round_blocks.clone();
4183 all_blocks.extend(this_round_blocks);
4184 }
4185
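// Now feed all 10 rounds to the excluded authority at once; its proposal should reference only the
// most recent block from each peer instead of every block it received.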
4186 let core_fixture = &mut cores[excluded_authority];
4192 sleep(default_params.min_round_delay).await;
4194 core_fixture.core.add_blocks(all_blocks).unwrap();
4196
4197 let block = core_fixture.core.last_proposed_block();
4201 assert_eq!(block.round(), 11);
4202 assert_eq!(block.ancestors().len(), 4);
4203 for block_ref in block.ancestors() {
4204 if block_ref.author == excluded_authority {
4205 assert_eq!(block_ref.round, 1);
4206 } else {
4207 assert_eq!(block_ref.round, 10);
4208 }
4209 }
4210
4211 let last_commit = core_fixture
4213 .store
4214 .read_last_commit()
4215 .unwrap()
4216 .expect("last commit should be set");
4217 assert_eq!(last_commit.index(), 6);
4221 let all_stored_commits = core_fixture
4222 .store
4223 .scan_commits((0..=CommitIndex::MAX).into())
4224 .unwrap();
4225 assert_eq!(all_stored_commits.len(), 6);
4226 }
4227
4228 #[tokio::test]
4229 async fn try_decide_certified() {
4230 telemetry_subscribers::init_for_testing();
4232
4233 let (context, _) = Context::new_for_test(4);
4234
4235 let authority_index = AuthorityIndex::new_for_test(0);
4236 let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
4237 let mut core = core.core;
4238
4239 let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
4240 dag_builder.layers(1..=12).build();
4241
4242 let limit = 2;
4243
4244 let blocks = dag_builder.blocks(1..=12);
4245
4246 for block in blocks {
4247 core.dag_state.write().accept_block(block);
4248 }
4249
4250 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4252 let mut certified_commits = sub_dags_and_commits
4253 .into_iter()
4254 .map(|(_, commit)| commit)
4255 .collect::<Vec<_>>();
4256
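// With a limit of 2, only the first two certified commits are decided; the remaining two stay in
// the vector for a later call.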
4257 let leaders = core.try_decide_certified(&mut certified_commits, limit);
4258
4259 assert_eq!(leaders.len(), 2);
4261 assert_eq!(certified_commits.len(), 2);
4262 }
4263
4264 pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4265 tokio::time::timeout(timeout, receiver.changed())
4266 .await
4267 .expect("Timeout while waiting to read from receiver")
4268 .expect("Signal receive channel shouldn't be closed");
4269 *receiver.borrow_and_update()
4270 }
4271}