consensus_core/core.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet},
7    iter, mem,
8    sync::Arc,
9    time::Duration,
10    vec,
11};
12
13use consensus_config::{AuthorityIndex, ProtocolKeyPair};
14#[cfg(test)]
15use consensus_config::{Stake, local_committee_and_keys};
16use iota_macros::fail_point;
17#[cfg(test)]
18use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
19use iota_metrics::monitored_scope;
20use itertools::Itertools as _;
21use parking_lot::RwLock;
22use tokio::{
23    sync::{broadcast, watch},
24    time::Instant,
25};
26use tracing::{debug, info, trace, warn};
27
28#[cfg(test)]
29use crate::{
30    CommitConsumer, TransactionClient, block_verifier::NoopBlockVerifier,
31    storage::mem_store::MemStore,
32};
33use crate::{
34    ancestor::{AncestorState, AncestorStateManager},
35    block::{
36        Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, ExtendedBlock, GENESIS_ROUND, Round,
37        SignedBlock, Slot, VerifiedBlock,
38    },
39    block_manager::BlockManager,
40    commit::{
41        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
42    },
43    commit_observer::CommitObserver,
44    context::Context,
45    dag_state::DagState,
46    error::{ConsensusError, ConsensusResult},
47    leader_schedule::LeaderSchedule,
48    round_prober::QuorumRound,
49    stake_aggregator::{QuorumThreshold, StakeAggregator},
50    transaction::TransactionConsumer,
51    universal_committer::{
52        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
53    },
54};
55
56// Maximum number of commit votes to include in a block.
57// TODO: Move to protocol config, and verify in BlockVerifier.
58const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
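// NOTE: a minimal sketch of the verification the TODO above refers to, assuming a hypothetical
// `max_commit_votes_per_block` protocol-config knob and a `commit_votes()` accessor on the block
// (illustrative names, not the actual BlockVerifier API):
//
//     if block.commit_votes().len() > context.protocol_config.max_commit_votes_per_block() {
//         return Err(/* a dedicated ConsensusError variant for oversized vote lists */);
//     }
//
// Until such a check exists, the constant above is only enforced on the proposing side, via
// `DagState::take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK)` in `try_new_block`.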
59
60pub(crate) struct Core {
61    context: Arc<Context>,
62    /// The consumer used to pull transactions to be included in the next
63    /// proposals.
64    transaction_consumer: TransactionConsumer,
65    /// The block manager, which is responsible for keeping track of the DAG
66    /// dependencies when processing new blocks, accepting them or suspending
67    /// them if their causal history is missing.
68    block_manager: BlockManager,
69    /// Whether there is a quorum of 2f+1 subscribers waiting for new blocks
70    /// proposed by this authority. Core stops proposing new blocks when
71    /// there are not enough subscribers, because newly proposed blocks would
72    /// not be sufficiently propagated to the network.
73    quorum_subscribers_exists: bool,
74    /// Estimated delay, in rounds, for propagating blocks to a quorum.
75    /// Because of the nature of TCP and block streaming, the propagation delay is
76    /// expected to be 0 in most cases, even when the actual latency of
77    /// broadcasting blocks is high. When this value is higher than the
78    /// `propagation_delay_stop_proposal_threshold`, it is likely that this
79    /// validator cannot broadcast blocks to the network at all. Core stops
80    /// proposing new blocks in this case.
81    propagation_delay: Round,
82
83    /// Used to make commit decisions for leader blocks in the dag.
84    committer: UniversalCommitter,
85    /// The last new round for which core has sent out a signal.
86    last_signaled_round: Round,
87    /// The blocks of the last included ancestors per authority. This vector is
88    /// essentially used as a watermark so that the next block proposal only
89    /// includes ancestors of higher rounds. By default, it is initialised
90    /// with `None` values.
91    last_included_ancestors: Vec<Option<BlockRef>>,
92    /// The last decided leader returned from the universal committer. Note
93    /// that this does not mean that the leader has been persisted yet, as it
94    /// still has to go through the CommitObserver, which persists the commit
95    /// to the store. On recovery/restart,
96    /// `last_decided_leader` is set to the leader of the last commit in the
97    /// dag state.
98    last_decided_leader: Slot,
99    /// The consensus leader schedule to be used to resolve the leader for a
100    /// given round.
101    leader_schedule: Arc<LeaderSchedule>,
102    /// The commit observer is responsible for observing the commits and
103    /// for collecting and sending the subdags over the consensus
104    /// output channel.
105    commit_observer: CommitObserver,
106    /// Sender of outgoing signals from Core.
107    signals: CoreSignals,
108    /// The keypair to be used for block signing
109    block_signer: ProtocolKeyPair,
110    /// Keeping track of state of the DAG, including blocks, commits and last
111    /// committed rounds.
112    dag_state: Arc<RwLock<DagState>>,
113    /// The last known round for which the node has proposed. Any proposal
114    /// should be for a round greater than this. This is currently used to
115    /// avoid equivocations when a node recovers from amnesia. When the value
116    /// is None it means that the last own block sync mechanism is enabled, but
117    /// it hasn't been initialised yet.
118    last_known_proposed_round: Option<Round>,
119    /// The ancestor state manager keeps track of the quality of the authorities
120    /// based on the distribution of their blocks to the network. It uses this
121    /// information to decide whether to include an authority's blocks in the
122    /// next proposal or not.
123    ancestor_state_manager: AncestorStateManager,
124}
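// An illustrative sketch (not part of the implementation) of how a caller is expected to drive
// `Core`, based on the `pub(crate)` methods below; channel wiring and error handling are elided:
//
//     let mut core = Core::new(/* context, leader schedule, transaction consumer, ... */);
//     // Blocks received and verified from the network:
//     let missing = core.add_blocks(verified_blocks)?;        // returns refs to fetch
//     // Commits fetched by the commit syncer:
//     core.add_certified_commits(certified_commits)?;
//     // Leader timeout fired for `round`:
//     core.new_block(round, /* force */ true)?;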
125
126impl Core {
127    pub(crate) fn new(
128        context: Arc<Context>,
129        leader_schedule: Arc<LeaderSchedule>,
130        transaction_consumer: TransactionConsumer,
131        block_manager: BlockManager,
132        subscriber_exists: bool,
133        commit_observer: CommitObserver,
134        signals: CoreSignals,
135        block_signer: ProtocolKeyPair,
136        dag_state: Arc<RwLock<DagState>>,
137        sync_last_known_own_block: bool,
138    ) -> Self {
139        let last_decided_leader = dag_state.read().last_commit_leader();
140        let committer = UniversalCommitterBuilder::new(
141            context.clone(),
142            leader_schedule.clone(),
143            dag_state.clone(),
144        )
145        // We keep the `with_number_of_leaders` parameter so that it can be enabled in a future
146        // protocol upgrade.
147        .with_number_of_leaders(1)
148        .with_pipeline(true)
149        .build();
150
151        // Recover the last proposed block
152        let last_proposed_block = dag_state.read().get_last_proposed_block();
153
154        let last_signaled_round = last_proposed_block.round();
155
156        // Recover the last included ancestor rounds based on the last proposed block.
157        // That allows the next block proposal to use ancestor blocks of higher
158        // rounds and avoids re-including blocks that have already been included
159        // in the last (or an earlier) block proposal.
160        // This is only strongly guaranteed for a quorum of ancestors. It is still
161        // possible to re-include a block from an authority which hadn't been
162        // included in the last proposal, hence its latest included ancestor
163        // is not accurately captured here. This is considered a small deficiency:
164        // it mostly matters only for this next proposal, without any actual
165        // penalty in performance or block proposal.
166        let mut last_included_ancestors = vec![None; context.committee.size()];
167        for ancestor in last_proposed_block.ancestors() {
168            last_included_ancestors[ancestor.author] = Some(*ancestor);
169        }
170
171        let min_propose_round = if sync_last_known_own_block {
172            None
173        } else {
174            // if the sync is disabled then we practically don't want to impose any
175            // restriction.
176            Some(0)
177        };
178
179        let propagation_scores = leader_schedule
180            .leader_swap_table
181            .read()
182            .reputation_scores
183            .clone();
184        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
185        ancestor_state_manager.set_propagation_scores(propagation_scores);
186
187        Self {
188            context,
189            last_signaled_round,
190            last_included_ancestors,
191            last_decided_leader,
192            leader_schedule,
193            transaction_consumer,
194            block_manager,
195            quorum_subscribers_exists: subscriber_exists,
196            propagation_delay: 0,
197            committer,
198            commit_observer,
199            signals,
200            block_signer,
201            dag_state,
202            last_known_proposed_round: min_propose_round,
203            ancestor_state_manager,
204        }
205        .recover()
206    }
207
208    fn recover(mut self) -> Self {
209        let _s = self
210            .context
211            .metrics
212            .node_metrics
213            .scope_processing_time
214            .with_label_values(&["Core::recover"])
215            .start_timer();
216        // Ensure local time is after max ancestor timestamp.
217        let ancestor_blocks = self
218            .dag_state
219            .read()
220            .get_last_cached_block_per_authority(Round::MAX);
221        let max_ancestor_timestamp = ancestor_blocks
222            .iter()
223            .fold(0, |ts, (b, _)| ts.max(b.timestamp_ms()));
224        let wait_ms = max_ancestor_timestamp.saturating_sub(self.context.clock.timestamp_utc_ms());
225        if wait_ms > 0 {
226            warn!(
227                "Waiting for {} ms while recovering ancestors from storage",
228                wait_ms
229            );
230            std::thread::sleep(Duration::from_millis(wait_ms));
231        }
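        // A worked example of the wait above, with hypothetical timestamps: if the highest
        // ancestor timestamp recovered from storage is 1_700_000_000_500 ms and the local clock
        // reads 1_700_000_000_200 ms, then wait_ms = 300 and recovery blocks for 300 ms, keeping
        // the next proposal's timestamp monotonic with respect to its ancestors.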
232
233        // Try to commit and propose, since they may not have run after the last storage
234        // write.
235        self.try_commit(vec![]).unwrap();
236        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
237        {
238            last_proposed_block
239        } else {
240            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
241            if self.should_propose() {
242                assert!(
243                    last_proposed_block.round() > GENESIS_ROUND,
244                    "At minimum a block of round higher than genesis should have been produced during recovery"
245                );
246            }
247
248            // If no new block was proposed then just re-broadcast the last proposed one
249            // to ensure liveness.
250            self.signals
251                .new_block(ExtendedBlock {
252                    block: last_proposed_block.clone(),
253                    excluded_ancestors: vec![],
254                })
255                .unwrap();
256            last_proposed_block
257        };
258
259        // Try to set up leader timeout if needed.
260        // This needs to be called after try_commit() and try_propose(), which may
261        // have advanced the threshold clock round.
262        self.try_signal_new_round();
263
264        info!(
265            "Core recovery completed with last proposed block {:?}",
266            last_proposed_block
267        );
268
269        self
270    }
271
272    /// Processes the provided blocks and accepts them if possible, i.e. when their
273    /// causal history exists. The method returns:
274    /// - The references of ancestors missing their block.
275    #[tracing::instrument(skip_all)]
276    pub(crate) fn add_blocks(
277        &mut self,
278        blocks: Vec<VerifiedBlock>,
279    ) -> ConsensusResult<BTreeSet<BlockRef>> {
280        let _scope = monitored_scope("Core::add_blocks");
281        let _s = self
282            .context
283            .metrics
284            .node_metrics
285            .scope_processing_time
286            .with_label_values(&["Core::add_blocks"])
287            .start_timer();
288        self.context
289            .metrics
290            .node_metrics
291            .core_add_blocks_batch_size
292            .observe(blocks.len() as f64);
293
294        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
295
296        if !accepted_blocks.is_empty() {
297            debug!(
298                "Accepted blocks: {}",
299                accepted_blocks
300                    .iter()
301                    .map(|b| b.reference().to_string())
302                    .join(",")
303            );
304
305            // Try to commit the new blocks if possible.
306            self.try_commit(vec![])?;
307
308            // Try to propose now since there are new blocks accepted.
309            self.try_propose(false)?;
310
311            // Now set up leader timeout if needed.
312            // This needs to be called after try_commit() and try_propose(), which may
313            // have advanced the threshold clock round.
314            self.try_signal_new_round();
315        }
316
317        if !missing_block_refs.is_empty() {
318            trace!(
319                "Missing block refs: {}",
320                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
321            );
322        }
323        Ok(missing_block_refs)
324    }
325
326    /// Checks if the provided block refs have been accepted. If not, the missing
327    /// block refs are kept for synchronization. Returns the references of
328    /// missing blocks among the input blocks.
329    pub(crate) fn check_block_refs(
330        &mut self,
331        block_refs: Vec<BlockRef>,
332    ) -> ConsensusResult<BTreeSet<BlockRef>> {
333        let _scope = monitored_scope("Core::check_block_refs");
334        let _s = self
335            .context
336            .metrics
337            .node_metrics
338            .scope_processing_time
339            .with_label_values(&["Core::check_block_refs"])
340            .start_timer();
341        self.context
342            .metrics
343            .node_metrics
344            .core_check_block_refs_batch_size
345            .observe(block_refs.len() as f64);
346
347        // Try to find them via the block manager
348        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
349
350        if !missing_block_refs.is_empty() {
351            trace!(
352                "Missing block refs: {}",
353                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
354            );
355        }
356        Ok(missing_block_refs)
357    }
358
359    /// Adds the certified commits that have been synced via the commit syncer. The
360    /// commit info is used in order to skip running the decision
361    /// rule and to immediately commit the corresponding leaders and sub dags. Note
362    /// that no block acceptance happens here, but rather
363    /// internally in the `try_commit` method, which ensures that each time only the
364    /// blocks corresponding to the certified commits that are about to
365    /// be committed are accepted.
366    #[tracing::instrument(skip_all)]
367    pub(crate) fn add_certified_commits(
368        &mut self,
369        certified_commits: CertifiedCommits,
370    ) -> ConsensusResult<BTreeSet<BlockRef>> {
371        let _scope = monitored_scope("Core::add_certified_commits");
372
373        // We want to enable the commit process logic when GC is enabled.
374        if self.dag_state.read().gc_enabled() {
375            let votes = certified_commits.votes().to_vec();
376            let commits = self
377                .validate_certified_commits(certified_commits.commits().to_vec())
378                .expect("Certified commits validation failed");
379
380            // Accept the certified commit votes. This is optimistically done to increase
381            // the chances of having votes available when this node will need to
382            // sync commits to other nodes.
383            self.block_manager.try_accept_blocks(votes);
384
385            // Try to commit the new blocks. Take into account the trusted commit that has
386            // been provided.
387            self.try_commit(commits)?;
388
389            // Try to propose now since there are new blocks accepted.
390            self.try_propose(false)?;
391
392            // Now set up leader timeout if needed.
393            // This needs to be called after try_commit() and try_propose(), which may
394            // have advanced the threshold clock round.
395            self.try_signal_new_round();
396
397            return Ok(BTreeSet::new());
398        }
399
400        // If GC is not enabled then process blocks as usual.
401        let blocks = certified_commits
402            .commits()
403            .iter()
404            .flat_map(|commit| commit.blocks())
405            .cloned()
406            .collect::<Vec<_>>();
407
408        self.add_blocks(blocks)
409    }
410
411    /// If needed, signals a new clock round and sets up leader timeout.
412    fn try_signal_new_round(&mut self) {
413        // Signal only when the threshold clock round is more advanced than the last
414        // signaled round.
415        //
416        // NOTE: a signal is still sent even when a block has been proposed at the new
417        // round. We can consider changing this in the future.
418        let new_clock_round = self.dag_state.read().threshold_clock_round();
419        if new_clock_round <= self.last_signaled_round {
420            return;
421        }
422        // Then send a signal to set up leader timeout.
423        self.signals.new_round(new_clock_round);
424        self.last_signaled_round = new_clock_round;
425
426        // Report the threshold clock round
427        self.context
428            .metrics
429            .node_metrics
430            .threshold_clock_round
431            .set(new_clock_round as i64);
432    }
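    // A small illustrative trace of `try_signal_new_round`, with hypothetical rounds: with
    // `last_signaled_round == 9`, a threshold clock round of 9 or lower is a no-op, while a
    // threshold clock round of 10 sends `signals.new_round(10)` and bumps `last_signaled_round`
    // to 10, which is what arms the leader timeout for round 10.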
433
434    /// Creates a new block for the dictated round. This is used when a leader
435    /// timeout occurs, either when the min or the max timeout expires. When
436    /// `force = true`, any checks such as the existence of the previous round's
437    /// leader are skipped.
438    pub(crate) fn new_block(
439        &mut self,
440        round: Round,
441        force: bool,
442    ) -> ConsensusResult<Option<VerifiedBlock>> {
443        let _scope = monitored_scope("Core::new_block");
444        if self.last_proposed_round() < round {
445            self.context
446                .metrics
447                .node_metrics
448                .leader_timeout_total
449                .with_label_values(&[&format!("{force}")])
450                .inc();
451            let result = self.try_propose(force);
452            // The threshold clock round may have advanced, so a signal needs to be sent.
453            self.try_signal_new_round();
454            return result;
455        }
456        Ok(None)
457    }
458
459    /// Keeps only the certified commits that have a commit index > last commit
460    /// index. It also ensures that the first commit in the list is the next one
461    /// in line, otherwise an error is returned.
462    fn validate_certified_commits(
463        &mut self,
464        commits: Vec<CertifiedCommit>,
465    ) -> ConsensusResult<Vec<CertifiedCommit>> {
466        // Filter out the commits that have already been committed locally and keep
467        // only those above the last committed index.
468        let last_commit_index = self.dag_state.read().last_commit_index();
469        let commits = commits
470            .iter()
471            .filter(|commit| {
472                if commit.index() > last_commit_index {
473                    true
474                } else {
475                    tracing::debug!(
476                        "Skip commit for index {} as it is already committed with last commit index {}",
477                        commit.index(),
478                        last_commit_index
479                    );
480                    false
481                }
482            })
483            .cloned()
484            .collect::<Vec<_>>();
485
486        // Make sure that the first commit we find is the next one in line and there is
487        // no gap.
488        if let Some(commit) = commits.first() {
489            if commit.index() != last_commit_index + 1 {
490                return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
491                    expected_commit_index: last_commit_index + 1,
492                    commit_index: commit.index(),
493                });
494            }
495        }
496
497        Ok(commits)
498    }
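    // A worked example of the validation above, with hypothetical indices: with
    // `last_commit_index == 5`, the input [c4, c6, c7] is filtered to [c6, c7] and accepted,
    // because the first remaining index (6) equals last_commit_index + 1. An input of [c7, c8]
    // would instead return `ConsensusError::UnexpectedCertifiedCommitIndex` with
    // `expected_commit_index: 6` and `commit_index: 7`.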
499
500    /// Attempts to create a new block, persist it and propose it to all peers.
501    /// When force is true, the checks for whether the leader of the last round exists
502    /// among the ancestors and whether the minimum round delay has passed are skipped.
503    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
504        if !self.should_propose() {
505            return Ok(None);
506        }
507        if let Some(extended_block) = self.try_new_block(force) {
508            self.signals.new_block(extended_block.clone())?;
509
510            fail_point!("consensus-after-propose");
511
512            // The new block may help commit.
513            self.try_commit(vec![])?;
514            return Ok(Some(extended_block.block));
515        }
516        Ok(None)
517    }
518
519    /// Attempts to propose a new block for the next round. If a block has
520    /// already been proposed for the latest or an earlier round, then no block
521    /// is created and None is returned.
522    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
523        let _s = self
524            .context
525            .metrics
526            .node_metrics
527            .scope_processing_time
528            .with_label_values(&["Core::try_new_block"])
529            .start_timer();
530
531        // Ensure the new block has a higher round than the last proposed block.
532        let clock_round = {
533            let dag_state = self.dag_state.read();
534            let clock_round = dag_state.threshold_clock_round();
535            if clock_round <= dag_state.get_last_proposed_block().round() {
536                return None;
537            }
538            clock_round
539        };
540
541        // There must be a quorum of blocks from the previous round.
542        let quorum_round = clock_round.saturating_sub(1);
543
544        // Create a new block either because we want to "forcefully" propose a block due
545        // to a leader timeout, or because we are actually ready to produce the
546        // block (leader exists and min delay has passed).
547        if !force {
548            if !self.leaders_exist(quorum_round) {
549                return None;
550            }
551
552            if Duration::from_millis(
553                self.context
554                    .clock
555                    .timestamp_utc_ms()
556                    .saturating_sub(self.last_proposed_timestamp_ms()),
557            ) < self.context.parameters.min_round_delay
558            {
559                return None;
560            }
561        }
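        // A numeric illustration of the min-delay check above, with hypothetical values: with
        // `min_round_delay` of 250 ms, a last proposal at t = 1_000 ms and the clock at
        // t = 1_180 ms, the elapsed 180 ms < 250 ms, so the proposal is deferred until either the
        // delay elapses or a leader timeout forces it (`force == true`).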
562
563        // Determine the ancestors to be included in proposal.
564        // Smart ancestor selection requires distributed scoring to be enabled.
565        let (ancestors, excluded_ancestors) = if self
566            .context
567            .protocol_config
568            .consensus_distributed_vote_scoring_strategy()
569            && self
570                .context
571                .protocol_config
572                .consensus_smart_ancestor_selection()
573        {
574            let (ancestors, excluded_and_equivocating_ancestors) =
575                self.smart_ancestors_to_propose(clock_round, !force);
576
577            // If we did not find enough good ancestors to propose, continue to wait before
578            // proposing.
579            if ancestors.is_empty() {
580                assert!(
581                    !force,
582                    "Ancestors should have been returned if force is true!"
583                );
584                return None;
585            }
586
587            let excluded_ancestors_limit = self.context.committee.size() * 2;
588            if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
589                debug!(
590                    "Dropping {} excluded ancestor(s) during proposal due to size limit",
591                    excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
592                );
593            }
594            let excluded_ancestors = excluded_and_equivocating_ancestors
595                .into_iter()
596                .take(excluded_ancestors_limit)
597                .collect();
598
599            (ancestors, excluded_ancestors)
600        } else {
601            (self.ancestors_to_propose(clock_round), vec![])
602        };
603
604        // Update the last included ancestor block refs
605        for ancestor in &ancestors {
606            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
607        }
608
609        let leader_authority = &self
610            .context
611            .committee
612            .authority(self.first_leader(quorum_round))
613            .hostname;
614        self.context
615            .metrics
616            .node_metrics
617            .block_proposal_leader_wait_ms
618            .with_label_values(&[leader_authority])
619            .inc_by(
620                Instant::now()
621                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
622                    .as_millis() as u64,
623            );
624        self.context
625            .metrics
626            .node_metrics
627            .block_proposal_leader_wait_count
628            .with_label_values(&[leader_authority])
629            .inc();
630
631        self.context
632            .metrics
633            .node_metrics
634            .proposed_block_ancestors
635            .observe(ancestors.len() as f64);
636        for ancestor in &ancestors {
637            let authority = &self.context.committee.authority(ancestor.author()).hostname;
638            self.context
639                .metrics
640                .node_metrics
641                .proposed_block_ancestors_depth
642                .with_label_values(&[authority])
643                .observe(clock_round.saturating_sub(ancestor.round()).into());
644        }
645
646        // Ensure ancestor timestamps are not more advanced than the current time.
647        // Also catch the issue where the system's clock goes backwards.
648        let now = self.context.clock.timestamp_utc_ms();
649        ancestors.iter().for_each(|block| {
650            assert!(
651                block.timestamp_ms() <= now,
652                "Violation: ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.",
653                block, block.timestamp_ms(), clock_round
654            );
655        });
656
657        // Consume the next transactions to be included. Do not drop the guards yet as
658        // this would acknowledge the inclusion of transactions. This is instead
659        // done at the end of the method.
660        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
661        self.context
662            .metrics
663            .node_metrics
664            .proposed_block_transactions
665            .observe(transactions.len() as f64);
666
667        // Consume the commit votes to be included.
668        let commit_votes = self
669            .dag_state
670            .write()
671            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);
672
673        // Create the block and insert to storage.
674        let block = Block::V1(BlockV1::new(
675            self.context.committee.epoch(),
676            clock_round,
677            self.context.own_index,
678            now,
679            ancestors.iter().map(|b| b.reference()).collect(),
680            transactions,
681            commit_votes,
682            vec![],
683        ));
684        let signed_block =
685            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
686        let serialized = signed_block
687            .serialize()
688            .expect("Block serialization failed.");
689        self.context
690            .metrics
691            .node_metrics
692            .proposed_block_size
693            .observe(serialized.len() as f64);
694        // Own blocks are assumed to be valid.
695        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);
696
697        // Record the interval from last proposal, before accepting the proposed block.
698        let last_proposed_block = self.last_proposed_block();
699        if last_proposed_block.round() > 0 {
700            self.context
701                .metrics
702                .node_metrics
703                .block_proposal_interval
704                .observe(
705                    Duration::from_millis(
706                        verified_block
707                            .timestamp_ms()
708                            .saturating_sub(last_proposed_block.timestamp_ms()),
709                    )
710                    .as_secs_f64(),
711                );
712        }
713
714        // Accept the block into BlockManager and DagState.
715        let (accepted_blocks, missing) = self
716            .block_manager
717            .try_accept_blocks(vec![verified_block.clone()]);
718        assert_eq!(accepted_blocks.len(), 1);
719        assert!(missing.is_empty());
720
721        // Ensure the new block and its ancestors are persisted, before broadcasting it.
722        self.dag_state.write().flush();
723
724        // Now acknowledge the transactions for their inclusion in the block.
725        ack_transactions(verified_block.reference());
726
727        debug!("Created block {verified_block:?} for round {clock_round}");
728
729        self.context
730            .metrics
731            .node_metrics
732            .proposed_blocks
733            .with_label_values(&[&force.to_string()])
734            .inc();
735
736        Some(ExtendedBlock {
737            block: verified_block,
738            excluded_ancestors,
739        })
740    }
741
742    /// Runs the commit rule to attempt to commit additional blocks from the DAG. If
743    /// any `certified_commits` are provided, then it will attempt to commit
744    /// those first before trying to commit any further leaders.
745    fn try_commit(
746        &mut self,
747        mut certified_commits: Vec<CertifiedCommit>,
748    ) -> ConsensusResult<Vec<CommittedSubDag>> {
749        let _s = self
750            .context
751            .metrics
752            .node_metrics
753            .scope_processing_time
754            .with_label_values(&["Core::try_commit"])
755            .start_timer();
756
757        let mut certified_commits_map = BTreeMap::new();
758        for c in &certified_commits {
759            certified_commits_map.insert(c.index(), c.reference());
760        }
761
762        if !certified_commits.is_empty() {
763            info!(
764                "Will try to commit synced commits first : {:?}",
765                certified_commits
766                    .iter()
767                    .map(|c| (c.index(), c.leader()))
768                    .collect::<Vec<_>>()
769            );
770        }
771
772        let mut committed_sub_dags = Vec::new();
773        // TODO: Add optimization to abort early without quorum for a round.
774        loop {
775            // LeaderSchedule has a limit on how many sequenced leaders can be committed
776            // before a change is triggered. Calling into the leader schedule returns
777            // how many commits remain until the next leader change. We will loop back and
778            // recalculate any discarded leaders with the new schedule.
779            let mut commits_until_update = self
780                .leader_schedule
781                .commits_until_leader_schedule_update(self.dag_state.clone());
782
783            if commits_until_update == 0 {
784                let last_commit_index = self.dag_state.read().last_commit_index();
785
786                tracing::info!(
787                    "Leader schedule change triggered at commit index {last_commit_index}"
788                );
789                if self
790                    .context
791                    .protocol_config
792                    .consensus_distributed_vote_scoring_strategy()
793                {
794                    self.leader_schedule
795                        .update_leader_schedule_v2(&self.dag_state);
796
797                    let propagation_scores = self
798                        .leader_schedule
799                        .leader_swap_table
800                        .read()
801                        .reputation_scores
802                        .clone();
803                    self.ancestor_state_manager
804                        .set_propagation_scores(propagation_scores);
805                } else {
806                    self.leader_schedule
807                        .update_leader_schedule_v1(&self.dag_state);
808                }
809                commits_until_update = self
810                    .leader_schedule
811                    .commits_until_leader_schedule_update(self.dag_state.clone());
812
813                fail_point!("consensus-after-leader-schedule-change");
814            }
815            assert!(commits_until_update > 0);
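            // An illustrative scenario for the schedule bookkeeping above, with hypothetical
            // numbers: if the schedule allows 300 commits before an update and 298 have been
            // sequenced, `commits_until_update == 2`, so at most 2 leaders are committed below
            // before the loop re-evaluates the schedule; once the counter hits 0, the schedule is
            // updated and the counter is recomputed before continuing.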
816
817            // Always try to process the synced commits first. If there are certified
818            // commits to process then the decided leaders and the commits will be returned.
819            let (mut decided_leaders, decided_certified_commits): (
820                Vec<DecidedLeader>,
821                Vec<CertifiedCommit>,
822            ) = self
823                .try_decide_certified(&mut certified_commits, commits_until_update)
824                .into_iter()
825                .unzip();
826
827            // Only accept blocks for the certified commits that we are certain to sequence.
828            // This ensures that only blocks corresponding to committed certified commits
829            // are flushed to disk. Blocks from non-committed certified commits
830            // will not be flushed, preventing issues during crash-recovery.
831            // This avoids scenarios where accepting and flushing blocks of non-committed
832            // certified commits could lead to premature commit rule execution.
833            // Due to GC, this could cause a panic if the commit rule tries to access
834            // missing causal history from blocks of certified commits.
835            let blocks = decided_certified_commits
836                .iter()
837                .flat_map(|c| c.blocks())
838                .cloned()
839                .collect::<Vec<_>>();
840            self.block_manager.try_accept_committed_blocks(blocks);
841
842            // If the certified `decided_leaders` is empty then try to run the decision
843            // rule.
844            if decided_leaders.is_empty() {
845                // TODO: limit commits by commits_until_update, which may be needed when leader
846                // schedule length is reduced.
847                decided_leaders = self.committer.try_decide(self.last_decided_leader);
848
849                // Truncate the decided leaders to fit the commit schedule limit.
850                if decided_leaders.len() >= commits_until_update {
851                    let _ = decided_leaders.split_off(commits_until_update);
852                }
853            }
854
855            // If the decided leaders list is empty then just break the loop.
856            let Some(last_decided) = decided_leaders.last().cloned() else {
857                break;
858            };
859
860            self.last_decided_leader = last_decided.slot();
861
862            let sequenced_leaders = decided_leaders
863                .into_iter()
864                .filter_map(|leader| leader.into_committed_block())
865                .collect::<Vec<_>>();
866
867            tracing::debug!(
868                "Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
869                sequenced_leaders.len()
870            );
871
872            self.context
873                .metrics
874                .node_metrics
875                .last_decided_leader_round
876                .set(self.last_decided_leader.round as i64);
877
878            // It's possible to reach this point as the decided leaders might all be
879            // "Skip" decisions. In this case there is no leader to commit and
880            // we should break the loop.
881            if sequenced_leaders.is_empty() {
882                break;
883            }
884
885            tracing::info!(
886                "Committing {} leaders: {}",
887                sequenced_leaders.len(),
888                sequenced_leaders
889                    .iter()
890                    .map(|b| b.reference().to_string())
891                    .join(",")
892            );
893
894            // TODO: refcount subdags
895            let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
896            if self
897                .context
898                .protocol_config
899                .consensus_distributed_vote_scoring_strategy()
900            {
901                self.dag_state.write().add_scoring_subdags(subdags.clone());
902            } else {
903                // TODO: Remove when DistributedVoteScoring is enabled.
904                self.dag_state
905                    .write()
906                    .add_unscored_committed_subdags(subdags.clone());
907            }
908
909            // Try to unsuspend blocks if gc_round has advanced.
910            self.block_manager
911                .try_unsuspend_blocks_for_latest_gc_round();
912            committed_sub_dags.extend(subdags);
913
914            fail_point!("consensus-after-handle-commit");
915        }
916
917        // Sanity check: for commits that have been linearized using the certified
918        // commits, ensure that the same sub dag has been committed.
919        for sub_dag in &committed_sub_dags {
920            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
921                assert_eq!(
922                    commit_ref, sub_dag.commit_ref,
923                    "Certified commit has different reference than the committed sub dag"
924                );
925            }
926        }
927
928        // Notify about our own committed blocks
929        let committed_block_refs = committed_sub_dags
930            .iter()
931            .flat_map(|sub_dag| sub_dag.blocks.iter())
932            .filter_map(|block| {
933                (block.author() == self.context.own_index).then_some(block.reference())
934            })
935            .collect::<Vec<_>>();
936        self.transaction_consumer
937            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
938
939        Ok(committed_sub_dags)
940    }
941
942    pub(crate) fn get_missing_blocks(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
943        let _scope = monitored_scope("Core::get_missing_blocks");
944        self.block_manager.missing_blocks()
945    }
946
947    /// Sets whether there are 2f+1 subscriptions to the block stream.
948    pub(crate) fn set_quorum_subscribers_exists(&mut self, exists: bool) {
949        info!("A quorum of block subscribers exists: {exists}");
950        self.quorum_subscribers_exists = exists;
951    }
952
953    /// Sets the delay, in rounds, for propagating blocks to a quorum, and the
954    /// received & accepted quorum rounds per authority for the ancestor state
955    /// manager.
956    pub(crate) fn set_propagation_delay_and_quorum_rounds(
957        &mut self,
958        delay: Round,
959        received_quorum_rounds: Vec<QuorumRound>,
960        accepted_quorum_rounds: Vec<QuorumRound>,
961    ) {
962        info!(
963            "Received quorum round per authority in ancestor state manager set to: {}",
964            self.context
965                .committee
966                .authorities()
967                .zip(received_quorum_rounds.iter())
968                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
969                .join(", ")
970        );
971        info!(
972            "Accepted quorum round per authority in ancestor state manager set to: {}",
973            self.context
974                .committee
975                .authorities()
976                .zip(accepted_quorum_rounds.iter())
977                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
978                .join(", ")
979        );
980        self.ancestor_state_manager
981            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
982        info!("Propagation round delay set to: {delay}");
983        self.propagation_delay = delay;
984    }
985
986    /// Sets the min propose round for the proposer, allowing blocks to be proposed
987    /// only for round numbers `> last_known_proposed_round`. At the moment the
988    /// method is allowed to be called only once, and it panics
989    /// if an attempt is made to call it multiple times.
990    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
991        if self.last_known_proposed_round.is_some() {
992            panic!(
993                "Should not attempt to set the last known proposed round if that has been already set"
994            );
995        }
996        self.last_known_proposed_round = Some(round);
997        info!("Last known proposed round set to {round}");
998    }
999
1000    /// Whether the core should propose new blocks.
1001    pub(crate) fn should_propose(&self) -> bool {
1002        let clock_round = self.dag_state.read().threshold_clock_round();
1003        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
1004
1005        if !self.quorum_subscribers_exists {
1006            debug!("Skip proposing for round {clock_round}, don't have a quorum of subscribers.");
1007            core_skipped_proposals
1008                .with_label_values(&["no_quorum_subscriber"])
1009                .inc();
1010            return false;
1011        }
1012
1013        if self.propagation_delay
1014            > self
1015                .context
1016                .parameters
1017                .propagation_delay_stop_proposal_threshold
1018        {
1019            debug!(
1020                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
1021                self.propagation_delay,
1022                self.context
1023                    .parameters
1024                    .propagation_delay_stop_proposal_threshold
1025            );
1026            core_skipped_proposals
1027                .with_label_values(&["high_propagation_delay"])
1028                .inc();
1029            return false;
1030        }
1031
1032        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
1033            debug!(
1034                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
1035            );
1036            core_skipped_proposals
1037                .with_label_values(&["no_last_known_proposed_round"])
1038                .inc();
1039            return false;
1040        };
1041        if clock_round <= last_known_proposed_round {
1042            debug!(
1043                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
1044            );
1045            core_skipped_proposals
1046                .with_label_values(&["higher_last_known_proposed_round"])
1047                .inc();
1048            return false;
1049        }
1050
1051        true
1052    }
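    // A condensed view of the gates checked above, in order (values illustrative): no quorum of
    // subscribers -> skip; a `propagation_delay` of, say, 5 rounds against a threshold of 3 ->
    // skip; `last_known_proposed_round == None` (own last block sync still pending) -> skip;
    // `clock_round <= last_known_proposed_round` -> skip; otherwise propose.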
1053
1054    /// Tries to decide which of the certified commits will have to be committed next,
1055    /// respecting the `limit`. If the provided `limit` is zero, it will panic.
1056    /// The function returns the list of decided leaders and updates the
1057    /// remaining certified commits in place. If an empty vector is returned, it means
1058    /// that there are no certified commits to be committed, as `certified_commits` is
1059    /// either empty or all of the certified commits have already been committed.
1060    #[tracing::instrument(skip_all)]
1061    fn try_decide_certified(
1062        &mut self,
1063        certified_commits: &mut Vec<CertifiedCommit>,
1064        limit: usize,
1065    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
1066        // If GC is disabled then none of this logic should run.
1067        if !self.dag_state.read().gc_enabled() {
1068            return Vec::new();
1069        }
1070
1071        assert!(limit > 0, "limit should be greater than 0");
1072
1073        let to_commit = if certified_commits.len() >= limit {
1074            // We keep only the number of leaders as dictated by the `limit`
1075            certified_commits.drain(..limit).collect::<Vec<_>>()
1076        } else {
1077            // Otherwise just take all of them and leave `certified_commits` empty.
1078            mem::take(certified_commits)
1079        };
1080
1081        tracing::debug!(
1082            "Decided {} certified leaders: {}",
1083            to_commit.len(),
1084            to_commit.iter().map(|c| c.leader().to_string()).join(",")
1085        );
1086
1087        let sequenced_leaders = to_commit
1088            .into_iter()
1089            .map(|commit| {
1090                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
1091                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
1092                let leader = DecidedLeader::Commit(leader.clone());
1093                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
1094                (leader, commit)
1095            })
1096            .collect::<Vec<_>>();
1097
1098        sequenced_leaders
1099    }
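    // An illustrative run of `try_decide_certified`, with hypothetical sizes: with 5 pending
    // certified commits and `limit == 3`, the first 3 are drained and returned as
    // `DecidedLeader::Commit` decisions while 2 remain in `certified_commits` for the next loop
    // iteration; with `limit == 10`, all 5 are taken and the vector is left empty.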
1100
1101    /// Retrieves the next ancestors to propose in order to form a block for
1102    /// `clock_round`.
1103    fn ancestors_to_propose(&mut self, clock_round: Round) -> Vec<VerifiedBlock> {
1104        // Now take the ancestors before the clock_round (excluded) for each authority.
1105        let (ancestors, gc_enabled, gc_round) = {
1106            let dag_state = self.dag_state.read();
1107            (
1108                dag_state.get_last_cached_block_per_authority(clock_round),
1109                dag_state.gc_enabled(),
1110                dag_state.gc_round(),
1111            )
1112        };
1113
1114        assert_eq!(
1115            ancestors.len(),
1116            self.context.committee.size(),
1117            "Fatal error, number of returned ancestors don't match committee size."
1118        );
1119
1120        // Propose only ancestors of higher rounds than what has already been proposed.
1121        // And always include own last proposed block first among ancestors.
1122        let (last_proposed_block, _) = ancestors[self.context.own_index].clone();
1123        assert_eq!(last_proposed_block.author(), self.context.own_index);
1124        let ancestors = iter::once(last_proposed_block)
1125            .chain(
1126                ancestors
1127                    .into_iter()
1128                    .filter(|(block, _)| block.author() != self.context.own_index)
1129                    .filter(|(block, _)| {
1130                        if gc_enabled && gc_round > GENESIS_ROUND {
1131                            return block.round() > gc_round;
1132                        }
1133                        true
1134                    })
1135                    .flat_map(|(block, _)| {
1136                        if let Some(last_block_ref) = self.last_included_ancestors[block.author()] {
1137                            return (last_block_ref.round < block.round()).then_some(block);
1138                        }
1139                        Some(block)
1140                    }),
1141            )
1142            .collect::<Vec<_>>();
1143
1144        // TODO: this is a temporary sanity check - we might want to remove it later on.
1145        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1146        for ancestor in ancestors
1147            .iter()
1148            .filter(|block| block.round() == clock_round - 1)
1149        {
1150            quorum.add(ancestor.author(), &self.context.committee);
1151        }
1152        assert!(
1153            quorum.reached_threshold(&self.context.committee),
1154            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1155        );
1156
1157        ancestors
1158    }
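    // A small example of the watermark filtering above, with hypothetical rounds: if
    // `last_included_ancestors` records round 7 for authority B, a cached block from B at round 7
    // is filtered out while one at round 9 is kept (the watermark itself is advanced to 9 later,
    // in `try_new_block`); the own last proposed block is always placed first regardless of round.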
1159
1160    /// Retrieves the next ancestors to propose in order to form a block for
1161    /// `clock_round`. If smart selection is enabled then this will try to select
1162    /// the best ancestors based on the propagation scores of the
1163    /// authorities.
1164    fn smart_ancestors_to_propose(
1165        &mut self,
1166        clock_round: Round,
1167        smart_select: bool,
1168    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
1169        let node_metrics = &self.context.metrics.node_metrics;
1170        let _s = node_metrics
1171            .scope_processing_time
1172            .with_label_values(&["Core::smart_ancestors_to_propose"])
1173            .start_timer();
1174
1175        // Now take the ancestors before the clock_round (excluded) for each authority.
1176        let all_ancestors = self
1177            .dag_state
1178            .read()
1179            .get_last_cached_block_per_authority(clock_round);
1180
1181        assert_eq!(
1182            all_ancestors.len(),
1183            self.context.committee.size(),
1184            "Fatal error, number of returned ancestors don't match committee size."
1185        );
1186
1187        // Ensure ancestor state is up to date before selecting for proposal.
1188        self.ancestor_state_manager.update_all_ancestors_state();
1189        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
1190
1191        let quorum_round = clock_round.saturating_sub(1);
1192
1193        let mut score_and_pending_excluded_ancestors = Vec::new();
1194        let mut excluded_and_equivocating_ancestors = BTreeSet::new();
1195
1196        // Propose only ancestors of higher rounds than what has already been proposed.
1197        // And always include own last proposed block first among ancestors.
1198        // Start by only including the high scoring ancestors. Low scoring ancestors
1199        // will be included in a second pass below.
1200        let included_ancestors = iter::once(self.last_proposed_block().clone())
1201            .chain(
1202                all_ancestors
1203                    .into_iter()
1204                    .flat_map(|(ancestor, equivocating_ancestors)| {
1205                        if ancestor.author() == self.context.own_index {
1206                            return None;
1207                        }
1208                        if let Some(last_block_ref) =
1209                            self.last_included_ancestors[ancestor.author()]
1210                        {
1211                            if last_block_ref.round >= ancestor.round() {
1212                                return None;
1213                            }
1214                        }
1215
1216                        // We will never include equivocating ancestors, so add them to the excluded set immediately.
1217                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
1218
1219                        let ancestor_state = ancestor_state_map[ancestor.author()];
1220                        match ancestor_state {
1221                            AncestorState::Include => {
1222                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
1223                            }
1224                            AncestorState::Exclude(score) => {
1225                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
1226                                score_and_pending_excluded_ancestors.push((score, ancestor));
1227                                return None;
1228                            }
1229                        }
1230
1231                        Some(ancestor)
1232                    }),
1233            )
1234            .collect::<Vec<_>>();
1235
1236        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
1237
1238        // Check total stake of high scoring parent round ancestors
1239        for ancestor in included_ancestors
1240            .iter()
1241            .filter(|a| a.round() == quorum_round)
1242        {
1243            parent_round_quorum.add(ancestor.author(), &self.context.committee);
1244        }
1245
1246        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
1247            node_metrics.smart_selection_wait.inc();
1248            debug!(
1249                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
1250                parent_round_quorum.stake()
1251            );
1252            return (vec![], BTreeSet::new());
1253        }
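        // An illustrative outcome of the check above, with hypothetical stakes: in a committee of
        // 4 equal-stake authorities (quorum = 3), if only the own block plus one INCLUDE-state
        // ancestor sit at `quorum_round`, the aggregated stake of 2 misses the threshold, so with
        // `smart_select == true` the proposal is postponed; in the forced case the loop below
        // instead tops the parent-round quorum up from the best-scoring temporarily excluded
        // ancestors.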
1254
1255        // Sort scores descending so we can include the best of the pending excluded
1256        // ancestors first until we reach the threshold.
1257        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
1258
1259        let mut ancestors_to_propose = included_ancestors;
1260        let mut excluded_ancestors = Vec::new();
1261        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
1262            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
1263            if !parent_round_quorum.reached_threshold(&self.context.committee)
1264                && ancestor.round() == quorum_round
1265            {
1266                debug!(
1267                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
1268                );
1269                parent_round_quorum.add(ancestor.author(), &self.context.committee);
1270                ancestors_to_propose.push(ancestor);
1271                node_metrics
1272                    .included_excluded_proposal_ancestors_count_by_authority
1273                    .with_label_values(&[block_hostname.as_str(), "timeout"])
1274                    .inc();
1275            } else {
1276                excluded_ancestors.push((score, ancestor));
1277            }
1278        }
1279
1280        // Iterate through the excluded ancestors and include the ancestor or the
1281        // ancestor's ancestor that has been accepted by a quorum of the network. If the
1282        // original ancestor itself is not included then it will be part of the
1283        // excluded ancestors that are not included in the block but will still
1284        // be broadcast to peers.
1285        for (score, ancestor) in excluded_ancestors.iter() {
1286            let excluded_author = ancestor.author();
1287            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
1288            // A quorum of validators reported to have accepted blocks from the
1289            // excluded_author up to the low quorum round.
1290            let mut accepted_low_quorum_round = self
1291                .ancestor_state_manager
1292                .accepted_quorum_round_per_authority[excluded_author]
1293                .0;
1294            // If the accepted quorum round of this ancestor is greater than or equal
1295            // to the clock round then we want to make sure to set it to clock_round - 1
1296            // as that is the max round the new block can include as an ancestor.
1297            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);
1298
1299            let last_included_round = self.last_included_ancestors[excluded_author]
1300                .map(|block_ref| block_ref.round)
1301                .unwrap_or(GENESIS_ROUND);
1302            if ancestor.round() <= last_included_round {
1303                // This should have already been filtered out when filtering all_ancestors.
1304                // Still, ensure previously included ancestors are filtered out.
1305                continue;
1306            }
1307
1308            if last_included_round >= accepted_low_quorum_round {
1309                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1310                trace!(
1311                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
1312                    ancestor.reference()
1313                );
1314                node_metrics
1315                    .excluded_proposal_ancestors_count_by_authority
1316                    .with_label_values(&[block_hostname])
1317                    .inc();
1318                continue;
1319            }
1320
1321            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
1322                // Include the ancestor block as it has been seen & accepted by a strong quorum.
1323                ancestor.clone()
1324            } else {
1325                // Exclude this ancestor since it hasn't been accepted by a strong quorum
1326                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1327                trace!(
1328                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
1329                    ancestor.reference(),
1330                    ancestor.round()
1331                );
1332                node_metrics
1333                    .excluded_proposal_ancestors_count_by_authority
1334                    .with_label_values(&[block_hostname])
1335                    .inc();
1336
1337                // Look for an earlier block in the ancestor chain that we can include as there
1338                // is a gap between the last included round and the accepted low quorum round.
1339                //
1340                // Note: Only cached blocks need to be propagated. Committed and GC'ed blocks
1341                // do not need to be propagated.
1342                match self.dag_state.read().get_last_cached_block_in_range(
1343                    excluded_author,
1344                    last_included_round + 1,
1345                    accepted_low_quorum_round + 1,
1346                ) {
1347                    Some(earlier_ancestor) => {
1348                        // Found an earlier block that has been propagated well - include it instead
1349                        earlier_ancestor
1350                    }
1351                    None => {
1352                        // No suitable earlier block found
1353                        continue;
1354                    }
1355                }
1356            };
1357            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
1358            ancestors_to_propose.push(ancestor.clone());
1359            trace!(
1360                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
1361                ancestor.reference()
1362            );
1363            node_metrics
1364                .included_excluded_proposal_ancestors_count_by_authority
1365                .with_label_values(&[block_hostname.as_str(), "quorum"])
1366                .inc();
1367        }
1368
1369        assert!(
1370            parent_round_quorum.reached_threshold(&self.context.committee),
1371            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1372        );
1373
1374        info!(
1375            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
1376            ancestors_to_propose.len(),
1377            excluded_and_equivocating_ancestors.len()
1378        );
1379
1380        (ancestors_to_propose, excluded_and_equivocating_ancestors)
1381    }
1382
1383    /// Checks whether all the leaders of the round exist.
1384    /// TODO: we could leverage some additional signal here in order to more
1385    /// cleverly adjust the leader timeout later, e.g. if we already have
1386    /// one leader - the first in order - we might not want to wait as long.
1387    fn leaders_exist(&self, round: Round) -> bool {
1388        let dag_state = self.dag_state.read();
1389        for leader in self.leaders(round) {
1390            // Search for all the leaders. If at least one is not found, then return false.
1391            // A linear search should be fine here as the set of elements is expected to
1392            // be small, and more sophisticated data structures would not
1393            // give us much here.
1394            if !dag_state.contains_cached_block_at_slot(leader) {
1395                return false;
1396            }
1397        }
1398
1399        true
1400    }
1401
1402    /// Returns the leaders of the provided round.
1403    fn leaders(&self, round: Round) -> Vec<Slot> {
1404        self.committer
1405            .get_leaders(round)
1406            .into_iter()
1407            .map(|authority_index| Slot::new(round, authority_index))
1408            .collect()
1409    }
1410
1411    /// Returns the 1st leader of the round.
1412    fn first_leader(&self, round: Round) -> AuthorityIndex {
1413        self.leaders(round).first().unwrap().authority
1414    }
1415
1416    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1417        self.last_proposed_block().timestamp_ms()
1418    }
1419
1420    fn last_proposed_round(&self) -> Round {
1421        self.last_proposed_block().round()
1422    }
1423
1424    fn last_proposed_block(&self) -> VerifiedBlock {
1425        self.dag_state.read().get_last_proposed_block()
1426    }
1427}
1428
1429/// Senders of signals from Core, for outputs and events (e.g. new block
1430/// produced).
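///
/// # Example
///
/// A minimal wiring sketch (marked `ignore`, not a doc test); `context` is
/// assumed to be an `Arc<Context>` as in the tests below.
///
/// ```ignore
/// let (signals, signal_receivers) = CoreSignals::new(context.clone());
/// // Core keeps the senders; other components subscribe via the receivers.
/// let mut block_receiver = signal_receivers.block_broadcast_receiver();
/// ```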
1431pub(crate) struct CoreSignals {
1432    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1433    new_round_sender: watch::Sender<Round>,
1434    context: Arc<Context>,
1435}
1436
1437impl CoreSignals {
1438    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1439        // The number of blocks buffered in the broadcast channel should be roughly
1440        // equal to those cached in dag state. Since the underlying blocks are ref
1441        // counted, a lower buffer here will not reduce memory usage significantly.
1442        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1443            context.parameters.dag_state_cached_rounds as usize,
1444        );
1445        let (new_round_sender, new_round_receiver) = watch::channel(0);
1446
1447        let me = Self {
1448            tx_block_broadcast,
1449            new_round_sender,
1450            context,
1451        };
1452
1453        let receivers = CoreSignalsReceivers {
1454            rx_block_broadcast,
1455            new_round_receiver,
1456        };
1457
1458        (me, receivers)
1459    }
1460
1461    /// Sends a signal to all the waiters that a new block has been produced.
1462    /// Returns `Ok(())` if the block has been sent to at least one subscriber
1463    /// (or broadcasting was intentionally skipped), and a `ConsensusError::Shutdown` error otherwise.
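    ///
    /// # Example
    ///
    /// A minimal caller sketch (marked `ignore`, not a doc test); `signals` and
    /// `extended_block` are assumed to be in scope, e.g. inside `Core` right
    /// after a new block has been proposed.
    ///
    /// ```ignore
    /// if let Err(err) = signals.new_block(extended_block) {
    ///     // No subscriber could receive the block; propagate the shutdown error.
    ///     return Err(err);
    /// }
    /// ```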
1464    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1465        // When there is only one authority in the committee, broadcasting is
1466        // unnecessary and would fail anyway as there are no subscribers to the signal.
1467        if self.context.committee.size() > 1 {
1468            if extended_block.block.round() == GENESIS_ROUND {
1469                debug!("Ignoring broadcasting genesis block to peers");
1470                return Ok(());
1471            }
1472
1473            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1474                warn!("Couldn't broadcast the block to any receiver: {err}");
1475                return Err(ConsensusError::Shutdown);
1476            }
1477        } else {
1478            debug!(
1479                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1480            );
1481        }
1482        Ok(())
1483    }
1484
1485    /// Sends a signal that the threshold clock has advanced to a new round.
1486    /// `round_number` is the round to which the threshold clock has
1487    /// advanced.
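    ///
    /// # Example
    ///
    /// A minimal consumer-side sketch (marked `ignore`, not a doc test);
    /// `signal_receivers` is assumed to be the `CoreSignalsReceivers` returned
    /// by `CoreSignals::new`.
    ///
    /// ```ignore
    /// let mut new_round = signal_receivers.new_round_receiver();
    /// // Wait for the threshold clock to advance, then read the latest round.
    /// new_round.changed().await.expect("CoreSignals should still be alive");
    /// let round: Round = *new_round.borrow_and_update();
    /// ```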
1488    pub(crate) fn new_round(&mut self, round_number: Round) {
1489        let _ = self.new_round_sender.send_replace(round_number);
1490    }
1491}
1492
1493/// Receivers of signals from Core.
1494/// Intentionally un-cloneable. Components should only subscribe to channels
1495/// they need.
1496pub(crate) struct CoreSignalsReceivers {
1497    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1498    new_round_receiver: watch::Receiver<Round>,
1499}
1500
1501impl CoreSignalsReceivers {
1502    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1503        self.rx_block_broadcast.resubscribe()
1504    }
1505
1506    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1507        self.new_round_receiver.clone()
1508    }
1509}
1510
1511/// Creates cores for the specified authorities with their corresponding
1512/// stakes. The cores and their respective signal receivers are returned
1513/// in ascending `AuthorityIndex` order.
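///
/// # Example
///
/// A minimal test-setup sketch (marked `ignore`, not a doc test), mirroring the
/// tests below.
///
/// ```ignore
/// let (context, _) = Context::new_for_test(4);
/// // Four equally staked authorities; fixtures come back in AuthorityIndex order.
/// let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
/// let (_last_core, cores) = all_cores.split_last_mut().unwrap();
/// ```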
1514#[cfg(test)]
1515pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<CoreTextFixture> {
1516    let mut cores = Vec::new();
1517
1518    for index in 0..authorities.len() {
1519        let own_index = AuthorityIndex::new_for_test(index as u32);
1520        let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false);
1521        cores.push(core);
1522    }
1523    cores
1524}
1525
1526#[cfg(test)]
1527pub(crate) struct CoreTextFixture {
1528    pub core: Core,
1529    pub signal_receivers: CoreSignalsReceivers,
1530    pub block_receiver: broadcast::Receiver<ExtendedBlock>,
1531    #[expect(unused)]
1532    pub commit_receiver: UnboundedReceiver<CommittedSubDag>,
1533    pub store: Arc<MemStore>,
1534}
1535
1536#[cfg(test)]
1537impl CoreTextFixture {
1538    fn new(
1539        context: Context,
1540        authorities: Vec<Stake>,
1541        own_index: AuthorityIndex,
1542        sync_last_known_own_block: bool,
1543    ) -> Self {
1544        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
1545        let mut context = context.clone();
1546        context = context
1547            .with_committee(committee)
1548            .with_authority_index(own_index);
1549        context
1550            .protocol_config
1551            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1552
1553        let context = Arc::new(context);
1554        let store = Arc::new(MemStore::new());
1555        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1556
1557        let block_manager = BlockManager::new(
1558            context.clone(),
1559            dag_state.clone(),
1560            Arc::new(NoopBlockVerifier),
1561        );
1562        let leader_schedule = Arc::new(
1563            LeaderSchedule::from_store(context.clone(), dag_state.clone())
1564                .with_num_commits_per_schedule(10),
1565        );
1566        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1567        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1568        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1569        // Need at least one subscriber to the block broadcast channel.
1570        let block_receiver = signal_receivers.block_broadcast_receiver();
1571
1572        let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
1573        let commit_observer = CommitObserver::new(
1574            context.clone(),
1575            CommitConsumer::new(commit_sender.clone(), 0),
1576            dag_state.clone(),
1577            store.clone(),
1578            leader_schedule.clone(),
1579        );
1580
1581        let block_signer = signers.remove(own_index.value()).1;
1582
1583        let core = Core::new(
1584            context,
1585            leader_schedule,
1586            transaction_consumer,
1587            block_manager,
1588            true,
1589            commit_observer,
1590            signals,
1591            block_signer,
1592            dag_state,
1593            sync_last_known_own_block,
1594        );
1595
1596        Self {
1597            core,
1598            signal_receivers,
1599            block_receiver,
1600            commit_receiver,
1601            store,
1602        }
1603    }
1604}
1605
1606#[cfg(test)]
1607mod test {
1608    use std::{collections::BTreeSet, time::Duration};
1609
1610    use consensus_config::{AuthorityIndex, Parameters};
1611    use futures::{StreamExt, stream::FuturesUnordered};
1612    use iota_metrics::monitored_mpsc::unbounded_channel;
1613    use iota_protocol_config::ProtocolConfig;
1614    use rstest::rstest;
1615    use tokio::time::sleep;
1616
1617    use super::*;
1618    use crate::{
1619        CommitConsumer, CommitIndex,
1620        block::{TestBlock, genesis_blocks},
1621        block_verifier::NoopBlockVerifier,
1622        commit::CommitAPI,
1623        leader_scoring::ReputationScores,
1624        storage::{Store, WriteBatch, mem_store::MemStore},
1625        test_dag_builder::DagBuilder,
1626        test_dag_parser::parse_dag,
1627        transaction::{BlockStatus, TransactionClient},
1628    };
1629
1630    /// Recover Core and continue proposing from the last round which forms a
1631    /// quorum.
1632    #[tokio::test]
1633    async fn test_core_recover_from_store_for_full_round() {
1634        telemetry_subscribers::init_for_testing();
1635        let (context, mut key_pairs) = Context::new_for_test(4);
1636        let context = Arc::new(context);
1637        let store = Arc::new(MemStore::new());
1638        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1639        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1640        let mut block_status_subscriptions = FuturesUnordered::new();
1641
1642        // Create test blocks for all the authorities for 4 rounds and populate them in
1643        // store
1644        let mut last_round_blocks = genesis_blocks(context.clone());
1645        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1646        for round in 1..=4 {
1647            let mut this_round_blocks = Vec::new();
1648            for (index, _authority) in context.committee.authorities() {
1649                let block = VerifiedBlock::new_for_test(
1650                    TestBlock::new(round, index.value() as u32)
1651                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1652                        .build(),
1653                );
1654
1655                // Round 1 blocks will be committed later on; if this is our "own" round 1
1656                // block, subscribe to listen for its block status.
1657                if round == 1 && index == context.own_index {
1658                    let subscription =
1659                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
1660                    block_status_subscriptions.push(subscription);
1661                }
1662
1663                this_round_blocks.push(block);
1664            }
1665            all_blocks.extend(this_round_blocks.clone());
1666            last_round_blocks = this_round_blocks;
1667        }
1668        // write them in store
1669        store
1670            .write(WriteBatch::default().blocks(all_blocks))
1671            .expect("Storage error");
1672
1673        // create dag state after all blocks have been written to store
1674        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1675        let block_manager = BlockManager::new(
1676            context.clone(),
1677            dag_state.clone(),
1678            Arc::new(NoopBlockVerifier),
1679        );
1680        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1681            context.clone(),
1682            dag_state.clone(),
1683        ));
1684
1685        let (sender, _receiver) = unbounded_channel("consensus_output");
1686        let commit_observer = CommitObserver::new(
1687            context.clone(),
1688            CommitConsumer::new(sender.clone(), 0),
1689            dag_state.clone(),
1690            store.clone(),
1691            leader_schedule.clone(),
1692        );
1693
1694        // Check no commits have been persisted to dag_state or store.
1695        let last_commit = store.read_last_commit().unwrap();
1696        assert!(last_commit.is_none());
1697        assert_eq!(dag_state.read().last_commit_index(), 0);
1698
1699        // Now spin up core
1700        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1701        // Need at least one subscriber to the block broadcast channel.
1702        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1703        let _core = Core::new(
1704            context.clone(),
1705            leader_schedule,
1706            transaction_consumer,
1707            block_manager,
1708            true,
1709            commit_observer,
1710            signals,
1711            key_pairs.remove(context.own_index.value()).1,
1712            dag_state.clone(),
1713            false,
1714        );
1715
1716        // New round should be 5
1717        let mut new_round = signal_receivers.new_round_receiver();
1718        assert_eq!(*new_round.borrow_and_update(), 5);
1719
1720        // Block for round 5 should have been proposed.
1721        let proposed_block = block_receiver
1722            .recv()
1723            .await
1724            .expect("A block should have been created");
1725        assert_eq!(proposed_block.block.round(), 5);
1726        let ancestors = proposed_block.block.ancestors();
1727
1728        // Only ancestors of round 4 should be included.
1729        assert_eq!(ancestors.len(), 4);
1730        for ancestor in ancestors {
1731            assert_eq!(ancestor.round, 4);
1732        }
1733
1734        let last_commit = store
1735            .read_last_commit()
1736            .unwrap()
1737            .expect("last commit should be set");
1738
1739        // There were no commits prior to the core starting up, but there were completed
1740        // rounds up to and including round 4. So we should commit the leaders of rounds
1741        // 1 & 2 as soon as the new block for round 5 is proposed.
1742        assert_eq!(last_commit.index(), 2);
1743        assert_eq!(dag_state.read().last_commit_index(), 2);
1744        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1745        assert_eq!(all_stored_commits.len(), 2);
1746
1747        // And ensure that the status of our "own" block 1 was sent to the
1748        // TransactionConsumer as a notification alongside the gc_round.
1749        while let Some(result) = block_status_subscriptions.next().await {
1750            let status = result.unwrap();
1751            assert!(matches!(status, BlockStatus::Sequenced(_)));
1752        }
1753    }
1754
1755    /// Recover Core and continue proposing when the last round is partial (does
1756    /// not form a quorum) and we haven't proposed for that round
1757    /// yet.
1758    #[tokio::test]
1759    async fn test_core_recover_from_store_for_partial_round() {
1760        telemetry_subscribers::init_for_testing();
1761
1762        let (context, mut key_pairs) = Context::new_for_test(4);
1763        let context = Arc::new(context);
1764        let store = Arc::new(MemStore::new());
1765        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1766        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1767
1768        // Create test blocks for all authorities except ours (index = 0).
1769        let mut last_round_blocks = genesis_blocks(context.clone());
1770        let mut all_blocks = last_round_blocks.clone();
1771        for round in 1..=4 {
1772            let mut this_round_blocks = Vec::new();
1773
1774            // For round 4 produce only f+1 blocks by skipping block creation for our
1775            // validator 0 and the one at position 1.
1776            let authorities_to_skip = if round == 4 {
1777                context.committee.validity_threshold() as usize
1778            } else {
1779                // otherwise always skip creating a block for our authority
1780                1
1781            };
1782
1783            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1784                let block = TestBlock::new(round, index.value() as u32)
1785                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1786                    .build();
1787                this_round_blocks.push(VerifiedBlock::new_for_test(block));
1788            }
1789            all_blocks.extend(this_round_blocks.clone());
1790            last_round_blocks = this_round_blocks;
1791        }
1792
1793        // write them in store
1794        store
1795            .write(WriteBatch::default().blocks(all_blocks))
1796            .expect("Storage error");
1797
1798        // create dag state after all blocks have been written to store
1799        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1800        let block_manager = BlockManager::new(
1801            context.clone(),
1802            dag_state.clone(),
1803            Arc::new(NoopBlockVerifier),
1804        );
1805        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1806            context.clone(),
1807            dag_state.clone(),
1808        ));
1809
1810        let (sender, _receiver) = unbounded_channel("consensus_output");
1811        let commit_observer = CommitObserver::new(
1812            context.clone(),
1813            CommitConsumer::new(sender.clone(), 0),
1814            dag_state.clone(),
1815            store.clone(),
1816            leader_schedule.clone(),
1817        );
1818
1819        // Check no commits have been persisted to dag_state & store
1820        let last_commit = store.read_last_commit().unwrap();
1821        assert!(last_commit.is_none());
1822        assert_eq!(dag_state.read().last_commit_index(), 0);
1823
1824        // Now spin up core
1825        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1826        // Need at least one subscriber to the block broadcast channel.
1827        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1828        let mut core = Core::new(
1829            context.clone(),
1830            leader_schedule,
1831            transaction_consumer,
1832            block_manager,
1833            true,
1834            commit_observer,
1835            signals,
1836            key_pairs.remove(context.own_index.value()).1,
1837            dag_state.clone(),
1838            false,
1839        );
1840
1841        // Clock round should have advanced to 5 during recovery, because round 4
1842        // reaches a quorum once our own round 4 block is proposed.
1843        let mut new_round = signal_receivers.new_round_receiver();
1844        assert_eq!(*new_round.borrow_and_update(), 5);
1845
1846        // During recovery, round 4 block should have been proposed.
1847        let proposed_block = block_receiver
1848            .recv()
1849            .await
1850            .expect("A block should have been created");
1851        assert_eq!(proposed_block.block.round(), 4);
1852        let ancestors = proposed_block.block.ancestors();
1853
1854        assert_eq!(ancestors.len(), 4);
1855        for ancestor in ancestors {
1856            if ancestor.author == context.own_index {
1857                assert_eq!(ancestor.round, 0);
1858            } else {
1859                assert_eq!(ancestor.round, 3);
1860            }
1861        }
1862
1863        // Run commit rule.
1864        core.try_commit(vec![]).ok();
1865        let last_commit = store
1866            .read_last_commit()
1867            .unwrap()
1868            .expect("last commit should be set");
1869
1870        // There were no commits prior to the core starting up, but there were completed
1871        // rounds up to round 4. So we should commit the leaders of rounds 1 & 2 as soon
1872        // as the new block for round 4 is proposed.
1873        assert_eq!(last_commit.index(), 2);
1874        assert_eq!(dag_state.read().last_commit_index(), 2);
1875        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1876        assert_eq!(all_stored_commits.len(), 2);
1877    }
1878
1879    #[tokio::test]
1880    async fn test_core_propose_after_genesis() {
1881        telemetry_subscribers::init_for_testing();
1882        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1883            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1884            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1885            config
1886        });
1887
1888        let (context, mut key_pairs) = Context::new_for_test(4);
1889        let context = Arc::new(context);
1890        let store = Arc::new(MemStore::new());
1891        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1892
1893        let block_manager = BlockManager::new(
1894            context.clone(),
1895            dag_state.clone(),
1896            Arc::new(NoopBlockVerifier),
1897        );
1898        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1899        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1900        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1901        // Need at least one subscriber to the block broadcast channel.
1902        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1903        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1904            context.clone(),
1905            dag_state.clone(),
1906        ));
1907
1908        let (sender, _receiver) = unbounded_channel("consensus_output");
1909        let commit_observer = CommitObserver::new(
1910            context.clone(),
1911            CommitConsumer::new(sender.clone(), 0),
1912            dag_state.clone(),
1913            store.clone(),
1914            leader_schedule.clone(),
1915        );
1916
1917        let mut core = Core::new(
1918            context.clone(),
1919            leader_schedule,
1920            transaction_consumer,
1921            block_manager,
1922            true,
1923            commit_observer,
1924            signals,
1925            key_pairs.remove(context.own_index.value()).1,
1926            dag_state.clone(),
1927            false,
1928        );
1929
1930        // Send some transactions
1931        let mut total = 0;
1932        let mut index = 0;
1933        loop {
1934            let transaction =
1935                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1936            total += transaction.len();
1937            index += 1;
1938            let _w = transaction_client
1939                .submit_no_wait(vec![transaction])
1940                .await
1941                .unwrap();
1942
1943            // Stop once the total size of submitted transactions reaches ~1KB
1944            if total >= 1_000 {
1945                break;
1946            }
1947        }
1948
1949        // a new block should have been created during recovery.
1950        let extended_block = block_receiver
1951            .recv()
1952            .await
1953            .expect("A new block should have been created");
1954
1955        // A new block created - assert the details
1956        assert_eq!(extended_block.block.round(), 1);
1957        assert_eq!(extended_block.block.author().value(), 0);
1958        assert_eq!(extended_block.block.ancestors().len(), 4);
1959
1960        let mut total = 0;
1961        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1962            total += transaction.data().len() as u64;
1963            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1964            assert_eq!(format!("Transaction {i}"), transaction);
1965        }
1966        assert!(
1967            total
1968                <= context
1969                    .protocol_config
1970                    .consensus_max_transactions_in_block_bytes()
1971        );
1972
1973        // genesis blocks should be referenced
1974        let all_genesis = genesis_blocks(context);
1975
1976        for ancestor in extended_block.block.ancestors() {
1977            all_genesis
1978                .iter()
1979                .find(|block| block.reference() == *ancestor)
1980                .expect("Block should be found amongst genesis blocks");
1981        }
1982
1983        // Try to propose again - with or without the ignore-leaders check, it will not
1984        // return any block
1985        assert!(core.try_propose(false).unwrap().is_none());
1986        assert!(core.try_propose(true).unwrap().is_none());
1987
1988        // Check no commits have been persisted to dag_state & store
1989        let last_commit = store.read_last_commit().unwrap();
1990        assert!(last_commit.is_none());
1991        assert_eq!(dag_state.read().last_commit_index(), 0);
1992    }
1993
1994    #[tokio::test]
1995    async fn test_core_propose_once_receiving_a_quorum() {
1996        telemetry_subscribers::init_for_testing();
1997        let (context, mut key_pairs) = Context::new_for_test(4);
1998        let context = Arc::new(context);
1999
2000        let store = Arc::new(MemStore::new());
2001        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2002
2003        let block_manager = BlockManager::new(
2004            context.clone(),
2005            dag_state.clone(),
2006            Arc::new(NoopBlockVerifier),
2007        );
2008        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2009            context.clone(),
2010            dag_state.clone(),
2011        ));
2012
2013        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2014        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2015        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2016        // Need at least one subscriber to the block broadcast channel.
2017        let _block_receiver = signal_receivers.block_broadcast_receiver();
2018
2019        let (sender, _receiver) = unbounded_channel("consensus_output");
2020        let commit_observer = CommitObserver::new(
2021            context.clone(),
2022            CommitConsumer::new(sender.clone(), 0),
2023            dag_state.clone(),
2024            store.clone(),
2025            leader_schedule.clone(),
2026        );
2027
2028        let mut core = Core::new(
2029            context.clone(),
2030            leader_schedule,
2031            transaction_consumer,
2032            block_manager,
2033            true,
2034            commit_observer,
2035            signals,
2036            key_pairs.remove(context.own_index.value()).1,
2037            dag_state.clone(),
2038            false,
2039        );
2040
2041        let mut expected_ancestors = BTreeSet::new();
2042
2043        // Adding one block now will trigger the creation of a new block for round 1
2044        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
2045        expected_ancestors.insert(block_1.reference());
2046        // Wait for min round delay to allow blocks to be proposed.
2047        sleep(context.parameters.min_round_delay).await;
2048        // add blocks to trigger proposal.
2049        _ = core.add_blocks(vec![block_1]);
2050
2051        assert_eq!(core.last_proposed_round(), 1);
2052        expected_ancestors.insert(core.last_proposed_block().reference());
2053        // attempt to create a block - none will be produced.
2054        assert!(core.try_propose(false).unwrap().is_none());
2055
2056        // Adding another block now forms a quorum for round 1, so a block at round 2
2057        // will be proposed
2058        let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
2059        expected_ancestors.insert(block_3.reference());
2060        // Wait for min round delay to allow blocks to be proposed.
2061        sleep(context.parameters.min_round_delay).await;
2062        // add blocks to trigger proposal.
2063        _ = core.add_blocks(vec![block_3]);
2064
2065        assert_eq!(core.last_proposed_round(), 2);
2066
2067        let proposed_block = core.last_proposed_block();
2068        assert_eq!(proposed_block.round(), 2);
2069        assert_eq!(proposed_block.author(), context.own_index);
2070        assert_eq!(proposed_block.ancestors().len(), 3);
2071        let ancestors = proposed_block.ancestors();
2072        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
2073        assert_eq!(ancestors, expected_ancestors);
2074
2075        // Check no commits have been persisted to dag_state & store
2076        let last_commit = store.read_last_commit().unwrap();
2077        assert!(last_commit.is_none());
2078        assert_eq!(dag_state.read().last_commit_index(), 0);
2079    }
2080
2081    #[rstest]
2082    #[tokio::test]
2083    async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
2084        telemetry_subscribers::init_for_testing();
2085        let (mut context, mut key_pairs) = Context::new_for_test(4);
2086
2087        if gc_depth > 0 {
2088            context
2089                .protocol_config
2090                .set_consensus_gc_depth_for_testing(gc_depth);
2091        }
2092
2093        let context = Arc::new(context);
2094
2095        let store = Arc::new(MemStore::new());
2096        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2097        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2098        let mut block_status_subscriptions = FuturesUnordered::new();
2099
2100        let dag_str = "DAG {
2101            Round 0 : { 4 },
2102            Round 1 : { * },
2103            Round 2 : { * },
2104            Round 3 : {
2105                A -> [*],
2106                B -> [-A2],
2107                C -> [-A2],
2108                D -> [-A2],
2109            },
2110            Round 4 : {
2111                B -> [-A3],
2112                C -> [-A3],
2113                D -> [-A3],
2114            },
2115            Round 5 : {
2116                A -> [A3, B4, C4, D4]
2117                B -> [*],
2118                C -> [*],
2119                D -> [*],
2120            },
2121            Round 6 : { * },
2122            Round 7 : { * },
2123            Round 8 : { * },
2124        }";
2125
2126        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2127        dag_builder.print();
2128
2129        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be
2130        // able to commit up to round 5.
2131        for block in dag_builder.blocks(1..=5) {
2132            if block.author() == context.own_index {
2133                let subscription =
2134                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
2135                block_status_subscriptions.push(subscription);
2136            }
2137        }
2138
2139        // write them in store
2140        store
2141            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2142            .expect("Storage error");
2143
2144        // create dag state after all blocks have been written to store
2145        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2146        let block_manager = BlockManager::new(
2147            context.clone(),
2148            dag_state.clone(),
2149            Arc::new(NoopBlockVerifier),
2150        );
2151        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2152            context.clone(),
2153            dag_state.clone(),
2154        ));
2155
2156        let (sender, _receiver) = unbounded_channel("consensus_output");
2157        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2158        let commit_observer = CommitObserver::new(
2159            context.clone(),
2160            commit_consumer,
2161            dag_state.clone(),
2162            store.clone(),
2163            leader_schedule.clone(),
2164        );
2165
2166        // Check no commits have been persisted to dag_state or store.
2167        let last_commit = store.read_last_commit().unwrap();
2168        assert!(last_commit.is_none());
2169        assert_eq!(dag_state.read().last_commit_index(), 0);
2170
2171        // Now spin up core
2172        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2173        // Need at least one subscriber to the block broadcast channel.
2174        let _block_receiver = signal_receivers.block_broadcast_receiver();
2175        let _core = Core::new(
2176            context.clone(),
2177            leader_schedule,
2178            transaction_consumer,
2179            block_manager,
2180            true,
2181            commit_observer,
2182            signals,
2183            key_pairs.remove(context.own_index.value()).1,
2184            dag_state.clone(),
2185            false,
2186        );
2187
2188        let last_commit = store
2189            .read_last_commit()
2190            .unwrap()
2191            .expect("last commit should be set");
2192
2193        assert_eq!(last_commit.index(), 5);
2194
2195        while let Some(result) = block_status_subscriptions.next().await {
2196            let status = result.unwrap();
2197
2198            // If gc is enabled, then we expect some blocks to be garbage collected.
2199            if gc_depth > 0 {
2200                match status {
2201                    BlockStatus::Sequenced(block_ref) => {
2202                        assert!(block_ref.round == 1 || block_ref.round == 5);
2203                    }
2204                    BlockStatus::GarbageCollected(block_ref) => {
2205                        assert!(block_ref.round == 2 || block_ref.round == 3);
2206                    }
2207                }
2208            } else {
2209                // otherwise all of them should be committed
2210                assert!(matches!(status, BlockStatus::Sequenced(_)));
2211            }
2212        }
2213    }
2214
2215    // Tests that the threshold clock advances when blocks get unsuspended due to
2216    // GC'ed blocks, and that newly created blocks are always higher than the last
2217    // advanced gc round.
2218    #[tokio::test]
2219    async fn test_multiple_commits_advance_threshold_clock() {
2220        telemetry_subscribers::init_for_testing();
2221        let (mut context, mut key_pairs) = Context::new_for_test(4);
2222        const GC_DEPTH: u32 = 2;
2223
2224        context
2225            .protocol_config
2226            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2227
2228        let context = Arc::new(context);
2229
2230        let store = Arc::new(MemStore::new());
2231        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2232        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2233
2234        // On round 1 we do produce the block for authority D, but we do not link to it
2235        // until round 6. This makes round 6 unable to be processed
2236        // until the leader of round 3 is committed, at which point round 1 gets garbage
2237        // collected. Then we add more rounds so we can trigger a commit for the leader
2238        // of round 9, which will move the gc round to 7.
2239        let dag_str = "DAG {
2240            Round 0 : { 4 },
2241            Round 1 : { * },
2242            Round 2 : { 
2243                B -> [-D1],
2244                C -> [-D1],
2245                D -> [-D1],
2246            },
2247            Round 3 : {
2248                B -> [*],
2249                C -> [*]
2250                D -> [*],
2251            },
2252            Round 4 : { 
2253                A -> [*],
2254                B -> [*],
2255                C -> [*]
2256                D -> [*],
2257            },
2258            Round 5 : { 
2259                B -> [*],
2260                C -> [*],
2261                D -> [*],
2262            },
2263            Round 6 : { 
2264                B -> [A6, B6, C6, D1],
2265                C -> [A6, B6, C6, D1],
2266                D -> [A6, B6, C6, D1],
2267            },
2268            Round 7 : { 
2269                B -> [*],
2270                C -> [*],
2271                D -> [*],
2272            },
2273            Round 8 : { 
2274                B -> [*],
2275                C -> [*],
2276                D -> [*],
2277            },
2278            Round 9 : { 
2279                B -> [*],
2280                C -> [*],
2281                D -> [*],
2282            },
2283            Round 10 : { 
2284                B -> [*],
2285                C -> [*],
2286                D -> [*],
2287            },
2288            Round 11 : { 
2289                B -> [*],
2290                C -> [*],
2291                D -> [*],
2292            },
2293        }";
2294
2295        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2296        dag_builder.print();
2297
2298        // create dag state after all blocks have been written to store
2299        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2300        let block_manager = BlockManager::new(
2301            context.clone(),
2302            dag_state.clone(),
2303            Arc::new(NoopBlockVerifier),
2304        );
2305        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2306            context.clone(),
2307            dag_state.clone(),
2308        ));
2309        let (sender, _receiver) = unbounded_channel("consensus_output");
2310        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
2311        let commit_observer = CommitObserver::new(
2312            context.clone(),
2313            commit_consumer,
2314            dag_state.clone(),
2315            store.clone(),
2316            leader_schedule.clone(),
2317        );
2318
2319        // Check no commits have been persisted to dag_state or store.
2320        let last_commit = store.read_last_commit().unwrap();
2321        assert!(last_commit.is_none());
2322        assert_eq!(dag_state.read().last_commit_index(), 0);
2323
2324        // Now spin up core
2325        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2326        // Need at least one subscriber to the block broadcast channel.
2327        let _block_receiver = signal_receivers.block_broadcast_receiver();
2328        let mut core = Core::new(
2329            context.clone(),
2330            leader_schedule,
2331            transaction_consumer,
2332            block_manager,
2333            true,
2334            commit_observer,
2335            signals,
2336            key_pairs.remove(context.own_index.value()).1,
2337            dag_state.clone(),
2338            true,
2339        );
2340        // We set the last known round to 4 so we avoid creating new blocks until then -
2341        // otherwise it would crash, as the already created DAG contains blocks for this
2342        // authority.
2343        core.set_last_known_proposed_round(4);
2344
2345        // We add all the blocks except D1. The only ones we can immediately accept are
2346        // the ones up to round 5, as they don't have a dependency on D1. The rest of
2347        // the blocks do have a causal dependency on D1, so they can't be processed
2348        // until the leader of round 3 gets committed and the gc round moves to 1. That
2349        // makes all the blocks that depend on D1 get accepted. However, our threshold
2350        // clock is now at round 6, as the last quorum we managed to process was
2351        // round 5. As commits happen, blocks of later rounds get accepted and
2352        // more leaders get committed. Eventually the leader of round 9 gets committed
2353        // and gc moves to 9 - 2 = 7. If our node attempted to produce a block
2354        // for threshold clock round 6, the acceptance checks would fail, as
2355        // gc has now moved far past this round.
2356        core.add_blocks(
2357            dag_builder
2358                .blocks(1..=11)
2359                .into_iter()
2360                .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2361                .collect(),
2362        )
2363        .expect("Should not fail");
2364
2365        assert_eq!(core.last_proposed_round(), 12);
2366    }
2367
2368    #[tokio::test]
2369    async fn test_core_set_min_propose_round() {
2370        telemetry_subscribers::init_for_testing();
2371        let (context, mut key_pairs) = Context::new_for_test(4);
2372        let context = Arc::new(context.with_parameters(Parameters {
2373            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2374            ..Default::default()
2375        }));
2376
2377        let store = Arc::new(MemStore::new());
2378        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2379
2380        let block_manager = BlockManager::new(
2381            context.clone(),
2382            dag_state.clone(),
2383            Arc::new(NoopBlockVerifier),
2384        );
2385        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2386            context.clone(),
2387            dag_state.clone(),
2388        ));
2389
2390        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2391        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2392        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2393        // Need at least one subscriber to the block broadcast channel.
2394        let _block_receiver = signal_receivers.block_broadcast_receiver();
2395
2396        let (sender, _receiver) = unbounded_channel("consensus_output");
2397        let commit_observer = CommitObserver::new(
2398            context.clone(),
2399            CommitConsumer::new(sender.clone(), 0),
2400            dag_state.clone(),
2401            store.clone(),
2402            leader_schedule.clone(),
2403        );
2404
2405        let mut core = Core::new(
2406            context.clone(),
2407            leader_schedule,
2408            transaction_consumer,
2409            block_manager,
2410            true,
2411            commit_observer,
2412            signals,
2413            key_pairs.remove(context.own_index.value()).1,
2414            dag_state.clone(),
2415            true,
2416        );
2417
2418        // No new block should have been produced
2419        assert_eq!(
2420            core.last_proposed_round(),
2421            GENESIS_ROUND,
2422            "No block should have been created other than genesis"
2423        );
2424
2425        // Trying to explicitly propose a block will not produce anything
2426        assert!(core.try_propose(true).unwrap().is_none());
2427
2428        // Create blocks for the whole network - even for "our" node, in order to
2429        // replicate an "amnesia" recovery.
2430        let mut builder = DagBuilder::new(context.clone());
2431        builder.layers(1..=10).build();
2432
2433        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2434
2435        // Process all the blocks
2436        assert!(core.add_blocks(blocks).unwrap().is_empty());
2437
2438        // Try to propose - no block should be produced.
2439        assert!(core.try_propose(true).unwrap().is_none());
2440
2441        // Now set the last known proposed round, which is the highest round for which
2442        // the network has informed us that we have proposed a block.
2443        core.set_last_known_proposed_round(10);
2444
2445        let block = core.try_propose(true).expect("No error").unwrap();
2446        assert_eq!(block.round(), 11);
2447        assert_eq!(block.ancestors().len(), 4);
2448
2449        let our_ancestor_included = block.ancestors()[0];
2450        assert_eq!(our_ancestor_included.author, context.own_index);
2451        assert_eq!(our_ancestor_included.round, 10);
2452    }
2453
2454    #[tokio::test(flavor = "current_thread", start_paused = true)]
2455    async fn test_core_try_new_block_leader_timeout() {
2456        telemetry_subscribers::init_for_testing();
2457
2458        // Since we run the test with start_paused = true, any time-dependent
2459        // operations using Tokio's time facilities, such as tokio::time::sleep
2460        // or tokio::time::Instant, will not advance. So practically each Core's
2461        // clock will have been initialised with potentially different values, but it
2462        // never advances. To ensure that blocks won't get rejected by cores, we
2463        // need to manually wait for the time diff before processing them. By
2464        // calling `tokio::time::sleep` we implicitly also advance the tokio
2465        // clock.
2466        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2467            // Simulate the time wait before processing a block to ensure that
2468            // block.timestamp <= now
2469            let now = context.clock.timestamp_utc_ms();
2470            let max_timestamp = blocks
2471                .iter()
2472                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2473                .map(|block| block.timestamp_ms())
2474                .unwrap_or(0);
2475
2476            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2477            sleep(wait_time).await;
2478        }
2479
2480        let (context, _) = Context::new_for_test(4);
2481        // Create the cores for all authorities
2482        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2483
2484        // Create blocks for rounds 1..=3 from all Cores except the last Core of
2485        // authority 3, so we miss its block. As it will be the leader of round 3,
2486        // no one will be able to progress to round 4 unless we explicitly trigger
2487        // block creation.
2488        // Split off the last authority's core; the remaining cores create the blocks.
2489        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2490
2491        // Now iterate over a few rounds and ensure the corresponding signals are
2492        // created while the network advances
2493        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2494        for round in 1..=3 {
2495            let mut this_round_blocks = Vec::new();
2496
2497            for core_fixture in cores.iter_mut() {
2498                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2499
2500                core_fixture
2501                    .core
2502                    .add_blocks(last_round_blocks.clone())
2503                    .unwrap();
2504
2505                // Only when round > 1 and using non-genesis parents.
2506                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2507                    assert_eq!(round - 1, r);
2508                    if core_fixture.core.last_proposed_round() == r {
2509                        // Force propose new block regardless of min round delay.
2510                        core_fixture
2511                            .core
2512                            .try_propose(true)
2513                            .unwrap()
2514                            .unwrap_or_else(|| {
2515                                panic!("Block should have been proposed for round {round}")
2516                            });
2517                    }
2518                }
2519
2520                assert_eq!(core_fixture.core.last_proposed_round(), round);
2521
2522                this_round_blocks.push(core_fixture.core.last_proposed_block());
2523            }
2524
2525            last_round_blocks = this_round_blocks;
2526        }
2527
2528        // Try to create the blocks for round 4 by calling the try_propose() method. No
2529        // block should be created as the leader - authority 3 - hasn't proposed
2530        // any block.
2531        for core_fixture in cores.iter_mut() {
2532            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2533
2534            core_fixture
2535                .core
2536                .add_blocks(last_round_blocks.clone())
2537                .unwrap();
2538            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2539        }
2540
2541        // Now try to create the blocks for round 4 via the leader timeout method which
2542        // should ignore any leader checks or min round delay.
2543        for core_fixture in cores.iter_mut() {
2544            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2545            assert_eq!(core_fixture.core.last_proposed_round(), 4);
2546
2547            // Check commits have been persisted to store
2548            let last_commit = core_fixture
2549                .store
2550                .read_last_commit()
2551                .unwrap()
2552                .expect("last commit should be set");
2553            // There is 1 leader round with rounds completed up to and including
2554            // round 4
2555            assert_eq!(last_commit.index(), 1);
2556            let all_stored_commits = core_fixture
2557                .store
2558                .scan_commits((0..=CommitIndex::MAX).into())
2559                .unwrap();
2560            assert_eq!(all_stored_commits.len(), 1);
2561        }
2562    }
2563
2564    #[tokio::test(flavor = "current_thread", start_paused = true)]
2565    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2566        telemetry_subscribers::init_for_testing();
2567
2568        // Since we run the test with start_paused = true, any time-dependent
2569        // operations using Tokio's time facilities, such as tokio::time::sleep
2570        // or tokio::time::Instant, will not advance. So practically each Core's
2571        // clock will have been initialised with potentially different values, but it
2572        // never advances. To ensure that blocks won't get rejected by cores, we
2573        // need to manually wait for the time diff before processing them. By
2574        // calling `tokio::time::sleep` we implicitly also advance the tokio
2575        // clock.
2576        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2577            // Simulate the time wait before processing a block to ensure that
2578            // block.timestamp <= now
2579            let now = context.clock.timestamp_utc_ms();
2580            let max_timestamp = blocks
2581                .iter()
2582                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2583                .map(|block| block.timestamp_ms())
2584                .unwrap_or(0);
2585
2586            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2587            sleep(wait_time).await;
2588        }
2589
2590        let (mut context, _) = Context::new_for_test(4);
2591        context
2592            .protocol_config
2593            .set_consensus_smart_ancestor_selection_for_testing(true);
2594        context
2595            .protocol_config
2596            .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2597
2598        // Create the cores for all authorities
2599        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]);
2600        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2601
2602        // Create blocks for rounds 1..=30 from all Cores except last Core of authority
2603        // 3.
2604        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2605        for round in 1..=30 {
2606            let mut this_round_blocks = Vec::new();
2607
2608            for core_fixture in cores.iter_mut() {
2609                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2610
2611                core_fixture
2612                    .core
2613                    .add_blocks(last_round_blocks.clone())
2614                    .unwrap();
2615
2616                // Only when round > 1 and using non-genesis parents.
2617                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2618                    assert_eq!(round - 1, r);
2619                    if core_fixture.core.last_proposed_round() == r {
2620                        // Force propose new block regardless of min round delay.
2621                        core_fixture
2622                            .core
2623                            .try_propose(true)
2624                            .unwrap()
2625                            .unwrap_or_else(|| {
2626                                panic!("Block should have been proposed for round {round}")
2627                            });
2628                    }
2629                }
2630
2631                assert_eq!(core_fixture.core.last_proposed_round(), round);
2632
2633                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2634            }
2635
2636            last_round_blocks = this_round_blocks;
2637        }
2638
2639        // Now produce blocks for all Cores
2640        for round in 31..=40 {
2641            let mut this_round_blocks = Vec::new();
2642
2643            for core_fixture in all_cores.iter_mut() {
2644                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2645
2646                core_fixture
2647                    .core
2648                    .add_blocks(last_round_blocks.clone())
2649                    .unwrap();
2650
2651                // Only when round > 1 and using non-genesis parents.
2652                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2653                    assert_eq!(round - 1, r);
2654                    if core_fixture.core.last_proposed_round() == r {
2655                        // Force propose new block regardless of min round delay.
2656                        core_fixture
2657                            .core
2658                            .try_propose(true)
2659                            .unwrap()
2660                            .unwrap_or_else(|| {
2661                                panic!("Block should have been proposed for round {round}")
2662                            });
2663                    }
2664                }
2665
2666                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2667
2668                for block in this_round_blocks.iter() {
2669                    if block.author() != AuthorityIndex::new_for_test(3) {
2670                        // Assert blocks created include only 3 ancestors per block as one
2671                        // should be excluded
2672                        assert_eq!(block.ancestors().len(), 3);
2673                    } else {
2674                        // Authority 3 is the low scoring authority so it will still include
2675                        // its own blocks.
2676                        assert_eq!(block.ancestors().len(), 4);
2677                    }
2678                }
2679            }
2680
2681            last_round_blocks = this_round_blocks;
2682        }
2683    }
2684
2685    #[tokio::test]
2686    async fn test_smart_ancestor_selection() {
2687        telemetry_subscribers::init_for_testing();
2688        let (mut context, mut key_pairs) = Context::new_for_test(7);
2689        context
2690            .protocol_config
2691            .set_consensus_smart_ancestor_selection_for_testing(true);
2692        context
2693            .protocol_config
2694            .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2695        let context = Arc::new(context.with_parameters(Parameters {
2696            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2697            ..Default::default()
2698        }));
2699
2700        let store = Arc::new(MemStore::new());
2701        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2702
2703        let block_manager = BlockManager::new(
2704            context.clone(),
2705            dag_state.clone(),
2706            Arc::new(NoopBlockVerifier),
2707        );
2708        let leader_schedule = Arc::new(
2709            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2710                .with_num_commits_per_schedule(10),
2711        );
2712
2713        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2714        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2715        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2716        // Need at least one subscriber to the block broadcast channel.
2717        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2718
2719        let (sender, _receiver) = unbounded_channel("consensus_output");
2720        let commit_consumer = CommitConsumer::new(sender, 0);
2721        let commit_observer = CommitObserver::new(
2722            context.clone(),
2723            commit_consumer,
2724            dag_state.clone(),
2725            store.clone(),
2726            leader_schedule.clone(),
2727        );
2728
2729        let mut core = Core::new(
2730            context.clone(),
2731            leader_schedule,
2732            transaction_consumer,
2733            block_manager,
2734            true,
2735            commit_observer,
2736            signals,
2737            key_pairs.remove(context.own_index.value()).1,
2738            dag_state.clone(),
2739            true,
2740        );
2741
2742        // No new block should have been produced
2743        assert_eq!(
2744            core.last_proposed_round(),
2745            GENESIS_ROUND,
2746            "No block should have been created other than genesis"
2747        );
2748
2749        // Trying to explicitly propose a block will not produce anything
2750        assert!(core.try_propose(true).unwrap().is_none());
2751
2752        // Create blocks for the whole network but not for authority 1
2753        let mut builder = DagBuilder::new(context.clone());
2754        builder
2755            .layers(1..=12)
2756            .authorities(vec![AuthorityIndex::new_for_test(1)])
2757            .skip_block()
2758            .build();
2759        let blocks = builder.blocks(1..=12);
2760        // Process all the blocks
2761        assert!(core.add_blocks(blocks).unwrap().is_empty());
2762        core.set_last_known_proposed_round(12);
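        // Informing Core of its last known own proposed round is presumably what
        // allows it to start proposing, given the sync_last_known_own_block_timeout
        // parameter configured above.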
2763
2764        let block = core.try_propose(true).expect("No error").unwrap();
2765        assert_eq!(block.round(), 13);
2766        assert_eq!(block.ancestors().len(), 7);
2767
2768        // Build blocks for the rest of the network other than our own index
2769        builder
2770            .layers(13..=14)
2771            .authorities(vec![AuthorityIndex::new_for_test(0)])
2772            .skip_block()
2773            .build();
2774        let blocks = builder.blocks(13..=14);
2775        assert!(core.add_blocks(blocks).unwrap().is_empty());
2776
2777        // We now have triggered a leader schedule change so we should have
2778        // one EXCLUDE authority (1) when we go to select ancestors for the next
2779        // proposal
2780        let block = core.try_propose(true).expect("No error").unwrap();
2781        assert_eq!(block.round(), 15);
2782        assert_eq!(block.ancestors().len(), 6);
2783
2784        // Build blocks for a quorum of the network including the EXCLUDE authority (1),
2785        // which will trigger smart select, so we will not propose a block.
2786        builder
2787            .layer(15)
2788            .authorities(vec![
2789                AuthorityIndex::new_for_test(0),
2790                AuthorityIndex::new_for_test(5),
2791                AuthorityIndex::new_for_test(6),
2792            ])
2793            .skip_block()
2794            .build();
2795        let blocks = builder.blocks(15..=15);
2796        let authority_1_excluded_block_reference = blocks
2797            .iter()
2798            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2799            .unwrap()
2800            .reference();
2801        // Wait for min round delay to allow blocks to be proposed.
2802        sleep(context.parameters.min_round_delay).await;
2803        // Smart select should be triggered and no block should be proposed.
2804        assert!(core.add_blocks(blocks).unwrap().is_empty());
2805        assert_eq!(core.last_proposed_block().round(), 15);
2806
2807        builder
2808            .layer(15)
2809            .authorities(vec![
2810                AuthorityIndex::new_for_test(0),
2811                AuthorityIndex::new_for_test(1),
2812                AuthorityIndex::new_for_test(2),
2813                AuthorityIndex::new_for_test(3),
2814                AuthorityIndex::new_for_test(4),
2815            ])
2816            .skip_block()
2817            .build();
2818        let blocks = builder.blocks(15..=15);
2819        let included_block_references = iter::once(&core.last_proposed_block())
2820            .chain(blocks.iter())
2821            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2822            .map(|block| block.reference())
2823            .collect::<Vec<_>>();
2824
2825        // Have enough ancestor blocks to propose now.
2826        assert!(core.add_blocks(blocks).unwrap().is_empty());
2827        assert_eq!(core.last_proposed_block().round(), 16);
2828
2829        // Check that a new block has been proposed & signaled.
2830        let extended_block = loop {
2831            let extended_block =
2832                tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2833                    .await
2834                    .unwrap()
2835                    .unwrap();
2836            if extended_block.block.round() == 16 {
2837                break extended_block;
2838            }
2839        };
2840        assert_eq!(extended_block.block.round(), 16);
2841        assert_eq!(extended_block.block.author(), core.context.own_index);
2842        assert_eq!(extended_block.block.ancestors().len(), 6);
2843        assert_eq!(extended_block.block.ancestors(), included_block_references);
2844        assert_eq!(extended_block.excluded_ancestors.len(), 1);
2845        assert_eq!(
2846            extended_block.excluded_ancestors[0],
2847            authority_1_excluded_block_reference
2848        );
2849
2850        // Build blocks for a quorum of the network including the EXCLUDE ancestor,
2851        // which will trigger smart select so we will not propose a block.
2852        // This time we will force a proposal by hitting the leader timeout, which
2853        // should cause us to include this EXCLUDE ancestor.
2854        builder
2855            .layer(16)
2856            .authorities(vec![
2857                AuthorityIndex::new_for_test(0),
2858                AuthorityIndex::new_for_test(5),
2859                AuthorityIndex::new_for_test(6),
2860            ])
2861            .skip_block()
2862            .build();
2863        let blocks = builder.blocks(16..=16);
2864        // Wait for leader timeout to force blocks to be proposed.
2865        sleep(context.parameters.min_round_delay).await;
2866        // Smart select should be triggered and no block should be proposed.
2867        assert!(core.add_blocks(blocks).unwrap().is_empty());
2868        assert_eq!(core.last_proposed_block().round(), 16);
2869
2870        // Simulate a leader timeout and a force proposal where we will include
2871        // one EXCLUDE ancestor when we go to select ancestors for the next proposal
2872        let block = core.try_propose(true).expect("No error").unwrap();
2873        assert_eq!(block.round(), 17);
2874        assert_eq!(block.ancestors().len(), 5);
2875
2876        // Check that a new block has been proposed & signaled.
2877        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2878            .await
2879            .unwrap()
2880            .unwrap();
2881        assert_eq!(extended_block.block.round(), 17);
2882        assert_eq!(extended_block.block.author(), core.context.own_index);
2883        assert_eq!(extended_block.block.ancestors().len(), 5);
2884        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2885
2886        // Set quorum rounds for the authorities, which will unlock the excluded
2887        // authority (1); then we should be able to create a new layer of blocks
2888        // which will all be included as ancestors for the next proposal.
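        // Assumption: the two vectors below are the per-authority (low, high)
        // received and accepted quorum rounds reported by the round prober.
        // Reporting (16, 16) for every authority indicates that blocks are
        // propagating to a quorum again, which should move authority 1 back from
        // EXCLUDE to INCLUDE.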
2889        core.set_propagation_delay_and_quorum_rounds(
2890            0,
2891            vec![
2892                (16, 16),
2893                (16, 16),
2894                (16, 16),
2895                (16, 16),
2896                (16, 16),
2897                (16, 16),
2898                (16, 16),
2899            ],
2900            vec![
2901                (16, 16),
2902                (16, 16),
2903                (16, 16),
2904                (16, 16),
2905                (16, 16),
2906                (16, 16),
2907                (16, 16),
2908            ],
2909        );
2910
2911        builder
2912            .layer(17)
2913            .authorities(vec![AuthorityIndex::new_for_test(0)])
2914            .skip_block()
2915            .build();
2916        let blocks = builder.blocks(17..=17);
2917        let included_block_references = iter::once(&core.last_proposed_block())
2918            .chain(blocks.iter())
2919            .map(|block| block.reference())
2920            .collect::<Vec<_>>();
2921
2922        // Have enough ancestor blocks to propose now.
2923        sleep(context.parameters.min_round_delay).await;
2924        assert!(core.add_blocks(blocks).unwrap().is_empty());
2925        assert_eq!(core.last_proposed_block().round(), 18);
2926
2927        // Check that a new block has been proposed & signaled.
2928        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2929            .await
2930            .unwrap()
2931            .unwrap();
2932        assert_eq!(extended_block.block.round(), 18);
2933        assert_eq!(extended_block.block.author(), core.context.own_index);
2934        assert_eq!(extended_block.block.ancestors().len(), 7);
2935        assert_eq!(extended_block.block.ancestors(), included_block_references);
2936        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2937    }
2938
2939    #[tokio::test]
2940    async fn test_excluded_ancestor_limit() {
2941        telemetry_subscribers::init_for_testing();
2942        let (mut context, mut key_pairs) = Context::new_for_test(4);
2943        context
2944            .protocol_config
2945            .set_consensus_smart_ancestor_selection_for_testing(true);
2946        context
2947            .protocol_config
2948            .set_consensus_distributed_vote_scoring_strategy_for_testing(true);
2949        let context = Arc::new(context.with_parameters(Parameters {
2950            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2951            ..Default::default()
2952        }));
2953
2954        let store = Arc::new(MemStore::new());
2955        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2956
2957        let block_manager = BlockManager::new(
2958            context.clone(),
2959            dag_state.clone(),
2960            Arc::new(NoopBlockVerifier),
2961        );
2962        let leader_schedule = Arc::new(
2963            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2964                .with_num_commits_per_schedule(10),
2965        );
2966
2967        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2968        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2969        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2970        // Need at least one subscriber to the block broadcast channel.
2971        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2972
2973        let (sender, _receiver) = unbounded_channel("consensus_output");
2974        let commit_consumer = CommitConsumer::new(sender, 0);
2975        let commit_observer = CommitObserver::new(
2976            context.clone(),
2977            commit_consumer,
2978            dag_state.clone(),
2979            store.clone(),
2980            leader_schedule.clone(),
2981        );
2982
2983        let mut core = Core::new(
2984            context.clone(),
2985            leader_schedule,
2986            transaction_consumer,
2987            block_manager,
2988            true,
2989            commit_observer,
2990            signals,
2991            key_pairs.remove(context.own_index.value()).1,
2992            dag_state.clone(),
2993            true,
2994        );
2995
2996        // No new block should have been produced
2997        assert_eq!(
2998            core.last_proposed_round(),
2999            GENESIS_ROUND,
3000            "No block should have been created other than genesis"
3001        );
3002
3003        // Create blocks for the whole network
3004        let mut builder = DagBuilder::new(context.clone());
3005        builder.layers(1..=3).build();
3006
3007        // This will equivocate 9 blocks for authority 1. They will be excluded from
3008        // the proposal, but because of the configured limit not all of them will be
3009        // included in the ExtendedBlock structure sent to the rest of the network.
3010        builder
3011            .layer(4)
3012            .authorities(vec![AuthorityIndex::new_for_test(1)])
3013            .equivocate(9)
3014            .build();
3015        let blocks = builder.blocks(1..=4);
3016
3017        // Process all the blocks
3018        assert!(core.add_blocks(blocks).unwrap().is_empty());
3019        core.set_last_known_proposed_round(3);
3020
3021        let block = core.try_propose(true).expect("No error").unwrap();
3022        assert_eq!(block.round(), 5);
3023        assert_eq!(block.ancestors().len(), 4);
3024
3025        // Check that a new block has been proposed & signaled.
3026        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3027            .await
3028            .unwrap()
3029            .unwrap();
3030        assert_eq!(extended_block.block.round(), 5);
3031        assert_eq!(extended_block.block.author(), core.context.own_index);
3032        assert_eq!(extended_block.block.ancestors().len(), 4);
3033        assert_eq!(extended_block.excluded_ancestors.len(), 8);
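        // Even though authority 1 equivocated 9 blocks, only 8 excluded ancestors are
        // propagated in the ExtendedBlock, demonstrating the limit this test exercises.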
3034    }
3035
3036    #[tokio::test]
3037    async fn test_core_set_subscriber_exists() {
3038        telemetry_subscribers::init_for_testing();
3039        let (context, mut key_pairs) = Context::new_for_test(4);
3040        let context = Arc::new(context);
3041        let store = Arc::new(MemStore::new());
3042        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3043
3044        let block_manager = BlockManager::new(
3045            context.clone(),
3046            dag_state.clone(),
3047            Arc::new(NoopBlockVerifier),
3048        );
3049        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3050            context.clone(),
3051            dag_state.clone(),
3052        ));
3053
3054        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3055        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3056        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3057        // Need at least one subscriber to the block broadcast channel.
3058        let _block_receiver = signal_receivers.block_broadcast_receiver();
3059
3060        let (sender, _receiver) = unbounded_channel("consensus_output");
3061        let commit_observer = CommitObserver::new(
3062            context.clone(),
3063            CommitConsumer::new(sender.clone(), 0),
3064            dag_state.clone(),
3065            store.clone(),
3066            leader_schedule.clone(),
3067        );
3068
3069        let mut core = Core::new(
3070            context.clone(),
3071            leader_schedule,
3072            transaction_consumer,
3073            block_manager,
3074            // Indicate that no subscriber exists initially.
3075            false,
3076            commit_observer,
3077            signals,
3078            key_pairs.remove(context.own_index.value()).1,
3079            dag_state.clone(),
3080            false,
3081        );
3082
3083        // There is no proposal during recovery because there is no subscriber.
3084        assert_eq!(
3085            core.last_proposed_round(),
3086            GENESIS_ROUND,
3087            "No block should have been created other than genesis"
3088        );
3089
3090        // There is no proposal even with forced proposing.
3091        assert!(core.try_propose(true).unwrap().is_none());
3092
3093        // Let Core know subscriber exists.
3094        core.set_quorum_subscribers_exists(true);
3095
3096        // Proposing now would succeed.
3097        assert!(core.try_propose(true).unwrap().is_some());
3098    }
3099
3100    #[tokio::test]
3101    async fn test_core_set_propagation_delay_per_authority() {
3102        // TODO: create helper to avoid the duplicated code here.
3103        telemetry_subscribers::init_for_testing();
3104        let (context, mut key_pairs) = Context::new_for_test(4);
3105        let context = Arc::new(context);
3106        let store = Arc::new(MemStore::new());
3107        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3108
3109        let block_manager = BlockManager::new(
3110            context.clone(),
3111            dag_state.clone(),
3112            Arc::new(NoopBlockVerifier),
3113        );
3114        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3115            context.clone(),
3116            dag_state.clone(),
3117        ));
3118
3119        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3120        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3121        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3122        // Need at least one subscriber to the block broadcast channel.
3123        let _block_receiver = signal_receivers.block_broadcast_receiver();
3124
3125        let (sender, _receiver) = unbounded_channel("consensus_output");
3126        let commit_observer = CommitObserver::new(
3127            context.clone(),
3128            CommitConsumer::new(sender.clone(), 0),
3129            dag_state.clone(),
3130            store.clone(),
3131            leader_schedule.clone(),
3132        );
3133
3134        let mut core = Core::new(
3135            context.clone(),
3136            leader_schedule,
3137            transaction_consumer,
3138            block_manager,
3139            // Indicate that no subscriber exists initially.
3140            false,
3141            commit_observer,
3142            signals,
3143            key_pairs.remove(context.own_index.value()).1,
3144            dag_state.clone(),
3145            false,
3146        );
3147
3148        // There is no proposal during recovery because there is no subscriber.
3149        assert_eq!(
3150            core.last_proposed_round(),
3151            GENESIS_ROUND,
3152            "No block should have been created other than genesis"
3153        );
3154
3155        // Use a large propagation delay to disable proposing.
3156        core.set_propagation_delay_and_quorum_rounds(1000, vec![], vec![]);
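        // A delay of 1000 rounds is presumably far above the
        // propagation_delay_stop_proposal_threshold, which is what pauses proposing.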
3157
3158        // Make propagation delay the only reason for not proposing.
3159        core.set_quorum_subscribers_exists(true);
3160
3161        // There is no proposal even with forced proposing.
3162        assert!(core.try_propose(true).unwrap().is_none());
3163
3164        // Let Core know there is no propagation delay.
3165        core.set_propagation_delay_and_quorum_rounds(0, vec![], vec![]);
3166
3167        // Proposing now would succeed.
3168        assert!(core.try_propose(true).unwrap().is_some());
3169    }
3170
3171    #[tokio::test(flavor = "current_thread", start_paused = true)]
3172    async fn test_leader_schedule_change() {
3173        telemetry_subscribers::init_for_testing();
3174        let default_params = Parameters::default();
3175
3176        let (context, _) = Context::new_for_test(4);
3177        // create the cores and their signals for all the authorities
3178        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3179
3180        // Now iterate over a few rounds and ensure the corresponding signals are
3181        // created while network advances
3182        let mut last_round_blocks = Vec::new();
3183        for round in 1..=30 {
3184            let mut this_round_blocks = Vec::new();
3185
3186            // Wait for min round delay to allow blocks to be proposed.
3187            sleep(default_params.min_round_delay).await;
3188
3189            for core_fixture in &mut cores {
3190                // add the blocks from last round
3191                // this will trigger a block creation for the round and a signal should be
3192                // emitted
3193                core_fixture
3194                    .core
3195                    .add_blocks(last_round_blocks.clone())
3196                    .unwrap();
3197
3198                // A "new round" signal should be received given that all the blocks of previous
3199                // round have been processed
3200                let new_round = receive(
3201                    Duration::from_secs(1),
3202                    core_fixture.signal_receivers.new_round_receiver(),
3203                )
3204                .await;
3205                assert_eq!(new_round, round);
3206
3207                // Check that a new block has been proposed.
3208                let extended_block = tokio::time::timeout(
3209                    Duration::from_secs(1),
3210                    core_fixture.block_receiver.recv(),
3211                )
3212                .await
3213                .unwrap()
3214                .unwrap();
3215                assert_eq!(extended_block.block.round(), round);
3216                assert_eq!(
3217                    extended_block.block.author(),
3218                    core_fixture.core.context.own_index
3219                );
3220
3221                // append the new block to this round blocks
3222                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3223
3224                let block = core_fixture.core.last_proposed_block();
3225
3226                // ensure that produced block is referring to the blocks of last_round
3227                assert_eq!(
3228                    block.ancestors().len(),
3229                    core_fixture.core.context.committee.size()
3230                );
3231                for ancestor in block.ancestors() {
3232                    if block.round() > 1 {
3233                        // don't bother with round 1 block which just contains the genesis blocks.
3234                        assert!(
3235                            last_round_blocks
3236                                .iter()
3237                                .any(|block| block.reference() == *ancestor),
3238                            "Reference from previous round should be added"
3239                        );
3240                    }
3241                }
3242            }
3243
3244            last_round_blocks = this_round_blocks;
3245        }
3246
3247        for core_fixture in cores {
3248            // Check commits have been persisted to store
3249            let last_commit = core_fixture
3250                .store
3251                .read_last_commit()
3252                .unwrap()
3253                .expect("last commit should be set");
3254            // There are 28 leader rounds with rounds completed up to and including
3255            // round 29. Round 30 blocks will only include their own blocks, so the
3256            // 28th leader will not be committed.
3257            assert_eq!(last_commit.index(), 27);
3258            let all_stored_commits = core_fixture
3259                .store
3260                .scan_commits((0..=CommitIndex::MAX).into())
3261                .unwrap();
3262            assert_eq!(all_stored_commits.len(), 27);
3263            assert_eq!(
3264                core_fixture
3265                    .core
3266                    .leader_schedule
3267                    .leader_swap_table
3268                    .read()
3269                    .bad_nodes
3270                    .len(),
3271                1
3272            );
3273            assert_eq!(
3274                core_fixture
3275                    .core
3276                    .leader_schedule
3277                    .leader_swap_table
3278                    .read()
3279                    .good_nodes
3280                    .len(),
3281                1
3282            );
3283            let expected_reputation_scores =
3284                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3285            assert_eq!(
3286                core_fixture
3287                    .core
3288                    .leader_schedule
3289                    .leader_swap_table
3290                    .read()
3291                    .reputation_scores,
3292                expected_reputation_scores
3293            );
3294        }
3295    }
3296
3297    // TODO: Remove this when DistributedVoteScoring is enabled.
3298    #[tokio::test(flavor = "current_thread", start_paused = true)]
3299    async fn test_leader_schedule_change_with_vote_scoring() {
3300        telemetry_subscribers::init_for_testing();
3301        let default_params = Parameters::default();
3302        let (mut context, _) = Context::new_for_test(4);
3303        context
3304            .protocol_config
3305            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3306        // create the cores and their signals for all the authorities
3307        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
3308        // Now iterate over a few rounds and ensure the corresponding signals are
3309        // created while network advances
3310        let mut last_round_blocks = Vec::new();
3311        for round in 1..=30 {
3312            let mut this_round_blocks = Vec::new();
3313            // Wait for min round delay to allow blocks to be proposed.
3314            sleep(default_params.min_round_delay).await;
3315            for core_fixture in &mut cores {
3316                // add the blocks from last round
3317                // this will trigger a block creation for the round and a signal should be
3318                // emitted
3319                core_fixture
3320                    .core
3321                    .add_blocks(last_round_blocks.clone())
3322                    .unwrap();
3323                // A "new round" signal should be received given that all the blocks of previous
3324                // round have been processed
3325                let new_round = receive(
3326                    Duration::from_secs(1),
3327                    core_fixture.signal_receivers.new_round_receiver(),
3328                )
3329                .await;
3330                assert_eq!(new_round, round);
3331                // Check that a new block has been proposed.
3332                let extended_block = tokio::time::timeout(
3333                    Duration::from_secs(1),
3334                    core_fixture.block_receiver.recv(),
3335                )
3336                .await
3337                .unwrap()
3338                .unwrap();
3339                assert_eq!(extended_block.block.round(), round);
3340                assert_eq!(
3341                    extended_block.block.author(),
3342                    core_fixture.core.context.own_index
3343                );
3344
3345                // append the new block to this round blocks
3346                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3347                let block = core_fixture.core.last_proposed_block();
3348                // ensure that produced block is referring to the blocks of last_round
3349                assert_eq!(
3350                    block.ancestors().len(),
3351                    core_fixture.core.context.committee.size()
3352                );
3353                for ancestor in block.ancestors() {
3354                    if block.round() > 1 {
3355                        // don't bother with round 1 block which just contains the genesis blocks.
3356                        assert!(
3357                            last_round_blocks
3358                                .iter()
3359                                .any(|block| block.reference() == *ancestor),
3360                            "Reference from previous round should be added"
3361                        );
3362                    }
3363                }
3364            }
3365            last_round_blocks = this_round_blocks;
3366        }
3367        for core_fixture in cores {
3368            // Check commits have been persisted to store
3369            let last_commit = core_fixture
3370                .store
3371                .read_last_commit()
3372                .unwrap()
3373                .expect("last commit should be set");
3374            // There are 28 leader rounds with rounds completed up to and including
3375            // round 29. Round 30 blocks will only include their own blocks, so the
3376            // 28th leader will not be committed.
3377            assert_eq!(last_commit.index(), 27);
3378            let all_stored_commits = core_fixture
3379                .store
3380                .scan_commits((0..=CommitIndex::MAX).into())
3381                .unwrap();
3382            assert_eq!(all_stored_commits.len(), 27);
3383            assert_eq!(
3384                core_fixture
3385                    .core
3386                    .leader_schedule
3387                    .leader_swap_table
3388                    .read()
3389                    .bad_nodes
3390                    .len(),
3391                1
3392            );
3393            assert_eq!(
3394                core_fixture
3395                    .core
3396                    .leader_schedule
3397                    .leader_swap_table
3398                    .read()
3399                    .good_nodes
3400                    .len(),
3401                1
3402            );
3403            let expected_reputation_scores =
3404                ReputationScores::new((11..=20).into(), vec![9, 8, 8, 8]);
3405            assert_eq!(
3406                core_fixture
3407                    .core
3408                    .leader_schedule
3409                    .leader_swap_table
3410                    .read()
3411                    .reputation_scores,
3412                expected_reputation_scores
3413            );
3414        }
3415    }
3416
3417    #[tokio::test]
3418    async fn test_validate_certified_commits() {
3419        telemetry_subscribers::init_for_testing();
3420
3421        let (context, _key_pairs) = Context::new_for_test(4);
3422        let context = context.with_parameters(Parameters {
3423            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3424            ..Default::default()
3425        });
3426
3427        let authority_index = AuthorityIndex::new_for_test(0);
3428        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3429        let mut core = core.core;
3430
3431        // No new block should have been produced
3432        assert_eq!(
3433            core.last_proposed_round(),
3434            GENESIS_ROUND,
3435            "No block should have been created other than genesis"
3436        );
3437
3438        // create a DAG of 12 rounds
3439        let mut dag_builder = DagBuilder::new(core.context.clone());
3440        dag_builder.layers(1..=12).build();
3441
3442        // Store all blocks up to round 6 which should be enough to decide up to leader
3443        // 4
3444        dag_builder.print();
3445        let blocks = dag_builder.blocks(1..=6);
3446
3447        for block in blocks {
3448            core.dag_state.write().accept_block(block);
3449        }
3450
3451        // Get all the committed sub dags up to round 10
3452        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3453
3454        // Now try to commit up to the latest leader (round = 4). Do not provide any
3455        // certified commits.
3456        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3457
3458        // We should have committed up to round 4
3459        assert_eq!(committed_sub_dags.len(), 4);
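        // With blocks accepted up to round 6, the committer can (roughly speaking)
        // decide leaders up to two rounds earlier, i.e. the leaders of rounds 1..=4,
        // hence the 4 committed sub dags.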
3460
3461        // Now validate the certified commits. We'll try 3 different scenarios:
3462        println!("Case 1. Provide certified commits that are all before the last committed round.");
3463
3464        // Highest certified commit should be for leader of round 4.
3465        let certified_commits = sub_dags_and_commits
3466            .iter()
3467            .take(4)
3468            .map(|(_, c)| c)
3469            .cloned()
3470            .collect::<Vec<_>>();
3471        assert!(
3472            certified_commits.last().unwrap().index()
3473                <= committed_sub_dags.last().unwrap().commit_ref.index,
3474            "Highest certified commit should older than the highest committed index."
3475        );
3476
3477        let certified_commits = core.validate_certified_commits(certified_commits).unwrap();
3478
3479        // No commits should be processed
3480        assert!(certified_commits.is_empty());
3481
3482        println!("Case 2. Provide certified commits that are all after the last committed round.");
3483
3484        // Provide the first 5 certified commits; the highest one is for the leader of round 5.
3485        let certified_commits = sub_dags_and_commits
3486            .iter()
3487            .take(5)
3488            .map(|(_, c)| c.clone())
3489            .collect::<Vec<_>>();
3490
3491        let certified_commits = core
3492            .validate_certified_commits(certified_commits.clone())
3493            .unwrap();
3494
3495        // The certified commit of index 5 should be processed.
3496        assert_eq!(certified_commits.len(), 1);
3497        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
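        // Certified commits at or below the last committed index (4) are filtered
        // out, so only the commit with index 5 survives validation.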
3498
3499        println!(
3500            "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
3501        );
3502
3503        // Skip the certified commit with index 5 and provide only the one with index 6.
3504        let certified_commits = sub_dags_and_commits
3505            .iter()
3506            .skip(5)
3507            .take(1)
3508            .map(|(_, c)| c.clone())
3509            .collect::<Vec<_>>();
3510
3511        let err = core
3512            .validate_certified_commits(certified_commits.clone())
3513            .unwrap_err();
3514        match err {
3515            ConsensusError::UnexpectedCertifiedCommitIndex {
3516                expected_commit_index: 5,
3517                commit_index: 6,
3518            } => (),
3519            _ => panic!("Unexpected error: {err:?}"),
3520        }
3521    }
3522
3523    #[tokio::test]
3524    async fn test_add_certified_commits() {
3525        telemetry_subscribers::init_for_testing();
3526
3527        let (context, _key_pairs) = Context::new_for_test(4);
3528        let context = context.with_parameters(Parameters {
3529            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3530            ..Default::default()
3531        });
3532
3533        let authority_index = AuthorityIndex::new_for_test(0);
3534        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
3535        let store = core.store.clone();
3536        let mut core = core.core;
3537
3538        // No new block should have been produced
3539        assert_eq!(
3540            core.last_proposed_round(),
3541            GENESIS_ROUND,
3542            "No block should have been created other than genesis"
3543        );
3544
3545        // create a DAG of 12 rounds
3546        let mut dag_builder = DagBuilder::new(core.context.clone());
3547        dag_builder.layers(1..=12).build();
3548
3549        // Store all blocks up to round 6 which should be enough to decide up to leader
3550        // 4
3551        dag_builder.print();
3552        let blocks = dag_builder.blocks(1..=6);
3553
3554        for block in blocks {
3555            core.dag_state.write().accept_block(block);
3556        }
3557
3558        // Get all the committed sub dags up to round 10
3559        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3560
3561        // Now try to commit up to the latest leader (round = 4). Do not provide any
3562        // certified commits.
3563        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3564
3565        // We should have committed up to round 4
3566        assert_eq!(committed_sub_dags.len(), 4);
3567
3568        let last_commit = store
3569            .read_last_commit()
3570            .unwrap()
3571            .expect("Last commit should be set");
3572        assert_eq!(last_commit.reference().index, 4);
3573
3574        println!("Case 1. Provide no certified commits. No commit should happen.");
3575
3576        let last_commit = store
3577            .read_last_commit()
3578            .unwrap()
3579            .expect("Last commit should be set");
3580        assert_eq!(last_commit.reference().index, 4);
3581
3582        println!(
3583            "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3584        );
3585
3586        // The commits of leader rounds 5-8 should be committed via the certified
3587        // commits.
3588        let certified_commits = sub_dags_and_commits
3589            .iter()
3590            .skip(3)
3591            .take(5)
3592            .map(|(_, c)| c.clone())
3593            .collect::<Vec<_>>();
3594
3595        // Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be
3596        // accepted via the certified commits processing.
3597        let blocks = dag_builder.blocks(8..=12);
3598        for block in blocks {
3599            core.dag_state.write().accept_block(block);
3600        }
3601
3602        // The corresponding blocks of the certified commits should be accepted and
3603        // stored before linearizing and committing the DAG.
3604        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3605            .expect("Should not fail");
3606
3607        let commits = store.scan_commits((6..=10).into()).unwrap();
3608
3609        // We expect all the sub dags up to leader round 10 to be committed.
3610        assert_eq!(commits.len(), 5);
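        // Commits 5..=8 come from the certified commits added above, while commits 9
        // and 10 can be decided directly from the blocks of rounds 8..=12; the scan
        // over indices 6..=10 therefore returns 5 commits.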
3611
3612        for i in 6..=10 {
3613            let commit = &commits[i - 6];
3614            assert_eq!(commit.reference().index, i as u32);
3615        }
3616    }
3617
3618    #[tokio::test]
3619    async fn try_commit_with_certified_commits_gced_blocks() {
3620        const GC_DEPTH: u32 = 3;
3621        telemetry_subscribers::init_for_testing();
3622
3623        let (mut context, mut key_pairs) = Context::new_for_test(5);
3624        context
3625            .protocol_config
3626            .set_consensus_gc_depth_for_testing(GC_DEPTH);
3629        let context = Arc::new(context.with_parameters(Parameters {
3630            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3631            ..Default::default()
3632        }));
3633
3634        let store = Arc::new(MemStore::new());
3635        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3636
3637        let block_manager = BlockManager::new(
3638            context.clone(),
3639            dag_state.clone(),
3640            Arc::new(NoopBlockVerifier),
3641        );
3642        let leader_schedule = Arc::new(
3643            LeaderSchedule::from_store(context.clone(), dag_state.clone())
3644                .with_num_commits_per_schedule(10),
3645        );
3646
3647        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3648        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3649        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3650        // Need at least one subscriber to the block broadcast channel.
3651        let _block_receiver = signal_receivers.block_broadcast_receiver();
3652
3653        let (sender, _receiver) = unbounded_channel("consensus_output");
3654        let commit_consumer = CommitConsumer::new(sender.clone(), 0);
3655        let commit_observer = CommitObserver::new(
3656            context.clone(),
3657            commit_consumer,
3658            dag_state.clone(),
3659            store.clone(),
3660            leader_schedule.clone(),
3661        );
3662
3663        let mut core = Core::new(
3664            context.clone(),
3665            leader_schedule,
3666            transaction_consumer,
3667            block_manager,
3668            true,
3669            commit_observer,
3670            signals,
3671            key_pairs.remove(context.own_index.value()).1,
3672            dag_state.clone(),
3673            true,
3674        );
3675
3676        // No new block should have been produced
3677        assert_eq!(
3678            core.last_proposed_round(),
3679            GENESIS_ROUND,
3680            "No block should have been created other than genesis"
3681        );
3682
3683        let dag_str = "DAG {
3684            Round 0 : { 5 },
3685            Round 1 : { * },
3686            Round 2 : {
3687                A -> [-E1],
3688                B -> [-E1],
3689                C -> [-E1],
3690                D -> [-E1],
3691            },
3692            Round 3 : {
3693                A -> [*],
3694                B -> [*],
3695                C -> [*],
3696                D -> [*],
3697            },
3698            Round 4 : {
3699                A -> [*],
3700                B -> [*],
3701                C -> [*],
3702                D -> [*],
3703            },
3704            Round 5 : {
3705                A -> [*],
3706                B -> [*],
3707                C -> [*],
3708                D -> [*],
3709                E -> [A4, B4, C4, D4, E1]
3710            },
3711            Round 6 : { * },
3712            Round 7 : { * },
3713        }";
3714
3715        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3716        dag_builder.print();
3717
3718        // Now get all the committed sub dags from the DagBuilder
3719        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3720            .get_sub_dag_and_certified_commits(1..=5)
3721            .into_iter()
3722            .unzip();
3723
3724        // Now try to commit up to the latest leader (round = 5) with the provided
3725        // certified commits. Note that we have not accepted any blocks; that
3726        // should happen during the commit process.
3727        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3728
3729        // We should have committed up to round 4
3730        assert_eq!(committed_sub_dags.len(), 4);
3731        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3732            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3733
3734            // ensure that block from E1 node has not been committed
3735            for block in committed_sub_dag.blocks.iter() {
3736                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3737                    panic!("Did not expect to commit block E1");
3738                }
3739            }
3740        }
3741    }
3742
3743    #[tokio::test(flavor = "current_thread", start_paused = true)]
3744    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3745        telemetry_subscribers::init_for_testing();
3746        let default_params = Parameters::default();
3747
3748        let (context, _) = Context::new_for_test(6);
3749
3750        // create the cores and their signals for all the authorities
3751        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3752
3753        // Now iterate over a few rounds and ensure the corresponding signals are
3754        // created while network advances
3755        let mut last_round_blocks = Vec::new();
3756        for round in 1..=33 {
3757            let mut this_round_blocks = Vec::new();
3758            // Wait for min round delay to allow blocks to be proposed.
3759            sleep(default_params.min_round_delay).await;
3760            for core_fixture in &mut cores {
3761                // add the blocks from last round
3762                // this will trigger a block creation for the round and a signal should be
3763                // emitted
3764                core_fixture
3765                    .core
3766                    .add_blocks(last_round_blocks.clone())
3767                    .unwrap();
3768                // A "new round" signal should be received given that all the blocks of previous
3769                // round have been processed
3770                let new_round = receive(
3771                    Duration::from_secs(1),
3772                    core_fixture.signal_receivers.new_round_receiver(),
3773                )
3774                .await;
3775                assert_eq!(new_round, round);
3776                // Check that a new block has been proposed.
3777                let extended_block = tokio::time::timeout(
3778                    Duration::from_secs(1),
3779                    core_fixture.block_receiver.recv(),
3780                )
3781                .await
3782                .unwrap()
3783                .unwrap();
3784                assert_eq!(extended_block.block.round(), round);
3785                assert_eq!(
3786                    extended_block.block.author(),
3787                    core_fixture.core.context.own_index
3788                );
3789
3790                // append the new block to this round blocks
3791                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3792                let block = core_fixture.core.last_proposed_block();
3793                // ensure that produced block is referring to the blocks of last_round
3794                assert_eq!(
3795                    block.ancestors().len(),
3796                    core_fixture.core.context.committee.size()
3797                );
3798                for ancestor in block.ancestors() {
3799                    if block.round() > 1 {
3800                        // don't bother with round 1 block which just contains the genesis blocks.
3801                        assert!(
3802                            last_round_blocks
3803                                .iter()
3804                                .any(|block| block.reference() == *ancestor),
3805                            "Reference from previous round should be added"
3806                        );
3807                    }
3808                }
3809            }
3810            last_round_blocks = this_round_blocks;
3811        }
3812        for core_fixture in cores {
3813            // Check commits have been persisted to store
3814            let last_commit = core_fixture
3815                .store
3816                .read_last_commit()
3817                .unwrap()
3818                .expect("last commit should be set");
3819            // There are 31 leader rounds with rounds completed up to and including
3820            // round 33. Round 33 blocks will only include their own blocks, so there
3821            // should only be 30 commits.
3822            // However, on a leader schedule change boundary it is possible for a
3823            // new leader to get selected for the same round if the elected leader
3824            // gets swapped, allowing for multiple leaders to be committed at a round.
3825            // This means that with multi-leader per round explicitly set to 1 we will
3826            // have 30 commits, otherwise 31.
3827            // NOTE: We used 31 leader rounds to specifically trigger the scenario
3828            // where the leader schedule boundary occurred AND we had a swap to a new
3829            // leader for the same round.
3830            let expected_commit_count = 30;
3831            // Leave the code for re-use.
3832            // let expected_commit_count = match num_leaders_per_round {
3833            //    Some(1) => 30,
3834            //    _ => 31,
3835            //};
3836            assert_eq!(last_commit.index(), expected_commit_count);
3837            let all_stored_commits = core_fixture
3838                .store
3839                .scan_commits((0..=CommitIndex::MAX).into())
3840                .unwrap();
3841            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3842            assert_eq!(
3843                core_fixture
3844                    .core
3845                    .leader_schedule
3846                    .leader_swap_table
3847                    .read()
3848                    .bad_nodes
3849                    .len(),
3850                1
3851            );
3852            assert_eq!(
3853                core_fixture
3854                    .core
3855                    .leader_schedule
3856                    .leader_swap_table
3857                    .read()
3858                    .good_nodes
3859                    .len(),
3860                1
3861            );
3862            let expected_reputation_scores =
3863                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3864            assert_eq!(
3865                core_fixture
3866                    .core
3867                    .leader_schedule
3868                    .leader_swap_table
3869                    .read()
3870                    .reputation_scores,
3871                expected_reputation_scores
3872            );
3873        }
3874    }
3875
3876    // TODO: Remove two tests below this when DistributedVoteScoring is enabled.
3877    #[tokio::test(flavor = "current_thread", start_paused = true)]
3878    async fn test_commit_on_leader_schedule_change_boundary_without_multileader_with_vote_scoring()
3879    {
3880        telemetry_subscribers::init_for_testing();
3881        let default_params = Parameters::default();
3882
3883        let (mut context, _) = Context::new_for_test(6);
3884        context
3885            .protocol_config
3886            .set_consensus_distributed_vote_scoring_strategy_for_testing(false);
3887
3888        // create the cores and their signals for all the authorities
3889        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
3890        // Now iterate over a few rounds and ensure the corresponding signals are
3891        // created while network advances
3892        let mut last_round_blocks = Vec::new();
3893        for round in 1..=63 {
3894            let mut this_round_blocks = Vec::new();
3895
3896            // Wait for min round delay to allow blocks to be proposed.
3897            sleep(default_params.min_round_delay).await;
3898
3899            for core_fixture in &mut cores {
3900                // add the blocks from last round
3901                // this will trigger a block creation for the round and a signal should be
3902                // emitted
3903                core_fixture
3904                    .core
3905                    .add_blocks(last_round_blocks.clone())
3906                    .unwrap();
3907
3908                // A "new round" signal should be received given that all the blocks of the
3909                // previous round have been processed.
3910                let new_round = receive(
3911                    Duration::from_secs(1),
3912                    core_fixture.signal_receivers.new_round_receiver(),
3913                )
3914                .await;
3915                assert_eq!(new_round, round);
3916
3917                // Check that a new block has been proposed.
3918                let extended_block = tokio::time::timeout(
3919                    Duration::from_secs(1),
3920                    core_fixture.block_receiver.recv(),
3921                )
3922                .await
3923                .unwrap()
3924                .unwrap();
3925                assert_eq!(extended_block.block.round(), round);
3926                assert_eq!(
3927                    extended_block.block.author(),
3928                    core_fixture.core.context.own_index
3929                );
3930
3931                // append the new block to this round blocks
3932                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3933
3934                let block = core_fixture.core.last_proposed_block();
3935
3936                // ensure that the produced block references the blocks of last_round
3937                assert_eq!(
3938                    block.ancestors().len(),
3939                    core_fixture.core.context.committee.size()
3940                );
3941                for ancestor in block.ancestors() {
3942                    if block.round() > 1 {
3943                        // don't bother with the round 1 block, which only references the genesis blocks.
3944                        assert!(
3945                            last_round_blocks
3946                                .iter()
3947                                .any(|block| block.reference() == *ancestor),
3948                            "Reference from previous round should be added"
3949                        );
3950                    }
3951                }
3952            }
3953
3954            last_round_blocks = this_round_blocks;
3955        }
3956
3957        for core_fixture in cores {
3958            // Check commits have been persisted to store
3959            let last_commit = core_fixture
3960                .store
3961                .read_last_commit()
3962                .unwrap()
3963                .expect("last commit should be set");
3964            // There are 61 leader rounds with rounds completed up to and including
3965            // round 63. Round 63 blocks will only include their own blocks, so there
3966            // should only be 60 commits.
3967            // However, on a leader schedule change boundary it is possible for a
3968            // new leader to get selected for the same round if the elected leader
3969            // gets swapped, allowing for multiple leaders to be committed at a round.
3970            // This means that with multi-leader per round explicitly set to 1 we will
3971            // have 60 commits, otherwise 61.
3972            // NOTE: We used 61 leader rounds to specifically trigger the scenario
3973            // where the leader schedule boundary occurred AND we had a swap to a new
3974            // leader for the same round.
3975            let expected_commit_count = 60;
3976            // Keep the code below for re-use.
3977            // let expected_commit_count = match num_leaders_per_round {
3978            //    Some(1) => 60,
3979            //    _ => 61,
3980            //};
3981            assert_eq!(last_commit.index(), expected_commit_count);
3982            let all_stored_commits = core_fixture
3983                .store
3984                .scan_commits((0..=CommitIndex::MAX).into())
3985                .unwrap();
3986            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3987            assert_eq!(
3988                core_fixture
3989                    .core
3990                    .leader_schedule
3991                    .leader_swap_table
3992                    .read()
3993                    .bad_nodes
3994                    .len(),
3995                1
3996            );
3997            assert_eq!(
3998                core_fixture
3999                    .core
4000                    .leader_schedule
4001                    .leader_swap_table
4002                    .read()
4003                    .good_nodes
4004                    .len(),
4005                1
4006            );
4007            let expected_reputation_scores =
4008                ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
4009            assert_eq!(
4010                core_fixture
4011                    .core
4012                    .leader_schedule
4013                    .leader_swap_table
4014                    .read()
4015                    .reputation_scores,
4016                expected_reputation_scores
4017            );
4018        }
4019    }
4020
4021    #[tokio::test]
4022    async fn test_core_signals() {
4023        telemetry_subscribers::init_for_testing();
4024        let default_params = Parameters::default();
4025
4026        let (context, _) = Context::new_for_test(4);
4027        // create the cores and their signals for all the authorities
4028        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4029
4030        // Now iterate over a few rounds and ensure the corresponding signals are
4031        // created while the network advances.
4032        let mut last_round_blocks = Vec::new();
4033        for round in 1..=10 {
4034            let mut this_round_blocks = Vec::new();
4035
4036            // Wait for min round delay to allow blocks to be proposed.
4037            sleep(default_params.min_round_delay).await;
4038
4039            for core_fixture in &mut cores {
4040                // add the blocks from last round
4041                // this will trigger a block creation for the round and a signal should be
4042                // emitted
4043                core_fixture
4044                    .core
4045                    .add_blocks(last_round_blocks.clone())
4046                    .unwrap();
4047
4048                // A "new round" signal should be received given that all the blocks of the
4049                // previous round have been processed.
4050                let new_round = receive(
4051                    Duration::from_secs(1),
4052                    core_fixture.signal_receivers.new_round_receiver(),
4053                )
4054                .await;
4055                assert_eq!(new_round, round);
4056
4057                // Check that a new block has been proposed.
4058                let extended_block = tokio::time::timeout(
4059                    Duration::from_secs(1),
4060                    core_fixture.block_receiver.recv(),
4061                )
4062                .await
4063                .unwrap()
4064                .unwrap();
4065                assert_eq!(extended_block.block.round(), round);
4066                assert_eq!(
4067                    extended_block.block.author(),
4068                    core_fixture.core.context.own_index
4069                );
4070
4071                // append the new block to this round blocks
4072                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
4073
4074                let block = core_fixture.core.last_proposed_block();
4075
4076                // ensure that the produced block references the blocks of last_round
4077                assert_eq!(
4078                    block.ancestors().len(),
4079                    core_fixture.core.context.committee.size()
4080                );
4081                for ancestor in block.ancestors() {
4082                    if block.round() > 1 {
4083                        // don't bother with the round 1 block, which only references the genesis blocks.
4084                        assert!(
4085                            last_round_blocks
4086                                .iter()
4087                                .any(|block| block.reference() == *ancestor),
4088                            "Reference from previous round should be added"
4089                        );
4090                    }
4091                }
4092            }
4093
4094            last_round_blocks = this_round_blocks;
4095        }
4096
4097        for core_fixture in cores {
4098            // Check commits have been persisted to store
4099            let last_commit = core_fixture
4100                .store
4101                .read_last_commit()
4102                .unwrap()
4103                .expect("last commit should be set");
4104            // There are 8 leader rounds with rounds completed up to and including
4105            // round 9. Round 10 blocks will only include their own blocks, so the
4106            // 8th leader will not be committed.
4107            assert_eq!(last_commit.index(), 7);
4108            let all_stored_commits = core_fixture
4109                .store
4110                .scan_commits((0..=CommitIndex::MAX).into())
4111                .unwrap();
4112            assert_eq!(all_stored_commits.len(), 7);
4113        }
4114    }
4115
4116    #[tokio::test]
4117    async fn test_core_compress_proposal_references() {
4118        telemetry_subscribers::init_for_testing();
4119        let default_params = Parameters::default();
4120
4121        let (context, _) = Context::new_for_test(4);
4122        // create the cores and their signals for all the authorities
4123        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
4124
4125        let mut last_round_blocks = Vec::new();
4126        let mut all_blocks = Vec::new();
4127
4128        let excluded_authority = AuthorityIndex::new_for_test(3);
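        // Authority 3 will not propose any blocks for rounds 1..=10; all produced
        // blocks are collected so they can be fed to its core at the end, where the
        // compression of ancestor references is verified.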
4129
4130        for round in 1..=10 {
4131            let mut this_round_blocks = Vec::new();
4132
4133            for core_fixture in &mut cores {
4134                // do not produce any block for authority 3
4135                if core_fixture.core.context.own_index == excluded_authority {
4136                    continue;
4137                }
4138
4139                // try to propose to ensure that we are covering the case where we miss the
4140                // leader authority 3
4141                core_fixture
4142                    .core
4143                    .add_blocks(last_round_blocks.clone())
4144                    .unwrap();
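                // `new_block(round, true)` is expected to force a proposal even on
                // rounds where the missing authority 3 would have been the leader.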
4145                core_fixture.core.new_block(round, true).unwrap();
4146
4147                let block = core_fixture.core.last_proposed_block();
4148                assert_eq!(block.round(), round);
4149
4150                // append the new block to this round blocks
4151                this_round_blocks.push(block.clone());
4152            }
4153
4154            last_round_blocks = this_round_blocks.clone();
4155            all_blocks.extend(this_round_blocks);
4156        }
4157
4158        // Now send all the produced blocks to the core of authority 3. It should
4159        // produce a new block. If no compression were applied, we would expect all
4160        // the previous blocks from rounds 0..=10 to be referenced. However, since
4161        // compression is applied, only the last round's (10) blocks should be
4162        // referenced, plus the authority's own round 1 block (created after recovery).
4163        let core_fixture = &mut cores[excluded_authority];
4164        // Wait for min round delay to allow blocks to be proposed.
4165        sleep(default_params.min_round_delay).await;
4166        // add blocks to trigger proposal.
4167        core_fixture.core.add_blocks(all_blocks).unwrap();
4168
4169        // Assert that a block has been created for round 11 and that it references the
4170        // blocks of round 10 for the other peers, and its own round 1 block
4171        // (created after recovery).
4172        let block = core_fixture.core.last_proposed_block();
4173        assert_eq!(block.round(), 11);
4174        assert_eq!(block.ancestors().len(), 4);
4175        for block_ref in block.ancestors() {
4176            if block_ref.author == excluded_authority {
4177                assert_eq!(block_ref.round, 1);
4178            } else {
4179                assert_eq!(block_ref.round, 10);
4180            }
4181        }
4182
4183        // Check commits have been persisted to store
4184        let last_commit = core_fixture
4185            .store
4186            .read_last_commit()
4187            .unwrap()
4188            .expect("last commit should be set");
4189        // There are 8 leader rounds with rounds completed up to and including
4190        // round 10. However, because no blocks were produced for authority 3,
4191        // 2 leader rounds will be skipped.
4192        assert_eq!(last_commit.index(), 6);
4193        let all_stored_commits = core_fixture
4194            .store
4195            .scan_commits((0..=CommitIndex::MAX).into())
4196            .unwrap();
4197        assert_eq!(all_stored_commits.len(), 6);
4198    }
4199
4200    #[tokio::test]
4201    async fn try_decide_certified() {
4202        // GIVEN
4203        telemetry_subscribers::init_for_testing();
4204
4205        let (context, _) = Context::new_for_test(4);
4206
4207        let authority_index = AuthorityIndex::new_for_test(0);
4208        let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
4209        let mut core = core.core;
4210
4211        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
4212        dag_builder.layers(1..=12).build();
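        // The builder produces blocks for rounds 1..=12 across the 4-authority
        // committee; they are all accepted directly into the dag state below.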
4213
4214        let limit = 2;
4215
4216        let blocks = dag_builder.blocks(1..=12);
4217
4218        for block in blocks {
4219            core.dag_state.write().accept_block(block);
4220        }
4221
4222        // WHEN
4223        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4224        let mut certified_commits = sub_dags_and_commits
4225            .into_iter()
4226            .map(|(_, commit)| commit)
4227            .collect::<Vec<_>>();
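        // With 4 certified commits available and `limit = 2`, only the first two
        // should be decided; the remaining two are expected to stay in
        // `certified_commits`, which is what the assertions below verify.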
4228
4229        let leaders = core.try_decide_certified(&mut certified_commits, limit);
4230
4231        // THEN
4232        assert_eq!(leaders.len(), 2);
4233        assert_eq!(certified_commits.len(), 2);
4234    }
4235
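    /// Test helper: waits up to `timeout` for the watch channel to report a change
    /// and returns the latest value. Panics if the timeout elapses or the sender
    /// side of the channel is closed.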
4236    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4237        tokio::time::timeout(timeout, receiver.changed())
4238            .await
4239            .expect("Timeout while waiting to read from receiver")
4240            .expect("Signal receive channel shouldn't be closed");
4241        *receiver.borrow_and_update()
4242    }
4243}