consensus_core/core.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet},
7    iter, mem,
8    sync::Arc,
9    time::Duration,
10    vec,
11};
12
13use consensus_config::{AuthorityIndex, ProtocolKeyPair};
14#[cfg(test)]
15use consensus_config::{Stake, local_committee_and_keys};
16use iota_macros::fail_point;
17#[cfg(test)]
18use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
19use iota_metrics::monitored_scope;
20use itertools::Itertools as _;
21use parking_lot::RwLock;
22use tokio::{
23    sync::{broadcast, watch},
24    time::Instant,
25};
26use tracing::{debug, info, trace, warn};
27
28#[cfg(test)]
29use crate::{
30    CommitConsumer, TransactionClient, block_verifier::NoopBlockVerifier,
31    storage::mem_store::MemStore,
32};
33use crate::{
34    ancestor::{AncestorState, AncestorStateManager},
35    block::{
36        Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, GENESIS_ROUND, Round,
37        SignedBlock, Slot, VerifiedBlock,
38    },
39    block_manager::BlockManager,
40    commit::{
41        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
42    },
43    commit_observer::CommitObserver,
44    context::Context,
45    dag_state::DagState,
46    error::{ConsensusError, ConsensusResult},
47    leader_schedule::LeaderSchedule,
48    round_prober::QuorumRound,
49    stake_aggregator::{QuorumThreshold, StakeAggregator},
50    transaction::TransactionConsumer,
51    universal_committer::{
52        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
53    },
54};
55
56// Maximum number of commit votes to include in a block.
57// TODO: Move to protocol config, and verify in BlockVerifier.
58const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
59
60pub(crate) struct Core {
61    context: Arc<Context>,
62    /// The consumer to use in order to pull transactions to be included for the
63    /// next proposals
64    transaction_consumer: TransactionConsumer,
65    /// The block manager which is responsible for keeping track of the DAG
66    /// dependencies when processing new blocks and accept them or suspend
67    /// if we are missing their causal history
68    block_manager: BlockManager,
69    /// Whether there are subscribers waiting for new blocks proposed by this
70    /// authority. Core stops proposing new blocks when there is no
71    /// subscriber, because new proposed blocks will likely contain only
72    /// stale info when they propagate to peers.
73    subscriber_exists: bool,
74    /// Estimated delay by round for propagating blocks to a quorum.
75    /// Because of the nature of TCP and block streaming, propagation delay is
76    /// expected to be 0 in most cases, even when the actual latency of
77    /// broadcasting blocks is high. When this value is higher than the
78    /// `propagation_delay_stop_proposal_threshold`, it is likely that this
79    /// validator cannot broadcast blocks to the network at all. Core stops
80    /// proposing new blocks in this case.
81    propagation_delay: Round,
82
83    /// Used to make commit decisions for leader blocks in the dag.
84    committer: UniversalCommitter,
85    /// The last new round for which core has sent out a signal.
86    last_signaled_round: Round,
87    /// The blocks of the last included ancestors per authority. This vector is
88    /// basically used as a watermark in order to include in the next block
89    /// proposal only ancestors of higher rounds. By default, it is
90    /// initialised with `None` values.
91    last_included_ancestors: Vec<Option<BlockRef>>,
92    /// The last decided leader returned from the universal committer. It is
93    /// important to note that this does not signify that the leader has been
94    /// persisted yet, as it still has to go through the CommitObserver and
95    /// have its commit persisted in the store. On recovery/restart
96    /// the last_decided_leader will be set to the last_commit leader in dag
97    /// state.
98    last_decided_leader: Slot,
99    /// The consensus leader schedule to be used to resolve the leader for a
100    /// given round.
101    leader_schedule: Arc<LeaderSchedule>,
102    /// The commit observer is responsible for observing the commits and for
103    /// collecting and sending subdags over the consensus output
104    /// channel.
105    commit_observer: CommitObserver,
106    /// Sender of outgoing signals from Core.
107    signals: CoreSignals,
108    /// The keypair to be used for block signing
109    block_signer: ProtocolKeyPair,
110    /// Keeping track of state of the DAG, including blocks, commits and last
111    /// committed rounds.
112    dag_state: Arc<RwLock<DagState>>,
113    /// The last known round for which the node has proposed. Any proposal
114    /// should be for a round greater than this. This is currently used to
115    /// avoid equivocations while a node recovers from amnesia. When the value
116    /// is None it means that the last block sync mechanism is enabled, but it
117    /// hasn't been initialised yet.
118    last_known_proposed_round: Option<Round>,
119    /// The ancestor state manager keeps track of the quality of the authorities
120    /// based on the distribution of their blocks to the network. It uses this
121    /// information to decide whether to include that authority's blocks in the
122    /// next proposal or not.
123    ancestor_state_manager: AncestorStateManager,
124}
125
126impl Core {
127    pub(crate) fn new(
128        context: Arc<Context>,
129        leader_schedule: Arc<LeaderSchedule>,
130        transaction_consumer: TransactionConsumer,
131        block_manager: BlockManager,
132        subscriber_exists: bool,
133        commit_observer: CommitObserver,
134        signals: CoreSignals,
135        block_signer: ProtocolKeyPair,
136        dag_state: Arc<RwLock<DagState>>,
137        sync_last_known_own_block: bool,
138    ) -> Self {
139        let last_decided_leader = dag_state.read().last_commit_leader();
140        let committer = UniversalCommitterBuilder::new(
141            context.clone(),
142            leader_schedule.clone(),
143            dag_state.clone(),
144        )
145        // We want to keep the with_number_of_leaders param to be able to enable it in a future
146        // protocol upgrade.
147        .with_number_of_leaders(1)
148        .with_pipeline(true)
149        .build();
150
151        // Recover the last proposed block
152        let last_proposed_block = dag_state.read().get_last_proposed_block();
153
154        let last_signaled_round = last_proposed_block.round();
155
156        // Recover the last included ancestor rounds based on the last proposed block.
157        // This allows the next block proposal to use ancestor blocks of higher
158        // rounds and avoids re-including blocks that have already been included
159        // in the last (or an earlier) block proposal.
160        // This is only strongly guaranteed for a quorum of ancestors. It is still
161        // possible to re-include a block from an authority which hadn't been
162        // included in the last proposal, hence its latest included ancestor
163        // is not accurately captured here. This is considered a small deficiency,
164        // and it mostly matters just for this next proposal, without any actual
165        // penalty to performance or block proposals.
166        let mut last_included_ancestors = vec![None; context.committee.size()];
167        for ancestor in last_proposed_block.ancestors() {
168            last_included_ancestors[ancestor.author] = Some(*ancestor);
169        }
170
171        let min_propose_round = if sync_last_known_own_block {
172            None
173        } else {
174            // If the sync is disabled then we effectively don't want to impose any
175            // restriction.
176            Some(0)
177        };
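        // Note: when `sync_last_known_own_block` is enabled, `last_known_proposed_round`
        // remains `None` (and `should_propose()` returns false) until the last known
        // proposed round is provided via `set_last_known_proposed_round()`.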
178
179        let propagation_scores = leader_schedule
180            .leader_swap_table
181            .read()
182            .reputation_scores
183            .clone();
184        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
185        ancestor_state_manager.set_propagation_scores(propagation_scores);
186
187        Self {
188            context,
189            last_signaled_round,
190            last_included_ancestors,
191            last_decided_leader,
192            leader_schedule,
193            transaction_consumer,
194            block_manager,
195            subscriber_exists,
196            propagation_delay: 0,
197            committer,
198            commit_observer,
199            signals,
200            block_signer,
201            dag_state,
202            last_known_proposed_round: min_propose_round,
203            ancestor_state_manager,
204        }
205        .recover()
206    }
207
208    fn recover(mut self) -> Self {
209        let _s = self
210            .context
211            .metrics
212            .node_metrics
213            .scope_processing_time
214            .with_label_values(&["Core::recover"])
215            .start_timer();
216        // Ensure local time is after max ancestor timestamp.
217        let ancestor_blocks = self
218            .dag_state
219            .read()
220            .get_last_cached_block_per_authority(Round::MAX);
221        let max_ancestor_timestamp = ancestor_blocks
222            .iter()
223            .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms()));
224        let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms());
225        if wait_ms > 0 {
226            warn!(
227                "Waiting for {} ms while recovering ancestors from storage",
228                wait_ms
229            );
230            std::thread::sleep(Duration::from_millis(wait_ms));
231        }
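        // Waiting here preserves the invariant (asserted in `try_new_block`) that a
        // newly proposed block never has a timestamp smaller than any of its
        // ancestors, even if the local clock lags behind peers after a restart.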
232
233        // Try to commit and propose, since they may not have run after the last storage
234        // write.
235        self.try_commit(vec![]).unwrap();
236        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
237        {
238            last_proposed_block
239        } else {
240            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
241            if self.should_propose() {
242                assert!(
243                    last_proposed_block.round() > GENESIS_ROUND,
244                    "At minimum a block of round higher than genesis should have been produced during recovery"
245                );
246            }
247
248            // If no new block was proposed, then just re-broadcast the last proposed
249            // one to ensure liveness.
250            self.signals
251                .new_block(ExtendedBlock {
252                    block: last_proposed_block.clone(),
253                    excluded_ancestors: vec![],
254                })
255                .unwrap();
256            last_proposed_block
257        };
258
259        // Try to set up leader timeout if needed.
260        // This needs to be called after try_commit() and try_propose(), which may
261        // have advanced the threshold clock round.
262        self.try_signal_new_round();
263
264        info!(
265            "Core recovery completed with last proposed block {:?}",
266            last_proposed_block
267        );
268
269        self
270    }
271
272    /// Processes the provided blocks and accepts them if possible when their
273    /// causal history exists. The method returns:
274    /// - The references of ancestors whose blocks are missing
275    #[tracing::instrument(skip_all)]
276    pub(crate) fn add_blocks(
277        &mut self,
278        blocks: Vec<VerifiedBlock>,
279    ) -> ConsensusResult<BTreeSet<BlockRef>> {
280        let _scope = monitored_scope("Core::add_blocks");
281        let _s = self
282            .context
283            .metrics
284            .node_metrics
285            .scope_processing_time
286            .with_label_values(&["Core::add_blocks"])
287            .start_timer();
288        self.context
289            .metrics
290            .node_metrics
291            .core_add_blocks_batch_size
292            .observe(blocks.len() as f64);
293
294        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
295
296        if !accepted_blocks.is_empty() {
297            debug!(
298                "Accepted blocks: {}",
299                accepted_blocks
300                    .iter()
301                    .map(|b| b.reference().to_string())
302                    .join(",")
303            );
304
305            // Try to commit the new blocks if possible.
306            self.try_commit(vec![])?;
307
308            // Try to propose now since there are new blocks accepted.
309            self.try_propose(false)?;
310
311            // Now set up leader timeout if needed.
312            // This needs to be called after try_commit() and try_propose(), which may
313            // have advanced the threshold clock round.
314            self.try_signal_new_round();
315        }
316
317        if !missing_block_refs.is_empty() {
318            trace!(
319                "Missing block refs: {}",
320                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
321            );
322        }
323        Ok(missing_block_refs)
324    }
325
326    /// Checks if the provided block refs have been accepted. If not, the missing
327    /// block refs are kept for synchronization. Returns the references of
328    /// missing blocks among the input blocks.
329    pub(crate) fn check_block_refs(
330        &mut self,
331        block_refs: Vec<BlockRef>,
332    ) -> ConsensusResult<BTreeSet<BlockRef>> {
333        let _scope = monitored_scope("Core::check_block_refs");
334        let _s = self
335            .context
336            .metrics
337            .node_metrics
338            .scope_processing_time
339            .with_label_values(&["Core::check_block_refs"])
340            .start_timer();
341        self.context
342            .metrics
343            .node_metrics
344            .core_check_block_refs_batch_size
345            .observe(block_refs.len() as f64);
346
347        // Try to find them via the block manager
348        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
349
350        if !missing_block_refs.is_empty() {
351            trace!(
352                "Missing block refs: {}",
353                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
354            );
355        }
356        Ok(missing_block_refs)
357    }
358
359    /// Adds the certified commits that have been synced via the commit syncer. We
360    /// use the commit info in order to skip running the decision
361    /// rule and immediately commit the corresponding leaders and sub dags. Note
362    /// that no block acceptance happens here; it happens internally
363    /// in the `try_commit` method, which ensures that every time only the
364    /// blocks corresponding to the certified commits that are about to
365    /// be committed are accepted.
366    #[tracing::instrument(skip_all)]
367    pub(crate) fn add_certified_commits(
368        &mut self,
369        certified_commits: CertifiedCommits,
370    ) -> ConsensusResult<BTreeSet<BlockRef>> {
371        let _scope = monitored_scope("Core::add_certified_commits");
372
373        // We want to enable the commit process logic when GC is enabled.
374        if self.dag_state.read().gc_enabled() {
375            let votes = certified_commits.votes().to_vec();
376            let commits = self
377                .validate_certified_commits(certified_commits.commits().to_vec())
378                .expect("Certified commits validation failed");
379
380            // Accept the certified commit votes. This is optimistically done to increase
381            // the chances of having votes available when this node will need to
382            // sync commits to other nodes.
383            self.block_manager.try_accept_blocks(votes);
384
385            // Try to commit the new blocks. Take into account the trusted commit that has
386            // been provided.
387            self.try_commit(commits)?;
388
389            // Try to propose now since there are new blocks accepted.
390            self.try_propose(false)?;
391
392            // Now set up leader timeout if needed.
393            // This needs to be called after try_commit() and try_propose(), which may
394            // have advanced the threshold clock round.
395            self.try_signal_new_round();
396
397            return Ok(BTreeSet::new());
398        }
399
400        // If GC is not enabled then process blocks as usual.
401        let blocks = certified_commits
402            .commits()
403            .iter()
404            .flat_map(|commit| commit.blocks())
405            .cloned()
406            .collect::<Vec<_>>();
407
408        self.add_blocks(blocks)
409    }
410
411    /// If needed, signals a new clock round and sets up leader timeout.
412    fn try_signal_new_round(&mut self) {
413        // Signal only when the threshold clock round is more advanced than the last
414        // signaled round.
415        //
416        // NOTE: a signal is still sent even when a block has been proposed at the new
417        // round. We can consider changing this in the future.
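        // The threshold clock round is the round this authority may propose for next:
        // it advances to `r + 1` once blocks from a quorum (by stake) of authorities
        // at round `r` have been accepted into the DAG.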
418        let new_clock_round = self.dag_state.read().threshold_clock_round();
419        if new_clock_round <= self.last_signaled_round {
420            return;
421        }
422        // Then send a signal to set up leader timeout.
423        self.signals.new_round(new_clock_round);
424        self.last_signaled_round = new_clock_round;
425
426        // Report the threshold clock round
427        self.context
428            .metrics
429            .node_metrics
430            .threshold_clock_round
431            .set(new_clock_round as i64);
432    }
433
434    /// Creates a new block for the dictated round. This is used when a leader
435    /// timeout occurs, either when the min or the max timeout expires. When
436    /// `force = true`, then any checks such as previous round
437    /// leader existence are skipped.
438    pub(crate) fn new_block(
439        &mut self,
440        round: Round,
441        force: bool,
442    ) -> ConsensusResult<Option<VerifiedBlock>> {
443        let _scope = monitored_scope("Core::new_block");
444        if self.last_proposed_round() < round {
445            self.context
446                .metrics
447                .node_metrics
448                .leader_timeout_total
449                .with_label_values(&[&format!("{force}")])
450                .inc();
451            let result = self.try_propose(force);
452            // The threshold clock round may have advanced, so a signal needs to be sent.
453            self.try_signal_new_round();
454            return result;
455        }
456        Ok(None)
457    }
458
459    /// Keeps only the certified commits that have a commit index > last commit
460    /// index. It also ensures that the first commit in the list is the next one
461    /// in line, otherwise an error is returned.
462    fn validate_certified_commits(
463        &mut self,
464        commits: Vec<CertifiedCommit>,
465    ) -> ConsensusResult<Vec<CertifiedCommit>> {
466        // Filter out the commits that have already been committed locally and keep
467        // only those above the last committed index.
468        let last_commit_index = self.dag_state.read().last_commit_index();
469        let commits = commits
470            .iter()
471            .filter(|commit| {
472                if commit.index() > last_commit_index {
473                    true
474                } else {
475                    tracing::debug!(
476                        "Skip commit for index {} as it is already committed with last commit index {}",
477                        commit.index(),
478                        last_commit_index
479                    );
480                    false
481                }
482            })
483            .cloned()
484            .collect::<Vec<_>>();
485
486        // Make sure that the first commit we find is the next one in line and there is
487        // no gap.
488        if let Some(commit) = commits.first() {
489            if commit.index() != last_commit_index + 1 {
490                return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
491                    expected_commit_index: last_commit_index + 1,
492                    commit_index: commit.index(),
493                });
494            }
495        }
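        // Example: with last_commit_index = 10, incoming commits with indices
        // [9, 10, 11, 12] are filtered down to [11, 12] and returned; if the first
        // remaining index were 12 instead of 11 (a gap), an
        // UnexpectedCertifiedCommitIndex error is returned instead.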
496
497        Ok(commits)
498    }
499
500    // Attempts to create a new block, persist it and propose it to all peers.
501    // When force is true, the checks for whether the leader from the last round
502    // exists among the ancestors and whether the minimum round delay has passed are skipped.
503    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
504        if !self.should_propose() {
505            return Ok(None);
506        }
507        if let Some(extended_block) = self.try_new_block(force) {
508            self.signals.new_block(extended_block.clone())?;
509
510            fail_point!("consensus-after-propose");
511
512            // The new block may help commit.
513            self.try_commit(vec![])?;
514            return Ok(Some(extended_block.block));
515        }
516        Ok(None)
517    }
518
519    /// Attempts to propose a new block for the next round. If a block has
520    /// already been proposed for the latest or an earlier round, then no block
521    /// is created and None is returned.
522    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
523        let _s = self
524            .context
525            .metrics
526            .node_metrics
527            .scope_processing_time
528            .with_label_values(&["Core::try_new_block"])
529            .start_timer();
530
531        // Ensure the new block has a higher round than the last proposed block.
532        let clock_round = {
533            let dag_state = self.dag_state.read();
534            let clock_round = dag_state.threshold_clock_round();
535            if clock_round <= dag_state.get_last_proposed_block().round() {
536                return None;
537            }
538            clock_round
539        };
540
541        // There must be a quorum of blocks from the previous round.
542        let quorum_round = clock_round.saturating_sub(1);
543
544        // Create a new block either because we want to "forcefully" propose a block due
545        // to a leader timeout, or because we are actually ready to produce the
546        // block (leader exists and min delay has passed).
547        if !force {
548            if !self.leaders_exist(quorum_round) {
549                return None;
550            }
551
552            if Duration::from_millis(
553                self.context
554                    .clock
555                    .timestamp_utc_ms()
556                    .saturating_sub(self.last_proposed_timestamp_ms()),
557            ) < self.context.parameters.min_round_delay
558            {
559                return None;
560            }
561        }
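        // Illustrative example: with a min_round_delay of 250ms (a hypothetical value),
        // a non-forced proposal is throttled to roughly four rounds per second, even
        // when quorums for new rounds form faster than that.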
562
563        // Determine the ancestors to be included in proposal.
564        let (ancestors, excluded_and_equivocating_ancestors) =
565            self.smart_ancestors_to_propose(clock_round, !force);
566
567        // If we did not find enough good ancestors to propose, continue to wait before
568        // proposing.
569        if ancestors.is_empty() {
570            assert!(
571                !force,
572                "Ancestors should have been returned if force is true!"
573            );
574            return None;
575        }
576
577        let excluded_ancestors_limit = self.context.committee.size() * 2;
578        if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
579            debug!(
580                "Dropping {} excluded ancestor(s) during proposal due to size limit",
581                excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
582            );
583        }
584        let excluded_ancestors = excluded_and_equivocating_ancestors
585            .into_iter()
586            .take(excluded_ancestors_limit)
587            .collect();
588
589        // Update the last included ancestor block refs
590        for ancestor in &ancestors {
591            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
592        }
593
594        let leader_authority = &self
595            .context
596            .committee
597            .authority(self.first_leader(quorum_round))
598            .hostname;
599        self.context
600            .metrics
601            .node_metrics
602            .block_proposal_leader_wait_ms
603            .with_label_values(&[leader_authority])
604            .inc_by(
605                Instant::now()
606                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
607                    .as_millis() as u64,
608            );
609        self.context
610            .metrics
611            .node_metrics
612            .block_proposal_leader_wait_count
613            .with_label_values(&[leader_authority])
614            .inc();
615
616        self.context
617            .metrics
618            .node_metrics
619            .proposed_block_ancestors
620            .observe(ancestors.len() as f64);
621        for ancestor in &ancestors {
622            let authority = &self.context.committee.authority(ancestor.author()).hostname;
623            self.context
624                .metrics
625                .node_metrics
626                .proposed_block_ancestors_depth
627                .with_label_values(&[authority])
628                .observe(clock_round.saturating_sub(ancestor.round()).into());
629        }
630
631        // Ensure ancestor timestamps are not more advanced than the current time.
632        // Also catch the issue if the system's clock goes backwards.
633        let now = self.context.clock.timestamp_utc_ms();
634        ancestors.iter().for_each(|block| {
635            assert!(
636                block.timestamp_ms() <= now,
637                "Violation: ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.",
638                block, block.timestamp_ms(), clock_round
639            );
640        });
641
642        // Consume the next transactions to be included. Do not drop the guards yet as
643        // this would acknowledge the inclusion of the transactions. Just let this
644        // be done at the end of the method.
645        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
646        self.context
647            .metrics
648            .node_metrics
649            .proposed_block_transactions
650            .observe(transactions.len() as f64);
651
652        // Consume the commit votes to be included.
653        let commit_votes = self
654            .dag_state
655            .write()
656            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);
657
658        // Create the block and insert to storage.
659        let block = Block::V1(BlockV1::new(
660            self.context.committee.epoch(),
661            clock_round,
662            self.context.own_index,
663            now,
664            ancestors.iter().map(|b| b.reference()).collect(),
665            transactions,
666            commit_votes,
667            vec![],
668        ));
669        let signed_block =
670            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
671        let serialized = signed_block
672            .serialize()
673            .expect("Block serialization failed.");
674        self.context
675            .metrics
676            .node_metrics
677            .proposed_block_size
678            .observe(serialized.len() as f64);
679        // Own blocks are assumed to be valid.
680        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);
681
682        // Record the interval from last proposal, before accepting the proposed block.
683        let last_proposed_block = self.last_proposed_block();
684        if last_proposed_block.round() > 0 {
685            self.context
686                .metrics
687                .node_metrics
688                .block_proposal_interval
689                .observe(
690                    Duration::from_millis(
691                        verified_block
692                            .timestamp_ms()
693                            .saturating_sub(last_proposed_block.timestamp_ms()),
694                    )
695                    .as_secs_f64(),
696                );
697        }
698
699        // Accept the block into BlockManager and DagState.
700        let (accepted_blocks, missing) = self
701            .block_manager
702            .try_accept_blocks(vec![verified_block.clone()]);
703        assert_eq!(accepted_blocks.len(), 1);
704        assert!(missing.is_empty());
705
706        // Ensure the new block and its ancestors are persisted, before broadcasting it.
707        self.dag_state.write().flush();
708
710        // Now acknowledge the transactions for their inclusion in the block.
710        ack_transactions(verified_block.reference());
711
712        debug!("Created block {verified_block:?} for round {clock_round}");
713
714        self.context
715            .metrics
716            .node_metrics
717            .proposed_blocks
718            .with_label_values(&[&force.to_string()])
719            .inc();
720
721        Some(ExtendedBlock {
722            block: verified_block,
723            excluded_ancestors,
724        })
725    }
726
727    /// Runs the commit rule to attempt to commit additional blocks from the DAG.
728    /// If any `certified_commits` are provided, then it will attempt to commit
729    /// those first before trying to commit any further leaders.
730    fn try_commit(
731        &mut self,
732        mut certified_commits: Vec<CertifiedCommit>,
733    ) -> ConsensusResult<Vec<CommittedSubDag>> {
734        let _s = self
735            .context
736            .metrics
737            .node_metrics
738            .scope_processing_time
739            .with_label_values(&["Core::try_commit"])
740            .start_timer();
741
742        let mut certified_commits_map = BTreeMap::new();
743        for c in &certified_commits {
744            certified_commits_map.insert(c.index(), c.reference());
745        }
746
747        if !certified_commits.is_empty() {
748            info!(
749                "Will try to commit synced commits first: {:?}",
750                certified_commits
751                    .iter()
752                    .map(|c| (c.index(), c.leader()))
753                    .collect::<Vec<_>>()
754            );
755        }
756
757        let mut committed_sub_dags = Vec::new();
758        // TODO: Add optimization to abort early without quorum for a round.
759        loop {
760            // LeaderSchedule has a limit on how many sequenced leaders can be committed
761            // before a change is triggered. Calling into the leader schedule returns
762            // how many commits remain until the next leader change. We will loop back and
763            // recalculate any discarded leaders with the new schedule.
764            let mut commits_until_update = self
765                .leader_schedule
766                .commits_until_leader_schedule_update(self.dag_state.clone());
767
768            if commits_until_update == 0 {
769                let last_commit_index = self.dag_state.read().last_commit_index();
770
771                tracing::info!(
772                    "Leader schedule change triggered at commit index {last_commit_index}"
773                );
774                if self
775                    .context
776                    .protocol_config
777                    .consensus_distributed_vote_scoring_strategy()
778                {
779                    self.leader_schedule
780                        .update_leader_schedule_v2(&self.dag_state);
781
782                    let propagation_scores = self
783                        .leader_schedule
784                        .leader_swap_table
785                        .read()
786                        .reputation_scores
787                        .clone();
788                    self.ancestor_state_manager
789                        .set_propagation_scores(propagation_scores);
790                } else {
791                    self.leader_schedule
792                        .update_leader_schedule_v1(&self.dag_state);
793                }
794                commits_until_update = self
795                    .leader_schedule
796                    .commits_until_leader_schedule_update(self.dag_state.clone());
797
798                fail_point!("consensus-after-leader-schedule-change");
799            }
800            assert!(commits_until_update > 0);
801
802            // Always try to process the synced commits first. If there are certified
803            // commits to process then the decided leaders and the commits will be returned.
804            let (mut decided_leaders, decided_certified_commits): (
805                Vec<DecidedLeader>,
806                Vec<CertifiedCommit>,
807            ) = self
808                .try_decide_certified(&mut certified_commits, commits_until_update)
809                .into_iter()
810                .unzip();
811
812            // Only accept blocks for the certified commits that we are certain to sequence.
813            // This ensures that only blocks corresponding to committed certified commits
814            // are flushed to disk. Blocks from non-committed certified commits
815            // will not be flushed, preventing issues during crash-recovery.
816            // This avoids scenarios where accepting and flushing blocks of non-committed
817            // certified commits could lead to premature commit rule execution.
818            // Due to GC, this could cause a panic if the commit rule tries to access
819            // missing causal history from blocks of certified commits.
820            let blocks = decided_certified_commits
821                .iter()
822                .flat_map(|c| c.blocks())
823                .cloned()
824                .collect::<Vec<_>>();
825            self.block_manager.try_accept_committed_blocks(blocks);
826
827            // If no leaders were decided from the certified commits then try to run
828            // the decision rule.
829            if decided_leaders.is_empty() {
830                // TODO: limit commits by commits_until_update, which may be needed when leader
831                // schedule length is reduced.
832                decided_leaders = self.committer.try_decide(self.last_decided_leader);
833
834                // Truncate the decided leaders to fit the commit schedule limit.
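                // Example: if 5 leaders were decided but only 3 commits remain before the
                // next schedule update, the last 2 are dropped here and re-decided on a
                // later loop iteration under the updated schedule.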
835                if decided_leaders.len() >= commits_until_update {
836                    let _ = decided_leaders.split_off(commits_until_update);
837                }
838            }
839
840            // If the decided leaders list is empty then just break the loop.
841            let Some(last_decided) = decided_leaders.last().cloned() else {
842                break;
843            };
844
845            self.last_decided_leader = last_decided.slot();
846
847            let sequenced_leaders = decided_leaders
848                .into_iter()
849                .filter_map(|leader| leader.into_committed_block())
850                .collect::<Vec<_>>();
851
852            tracing::debug!(
853                "Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
854                sequenced_leaders.len()
855            );
856
857            self.context
858                .metrics
859                .node_metrics
860                .last_decided_leader_round
861                .set(self.last_decided_leader.round as i64);
862
863            // It's possible to reach this point when all of the decided leaders are
864            // "Skip" decisions. In this case there is no leader to commit and
865            // we should break the loop.
866            if sequenced_leaders.is_empty() {
867                break;
868            }
869
870            tracing::info!(
871                "Committing {} leaders: {}",
872                sequenced_leaders.len(),
873                sequenced_leaders
874                    .iter()
875                    .map(|b| b.reference().to_string())
876                    .join(",")
877            );
878
879            // TODO: refcount subdags
880            let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
881            if self
882                .context
883                .protocol_config
884                .consensus_distributed_vote_scoring_strategy()
885            {
886                self.dag_state.write().add_scoring_subdags(subdags.clone());
887            } else {
888                // TODO: Remove when DistributedVoteScoring is enabled.
889                self.dag_state
890                    .write()
891                    .add_unscored_committed_subdags(subdags.clone());
892            }
893
894            // Try to unsuspend blocks if gc_round has advanced.
895            self.block_manager
896                .try_unsuspend_blocks_for_latest_gc_round();
897            committed_sub_dags.extend(subdags);
898
899            fail_point!("consensus-after-handle-commit");
900        }
901
902        // Sanity check: for commits that have been linearized using the certified
903        // commits, ensure that the same sub dag has been committed.
904        for sub_dag in &committed_sub_dags {
905            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
906                assert_eq!(
907                    commit_ref, sub_dag.commit_ref,
908                    "Certified commit has different reference than the committed sub dag"
909                );
910            }
911        }
912
913        // Notify about our own committed blocks
914        let committed_block_refs = committed_sub_dags
915            .iter()
916            .flat_map(|sub_dag| sub_dag.blocks.iter())
917            .filter_map(|block| {
918                (block.author() == self.context.own_index).then_some(block.reference())
919            })
920            .collect::<Vec<_>>();
921        self.transaction_consumer
922            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
923
924        Ok(committed_sub_dags)
925    }
926
927    pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
928        let _scope = monitored_scope("Core::get_missing_blocks");
929        self.block_manager.missing_blocks()
930    }
931
932    /// Sets whether there is a consumer available to consume blocks produced by
933    /// the core.
934    pub(crate) fn set_subscriber_exists(&mut self, exists: bool) {
935        info!("Block subscriber exists: {exists}");
936        self.subscriber_exists = exists;
937    }
938
939    /// Sets the delay by round for propagating blocks to a quorum and the
940    /// received & accepted quorum rounds per authority for ancestor state
941    /// manager.
942    pub(crate) fn set_propagation_delay_and_quorum_rounds(
943        &mut self,
944        delay: Round,
945        received_quorum_rounds: Vec<QuorumRound>,
946        accepted_quorum_rounds: Vec<QuorumRound>,
947    ) {
948        info!(
949            "Received quorum round per authority in ancestor state manager set to: {}",
950            self.context
951                .committee
952                .authorities()
953                .zip(received_quorum_rounds.iter())
954                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
955                .join(", ")
956        );
957        info!(
958            "Accepted quorum round per authority in ancestor state manager set to: {}",
959            self.context
960                .committee
961                .authorities()
962                .zip(accepted_quorum_rounds.iter())
963                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
964                .join(", ")
965        );
966        self.ancestor_state_manager
967            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
968        info!("Propagation round delay set to: {delay}");
969        self.propagation_delay = delay;
970    }
971
972    /// Sets the min propose round for the proposer, allowing it to propose blocks
973    /// only for round numbers `> last_known_proposed_round`. At the moment the
974    /// method is only allowed to be called once; attempting to call it
975    /// multiple times leads to a panic.
976    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
977        if self.last_known_proposed_round.is_some() {
978            panic!(
979                "Should not attempt to set the last known proposed round if that has been already set"
980            );
981        }
982        self.last_known_proposed_round = Some(round);
983        info!("Last known proposed round set to {round}");
984    }
985
986    /// Whether the core should propose new blocks.
987    pub(crate) fn should_propose(&self) -> bool {
988        let clock_round = self.dag_state.read().threshold_clock_round();
989        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
990
991        if !self.subscriber_exists {
992            debug!("Skip proposing for round {clock_round}, no subscriber exists.");
993            core_skipped_proposals
994                .with_label_values(&["no_subscriber"])
995                .inc();
996            return false;
997        }
998
999        if self.propagation_delay
1000            > self
1001                .context
1002                .parameters
1003                .propagation_delay_stop_proposal_threshold
1004        {
1005            debug!(
1006                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
1007                self.propagation_delay,
1008                self.context
1009                    .parameters
1010                    .propagation_delay_stop_proposal_threshold
1011            );
1012            core_skipped_proposals
1013                .with_label_values(&["high_propagation_delay"])
1014                .inc();
1015            return false;
1016        }
1017
1018        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
1019            debug!(
1020                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
1021            );
1022            core_skipped_proposals
1023                .with_label_values(&["no_last_known_proposed_round"])
1024                .inc();
1025            return false;
1026        };
1027        if clock_round <= last_known_proposed_round {
1028            debug!(
1029                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
1030            );
1031            core_skipped_proposals
1032                .with_label_values(&["higher_last_known_proposed_round"])
1033                .inc();
1034            return false;
1035        }
1036
1037        true
1038    }
1039
1040    // Try to decide which of the certified commits will have to be committed next,
1041    // respecting the `limit`. If the provided `limit` is zero, it will panic.
1042    // The function returns the list of decided leaders and updates the
1043    // remaining certified commits in place. If an empty vector is returned, it means
1044    // that there are no certified commits to be committed as `certified_commits` is
1045    // either empty or all of the certified commits are already committed.
1046    #[tracing::instrument(skip_all)]
1047    fn try_decide_certified(
1048        &mut self,
1049        certified_commits: &mut Vec<CertifiedCommit>,
1050        limit: usize,
1051    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
1052        // If GC is disabled then should not run any of this logic.
1053        if !self.dag_state.read().gc_enabled() {
1054            return Vec::new();
1055        }
1056
1057        assert!(limit > 0, "limit should be greater than 0");
1058
1059        let to_commit = if certified_commits.len() >= limit {
1060            // We keep only the number of leaders as dictated by the `limit`
1061            certified_commits.drain(..limit).collect::<Vec<_>>()
1062        } else {
1063            // Otherwise just take all of them and leave `certified_commits` empty.
1064            mem::take(certified_commits)
1065        };
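        // Example: with limit = 3 and 5 pending certified commits, the first 3 are
        // decided now and the remaining 2 stay in `certified_commits` for the next
        // iteration of the commit loop in `try_commit`.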
1066
1067        tracing::debug!(
1068            "Decided {} certified leaders: {}",
1069            to_commit.len(),
1070            to_commit.iter().map(|c| c.leader().to_string()).join(",")
1071        );
1072
1073        let sequenced_leaders = to_commit
1074            .into_iter()
1075            .map(|commit| {
1076                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
1077                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
1078                let leader = DecidedLeader::Commit(leader.clone());
1079                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
1080                (leader, commit)
1081            })
1082            .collect::<Vec<_>>();
1083
1084        sequenced_leaders
1085    }
1086
1087    /// Retrieves the next ancestors to propose to form a block at `clock_round`.
1088    /// If smart selection is enabled then this will try to select
1089    /// the best ancestors based on the propagation scores of the
1090    /// authorities.
1091    fn smart_ancestors_to_propose(
1092        &mut self,
1093        clock_round: Round,
1094        smart_select: bool,
1095    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
1096        let node_metrics = &self.context.metrics.node_metrics;
1097        let _s = node_metrics
1098            .scope_processing_time
1099            .with_label_values(&["Core::smart_ancestors_to_propose"])
1100            .start_timer();
1101
1102        // Now take the ancestors before the clock_round (excluded) for each authority.
1103        let all_ancestors = self
1104            .dag_state
1105            .read()
1106            .get_last_cached_block_per_authority(clock_round);
1107
1108        assert_eq!(
1109            all_ancestors.len(),
1110            self.context.committee.size(),
1111            "Fatal error, the number of returned ancestors doesn't match the committee size."
1112        );
1113
1114        // Ensure ancestor state is up to date before selecting for proposal.
1115        self.ancestor_state_manager.update_all_ancestors_state();
1116        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
1117
1118        let quorum_round = clock_round.saturating_sub(1);
1119
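        // Selection proceeds in passes: (1) include our own last proposed block and all
        // ancestors whose state is INCLUDE; (2) if those do not yet carry a quorum of
        // stake at the parent round, pull in the best-scoring temporarily excluded
        // ancestors at that round until a quorum is reached; (3) for the remaining
        // excluded authorities, include their latest block that a quorum of the network
        // has already accepted, if it is newer than what we last included.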
1120        let mut score_and_pending_excluded_ancestors = Vec::new();
1121        let mut excluded_and_equivocating_ancestors = BTreeSet::new();
1122
1123        // Propose only ancestors of higher rounds than what has already been proposed.
1124        // And always include own last proposed block first among ancestors.
1125        // Start by only including the high scoring ancestors. Low scoring ancestors
1126        // will be included in a second pass below.
1127        let included_ancestors = iter::once(self.last_proposed_block().clone())
1128            .chain(
1129                all_ancestors
1130                    .into_iter()
1131                    .flat_map(|(ancestor, equivocating_ancestors)| {
1132                        if ancestor.author() == self.context.own_index {
1133                            return None;
1134                        }
1135                        if let Some(last_block_ref) =
1136                            self.last_included_ancestors[ancestor.author()]
1137                        {
1138                            if last_block_ref.round >= ancestor.round() {
1139                                return None;
1140                            }
1141                        }
1142
1143                        // We will never include equivocating ancestors so add them immediately
1144                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
1145
1146                        let ancestor_state = ancestor_state_map[ancestor.author()];
1147                        match ancestor_state {
1148                            AncestorState::Include => {
1149                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
1150                            }
1151                            AncestorState::Exclude(score) => {
1152                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
1153                                score_and_pending_excluded_ancestors.push((score, ancestor));
1154                                return None;
1155                            }
1156                        }
1157
1158                        Some(ancestor)
1159                    }),
1160            )
1161            .collect::<Vec<_>>();
1162
1163        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
1164
1165        // Check total stake of high scoring parent round ancestors
1166        for ancestor in included_ancestors
1167            .iter()
1168            .filter(|a| a.round() == quorum_round)
1169        {
1170            parent_round_quorum.add(ancestor.author(), &self.context.committee);
1171        }
1172
1173        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
1174            node_metrics.smart_selection_wait.inc();
1175            debug!(
1176                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
1177                parent_round_quorum.stake()
1178            );
1179            return (vec![], BTreeSet::new());
1180        }
1181
1182        // Sort scores descending so we can include the best of the pending excluded
1183        // ancestors first until we reach the threshold.
1184        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
1185
1186        let mut ancestors_to_propose = included_ancestors;
1187        let mut excluded_ancestors = Vec::new();
1188        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
1189            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
1190            if !parent_round_quorum.reached_threshold(&self.context.committee)
1191                && ancestor.round() == quorum_round
1192            {
1193                debug!(
1194                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
1195                );
1196                parent_round_quorum.add(ancestor.author(), &self.context.committee);
1197                ancestors_to_propose.push(ancestor);
1198                node_metrics
1199                    .included_excluded_proposal_ancestors_count_by_authority
1200                    .with_label_values(&[block_hostname.as_str(), "timeout"])
1201                    .inc();
1202            } else {
1203                excluded_ancestors.push((score, ancestor));
1204            }
1205        }
1206
1207        // Iterate through the excluded ancestors and include the ancestor, or the
1208        // ancestor's ancestor, that has been accepted by a quorum of the network. If the
1209        // original ancestor itself is not included then it will be part of the
1210        // excluded ancestors that are not included in the block but will still
1211        // be broadcast to peers.
1212        for (score, ancestor) in excluded_ancestors.iter() {
1213            let excluded_author = ancestor.author();
1214            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
1215            // A quorum of validators reported to have accepted blocks from the
1216            // excluded_author up to the low quorum round.
1217            let mut accepted_low_quorum_round = self
1218                .ancestor_state_manager
1219                .accepted_quorum_round_per_authority[excluded_author]
1220                .0;
1221            // If the accepted quorum round of this ancestor is greater than or equal
1222            // to the clock round then we want to make sure to set it to clock_round - 1
1223            // as that is the max round the new block can include as an ancestor.
1224            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);
1225
1226            let last_included_round = self.last_included_ancestors[excluded_author]
1227                .map(|block_ref| block_ref.round)
1228                .unwrap_or(GENESIS_ROUND);
1229            if ancestor.round() <= last_included_round {
1230                // This should have already been filtered out when filtering all_ancestors.
1231                // Still, ensure previously included ancestors are filtered out.
1232                continue;
1233            }
1234
1235            if last_included_round >= accepted_low_quorum_round {
1236                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1237                trace!(
1238                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
1239                    ancestor.reference()
1240                );
1241                node_metrics
1242                    .excluded_proposal_ancestors_count_by_authority
1243                    .with_label_values(&[block_hostname])
1244                    .inc();
1245                continue;
1246            }
1247
1248            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
1249                // Include the ancestor block as it has been seen & accepted by a strong quorum.
1250                ancestor.clone()
1251            } else {
1252                // Exclude this ancestor since it hasn't been accepted by a strong quorum
1253                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1254                trace!(
1255                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
1256                    ancestor.reference(),
1257                    ancestor.round()
1258                );
1259                node_metrics
1260                    .excluded_proposal_ancestors_count_by_authority
1261                    .with_label_values(&[block_hostname])
1262                    .inc();
1263
1264                // Look for an earlier block in the ancestor chain that we can include as there
1265                // is a gap between the last included round and the accepted low quorum round.
1266                //
1267                // Note: Only cached blocks need to be propagated. Committed and GC'ed blocks
1268                // do not need to be propagated.
1269                match self.dag_state.read().get_last_cached_block_in_range(
1270                    excluded_author,
1271                    last_included_round + 1,
1272                    accepted_low_quorum_round + 1,
1273                ) {
1274                    Some(earlier_ancestor) => {
1275                        // Found an earlier block that has been propagated well - include it instead
1276                        earlier_ancestor
1277                    }
1278                    None => {
1279                        // No suitable earlier block found
1280                        continue;
1281                    }
1282                }
1283            };
1284            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
1285            ancestors_to_propose.push(ancestor.clone());
1286            trace!(
1287                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
1288                ancestor.reference()
1289            );
1290            node_metrics
1291                .included_excluded_proposal_ancestors_count_by_authority
1292                .with_label_values(&[block_hostname.as_str(), "quorum"])
1293                .inc();
1294        }
1295
1296        assert!(
1297            parent_round_quorum.reached_threshold(&self.context.committee),
1298            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1299        );
1300
1301        info!(
1302            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
1303            ancestors_to_propose.len(),
1304            excluded_and_equivocating_ancestors.len()
1305        );
1306
1307        (ancestors_to_propose, excluded_and_equivocating_ancestors)
1308    }
1309
1310    /// Checks whether all the leaders of the round exist.
1311    /// TODO: we can leverage some additional signal here in order to more
1312    /// cleverly tune the leader timeout later. E.g. if we already have the
1313    /// first leader in order, we might not want to wait as long.
1314    fn leaders_exist(&self, round: Round) -> bool {
1315        let dag_state = self.dag_state.read();
1316        for leader in self.leaders(round) {
1317            // Search for all the leaders. If at least one is not found, then return false.
1318            // A linear search should be fine here as the set of elements is expected to
1319            // be small, and more sophisticated data structures would not
1320            // give us much here.
1321            if !dag_state.contains_cached_block_at_slot(leader) {
1322                return false;
1323            }
1324        }
1325
1326        true
1327    }
1328
1329    /// Returns the leaders of the provided round.
1330    fn leaders(&self, round: Round) -> Vec<Slot> {
1331        self.committer
1332            .get_leaders(round)
1333            .into_iter()
1334            .map(|authority_index| Slot::new(round, authority_index))
1335            .collect()
1336    }
1337
1338    /// Returns the 1st leader of the round.
1339    fn first_leader(&self, round: Round) -> AuthorityIndex {
1340        self.leaders(round).first().unwrap().authority
1341    }
1342
1343    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1344        self.last_proposed_block().timestamp_ms()
1345    }
1346
1347    fn last_proposed_round(&self) -> Round {
1348        self.last_proposed_block().round()
1349    }
1350
1351    fn last_proposed_block(&self) -> VerifiedBlock {
1352        self.dag_state.read().get_last_proposed_block()
1353    }
1354}
1355
1356/// Senders of signals from Core, for outputs and events (e.g. new block
1357/// produced).
1358pub(crate) struct CoreSignals {
1359    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1360    new_round_sender: watch::Sender<Round>,
1361    context: Arc<Context>,
1362}
1363
1364impl CoreSignals {
1365    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1366        // Blocks buffered in the broadcast channel should be roughly equal to those cached
1367        // in dag state; since the underlying blocks are ref counted, a lower
1368        // buffer here would not reduce memory usage significantly.
1369        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1370            context.parameters.dag_state_cached_rounds as usize,
1371        );
1372        let (new_round_sender, new_round_receiver) = watch::channel(0);
1373
1374        let me = Self {
1375            tx_block_broadcast,
1376            new_round_sender,
1377            context,
1378        };
1379
1380        let receivers = CoreSignalsReceivers {
1381            rx_block_broadcast,
1382            new_round_receiver,
1383        };
1384
1385        (me, receivers)
1386    }
1387
1388    /// Sends a signal to all the waiters that a new block has been produced.
1389    /// Returns an error only when the block cannot be delivered to any
1390    /// subscriber, which is treated as a shutdown condition.
1391    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1392        // When there is only one authority in committee, it is unnecessary to broadcast
1393        // the block which will fail anyway without subscribers to the signal.
1394        if self.context.committee.size() > 1 {
1395            if extended_block.block.round() == GENESIS_ROUND {
1396                debug!("Ignoring broadcasting genesis block to peers");
1397                return Ok(());
1398            }
1399
1400            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1401                warn!("Couldn't broadcast the block to any receiver: {err}");
1402                return Err(ConsensusError::Shutdown);
1403            }
1404        } else {
1405            debug!(
1406                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1407            );
1408        }
1409        Ok(())
1410    }
1411
1412    /// Sends a signal that the threshold clock has advanced to a new round.
1413    /// The `round_number` is the round to which the threshold clock has
1414    /// advanced.
1415    pub(crate) fn new_round(&mut self, round_number: Round) {
1416        let _ = self.new_round_sender.send_replace(round_number);
1417    }
1418}
1419
1420/// Receivers of signals from Core.
1421/// Intentionally un-cloneable. Components should only subscribe to channels
1422/// they need.
1423pub(crate) struct CoreSignalsReceivers {
1424    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1425    new_round_receiver: watch::Receiver<Round>,
1426}
1427
1428impl CoreSignalsReceivers {
1429    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1430        self.rx_block_broadcast.resubscribe()
1431    }
1432
1433    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1434        self.new_round_receiver.clone()
1435    }
1436}
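
// The receivers above are plain tokio channel handles. A minimal usage sketch
// (illustrative only, not the production wiring; the variable names are
// hypothetical):
//
//     let mut blocks = receivers.block_broadcast_receiver();
//     let mut rounds = receivers.new_round_receiver();
//     loop {
//         tokio::select! {
//             Ok(extended_block) = blocks.recv() => {
//                 // forward the newly proposed block to peers
//             }
//             Ok(()) = rounds.changed() => {
//                 let round = *rounds.borrow_and_update();
//                 // react to the threshold clock advancing to `round`
//             }
//         }
//     }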
1437
1438/// Creates cores for the specified number of authorities with their
1439/// corresponding stakes. The cores and their respective signal receivers are
1440/// returned in ascending `AuthorityIndex` order.
1441#[cfg(test)]
1442pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<CoreTextFixture> {
1443    let mut cores = Vec::new();
1444
1445    for index in 0..authorities.len() {
1446        let own_index = AuthorityIndex::new_for_test(index as u32);
1447        let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false);
1448        cores.push(core);
1449    }
1450    cores
1451}
1452
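/// Test-only fixture that wires a `Core` instance together with in-memory
/// storage (`MemStore`), a `NoopBlockVerifier`, transaction plumbing and the
/// core signal receivers, so the tests below can drive Core directly.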
1453#[cfg(test)]
1454pub(crate) struct CoreTextFixture {
1455    pub core: Core,
1456    pub signal_receivers: CoreSignalsReceivers,
1457    pub block_receiver: broadcast::Receiver<ExtendedBlock>,
1458    #[expect(unused)]
1459    pub commit_receiver: UnboundedReceiver<CommittedSubDag>,
1460    pub store: Arc<MemStore>,
1461}
1462
1463#[cfg(test)]
1464impl CoreTextFixture {
1465    fn new(
1466        context: Context,
1467        authorities: Vec<Stake>,
1468        own_index: AuthorityIndex,
1469        sync_last_known_own_block: bool,
1470    ) -> Self {
1471        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
1472        let mut context = context.clone();
1473        context = context
1474            .with_committee(committee)
1475            .with_authority_index(own_index);
1476        context
1477            .protocol_config
1478            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1479
1480        let context = Arc::new(context);
1481        let store = Arc::new(MemStore::new());
1482        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1483
1484        let block_manager = BlockManager::new(
1485            context.clone(),
1486            dag_state.clone(),
1487            Arc::new(NoopBlockVerifier),
1488        );
1489        let leader_schedule = Arc::new(
1490            LeaderSchedule::from_store(context.clone(), dag_state.clone())
1491                .with_num_commits_per_schedule(10),
1492        );
1493        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1494        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1495        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1496        // Need at least one subscriber to the block broadcast channel.
1497        let block_receiver = signal_receivers.block_broadcast_receiver();
1498
1499        let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
1500        let commit_observer = CommitObserver::new(
1501            context.clone(),
1502            CommitConsumer::new(commit_sender.clone(), 0),
1503            dag_state.clone(),
1504            store.clone(),
1505            leader_schedule.clone(),
1506        );
1507
1508        let block_signer = signers.remove(own_index.value()).1;
1509
1510        let core = Core::new(
1511            context,
1512            leader_schedule,
1513            transaction_consumer,
1514            block_manager,
1515            true,
1516            commit_observer,
1517            signals,
1518            block_signer,
1519            dag_state,
1520            sync_last_known_own_block,
1521        );
1522
1523        Self {
1524            core,
1525            signal_receivers,
1526            block_receiver,
1527            commit_receiver,
1528            store,
1529        }
1530    }
1531}
1532
1533#[cfg(test)]
1534mod test {
1535    use std::{collections::BTreeSet, time::Duration};
1536
1537    use consensus_config::{AuthorityIndex, Parameters};
1538    use futures::{StreamExt, stream::FuturesUnordered};
1539    use iota_metrics::monitored_mpsc::unbounded_channel;
1540    use iota_protocol_config::ProtocolConfig;
1541    use rstest::rstest;
1542    use tokio::time::sleep;
1543
1544    use super::*;
1545    use crate::{
1546        CommitConsumer, CommitIndex,
1547        block::{TestBlock, genesis_blocks},
1548        block_verifier::NoopBlockVerifier,
1549        commit::CommitAPI,
1550        leader_scoring::ReputationScores,
1551        storage::{Store, WriteBatch, mem_store::MemStore},
1552        test_dag_builder::DagBuilder,
1553        test_dag_parser::parse_dag,
1554        transaction::{BlockStatus, TransactionClient},
1555    };
1556
1557    /// Recover Core and continue proposing from the last round which forms a
1558    /// quorum.
1559    #[tokio::test]
1560    async fn test_core_recover_from_store_for_full_round() {
1561        telemetry_subscribers::init_for_testing();
1562        let (context, mut key_pairs) = Context::new_for_test(4);
1563        let context = Arc::new(context);
1564        let store = Arc::new(MemStore::new());
1565        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1566        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1567        let mut block_status_subscriptions = FuturesUnordered::new();
1568
1569        // Create test blocks for all the authorities for 4 rounds and populate them in
1570        // store
1571        let mut last_round_blocks = genesis_blocks(context.clone());
1572        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1573        for round in 1..=4 {
1574            let mut this_round_blocks = Vec::new();
1575            for (index, _authority) in context.committee.authorities() {
1576                let block = VerifiedBlock::new_for_test(
1577                    TestBlock::new(round, index.value() as u32)
1578                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1579                        .build(),
1580                );
1581
1582                // If it's round 1, which will be committed later on, and it's our "own"
1583                // block, then subscribe to listen for the block status.
1584                if round == 1 && index == context.own_index {
1585                    let subscription =
1586                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
1587                    block_status_subscriptions.push(subscription);
1588                }
1589
1590                this_round_blocks.push(block);
1591            }
1592            all_blocks.extend(this_round_blocks.clone());
1593            last_round_blocks = this_round_blocks;
1594        }
1595        // write them in store
1596        store
1597            .write(WriteBatch::default().blocks(all_blocks))
1598            .expect("Storage error");
1599
1600        // create dag state after all blocks have been written to store
1601        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1602        let block_manager = BlockManager::new(
1603            context.clone(),
1604            dag_state.clone(),
1605            Arc::new(NoopBlockVerifier),
1606        );
1607        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1608            context.clone(),
1609            dag_state.clone(),
1610        ));
1611
1612        let (sender, _receiver) = unbounded_channel("consensus_output");
1613        let commit_observer = CommitObserver::new(
1614            context.clone(),
1615            CommitConsumer::new(sender.clone(), 0),
1616            dag_state.clone(),
1617            store.clone(),
1618            leader_schedule.clone(),
1619        );
1620
1621        // Check no commits have been persisted to dag_state or store.
1622        let last_commit = store.read_last_commit().unwrap();
1623        assert!(last_commit.is_none());
1624        assert_eq!(dag_state.read().last_commit_index(), 0);
1625
1626        // Now spin up core
1627        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1628        // Need at least one subscriber to the block broadcast channel.
1629        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1630        let _core = Core::new(
1631            context.clone(),
1632            leader_schedule,
1633            transaction_consumer,
1634            block_manager,
1635            true,
1636            commit_observer,
1637            signals,
1638            key_pairs.remove(context.own_index.value()).1,
1639            dag_state.clone(),
1640            false,
1641        );
1642
1643        // New round should be 5
1644        let mut new_round = signal_receivers.new_round_receiver();
1645        assert_eq!(*new_round.borrow_and_update(), 5);
1646
1647        // Block for round 5 should have been proposed.
1648        let proposed_block = block_receiver
1649            .recv()
1650            .await
1651            .expect("A block should have been created");
1652        assert_eq!(proposed_block.block.round(), 5);
1653        let ancestors = proposed_block.block.ancestors();
1654
1655        // Only ancestors of round 4 should be included.
1656        assert_eq!(ancestors.len(), 4);
1657        for ancestor in ancestors {
1658            assert_eq!(ancestor.round, 4);
1659        }
1660
1661        let last_commit = store
1662            .read_last_commit()
1663            .unwrap()
1664            .expect("last commit should be set");
1665
1666        // There were no commits prior to the core starting up, but there were completed
1667        // rounds up to and including round 4. So we should commit leaders in rounds 1 &
1668        // 2 as soon as the new block for round 5 is proposed.
1669        assert_eq!(last_commit.index(), 2);
1670        assert_eq!(dag_state.read().last_commit_index(), 2);
1671        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1672        assert_eq!(all_stored_commits.len(), 2);
1673
1674        // And ensure that our "own" block 1 is sent to the TransactionConsumer as a
1675        // notification alongside the gc_round.
1676        while let Some(result) = block_status_subscriptions.next().await {
1677            let status = result.unwrap();
1678            assert!(matches!(status, BlockStatus::Sequenced(_)));
1679        }
1680    }
1681
1682    /// Recover Core and continue proposing when the last round is partial,
1683    /// doesn't form a quorum, and we haven't proposed for that round
1684    /// yet.
1685    #[tokio::test]
1686    async fn test_core_recover_from_store_for_partial_round() {
1687        telemetry_subscribers::init_for_testing();
1688
1689        let (context, mut key_pairs) = Context::new_for_test(4);
1690        let context = Arc::new(context);
1691        let store = Arc::new(MemStore::new());
1692        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1693        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1694
1695        // Create test blocks for all authorities except ours (index = 0).
1696        let mut last_round_blocks = genesis_blocks(context.clone());
1697        let mut all_blocks = last_round_blocks.clone();
1698        for round in 1..=4 {
1699            let mut this_round_blocks = Vec::new();
1700
1701            // For round 4 only produce f+1 blocks: skip block creation for our validator
1702            // (position 0) and the one at position 1.
1703            let authorities_to_skip = if round == 4 {
1704                context.committee.validity_threshold() as usize
1705            } else {
1706                // otherwise always skip creating a block for our authority
1707                1
1708            };
1709
1710            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1711                let block = TestBlock::new(round, index.value() as u32)
1712                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1713                    .build();
1714                this_round_blocks.push(VerifiedBlock::new_for_test(block));
1715            }
1716            all_blocks.extend(this_round_blocks.clone());
1717            last_round_blocks = this_round_blocks;
1718        }
1719
1720        // write them in store
1721        store
1722            .write(WriteBatch::default().blocks(all_blocks))
1723            .expect("Storage error");
1724
1725        // create dag state after all blocks have been written to store
1726        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1727        let block_manager = BlockManager::new(
1728            context.clone(),
1729            dag_state.clone(),
1730            Arc::new(NoopBlockVerifier),
1731        );
1732        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1733            context.clone(),
1734            dag_state.clone(),
1735        ));
1736
1737        let (sender, _receiver) = unbounded_channel("consensus_output");
1738        let commit_observer = CommitObserver::new(
1739            context.clone(),
1740            CommitConsumer::new(sender.clone(), 0),
1741            dag_state.clone(),
1742            store.clone(),
1743            leader_schedule.clone(),
1744        );
1745
1746        // Check no commits have been persisted to dag_state & store
1747        let last_commit = store.read_last_commit().unwrap();
1748        assert!(last_commit.is_none());
1749        assert_eq!(dag_state.read().last_commit_index(), 0);
1750
1751        // Now spin up core
1752        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1753        // Need at least one subscriber to the block broadcast channel.
1754        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1755        let mut core = Core::new(
1756            context.clone(),
1757            leader_schedule,
1758            transaction_consumer,
1759            block_manager,
1760            true,
1761            commit_observer,
1762            signals,
1763            key_pairs.remove(context.own_index.value()).1,
1764            dag_state.clone(),
1765            false,
1766        );
1767
1768        // Clock round should have advanced to 5 during recovery because
1769        // a quorum has formed in round 4.
1770        let mut new_round = signal_receivers.new_round_receiver();
1771        assert_eq!(*new_round.borrow_and_update(), 5);
1772
1773        // During recovery, round 4 block should have been proposed.
1774        let proposed_block = block_receiver
1775            .recv()
1776            .await
1777            .expect("A block should have been created");
1778        assert_eq!(proposed_block.block.round(), 4);
1779        let ancestors = proposed_block.block.ancestors();
1780
1781        assert_eq!(ancestors.len(), 4);
1782        for ancestor in ancestors {
1783            if ancestor.author == context.own_index {
1784                assert_eq!(ancestor.round, 0);
1785            } else {
1786                assert_eq!(ancestor.round, 3);
1787            }
1788        }
1789
1790        // Run commit rule.
1791        core.try_commit(vec![]).ok();
1792        let last_commit = store
1793            .read_last_commit()
1794            .unwrap()
1795            .expect("last commit should be set");
1796
1797        // There were no commits prior to the core starting up, but there were completed
1798        // rounds up to round 4. So we should commit leaders in rounds 1 & 2 as soon
1799        // as the new block for round 4 is proposed.
1800        assert_eq!(last_commit.index(), 2);
1801        assert_eq!(dag_state.read().last_commit_index(), 2);
1802        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1803        assert_eq!(all_stored_commits.len(), 2);
1804    }
1805
1806    #[tokio::test]
1807    async fn test_core_propose_after_genesis() {
1808        telemetry_subscribers::init_for_testing();
1809        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1810            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1811            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1812            config
1813        });
1814
1815        let (context, mut key_pairs) = Context::new_for_test(4);
1816        let context = Arc::new(context);
1817        let store = Arc::new(MemStore::new());
1818        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1819
1820        let block_manager = BlockManager::new(
1821            context.clone(),
1822            dag_state.clone(),
1823            Arc::new(NoopBlockVerifier),
1824        );
1825        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1826        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1827        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1828        // Need at least one subscriber to the block broadcast channel.
1829        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1830        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1831            context.clone(),
1832            dag_state.clone(),
1833        ));
1834
1835        let (sender, _receiver) = unbounded_channel("consensus_output");
1836        let commit_observer = CommitObserver::new(
1837            context.clone(),
1838            CommitConsumer::new(sender.clone(), 0),
1839            dag_state.clone(),
1840            store.clone(),
1841            leader_schedule.clone(),
1842        );
1843
1844        let mut core = Core::new(
1845            context.clone(),
1846            leader_schedule,
1847            transaction_consumer,
1848            block_manager,
1849            true,
1850            commit_observer,
1851            signals,
1852            key_pairs.remove(context.own_index.value()).1,
1853            dag_state.clone(),
1854            false,
1855        );
1856
1857        // Send some transactions
1858        let mut total = 0;
1859        let mut index = 0;
1860        loop {
1861            let transaction =
1862                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1863            total += transaction.len();
1864            index += 1;
1865            let _w = transaction_client
1866                .submit_no_wait(vec![transaction])
1867                .await
1868                .unwrap();
1869
1870            // Keep submitting until the total size of transactions reaches 1KB.
1871            if total >= 1_000 {
1872                break;
1873            }
1874        }
1875
1876        // a new block should have been created during recovery.
1877        let extended_block = block_receiver
1878            .recv()
1879            .await
1880            .expect("A new block should have been created");
1881
1882        // A new block created - assert the details
1883        assert_eq!(extended_block.block.round(), 1);
1884        assert_eq!(extended_block.block.author().value(), 0);
1885        assert_eq!(extended_block.block.ancestors().len(), 4);
1886
1887        let mut total = 0;
1888        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1889            total += transaction.data().len() as u64;
1890            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1891            assert_eq!(format!("Transaction {i}"), transaction);
1892        }
1893        assert!(
1894            total
1895                <= context
1896                    .protocol_config
1897                    .consensus_max_transactions_in_block_bytes()
1898        );
1899
1900        // genesis blocks should be referenced
1901        let all_genesis = genesis_blocks(context);
1902
1903        for ancestor in extended_block.block.ancestors() {
1904            all_genesis
1905                .iter()
1906                .find(|block| block.reference() == *ancestor)
1907                .expect("Block should be found amongst genesis blocks");
1908        }
1909
1910        // Try to propose again - with or without ignoring the leader check, it will not
1911        // return any block.
1912        assert!(core.try_propose(false).unwrap().is_none());
1913        assert!(core.try_propose(true).unwrap().is_none());
1914
1915        // Check no commits have been persisted to dag_state & store
1916        let last_commit = store.read_last_commit().unwrap();
1917        assert!(last_commit.is_none());
1918        assert_eq!(dag_state.read().last_commit_index(), 0);
1919    }
1920
1921    #[tokio::test]
1922    async fn test_core_propose_once_receiving_a_quorum() {
1923        telemetry_subscribers::init_for_testing();
1924        let (context, mut key_pairs) = Context::new_for_test(4);
1925        let context = Arc::new(context);
1926
1927        let store = Arc::new(MemStore::new());
1928        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1929
1930        let block_manager = BlockManager::new(
1931            context.clone(),
1932            dag_state.clone(),
1933            Arc::new(NoopBlockVerifier),
1934        );
1935        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1936            context.clone(),
1937            dag_state.clone(),
1938        ));
1939
1940        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1941        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1942        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1943        // Need at least one subscriber to the block broadcast channel.
1944        let _block_receiver = signal_receivers.block_broadcast_receiver();
1945
1946        let (sender, _receiver) = unbounded_channel("consensus_output");
1947        let commit_observer = CommitObserver::new(
1948            context.clone(),
1949            CommitConsumer::new(sender.clone(), 0),
1950            dag_state.clone(),
1951            store.clone(),
1952            leader_schedule.clone(),
1953        );
1954
1955        let mut core = Core::new(
1956            context.clone(),
1957            leader_schedule,
1958            transaction_consumer,
1959            block_manager,
1960            true,
1961            commit_observer,
1962            signals,
1963            key_pairs.remove(context.own_index.value()).1,
1964            dag_state.clone(),
1965            false,
1966        );
1967
1968        let mut expected_ancestors = BTreeSet::new();
1969
1970        // Adding one block now will trigger the creation of new block for round 1
1971        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1972        expected_ancestors.insert(block_1.reference());
1973        // Wait for min round delay to allow blocks to be proposed.
1974        sleep(context.parameters.min_round_delay).await;
1975        // add blocks to trigger proposal.
1976        _ = core.add_blocks(vec![block_1]);
1977
1978        assert_eq!(core.last_proposed_round(), 1);
1979        expected_ancestors.insert(core.last_proposed_block().reference());
1980        // attempt to create a block - none will be produced.
1981        assert!(core.try_propose(false).unwrap().is_none());
1982
1983        // Adding another block now forms a quorum for round 1, so a block at round 2
1984        // will be proposed.
1985        let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1986        expected_ancestors.insert(block_3.reference());
1987        // Wait for min round delay to allow blocks to be proposed.
1988        sleep(context.parameters.min_round_delay).await;
1989        // add blocks to trigger proposal.
1990        _ = core.add_blocks(vec![block_3]);
1991
1992        assert_eq!(core.last_proposed_round(), 2);
1993
1994        let proposed_block = core.last_proposed_block();
1995        assert_eq!(proposed_block.round(), 2);
1996        assert_eq!(proposed_block.author(), context.own_index);
1997        assert_eq!(proposed_block.ancestors().len(), 3);
1998        let ancestors = proposed_block.ancestors();
1999        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
2000        assert_eq!(ancestors, expected_ancestors);
2001
2002        // Check no commits have been persisted to dag_state & store
2003        let last_commit = store.read_last_commit().unwrap();
2004        assert!(last_commit.is_none());
2005        assert_eq!(dag_state.read().last_commit_index(), 0);
2006    }
2007
2008    #[rstest]
2009    #[tokio::test]
2010    async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
2011        telemetry_subscribers::init_for_testing();
2012        let (mut context, mut key_pairs) = Context::new_for_test(4);
2013
2014        if gc_depth > 0 {
2015            context
2016                .protocol_config
2017                .set_consensus_gc_depth_for_testing(gc_depth);
2018        }
2019
2020        let context = Arc::new(context);
2021
2022        let store = Arc::new(MemStore::new());
2023        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2024        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2025        let mut block_status_subscriptions = FuturesUnordered::new();
2026
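        // A rough reading of the DAG DSL below (see `test_dag_parser` for the
        // authoritative grammar): `Round 0 : { 4 }` creates genesis for the 4
        // authorities, `Round N : { * }` means every authority proposes a block
        // for that round, `X -> [*]` links all blocks of the previous round,
        // `X -> [-A2]` links everything except A2, and `X -> [A3, B4, ...]`
        // lists the ancestors explicitly.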
2027        let dag_str = "DAG {
2028            Round 0 : { 4 },
2029            Round 1 : { * },
2030            Round 2 : { * },
2031            Round 3 : {
2032                A -> [*],
2033                B -> [-A2],
2034                C -> [-A2],
2035                D -> [-A2],
2036            },
2037            Round 4 : {
2038                B -> [-A3],
2039                C -> [-A3],
2040                D -> [-A3],
2041            },
2042            Round 5 : {
2043                A -> [A3, B4, C4, D4]
2044                B -> [*],
2045                C -> [*],
2046                D -> [*],
2047            },
2048            Round 6 : { * },
2049            Round 7 : { * },
2050            Round 8 : { * },
2051        }";
2052
2053        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2054        dag_builder.print();
2055
2056        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be
2057        // able to commit up to round 5.
2058        for block in dag_builder.blocks(1..=5) {
2059            if block.author() == context.own_index {
2060                let subscription =
2061                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
2062                block_status_subscriptions.push(subscription);
2063            }
2064        }
2065
2066        // write them in store
2067        store
2068            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2069            .expect("Storage error");
2070
2071        // create dag state after all blocks have been written to store
2072        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2073        let block_manager = BlockManager::new(
2074            context.clone(),
2075            dag_state.clone(),
2076            Arc::new(NoopBlockVerifier),
2077        );
2078        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2079            context.clone(),
2080            dag_state.clone(),
2081        ));
2082
2083        let (sender, _receiver) = unbounded_channel("consensus_output");
2084        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2085        let commit_observer = CommitObserver::new(
2086            context.clone(),
2087            commit_consumer,
2088            dag_state.clone(),
2089            store.clone(),
2090            leader_schedule.clone(),
2091        );
2092
2093        // Check no commits have been persisted to dag_state or store.
2094        let last_commit = store.read_last_commit().unwrap();
2095        assert!(last_commit.is_none());
2096        assert_eq!(dag_state.read().last_commit_index(), 0);
2097
2098        // Now spin up core
2099        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2100        // Need at least one subscriber to the block broadcast channel.
2101        let _block_receiver = signal_receivers.block_broadcast_receiver();
2102        let _core = Core::new(
2103            context.clone(),
2104            leader_schedule,
2105            transaction_consumer,
2106            block_manager,
2107            true,
2108            commit_observer,
2109            signals,
2110            key_pairs.remove(context.own_index.value()).1,
2111            dag_state.clone(),
2112            false,
2113        );
2114
2115        let last_commit = store
2116            .read_last_commit()
2117            .unwrap()
2118            .expect("last commit should be set");
2119
2120        assert_eq!(last_commit.index(), 5);
2121
2122        while let Some(result) = block_status_subscriptions.next().await {
2123            let status = result.unwrap();
2124
2125            // If gc is enabled, then we expect some blocks to be garbage collected.
2126            if gc_depth > 0 {
2127                match status {
2128                    BlockStatus::Sequenced(block_ref) => {
2129                        assert!(block_ref.round == 1 || block_ref.round == 5);
2130                    }
2131                    BlockStatus::GarbageCollected(block_ref) => {
2132                        assert!(block_ref.round == 2 || block_ref.round == 3);
2133                    }
2134                }
2135            } else {
2136                // otherwise all of them should be committed
2137                assert!(matches!(status, BlockStatus::Sequenced(_)));
2138            }
2139        }
2140    }
2141
2142    // Tests that the threshold clock advances when blocks get unsuspended due to
2143    // GC'ed blocks, and that newly created blocks are always higher than the last
2144    // advanced gc round.
2145    #[tokio::test]
2146    async fn test_multiple_commits_advance_threshold_clock() {
2147        telemetry_subscribers::init_for_testing();
2148        let (mut context, mut key_pairs) = Context::new_for_test(4);
2149        const GC_DEPTH: u32 = 2;
2150
2151        context
2152            .protocol_config
2153            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2154
2155        let context = Arc::new(context);
2156
2157        let store = Arc::new(MemStore::new());
2158        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2159        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2160
2161        // On round 1 we do produce the block for authority D, but we do not link it
2162        // until round 6. This makes round 6 unable to be processed until the
2163        // leader of round 3 is committed, at which point round 1 gets garbage collected.
2164        // Then we add more rounds so we can trigger a commit for the leader of round 9,
2165        // which will move the gc round to 7.
2166        let dag_str = "DAG {
2167            Round 0 : { 4 },
2168            Round 1 : { * },
2169            Round 2 : { 
2170                B -> [-D1],
2171                C -> [-D1],
2172                D -> [-D1],
2173            },
2174            Round 3 : {
2175                B -> [*],
2176                C -> [*]
2177                D -> [*],
2178            },
2179            Round 4 : { 
2180                A -> [*],
2181                B -> [*],
2182                C -> [*]
2183                D -> [*],
2184            },
2185            Round 5 : { 
2186                B -> [*],
2187                C -> [*],
2188                D -> [*],
2189            },
2190            Round 6 : { 
2191                B -> [A6, B6, C6, D1],
2192                C -> [A6, B6, C6, D1],
2193                D -> [A6, B6, C6, D1],
2194            },
2195            Round 7 : { 
2196                B -> [*],
2197                C -> [*],
2198                D -> [*],
2199            },
2200            Round 8 : { 
2201                B -> [*],
2202                C -> [*],
2203                D -> [*],
2204            },
2205            Round 9 : { 
2206                B -> [*],
2207                C -> [*],
2208                D -> [*],
2209            },
2210            Round 10 : { 
2211                B -> [*],
2212                C -> [*],
2213                D -> [*],
2214            },
2215            Round 11 : { 
2216                B -> [*],
2217                C -> [*],
2218                D -> [*],
2219            },
2220        }";
2221
2222        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2223        dag_builder.print();
2224
2225        // create dag state after all blocks have been written to store
2226        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2227        let block_manager = BlockManager::new(
2228            context.clone(),
2229            dag_state.clone(),
2230            Arc::new(NoopBlockVerifier),
2231        );
2232        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2233            context.clone(),
2234            dag_state.clone(),
2235        ));
2236        let (sender, _receiver) = unbounded_channel("consensus_output");
2237        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2238        let commit_observer = CommitObserver::new(
2239            context.clone(),
2240            commit_consumer,
2241            dag_state.clone(),
2242            store.clone(),
2243            leader_schedule.clone(),
2244        );
2245
2246        // Check no commits have been persisted to dag_state or store.
2247        let last_commit = store.read_last_commit().unwrap();
2248        assert!(last_commit.is_none());
2249        assert_eq!(dag_state.read().last_commit_index(), 0);
2250
2251        // Now spin up core
2252        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2253        // Need at least one subscriber to the block broadcast channel.
2254        let _block_receiver = signal_receivers.block_broadcast_receiver();
2255        let mut core = Core::new(
2256            context.clone(),
2257            leader_schedule,
2258            transaction_consumer,
2259            block_manager,
2260            true,
2261            commit_observer,
2262            signals,
2263            key_pairs.remove(context.own_index.value()).1,
2264            dag_state.clone(),
2265            true,
2266        );
2267        // We set the last known round to 4 so we avoid creating new blocks until then -
2268        // otherwise Core would crash, as the already created DAG contains blocks for this
2269        // authority.
2270        core.set_last_known_proposed_round(4);
2271
2272        // We add all the blocks except D1. The only ones we can immediately accept are
2273        // the ones up to round 5, as they don't have a dependency on D1. The rest of the
2274        // blocks do have a causal dependency on D1, so they can't be processed until the
2275        // leader of round 3 gets committed and the gc round moves to 1. That will make
2276        // all the blocks that depend on D1 get accepted. However, our threshold
2277        // clock is now at round 6, as the last quorum that we managed to process was
2278        // round 5. As commits happen, blocks of later rounds get accepted and
2279        // more leaders get committed. Eventually the leader of round 9 gets committed
2280        // and gc is moved to 9 - 2 = 7. If our node attempted to produce a block
2281        // for threshold clock round 6, the acceptance checks would fail, as
2282        // gc has now moved far past this round.
2283        core.add_blocks(
2284            dag_builder
2285                .blocks(1..=11)
2286                .into_iter()
2287                .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2288                .collect(),
2289        )
2290        .expect("Should not fail");
2291
2292        assert_eq!(core.last_proposed_round(), 12);
2293    }
2294
2295    #[tokio::test]
2296    async fn test_core_set_min_propose_round() {
2297        telemetry_subscribers::init_for_testing();
2298        let (context, mut key_pairs) = Context::new_for_test(4);
2299        let context = Arc::new(context.with_parameters(Parameters {
2300            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2301            ..Default::default()
2302        }));
2303
2304        let store = Arc::new(MemStore::new());
2305        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2306
2307        let block_manager = BlockManager::new(
2308            context.clone(),
2309            dag_state.clone(),
2310            Arc::new(NoopBlockVerifier),
2311        );
2312        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2313            context.clone(),
2314            dag_state.clone(),
2315        ));
2316
2317        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2318        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2319        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2320        // Need at least one subscriber to the block broadcast channel.
2321        let _block_receiver = signal_receivers.block_broadcast_receiver();
2322
2323        let (sender, _receiver) = unbounded_channel("consensus_output");
2324        let commit_observer = CommitObserver::new(
2325            context.clone(),
2326            CommitConsumer::new(sender.clone(), 0),
2327            dag_state.clone(),
2328            store.clone(),
2329            leader_schedule.clone(),
2330        );
2331
2332        let mut core = Core::new(
2333            context.clone(),
2334            leader_schedule,
2335            transaction_consumer,
2336            block_manager,
2337            true,
2338            commit_observer,
2339            signals,
2340            key_pairs.remove(context.own_index.value()).1,
2341            dag_state.clone(),
2342            true,
2343        );
2344
2345        // No new block should have been produced
2346        assert_eq!(
2347            core.last_proposed_round(),
2348            GENESIS_ROUND,
2349            "No block should have been created other than genesis"
2350        );
2351
2352        // Trying to explicitly propose a block will not produce anything
2353        assert!(core.try_propose(true).unwrap().is_none());
2354
2355        // Create blocks for the whole network - even "our" node in order to replicate
2356        // an "amnesia" recovery.
2357        let mut builder = DagBuilder::new(context.clone());
2358        builder.layers(1..=10).build();
2359
2360        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2361
2362        // Process all the blocks
2363        assert!(core.add_blocks(blocks).unwrap().is_empty());
2364
2365        // Try to propose - no block should be produced.
2366        assert!(core.try_propose(true).unwrap().is_none());
2367
2368        // Now set the last known proposed round, which is the highest round for which
2369        // the network informed us that we have already proposed a block.
2370        core.set_last_known_proposed_round(10);
2371
2372        let block = core.try_propose(true).expect("No error").unwrap();
2373        assert_eq!(block.round(), 11);
2374        assert_eq!(block.ancestors().len(), 4);
2375
2376        let our_ancestor_included = block.ancestors()[0];
2377        assert_eq!(our_ancestor_included.author, context.own_index);
2378        assert_eq!(our_ancestor_included.round, 10);
2379    }
2380
2381    #[tokio::test(flavor = "current_thread", start_paused = true)]
2382    async fn test_core_try_new_block_leader_timeout() {
2383        telemetry_subscribers::init_for_testing();
2384
2385        // Since we run the test with start_paused = true, any time-dependent
2386        // operations using Tokio's time facilities, such as tokio::time::sleep
2387        // or tokio::time::Instant, will not advance. So practically each Core's
2388        // clock may have been initialised with a different value, but it never
2389        // advances. To ensure that blocks won't get rejected by cores, we
2390        // need to manually wait for the time diff before processing them. By
2391        // calling `tokio::time::sleep` we implicitly also advance the tokio
2392        // clock.
2393        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2394            // Simulate the time wait before processing a block to ensure that
2395            // block.timestamp <= now
2396            let now = context.clock.timestamp_utc_ms();
2397            let max_timestamp = blocks
2398                .iter()
2399                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2400                .map(|block| block.timestamp_ms())
2401                .unwrap_or(0);
2402
2403            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2404            sleep(wait_time).await;
2405        }
2406
2407        let (context, _) = Context::new_for_test(4);
2408        // Create the cores for all authorities
2409        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2410
2411        // Create blocks for rounds 1..=3 from all Cores except the last Core of authority
2412        // 3, so we miss its block. As it will be the leader of round 3,
2413        // no-one will be able to progress to round 4 unless we explicitly trigger
2414        // the block creation.
2415        // Split off the last core; only the first three cores create blocks below.
2416        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2417
2418        // Now iterate over a few rounds and ensure the corresponding signals are
2419        // created while network advances
2420        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2421        for round in 1..=3 {
2422            let mut this_round_blocks = Vec::new();
2423
2424            for core_fixture in cores.iter_mut() {
2425                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2426
2427                core_fixture
2428                    .core
2429                    .add_blocks(last_round_blocks.clone())
2430                    .unwrap();
2431
2432                // Only when round > 1 and using non-genesis parents.
2433                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2434                    assert_eq!(round - 1, r);
2435                    if core_fixture.core.last_proposed_round() == r {
2436                        // Force propose new block regardless of min round delay.
2437                        core_fixture
2438                            .core
2439                            .try_propose(true)
2440                            .unwrap()
2441                            .unwrap_or_else(|| {
2442                                panic!("Block should have been proposed for round {}", round)
2443                            });
2444                    }
2445                }
2446
2447                assert_eq!(core_fixture.core.last_proposed_round(), round);
2448
2449                this_round_blocks.push(core_fixture.core.last_proposed_block());
2450            }
2451
2452            last_round_blocks = this_round_blocks;
2453        }
2454
2455        // Try to create the blocks for round 4 by calling the try_propose() method. No
2456        // block should be created as the leader - authority 3 - hasn't proposed
2457        // any block.
2458        for core_fixture in cores.iter_mut() {
2459            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2460
2461            core_fixture
2462                .core
2463                .add_blocks(last_round_blocks.clone())
2464                .unwrap();
2465            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2466        }
2467
2468        // Now try to create the blocks for round 4 via the leader timeout method which
2469        // should ignore any leader checks or min round delay.
2470        for core_fixture in cores.iter_mut() {
2471            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2472            assert_eq!(core_fixture.core.last_proposed_round(), 4);
2473
2474            // Check commits have been persisted to store
2475            let last_commit = core_fixture
2476                .store
2477                .read_last_commit()
2478                .unwrap()
2479                .expect("last commit should be set");
2480            // There is 1 leader round with rounds completed up to and including
2481            // round 4.
2482            assert_eq!(last_commit.index(), 1);
2483            let all_stored_commits = core_fixture
2484                .store
2485                .scan_commits((0..=CommitIndex::MAX).into())
2486                .unwrap();
2487            assert_eq!(all_stored_commits.len(), 1);
2488        }
2489    }
2490
2491    #[tokio::test(flavor = "current_thread", start_paused = true)]
2492    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2493        telemetry_subscribers::init_for_testing();
2494
2495        // Since we run the test with start_paused = true, any time-dependent
2496        // operations using Tokio's time facilities, such as tokio::time::sleep
2497        // or tokio::time::Instant, will not advance. So practically each Core's
2498        // clock may have been initialised with a different value, but it never
2499        // advances. To ensure that blocks won't get rejected by cores, we
2500        // need to manually wait for the time diff before processing them. By
2501        // calling `tokio::time::sleep` we implicitly also advance the tokio
2502        // clock.
2503        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2504            // Simulate the time wait before processing a block to ensure that
2505            // block.timestamp <= now
2506            let now = context.clock.timestamp_utc_ms();
2507            let max_timestamp = blocks
2508                .iter()
2509                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2510                .map(|block| block.timestamp_ms())
2511                .unwrap_or(0);
2512
2513            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2514            sleep(wait_time).await;
2515        }
2516
2517        let (context, _) = Context::new_for_test(4);
2518
2519        // Create the cores for all authorities
2520        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2521        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2522
2523        // Create blocks for rounds 1..=30 from all Cores except the last Core of
2524        // authority 3.
2525        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2526        for round in 1..=30 {
2527            let mut this_round_blocks = Vec::new();
2528
2529            for core_fixture in cores.iter_mut() {
2530                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2531
2532                core_fixture
2533                    .core
2534                    .add_blocks(last_round_blocks.clone())
2535                    .unwrap();
2536
2537                // Only when round > 1 and using non-genesis parents.
2538                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2539                    assert_eq!(round - 1, r);
2540                    if core_fixture.core.last_proposed_round() == r {
2541                        // Force propose new block regardless of min round delay.
2542                        core_fixture
2543                            .core
2544                            .try_propose(true)
2545                            .unwrap()
2546                            .unwrap_or_else(|| {
2547                                panic!("Block should have been proposed for round {}", round)
2548                            });
2549                    }
2550                }
2551
2552                assert_eq!(core_fixture.core.last_proposed_round(), round);
2553
2554                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2555            }
2556
2557            last_round_blocks = this_round_blocks;
2558        }
2559
2560        // Now produce blocks for all Cores
2561        for round in 31..=40 {
2562            let mut this_round_blocks = Vec::new();
2563
2564            for core_fixture in all_cores.iter_mut() {
2565                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2566
2567                core_fixture
2568                    .core
2569                    .add_blocks(last_round_blocks.clone())
2570                    .unwrap();
2571
2572                // Only when round > 1 and using non-genesis parents.
2573                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2574                    assert_eq!(round - 1, r);
2575                    if core_fixture.core.last_proposed_round() == r {
2576                        // Force propose new block regardless of min round delay.
2577                        core_fixture
2578                            .core
2579                            .try_propose(true)
2580                            .unwrap()
2581                            .unwrap_or_else(|| {
2582                                panic!("Block should have been proposed for round {}", round)
2583                            });
2584                    }
2585                }
2586
2587                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2588
2589                for block in this_round_blocks.iter() {
2590                    if block.author() != AuthorityIndex::new_for_test(3) {
2591                        // Assert blocks created include only 3 ancestors per block as one
2592                        // should be excluded
2593                        assert_eq!(block.ancestors().len(), 3);
2594                    } else {
2595                        // Authority 3 is the low scoring authority so it will still include
2596                        // its own blocks.
2597                        assert_eq!(block.ancestors().len(), 4);
2598                    }
2599                }
2600            }
2601
2602            last_round_blocks = this_round_blocks;
2603        }
2604    }
2605
2606    #[tokio::test]
2607    async fn test_smart_ancestor_selection() {
2608        telemetry_subscribers::init_for_testing();
2609        let (context, mut key_pairs) = Context::new_for_test(7);
2610        let context = Arc::new(context.with_parameters(Parameters {
2611            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2612            ..Default::default()
2613        }));
2614
2615        let store = Arc::new(MemStore::new());
2616        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2617
2618        let block_manager = BlockManager::new(
2619            context.clone(),
2620            dag_state.clone(),
2621            Arc::new(NoopBlockVerifier),
2622        );
2623        let leader_schedule = Arc::new(
2624            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2625                .with_num_commits_per_schedule(10),
2626        );
2627
2628        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2629        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2630        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2631        // Need at least one subscriber to the block broadcast channel.
2632        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2633
2634        let (sender, _receiver) = unbounded_channel("consensus_output");
2635        let commit_consumer = CommitConsumer::new(sender, 0);
2636        let commit_observer = CommitObserver::new(
2637            context.clone(),
2638            commit_consumer,
2639            dag_state.clone(),
2640            store.clone(),
2641            leader_schedule.clone(),
2642        );
2643
2644        let mut core = Core::new(
2645            context.clone(),
2646            leader_schedule,
2647            transaction_consumer,
2648            block_manager,
2649            true,
2650            commit_observer,
2651            signals,
2652            key_pairs.remove(context.own_index.value()).1,
2653            dag_state.clone(),
2654            true,
2655        );
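        // Core is created with a subscriber assumed to exist (the boolean after `block_manager`),
        // so proposals are not gated on a missing block subscriber.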
2656
2657        // No new block should have been produced
2658        assert_eq!(
2659            core.last_proposed_round(),
2660            GENESIS_ROUND,
2661            "No block should have been created other than genesis"
2662        );
2663
2664        // Trying to explicitly propose a block will not produce anything
2665        assert!(core.try_propose(true).unwrap().is_none());
2666
2667        // Create blocks for the whole network but not for authority 1
2668        let mut builder = DagBuilder::new(context.clone());
2669        builder
2670            .layers(1..=12)
2671            .authorities(vec![AuthorityIndex::new_for_test(1)])
2672            .skip_block()
2673            .build();
2674        let blocks = builder.blocks(1..=12);
2675        // Process all the blocks
2676        assert!(core.add_blocks(blocks).unwrap().is_empty());
2677        core.set_last_known_proposed_round(12);
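        // Let Core know our last known proposed round (12), so it can move on and propose at
        // round 13.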
2678
2679        let block = core.try_propose(true).expect("No error").unwrap();
2680        assert_eq!(block.round(), 13);
2681        assert_eq!(block.ancestors().len(), 7);
2682
2683        // Build blocks for the rest of the network, excluding our own index
2684        builder
2685            .layers(13..=14)
2686            .authorities(vec![AuthorityIndex::new_for_test(0)])
2687            .skip_block()
2688            .build();
2689        let blocks = builder.blocks(13..=14);
2690        assert!(core.add_blocks(blocks).unwrap().is_empty());
2691
2692        // We now have triggered a leader schedule change so we should have
2693        // one EXCLUDE authority (1) when we go to select ancestors for the next
2694        // proposal
2695        let block = core.try_propose(true).expect("No error").unwrap();
2696        assert_eq!(block.round(), 15);
2697        assert_eq!(block.ancestors().len(), 6);
2698
2699        // Build blocks for a quorum of the network including the EXCLUDE authority (1)
2700        // which will trigger smart select and we will not propose a block
2701        builder
2702            .layer(15)
2703            .authorities(vec![
2704                AuthorityIndex::new_for_test(0),
2705                AuthorityIndex::new_for_test(5),
2706                AuthorityIndex::new_for_test(6),
2707            ])
2708            .skip_block()
2709            .build();
2710        let blocks = builder.blocks(15..=15);
2711        let authority_1_excluded_block_reference = blocks
2712            .iter()
2713            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2714            .unwrap()
2715            .reference();
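        // Keep authority 1's round 15 block reference; it should show up later in the proposal's
        // excluded ancestors rather than in its ancestors.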
2716        // Wait for min round delay to allow blocks to be proposed.
2717        sleep(context.parameters.min_round_delay).await;
2718        // Smart select should be triggered and no block should be proposed.
2719        assert!(core.add_blocks(blocks).unwrap().is_empty());
2720        assert_eq!(core.last_proposed_block().round(), 15);
2721
2722        builder
2723            .layer(15)
2724            .authorities(vec![
2725                AuthorityIndex::new_for_test(0),
2726                AuthorityIndex::new_for_test(1),
2727                AuthorityIndex::new_for_test(2),
2728                AuthorityIndex::new_for_test(3),
2729                AuthorityIndex::new_for_test(4),
2730            ])
2731            .skip_block()
2732            .build();
2733        let blocks = builder.blocks(15..=15);
2734        let included_block_references = iter::once(&core.last_proposed_block())
2735            .chain(blocks.iter())
2736            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2737            .map(|block| block.reference())
2738            .collect::<Vec<_>>();
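        // The expected ancestors of the next proposal: our own round 15 block plus the round 15
        // blocks of every authority except the excluded authority 1.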
2739
2740        // Have enough ancestor blocks to propose now.
2741        assert!(core.add_blocks(blocks).unwrap().is_empty());
2742        assert_eq!(core.last_proposed_block().round(), 16);
2743
2744        // Check that a new block has been proposed & signaled.
2745        let extended_block = loop {
2746            let extended_block =
2747                tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2748                    .await
2749                    .unwrap()
2750                    .unwrap();
2751            if extended_block.block.round() == 16 {
2752                break extended_block;
2753            }
2754        };
2755        assert_eq!(extended_block.block.round(), 16);
2756        assert_eq!(extended_block.block.author(), core.context.own_index);
2757        assert_eq!(extended_block.block.ancestors().len(), 6);
2758        assert_eq!(extended_block.block.ancestors(), included_block_references);
2759        assert_eq!(extended_block.excluded_ancestors.len(), 1);
2760        assert_eq!(
2761            extended_block.excluded_ancestors[0],
2762            authority_1_excluded_block_reference
2763        );
2764
2765        // Build blocks for a quorum of the network including the EXCLUDE ancestor
2766        // which will trigger smart select and we will not propose a block.
2767        // This time we will force a proposal by hitting the leader timeout, which
2768        // should cause us to include this EXCLUDE ancestor.
2769        builder
2770            .layer(16)
2771            .authorities(vec![
2772                AuthorityIndex::new_for_test(0),
2773                AuthorityIndex::new_for_test(5),
2774                AuthorityIndex::new_for_test(6),
2775            ])
2776            .skip_block()
2777            .build();
2778        let blocks = builder.blocks(16..=16);
2779        // Wait for min round delay to allow blocks to be proposed.
2780        sleep(context.parameters.min_round_delay).await;
2781        // Smart select should be triggered and no block should be proposed.
2782        assert!(core.add_blocks(blocks).unwrap().is_empty());
2783        assert_eq!(core.last_proposed_block().round(), 16);
2784
2785        // Simulate a leader timeout and a force proposal where we will include
2786        // one EXCLUDE ancestor when we go to select ancestors for the next proposal
2787        let block = core.try_propose(true).expect("No error").unwrap();
2788        assert_eq!(block.round(), 17);
2789        assert_eq!(block.ancestors().len(), 5);
2790
2791        // Check that a new block has been proposed & signaled.
2792        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2793            .await
2794            .unwrap()
2795            .unwrap();
2796        assert_eq!(extended_block.block.round(), 17);
2797        assert_eq!(extended_block.block.author(), core.context.own_index);
2798        assert_eq!(extended_block.block.ancestors().len(), 5);
2799        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2800
2801        // Set quorum rounds for each authority, which will unlock the excluded
2802        // authority (1); we should then be able to create a new layer of blocks,
2803        // which will all be included as ancestors for the next proposal.
2804        core.set_propagation_delay_and_quorum_rounds(
2805            0,
2806            vec![
2807                (16, 16),
2808                (16, 16),
2809                (16, 16),
2810                (16, 16),
2811                (16, 16),
2812                (16, 16),
2813                (16, 16),
2814            ],
2815            vec![
2816                (16, 16),
2817                (16, 16),
2818                (16, 16),
2819                (16, 16),
2820                (16, 16),
2821                (16, 16),
2822                (16, 16),
2823            ],
2824        );
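        // One quorum-round entry per authority (7 in total); reporting every authority, including
        // authority 1, at round 16 with zero propagation delay should unlock authority 1 for
        // inclusion as an ancestor again.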
2825
2826        builder
2827            .layer(17)
2828            .authorities(vec![AuthorityIndex::new_for_test(0)])
2829            .skip_block()
2830            .build();
2831        let blocks = builder.blocks(17..=17);
2832        let included_block_references = iter::once(&core.last_proposed_block())
2833            .chain(blocks.iter())
2834            .map(|block| block.reference())
2835            .collect::<Vec<_>>();
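        // This time authority 1 is no longer excluded, so the expected ancestors are our own
        // round 17 block plus the round 17 blocks of all other authorities.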
2836
2837        // Have enough ancestor blocks to propose now.
2838        sleep(context.parameters.min_round_delay).await;
2839        assert!(core.add_blocks(blocks).unwrap().is_empty());
2840        assert_eq!(core.last_proposed_block().round(), 18);
2841
2842        // Check that a new block has been proposed & signaled.
2843        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2844            .await
2845            .unwrap()
2846            .unwrap();
2847        assert_eq!(extended_block.block.round(), 18);
2848        assert_eq!(extended_block.block.author(), core.context.own_index);
2849        assert_eq!(extended_block.block.ancestors().len(), 7);
2850        assert_eq!(extended_block.block.ancestors(), included_block_references);
2851        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2852    }
2853
2854    #[tokio::test]
2855    async fn test_excluded_ancestor_limit() {
2856        telemetry_subscribers::init_for_testing();
2857        let (context, mut key_pairs) = Context::new_for_test(4);
2858        let context = Arc::new(context.with_parameters(Parameters {
2859            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2860            ..Default::default()
2861        }));
2862
2863        let store = Arc::new(MemStore::new());
2864        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2865
2866        let block_manager = BlockManager::new(
2867            context.clone(),
2868            dag_state.clone(),
2869            Arc::new(NoopBlockVerifier),
2870        );
2871        let leader_schedule = Arc::new(
2872            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2873                .with_num_commits_per_schedule(10),
2874        );
2875
2876        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2877        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2878        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2879        // Need at least one subscriber to the block broadcast channel.
2880        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2881
2882        let (sender, _receiver) = unbounded_channel("consensus_output");
2883        let commit_consumer = CommitConsumer::new(sender, 0);
2884        let commit_observer = CommitObserver::new(
2885            context.clone(),
2886            commit_consumer,
2887            dag_state.clone(),
2888            store.clone(),
2889            leader_schedule.clone(),
2890        );
2891
2892        let mut core = Core::new(
2893            context.clone(),
2894            leader_schedule,
2895            transaction_consumer,
2896            block_manager,
2897            true,
2898            commit_observer,
2899            signals,
2900            key_pairs.remove(context.own_index.value()).1,
2901            dag_state.clone(),
2902            true,
2903        );
2904
2905        // No new block should have been produced
2906        assert_eq!(
2907            core.last_proposed_round(),
2908            GENESIS_ROUND,
2909            "No block should have been created other than genesis"
2910        );
2911
2912        // Create blocks for the whole network
2913        let mut builder = DagBuilder::new(context.clone());
2914        builder.layers(1..=3).build();
2915
2916        // This will create 9 equivocating blocks for authority 1, which will be excluded from
2917        // the proposal; because of the configured limit, the excess will be dropped and not
2918        // included in the ExtendedBlock structure sent to the rest of the network.
2919        builder
2920            .layer(4)
2921            .authorities(vec![AuthorityIndex::new_for_test(1)])
2922            .equivocate(9)
2923            .build();
2924        let blocks = builder.blocks(1..=4);
2925
2926        // Process all the blocks
2927        assert!(core.add_blocks(blocks).unwrap().is_empty());
2928        core.set_last_known_proposed_round(3);
2929
2930        let block = core.try_propose(true).expect("No error").unwrap();
2931        assert_eq!(block.round(), 5);
2932        assert_eq!(block.ancestors().len(), 4);
2933
2934        // Check that a new block has been proposed & signaled.
2935        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2936            .await
2937            .unwrap()
2938            .unwrap();
2939        assert_eq!(extended_block.block.round(), 5);
2940        assert_eq!(extended_block.block.author(), core.context.own_index);
2941        assert_eq!(extended_block.block.ancestors().len(), 4);
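        // Only 8 excluded ancestors should be reported in the ExtendedBlock; anything beyond the
        // configured limit is dropped, as described above.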
2942        assert_eq!(extended_block.excluded_ancestors.len(), 8);
2943    }
2944
2945    #[tokio::test]
2946    async fn test_core_set_subscriber_exists() {
2947        telemetry_subscribers::init_for_testing();
2948        let (context, mut key_pairs) = Context::new_for_test(4);
2949        let context = Arc::new(context);
2950        let store = Arc::new(MemStore::new());
2951        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2952
2953        let block_manager = BlockManager::new(
2954            context.clone(),
2955            dag_state.clone(),
2956            Arc::new(NoopBlockVerifier),
2957        );
2958        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2959            context.clone(),
2960            dag_state.clone(),
2961        ));
2962
2963        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2964        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2965        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2966        // Need at least one subscriber to the block broadcast channel.
2967        let _block_receiver = signal_receivers.block_broadcast_receiver();
2968
2969        let (sender, _receiver) = unbounded_channel("consensus_output");
2970        let commit_observer = CommitObserver::new(
2971            context.clone(),
2972            CommitConsumer::new(sender.clone(), 0),
2973            dag_state.clone(),
2974            store.clone(),
2975            leader_schedule.clone(),
2976        );
2977
2978        let mut core = Core::new(
2979            context.clone(),
2980            leader_schedule,
2981            transaction_consumer,
2982            block_manager,
2983            // Indicate that no subscriber exists initially.
2984            false,
2985            commit_observer,
2986            signals,
2987            key_pairs.remove(context.own_index.value()).1,
2988            dag_state.clone(),
2989            false,
2990        );
2991
2992        // There is no proposal during recovery because there is no subscriber.
2993        assert_eq!(
2994            core.last_proposed_round(),
2995            GENESIS_ROUND,
2996            "No block should have been created other than genesis"
2997        );
2998
2999        // There is no proposal even with forced proposing.
3000        assert!(core.try_propose(true).unwrap().is_none());
3001
3002        // Let Core know subscriber exists.
3003        core.set_subscriber_exists(true);
3004
3005        // Proposing now would succeed.
3006        assert!(core.try_propose(true).unwrap().is_some());
3007    }
3008
3009    #[tokio::test]
3010    async fn test_core_set_propagation_delay_per_authority() {
3011        // TODO: create helper to avoid the duplicated code here.
3012        telemetry_subscribers::init_for_testing();
3013        let (context, mut key_pairs) = Context::new_for_test(4);
3014        let context = Arc::new(context);
3015        let store = Arc::new(MemStore::new());
3016        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3017
3018        let block_manager = BlockManager::new(
3019            context.clone(),
3020            dag_state.clone(),
3021            Arc::new(NoopBlockVerifier),
3022        );
3023        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3024            context.clone(),
3025            dag_state.clone(),
3026        ));
3027
3028        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3029        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3030        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3031        // Need at least one subscriber to the block broadcast channel.
3032        let _block_receiver = signal_receivers.block_broadcast_receiver();
3033
3034        let (sender, _receiver) = unbounded_channel("consensus_output");
3035        let commit_observer = CommitObserver::new(
3036            context.clone(),
3037            CommitConsumer::new(sender.clone(), 0),
3038            dag_state.clone(),
3039            store.clone(),
3040            leader_schedule.clone(),
3041        );
3042
3043        let mut core = Core::new(
3044            context.clone(),
3045            leader_schedule,
3046            transaction_consumer,
3047            block_manager,
3048            // Indicate that no subscriber exists initially.
3049            false,
3050            commit_observer,
3051            signals,
3052            key_pairs.remove(context.own_index.value()).1,
3053            dag_state.clone(),
3054            false,
3055        );
3056
3057        // There is no proposal during recovery because there is no subscriber.
3058        assert_eq!(
3059            core.last_proposed_round(),
3060            GENESIS_ROUND,
3061            "No block should have been created other than genesis"
3062        );
3063
3064        // Use a large propagation delay to disable proposing.
3065        core.set_propagation_delay_and_quorum_rounds(1000, vec![], vec![]);
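        // A delay of 1000 rounds is large enough to disable proposing, so even the forced
        // proposal below should return None.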
3066
3067        // Make propagation delay the only reason for not proposing.
3068        core.set_subscriber_exists(true);
3069
3070        // There is no proposal even with forced proposing.
3071        assert!(core.try_propose(true).unwrap().is_none());
3072
3073        // Let Core know there is no propagation delay.
3074        core.set_propagation_delay_and_quorum_rounds(0, vec![], vec![]);
3075
3076        // Proposing now would succeed.
3077        assert!(core.try_propose(true).unwrap().is_some());
3078    }
3079
3080    #[tokio::test(flavor = "current_thread", start_paused = true)]
3081    async fn test_leader_schedule_change() {
3082        telemetry_subscribers::init_for_testing();
3083        let default_params = Parameters::default();
3084
3085        let (context, _) = Context::new_for_test(4);
3086        // create the cores and their signals for all the authorities
3087        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3088
3089        // Now iterate over a few rounds and ensure the corresponding signals are
3090        // created while network advances
3091        let mut last_round_blocks = Vec::new();
3092        for round in 1..=30 {
3093            let mut this_round_blocks = Vec::new();
3094
3095            // Wait for min round delay to allow blocks to be proposed.
3096            sleep(default_params.min_round_delay).await;
3097
3098            for core_fixture in &mut cores {
3099                // add the blocks from last round
3100                // this will trigger a block creation for the round and a signal should be
3101                // emitted
3102                core_fixture
3103                    .core
3104                    .add_blocks(last_round_blocks.clone())
3105                    .unwrap();
3106
3107                // A "new round" signal should be received given that all the blocks of the
3108                // previous round have been processed
3109                let new_round = receive(
3110                    Duration::from_secs(1),
3111                    core_fixture.signal_receivers.new_round_receiver(),
3112                )
3113                .await;
3114                assert_eq!(new_round, round);
3115
3116                // Check that a new block has been proposed.
3117                let extended_block = tokio::time::timeout(
3118                    Duration::from_secs(1),
3119                    core_fixture.block_receiver.recv(),
3120                )
3121                .await
3122                .unwrap()
3123                .unwrap();
3124                assert_eq!(extended_block.block.round(), round);
3125                assert_eq!(
3126                    extended_block.block.author(),
3127                    core_fixture.core.context.own_index
3128                );
3129
3130                // append the new block to this round blocks
3131                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3132
3133                let block = core_fixture.core.last_proposed_block();
3134
3135                // ensure that the produced block refers to the blocks of last_round
3136                assert_eq!(
3137                    block.ancestors().len(),
3138                    core_fixture.core.context.committee.size()
3139                );
3140                for ancestor in block.ancestors() {
3141                    if block.round() > 1 {
3142                        // don't bother with round 1 block which just contains the genesis blocks.
3143                        assert!(
3144                            last_round_blocks
3145                                .iter()
3146                                .any(|block| block.reference() == *ancestor),
3147                            "Reference from previous round should be added"
3148                        );
3149                    }
3150                }
3151            }
3152
3153            last_round_blocks = this_round_blocks;
3154        }
3155
3156        for core_fixture in cores {
3157            // Check commits have been persisted to store
3158            let last_commit = core_fixture
3159                .store
3160                .read_last_commit()
3161                .unwrap()
3162                .expect("last commit should be set");
3163            // There are 28 leader rounds with rounds completed up to and including
3164            // round 29. Round 30 blocks will only include their own blocks, so the
3165            // 28th leader will not be committed.
3166            assert_eq!(last_commit.index(), 27);
3167            let all_stored_commits = core_fixture
3168                .store
3169                .scan_commits((0..=CommitIndex::MAX).into())
3170                .unwrap();
3171            assert_eq!(all_stored_commits.len(), 27);
3172            assert_eq!(
3173                core_fixture
3174                    .core
3175                    .leader_schedule
3176                    .leader_swap_table
3177                    .read()
3178                    .bad_nodes
3179                    .len(),
3180                1
3181            );
3182            assert_eq!(
3183                core_fixture
3184                    .core
3185                    .leader_schedule
3186                    .leader_swap_table
3187                    .read()
3188                    .good_nodes
3189                    .len(),
3190                1
3191            );
3192            let expected_reputation_scores =
3193                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3194            assert_eq!(
3195                core_fixture
3196                    .core
3197                    .leader_schedule
3198                    .leader_swap_table
3199                    .read()
3200                    .reputation_scores,
3201                expected_reputation_scores
3202            );
3203        }
3204    }
3205
3206    // TODO: Remove this when DistributedVoteScoring is enabled.
3207    #[tokio::test(flavor = "current_thread", start_paused = true)]
3208    async fn test_leader_schedule_change_with_vote_scoring() {
3209        telemetry_subscribers::init_for_testing();
3210        let default_params = Parameters::default();
3211        let (mut context, _) = Context::new_for_test(4);
3212        context
3213            .protocol_config
3214            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3215        // create the cores and their signals for all the authorities
3216        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3217        // Now iterate over a few rounds and ensure the corresponding signals are
3218        // created while network advances
3219        let mut last_round_blocks = Vec::new();
3220        for round in 1..=30 {
3221            let mut this_round_blocks = Vec::new();
3222            // Wait for min round delay to allow blocks to be proposed.
3223            sleep(default_params.min_round_delay).await;
3224            for core_fixture in &mut cores {
3225                // add the blocks from last round
3226                // this will trigger a block creation for the round and a signal should be
3227                // emitted
3228                core_fixture
3229                    .core
3230                    .add_blocks(last_round_blocks.clone())
3231                    .unwrap();
3232                // A "new round" signal should be received given that all the blocks of the
3233                // previous round have been processed
3234                let new_round = receive(
3235                    Duration::from_secs(1),
3236                    core_fixture.signal_receivers.new_round_receiver(),
3237                )
3238                .await;
3239                assert_eq!(new_round, round);
3240                // Check that a new block has been proposed.
3241                let extended_block = tokio::time::timeout(
3242                    Duration::from_secs(1),
3243                    core_fixture.block_receiver.recv(),
3244                )
3245                .await
3246                .unwrap()
3247                .unwrap();
3248                assert_eq!(extended_block.block.round(), round);
3249                assert_eq!(
3250                    extended_block.block.author(),
3251                    core_fixture.core.context.own_index
3252                );
3253
3254                // append the new block to this round blocks
3255                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3256                let block = core_fixture.core.last_proposed_block();
3257                // ensure that the produced block refers to the blocks of last_round
3258                assert_eq!(
3259                    block.ancestors().len(),
3260                    core_fixture.core.context.committee.size()
3261                );
3262                for ancestor in block.ancestors() {
3263                    if block.round() > 1 {
3264                        // don't bother with round 1 block which just contains the genesis blocks.
3265                        assert!(
3266                            last_round_blocks
3267                                .iter()
3268                                .any(|block| block.reference() == *ancestor),
3269                            "Reference from previous round should be added"
3270                        );
3271                    }
3272                }
3273            }
3274            last_round_blocks = this_round_blocks;
3275        }
3276        for core_fixture in cores {
3277            // Check commits have been persisted to store
3278            let last_commit = core_fixture
3279                .store
3280                .read_last_commit()
3281                .unwrap()
3282                .expect("last commit should be set");
3283            // There are 28 leader rounds with rounds completed up to and including
3284            // round 29. Round 30 blocks will only include their own blocks, so the
3285            // 28th leader will not be committed.
3286            assert_eq!(last_commit.index(), 27);
3287            let all_stored_commits = core_fixture
3288                .store
3289                .scan_commits((0..=CommitIndex::MAX).into())
3290                .unwrap();
3291            assert_eq!(all_stored_commits.len(), 27);
3292            assert_eq!(
3293                core_fixture
3294                    .core
3295                    .leader_schedule
3296                    .leader_swap_table
3297                    .read()
3298                    .bad_nodes
3299                    .len(),
3300                1
3301            );
3302            assert_eq!(
3303                core_fixture
3304                    .core
3305                    .leader_schedule
3306                    .leader_swap_table
3307                    .read()
3308                    .good_nodes
3309                    .len(),
3310                1
3311            );
3312            let expected_reputation_scores =
3313                ReputationScores::new((11..=20).into(), vec![9, 8, 8, 8]);
3314            assert_eq!(
3315                core_fixture
3316                    .core
3317                    .leader_schedule
3318                    .leader_swap_table
3319                    .read()
3320                    .reputation_scores,
3321                expected_reputation_scores
3322            );
3323        }
3324    }
3325
3326    #[tokio::test]
3327    async fn test_validate_certified_commits() {
3328        telemetry_subscribers::init_for_testing();
3329
3330        let (context, _key_pairs) = Context::new_for_test(4);
3331        let context = context.with_parameters(Parameters {
3332            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3333            ..Default::default()
3334        });
3335
3336        let authority_index = AuthorityIndex::new_for_test(0);
3337        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3338        let mut core = core.core;
3339
3340        // No new block should have been produced
3341        assert_eq!(
3342            core.last_proposed_round(),
3343            GENESIS_ROUND,
3344            "No block should have been created other than genesis"
3345        );
3346
3347        // create a DAG of 12 rounds
3348        let mut dag_builder = DagBuilder::new(core.context.clone());
3349        dag_builder.layers(1..=12).build();
3350
3351        // Store all blocks up to round 6 which should be enough to decide up to leader
3352        // 4
3353        dag_builder.print();
3354        let blocks = dag_builder.blocks(1..=6);
3355
3356        for block in blocks {
3357            core.dag_state.write().accept_block(block);
3358        }
3359
3360        // Get all the committed sub dags up to round 10
3361        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3362
3363        // Now try to commit up to the latest leader (round = 4). Do not provide any
3364        // certified commits.
3365        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3366
3367        // We should have committed up to round 4
3368        assert_eq!(committed_sub_dags.len(), 4);
3369
3370        // Now validate the certified commits. We'll try 3 different scenarios:
3371        println!("Case 1. Provide certified commits that are all before the last committed round.");
3372
3373        // Highest certified commit should be for leader of round 4.
3374        let certified_commits = sub_dags_and_commits
3375            .iter()
3376            .take(4)
3377            .map(|(_, c)| c)
3378            .cloned()
3379            .collect::<Vec<_>>();
3380        assert!(
3381            certified_commits.last().unwrap().index()
3382                <= committed_sub_dags.last().unwrap().commit_ref.index,
3383            "Highest certified commit should not be newer than the highest committed index."
3384        );
3385
3386        let certified_commits = core.validate_certified_commits(certified_commits).unwrap();
3387
3388        // No commits should be processed
3389        assert!(certified_commits.is_empty());
3390
3391        println!("Case 2. Provide certified commits that are all after the last committed round.");
3392
3393        // The highest certified commit provided is now the one with index 5, above the last committed index (4).
3394        let certified_commits = sub_dags_and_commits
3395            .iter()
3396            .take(5)
3397            .map(|(_, c)| c.clone())
3398            .collect::<Vec<_>>();
3399
3400        let certified_commits = core
3401            .validate_certified_commits(certified_commits.clone())
3402            .unwrap();
3403
3404        // The certified commit of index 5 should be processed.
3405        assert_eq!(certified_commits.len(), 1);
3406        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3407
3408        println!(
3409            "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
3410        );
3411
3412        // Provide only the certified commit with index 6, skipping the expected next index 5.
3413        let certified_commits = sub_dags_and_commits
3414            .iter()
3415            .skip(5)
3416            .take(1)
3417            .map(|(_, c)| c.clone())
3418            .collect::<Vec<_>>();
3419
3420        let err = core
3421            .validate_certified_commits(certified_commits.clone())
3422            .unwrap_err();
3423        match err {
3424            ConsensusError::UnexpectedCertifiedCommitIndex {
3425                expected_commit_index: 5,
3426                commit_index: 6,
3427            } => (),
3428            _ => panic!("Unexpected error: {:?}", err),
3429        }
3430    }
3431
3432    #[tokio::test]
3433    async fn test_add_certified_commits() {
3434        telemetry_subscribers::init_for_testing();
3435
3436        let (context, _key_pairs) = Context::new_for_test(4);
3437        let context = context.with_parameters(Parameters {
3438            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3439            ..Default::default()
3440        });
3441
3442        let authority_index = AuthorityIndex::new_for_test(0);
3443        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3444        let store = core.store.clone();
3445        let mut core = core.core;
3446
3447        // No new block should have been produced
3448        assert_eq!(
3449            core.last_proposed_round(),
3450            GENESIS_ROUND,
3451            "No block should have been created other than genesis"
3452        );
3453
3454        // create a DAG of 12 rounds
3455        let mut dag_builder = DagBuilder::new(core.context.clone());
3456        dag_builder.layers(1..=12).build();
3457
3458        // Store all blocks up to round 6 which should be enough to decide up to leader
3459        // 4
3460        dag_builder.print();
3461        let blocks = dag_builder.blocks(1..=6);
3462
3463        for block in blocks {
3464            core.dag_state.write().accept_block(block);
3465        }
3466
3467        // Get all the committed sub dags up to round 10
3468        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3469
3470        // Now try to commit up to the latest leader (round = 4). Do not provide any
3471        // certified commits.
3472        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3473
3474        // We should have committed up to round 4
3475        assert_eq!(committed_sub_dags.len(), 4);
3476
3477        let last_commit = store
3478            .read_last_commit()
3479            .unwrap()
3480            .expect("Last commit should be set");
3481        assert_eq!(last_commit.reference().index, 4);
3482
3483        println!("Case 1. Provide no certified commits. No commit should happen.");
3484
3485        let last_commit = store
3486            .read_last_commit()
3487            .unwrap()
3488            .expect("Last commit should be set");
3489        assert_eq!(last_commit.reference().index, 4);
3490
3491        println!(
3492            "Case 2. Provide certified commits that are before and after the last committed round, with additional blocks available so the direct decide rule can run as well."
3493        );
3494
3495        // The commits of leader rounds 5-8 should be committed via the certified
3496        // commits.
3497        let certified_commits = sub_dags_and_commits
3498            .iter()
3499            .skip(3)
3500            .take(5)
3501            .map(|(_, c)| c.clone())
3502            .collect::<Vec<_>>();
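        // Note that this range also contains commit 4, which is already committed locally and is
        // expected to be skipped; commits 5..=8 are the new ones.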
3503
3504        // Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be
3505        // accepted via the certified commits processing.
3506        let blocks = dag_builder.blocks(8..=12);
3507        for block in blocks {
3508            core.dag_state.write().accept_block(block);
3509        }
3510
3511        // The corresponding blocks of the certified commits should be accepted and
3512        // stored before linearizing and committing the DAG.
3513        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3514            .expect("Should not fail");
3515
3516        let commits = store.scan_commits((6..=10).into()).unwrap();
3517
3518        // We expect all the sub dags up to leader round 10 to be committed.
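        // Commits up to index 8 should come via the certified commits, while commits 9 and 10 are
        // decided directly from the blocks of rounds 8..=12 added above.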
3519        assert_eq!(commits.len(), 5);
3520
3521        for i in 6..=10 {
3522            let commit = &commits[i - 6];
3523            assert_eq!(commit.reference().index, i as u32);
3524        }
3525    }
3526
3527    #[tokio::test]
3528    async fn try_commit_with_certified_commits_gced_blocks() {
3529        const GC_DEPTH: u32 = 3;
3530        telemetry_subscribers::init_for_testing();
3531
3532        let (mut context, mut key_pairs) = Context::new_for_test(5);
3533        context
3534            .protocol_config
3535            .set_consensus_gc_depth_for_testing(GC_DEPTH);
3538        let context = Arc::new(context.with_parameters(Parameters {
3539            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3540            ..Default::default()
3541        }));
3542
3543        let store = Arc::new(MemStore::new());
3544        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3545
3546        let block_manager = BlockManager::new(
3547            context.clone(),
3548            dag_state.clone(),
3549            Arc::new(NoopBlockVerifier),
3550        );
3551        let leader_schedule = Arc::new(
3552            LeaderSchedule::from_store(context.clone(), dag_state.clone())
3553                .with_num_commits_per_schedule(10),
3554        );
3555
3556        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3557        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3558        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3559        // Need at least one subscriber to the block broadcast channel.
3560        let _block_receiver = signal_receivers.block_broadcast_receiver();
3561
3562        let (sender, _receiver) = unbounded_channel("consensus_output");
3563        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
3564        let commit_observer = CommitObserver::new(
3565            context.clone(),
3566            commit_consumer,
3567            dag_state.clone(),
3568            store.clone(),
3569            leader_schedule.clone(),
3570        );
3571
3572        let mut core = Core::new(
3573            context.clone(),
3574            leader_schedule,
3575            transaction_consumer,
3576            block_manager,
3577            true,
3578            commit_observer,
3579            signals,
3580            key_pairs.remove(context.own_index.value()).1,
3581            dag_state.clone(),
3582            true,
3583        );
3584
3585        // No new block should have been produced
3586        assert_eq!(
3587            core.last_proposed_round(),
3588            GENESIS_ROUND,
3589            "No block should have been created other than genesis"
3590        );
3591
3592        let dag_str = "DAG {
3593            Round 0 : { 5 },
3594            Round 1 : { * },
3595            Round 2 : {
3596                A -> [-E1],
3597                B -> [-E1],
3598                C -> [-E1],
3599                D -> [-E1],
3600            },
3601            Round 3 : {
3602                A -> [*],
3603                B -> [*],
3604                C -> [*],
3605                D -> [*],
3606            },
3607            Round 4 : {
3608                A -> [*],
3609                B -> [*],
3610                C -> [*],
3611                D -> [*],
3612            },
3613            Round 5 : {
3614                A -> [*],
3615                B -> [*],
3616                C -> [*],
3617                D -> [*],
3618                E -> [A4, B4, C4, D4, E1]
3619            },
3620            Round 6 : { * },
3621            Round 7 : { * },
3622        }";
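        // In the DAG string above, '-E1' drops E1 from the round 2 ancestors, and E1 is only
        // linked again by E's round 5 block (see 'E -> [A4, B4, C4, D4, E1]'). With GC_DEPTH = 3
        // that reference comes too late, so E1 is expected to be garbage collected and never
        // committed (asserted below).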
3623
3624        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3625        dag_builder.print();
3626
3627        // Now get all the committed sub dags from the DagBuilder
3628        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3629            .get_sub_dag_and_certified_commits(1..=5)
3630            .into_iter()
3631            .unzip();
3632
3633        // Now try to commit up to the latest leader (round = 5) with the provided
3634        // certified commits. Note that we have not accepted any blocks; that
3635        // should happen during the commit process.
3636        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3637
3638        // We should have committed up to round 4
3639        assert_eq!(committed_sub_dags.len(), 4);
3640        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3641            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3642
3643            // ensure that the block E1 (authority E, round 1) has not been committed
3644            for block in committed_sub_dag.blocks.iter() {
3645                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3646                    panic!("Did not expect to commit block E1");
3647                }
3648            }
3649        }
3650    }
3651
3652    #[tokio::test(flavor = "current_thread", start_paused = true)]
3653    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3654        telemetry_subscribers::init_for_testing();
3655        let default_params = Parameters::default();
3656
3657        let (context, _) = Context::new_for_test(6);
3658
3659        // create the cores and their signals for all the authorities
3660        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3661
3662        // Now iterate over a few rounds and ensure the corresponding signals are
3663        // created while network advances
3664        let mut last_round_blocks = Vec::new();
3665        for round in 1..=33 {
3666            let mut this_round_blocks = Vec::new();
3667            // Wait for min round delay to allow blocks to be proposed.
3668            sleep(default_params.min_round_delay).await;
3669            for core_fixture in &mut cores {
3670                // add the blocks from last round
3671                // this will trigger a block creation for the round and a signal should be
3672                // emitted
3673                core_fixture
3674                    .core
3675                    .add_blocks(last_round_blocks.clone())
3676                    .unwrap();
3677                // A "new round" signal should be received given that all the blocks of the
3678                // previous round have been processed
3679                let new_round = receive(
3680                    Duration::from_secs(1),
3681                    core_fixture.signal_receivers.new_round_receiver(),
3682                )
3683                .await;
3684                assert_eq!(new_round, round);
3685                // Check that a new block has been proposed.
3686                let extended_block = tokio::time::timeout(
3687                    Duration::from_secs(1),
3688                    core_fixture.block_receiver.recv(),
3689                )
3690                .await
3691                .unwrap()
3692                .unwrap();
3693                assert_eq!(extended_block.block.round(), round);
3694                assert_eq!(
3695                    extended_block.block.author(),
3696                    core_fixture.core.context.own_index
3697                );
3698
3699                // append the new block to this round blocks
3700                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3701                let block = core_fixture.core.last_proposed_block();
3702                // ensure that the produced block refers to the blocks of last_round
3703                assert_eq!(
3704                    block.ancestors().len(),
3705                    core_fixture.core.context.committee.size()
3706                );
3707                for ancestor in block.ancestors() {
3708                    if block.round() > 1 {
3709                        // don't bother with round 1 block which just contains the genesis blocks.
3710                        assert!(
3711                            last_round_blocks
3712                                .iter()
3713                                .any(|block| block.reference() == *ancestor),
3714                            "Reference from previous round should be added"
3715                        );
3716                    }
3717                }
3718            }
3719            last_round_blocks = this_round_blocks;
3720        }
3721        for core_fixture in cores {
3722            // Check commits have been persisted to store
3723            let last_commit = core_fixture
3724                .store
3725                .read_last_commit()
3726                .unwrap()
3727                .expect("last commit should be set");
3728            // There are 31 leader rounds with rounds completed up to and including
3729            // round 33. Round 33 blocks will only include their own blocks, so there
3730            // should only be 30 commits.
3731            // However, on a leader schedule change boundary it is possible for a
3732            // new leader to get selected for the same round if the elected leader
3733            // gets swapped, allowing for multiple leaders to be committed at a round.
3734            // Meaning with multi leader per round explicitly set to 1 we will have 30,
3735            // otherwise 31.
3736            // NOTE: We used 31 leader rounds to specifically trigger the scenario
3737            // where the leader schedule boundary occurred AND we had a swap to a new
3738            // leader for the same round
3739            let expected_commit_count = 30;
3740            // Leave the code for re-use.
3741            // let expected_commit_count = match num_leaders_per_round {
3742            //    Some(1) => 30,
3743            //    _ => 31,
3744            //};
3745            assert_eq!(last_commit.index(), expected_commit_count);
3746            let all_stored_commits = core_fixture
3747                .store
3748                .scan_commits((0..=CommitIndex::MAX).into())
3749                .unwrap();
3750            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3751            assert_eq!(
3752                core_fixture
3753                    .core
3754                    .leader_schedule
3755                    .leader_swap_table
3756                    .read()
3757                    .bad_nodes
3758                    .len(),
3759                1
3760            );
3761            assert_eq!(
3762                core_fixture
3763                    .core
3764                    .leader_schedule
3765                    .leader_swap_table
3766                    .read()
3767                    .good_nodes
3768                    .len(),
3769                1
3770            );
3771            let expected_reputation_scores =
3772                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3773            assert_eq!(
3774                core_fixture
3775                    .core
3776                    .leader_schedule
3777                    .leader_swap_table
3778                    .read()
3779                    .reputation_scores,
3780                expected_reputation_scores
3781            );
3782        }
3783    }
3784
3785    // TODO: Remove two tests below this when DistributedVoteScoring is enabled.
3786    #[tokio::test(flavor = "current_thread", start_paused = true)]
3787    async fn test_commit_on_leader_schedule_change_boundary_without_multileader_with_vote_scoring()
3788    {
3789        telemetry_subscribers::init_for_testing();
3790        let default_params = Parameters::default();
3791
3792        let (mut context, _) = Context::new_for_test(6);
3793        context
3794            .protocol_config
3795            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3796
3797        // create the cores and their signals for all the authorities
3798        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3799        // Now iterate over a few rounds and ensure the corresponding signals are
3800        // created while network advances
3801        let mut last_round_blocks = Vec::new();
3802        for round in 1..=63 {
3803            let mut this_round_blocks = Vec::new();
3804
3805            // Wait for min round delay to allow blocks to be proposed.
3806            sleep(default_params.min_round_delay).await;
3807
3808            for core_fixture in &mut cores {
3809                // add the blocks from last round
3810                // this will trigger a block creation for the round and a signal should be
3811                // emitted
3812                core_fixture
3813                    .core
3814                    .add_blocks(last_round_blocks.clone())
3815                    .unwrap();
3816
3817                // A "new round" signal should be received given that all the blocks of the
3818                // previous round have been processed
3819                let new_round = receive(
3820                    Duration::from_secs(1),
3821                    core_fixture.signal_receivers.new_round_receiver(),
3822                )
3823                .await;
3824                assert_eq!(new_round, round);
3825
3826                // Check that a new block has been proposed.
3827                let extended_block = tokio::time::timeout(
3828                    Duration::from_secs(1),
3829                    core_fixture.block_receiver.recv(),
3830                )
3831                .await
3832                .unwrap()
3833                .unwrap();
3834                assert_eq!(extended_block.block.round(), round);
3835                assert_eq!(
3836                    extended_block.block.author(),
3837                    core_fixture.core.context.own_index
3838                );
3839
3840                // append the new block to this round blocks
3841                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3842
3843                let block = core_fixture.core.last_proposed_block();
3844
3845                // ensure that the produced block refers to the blocks of last_round
3846                assert_eq!(
3847                    block.ancestors().len(),
3848                    core_fixture.core.context.committee.size()
3849                );
3850                for ancestor in block.ancestors() {
3851                    if block.round() > 1 {
3852                        // don't bother with round 1 block which just contains the genesis blocks.
3853                        assert!(
3854                            last_round_blocks
3855                                .iter()
3856                                .any(|block| block.reference() == *ancestor),
3857                            "Reference from previous round should be added"
3858                        );
3859                    }
3860                }
3861            }
3862
3863            last_round_blocks = this_round_blocks;
3864        }
3865
3866        for core_fixture in cores {
3867            // Check commits have been persisted to store
3868            let last_commit = core_fixture
3869                .store
3870                .read_last_commit()
3871                .unwrap()
3872                .expect("last commit should be set");
3873            // There are 61 leader rounds with rounds completed up to and including
3874            // round 63. Round 63 blocks will only include their own blocks, so there
3875            // should only be 60 commits.
3876            // However, on a leader schedule change boundary it is possible for a
3877            // new leader to get selected for the same round if the elected leader
3878            // gets swapped, allowing for multiple leaders to be committed at a round.
3879            // Meaning with multi leader per round explicitly set to 1 we will have 60,
3880            // otherwise 61.
3881            // NOTE: We used 61 leader rounds to specifically trigger the scenario
3882            // where the leader schedule boundary occurred AND we had a swap to a new
3883            // leader for the same round
3884            let expected_commit_count = 60;
3885            // Leave the code for re-use.
3886            // let expected_commit_count = match num_leaders_per_round {
3887            //    Some(1) => 60,
3888            //    _ => 61,
3889            //};
            assert_eq!(last_commit.index(), expected_commit_count);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .bad_nodes
                    .len(),
                1
            );
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .good_nodes
                    .len(),
                1
            );
            let expected_reputation_scores =
                ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores,
                expected_reputation_scores
            );
        }
    }

    #[tokio::test]
    async fn test_core_signals() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(4);
        // create the cores and their signals for all the authorities
        let mut cores = create_cores(context, vec![1, 1, 1, 1]);

        // Now iterate over a few rounds and ensure the corresponding signals are
        // created while the network advances
        let mut last_round_blocks = Vec::new();
        for round in 1..=10 {
            let mut this_round_blocks = Vec::new();

            // Wait for min round delay to allow blocks to be proposed.
            sleep(default_params.min_round_delay).await;

            for core_fixture in &mut cores {
                // add the blocks from the last round;
                // this will trigger a block creation for the round and a signal should be
                // emitted
                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();

                // A "new round" signal should be received given that all the blocks of the
                // previous round have been processed
                let new_round = receive(
                    Duration::from_secs(1),
                    core_fixture.signal_receivers.new_round_receiver(),
                )
                .await;
                assert_eq!(new_round, round);

                // Check that a new block has been proposed.
                let extended_block = tokio::time::timeout(
                    Duration::from_secs(1),
                    core_fixture.block_receiver.recv(),
                )
                .await
                .unwrap()
                .unwrap();
                assert_eq!(extended_block.block.round(), round);
                assert_eq!(
                    extended_block.block.author(),
                    core_fixture.core.context.own_index
                );

                // append the new block to this round's blocks
                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());

                let block = core_fixture.core.last_proposed_block();

                // ensure that the produced block refers to the blocks of last_round
                assert_eq!(
                    block.ancestors().len(),
                    core_fixture.core.context.committee.size()
                );
                for ancestor in block.ancestors() {
                    if block.round() > 1 {
                        // don't bother with round 1 blocks, which just reference the genesis blocks.
                        assert!(
                            last_round_blocks
                                .iter()
                                .any(|block| block.reference() == *ancestor),
                            "Reference from previous round should be added"
                        );
                    }
                }
            }

            last_round_blocks = this_round_blocks;
        }

        for core_fixture in cores {
            // Check commits have been persisted to store
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            // There are 8 leader rounds with rounds completed up to and including
            // round 9. Round 10 blocks will only include their own blocks, so the
            // 8th leader will not be committed.
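            // That is, commits 1..=7 are expected: 8 leader rounds minus the final
            // leader, which cannot be committed here.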
            assert_eq!(last_commit.index(), 7);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), 7);
        }
    }

    #[tokio::test]
    async fn test_core_compress_proposal_references() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(4);
        // create the cores and their signals for all the authorities
        let mut cores = create_cores(context, vec![1, 1, 1, 1]);

        let mut last_round_blocks = Vec::new();
        let mut all_blocks = Vec::new();

        let excluded_authority = AuthorityIndex::new_for_test(3);

        for round in 1..=10 {
            let mut this_round_blocks = Vec::new();

            for core_fixture in &mut cores {
                // do not produce any block for authority 3
                if core_fixture.core.context.own_index == excluded_authority {
                    continue;
                }

                // try to propose, to ensure that we cover the case where the
                // leader (authority 3) is missing
                core_fixture
                    .core
                    .add_blocks(last_round_blocks.clone())
                    .unwrap();
                core_fixture.core.new_block(round, true).unwrap();

                let block = core_fixture.core.last_proposed_block();
                assert_eq!(block.round(), round);

                // append the new block to this round's blocks
                this_round_blocks.push(block.clone());
            }

            last_round_blocks = this_round_blocks.clone();
            all_blocks.extend(this_round_blocks);
        }

        // Now send all the produced blocks to the core of authority 3. It should
        // produce a new block. If no compression were applied, we would expect all
        // the previous blocks from rounds 0..=10 to be referenced. However, since
        // compression is applied, only the last round's (10) blocks should be
        // referenced, plus the authority's own block of round 1.
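        // Concretely (illustrative count only): without compression the round 11
        // proposal would carry on the order of 30 ancestors (rounds 1..=10 from the
        // three active authorities), whereas with compression it carries just 4, as
        // asserted below.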
        let core_fixture = &mut cores[excluded_authority];
        // Wait for min round delay to allow blocks to be proposed.
        sleep(default_params.min_round_delay).await;
        // add blocks to trigger proposal.
        core_fixture.core.add_blocks(all_blocks).unwrap();

        // Assert that a block has been created for round 11 and that it references
        // blocks of round 10 for the other peers, and round 1 for its own block
        // (created after recovery).
        let block = core_fixture.core.last_proposed_block();
        assert_eq!(block.round(), 11);
        assert_eq!(block.ancestors().len(), 4);
        for block_ref in block.ancestors() {
            if block_ref.author == excluded_authority {
                assert_eq!(block_ref.round, 1);
            } else {
                assert_eq!(block_ref.round, 10);
            }
        }

        // Check commits have been persisted to store
        let last_commit = core_fixture
            .store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");
        // There are 8 leader rounds with rounds completed up to and including
        // round 10. However, because no blocks were produced for authority 3,
        // 2 leader rounds will be skipped.
        assert_eq!(last_commit.index(), 6);
        let all_stored_commits = core_fixture
            .store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), 6);
    }

    #[tokio::test]
    async fn try_decide_certified() {
        // GIVEN
        telemetry_subscribers::init_for_testing();

        let (context, _) = Context::new_for_test(4);

        let authority_index = AuthorityIndex::new_for_test(0);
        let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
        let mut core = core.core;

        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
        dag_builder.layers(1..=12).build();

        let limit = 2;

        let blocks = dag_builder.blocks(1..=12);

        for block in blocks {
            core.dag_state.write().accept_block(block);
        }

        // WHEN
        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
        let mut certified_commits = sub_dags_and_commits
            .into_iter()
            .map(|(_, commit)| commit)
            .collect::<Vec<_>>();

        let leaders = core.try_decide_certified(&mut certified_commits, limit);

        // THEN
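        // With four certified commits available and `limit` set to 2, only the first
        // two should be decided as leaders; the remaining two are left in
        // `certified_commits`.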
        assert_eq!(leaders.len(), 2);
        assert_eq!(certified_commits.len(), 2);
    }

    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
        tokio::time::timeout(timeout, receiver.changed())
            .await
            .expect("Timeout while waiting to read from receiver")
            .expect("Signal receive channel shouldn't be closed");
        *receiver.borrow_and_update()
    }
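
    // The helper above is specific to `watch` channels. As a minimal sketch (an
    // assumption for illustration, not an API used by the tests above), the same
    // timeout-and-panic pattern can be factored out for any future:
    #[allow(dead_code)]
    pub(crate) async fn with_timeout<F: std::future::Future>(timeout: Duration, fut: F) -> F::Output {
        tokio::time::timeout(timeout, fut)
            .await
            .expect("Timeout while waiting for the future to complete")
    }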
}