consensus_core/core.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet},
7    iter, mem,
8    sync::Arc,
9    time::Duration,
10    vec,
11};
12
13use consensus_config::{AuthorityIndex, ProtocolKeyPair};
14#[cfg(test)]
15use consensus_config::{Stake, local_committee_and_keys};
16use iota_macros::fail_point;
17#[cfg(test)]
18use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
19use iota_metrics::monitored_scope;
20use itertools::Itertools as _;
21use parking_lot::RwLock;
22use tokio::{
23    sync::{broadcast, watch},
24    time::Instant,
25};
26use tracing::{debug, info, instrument, trace, warn};
27
28#[cfg(test)]
29use crate::{
30    CommitConsumer, TransactionClient, block_verifier::NoopBlockVerifier,
31    storage::mem_store::MemStore,
32};
33use crate::{
34    ancestor::{AncestorState, AncestorStateManager},
35    block::{
36        Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, GENESIS_ROUND, Round,
37        SignedBlock, Slot, VerifiedBlock,
38    },
39    block_manager::BlockManager,
40    commit::{
41        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
42    },
43    commit_observer::CommitObserver,
44    context::Context,
45    dag_state::DagState,
46    error::{ConsensusError, ConsensusResult},
47    leader_schedule::LeaderSchedule,
48    round_prober::QuorumRound,
49    stake_aggregator::{QuorumThreshold, StakeAggregator},
50    transaction::TransactionConsumer,
51    universal_committer::{
52        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
53    },
54};
55
56// Maximum number of commit votes to include in a block.
57// TODO: Move to protocol config, and verify in BlockVerifier.
58const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
59
60pub(crate) struct Core {
61    context: Arc<Context>,
    /// The consumer used to pull transactions to be included in the next
    /// proposals.
64    transaction_consumer: TransactionConsumer,
    /// The block manager, responsible for keeping track of DAG dependencies
    /// when processing new blocks, accepting them or suspending them if their
    /// causal history is missing.
68    block_manager: BlockManager,
    /// Whether there is a quorum of 2f+1 subscribers waiting for new blocks
    /// proposed by this authority. Core stops proposing new blocks when there
    /// are not enough subscribers, because newly proposed blocks would not be
    /// sufficiently propagated to the network.
73    quorum_subscribers_exists: bool,
    /// Estimated delay, in rounds, for propagating blocks to a quorum.
    /// Because of the nature of TCP and block streaming, the propagation delay
    /// is expected to be 0 in most cases, even when the actual latency of
    /// broadcasting blocks is high. When this value is higher than
    /// `propagation_delay_stop_proposal_threshold`, most likely this validator
    /// cannot broadcast blocks to the network at all, so Core stops proposing
    /// new blocks.
81    propagation_delay: Round,
82
83    /// Used to make commit decisions for leader blocks in the dag.
84    committer: UniversalCommitter,
85    /// The last new round for which core has sent out a signal.
86    last_signaled_round: Round,
    /// The blocks of the last included ancestors per authority. This vector is
    /// used as a watermark so that only ancestors of higher rounds are included
    /// in the next block proposal. By default, it is initialised with `None`
    /// values.
91    last_included_ancestors: Vec<Option<BlockRef>>,
    /// The last decided leader returned from the universal committer. Note that
    /// this does not mean the leader has been persisted yet, as the commit still
    /// has to go through the CommitObserver and be persisted in the store. On
    /// recovery/restart the last_decided_leader is set to the last commit leader
    /// in dag state.
98    last_decided_leader: Slot,
99    /// The consensus leader schedule to be used to resolve the leader for a
100    /// given round.
101    leader_schedule: Arc<LeaderSchedule>,
    /// The commit observer is responsible for observing the commits and
    /// collecting and sending subdags over the consensus output channel.
105    commit_observer: CommitObserver,
106    /// Sender of outgoing signals from Core.
107    signals: CoreSignals,
    /// The keypair to be used for block signing.
109    block_signer: ProtocolKeyPair,
    /// Keeps track of the state of the DAG, including blocks, commits and the
    /// last committed rounds.
112    dag_state: Arc<RwLock<DagState>>,
    /// The last known round for which the node has proposed. Any proposal
    /// should be for a round greater than this. This is currently used to
    /// avoid equivocations while a node is recovering from amnesia. When the
    /// value is `None`, the last-block sync mechanism is enabled but has not
    /// been initialised yet.
118    last_known_proposed_round: Option<Round>,
    /// The ancestor state manager keeps track of the quality of the authorities
    /// based on the distribution of their blocks to the network. It uses this
    /// information to decide whether to include an authority's blocks in the
    /// next proposal or not.
123    ancestor_state_manager: AncestorStateManager,
124}
125
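// Illustrative sketch (not part of the original code): how the handlers below
// are typically driven by the rest of the consensus authority. The local
// variable names are hypothetical.
//
//   // Accept blocks received from peers; returns refs of missing ancestors.
//   let missing = core.add_blocks(verified_blocks)?;
//
//   // On a leader timeout, force a proposal for the current clock round.
//   core.new_block(clock_round, /* force */ true)?;
//
//   // Check block refs advertised by peers and schedule fetches for the rest.
//   let to_fetch = core.check_block_refs(block_refs)?;
//
// `add_blocks` and `new_block` internally run `try_commit()` / `try_propose()`
// and then `try_signal_new_round()` to keep the leader timeout task up to date.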
126impl Core {
127    pub(crate) fn new(
128        context: Arc<Context>,
129        leader_schedule: Arc<LeaderSchedule>,
130        transaction_consumer: TransactionConsumer,
131        block_manager: BlockManager,
132        subscriber_exists: bool,
133        commit_observer: CommitObserver,
134        signals: CoreSignals,
135        block_signer: ProtocolKeyPair,
136        dag_state: Arc<RwLock<DagState>>,
137        sync_last_known_own_block: bool,
138    ) -> Self {
139        let last_decided_leader = dag_state.read().last_commit_leader();
140        let committer = UniversalCommitterBuilder::new(
141            context.clone(),
142            leader_schedule.clone(),
143            dag_state.clone(),
144        )
        // We want to keep the with_number_of_leaders param to be able to enable
        // it in a future protocol upgrade.
147        .with_number_of_leaders(1)
148        .with_pipeline(true)
149        .build();
150
151        // Recover the last proposed block
152        let last_proposed_block = dag_state.read().get_last_proposed_block();
153
154        let last_signaled_round = last_proposed_block.round();
155
        // Recover the last included ancestor rounds based on the last proposed block.
        // That allows the next block proposal to use ancestor blocks of higher
        // rounds and avoids re-including blocks that have already been included
        // in the last (or an earlier) block proposal.
        // This is only strongly guaranteed for a quorum of ancestors. It is still
        // possible to re-include a block from an authority which hadn't been
        // added as part of the last proposal, hence its latest included ancestor
        // is not accurately captured here. This is considered a small deficiency:
        // it mostly matters just for this next proposal, without any actual
        // penalty in performance or block proposal.
166        let mut last_included_ancestors = vec![None; context.committee.size()];
167        for ancestor in last_proposed_block.ancestors() {
168            last_included_ancestors[ancestor.author] = Some(*ancestor);
169        }
170
171        let min_propose_round = if sync_last_known_own_block {
172            None
173        } else {
174            // if the sync is disabled then we practically don't want to impose any
175            // restriction.
176            Some(0)
177        };
178
179        let propagation_scores = leader_schedule
180            .leader_swap_table
181            .read()
182            .reputation_scores
183            .clone();
184        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
185        ancestor_state_manager.set_propagation_scores(propagation_scores);
186
187        Self {
188            context,
189            last_signaled_round,
190            last_included_ancestors,
191            last_decided_leader,
192            leader_schedule,
193            transaction_consumer,
194            block_manager,
195            quorum_subscribers_exists: subscriber_exists,
196            propagation_delay: 0,
197            committer,
198            commit_observer,
199            signals,
200            block_signer,
201            dag_state,
202            last_known_proposed_round: min_propose_round,
203            ancestor_state_manager,
204        }
205        .recover()
206    }
207
208    fn recover(mut self) -> Self {
209        let _s = self
210            .context
211            .metrics
212            .node_metrics
213            .scope_processing_time
214            .with_label_values(&["Core::recover"])
215            .start_timer();
216        // Ensure local time is after max ancestor timestamp.
217        let ancestor_blocks = self
218            .dag_state
219            .read()
220            .get_last_cached_block_per_authority(Round::MAX);
221        let max_ancestor_timestamp = ancestor_blocks
222            .iter()
223            .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms()));
224        let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms());
225
226        if self
227            .context
228            .protocol_config
229            .consensus_median_timestamp_with_checkpoint_enforcement()
230        {
231            info!(
232                "Median based timestamp is enabled. Will not wait for {} ms while recovering ancestors from storage",
233                wait_ms
234            );
235        } else if wait_ms > 0 {
236            warn!(
237                "Waiting for {} ms while recovering ancestors from storage",
238                wait_ms
239            );
240            std::thread::sleep(Duration::from_millis(wait_ms));
241        }
242
243        // Try to commit and propose, since they may not have run after the last storage
244        // write.
245        self.try_commit(vec![]).unwrap();
246        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
247        {
248            last_proposed_block
249        } else {
250            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
251            if self.should_propose() {
252                assert!(
253                    last_proposed_block.round() > GENESIS_ROUND,
254                    "At minimum a block of round higher than genesis should have been produced during recovery"
255                );
256            }
257
            // If no new block was proposed, then just re-broadcast the last
            // proposed one to ensure liveness.
260            self.signals
261                .new_block(ExtendedBlock {
262                    block: last_proposed_block.clone(),
263                    excluded_ancestors: vec![],
264                })
265                .unwrap();
266            last_proposed_block
267        };
268
269        // Try to set up leader timeout if needed.
270        // This needs to be called after try_commit() and try_propose(), which may
271        // have advanced the threshold clock round.
272        self.try_signal_new_round();
273
274        info!(
275            "Core recovery completed with last proposed block {:?}",
276            last_proposed_block
277        );
278
279        self
280    }
281
    /// Processes the provided blocks and accepts them if possible, i.e. when
    /// their causal history exists. The method returns:
    /// - The references of ancestors whose blocks are missing.
285    #[tracing::instrument("consensus_add_blocks", skip_all)]
286    pub(crate) fn add_blocks(
287        &mut self,
288        blocks: Vec<VerifiedBlock>,
289    ) -> ConsensusResult<BTreeSet<BlockRef>> {
290        let _scope = monitored_scope("Core::add_blocks");
291        let _s = self
292            .context
293            .metrics
294            .node_metrics
295            .scope_processing_time
296            .with_label_values(&["Core::add_blocks"])
297            .start_timer();
298        self.context
299            .metrics
300            .node_metrics
301            .core_add_blocks_batch_size
302            .observe(blocks.len() as f64);
303
304        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
305
306        if !accepted_blocks.is_empty() {
307            debug!(
308                "Accepted blocks: {}",
309                accepted_blocks
310                    .iter()
311                    .map(|b| b.reference().to_string())
312                    .join(",")
313            );
314
315            // Try to commit the new blocks if possible.
316            self.try_commit(vec![])?;
317
318            // Try to propose now since there are new blocks accepted.
319            self.try_propose(false)?;
320
321            // Now set up leader timeout if needed.
322            // This needs to be called after try_commit() and try_propose(), which may
323            // have advanced the threshold clock round.
324            self.try_signal_new_round();
325        }
326
327        if !missing_block_refs.is_empty() {
328            trace!(
329                "Missing block refs: {}",
330                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
331            );
332        }
333        Ok(missing_block_refs)
334    }
335
    /// Checks if the provided block refs have been accepted. If not, the
    /// missing block refs are kept for synchronization. Returns the references
    /// of missing blocks among the input blocks.
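    ///
    /// Illustrative usage sketch (not a doctest; the caller-side variable names
    /// are assumptions):
    ///
    /// ```ignore
    /// // Refs advertised by a peer that are not yet accepted locally are
    /// // returned and can be scheduled for fetching by the synchronizer.
    /// let to_fetch = core.check_block_refs(advertised_refs)?;
    /// ```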
339    pub(crate) fn check_block_refs(
340        &mut self,
341        block_refs: Vec<BlockRef>,
342    ) -> ConsensusResult<BTreeSet<BlockRef>> {
343        let _scope = monitored_scope("Core::check_block_refs");
344        let _s = self
345            .context
346            .metrics
347            .node_metrics
348            .scope_processing_time
349            .with_label_values(&["Core::check_block_refs"])
350            .start_timer();
351        self.context
352            .metrics
353            .node_metrics
354            .core_check_block_refs_batch_size
355            .observe(block_refs.len() as f64);
356
357        // Try to find them via the block manager
358        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
359
360        if !missing_block_refs.is_empty() {
361            trace!(
362                "Missing block refs: {}",
363                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
364            );
365        }
366        Ok(missing_block_refs)
367    }
368
    /// Adds the certified commits that have been synced via the commit syncer.
    /// The commit info is used to skip running the decision rule and to
    /// immediately commit the corresponding leaders and sub dags. Note that no
    /// block acceptance happens here; it happens internally in the `try_commit`
    /// method, which ensures that every time only the blocks corresponding to
    /// the certified commits that are about to be committed are accepted.
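    ///
    /// Illustrative call sketch (not a doctest; assumes the caller already
    /// holds `CertifiedCommits` produced by commit syncing):
    ///
    /// ```ignore
    /// // Votes and blocks are accepted internally. With GC enabled the returned
    /// // set is empty; otherwise it contains the refs of missing ancestors.
    /// let missing = core.add_certified_commits(certified_commits)?;
    /// ```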
376    #[tracing::instrument(skip_all)]
377    pub(crate) fn add_certified_commits(
378        &mut self,
379        certified_commits: CertifiedCommits,
380    ) -> ConsensusResult<BTreeSet<BlockRef>> {
381        let _scope = monitored_scope("Core::add_certified_commits");
382
383        // We want to enable the commit process logic when GC is enabled.
384        if self.dag_state.read().gc_enabled() {
385            let votes = certified_commits.votes().to_vec();
386            let commits = self
387                .validate_certified_commits(certified_commits.commits().to_vec())
388                .expect("Certified commits validation failed");
389
            // Accept the certified commit votes. This is done optimistically to
            // increase the chances of having votes available when this node
            // needs to sync commits to other nodes.
393            self.block_manager.try_accept_blocks(votes);
394
395            // Try to commit the new blocks. Take into account the trusted commit that has
396            // been provided.
397            self.try_commit(commits)?;
398
399            // Try to propose now since there are new blocks accepted.
400            self.try_propose(false)?;
401
402            // Now set up leader timeout if needed.
403            // This needs to be called after try_commit() and try_propose(), which may
404            // have advanced the threshold clock round.
405            self.try_signal_new_round();
406
407            return Ok(BTreeSet::new());
408        }
409
410        // If GC is not enabled then process blocks as usual.
411        let blocks = certified_commits
412            .commits()
413            .iter()
414            .flat_map(|commit| commit.blocks())
415            .cloned()
416            .collect::<Vec<_>>();
417
418        self.add_blocks(blocks)
419    }
420
421    /// If needed, signals a new clock round and sets up leader timeout.
422    fn try_signal_new_round(&mut self) {
423        // Signal only when the threshold clock round is more advanced than the last
424        // signaled round.
425        //
426        // NOTE: a signal is still sent even when a block has been proposed at the new
427        // round. We can consider changing this in the future.
428        let new_clock_round = self.dag_state.read().threshold_clock_round();
429        if new_clock_round <= self.last_signaled_round {
430            return;
431        }
432        // Then send a signal to set up leader timeout.
433        tracing::trace!(round = ?new_clock_round, "new_consensus_round_sent");
434        self.signals.new_round(new_clock_round);
435        self.last_signaled_round = new_clock_round;
436
437        // Report the threshold clock round
438        self.context
439            .metrics
440            .node_metrics
441            .threshold_clock_round
442            .set(new_clock_round as i64);
443    }
444
    /// Creates a new block for the dictated round. This is used when a leader
    /// timeout occurs, either when the min or the max timeout expires. When
    /// `force = true`, checks such as the existence of the previous round's
    /// leader are skipped.
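    ///
    /// Illustrative sketch of the leader-timeout path (not a doctest):
    ///
    /// ```ignore
    /// // `force = true` skips the leader-existence and min-round-delay checks.
    /// if let Some(block) = core.new_block(round, true)? {
    ///     debug!("Proposed {block:?} after leader timeout");
    /// }
    /// ```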
449    pub(crate) fn new_block(
450        &mut self,
451        round: Round,
452        force: bool,
453    ) -> ConsensusResult<Option<VerifiedBlock>> {
454        let _scope = monitored_scope("Core::new_block");
455        if self.last_proposed_round() < round {
456            self.context
457                .metrics
458                .node_metrics
459                .leader_timeout_total
460                .with_label_values(&[&format!("{force}")])
461                .inc();
462            let result = self.try_propose(force);
463            // The threshold clock round may have advanced, so a signal needs to be sent.
464            self.try_signal_new_round();
465            return result;
466        }
467        Ok(None)
468    }
469
    /// Keeps only the certified commits that have a commit index greater than
    /// the last commit index. It also ensures that the first commit in the list
    /// is the next one in line, otherwise an error is returned.
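    ///
    /// Illustrative sketch of the expected behaviour (not a doctest; the
    /// numbers are hypothetical):
    ///
    /// ```ignore
    /// // With last_commit_index == 10, commits with index <= 10 are skipped
    /// // and 11.. are kept. If the first kept commit is not index 11, the
    /// // method returns ConsensusError::UnexpectedCertifiedCommitIndex.
    /// let commits = self.validate_certified_commits(certified_commits)?;
    /// ```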
473    fn validate_certified_commits(
474        &mut self,
475        commits: Vec<CertifiedCommit>,
476    ) -> ConsensusResult<Vec<CertifiedCommit>> {
        // Filter out the commits that have already been committed locally and
        // keep only those above the last committed index.
479        let last_commit_index = self.dag_state.read().last_commit_index();
480        let commits = commits
481            .iter()
482            .filter(|commit| {
483                if commit.index() > last_commit_index {
484                    true
485                } else {
486                    tracing::debug!(
487                        "Skip commit for index {} as it is already committed with last commit index {}",
488                        commit.index(),
489                        last_commit_index
490                    );
491                    false
492                }
493            })
494            .cloned()
495            .collect::<Vec<_>>();
496
497        // Make sure that the first commit we find is the next one in line and there is
498        // no gap.
499        if let Some(commit) = commits.first() {
500            if commit.index() != last_commit_index + 1 {
501                return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
502                    expected_commit_index: last_commit_index + 1,
503                    commit_index: commit.index(),
504                });
505            }
506        }
507
508        Ok(commits)
509    }
510
    // Attempts to create a new block, persist it and propose it to all peers.
    // When force is true, skip checking whether the leader from the last round
    // exists among the ancestors and whether the minimum round delay has passed.
514    #[instrument(level = "trace", skip_all)]
515    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
516        if !self.should_propose() {
517            return Ok(None);
518        }
519        if let Some(extended_block) = self.try_new_block(force) {
520            self.signals.new_block(extended_block.clone())?;
521
522            fail_point!("consensus-after-propose");
523
524            // The new block may help commit.
525            self.try_commit(vec![])?;
526            return Ok(Some(extended_block.block));
527        }
528        Ok(None)
529    }
530
    /// Attempts to propose a new block for the next round. If a block has
    /// already been proposed for the latest or an earlier round, then no block
    /// is created and None is returned.
534    #[instrument(level = "trace", skip_all)]
535    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
536        let _s = self
537            .context
538            .metrics
539            .node_metrics
540            .scope_processing_time
541            .with_label_values(&["Core::try_new_block"])
542            .start_timer();
543
544        // Ensure the new block has a higher round than the last proposed block.
545        let clock_round = {
546            let dag_state = self.dag_state.read();
547            let clock_round = dag_state.threshold_clock_round();
548            if clock_round <= dag_state.get_last_proposed_block().round() {
549                return None;
550            }
551            clock_round
552        };
553
554        // There must be a quorum of blocks from the previous round.
555        let quorum_round = clock_round.saturating_sub(1);
556
557        // Create a new block either because we want to "forcefully" propose a block due
558        // to a leader timeout, or because we are actually ready to produce the
559        // block (leader exists and min delay has passed).
560        if !force {
561            if !self.leaders_exist(quorum_round) {
562                return None;
563            }
564
565            if Duration::from_millis(
566                self.context
567                    .clock
568                    .timestamp_utc_ms()
569                    .saturating_sub(self.last_proposed_timestamp_ms()),
570            ) < self.context.parameters.min_round_delay
571            {
572                return None;
573            }
574        }
575
        // Determine the ancestors to be included in the proposal.
        // Smart ancestor selection requires distributed scoring to be enabled.
578        let (ancestors, excluded_ancestors) = if self
579            .context
580            .protocol_config
581            .consensus_distributed_vote_scoring_strategy()
582            && self
583                .context
584                .protocol_config
585                .consensus_smart_ancestor_selection()
586        {
587            let (ancestors, excluded_and_equivocating_ancestors) =
588                self.smart_ancestors_to_propose(clock_round, !force);
589
590            // If we did not find enough good ancestors to propose, continue to wait before
591            // proposing.
592            if ancestors.is_empty() {
593                assert!(
594                    !force,
595                    "Ancestors should have been returned if force is true!"
596                );
597                return None;
598            }
599
600            let excluded_ancestors_limit = self.context.committee.size() * 2;
601            if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
602                debug!(
603                    "Dropping {} excluded ancestor(s) during proposal due to size limit",
604                    excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
605                );
606            }
607            let excluded_ancestors = excluded_and_equivocating_ancestors
608                .into_iter()
609                .take(excluded_ancestors_limit)
610                .collect();
611
612            (ancestors, excluded_ancestors)
613        } else {
614            (self.ancestors_to_propose(clock_round), vec![])
615        };
616
617        // Update the last included ancestor block refs
618        for ancestor in &ancestors {
619            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
620        }
621
622        let leader_authority = &self
623            .context
624            .committee
625            .authority(self.first_leader(quorum_round))
626            .hostname;
627        self.context
628            .metrics
629            .node_metrics
630            .block_proposal_leader_wait_ms
631            .with_label_values(&[leader_authority])
632            .inc_by(
633                Instant::now()
634                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
635                    .as_millis() as u64,
636            );
637        self.context
638            .metrics
639            .node_metrics
640            .block_proposal_leader_wait_count
641            .with_label_values(&[leader_authority])
642            .inc();
643
644        self.context
645            .metrics
646            .node_metrics
647            .proposed_block_ancestors
648            .observe(ancestors.len() as f64);
649        for ancestor in &ancestors {
650            let authority = &self.context.committee.authority(ancestor.author()).hostname;
651            self.context
652                .metrics
653                .node_metrics
654                .proposed_block_ancestors_depth
655                .with_label_values(&[authority])
656                .observe(clock_round.saturating_sub(ancestor.round()).into());
657        }
658
659        let now = self.context.clock.timestamp_utc_ms();
660        ancestors.iter().for_each(|block| {
661            if self.context.protocol_config.consensus_median_timestamp_with_checkpoint_enforcement() {
662                if block.timestamp_ms() > now {
663                    trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
664                    let authority = &self.context.committee.authority(block.author()).hostname;
665                    self.context
666                        .metrics
667                        .node_metrics
668                        .proposed_block_ancestors_timestamp_drift_ms
669                        .with_label_values(&[authority])
670                        .inc_by(block.timestamp_ms().saturating_sub(now));
671                }
672            } else {
                // Ensure ancestor timestamps are not more advanced than the current time.
                // Also catch the issue if the system's clock goes backwards.
675                assert!(
676                    block.timestamp_ms() <= now,
677                    "Violation: ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.",
678                    block, block.timestamp_ms(), clock_round
679                );
680            }
681        });
682
        // Consume the next transactions to be included. Do not drop the guards
        // yet as this would acknowledge the inclusion of transactions. Just let
        // this be done at the end of the method.
686        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
687        self.context
688            .metrics
689            .node_metrics
690            .proposed_block_transactions
691            .observe(transactions.len() as f64);
692
693        // Consume the commit votes to be included.
694        let commit_votes = self
695            .dag_state
696            .write()
697            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);
698
        // Create the block and insert it into storage.
700        let block = Block::V1(BlockV1::new(
701            self.context.committee.epoch(),
702            clock_round,
703            self.context.own_index,
704            now,
705            ancestors.iter().map(|b| b.reference()).collect(),
706            transactions,
707            commit_votes,
708            vec![],
709        ));
710        let signed_block =
711            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
712        let serialized = signed_block
713            .serialize()
714            .expect("Block serialization failed.");
715        self.context
716            .metrics
717            .node_metrics
718            .proposed_block_size
719            .observe(serialized.len() as f64);
720        // Own blocks are assumed to be valid.
721        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);
722
723        // Record the interval from last proposal, before accepting the proposed block.
724        let last_proposed_block = self.last_proposed_block();
725        if last_proposed_block.round() > 0 {
726            self.context
727                .metrics
728                .node_metrics
729                .block_proposal_interval
730                .observe(
731                    Duration::from_millis(
732                        verified_block
733                            .timestamp_ms()
734                            .saturating_sub(last_proposed_block.timestamp_ms()),
735                    )
736                    .as_secs_f64(),
737                );
738        }
739
740        // Accept the block into BlockManager and DagState.
741        let (accepted_blocks, missing) = self
742            .block_manager
743            .try_accept_blocks(vec![verified_block.clone()]);
744        assert_eq!(accepted_blocks.len(), 1);
745        assert!(missing.is_empty());
746
747        // Ensure the new block and its ancestors are persisted, before broadcasting it.
748        self.dag_state.write().flush();
749
        // Now acknowledge the transactions for their inclusion in the block.
751        ack_transactions(verified_block.reference());
752
753        info!("Created block {verified_block:?} for round {clock_round}");
754
755        self.context
756            .metrics
757            .node_metrics
758            .proposed_blocks
759            .with_label_values(&[&force.to_string()])
760            .inc();
761
762        Some(ExtendedBlock {
763            block: verified_block,
764            excluded_ancestors,
765        })
766    }
767
    /// Runs the commit rule to attempt to commit additional blocks from the
    /// DAG. If any `certified_commits` are provided, then it will attempt to
    /// commit those first before trying to commit any further leaders.
771    #[instrument(level = "trace", skip_all)]
772    fn try_commit(
773        &mut self,
774        mut certified_commits: Vec<CertifiedCommit>,
775    ) -> ConsensusResult<Vec<CommittedSubDag>> {
776        let _s = self
777            .context
778            .metrics
779            .node_metrics
780            .scope_processing_time
781            .with_label_values(&["Core::try_commit"])
782            .start_timer();
783
784        let mut certified_commits_map = BTreeMap::new();
785        for c in &certified_commits {
786            certified_commits_map.insert(c.index(), c.reference());
787        }
788
789        if !certified_commits.is_empty() {
790            info!(
791                "Will try to commit synced commits first : {:?}",
792                certified_commits
793                    .iter()
794                    .map(|c| (c.index(), c.leader()))
795                    .collect::<Vec<_>>()
796            );
797        }
798
799        let mut committed_sub_dags = Vec::new();
800        // TODO: Add optimization to abort early without quorum for a round.
801        loop {
            // LeaderSchedule has a limit on how many sequenced leaders can be
            // committed before a change is triggered. Calling into the leader
            // schedule returns how many commits remain until the next leader
            // change. We will loop back and recalculate any discarded leaders
            // with the new schedule.
806            let mut commits_until_update = self
807                .leader_schedule
808                .commits_until_leader_schedule_update(self.dag_state.clone());
809
810            if commits_until_update == 0 {
811                let last_commit_index = self.dag_state.read().last_commit_index();
812
813                tracing::info!(
814                    "Leader schedule change triggered at commit index {last_commit_index}"
815                );
816                if self
817                    .context
818                    .protocol_config
819                    .consensus_distributed_vote_scoring_strategy()
820                {
821                    self.leader_schedule
822                        .update_leader_schedule_v2(&self.dag_state);
823
824                    let propagation_scores = self
825                        .leader_schedule
826                        .leader_swap_table
827                        .read()
828                        .reputation_scores
829                        .clone();
830                    self.ancestor_state_manager
831                        .set_propagation_scores(propagation_scores);
832                } else {
833                    self.leader_schedule
834                        .update_leader_schedule_v1(&self.dag_state);
835                }
836                commits_until_update = self
837                    .leader_schedule
838                    .commits_until_leader_schedule_update(self.dag_state.clone());
839
840                fail_point!("consensus-after-leader-schedule-change");
841            }
842            assert!(commits_until_update > 0);
843
844            // Always try to process the synced commits first. If there are certified
845            // commits to process then the decided leaders and the commits will be returned.
846            let (mut decided_leaders, decided_certified_commits): (
847                Vec<DecidedLeader>,
848                Vec<CertifiedCommit>,
849            ) = self
850                .try_decide_certified(&mut certified_commits, commits_until_update)
851                .into_iter()
852                .unzip();
853
854            // Only accept blocks for the certified commits that we are certain to sequence.
855            // This ensures that only blocks corresponding to committed certified commits
856            // are flushed to disk. Blocks from non-committed certified commits
857            // will not be flushed, preventing issues during crash-recovery.
858            // This avoids scenarios where accepting and flushing blocks of non-committed
859            // certified commits could lead to premature commit rule execution.
860            // Due to GC, this could cause a panic if the commit rule tries to access
861            // missing causal history from blocks of certified commits.
862            let blocks = decided_certified_commits
863                .iter()
864                .flat_map(|c| c.blocks())
865                .cloned()
866                .collect::<Vec<_>>();
867            self.block_manager.try_accept_committed_blocks(blocks);
868
            // If no leaders were decided from certified commits, then try to
            // run the decision rule.
871            if decided_leaders.is_empty() {
872                // TODO: limit commits by commits_until_update, which may be needed when leader
873                // schedule length is reduced.
874                decided_leaders = self.committer.try_decide(self.last_decided_leader);
875
876                // Truncate the decided leaders to fit the commit schedule limit.
877                if decided_leaders.len() >= commits_until_update {
878                    let _ = decided_leaders.split_off(commits_until_update);
879                }
880            }
881
882            // If the decided leaders list is empty then just break the loop.
883            let Some(last_decided) = decided_leaders.last().cloned() else {
884                break;
885            };
886
887            self.last_decided_leader = last_decided.slot();
888
889            let sequenced_leaders = decided_leaders
890                .into_iter()
891                .filter_map(|leader| leader.into_committed_block())
892                .collect::<Vec<_>>();
893
894            tracing::debug!(
895                "Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
896                sequenced_leaders.len()
897            );
898
899            self.context
900                .metrics
901                .node_metrics
902                .last_decided_leader_round
903                .set(self.last_decided_leader.round as i64);
904
            // It's possible to reach this point if all of the decided leaders
            // are "Skip" decisions. In this case there is no leader to commit
            // and we should break the loop.
908            if sequenced_leaders.is_empty() {
909                break;
910            }
911
912            tracing::info!(
913                "Committing {} leaders: {}",
914                sequenced_leaders.len(),
915                sequenced_leaders
916                    .iter()
917                    .map(|b| b.reference().to_string())
918                    .join(",")
919            );
920
921            // TODO: refcount subdags
922            let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
923            if self
924                .context
925                .protocol_config
926                .consensus_distributed_vote_scoring_strategy()
927            {
928                self.dag_state.write().add_scoring_subdags(subdags.clone());
929            } else {
930                // TODO: Remove when DistributedVoteScoring is enabled.
931                self.dag_state
932                    .write()
933                    .add_unscored_committed_subdags(subdags.clone());
934            }
935
936            // Try to unsuspend blocks if gc_round has advanced.
937            self.block_manager
938                .try_unsuspend_blocks_for_latest_gc_round();
939            committed_sub_dags.extend(subdags);
940
941            fail_point!("consensus-after-handle-commit");
942        }
943
944        // Sanity check: for commits that have been linearized using the certified
945        // commits, ensure that the same sub dag has been committed.
946        for sub_dag in &committed_sub_dags {
947            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
948                assert_eq!(
949                    commit_ref, sub_dag.commit_ref,
950                    "Certified commit has different reference than the committed sub dag"
951                );
952            }
953        }
954
955        // Notify about our own committed blocks
956        let committed_block_refs = committed_sub_dags
957            .iter()
958            .flat_map(|sub_dag| sub_dag.blocks.iter())
959            .filter_map(|block| {
960                (block.author() == self.context.own_index).then_some(block.reference())
961            })
962            .collect::<Vec<_>>();
963        self.transaction_consumer
964            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
965
966        Ok(committed_sub_dags)
967    }
968
969    pub(crate) fn get_missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
970        let _scope = monitored_scope("Core::get_missing_blocks");
971        self.block_manager.missing_blocks()
972    }
973
    /// Sets whether there are 2f+1 subscriptions to the block stream.
975    pub(crate) fn set_quorum_subscribers_exists(&mut self, exists: bool) {
976        info!("A quorum of block subscribers exists: {exists}");
977        self.quorum_subscribers_exists = exists;
978    }
979
    /// Sets the delay, in rounds, for propagating blocks to a quorum and the
    /// received & accepted quorum rounds per authority for the ancestor state
    /// manager.
983    pub(crate) fn set_propagation_delay_and_quorum_rounds(
984        &mut self,
985        delay: Round,
986        received_quorum_rounds: Vec<QuorumRound>,
987        accepted_quorum_rounds: Vec<QuorumRound>,
988    ) {
989        info!(
990            "Received quorum round per authority in ancestor state manager set to: {}",
991            self.context
992                .committee
993                .authorities()
994                .zip(received_quorum_rounds.iter())
995                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
996                .join(", ")
997        );
998        info!(
999            "Accepted quorum round per authority in ancestor state manager set to: {}",
1000            self.context
1001                .committee
1002                .authorities()
1003                .zip(accepted_quorum_rounds.iter())
1004                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
1005                .join(", ")
1006        );
1007        self.ancestor_state_manager
1008            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
1009        info!("Propagation round delay set to: {delay}");
1010        self.propagation_delay = delay;
1011    }
1012
    /// Sets the min propose round for the proposer, allowing it to propose
    /// blocks only for round numbers greater than `last_known_proposed_round`.
    /// At the moment the method may only be called once; attempting to call it
    /// multiple times leads to a panic.
1017    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
1018        if self.last_known_proposed_round.is_some() {
1019            panic!(
1020                "Should not attempt to set the last known proposed round if that has been already set"
1021            );
1022        }
1023        self.last_known_proposed_round = Some(round);
1024        info!("Last known proposed round set to {round}");
1025    }
1026
1027    /// Whether the core should propose new blocks.
1028    pub(crate) fn should_propose(&self) -> bool {
1029        let clock_round = self.dag_state.read().threshold_clock_round();
1030        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
1031
1032        if !self.quorum_subscribers_exists {
1033            debug!("Skip proposing for round {clock_round}, don't have a quorum of subscribers.");
1034            core_skipped_proposals
1035                .with_label_values(&["no_quorum_subscriber"])
1036                .inc();
1037            return false;
1038        }
1039
1040        if self.propagation_delay
1041            > self
1042                .context
1043                .parameters
1044                .propagation_delay_stop_proposal_threshold
1045        {
1046            debug!(
1047                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
1048                self.propagation_delay,
1049                self.context
1050                    .parameters
1051                    .propagation_delay_stop_proposal_threshold
1052            );
1053            core_skipped_proposals
1054                .with_label_values(&["high_propagation_delay"])
1055                .inc();
1056            return false;
1057        }
1058
1059        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
1060            debug!(
1061                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
1062            );
1063            core_skipped_proposals
1064                .with_label_values(&["no_last_known_proposed_round"])
1065                .inc();
1066            return false;
1067        };
1068        if clock_round <= last_known_proposed_round {
1069            debug!(
1070                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
1071            );
1072            core_skipped_proposals
1073                .with_label_values(&["higher_last_known_proposed_round"])
1074                .inc();
1075            return false;
1076        }
1077
1078        true
1079    }
1080
    // Tries to decide which of the certified commits will have to be committed
    // next, respecting the `limit`. If the provided `limit` is zero, it will
    // panic.
    // The function returns the list of decided leaders and updates the
    // remaining certified commits in place. If an empty vector is returned, it
    // means that there are no certified commits to be committed, as
    // `certified_commits` is either empty or all of the certified commits are
    // already committed.
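    //
    // Illustrative sketch of the limit semantics (hypothetical indices): with
    // certified commits [11, 12, 13] pending and `limit == 2`,
    //
    //   let decided = self.try_decide_certified(&mut certified_commits, 2);
    //
    // returns decided leaders for commits 11 and 12, and leaves
    // `certified_commits` with [13] for the next iteration of `try_commit`.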
1087    #[tracing::instrument(skip_all)]
1088    fn try_decide_certified(
1089        &mut self,
1090        certified_commits: &mut Vec<CertifiedCommit>,
1091        limit: usize,
1092    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
        // If GC is disabled then we should not run any of this logic.
1094        if !self.dag_state.read().gc_enabled() {
1095            return Vec::new();
1096        }
1097
1098        assert!(limit > 0, "limit should be greater than 0");
1099
1100        let to_commit = if certified_commits.len() >= limit {
1101            // We keep only the number of leaders as dictated by the `limit`
1102            certified_commits.drain(..limit).collect::<Vec<_>>()
1103        } else {
            // Otherwise just take all of them and leave `certified_commits` empty.
1105            mem::take(certified_commits)
1106        };
1107
1108        tracing::debug!(
1109            "Decided {} certified leaders: {}",
1110            to_commit.len(),
1111            to_commit.iter().map(|c| c.leader().to_string()).join(",")
1112        );
1113
1114        let sequenced_leaders = to_commit
1115            .into_iter()
1116            .map(|commit| {
1117                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
1118                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
1119                let leader = DecidedLeader::Commit(leader.clone());
1120                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
1121                (leader, commit)
1122            })
1123            .collect::<Vec<_>>();
1124
1125        sequenced_leaders
1126    }
1127
    /// Retrieves the next ancestors to propose, to form a block at round
    /// `clock_round`.
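    ///
    /// Illustrative example of the watermark filter (hypothetical rounds): if
    /// the last included ancestor of authority `a` is at round 5, a cached
    /// block from `a` at round 4 is skipped, while its block at round 6 is
    /// included again.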
1130    fn ancestors_to_propose(&mut self, clock_round: Round) -> Vec<VerifiedBlock> {
1131        // Now take the ancestors before the clock_round (excluded) for each authority.
1132        let (ancestors, gc_enabled, gc_round) = {
1133            let dag_state = self.dag_state.read();
1134            (
1135                dag_state.get_last_cached_block_per_authority(clock_round),
1136                dag_state.gc_enabled(),
1137                dag_state.gc_round(),
1138            )
1139        };
1140
1141        assert_eq!(
1142            ancestors.len(),
1143            self.context.committee.size(),
1144            "Fatal error, number of returned ancestors don't match committee size."
1145        );
1146
1147        // Propose only ancestors of higher rounds than what has already been proposed.
1148        // And always include own last proposed block first among ancestors.
1149        let (last_proposed_block, _) = ancestors[self.context.own_index].clone();
1150        assert_eq!(last_proposed_block.author(), self.context.own_index);
1151        let ancestors = iter::once(last_proposed_block)
1152            .chain(
1153                ancestors
1154                    .into_iter()
1155                    .filter(|(block, _)| block.author() != self.context.own_index)
1156                    .filter(|(block, _)| {
1157                        if gc_enabled && gc_round > GENESIS_ROUND {
1158                            return block.round() > gc_round;
1159                        }
1160                        true
1161                    })
1162                    .flat_map(|(block, _)| {
1163                        if let Some(last_block_ref) = self.last_included_ancestors[block.author()] {
1164                            return (last_block_ref.round < block.round()).then_some(block);
1165                        }
1166                        Some(block)
1167                    }),
1168            )
1169            .collect::<Vec<_>>();
1170
        // TODO: this is a temporary sanity check - we might want to remove it later on.
1172        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1173        for ancestor in ancestors
1174            .iter()
1175            .filter(|block| block.round() == clock_round - 1)
1176        {
1177            quorum.add(ancestor.author(), &self.context.committee);
1178        }
1179        assert!(
1180            quorum.reached_threshold(&self.context.committee),
1181            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1182        );
1183
1184        ancestors
1185    }
1186
    /// Retrieves the next ancestors to propose, to form a block at round
    /// `clock_round`. If smart selection is enabled then this will try to
    /// select the best ancestors based on the propagation scores of the
    /// authorities.
1191    fn smart_ancestors_to_propose(
1192        &mut self,
1193        clock_round: Round,
1194        smart_select: bool,
1195    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
1196        let node_metrics = &self.context.metrics.node_metrics;
1197        let _s = node_metrics
1198            .scope_processing_time
1199            .with_label_values(&["Core::smart_ancestors_to_propose"])
1200            .start_timer();
1201
1202        // Now take the ancestors before the clock_round (excluded) for each authority.
1203        let all_ancestors = self
1204            .dag_state
1205            .read()
1206            .get_last_cached_block_per_authority(clock_round);
1207
1208        assert_eq!(
1209            all_ancestors.len(),
1210            self.context.committee.size(),
1211            "Fatal error, number of returned ancestors don't match committee size."
1212        );
1213
1214        // Ensure ancestor state is up to date before selecting for proposal.
1215        self.ancestor_state_manager.update_all_ancestors_state();
1216        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
1217
1218        let quorum_round = clock_round.saturating_sub(1);
1219
1220        let mut score_and_pending_excluded_ancestors = Vec::new();
1221        let mut excluded_and_equivocating_ancestors = BTreeSet::new();
1222
1223        // Propose only ancestors of higher rounds than what has already been proposed.
1224        // And always include own last proposed block first among ancestors.
1225        // Start by only including the high scoring ancestors. Low scoring ancestors
1226        // will be included in a second pass below.
1227        let included_ancestors = iter::once(self.last_proposed_block().clone())
1228            .chain(
1229                all_ancestors
1230                    .into_iter()
1231                    .flat_map(|(ancestor, equivocating_ancestors)| {
1232                        if ancestor.author() == self.context.own_index {
1233                            return None;
1234                        }
1235                        if let Some(last_block_ref) =
1236                            self.last_included_ancestors[ancestor.author()]
1237                        {
1238                            if last_block_ref.round >= ancestor.round() {
1239                                return None;
1240                            }
1241                        }
1242
1243                        // We will never include equivocating ancestors so add them immediately
1244                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
1245
1246                        let ancestor_state = ancestor_state_map[ancestor.author()];
1247                        match ancestor_state {
1248                            AncestorState::Include => {
1249                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
1250                            }
1251                            AncestorState::Exclude(score) => {
1252                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
1253                                score_and_pending_excluded_ancestors.push((score, ancestor));
1254                                return None;
1255                            }
1256                        }
1257
1258                        Some(ancestor)
1259                    }),
1260            )
1261            .collect::<Vec<_>>();
1262
1263        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
1264
1265        // Check total stake of high scoring parent round ancestors
1266        for ancestor in included_ancestors
1267            .iter()
1268            .filter(|a| a.round() == quorum_round)
1269        {
1270            parent_round_quorum.add(ancestor.author(), &self.context.committee);
1271        }
1272
1273        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
1274            node_metrics.smart_selection_wait.inc();
1275            debug!(
1276                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
1277                parent_round_quorum.stake()
1278            );
1279            return (vec![], BTreeSet::new());
1280        }
1281
1282        // Sort scores descending so we can include the best of the pending excluded
1283        // ancestors first until we reach the threshold.
1284        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
1285
1286        let mut ancestors_to_propose = included_ancestors;
1287        let mut excluded_ancestors = Vec::new();
1288        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
1289            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
1290            if !parent_round_quorum.reached_threshold(&self.context.committee)
1291                && ancestor.round() == quorum_round
1292            {
1293                debug!(
1294                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
1295                );
1296                parent_round_quorum.add(ancestor.author(), &self.context.committee);
1297                ancestors_to_propose.push(ancestor);
1298                node_metrics
1299                    .included_excluded_proposal_ancestors_count_by_authority
1300                    .with_label_values(&[block_hostname.as_str(), "timeout"])
1301                    .inc();
1302            } else {
1303                excluded_ancestors.push((score, ancestor));
1304            }
1305        }
1306
1307        // Iterate through the excluded ancestors and include either the ancestor itself
1308        // or one of the ancestor's own ancestors that has been accepted by a quorum of
1309        // the network. If the original ancestor itself is not included, it will still be
1310        // part of the excluded ancestors that are not included in the block but are
1311        // broadcast to peers.
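        // A hypothetical walk-through of the fallback below (rounds chosen for
        // illustration only): suppose the excluded ancestor from authority X is at
        // round 8, our last included block from X is at round 3, and X's accepted low
        // quorum round is 6. The round 8 block is excluded, but
        // get_last_cached_block_in_range(X, 4, 7) may return an earlier block of X
        // (say at round 5) that a quorum has already accepted, and that block is
        // included instead while the round 8 reference is broadcast as excluded.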
1312        for (score, ancestor) in excluded_ancestors.iter() {
1313            let excluded_author = ancestor.author();
1314            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
1315            // A quorum of validators reported to have accepted blocks from the
1316            // excluded_author up to the low quorum round.
1317            let mut accepted_low_quorum_round = self
1318                .ancestor_state_manager
1319                .accepted_quorum_round_per_authority[excluded_author]
1320                .0;
1321            // If the accepted quorum round of this ancestor is greater than or equal
1322            // to the clock round, then clamp it to clock_round - 1, as that is the max
1323            // round the new block can include as an ancestor.
1324            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);
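            // For example (hypothetical values): with quorum_round = 9, an accepted low
            // quorum round of 12 is clamped to 9, while a value of 7 is kept as-is.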
1325
1326            let last_included_round = self.last_included_ancestors[excluded_author]
1327                .map(|block_ref| block_ref.round)
1328                .unwrap_or(GENESIS_ROUND);
1329            if ancestor.round() <= last_included_round {
1330                // This should have already been filtered out when filtering all_ancestors.
1331                // Still, ensure previously included ancestors are filtered out.
1332                continue;
1333            }
1334
1335            if last_included_round >= accepted_low_quorum_round {
1336                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1337                trace!(
1338                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
1339                    ancestor.reference()
1340                );
1341                node_metrics
1342                    .excluded_proposal_ancestors_count_by_authority
1343                    .with_label_values(&[block_hostname])
1344                    .inc();
1345                continue;
1346            }
1347
1348            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
1349                // Include the ancestor block as it has been seen & accepted by a strong quorum.
1350                ancestor.clone()
1351            } else {
1352                // Exclude this ancestor since it hasn't been accepted by a strong quorum
1353                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1354                trace!(
1355                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
1356                    ancestor.reference(),
1357                    ancestor.round()
1358                );
1359                node_metrics
1360                    .excluded_proposal_ancestors_count_by_authority
1361                    .with_label_values(&[block_hostname])
1362                    .inc();
1363
1364                // Look for an earlier block in the ancestor chain that we can include as there
1365                // is a gap between the last included round and the accepted low quorum round.
1366                //
1367                // Note: Only cached blocks need to be propagated. Committed and GC'ed blocks
1368                // do not need to be propagated.
1369                match self.dag_state.read().get_last_cached_block_in_range(
1370                    excluded_author,
1371                    last_included_round + 1,
1372                    accepted_low_quorum_round + 1,
1373                ) {
1374                    Some(earlier_ancestor) => {
1375                        // Found an earlier block that has been propagated well - include it instead
1376                        earlier_ancestor
1377                    }
1378                    None => {
1379                        // No suitable earlier block found
1380                        continue;
1381                    }
1382                }
1383            };
1384            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
1385            ancestors_to_propose.push(ancestor.clone());
1386            trace!(
1387                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
1388                ancestor.reference()
1389            );
1390            node_metrics
1391                .included_excluded_proposal_ancestors_count_by_authority
1392                .with_label_values(&[block_hostname.as_str(), "quorum"])
1393                .inc();
1394        }
1395
1396        assert!(
1397            parent_round_quorum.reached_threshold(&self.context.committee),
1398            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1399        );
1400
1401        info!(
1402            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
1403            ancestors_to_propose.len(),
1404            excluded_and_equivocating_ancestors.len()
1405        );
1406
1407        (ancestors_to_propose, excluded_and_equivocating_ancestors)
1408    }
1409
1410    /// Checks whether all the leaders of the round exist.
1411    /// TODO: we can leverage some additional signal here in order to manipulate
1412    /// the leader timeout more cleverly later. For example, if we already have
1413    /// one leader - the first in order - we might not want to wait as long.
1414    fn leaders_exist(&self, round: Round) -> bool {
1415        let dag_state = self.dag_state.read();
1416        for leader in self.leaders(round) {
1417            // Search for all the leaders. If at least one is not found, then return false.
1418            // A linear search should be fine here as the set of elements is expected to
1419            // be small, and more sophisticated data structures would not
1420            // give us much here.
1421            if !dag_state.contains_cached_block_at_slot(leader) {
1422                return false;
1423            }
1424        }
1425
1426        true
1427    }
1428
1429    /// Returns the leaders of the provided round.
1430    fn leaders(&self, round: Round) -> Vec<Slot> {
1431        self.committer
1432            .get_leaders(round)
1433            .into_iter()
1434            .map(|authority_index| Slot::new(round, authority_index))
1435            .collect()
1436    }
1437
1438    /// Returns the 1st leader of the round.
1439    fn first_leader(&self, round: Round) -> AuthorityIndex {
1440        self.leaders(round).first().unwrap().authority
1441    }
1442
1443    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1444        self.last_proposed_block().timestamp_ms()
1445    }
1446
1447    fn last_proposed_round(&self) -> Round {
1448        self.last_proposed_block().round()
1449    }
1450
1451    fn last_proposed_block(&self) -> VerifiedBlock {
1452        self.dag_state.read().get_last_proposed_block()
1453    }
1454}
1455
1456/// Senders of signals from Core, for outputs and events (e.g. a new block
1457/// being produced).
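/// A minimal usage sketch (the `context` and `extended_block` values are assumed to
/// be provided by the caller, as in the tests below):
///
/// ```ignore
/// let (mut signals, receivers) = CoreSignals::new(context.clone());
/// // Subscribe before proposing, so that broadcasts have at least one receiver.
/// let mut block_rx = receivers.block_broadcast_receiver();
/// let mut round_rx = receivers.new_round_receiver();
///
/// // Core calls these when a block is produced / the threshold clock advances.
/// signals.new_block(extended_block)?;
/// signals.new_round(5);
///
/// // Consumers observe the effects.
/// let _broadcasted: ExtendedBlock = block_rx.recv().await?;
/// assert_eq!(*round_rx.borrow_and_update(), 5);
/// ```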
1458pub(crate) struct CoreSignals {
1459    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1460    new_round_sender: watch::Sender<Round>,
1461    context: Arc<Context>,
1462}
1463
1464impl CoreSignals {
1465    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1466        // Blocks buffered in the broadcast channel should be roughly equal to those cached
1467        // in dag state. Since the underlying blocks are ref counted, a lower
1468        // buffer here would not reduce memory usage significantly.
1469        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1470            context.parameters.dag_state_cached_rounds as usize,
1471        );
1472        let (new_round_sender, new_round_receiver) = watch::channel(0);
1473
1474        let me = Self {
1475            tx_block_broadcast,
1476            new_round_sender,
1477            context,
1478        };
1479
1480        let receivers = CoreSignalsReceivers {
1481            rx_block_broadcast,
1482            new_round_receiver,
1483        };
1484
1485        (me, receivers)
1486    }
1487
1488    /// Sends a signal to all the waiters that a new block has been produced.
1489    /// Returns Ok(()) if the block reached at least one subscriber (or did not
1490    /// need to be broadcast), and a shutdown error otherwise.
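    ///
    /// A sketch of how a caller might react to the error (the surrounding plumbing is
    /// assumed):
    ///
    /// ```ignore
    /// if let Err(ConsensusError::Shutdown) = signals.new_block(extended_block) {
    ///     // All subscribers have been dropped; the node is shutting down.
    /// }
    /// ```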
1491    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1492        // When there is only one authority in committee, it is unnecessary to broadcast
1493        // the block which will fail anyway without subscribers to the signal.
1494        if self.context.committee.size() > 1 {
1495            if extended_block.block.round() == GENESIS_ROUND {
1496                debug!("Ignoring broadcasting genesis block to peers");
1497                return Ok(());
1498            }
1499
1500            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1501                warn!("Couldn't broadcast the block to any receiver: {err}");
1502                return Err(ConsensusError::Shutdown);
1503            }
1504        } else {
1505            debug!(
1506                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1507            );
1508        }
1509        Ok(())
1510    }
1511
1512    /// Sends a signal that the threshold clock has advanced to a new round.
1513    /// The `round_number` is the round to which the threshold clock has
1514    /// advanced.
1515    pub(crate) fn new_round(&mut self, round_number: Round) {
1516        let _ = self.new_round_sender.send_replace(round_number);
1517    }
1518}
1519
1520/// Receivers of signals from Core.
1521/// Intentionally un-cloneable. Components should only subscribe to channels
1522/// they need.
1523pub(crate) struct CoreSignalsReceivers {
1524    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1525    new_round_receiver: watch::Receiver<Round>,
1526}
1527
1528impl CoreSignalsReceivers {
1529    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1530        self.rx_block_broadcast.resubscribe()
1531    }
1532
1533    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1534        self.new_round_receiver.clone()
1535    }
1536}
1537
1538/// Creates cores for the specified authorities with their corresponding
1539/// stakes. The cores and their respective signal receivers are returned in
1540/// ascending `AuthorityIndex` order.
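/// A usage sketch (mirroring the tests below, which use a four-authority committee
/// with equal stake):
///
/// ```ignore
/// let (context, _) = Context::new_for_test(4);
/// let cores = create_cores(context, vec![1, 1, 1, 1]);
/// assert_eq!(cores.len(), 4);
/// ```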
1541#[cfg(test)]
1542pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<CoreTextFixture> {
1543    let mut cores = Vec::new();
1544
1545    for index in 0..authorities.len() {
1546        let own_index = AuthorityIndex::new_for_test(index as u32);
1547        let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false);
1548        cores.push(core);
1549    }
1550    cores
1551}
1552
1553#[cfg(test)]
1554pub(crate) struct CoreTextFixture {
1555    pub core: Core,
1556    pub signal_receivers: CoreSignalsReceivers,
1557    pub block_receiver: broadcast::Receiver<ExtendedBlock>,
1558    #[expect(unused)]
1559    pub commit_receiver: UnboundedReceiver<CommittedSubDag>,
1560    pub store: Arc<MemStore>,
1561}
1562
1563#[cfg(test)]
1564impl CoreTextFixture {
1565    fn new(
1566        context: Context,
1567        authorities: Vec<Stake>,
1568        own_index: AuthorityIndex,
1569        sync_last_known_own_block: bool,
1570    ) -> Self {
1571        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
1572        let mut context = context.clone();
1573        context = context
1574            .with_committee(committee)
1575            .with_authority_index(own_index);
1576        context
1577            .protocol_config
1578            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1579
1580        let context = Arc::new(context);
1581        let store = Arc::new(MemStore::new());
1582        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1583
1584        let block_manager = BlockManager::new(
1585            context.clone(),
1586            dag_state.clone(),
1587            Arc::new(NoopBlockVerifier),
1588        );
1589        let leader_schedule = Arc::new(
1590            LeaderSchedule::from_store(context.clone(), dag_state.clone())
1591                .with_num_commits_per_schedule(10),
1592        );
1593        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1594        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1595        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1596        // Need at least one subscriber to the block broadcast channel.
1597        let block_receiver = signal_receivers.block_broadcast_receiver();
1598
1599        let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
1600        let commit_observer = CommitObserver::new(
1601            context.clone(),
1602            CommitConsumer::new(commit_sender.clone(), 0),
1603            dag_state.clone(),
1604            store.clone(),
1605            leader_schedule.clone(),
1606        );
1607
1608        let block_signer = signers.remove(own_index.value()).1;
1609
1610        let core = Core::new(
1611            context,
1612            leader_schedule,
1613            transaction_consumer,
1614            block_manager,
1615            true,
1616            commit_observer,
1617            signals,
1618            block_signer,
1619            dag_state,
1620            sync_last_known_own_block,
1621        );
1622
1623        Self {
1624            core,
1625            signal_receivers,
1626            block_receiver,
1627            commit_receiver,
1628            store,
1629        }
1630    }
1631}
1632
1633#[cfg(test)]
1634mod test {
1635    use std::{collections::BTreeSet, time::Duration};
1636
1637    use consensus_config::{AuthorityIndex, Parameters};
1638    use futures::{StreamExt, stream::FuturesUnordered};
1639    use iota_metrics::monitored_mpsc::unbounded_channel;
1640    use iota_protocol_config::ProtocolConfig;
1641    use rstest::rstest;
1642    use tokio::time::sleep;
1643
1644    use super::*;
1645    use crate::{
1646        CommitConsumer, CommitIndex,
1647        block::{TestBlock, genesis_blocks},
1648        block_verifier::NoopBlockVerifier,
1649        commit::CommitAPI,
1650        leader_scoring::ReputationScores,
1651        storage::{Store, WriteBatch, mem_store::MemStore},
1652        test_dag_builder::DagBuilder,
1653        test_dag_parser::parse_dag,
1654        transaction::{BlockStatus, TransactionClient},
1655    };
1656
1657    /// Recover Core and continue proposing from the last round which forms a
1658    /// quorum.
1659    #[tokio::test]
1660    async fn test_core_recover_from_store_for_full_round() {
1661        telemetry_subscribers::init_for_testing();
1662        let (context, mut key_pairs) = Context::new_for_test(4);
1663        let context = Arc::new(context);
1664        let store = Arc::new(MemStore::new());
1665        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1666        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1667        let mut block_status_subscriptions = FuturesUnordered::new();
1668
1669        // Create test blocks for all the authorities for 4 rounds and populate them in
1670        // store
1671        let mut last_round_blocks = genesis_blocks(&context);
1672        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1673        for round in 1..=4 {
1674            let mut this_round_blocks = Vec::new();
1675            for (index, _authority) in context.committee.authorities() {
1676                let block = VerifiedBlock::new_for_test(
1677                    TestBlock::new(round, index.value() as u32)
1678                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1679                        .build(),
1680                );
1681
1682                // If it's round 1 (which will be committed later on) and it's our "own"
1683                // block, then subscribe to listen for the block status.
1684                if round == 1 && index == context.own_index {
1685                    let subscription =
1686                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
1687                    block_status_subscriptions.push(subscription);
1688                }
1689
1690                this_round_blocks.push(block);
1691            }
1692            all_blocks.extend(this_round_blocks.clone());
1693            last_round_blocks = this_round_blocks;
1694        }
1695        // write them in store
1696        store
1697            .write(WriteBatch::default().blocks(all_blocks))
1698            .expect("Storage error");
1699
1700        // create dag state after all blocks have been written to store
1701        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1702        let block_manager = BlockManager::new(
1703            context.clone(),
1704            dag_state.clone(),
1705            Arc::new(NoopBlockVerifier),
1706        );
1707        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1708            context.clone(),
1709            dag_state.clone(),
1710        ));
1711
1712        let (sender, _receiver) = unbounded_channel("consensus_output");
1713        let commit_observer = CommitObserver::new(
1714            context.clone(),
1715            CommitConsumer::new(sender.clone(), 0),
1716            dag_state.clone(),
1717            store.clone(),
1718            leader_schedule.clone(),
1719        );
1720
1721        // Check no commits have been persisted to dag_state or store.
1722        let last_commit = store.read_last_commit().unwrap();
1723        assert!(last_commit.is_none());
1724        assert_eq!(dag_state.read().last_commit_index(), 0);
1725
1726        // Now spin up core
1727        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1728        // Need at least one subscriber to the block broadcast channel.
1729        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1730        let _core = Core::new(
1731            context.clone(),
1732            leader_schedule,
1733            transaction_consumer,
1734            block_manager,
1735            true,
1736            commit_observer,
1737            signals,
1738            key_pairs.remove(context.own_index.value()).1,
1739            dag_state.clone(),
1740            false,
1741        );
1742
1743        // New round should be 5
1744        let mut new_round = signal_receivers.new_round_receiver();
1745        assert_eq!(*new_round.borrow_and_update(), 5);
1746
1747        // Block for round 5 should have been proposed.
1748        let proposed_block = block_receiver
1749            .recv()
1750            .await
1751            .expect("A block should have been created");
1752        assert_eq!(proposed_block.block.round(), 5);
1753        let ancestors = proposed_block.block.ancestors();
1754
1755        // Only ancestors of round 4 should be included.
1756        assert_eq!(ancestors.len(), 4);
1757        for ancestor in ancestors {
1758            assert_eq!(ancestor.round, 4);
1759        }
1760
1761        let last_commit = store
1762            .read_last_commit()
1763            .unwrap()
1764            .expect("last commit should be set");
1765
1766        // There were no commits prior to the core starting up, but there were completed
1767        // rounds up to and including round 4. So we should commit leaders in rounds 1 &
1768        // 2 as soon as the new block for round 5 is proposed.
1769        assert_eq!(last_commit.index(), 2);
1770        assert_eq!(dag_state.read().last_commit_index(), 2);
1771        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1772        assert_eq!(all_stored_commits.len(), 2);
1773
1774        // And ensure that our "own" round 1 block is sent to the TransactionConsumer as a
1775        // notification alongside the gc_round.
1776        while let Some(result) = block_status_subscriptions.next().await {
1777            let status = result.unwrap();
1778            assert!(matches!(status, BlockStatus::Sequenced(_)));
1779        }
1780    }
1781
1782    /// Recover Core and continue proposing when the last round is partial,
1783    /// i.e. it doesn't form a quorum and we haven't proposed for that round
1784    /// yet.
1785    #[tokio::test]
1786    async fn test_core_recover_from_store_for_partial_round() {
1787        telemetry_subscribers::init_for_testing();
1788
1789        let (context, mut key_pairs) = Context::new_for_test(4);
1790        let context = Arc::new(context);
1791        let store = Arc::new(MemStore::new());
1792        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1793        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1794
1795        // Create test blocks for all authorities except ours (index = 0).
1796        let mut last_round_blocks = genesis_blocks(&context);
1797        let mut all_blocks = last_round_blocks.clone();
1798        for round in 1..=4 {
1799            let mut this_round_blocks = Vec::new();
1800
1801            // For round 4 only produce f+1 blocks. Skip block creation for our validator
1802            // (position 0) and the validator at position 1.
1803            let authorities_to_skip = if round == 4 {
1804                context.committee.validity_threshold() as usize
1805            } else {
1806                // otherwise always skip creating a block for our authority
1807                1
1808            };
1809
1810            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1811                let block = TestBlock::new(round, index.value() as u32)
1812                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1813                    .build();
1814                this_round_blocks.push(VerifiedBlock::new_for_test(block));
1815            }
1816            all_blocks.extend(this_round_blocks.clone());
1817            last_round_blocks = this_round_blocks;
1818        }
1819
1820        // write them in store
1821        store
1822            .write(WriteBatch::default().blocks(all_blocks))
1823            .expect("Storage error");
1824
1825        // create dag state after all blocks have been written to store
1826        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1827        let block_manager = BlockManager::new(
1828            context.clone(),
1829            dag_state.clone(),
1830            Arc::new(NoopBlockVerifier),
1831        );
1832        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1833            context.clone(),
1834            dag_state.clone(),
1835        ));
1836
1837        let (sender, _receiver) = unbounded_channel("consensus_output");
1838        let commit_observer = CommitObserver::new(
1839            context.clone(),
1840            CommitConsumer::new(sender.clone(), 0),
1841            dag_state.clone(),
1842            store.clone(),
1843            leader_schedule.clone(),
1844        );
1845
1846        // Check no commits have been persisted to dag_state & store
1847        let last_commit = store.read_last_commit().unwrap();
1848        assert!(last_commit.is_none());
1849        assert_eq!(dag_state.read().last_commit_index(), 0);
1850
1851        // Now spin up core
1852        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1853        // Need at least one subscriber to the block broadcast channel.
1854        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1855        let mut core = Core::new(
1856            context.clone(),
1857            leader_schedule,
1858            transaction_consumer,
1859            block_manager,
1860            true,
1861            commit_observer,
1862            signals,
1863            key_pairs.remove(context.own_index.value()).1,
1864            dag_state.clone(),
1865            false,
1866        );
1867
1868        // Clock round should have advanced to 5 during recovery because
1869        // a quorum has formed in round 4.
1870        let mut new_round = signal_receivers.new_round_receiver();
1871        assert_eq!(*new_round.borrow_and_update(), 5);
1872
1873        // During recovery, round 4 block should have been proposed.
1874        let proposed_block = block_receiver
1875            .recv()
1876            .await
1877            .expect("A block should have been created");
1878        assert_eq!(proposed_block.block.round(), 4);
1879        let ancestors = proposed_block.block.ancestors();
1880
1881        assert_eq!(ancestors.len(), 4);
1882        for ancestor in ancestors {
1883            if ancestor.author == context.own_index {
1884                assert_eq!(ancestor.round, 0);
1885            } else {
1886                assert_eq!(ancestor.round, 3);
1887            }
1888        }
1889
1890        // Run commit rule.
1891        core.try_commit(vec![]).ok();
1892        let last_commit = store
1893            .read_last_commit()
1894            .unwrap()
1895            .expect("last commit should be set");
1896
1897        // There were no commits prior to the core starting up, but there were completed
1898        // rounds up to round 4. So we should commit leaders in rounds 1 & 2 as soon
1899        // as the new block for round 4 is proposed.
1900        assert_eq!(last_commit.index(), 2);
1901        assert_eq!(dag_state.read().last_commit_index(), 2);
1902        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1903        assert_eq!(all_stored_commits.len(), 2);
1904    }
1905
1906    #[tokio::test]
1907    async fn test_core_propose_after_genesis() {
1908        telemetry_subscribers::init_for_testing();
1909        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1910            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1911            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1912            config
1913        });
1914
1915        let (context, mut key_pairs) = Context::new_for_test(4);
1916        let context = Arc::new(context);
1917        let store = Arc::new(MemStore::new());
1918        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1919
1920        let block_manager = BlockManager::new(
1921            context.clone(),
1922            dag_state.clone(),
1923            Arc::new(NoopBlockVerifier),
1924        );
1925        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1926        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1927        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1928        // Need at least one subscriber to the block broadcast channel.
1929        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1930        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1931            context.clone(),
1932            dag_state.clone(),
1933        ));
1934
1935        let (sender, _receiver) = unbounded_channel("consensus_output");
1936        let commit_observer = CommitObserver::new(
1937            context.clone(),
1938            CommitConsumer::new(sender.clone(), 0),
1939            dag_state.clone(),
1940            store.clone(),
1941            leader_schedule.clone(),
1942        );
1943
1944        let mut core = Core::new(
1945            context.clone(),
1946            leader_schedule,
1947            transaction_consumer,
1948            block_manager,
1949            true,
1950            commit_observer,
1951            signals,
1952            key_pairs.remove(context.own_index.value()).1,
1953            dag_state.clone(),
1954            false,
1955        );
1956
1957        // Send some transactions
1958        let mut total = 0;
1959        let mut index = 0;
1960        loop {
1961            let transaction =
1962                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1963            total += transaction.len();
1964            index += 1;
1965            let _w = transaction_client
1966                .submit_no_wait(vec![transaction])
1967                .await
1968                .unwrap();
1969
1970            // Keep submitting until the total size of transactions reaches ~1KB.
1971            if total >= 1_000 {
1972                break;
1973            }
1974        }
1975
1976        // a new block should have been created during recovery.
1977        let extended_block = block_receiver
1978            .recv()
1979            .await
1980            .expect("A new block should have been created");
1981
1982        // A new block created - assert the details
1983        assert_eq!(extended_block.block.round(), 1);
1984        assert_eq!(extended_block.block.author().value(), 0);
1985        assert_eq!(extended_block.block.ancestors().len(), 4);
1986
1987        let mut total = 0;
1988        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1989            total += transaction.data().len() as u64;
1990            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1991            assert_eq!(format!("Transaction {i}"), transaction);
1992        }
1993        assert!(
1994            total
1995                <= context
1996                    .protocol_config
1997                    .consensus_max_transactions_in_block_bytes()
1998        );
1999
2000        // genesis blocks should be referenced
2001        let all_genesis = genesis_blocks(&context);
2002
2003        for ancestor in extended_block.block.ancestors() {
2004            all_genesis
2005                .iter()
2006                .find(|block| block.reference() == *ancestor)
2007                .expect("Block should be found amongst genesis blocks");
2008        }
2009
2010        // Try to propose again - with or without ignore leaders check, it will not
2011        // return any block
2012        assert!(core.try_propose(false).unwrap().is_none());
2013        assert!(core.try_propose(true).unwrap().is_none());
2014
2015        // Check no commits have been persisted to dag_state & store
2016        let last_commit = store.read_last_commit().unwrap();
2017        assert!(last_commit.is_none());
2018        assert_eq!(dag_state.read().last_commit_index(), 0);
2019    }
2020
2021    #[tokio::test]
2022    async fn test_core_propose_once_receiving_a_quorum() {
2023        telemetry_subscribers::init_for_testing();
2024        let (context, mut key_pairs) = Context::new_for_test(4);
2025        let context = Arc::new(context);
2026
2027        let store = Arc::new(MemStore::new());
2028        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2029
2030        let block_manager = BlockManager::new(
2031            context.clone(),
2032            dag_state.clone(),
2033            Arc::new(NoopBlockVerifier),
2034        );
2035        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2036            context.clone(),
2037            dag_state.clone(),
2038        ));
2039
2040        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2041        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2042        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2043        // Need at least one subscriber to the block broadcast channel.
2044        let _block_receiver = signal_receivers.block_broadcast_receiver();
2045
2046        let (sender, _receiver) = unbounded_channel("consensus_output");
2047        let commit_observer = CommitObserver::new(
2048            context.clone(),
2049            CommitConsumer::new(sender.clone(), 0),
2050            dag_state.clone(),
2051            store.clone(),
2052            leader_schedule.clone(),
2053        );
2054
2055        let mut core = Core::new(
2056            context.clone(),
2057            leader_schedule,
2058            transaction_consumer,
2059            block_manager,
2060            true,
2061            commit_observer,
2062            signals,
2063            key_pairs.remove(context.own_index.value()).1,
2064            dag_state.clone(),
2065            false,
2066        );
2067
2068        let mut expected_ancestors = BTreeSet::new();
2069
2070        // Adding one block now will trigger the creation of a new block for round 1.
2071        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
2072        expected_ancestors.insert(block_1.reference());
2073        // Wait for min round delay to allow blocks to be proposed.
2074        sleep(context.parameters.min_round_delay).await;
2075        // add blocks to trigger proposal.
2076        _ = core.add_blocks(vec![block_1]);
2077
2078        assert_eq!(core.last_proposed_round(), 1);
2079        expected_ancestors.insert(core.last_proposed_block().reference());
2080        // attempt to create a block - none will be produced.
2081        assert!(core.try_propose(false).unwrap().is_none());
2082
2083        // Adding another block now forms a quorum for round 1, so a block at round 2 will
2084        // be proposed.
2085        let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
2086        expected_ancestors.insert(block_3.reference());
2087        // Wait for min round delay to allow blocks to be proposed.
2088        sleep(context.parameters.min_round_delay).await;
2089        // add blocks to trigger proposal.
2090        _ = core.add_blocks(vec![block_3]);
2091
2092        assert_eq!(core.last_proposed_round(), 2);
2093
2094        let proposed_block = core.last_proposed_block();
2095        assert_eq!(proposed_block.round(), 2);
2096        assert_eq!(proposed_block.author(), context.own_index);
2097        assert_eq!(proposed_block.ancestors().len(), 3);
2098        let ancestors = proposed_block.ancestors();
2099        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
2100        assert_eq!(ancestors, expected_ancestors);
2101
2102        // Check no commits have been persisted to dag_state & store
2103        let last_commit = store.read_last_commit().unwrap();
2104        assert!(last_commit.is_none());
2105        assert_eq!(dag_state.read().last_commit_index(), 0);
2106    }
2107
2108    #[rstest]
2109    #[tokio::test]
2110    async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
2111        telemetry_subscribers::init_for_testing();
2112        let (mut context, mut key_pairs) = Context::new_for_test(4);
2113
2114        if gc_depth > 0 {
2115            context
2116                .protocol_config
2117                .set_consensus_gc_depth_for_testing(gc_depth);
2118        }
2119
2120        let context = Arc::new(context);
2121
2122        let store = Arc::new(MemStore::new());
2123        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2124        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2125        let mut block_status_subscriptions = FuturesUnordered::new();
2126
2127        let dag_str = "DAG {
2128            Round 0 : { 4 },
2129            Round 1 : { * },
2130            Round 2 : { * },
2131            Round 3 : {
2132                A -> [*],
2133                B -> [-A2],
2134                C -> [-A2],
2135                D -> [-A2],
2136            },
2137            Round 4 : {
2138                B -> [-A3],
2139                C -> [-A3],
2140                D -> [-A3],
2141            },
2142            Round 5 : {
2143                A -> [A3, B4, C4, D4]
2144                B -> [*],
2145                C -> [*],
2146                D -> [*],
2147            },
2148            Round 6 : { * },
2149            Round 7 : { * },
2150            Round 8 : { * },
2151        }";
2152
2153        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2154        dag_builder.print();
2155
2156        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be
2157        // able to commit up to round 5.
2158        for block in dag_builder.blocks(1..=5) {
2159            if block.author() == context.own_index {
2160                let subscription =
2161                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
2162                block_status_subscriptions.push(subscription);
2163            }
2164        }
2165
2166        // write them in store
2167        store
2168            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2169            .expect("Storage error");
2170
2171        // create dag state after all blocks have been written to store
2172        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2173        let block_manager = BlockManager::new(
2174            context.clone(),
2175            dag_state.clone(),
2176            Arc::new(NoopBlockVerifier),
2177        );
2178        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2179            context.clone(),
2180            dag_state.clone(),
2181        ));
2182
2183        let (sender, _receiver) = unbounded_channel("consensus_output");
2184        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2185        let commit_observer = CommitObserver::new(
2186            context.clone(),
2187            commit_consumer,
2188            dag_state.clone(),
2189            store.clone(),
2190            leader_schedule.clone(),
2191        );
2192
2193        // Check no commits have been persisted to dag_state or store.
2194        let last_commit = store.read_last_commit().unwrap();
2195        assert!(last_commit.is_none());
2196        assert_eq!(dag_state.read().last_commit_index(), 0);
2197
2198        // Now spin up core
2199        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2200        // Need at least one subscriber to the block broadcast channel.
2201        let _block_receiver = signal_receivers.block_broadcast_receiver();
2202        let _core = Core::new(
2203            context.clone(),
2204            leader_schedule,
2205            transaction_consumer,
2206            block_manager,
2207            true,
2208            commit_observer,
2209            signals,
2210            key_pairs.remove(context.own_index.value()).1,
2211            dag_state.clone(),
2212            false,
2213        );
2214
2215        let last_commit = store
2216            .read_last_commit()
2217            .unwrap()
2218            .expect("last commit should be set");
2219
2220        assert_eq!(last_commit.index(), 5);
2221
2222        while let Some(result) = block_status_subscriptions.next().await {
2223            let status = result.unwrap();
2224
2225            // If gc is enabled, then we expect some blocks to be garbage collected.
2226            if gc_depth > 0 {
2227                match status {
2228                    BlockStatus::Sequenced(block_ref) => {
2229                        assert!(block_ref.round == 1 || block_ref.round == 5);
2230                    }
2231                    BlockStatus::GarbageCollected(block_ref) => {
2232                        assert!(block_ref.round == 2 || block_ref.round == 3);
2233                    }
2234                }
2235            } else {
2236                // otherwise all of them should be committed
2237                assert!(matches!(status, BlockStatus::Sequenced(_)));
2238            }
2239        }
2240    }
2241
2242    // Tests that the threshold clock advances when blocks get unsuspended due to
2243    // GC'ed blocks, and that newly created blocks are always higher than the last
2244    // advanced gc round.
2245    #[tokio::test]
2246    async fn test_multiple_commits_advance_threshold_clock() {
2247        telemetry_subscribers::init_for_testing();
2248        let (mut context, mut key_pairs) = Context::new_for_test(4);
2249        const GC_DEPTH: u32 = 2;
2250
2251        context
2252            .protocol_config
2253            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2254
2255        let context = Arc::new(context);
2256
2257        let store = Arc::new(MemStore::new());
2258        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2259        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2260
2261        // In round 1 we do produce the block for authority D, but we do not link to it
2262        // until round 6. This makes round 6 unable to be processed
2263        // until the leader of round 3 is committed, at which point round 1 gets garbage collected.
2264        // Then we add more rounds so we can trigger a commit for the leader of round 9,
2265        // which will move the gc round to 7.
2266        let dag_str = "DAG {
2267            Round 0 : { 4 },
2268            Round 1 : { * },
2269            Round 2 : { 
2270                B -> [-D1],
2271                C -> [-D1],
2272                D -> [-D1],
2273            },
2274            Round 3 : {
2275                B -> [*],
2276                C -> [*]
2277                D -> [*],
2278            },
2279            Round 4 : { 
2280                A -> [*],
2281                B -> [*],
2282                C -> [*]
2283                D -> [*],
2284            },
2285            Round 5 : { 
2286                A -> [*],
2287                B -> [*],
2288                C -> [*],
2289                D -> [*],
2290            },
2291            Round 6 : { 
2292                B -> [A5, B5, C5, D1],
2293                C -> [A5, B5, C5, D1],
2294                D -> [A5, B5, C5, D1],
2295            },
2296            Round 7 : { 
2297                B -> [*],
2298                C -> [*],
2299                D -> [*],
2300            },
2301            Round 8 : { 
2302                B -> [*],
2303                C -> [*],
2304                D -> [*],
2305            },
2306            Round 9 : { 
2307                B -> [*],
2308                C -> [*],
2309                D -> [*],
2310            },
2311            Round 10 : { 
2312                B -> [*],
2313                C -> [*],
2314                D -> [*],
2315            },
2316            Round 11 : { 
2317                B -> [*],
2318                C -> [*],
2319                D -> [*],
2320            },
2321        }";
2322
2323        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2324        dag_builder.print();
2325
2326        // create dag state after all blocks have been written to store
2327        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2328        let block_manager = BlockManager::new(
2329            context.clone(),
2330            dag_state.clone(),
2331            Arc::new(NoopBlockVerifier),
2332        );
2333        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2334            context.clone(),
2335            dag_state.clone(),
2336        ));
2337        let (sender, _receiver) = unbounded_channel("consensus_output");
2338        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2339        let commit_observer = CommitObserver::new(
2340            context.clone(),
2341            commit_consumer,
2342            dag_state.clone(),
2343            store.clone(),
2344            leader_schedule.clone(),
2345        );
2346
2347        // Check no commits have been persisted to dag_state or store.
2348        let last_commit = store.read_last_commit().unwrap();
2349        assert!(last_commit.is_none());
2350        assert_eq!(dag_state.read().last_commit_index(), 0);
2351
2352        // Now spin up core
2353        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2354        // Need at least one subscriber to the block broadcast channel.
2355        let _block_receiver = signal_receivers.block_broadcast_receiver();
2356        let mut core = Core::new(
2357            context.clone(),
2358            leader_schedule,
2359            transaction_consumer,
2360            block_manager,
2361            true,
2362            commit_observer,
2363            signals,
2364            key_pairs.remove(context.own_index.value()).1,
2365            dag_state.clone(),
2366            true,
2367        );
2368        // We set the last known round to 4 so that we avoid creating new blocks until then -
2369        // otherwise it would crash, as the already created DAG contains blocks for this
2370        // authority.
2371        core.set_last_known_proposed_round(4);
2372
2373        // We add all the blocks except D1. The only ones we can immediately accept are
2374        // the ones up to round 5, as they don't have a dependency on D1. The rest of the
2375        // blocks do have a causal dependency on D1, so they can't be processed until the
2376        // leader of round 3 gets committed and the gc round moves to 1. That will make
2377        // all the blocks that depend on D1 get accepted. However, our threshold
2378        // clock is now at round 6, as the last quorum that we managed to process was
2379        // round 5. As commits happen, blocks of later rounds get accepted and
2380        // more leaders get committed. Eventually the leader of round 9 gets committed
2381        // and gc is moved to 9 - 2 = 7. If our node attempted to produce a block
2382        // for threshold clock round 6, the acceptance checks would fail as the gc
2383        // round has now moved far past this round.
2384        core.add_blocks(
2385            dag_builder
2386                .blocks(1..=11)
2387                .into_iter()
2388                .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2389                .collect(),
2390        )
2391        .expect("Should not fail");
2392
2393        assert_eq!(core.last_proposed_round(), 12);
2394    }
2395
2396    #[tokio::test]
2397    async fn test_core_set_min_propose_round() {
2398        telemetry_subscribers::init_for_testing();
2399        let (context, mut key_pairs) = Context::new_for_test(4);
2400        let context = Arc::new(context.with_parameters(Parameters {
2401            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2402            ..Default::default()
2403        }));
2404
2405        let store = Arc::new(MemStore::new());
2406        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2407
2408        let block_manager = BlockManager::new(
2409            context.clone(),
2410            dag_state.clone(),
2411            Arc::new(NoopBlockVerifier),
2412        );
2413        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2414            context.clone(),
2415            dag_state.clone(),
2416        ));
2417
2418        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2419        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2420        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2421        // Need at least one subscriber to the block broadcast channel.
2422        let _block_receiver = signal_receivers.block_broadcast_receiver();
2423
2424        let (sender, _receiver) = unbounded_channel("consensus_output");
2425        let commit_observer = CommitObserver::new(
2426            context.clone(),
2427            CommitConsumer::new(sender.clone(), 0),
2428            dag_state.clone(),
2429            store.clone(),
2430            leader_schedule.clone(),
2431        );
2432
2433        let mut core = Core::new(
2434            context.clone(),
2435            leader_schedule,
2436            transaction_consumer,
2437            block_manager,
2438            true,
2439            commit_observer,
2440            signals,
2441            key_pairs.remove(context.own_index.value()).1,
2442            dag_state.clone(),
2443            true,
2444        );
2445
2446        // No new block should have been produced
2447        assert_eq!(
2448            core.last_proposed_round(),
2449            GENESIS_ROUND,
2450            "No block should have been created other than genesis"
2451        );
2452
2453        // Trying to explicitly propose a block will not produce anything
2454        assert!(core.try_propose(true).unwrap().is_none());
2455
2456        // Create blocks for the whole network - even for "our" node - in order to
2457        // replicate an "amnesia" recovery.
2458        let mut builder = DagBuilder::new(context.clone());
2459        builder.layers(1..=10).build();
2460
2461        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2462
2463        // Process all the blocks
2464        assert!(core.add_blocks(blocks).unwrap().is_empty());
2465
2466        // Try to propose - no block should be produced.
2467        assert!(core.try_propose(true).unwrap().is_none());
2468
2469        // Now set the last known proposed round, which is the highest round for which
2470        // the network has informed us that we have already proposed a block.
2471        core.set_last_known_proposed_round(10);
2472
2473        let block = core.try_propose(true).expect("No error").unwrap();
2474        assert_eq!(block.round(), 11);
2475        assert_eq!(block.ancestors().len(), 4);
2476
2477        let our_ancestor_included = block.ancestors()[0];
2478        assert_eq!(our_ancestor_included.author, context.own_index);
2479        assert_eq!(our_ancestor_included.round, 10);
2480    }
2481
2482    #[tokio::test(flavor = "current_thread", start_paused = true)]
2483    async fn test_core_try_new_block_leader_timeout() {
2484        telemetry_subscribers::init_for_testing();
2485
2486        // Since we run the test with start_paused = true, any time-dependent
2487        // operations using Tokio's time facilities, such as tokio::time::sleep
2488        // or tokio::time::Instant, will not advance. So practically each Core's
2489        // clock may have been initialised with a different value, but it never
2490        // advances. To ensure that blocks won't get rejected by cores, we
2491        // need to manually wait for the time diff before processing them. By
2492        // calling `tokio::time::sleep` we implicitly also advance the tokio
2493        // clock.
2494        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2495            // Simulate the time wait before processing a block to ensure that
2496            // block.timestamp <= now
2497            let now = context.clock.timestamp_utc_ms();
2498            let max_timestamp = blocks
2499                .iter()
2500                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2501                .map(|block| block.timestamp_ms())
2502                .unwrap_or(0);
2503
2504            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2505            sleep(wait_time).await;
2506        }
2507
2508        let (context, _) = Context::new_for_test(4);
2509        // Create the cores for all authorities
2510        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2511
2512        // Create blocks for rounds 1..=3 from all Cores except the last Core of authority
2513        // 3, so we miss its block. As it will be the leader of round 3,
2514        // no-one will be able to progress to round 4 unless we explicitly trigger
2515        // the block creation.
2516        // Split off the last core; only the remaining cores produce blocks below.
2517        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2518
2519        // Now iterate over a few rounds and ensure the corresponding signals are
2520        // created while the network advances.
2521        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2522        for round in 1..=3 {
2523            let mut this_round_blocks = Vec::new();
2524
2525            for core_fixture in cores.iter_mut() {
2526                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2527
2528                core_fixture
2529                    .core
2530                    .add_blocks(last_round_blocks.clone())
2531                    .unwrap();
2532
2533                // Only when round > 1 and using non-genesis parents.
2534                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2535                    assert_eq!(round - 1, r);
2536                    if core_fixture.core.last_proposed_round() == r {
2537                        // Force propose new block regardless of min round delay.
2538                        core_fixture
2539                            .core
2540                            .try_propose(true)
2541                            .unwrap()
2542                            .unwrap_or_else(|| {
2543                                panic!("Block should have been proposed for round {round}")
2544                            });
2545                    }
2546                }
2547
2548                assert_eq!(core_fixture.core.last_proposed_round(), round);
2549
2550                this_round_blocks.push(core_fixture.core.last_proposed_block());
2551            }
2552
2553            last_round_blocks = this_round_blocks;
2554        }
2555
2556        // Try to create the blocks for round 4 by calling the try_propose() method. No
2557        // block should be created as the leader - authority 3 - hasn't proposed
2558        // any block.
2559        for core_fixture in cores.iter_mut() {
2560            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2561
2562            core_fixture
2563                .core
2564                .add_blocks(last_round_blocks.clone())
2565                .unwrap();
2566            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2567        }
2568
2569        // Now try to create the blocks for round 4 via the leader timeout method which
2570        // should ignore any leader checks or min round delay.
2571        for core_fixture in cores.iter_mut() {
2572            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2573            assert_eq!(core_fixture.core.last_proposed_round(), 4);
2574
2575            // Check commits have been persisted to store
2576            let last_commit = core_fixture
2577                .store
2578                .read_last_commit()
2579                .unwrap()
2580                .expect("last commit should be set");
2581            // There is 1 leader round, with rounds completed up to and including
2582            // round 4.
2583            assert_eq!(last_commit.index(), 1);
2584            let all_stored_commits = core_fixture
2585                .store
2586                .scan_commits((0..=CommitIndex::MAX).into())
2587                .unwrap();
2588            assert_eq!(all_stored_commits.len(), 1);
2589        }
2590    }
2591
2592    #[tokio::test(flavor = "current_thread", start_paused = true)]
2593    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2594        telemetry_subscribers::init_for_testing();
2595
2596        // Since we run the test with start_paused = true, any time-dependent
2597        // operations using Tokio's time facilities, such as tokio::time::sleep
2598        // or tokio::time::Instant, will not advance. So practically each Core's
2599        // clock may have been initialised with a different value, but it never
2600        // advances. To ensure that blocks won't get rejected by cores, we
2601        // need to manually wait for the time diff before processing them. By
2602        // calling `tokio::time::sleep` we implicitly also advance the tokio
2603        // clock.
2604        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2605            // Simulate the time wait before processing a block to ensure that
2606            // block.timestamp <= now
2607            let now = context.clock.timestamp_utc_ms();
2608            let max_timestamp = blocks
2609                .iter()
2610                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2611                .map(|block| block.timestamp_ms())
2612                .unwrap_or(0);
2613
2614            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2615            sleep(wait_time).await;
2616        }
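        // A worked example of the computation above (values are illustrative, not
        // taken from the test): if the paused clock currently reads now = 1_000 ms
        // and the newest incoming block carries timestamp_ms = 1_250, then
        // wait_time = 1_250.saturating_sub(1_000) = 250 ms, and the sleep advances
        // the paused tokio clock by exactly that amount, so block.timestamp <= now
        // holds when the block is processed.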
2617
2618        let (mut context, _) = Context::new_for_test(4);
2619        context
2620            .protocol_config
2621            .set_consensus_smart_ancestor_selection_for_testing(true);
2622        context
2623            .protocol_config
2624            .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2625
2626        // Create the cores for all authorities
2627        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2628        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2629
2630        // Create blocks for rounds 1..=30 from all Cores except last Core of authority
2631        // 3.
2632        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2633        for round in 1..=30 {
2634            let mut this_round_blocks = Vec::new();
2635
2636            for core_fixture in cores.iter_mut() {
2637                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2638
2639                core_fixture
2640                    .core
2641                    .add_blocks(last_round_blocks.clone())
2642                    .unwrap();
2643
2644                // Only when round > 1 and using non-genesis parents.
2645                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2646                    assert_eq!(round - 1, r);
2647                    if core_fixture.core.last_proposed_round() == r {
2648                        // Force propose new block regardless of min round delay.
2649                        core_fixture
2650                            .core
2651                            .try_propose(true)
2652                            .unwrap()
2653                            .unwrap_or_else(|| {
2654                                panic!("Block should have been proposed for round {round}")
2655                            });
2656                    }
2657                }
2658
2659                assert_eq!(core_fixture.core.last_proposed_round(), round);
2660
2661                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2662            }
2663
2664            last_round_blocks = this_round_blocks;
2665        }
2666
2667        // Now produce blocks for all Cores
2668        for round in 31..=40 {
2669            let mut this_round_blocks = Vec::new();
2670
2671            for core_fixture in all_cores.iter_mut() {
2672                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2673
2674                core_fixture
2675                    .core
2676                    .add_blocks(last_round_blocks.clone())
2677                    .unwrap();
2678
2679                // Only when round > 1 and using non-genesis parents.
2680                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2681                    assert_eq!(round - 1, r);
2682                    if core_fixture.core.last_proposed_round() == r {
2683                        // Force propose new block regardless of min round delay.
2684                        core_fixture
2685                            .core
2686                            .try_propose(true)
2687                            .unwrap()
2688                            .unwrap_or_else(|| {
2689                                panic!("Block should have been proposed for round {round}")
2690                            });
2691                    }
2692                }
2693
2694                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2695
2696                for block in this_round_blocks.iter() {
2697                    if block.author() != AuthorityIndex::new_for_test(3) {
2698                        // Assert blocks created include only 3 ancestors per block as one
2699                        // should be excluded
2700                        assert_eq!(block.ancestors().len(), 3);
2701                    } else {
2702                        // Authority 3 is the low scoring authority so it will still include
2703                        // its own blocks.
2704                        assert_eq!(block.ancestors().len(), 4);
2705                    }
2706                }
2707            }
2708
2709            last_round_blocks = this_round_blocks;
2710        }
2711    }
2712
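    // Several tests in this module repeat the same ancestor-count assertion inline.
    // A minimal helper sketch that could factor this out is shown below; it only
    // uses the `BlockAPI` accessors already imported in this module and is
    // illustrative rather than wired into any test.
    #[allow(dead_code)]
    fn assert_ancestor_count(block: &VerifiedBlock, expected: usize) {
        // Compare the number of ancestor references against the expected count,
        // reporting the block's round on failure to ease debugging.
        assert_eq!(
            block.ancestors().len(),
            expected,
            "unexpected number of ancestors for a block at round {}",
            block.round()
        );
    }
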
2713    #[tokio::test]
2714    async fn test_smart_ancestor_selection() {
2715        telemetry_subscribers::init_for_testing();
2716        let (mut context, mut key_pairs) = Context::new_for_test(7);
2717        context
2718            .protocol_config
2719            .set_consensus_smart_ancestor_selection_for_testing(true);
2720        context
2721            .protocol_config
2722            .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2723        let context = Arc::new(context.with_parameters(Parameters {
2724            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2725            ..Default::default()
2726        }));
2727
2728        let store = Arc::new(MemStore::new());
2729        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2730
2731        let block_manager = BlockManager::new(
2732            context.clone(),
2733            dag_state.clone(),
2734            Arc::new(NoopBlockVerifier),
2735        );
2736        let leader_schedule = Arc::new(
2737            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2738                .with_num_commits_per_schedule(10),
2739        );
2740
2741        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2742        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2743        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2744        // Need at least one subscriber to the block broadcast channel.
2745        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2746
2747        let (sender, _receiver) = unbounded_channel("consensus_output");
2748        let commit_consumer = CommitConsumer::new(sender, 0);
2749        let commit_observer = CommitObserver::new(
2750            context.clone(),
2751            commit_consumer,
2752            dag_state.clone(),
2753            store.clone(),
2754            leader_schedule.clone(),
2755        );
2756
2757        let mut core = Core::new(
2758            context.clone(),
2759            leader_schedule,
2760            transaction_consumer,
2761            block_manager,
2762            true,
2763            commit_observer,
2764            signals,
2765            key_pairs.remove(context.own_index.value()).1,
2766            dag_state.clone(),
2767            true,
2768        );
2769
2770        // No new block should have been produced
2771        assert_eq!(
2772            core.last_proposed_round(),
2773            GENESIS_ROUND,
2774            "No block should have been created other than genesis"
2775        );
2776
2777        // Trying to explicitly propose a block will not produce anything
2778        assert!(core.try_propose(true).unwrap().is_none());
2779
2780        // Create blocks for the whole network but not for authority 1
2781        let mut builder = DagBuilder::new(context.clone());
2782        builder
2783            .layers(1..=12)
2784            .authorities(vec![AuthorityIndex::new_for_test(1)])
2785            .skip_block()
2786            .build();
2787        let blocks = builder.blocks(1..=12);
2788        // Process all the blocks
2789        assert!(core.add_blocks(blocks).unwrap().is_empty());
2790        core.set_last_known_proposed_round(12);
2791
2792        let block = core.try_propose(true).expect("No error").unwrap();
2793        assert_eq!(block.round(), 13);
2794        assert_eq!(block.ancestors().len(), 7);
2795
2796        // Build blocks for rest of the network other than own index
2797        builder
2798            .layers(13..=14)
2799            .authorities(vec![AuthorityIndex::new_for_test(0)])
2800            .skip_block()
2801            .build();
2802        let blocks = builder.blocks(13..=14);
2803        assert!(core.add_blocks(blocks).unwrap().is_empty());
2804
2805        // We have now triggered a leader schedule change, so we should have
2806        // one EXCLUDE authority (1) when we go to select ancestors for the next
2807        // proposal.
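        // (Hedged note: with `with_num_commits_per_schedule(10)` set above and
        // authority 1 having proposed nothing in rounds 1..=12, its reputation
        // score is presumably the lowest, which is what drives the ancestor
        // state manager to mark it EXCLUDE and drop it from the 6 ancestors
        // selected below.)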
2808        let block = core.try_propose(true).expect("No error").unwrap();
2809        assert_eq!(block.round(), 15);
2810        assert_eq!(block.ancestors().len(), 6);
2811
2812        // Build blocks for a quorum of the network including the EXCLUDE authority (1)
2813        // which will trigger smart select and we will not propose a block
2814        builder
2815            .layer(15)
2816            .authorities(vec![
2817                AuthorityIndex::new_for_test(0),
2818                AuthorityIndex::new_for_test(5),
2819                AuthorityIndex::new_for_test(6),
2820            ])
2821            .skip_block()
2822            .build();
2823        let blocks = builder.blocks(15..=15);
2824        let authority_1_excluded_block_reference = blocks
2825            .iter()
2826            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2827            .unwrap()
2828            .reference();
2829        // Wait for min round delay to allow blocks to be proposed.
2830        sleep(context.parameters.min_round_delay).await;
2831        // Smart select should be triggered and no block should be proposed.
2832        assert!(core.add_blocks(blocks).unwrap().is_empty());
2833        assert_eq!(core.last_proposed_block().round(), 15);
2834
2835        builder
2836            .layer(15)
2837            .authorities(vec![
2838                AuthorityIndex::new_for_test(0),
2839                AuthorityIndex::new_for_test(1),
2840                AuthorityIndex::new_for_test(2),
2841                AuthorityIndex::new_for_test(3),
2842                AuthorityIndex::new_for_test(4),
2843            ])
2844            .skip_block()
2845            .build();
2846        let blocks = builder.blocks(15..=15);
2847        let included_block_references = iter::once(&core.last_proposed_block())
2848            .chain(blocks.iter())
2849            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2850            .map(|block| block.reference())
2851            .collect::<Vec<_>>();
2852
2853        // Have enough ancestor blocks to propose now.
2854        assert!(core.add_blocks(blocks).unwrap().is_empty());
2855        assert_eq!(core.last_proposed_block().round(), 16);
2856
2857        // Check that a new block has been proposed & signaled.
2858        let extended_block = loop {
2859            let extended_block =
2860                tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2861                    .await
2862                    .unwrap()
2863                    .unwrap();
2864            if extended_block.block.round() == 16 {
2865                break extended_block;
2866            }
2867        };
2868        assert_eq!(extended_block.block.round(), 16);
2869        assert_eq!(extended_block.block.author(), core.context.own_index);
2870        assert_eq!(extended_block.block.ancestors().len(), 6);
2871        assert_eq!(extended_block.block.ancestors(), included_block_references);
2872        assert_eq!(extended_block.excluded_ancestors.len(), 1);
2873        assert_eq!(
2874            extended_block.excluded_ancestors[0],
2875            authority_1_excluded_block_reference
2876        );
2877
2878        // Build blocks for a quorum of the network including the EXCLUDE ancestor,
2879        // which will trigger smart select so no block will be proposed.
2880        // This time we will force propose by hitting the leader timeout, which
2881        // should cause us to include this EXCLUDE ancestor.
2882        builder
2883            .layer(16)
2884            .authorities(vec![
2885                AuthorityIndex::new_for_test(0),
2886                AuthorityIndex::new_for_test(5),
2887                AuthorityIndex::new_for_test(6),
2888            ])
2889            .skip_block()
2890            .build();
2891        let blocks = builder.blocks(16..=16);
2892        // Wait for leader timeout to force blocks to be proposed.
2893        sleep(context.parameters.min_round_delay).await;
2894        // Smart select should be triggered and no block should be proposed.
2895        assert!(core.add_blocks(blocks).unwrap().is_empty());
2896        assert_eq!(core.last_proposed_block().round(), 16);
2897
2898        // Simulate a leader timeout and a force proposal where we will include
2899        // one EXCLUDE ancestor when we go to select ancestors for the next proposal
2900        let block = core.try_propose(true).expect("No error").unwrap();
2901        assert_eq!(block.round(), 17);
2902        assert_eq!(block.ancestors().len(), 5);
2903
2904        // Check that a new block has been proposed & signaled.
2905        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2906            .await
2907            .unwrap()
2908            .unwrap();
2909        assert_eq!(extended_block.block.round(), 17);
2910        assert_eq!(extended_block.block.author(), core.context.own_index);
2911        assert_eq!(extended_block.block.ancestors().len(), 5);
2912        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2913
2914        // Set quorum rounds for each authority, which will unlock the EXCLUDE
2915        // authority (1). We should then be able to create a new layer of blocks,
2916        // all of which will be included as ancestors for the next proposal.
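        // (Hedged note: each `(16, 16)` entry below is assumed to be a per-authority
        // (low, high) `QuorumRound` pair as produced by the round prober; reporting
        // every authority as having reached round 16 is what lets the ancestor state
        // manager move authority 1 out of the EXCLUDE state for the next proposal.)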
2917        core.set_propagation_delay_and_quorum_rounds(
2918            0,
2919            vec![
2920                (16, 16),
2921                (16, 16),
2922                (16, 16),
2923                (16, 16),
2924                (16, 16),
2925                (16, 16),
2926                (16, 16),
2927            ],
2928            vec![
2929                (16, 16),
2930                (16, 16),
2931                (16, 16),
2932                (16, 16),
2933                (16, 16),
2934                (16, 16),
2935                (16, 16),
2936            ],
2937        );
2938
2939        builder
2940            .layer(17)
2941            .authorities(vec![AuthorityIndex::new_for_test(0)])
2942            .skip_block()
2943            .build();
2944        let blocks = builder.blocks(17..=17);
2945        let included_block_references = iter::once(&core.last_proposed_block())
2946            .chain(blocks.iter())
2947            .map(|block| block.reference())
2948            .collect::<Vec<_>>();
2949
2950        // Have enough ancestor blocks to propose now.
2951        sleep(context.parameters.min_round_delay).await;
2952        assert!(core.add_blocks(blocks).unwrap().is_empty());
2953        assert_eq!(core.last_proposed_block().round(), 18);
2954
2955        // Check that a new block has been proposed & signaled.
2956        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2957            .await
2958            .unwrap()
2959            .unwrap();
2960        assert_eq!(extended_block.block.round(), 18);
2961        assert_eq!(extended_block.block.author(), core.context.own_index);
2962        assert_eq!(extended_block.block.ancestors().len(), 7);
2963        assert_eq!(extended_block.block.ancestors(), included_block_references);
2964        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2965    }
2966
2967    #[tokio::test]
2968    async fn test_excluded_ancestor_limit() {
2969        telemetry_subscribers::init_for_testing();
2970        let (mut context, mut key_pairs) = Context::new_for_test(4);
2971        context
2972            .protocol_config
2973            .set_consensus_smart_ancestor_selection_for_testing(true);
2974        context
2975            .protocol_config
2976            .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2977        let context = Arc::new(context.with_parameters(Parameters {
2978            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2979            ..Default::default()
2980        }));
2981
2982        let store = Arc::new(MemStore::new());
2983        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2984
2985        let block_manager = BlockManager::new(
2986            context.clone(),
2987            dag_state.clone(),
2988            Arc::new(NoopBlockVerifier),
2989        );
2990        let leader_schedule = Arc::new(
2991            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2992                .with_num_commits_per_schedule(10),
2993        );
2994
2995        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2996        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2997        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2998        // Need at least one subscriber to the block broadcast channel.
2999        let mut block_receiver = signal_receivers.block_broadcast_receiver();
3000
3001        let (sender, _receiver) = unbounded_channel("consensus_output");
3002        let commit_consumer = CommitConsumer::new(sender, 0);
3003        let commit_observer = CommitObserver::new(
3004            context.clone(),
3005            commit_consumer,
3006            dag_state.clone(),
3007            store.clone(),
3008            leader_schedule.clone(),
3009        );
3010
3011        let mut core = Core::new(
3012            context.clone(),
3013            leader_schedule,
3014            transaction_consumer,
3015            block_manager,
3016            true,
3017            commit_observer,
3018            signals,
3019            key_pairs.remove(context.own_index.value()).1,
3020            dag_state.clone(),
3021            true,
3022        );
3023
3024        // No new block should have been produced
3025        assert_eq!(
3026            core.last_proposed_round(),
3027            GENESIS_ROUND,
3028            "No block should have been created other than genesis"
3029        );
3030
3031        // Create blocks for the whole network
3032        let mut builder = DagBuilder::new(context.clone());
3033        builder.layers(1..=3).build();
3034
3035        // This will equivocate 9 blocks for authority 1, which will be excluded from
3036        // the proposal; because of the configured limit the excess will be dropped and
3037        // not included in the ExtendedBlock structure sent to the rest of the network.
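        // (Hedged note: one authority-1 block is still picked as its ancestor; the
        // remaining references become candidates for `excluded_ancestors`, and the
        // configured limit bounds how many are carried in the broadcast
        // ExtendedBlock, matching the 8 asserted at the end of this test.)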
3038        builder
3039            .layer(4)
3040            .authorities(vec![AuthorityIndex::new_for_test(1)])
3041            .equivocate(9)
3042            .build();
3043        let blocks = builder.blocks(1..=4);
3044
3045        // Process all the blocks
3046        assert!(core.add_blocks(blocks).unwrap().is_empty());
3047        core.set_last_known_proposed_round(3);
3048
3049        let block = core.try_propose(true).expect("No error").unwrap();
3050        assert_eq!(block.round(), 5);
3051        assert_eq!(block.ancestors().len(), 4);
3052
3053        // Check that a new block has been proposed & signaled.
3054        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3055            .await
3056            .unwrap()
3057            .unwrap();
3058        assert_eq!(extended_block.block.round(), 5);
3059        assert_eq!(extended_block.block.author(), core.context.own_index);
3060        assert_eq!(extended_block.block.ancestors().len(), 4);
3061        assert_eq!(extended_block.excluded_ancestors.len(), 8);
3062    }
3063
3064    #[tokio::test]
3065    async fn test_core_set_subscriber_exists() {
3066        telemetry_subscribers::init_for_testing();
3067        let (context, mut key_pairs) = Context::new_for_test(4);
3068        let context = Arc::new(context);
3069        let store = Arc::new(MemStore::new());
3070        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3071
3072        let block_manager = BlockManager::new(
3073            context.clone(),
3074            dag_state.clone(),
3075            Arc::new(NoopBlockVerifier),
3076        );
3077        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3078            context.clone(),
3079            dag_state.clone(),
3080        ));
3081
3082        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3083        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3084        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3085        // Need at least one subscriber to the block broadcast channel.
3086        let _block_receiver = signal_receivers.block_broadcast_receiver();
3087
3088        let (sender, _receiver) = unbounded_channel("consensus_output");
3089        let commit_observer = CommitObserver::new(
3090            context.clone(),
3091            CommitConsumer::new(sender.clone(), 0),
3092            dag_state.clone(),
3093            store.clone(),
3094            leader_schedule.clone(),
3095        );
3096
3097        let mut core = Core::new(
3098            context.clone(),
3099            leader_schedule,
3100            transaction_consumer,
3101            block_manager,
3102            // Set to no subscriber exists initially.
3103            false,
3104            commit_observer,
3105            signals,
3106            key_pairs.remove(context.own_index.value()).1,
3107            dag_state.clone(),
3108            false,
3109        );
3110
3111        // There is no proposal during recovery because there is no subscriber.
3112        assert_eq!(
3113            core.last_proposed_round(),
3114            GENESIS_ROUND,
3115            "No block should have been created other than genesis"
3116        );
3117
3118        // There is no proposal even with forced proposing.
3119        assert!(core.try_propose(true).unwrap().is_none());
3120
3121        // Let Core know subscriber exists.
3122        core.set_quorum_subscribers_exists(true);
3123
3124        // Proposing now would succeed.
3125        assert!(core.try_propose(true).unwrap().is_some());
3126    }
3127
3128    #[tokio::test]
3129    async fn test_core_set_propagation_delay_per_authority() {
3130        // TODO: create helper to avoid the duplicated code here.
3131        telemetry_subscribers::init_for_testing();
3132        let (context, mut key_pairs) = Context::new_for_test(4);
3133        let context = Arc::new(context);
3134        let store = Arc::new(MemStore::new());
3135        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3136
3137        let block_manager = BlockManager::new(
3138            context.clone(),
3139            dag_state.clone(),
3140            Arc::new(NoopBlockVerifier),
3141        );
3142        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3143            context.clone(),
3144            dag_state.clone(),
3145        ));
3146
3147        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3148        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3149        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3150        // Need at least one subscriber to the block broadcast channel.
3151        let _block_receiver = signal_receivers.block_broadcast_receiver();
3152
3153        let (sender, _receiver) = unbounded_channel("consensus_output");
3154        let commit_observer = CommitObserver::new(
3155            context.clone(),
3156            CommitConsumer::new(sender.clone(), 0),
3157            dag_state.clone(),
3158            store.clone(),
3159            leader_schedule.clone(),
3160        );
3161
3162        let mut core = Core::new(
3163            context.clone(),
3164            leader_schedule,
3165            transaction_consumer,
3166            block_manager,
3167            // Set to no subscriber exists initially.
3168            false,
3169            commit_observer,
3170            signals,
3171            key_pairs.remove(context.own_index.value()).1,
3172            dag_state.clone(),
3173            false,
3174        );
3175
3176        // There is no proposal during recovery because there is no subscriber.
3177        assert_eq!(
3178            core.last_proposed_round(),
3179            GENESIS_ROUND,
3180            "No block should have been created other than genesis"
3181        );
3182
3183        // Use a large propagation delay to disable proposing.
3184        core.set_propagation_delay_and_quorum_rounds(1000, vec![], vec![]);
3185
3186        // Make propagation delay the only reason for not proposing.
3187        core.set_quorum_subscribers_exists(true);
3188
3189        // There is no proposal even with forced proposing.
3190        assert!(core.try_propose(true).unwrap().is_none());
3191
3192        // Let Core know there is no propagation delay.
3193        core.set_propagation_delay_and_quorum_rounds(0, vec![], vec![]);
3194
3195        // Proposing now would succeed.
3196        assert!(core.try_propose(true).unwrap().is_some());
3197    }
3198
3199    #[tokio::test(flavor = "current_thread", start_paused = true)]
3200    async fn test_leader_schedule_change() {
3201        telemetry_subscribers::init_for_testing();
3202        let default_params = Parameters::default();
3203
3204        let (context, _) = Context::new_for_test(4);
3205        // create the cores and their signals for all the authorities
3206        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3207
3208        // Now iterate over a few rounds and ensure the corresponding signals are
3209        // created while network advances
3210        let mut last_round_blocks = Vec::new();
3211        for round in 1..=30 {
3212            let mut this_round_blocks = Vec::new();
3213
3214            // Wait for min round delay to allow blocks to be proposed.
3215            sleep(default_params.min_round_delay).await;
3216
3217            for core_fixture in &mut cores {
3218                // add the blocks from last round
3219                // this will trigger a block creation for the round and a signal should be
3220                // emitted
3221                core_fixture
3222                    .core
3223                    .add_blocks(last_round_blocks.clone())
3224                    .unwrap();
3225
3226                // A "new round" signal should be received given that all the blocks of previous
3227                // round have been processed
3228                let new_round = receive(
3229                    Duration::from_secs(1),
3230                    core_fixture.signal_receivers.new_round_receiver(),
3231                )
3232                .await;
3233                assert_eq!(new_round, round);
3234
3235                // Check that a new block has been proposed.
3236                let extended_block = tokio::time::timeout(
3237                    Duration::from_secs(1),
3238                    core_fixture.block_receiver.recv(),
3239                )
3240                .await
3241                .unwrap()
3242                .unwrap();
3243                assert_eq!(extended_block.block.round(), round);
3244                assert_eq!(
3245                    extended_block.block.author(),
3246                    core_fixture.core.context.own_index
3247                );
3248
3249                // append the new block to this round blocks
3250                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3251
3252                let block = core_fixture.core.last_proposed_block();
3253
3254                // ensure that produced block is referring to the blocks of last_round
3255                assert_eq!(
3256                    block.ancestors().len(),
3257                    core_fixture.core.context.committee.size()
3258                );
3259                for ancestor in block.ancestors() {
3260                    if block.round() > 1 {
3261                        // don't bother with round 1 block which just contains the genesis blocks.
3262                        assert!(
3263                            last_round_blocks
3264                                .iter()
3265                                .any(|block| block.reference() == *ancestor),
3266                            "Reference from previous round should be added"
3267                        );
3268                    }
3269                }
3270            }
3271
3272            last_round_blocks = this_round_blocks;
3273        }
3274
3275        for core_fixture in cores {
3276            // Check commits have been persisted to store
3277            let last_commit = core_fixture
3278                .store
3279                .read_last_commit()
3280                .unwrap()
3281                .expect("last commit should be set");
3282            // There are 28 leader rounds with rounds completed up to and including
3283            // round 29. Round 30 blocks will only include their own blocks, so the
3284            // 28th leader will not be committed.
3285            assert_eq!(last_commit.index(), 27);
3286            let all_stored_commits = core_fixture
3287                .store
3288                .scan_commits((0..=CommitIndex::MAX).into())
3289                .unwrap();
3290            assert_eq!(all_stored_commits.len(), 27);
3291            assert_eq!(
3292                core_fixture
3293                    .core
3294                    .leader_schedule
3295                    .leader_swap_table
3296                    .read()
3297                    .bad_nodes
3298                    .len(),
3299                1
3300            );
3301            assert_eq!(
3302                core_fixture
3303                    .core
3304                    .leader_schedule
3305                    .leader_swap_table
3306                    .read()
3307                    .good_nodes
3308                    .len(),
3309                1
3310            );
3311            let expected_reputation_scores =
3312                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3313            assert_eq!(
3314                core_fixture
3315                    .core
3316                    .leader_schedule
3317                    .leader_swap_table
3318                    .read()
3319                    .reputation_scores,
3320                expected_reputation_scores
3321            );
3322        }
3323    }
3324
3325    // TODO: Remove this when DistributedVoteScoring is enabled.
3326    #[tokio::test(flavor = "current_thread", start_paused = true)]
3327    async fn test_leader_schedule_change_with_vote_scoring() {
3328        telemetry_subscribers::init_for_testing();
3329        let default_params = Parameters::default();
3330        let (mut context, _) = Context::new_for_test(4);
3331        context
3332            .protocol_config
3333            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3334        // create the cores and their signals for all the authorities
3335        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3336        // Now iterate over a few rounds and ensure the corresponding signals are
3337        // created while network advances
3338        let mut last_round_blocks = Vec::new();
3339        for round in 1..=30 {
3340            let mut this_round_blocks = Vec::new();
3341            // Wait for min round delay to allow blocks to be proposed.
3342            sleep(default_params.min_round_delay).await;
3343            for core_fixture in &mut cores {
3344                // add the blocks from last round
3345                // this will trigger a block creation for the round and a signal should be
3346                // emitted
3347                core_fixture
3348                    .core
3349                    .add_blocks(last_round_blocks.clone())
3350                    .unwrap();
3351                // A "new round" signal should be received given that all the blocks of previous
3352                // round have been processed
3353                let new_round = receive(
3354                    Duration::from_secs(1),
3355                    core_fixture.signal_receivers.new_round_receiver(),
3356                )
3357                .await;
3358                assert_eq!(new_round, round);
3359                // Check that a new block has been proposed.
3360                let extended_block = tokio::time::timeout(
3361                    Duration::from_secs(1),
3362                    core_fixture.block_receiver.recv(),
3363                )
3364                .await
3365                .unwrap()
3366                .unwrap();
3367                assert_eq!(extended_block.block.round(), round);
3368                assert_eq!(
3369                    extended_block.block.author(),
3370                    core_fixture.core.context.own_index
3371                );
3372
3373                // append the new block to this round blocks
3374                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3375                let block = core_fixture.core.last_proposed_block();
3376                // ensure that produced block is referring to the blocks of last_round
3377                assert_eq!(
3378                    block.ancestors().len(),
3379                    core_fixture.core.context.committee.size()
3380                );
3381                for ancestor in block.ancestors() {
3382                    if block.round() > 1 {
3383                        // don't bother with round 1 block which just contains the genesis blocks.
3384                        assert!(
3385                            last_round_blocks
3386                                .iter()
3387                                .any(|block| block.reference() == *ancestor),
3388                            "Reference from previous round should be added"
3389                        );
3390                    }
3391                }
3392            }
3393            last_round_blocks = this_round_blocks;
3394        }
3395        for core_fixture in cores {
3396            // Check commits have been persisted to store
3397            let last_commit = core_fixture
3398                .store
3399                .read_last_commit()
3400                .unwrap()
3401                .expect("last commit should be set");
3402            // There are 28 leader rounds with rounds completed up to and including
3403            // round 29. Round 30 blocks will only include their own blocks, so the
3404            // 28th leader will not be committed.
3405            assert_eq!(last_commit.index(), 27);
3406            let all_stored_commits = core_fixture
3407                .store
3408                .scan_commits((0..=CommitIndex::MAX).into())
3409                .unwrap();
3410            assert_eq!(all_stored_commits.len(), 27);
3411            assert_eq!(
3412                core_fixture
3413                    .core
3414                    .leader_schedule
3415                    .leader_swap_table
3416                    .read()
3417                    .bad_nodes
3418                    .len(),
3419                1
3420            );
3421            assert_eq!(
3422                core_fixture
3423                    .core
3424                    .leader_schedule
3425                    .leader_swap_table
3426                    .read()
3427                    .good_nodes
3428                    .len(),
3429                1
3430            );
3431            let expected_reputation_scores =
3432                ReputationScores::new((11..=20).into(), vec![9, 8, 8, 8]);
3433            assert_eq!(
3434                core_fixture
3435                    .core
3436                    .leader_schedule
3437                    .leader_swap_table
3438                    .read()
3439                    .reputation_scores,
3440                expected_reputation_scores
3441            );
3442        }
3443    }
3444
3445    #[tokio::test]
3446    async fn test_validate_certified_commits() {
3447        telemetry_subscribers::init_for_testing();
3448
3449        let (context, _key_pairs) = Context::new_for_test(4);
3450        let context = context.with_parameters(Parameters {
3451            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3452            ..Default::default()
3453        });
3454
3455        let authority_index = AuthorityIndex::new_for_test(0);
3456        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3457        let mut core = core.core;
3458
3459        // No new block should have been produced
3460        assert_eq!(
3461            core.last_proposed_round(),
3462            GENESIS_ROUND,
3463            "No block should have been created other than genesis"
3464        );
3465
3466        // create a DAG of 12 rounds
3467        let mut dag_builder = DagBuilder::new(core.context.clone());
3468        dag_builder.layers(1..=12).build();
3469
3470        // Store all blocks up to round 6 which should be enough to decide up to leader
3471        // 4
3472        dag_builder.print();
3473        let blocks = dag_builder.blocks(1..=6);
3474
3475        for block in blocks {
3476            core.dag_state.write().accept_block(block);
3477        }
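        // (Hedged note: assuming the usual decision rule where a leader at round r
        // needs votes at round r+1 and certificates at round r+2, blocks up to
        // round 6 are exactly enough to directly decide the leaders of rounds
        // 1..=4, which is why 4 sub-dags are committed below.)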
3478
3479        // Get all the committed sub dags up to round 10
3480        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3481
3482        // Now try to commit up to the latest leader (round = 4). Do not provide any
3483        // certified commits.
3484        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3485
3486        // We should have committed up to round 4
3487        assert_eq!(committed_sub_dags.len(), 4);
3488
3489        // Now validate the certified commits. We'll try 3 different scenarios:
3490        println!("Case 1. Provide certified commits that are all before the last committed round.");
3491
3492        // Highest certified commit should be for leader of round 4.
3493        let certified_commits = sub_dags_and_commits
3494            .iter()
3495            .take(4)
3496            .map(|(_, c)| c)
3497            .cloned()
3498            .collect::<Vec<_>>();
3499        assert!(
3500            certified_commits.last().unwrap().index()
3501                <= committed_sub_dags.last().unwrap().commit_ref.index,
3502            "Highest certified commit should older than the highest committed index."
3503        );
3504
3505        let certified_commits = core.validate_certified_commits(certified_commits).unwrap();
3506
3507        // No commits should be processed
3508        assert!(certified_commits.is_empty());
3509
3510        println!("Case 2. Provide certified commits that are all after the last committed round.");
3511
3512        // Highest certified commit should be for leader of round 5.
3513        let certified_commits = sub_dags_and_commits
3514            .iter()
3515            .take(5)
3516            .map(|(_, c)| c.clone())
3517            .collect::<Vec<_>>();
3518
3519        let certified_commits = core
3520            .validate_certified_commits(certified_commits.clone())
3521            .unwrap();
3522
3523        // The certified commit of index 5 should be processed.
3524        assert_eq!(certified_commits.len(), 1);
3525        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3526
3527        println!(
3528            "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
3529        );
3530
3531        // The single certified commit provided here is for leader of round 6 (commit index 6).
3532        let certified_commits = sub_dags_and_commits
3533            .iter()
3534            .skip(5)
3535            .take(1)
3536            .map(|(_, c)| c.clone())
3537            .collect::<Vec<_>>();
3538
3539        let err = core
3540            .validate_certified_commits(certified_commits.clone())
3541            .unwrap_err();
3542        match err {
3543            ConsensusError::UnexpectedCertifiedCommitIndex {
3544                expected_commit_index: 5,
3545                commit_index: 6,
3546            } => (),
3547            _ => panic!("Unexpected error: {err:?}"),
3548        }
3549    }
3550
3551    #[tokio::test]
3552    async fn test_add_certified_commits() {
3553        telemetry_subscribers::init_for_testing();
3554
3555        let (context, _key_pairs) = Context::new_for_test(4);
3556        let context = context.with_parameters(Parameters {
3557            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3558            ..Default::default()
3559        });
3560
3561        let authority_index = AuthorityIndex::new_for_test(0);
3562        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3563        let store = core.store.clone();
3564        let mut core = core.core;
3565
3566        // No new block should have been produced
3567        assert_eq!(
3568            core.last_proposed_round(),
3569            GENESIS_ROUND,
3570            "No block should have been created other than genesis"
3571        );
3572
3573        // create a DAG of 12 rounds
3574        let mut dag_builder = DagBuilder::new(core.context.clone());
3575        dag_builder.layers(1..=12).build();
3576
3577        // Store all blocks up to round 6 which should be enough to decide up to leader
3578        // 4
3579        dag_builder.print();
3580        let blocks = dag_builder.blocks(1..=6);
3581
3582        for block in blocks {
3583            core.dag_state.write().accept_block(block);
3584        }
3585
3586        // Get all the committed sub dags up to round 10
3587        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3588
3589        // Now try to commit up to the latest leader (round = 4). Do not provide any
3590        // certified commits.
3591        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3592
3593        // We should have committed up to round 4
3594        assert_eq!(committed_sub_dags.len(), 4);
3595
3596        let last_commit = store
3597            .read_last_commit()
3598            .unwrap()
3599            .expect("Last commit should be set");
3600        assert_eq!(last_commit.reference().index, 4);
3601
3602        println!("Case 1. Provide no certified commits. No commit should happen.");
3603
3604        let last_commit = store
3605            .read_last_commit()
3606            .unwrap()
3607            .expect("Last commit should be set");
3608        assert_eq!(last_commit.reference().index, 4);
3609
3610        println!(
3611            "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3612        );
3613
3614        // The commits of leader rounds 5-8 should be committed via the certified
3615        // commits.
3616        let certified_commits = sub_dags_and_commits
3617            .iter()
3618            .skip(3)
3619            .take(5)
3620            .map(|(_, c)| c.clone())
3621            .collect::<Vec<_>>();
3622
3623        // Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be
3624        // accepted via the certified commits processing.
3625        let blocks = dag_builder.blocks(8..=12);
3626        for block in blocks {
3627            core.dag_state.write().accept_block(block);
3628        }
3629
3630        // The corresponding blocks of the certified commits should be accepted and
3631        // stored before linearizing and committing the DAG.
3632        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3633            .expect("Should not fail");
3634
3635        let commits = store.scan_commits((6..=10).into()).unwrap();
3636
3637        // We expect all the sub dags up to leader round 10 to be committed.
3638        assert_eq!(commits.len(), 5);
3639
3640        for i in 6..=10 {
3641            let commit = &commits[i - 6];
3642            assert_eq!(commit.reference().index, i as u32);
3643        }
3644    }
3645
3646    #[tokio::test]
3647    async fn try_commit_with_certified_commits_gced_blocks() {
3648        const GC_DEPTH: u32 = 3;
3649        telemetry_subscribers::init_for_testing();
3650
3651        let (mut context, mut key_pairs) = Context::new_for_test(5);
3652        context
3653            .protocol_config
3654            .set_consensus_gc_depth_for_testing(GC_DEPTH);
3655        // context.protocol_config.
3656        // set_narwhal_new_leader_election_schedule_for_testing(val);
3657        let context = Arc::new(context.with_parameters(Parameters {
3658            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3659            ..Default::default()
3660        }));
3661
3662        let store = Arc::new(MemStore::new());
3663        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3664
3665        let block_manager = BlockManager::new(
3666            context.clone(),
3667            dag_state.clone(),
3668            Arc::new(NoopBlockVerifier),
3669        );
3670        let leader_schedule = Arc::new(
3671            LeaderSchedule::from_store(context.clone(), dag_state.clone())
3672                .with_num_commits_per_schedule(10),
3673        );
3674
3675        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3676        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3677        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3678        // Need at least one subscriber to the block broadcast channel.
3679        let _block_receiver = signal_receivers.block_broadcast_receiver();
3680
3681        let (sender, _receiver) = unbounded_channel("consensus_output");
3682        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
3683        let commit_observer = CommitObserver::new(
3684            context.clone(),
3685            commit_consumer,
3686            dag_state.clone(),
3687            store.clone(),
3688            leader_schedule.clone(),
3689        );
3690
3691        let mut core = Core::new(
3692            context.clone(),
3693            leader_schedule,
3694            transaction_consumer,
3695            block_manager,
3696            true,
3697            commit_observer,
3698            signals,
3699            key_pairs.remove(context.own_index.value()).1,
3700            dag_state.clone(),
3701            true,
3702        );
3703
3704        // No new block should have been produced
3705        assert_eq!(
3706            core.last_proposed_round(),
3707            GENESIS_ROUND,
3708            "No block should have been created other than genesis"
3709        );
3710
3711        let dag_str = "DAG {
3712            Round 0 : { 5 },
3713            Round 1 : { * },
3714            Round 2 : {
3715                A -> [-E1],
3716                B -> [-E1],
3717                C -> [-E1],
3718                D -> [-E1],
3719            },
3720            Round 3 : {
3721                A -> [*],
3722                B -> [*],
3723                C -> [*],
3724                D -> [*],
3725            },
3726            Round 4 : {
3727                A -> [*],
3728                B -> [*],
3729                C -> [*],
3730                D -> [*],
3731            },
3732            Round 5 : {
3733                A -> [*],
3734                B -> [*],
3735                C -> [*],
3736                D -> [*],
3737                E -> [A4, B4, C4, D4, E1]
3738            },
3739            Round 6 : { * },
3740            Round 7 : { * },
3741        }";
3742
3743        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3744        dag_builder.print();
3745
3746        // Now get all the committed sub dags from the DagBuilder
3747        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3748            .get_sub_dag_and_certified_commits(1..=5)
3749            .into_iter()
3750            .unzip();
3751
3752        // Now try to commit up to the latest leader (round = 5) with the provided
3753        // certified commits. Note that we have not accepted any blocks; that
3754        // should happen during the commit process.
3755        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3756
3757        // We should have committed up to round 4
3758        assert_eq!(committed_sub_dags.len(), 4);
3759        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3760            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3761
3762            // ensure that block from E1 node has not been committed
3763            for block in committed_sub_dag.blocks.iter() {
3764                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3765                    panic!("Did not expect to commit block E1");
3766                }
3767            }
3768        }
3769    }
3770
3771    #[tokio::test(flavor = "current_thread", start_paused = true)]
3772    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3773        telemetry_subscribers::init_for_testing();
3774        let default_params = Parameters::default();
3775
3776        let (context, _) = Context::new_for_test(6);
3777
3778        // create the cores and their signals for all the authorities
3779        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3780
3781        // Now iterate over a few rounds and ensure the corresponding signals are
3782        // created while network advances
3783        let mut last_round_blocks = Vec::new();
3784        for round in 1..=33 {
3785            let mut this_round_blocks = Vec::new();
3786            // Wait for min round delay to allow blocks to be proposed.
3787            sleep(default_params.min_round_delay).await;
3788            for core_fixture in &mut cores {
3789                // add the blocks from last round
3790                // this will trigger a block creation for the round and a signal should be
3791                // emitted
3792                core_fixture
3793                    .core
3794                    .add_blocks(last_round_blocks.clone())
3795                    .unwrap();
3796                // A "new round" signal should be received given that all the blocks of previous
3797                // round have been processed
3798                let new_round = receive(
3799                    Duration::from_secs(1),
3800                    core_fixture.signal_receivers.new_round_receiver(),
3801                )
3802                .await;
3803                assert_eq!(new_round, round);
3804                // Check that a new block has been proposed.
3805                let extended_block = tokio::time::timeout(
3806                    Duration::from_secs(1),
3807                    core_fixture.block_receiver.recv(),
3808                )
3809                .await
3810                .unwrap()
3811                .unwrap();
3812                assert_eq!(extended_block.block.round(), round);
3813                assert_eq!(
3814                    extended_block.block.author(),
3815                    core_fixture.core.context.own_index
3816                );
3817
3818                // append the new block to this round blocks
3819                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3820                let block = core_fixture.core.last_proposed_block();
3821                // ensure that produced block is referring to the blocks of last_round
3822                assert_eq!(
3823                    block.ancestors().len(),
3824                    core_fixture.core.context.committee.size()
3825                );
3826                for ancestor in block.ancestors() {
3827                    if block.round() > 1 {
3828                        // don't bother with round 1 block which just contains the genesis blocks.
3829                        assert!(
3830                            last_round_blocks
3831                                .iter()
3832                                .any(|block| block.reference() == *ancestor),
3833                            "Reference from previous round should be added"
3834                        );
3835                    }
3836                }
3837            }
3838            last_round_blocks = this_round_blocks;
3839        }
3840        for core_fixture in cores {
3841            // Check commits have been persisted to store
3842            let last_commit = core_fixture
3843                .store
3844                .read_last_commit()
3845                .unwrap()
3846                .expect("last commit should be set");
3847            // There are 31 leader rounds with rounds completed up to and including
3848            // round 33. Round 33 blocks will only include their own blocks, so there
3849            // should only be 30 commits.
3850            // However, on a leader schedule change boundary it is possible for a
3851            // new leader to get selected for the same round if the elected leader
3852            // gets swapped, allowing multiple leaders to be committed at a round.
3853            // This means that with multiple leaders per round explicitly set to 1 we
3854            // will have 30 commits, otherwise 31.
3855            // NOTE: We used 31 leader rounds to specifically trigger the scenario
3856            // where the leader schedule boundary occurred AND we had a swap to a new
3857            // leader for the same round.
3858            let expected_commit_count = 30;
3859            // Leave the code for re-use.
3860            // let expected_commit_count = match num_leaders_per_round {
3861            //    Some(1) => 30,
3862            //    _ => 31,
3863            //};
3864            assert_eq!(last_commit.index(), expected_commit_count);
3865            let all_stored_commits = core_fixture
3866                .store
3867                .scan_commits((0..=CommitIndex::MAX).into())
3868                .unwrap();
3869            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3870            assert_eq!(
3871                core_fixture
3872                    .core
3873                    .leader_schedule
3874                    .leader_swap_table
3875                    .read()
3876                    .bad_nodes
3877                    .len(),
3878                1
3879            );
3880            assert_eq!(
3881                core_fixture
3882                    .core
3883                    .leader_schedule
3884                    .leader_swap_table
3885                    .read()
3886                    .good_nodes
3887                    .len(),
3888                1
3889            );
3890            let expected_reputation_scores =
3891                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3892            assert_eq!(
3893                core_fixture
3894                    .core
3895                    .leader_schedule
3896                    .leader_swap_table
3897                    .read()
3898                    .reputation_scores,
3899                expected_reputation_scores
3900            );
3901        }
3902    }
3903
3904    // TODO: Remove two tests below this when DistributedVoteScoring is enabled.
3905    #[tokio::test(flavor = "current_thread", start_paused = true)]
3906    async fn test_commit_on_leader_schedule_change_boundary_without_multileader_with_vote_scoring()
3907    {
3908        telemetry_subscribers::init_for_testing();
3909        let default_params = Parameters::default();
3910
3911        let (mut context, _) = Context::new_for_test(6);
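        // Disable the distributed vote scoring strategy so this test exercises the legacy
        // vote scoring path (see the TODO above about removing these tests).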
3912        context
3913            .protocol_config
3914            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3915
3916        // create the cores and their signals for all the authorities
3917        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3918        // Now iterate over a number of rounds and ensure the corresponding signals are
3919        // created while the network advances.
3920        let mut last_round_blocks = Vec::new();
3921        for round in 1..=63 {
3922            let mut this_round_blocks = Vec::new();
3923
3924            // Wait for min round delay to allow blocks to be proposed.
3925            sleep(default_params.min_round_delay).await;
3926
3927            for core_fixture in &mut cores {
3928                // add the blocks from last round
3929                // this will trigger a block creation for the round and a signal should be
3930                // emitted
3931                core_fixture
3932                    .core
3933                    .add_blocks(last_round_blocks.clone())
3934                    .unwrap();
3935
3936                // A "new round" signal should be received given that all the blocks of
3937                // the previous round have been processed.
3938                let new_round = receive(
3939                    Duration::from_secs(1),
3940                    core_fixture.signal_receivers.new_round_receiver(),
3941                )
3942                .await;
3943                assert_eq!(new_round, round);
3944
3945                // Check that a new block has been proposed.
3946                let extended_block = tokio::time::timeout(
3947                    Duration::from_secs(1),
3948                    core_fixture.block_receiver.recv(),
3949                )
3950                .await
3951                .unwrap()
3952                .unwrap();
3953                assert_eq!(extended_block.block.round(), round);
3954                assert_eq!(
3955                    extended_block.block.author(),
3956                    core_fixture.core.context.own_index
3957                );
3958
3959                // append the new block to this round's blocks
3960                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3961
3962                let block = core_fixture.core.last_proposed_block();
3963
3964                // ensure that the produced block refers to the blocks of last_round
3965                assert_eq!(
3966                    block.ancestors().len(),
3967                    core_fixture.core.context.committee.size()
3968                );
3969                for ancestor in block.ancestors() {
3970                    if block.round() > 1 {
3971                        // Skip round 1 blocks, whose only ancestors are the genesis blocks.
3972                        assert!(
3973                            last_round_blocks
3974                                .iter()
3975                                .any(|block| block.reference() == *ancestor),
3976                            "Reference from previous round should be added"
3977                        );
3978                    }
3979                }
3980            }
3981
3982            last_round_blocks = this_round_blocks;
3983        }
3984
3985        for core_fixture in cores {
3986            // Check commits have been persisted to store
3987            let last_commit = core_fixture
3988                .store
3989                .read_last_commit()
3990                .unwrap()
3991                .expect("last commit should be set");
3992            // There are 61 leader rounds, with rounds completed up to and including
3993            // round 63. Each core only has its own round 63 block, so there
3994            // should only be 60 commits.
3995            // However, on a leader schedule change boundary it is possible for a
3996            // new leader to get selected for the same round if the elected leader
3997            // gets swapped, allowing multiple leaders to be committed at a round.
3998            // This means that with multiple leaders per round explicitly set to 1 we
3999            // will have 60 commits, otherwise 61.
4000            // NOTE: We used 61 leader rounds to specifically trigger the scenario
4001            // where the leader schedule boundary occurred AND we had a swap to a new
4002            // leader for the same round.
4003            let expected_commit_count = 60;
4004            // Leave the code for re-use.
4005            // let expected_commit_count = match num_leaders_per_round {
4006            //    Some(1) => 60,
4007            //    _ => 61,
4008            //};
4009            assert_eq!(last_commit.index(), expected_commit_count);
4010            let all_stored_commits = core_fixture
4011                .store
4012                .scan_commits((0..=CommitIndex::MAX).into())
4013                .unwrap();
4014            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
4015            assert_eq!(
4016                core_fixture
4017                    .core
4018                    .leader_schedule
4019                    .leader_swap_table
4020                    .read()
4021                    .bad_nodes
4022                    .len(),
4023                1
4024            );
4025            assert_eq!(
4026                core_fixture
4027                    .core
4028                    .leader_schedule
4029                    .leader_swap_table
4030                    .read()
4031                    .good_nodes
4032                    .len(),
4033                1
4034            );
4035            let expected_reputation_scores =
4036                ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
4037            assert_eq!(
4038                core_fixture
4039                    .core
4040                    .leader_schedule
4041                    .leader_swap_table
4042                    .read()
4043                    .reputation_scores,
4044                expected_reputation_scores
4045            );
4046        }
4047    }
4048
4049    #[tokio::test]
4050    async fn test_core_signals() {
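        // Drive 4 authorities for 10 rounds; every round should emit a new-round signal and
        // a block proposal, and the resulting commits should be persisted to the store.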
4051        telemetry_subscribers::init_for_testing();
4052        let default_params = Parameters::default();
4053
4054        let (context, _) = Context::new_for_test(4);
4055        // create the cores and their signals for all the authorities
4056        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4057
4058        // Now iterate over a few rounds and ensure the corresponding signals are
4059        // created while the network advances.
4060        let mut last_round_blocks = Vec::new();
4061        for round in 1..=10 {
4062            let mut this_round_blocks = Vec::new();
4063
4064            // Wait for min round delay to allow blocks to be proposed.
4065            sleep(default_params.min_round_delay).await;
4066
4067            for core_fixture in &mut cores {
4068                // add the blocks from last round
4069                // this will trigger a block creation for the round and a signal should be
4070                // emitted
4071                core_fixture
4072                    .core
4073                    .add_blocks(last_round_blocks.clone())
4074                    .unwrap();
4075
4076                // A "new round" signal should be received given that all the blocks of
4077                // the previous round have been processed.
4078                let new_round = receive(
4079                    Duration::from_secs(1),
4080                    core_fixture.signal_receivers.new_round_receiver(),
4081                )
4082                .await;
4083                assert_eq!(new_round, round);
4084
4085                // Check that a new block has been proposed.
4086                let extended_block = tokio::time::timeout(
4087                    Duration::from_secs(1),
4088                    core_fixture.block_receiver.recv(),
4089                )
4090                .await
4091                .unwrap()
4092                .unwrap();
4093                assert_eq!(extended_block.block.round(), round);
4094                assert_eq!(
4095                    extended_block.block.author(),
4096                    core_fixture.core.context.own_index
4097                );
4098
4099                // append the new block to this round's blocks
4100                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
4101
4102                let block = core_fixture.core.last_proposed_block();
4103
4104                // ensure that the produced block refers to the blocks of last_round
4105                assert_eq!(
4106                    block.ancestors().len(),
4107                    core_fixture.core.context.committee.size()
4108                );
4109                for ancestor in block.ancestors() {
4110                    if block.round() > 1 {
4111                        // Skip round 1 blocks, whose only ancestors are the genesis blocks.
4112                        assert!(
4113                            last_round_blocks
4114                                .iter()
4115                                .any(|block| block.reference() == *ancestor),
4116                            "Reference from previous round should be added"
4117                        );
4118                    }
4119                }
4120            }
4121
4122            last_round_blocks = this_round_blocks;
4123        }
4124
4125        for core_fixture in cores {
4126            // Check commits have been persisted to store
4127            let last_commit = core_fixture
4128                .store
4129                .read_last_commit()
4130                .unwrap()
4131                .expect("last commit should be set");
4132            // There are 8 leader rounds with rounds completed up to and including
4133            // round 9. Each core only has its own round 10 block, so the
4134            // 8th leader will not be committed.
4135            assert_eq!(last_commit.index(), 7);
4136            let all_stored_commits = core_fixture
4137                .store
4138                .scan_commits((0..=CommitIndex::MAX).into())
4139                .unwrap();
4140            assert_eq!(all_stored_commits.len(), 7);
4141        }
4142    }
4143
4144    #[tokio::test]
4145    async fn test_core_compress_proposal_references() {
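        // An authority that catches up after missing several rounds should compress its
        // ancestor references to the latest round instead of referencing every received block.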
4146        telemetry_subscribers::init_for_testing();
4147        let default_params = Parameters::default();
4148
4149        let (context, _) = Context::new_for_test(4);
4150        // create the cores and their signals for all the authorities
4151        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4152
4153        let mut last_round_blocks = Vec::new();
4154        let mut all_blocks = Vec::new();
4155
4156        let excluded_authority = AuthorityIndex::new_for_test(3);
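        // Authority 3 never proposes during the loop below; it later receives all blocks at
        // once so that its next proposal exercises ancestor reference compression.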
4157
4158        for round in 1..=10 {
4159            let mut this_round_blocks = Vec::new();
4160
4161            for core_fixture in &mut cores {
4162                // do not produce any block for authority 3
4163                if core_fixture.core.context.own_index == excluded_authority {
4164                    continue;
4165                }
4166
4167                // try to propose, to ensure that we cover the case where the leader
4168                // (authority 3) is missing
4169                core_fixture
4170                    .core
4171                    .add_blocks(last_round_blocks.clone())
4172                    .unwrap();
4173                core_fixture.core.new_block(round, true).unwrap();
4174
4175                let block = core_fixture.core.last_proposed_block();
4176                assert_eq!(block.round(), round);
4177
4178                // append the new block to this round's blocks
4179                this_round_blocks.push(block.clone());
4180            }
4181
4182            last_round_blocks = this_round_blocks.clone();
4183            all_blocks.extend(this_round_blocks);
4184        }
4185
4186        // Now send all the produced blocks to the core of authority 3. It should produce
4187        // a new block. If no compression were applied, we would expect all the previous
4188        // blocks from rounds 0..=10 to be referenced. However, since
4189        // compression is applied, only the last round's (10) blocks should be
4190        // referenced, plus the authority's own block of round 1.
4191        let core_fixture = &mut cores[excluded_authority];
4192        // Wait for min round delay to allow blocks to be proposed.
4193        sleep(default_params.min_round_delay).await;
4194        // add blocks to trigger proposal.
4195        core_fixture.core.add_blocks(all_blocks).unwrap();
4196
4197        // Assert that a block has been created for round 11 and that it references the
4198        // round 10 blocks of the other peers, and its own round 1 block
4199        // (created after recovery).
4200        let block = core_fixture.core.last_proposed_block();
4201        assert_eq!(block.round(), 11);
4202        assert_eq!(block.ancestors().len(), 4);
4203        for block_ref in block.ancestors() {
4204            if block_ref.author == excluded_authority {
4205                assert_eq!(block_ref.round, 1);
4206            } else {
4207                assert_eq!(block_ref.round, 10);
4208            }
4209        }
4210
4211        // Check commits have been persisted to store
4212        let last_commit = core_fixture
4213            .store
4214            .read_last_commit()
4215            .unwrap()
4216            .expect("last commit should be set");
4217        // There are 8 leader rounds with rounds completed up to and including
4218        // round 10. However, because no blocks were produced for authority 3,
4219        // 2 leader rounds will be skipped.
4220        assert_eq!(last_commit.index(), 6);
4221        let all_stored_commits = core_fixture
4222            .store
4223            .scan_commits((0..=CommitIndex::MAX).into())
4224            .unwrap();
4225        assert_eq!(all_stored_commits.len(), 6);
4226    }
4227
4228    #[tokio::test]
4229    async fn try_decide_certified() {
4230        // GIVEN
4231        telemetry_subscribers::init_for_testing();
4232
4233        let (context, _) = Context::new_for_test(4);
4234
4235        let authority_index = AuthorityIndex::new_for_test(0);
4236        let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
4237        let mut core = core.core;
4238
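        // Build a fully connected test DAG spanning rounds 1..=12 and accept all of its
        // blocks directly into the core's DagState.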
4239        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
4240        dag_builder.layers(1..=12).build();
4241
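        // `limit` caps how many certified commits can be decided in a single call: of the
        // 4 certified commits created below, 2 should be decided and 2 should remain
        // (see the assertions at the end).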
4242        let limit = 2;
4243
4244        let blocks = dag_builder.blocks(1..=12);
4245
4246        for block in blocks {
4247            core.dag_state.write().accept_block(block);
4248        }
4249
4250        // WHEN
4251        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4252        let mut certified_commits = sub_dags_and_commits
4253            .into_iter()
4254            .map(|(_, commit)| commit)
4255            .collect::<Vec<_>>();
4256
4257        let leaders = core.try_decide_certified(&mut certified_commits, limit);
4258
4259        // THEN
4260        assert_eq!(leaders.len(), 2);
4261        assert_eq!(certified_commits.len(), 2);
4262    }
4263
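    /// Waits up to `timeout` for the watch channel to report a change and returns the
    /// updated value. Panics if the timeout elapses or the channel is closed.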
4264    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4265        tokio::time::timeout(timeout, receiver.changed())
4266            .await
4267            .expect("Timeout while waiting to read from receiver")
4268            .expect("Signal receive channel shouldn't be closed");
4269        *receiver.borrow_and_update()
4270    }
4271}