consensus_core/
round_prober.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! RoundProber periodically checks each peer for the latest rounds they
6//! received and accepted from others. This provides insight into how
7//! effectively each authority's blocks are propagated and accepted across the
8//! network.
9//!
10//! Unlike inferring accepted rounds from the DAG of each block, RoundProber has
11//! the benefit that it remains active even when peers are not proposing. This
12//! makes it essential for determining when to disable optimizations that
13//! improve DAG quality but may compromise liveness.
14//!
15//! RoundProber's data sources include the `highest_received_rounds` &
16//! `highest_accepted_rounds` tracked by the CoreThreadDispatcher and DagState.
17//! The received rounds are updated after blocks are verified but before
18//! checking for dependencies. This should make the values more indicative of
19//! how well authorities propagate blocks, and less influenced by the quality of
20//! ancestors in the proposed blocks. The accepted rounds are updated after
21//! checking for dependencies which should indicate the quality of the proposed
22//! blocks including its ancestors.
23
24use std::{sync::Arc, time::Duration};
25
26use consensus_config::{AuthorityIndex, Committee};
27use futures::stream::{FuturesUnordered, StreamExt as _};
28use iota_common::sync::notify_once::NotifyOnce;
29use iota_metrics::monitored_scope;
30use parking_lot::RwLock;
31use tokio::{task::JoinHandle, time::MissedTickBehavior};
32
33use crate::{
34    BlockAPI as _, Round, context::Context, core_thread::CoreThreadDispatcher, dag_state::DagState,
35    network::NetworkClient,
36};
37
38/// A [`QuorumRound`] is a round range [low, high]. It is computed from
39/// highest received or accepted rounds of an authority reported by all
40/// authorities.
41/// The bounds represent:
42/// - the highest round lower or equal to rounds from a quorum (low)
43/// - the lowest round higher or equal to rounds from a quorum (high)
44///
45/// [`QuorumRound`] is useful because:
46/// - [low, high] range is BFT, always between the lowest and highest rounds of
47///   honest validators, with < validity threshold of malicious stake.
48/// - It provides signals about how well blocks from an authority propagates in
49///   the network. If low bound for an authority is lower than its last proposed
50///   round, the last proposed block has not propagated to a quorum. If a new
51///   block is proposed from the authority, it will not get accepted immediately
52///   by a quorum.
53pub(crate) type QuorumRound = (Round, Round);
54
55// Handle to control the RoundProber loop and read latest round gaps.
56pub(crate) struct RoundProberHandle {
57    prober_task: JoinHandle<()>,
58    shutdown_notify: Arc<NotifyOnce>,
59}
60
61impl RoundProberHandle {
62    pub(crate) async fn stop(self) {
63        let _ = self.shutdown_notify.notify();
64        // Do not abort prober task, which waits for requests to be cancelled.
65        if let Err(e) = self.prober_task.await {
66            if e.is_panic() {
67                std::panic::resume_unwind(e.into_panic());
68            }
69        }
70    }
71}
72
73pub(crate) struct RoundProber<C: NetworkClient> {
74    context: Arc<Context>,
75    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
76    dag_state: Arc<RwLock<DagState>>,
77    network_client: Arc<C>,
78    shutdown_notify: Arc<NotifyOnce>,
79}
80
81impl<C: NetworkClient> RoundProber<C> {
82    pub(crate) fn new(
83        context: Arc<Context>,
84        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
85        dag_state: Arc<RwLock<DagState>>,
86        network_client: Arc<C>,
87    ) -> Self {
88        Self {
89            context,
90            core_thread_dispatcher,
91            dag_state,
92            network_client,
93            shutdown_notify: Arc::new(NotifyOnce::new()),
94        }
95    }
96
97    pub(crate) fn start(self) -> RoundProberHandle {
98        let shutdown_notify = self.shutdown_notify.clone();
99        let loop_shutdown_notify = shutdown_notify.clone();
100        let prober_task = tokio::spawn(async move {
101            // With 200 validators, this would result in 200 * 4 * 200 / 2 = 80KB of
102            // additional bandwidth usage per sec. We can consider using
103            // adaptive intervals, for example 10s by default but reduced to 2s
104            // when the propagation delay is higher.
105            let mut interval = tokio::time::interval(Duration::from_millis(
106                self.context.parameters.round_prober_interval_ms,
107            ));
108            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
109            loop {
110                tokio::select! {
111                    _ = interval.tick() => {
112                        self.probe().await;
113                    }
114                    _ = loop_shutdown_notify.wait() => {
115                        break;
116                    }
117                }
118            }
119        });
120        RoundProberHandle {
121            prober_task,
122            shutdown_notify,
123        }
124    }
125
126    // Probes each peer for the latest rounds they received from others.
127    // Returns the quorum round for each authority, and the propagation delay
128    // of own blocks.
129    pub(crate) async fn probe(&self) -> (Vec<QuorumRound>, Vec<QuorumRound>, Round) {
130        let _scope = monitored_scope("RoundProber");
131
132        let node_metrics = &self.context.metrics.node_metrics;
133        let request_timeout =
134            Duration::from_millis(self.context.parameters.round_prober_request_timeout_ms);
135        let own_index = self.context.own_index;
136        let mut requests = FuturesUnordered::new();
137
138        for (peer, _) in self.context.committee.authorities() {
139            if peer == own_index {
140                continue;
141            }
142            let network_client = self.network_client.clone();
143            requests.push(async move {
144                let result = tokio::time::timeout(
145                    request_timeout,
146                    network_client.get_latest_rounds(peer, request_timeout),
147                )
148                .await;
149                (peer, result)
150            });
151        }
152
153        let mut highest_received_rounds =
154            vec![vec![0; self.context.committee.size()]; self.context.committee.size()];
155        let mut highest_accepted_rounds =
156            vec![vec![0; self.context.committee.size()]; self.context.committee.size()];
157
158        let blocks = self
159            .dag_state
160            .read()
161            .get_last_cached_block_per_authority(Round::MAX);
162        let local_highest_accepted_rounds = blocks
163            .into_iter()
164            .map(|(block, _)| block.round())
165            .collect::<Vec<_>>();
166        let last_proposed_round = local_highest_accepted_rounds[own_index];
167
168        // For our own index, the highest received & accepted round is our last
169        // accepted round or our last proposed round.
170        highest_received_rounds[own_index] = self.core_thread_dispatcher.highest_received_rounds();
171        highest_accepted_rounds[own_index] = local_highest_accepted_rounds;
172        highest_received_rounds[own_index][own_index] = last_proposed_round;
173        highest_accepted_rounds[own_index][own_index] = last_proposed_round;
174
175        loop {
176            tokio::select! {
177                result = requests.next() => {
178                    let Some((peer, result)) = result else { break };
179                    match result {
180                        Ok(Ok((received, accepted))) => {
181                            if received.len() == self.context.committee.size()
182                            {
183                                highest_received_rounds[peer] = received;
184                            } else {
185                                node_metrics.round_prober_request_errors.with_label_values(&["invalid_received_rounds"]).inc();
186                                tracing::warn!("Received invalid number of received rounds from peer {}", peer);
187                            }
188
189                            if self
190                                .context
191                                .protocol_config
192                                .consensus_round_prober_probe_accepted_rounds() {
193                                    if accepted.len() == self.context.committee.size() {
194                                        highest_accepted_rounds[peer] = accepted;
195                                    } else {
196                                        node_metrics.round_prober_request_errors.with_label_values(&["invalid_accepted_rounds"]).inc();
197                                        tracing::warn!("Received invalid number of accepted rounds from peer {}", peer);
198                                    }
199                                }
200
201                        },
202                        // When a request fails, the highest received rounds from that authority will be 0
203                        // for the subsequent computations.
204                        // For propagation delay, this behavior is desirable because the computed delay
205                        // increases as this authority has more difficulty communicating with peers. Logic
206                        // triggered by high delay should usually be triggered with frequent probing failures
207                        // as well.
208                        // For quorum rounds computed for peer, this means the values should be used for
209                        // positive signals (peer A can propagate its blocks well) rather than negative signals
210                        // (peer A cannot propagate its blocks well). It can be difficult to distinguish between
211                        // own probing failures and actual propagation issues.
212                        Ok(Err(err)) => {
213                            node_metrics.round_prober_request_errors.with_label_values(&["failed_fetch"]).inc();
214                            tracing::debug!("Failed to get latest rounds from peer {}: {:?}", peer, err);
215                        },
216                        Err(_) => {
217                            node_metrics.round_prober_request_errors.with_label_values(&["timeout"]).inc();
218                            tracing::debug!("Timeout while getting latest rounds from peer {}", peer);
219                        },
220                    }
221                }
222                _ = self.shutdown_notify.wait() => break,
223            }
224        }
225
226        let received_quorum_rounds: Vec<_> = self
227            .context
228            .committee
229            .authorities()
230            .map(|(peer, _)| {
231                compute_quorum_round(&self.context.committee, peer, &highest_received_rounds)
232            })
233            .collect();
234        for ((low, high), (_, authority)) in received_quorum_rounds
235            .iter()
236            .zip(self.context.committee.authorities())
237        {
238            node_metrics
239                .round_prober_received_quorum_round_gaps
240                .with_label_values(&[&authority.hostname])
241                .set((high - low) as i64);
242            node_metrics
243                .round_prober_low_received_quorum_round
244                .with_label_values(&[&authority.hostname])
245                .set(*low as i64);
246            // The gap can be negative if this validator is lagging behind the network.
247            node_metrics
248                .round_prober_current_received_round_gaps
249                .with_label_values(&[&authority.hostname])
250                .set(last_proposed_round as i64 - *low as i64);
251        }
252
253        let accepted_quorum_rounds: Vec<_> = self
254            .context
255            .committee
256            .authorities()
257            .map(|(peer, _)| {
258                compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
259            })
260            .collect();
261        for ((low, high), (_, authority)) in accepted_quorum_rounds
262            .iter()
263            .zip(self.context.committee.authorities())
264        {
265            node_metrics
266                .round_prober_accepted_quorum_round_gaps
267                .with_label_values(&[&authority.hostname])
268                .set((high - low) as i64);
269            node_metrics
270                .round_prober_low_accepted_quorum_round
271                .with_label_values(&[&authority.hostname])
272                .set(*low as i64);
273            // The gap can be negative if this validator is lagging behind the network.
274            node_metrics
275                .round_prober_current_accepted_round_gaps
276                .with_label_values(&[&authority.hostname])
277                .set(last_proposed_round as i64 - *low as i64);
278        }
279        // TODO: consider using own quorum round gap to control proposing in addition to
280        // propagation delay. For now they seem to be about the same.
281
282        // It is possible more blocks arrive at a quorum of peers before the
283        // get_latest_rounds requests arrive.
284        // Using the lower bound to increase sensitivity about block propagation issues
285        // that can reduce round rate.
286        // Because of the nature of TCP and block streaming, propagation delay is
287        // expected to be 0 in most cases, even when the actual latency of
288        // broadcasting blocks is high.
289        let propagation_delay =
290            last_proposed_round.saturating_sub(received_quorum_rounds[own_index].0);
291        node_metrics
292            .round_prober_propagation_delays
293            .observe(propagation_delay as f64);
294        node_metrics
295            .round_prober_last_propagation_delay
296            .set(propagation_delay as i64);
297        if let Err(e) = self
298            .core_thread_dispatcher
299            .set_propagation_delay_and_quorum_rounds(
300                propagation_delay,
301                received_quorum_rounds.clone(),
302                accepted_quorum_rounds.clone(),
303            )
304        {
305            tracing::warn!(
306                "Failed to set propagation delay and quorum rounds {received_quorum_rounds:?} on Core: {:?}",
307                e
308            );
309        }
310
311        (
312            received_quorum_rounds,
313            accepted_quorum_rounds,
314            propagation_delay,
315        )
316    }
317}
318
319/// For the peer specified with target_index, compute and return its
320/// [`QuorumRound`].
321fn compute_quorum_round(
322    committee: &Committee,
323    target_index: AuthorityIndex,
324    highest_received_rounds: &[Vec<Round>],
325) -> QuorumRound {
326    let mut rounds_with_stake = highest_received_rounds
327        .iter()
328        .zip(committee.authorities())
329        .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
330        .collect::<Vec<_>>();
331    rounds_with_stake.sort();
332
333    // Forward iteration and stopping at validity threshold would produce the same
334    // result currently, with fault tolerance of f/3f+1 votes. But it is not
335    // semantically correct, and will provide an incorrect value when fault
336    // tolerance and validity threshold are different.
337    let mut total_stake = 0;
338    let mut low = 0;
339    for (round, stake) in rounds_with_stake.iter().rev() {
340        total_stake += stake;
341        if total_stake >= committee.quorum_threshold() {
342            low = *round;
343            break;
344        }
345    }
346
347    let mut total_stake = 0;
348    let mut high = 0;
349    for (round, stake) in rounds_with_stake.iter() {
350        total_stake += stake;
351        if total_stake >= committee.quorum_threshold() {
352            high = *round;
353            break;
354        }
355    }
356
357    (low, high)
358}
359
360#[cfg(test)]
361mod test {
362    use std::{collections::BTreeSet, sync::Arc, time::Duration};
363
364    use async_trait::async_trait;
365    use bytes::Bytes;
366    use consensus_config::AuthorityIndex;
367    use parking_lot::{Mutex, RwLock};
368
369    use super::QuorumRound;
370    use crate::{
371        Round, TestBlock, VerifiedBlock,
372        block::BlockRef,
373        commit::{CertifiedCommits, CommitRange},
374        context::Context,
375        core_thread::{CoreError, CoreThreadDispatcher},
376        dag_state::DagState,
377        error::{ConsensusError, ConsensusResult},
378        network::{BlockStream, NetworkClient},
379        round_prober::{RoundProber, compute_quorum_round},
380        storage::mem_store::MemStore,
381    };
382
383    struct FakeThreadDispatcher {
384        highest_received_rounds: Vec<Round>,
385        propagation_delay: Mutex<Round>,
386        received_quorum_rounds: Mutex<Vec<QuorumRound>>,
387        accepted_quorum_rounds: Mutex<Vec<QuorumRound>>,
388    }
389
390    impl FakeThreadDispatcher {
391        fn new(highest_received_rounds: Vec<Round>) -> Self {
392            Self {
393                highest_received_rounds,
394                propagation_delay: Mutex::new(0),
395                received_quorum_rounds: Mutex::new(Vec::new()),
396                accepted_quorum_rounds: Mutex::new(Vec::new()),
397            }
398        }
399
400        fn propagation_delay(&self) -> Round {
401            *self.propagation_delay.lock()
402        }
403
404        fn received_quorum_rounds(&self) -> Vec<QuorumRound> {
405            self.received_quorum_rounds.lock().clone()
406        }
407
408        fn accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
409            self.accepted_quorum_rounds.lock().clone()
410        }
411    }
412
413    #[async_trait]
414    impl CoreThreadDispatcher for FakeThreadDispatcher {
415        async fn add_blocks(
416            &self,
417            _blocks: Vec<VerifiedBlock>,
418        ) -> Result<BTreeSet<BlockRef>, CoreError> {
419            unimplemented!()
420        }
421
422        async fn add_certified_commits(
423            &self,
424            _commits: CertifiedCommits,
425        ) -> Result<BTreeSet<BlockRef>, CoreError> {
426            unimplemented!()
427        }
428
429        async fn check_block_refs(
430            &self,
431            _block_refs: Vec<BlockRef>,
432        ) -> Result<BTreeSet<BlockRef>, CoreError> {
433            unimplemented!()
434        }
435
436        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
437            unimplemented!()
438        }
439
440        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
441            unimplemented!()
442        }
443
444        fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
445            unimplemented!()
446        }
447
448        fn set_propagation_delay_and_quorum_rounds(
449            &self,
450            delay: Round,
451            received_quorum_rounds: Vec<QuorumRound>,
452            accepted_quorum_rounds: Vec<QuorumRound>,
453        ) -> Result<(), CoreError> {
454            let mut received_quorum_round_per_authority = self.received_quorum_rounds.lock();
455            *received_quorum_round_per_authority = received_quorum_rounds;
456            let mut accepted_quorum_round_per_authority = self.accepted_quorum_rounds.lock();
457            *accepted_quorum_round_per_authority = accepted_quorum_rounds;
458            let mut propagation_delay = self.propagation_delay.lock();
459            *propagation_delay = delay;
460            Ok(())
461        }
462
463        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
464            unimplemented!()
465        }
466
467        fn highest_received_rounds(&self) -> Vec<Round> {
468            self.highest_received_rounds.clone()
469        }
470    }
471
472    struct FakeNetworkClient {
473        highest_received_rounds: Vec<Vec<Round>>,
474        highest_accepted_rounds: Vec<Vec<Round>>,
475    }
476
477    impl FakeNetworkClient {
478        fn new(
479            highest_received_rounds: Vec<Vec<Round>>,
480            highest_accepted_rounds: Vec<Vec<Round>>,
481        ) -> Self {
482            Self {
483                highest_received_rounds,
484                highest_accepted_rounds,
485            }
486        }
487    }
488
489    #[async_trait]
490    #[async_trait::async_trait]
491    impl NetworkClient for FakeNetworkClient {
492        const SUPPORT_STREAMING: bool = true;
493
494        async fn send_block(
495            &self,
496            _peer: AuthorityIndex,
497            _serialized_block: &VerifiedBlock,
498            _timeout: Duration,
499        ) -> ConsensusResult<()> {
500            unimplemented!("Unimplemented")
501        }
502
503        async fn subscribe_blocks(
504            &self,
505            _peer: AuthorityIndex,
506            _last_received: Round,
507            _timeout: Duration,
508        ) -> ConsensusResult<BlockStream> {
509            unimplemented!("Unimplemented")
510        }
511
512        async fn fetch_blocks(
513            &self,
514            _peer: AuthorityIndex,
515            _block_refs: Vec<BlockRef>,
516            _highest_accepted_rounds: Vec<Round>,
517            _timeout: Duration,
518        ) -> ConsensusResult<Vec<Bytes>> {
519            unimplemented!("Unimplemented")
520        }
521
522        async fn fetch_commits(
523            &self,
524            _peer: AuthorityIndex,
525            _commit_range: CommitRange,
526            _timeout: Duration,
527        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
528            unimplemented!("Unimplemented")
529        }
530
531        async fn fetch_latest_blocks(
532            &self,
533            _peer: AuthorityIndex,
534            _authorities: Vec<AuthorityIndex>,
535            _timeout: Duration,
536        ) -> ConsensusResult<Vec<Bytes>> {
537            unimplemented!("Unimplemented")
538        }
539
540        async fn get_latest_rounds(
541            &self,
542            peer: AuthorityIndex,
543            _timeout: Duration,
544        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
545            let received_rounds = self.highest_received_rounds[peer].clone();
546            let accepted_rounds = self.highest_accepted_rounds[peer].clone();
547            if received_rounds.is_empty() && accepted_rounds.is_empty() {
548                Err(ConsensusError::NetworkRequestTimeout("test".to_string()))
549            } else {
550                Ok((received_rounds, accepted_rounds))
551            }
552        }
553    }
554
555    #[tokio::test]
556    async fn test_round_prober() {
557        const NUM_AUTHORITIES: usize = 7;
558        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);
559        let core_thread_dispatcher = Arc::new(FakeThreadDispatcher::new(vec![
560            110, 120, 130, 140, 150, 160, 170,
561        ]));
562        let store = Arc::new(MemStore::new());
563        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
564        // Have some peers return error or incorrect number of rounds.
565        let network_client = Arc::new(FakeNetworkClient::new(
566            vec![
567                vec![],
568                vec![109, 121, 131, 0, 151, 161, 171],
569                vec![101, 0, 103, 104, 105, 166, 107],
570                vec![],
571                vec![100, 102, 133, 0, 155, 106, 177],
572                vec![105, 115, 103, 0, 125, 126, 127],
573                vec![10, 20, 30, 40, 50, 60],
574            ], // highest_received_rounds
575            vec![
576                vec![],
577                vec![0, 121, 131, 0, 151, 161, 171],
578                vec![1, 0, 103, 104, 105, 166, 107],
579                vec![],
580                vec![0, 102, 133, 0, 155, 106, 177],
581                vec![1, 115, 103, 0, 125, 126, 127],
582                vec![1, 20, 30, 40, 50, 60],
583            ], // highest_accepted_rounds
584        ));
585        let prober = RoundProber::new(
586            context.clone(),
587            core_thread_dispatcher.clone(),
588            dag_state.clone(),
589            network_client.clone(),
590        );
591
592        // Create test blocks for each authority with incrementing rounds starting at
593        // 110
594        let blocks = (0..NUM_AUTHORITIES)
595            .map(|authority| {
596                let round = 110 + (authority as u32 * 10);
597                VerifiedBlock::new_for_test(TestBlock::new(round, authority as u32).build())
598            })
599            .collect::<Vec<_>>();
600
601        dag_state.write().accept_blocks(blocks);
602
603        // Compute quorum rounds and propagation delay based on last proposed round =
604        // 110, and highest received rounds:
605        // 110, 120, 130, 140, 150, 160, 170,
606        // 109, 121, 131, 0,   151, 161, 171,
607        // 101, 0,   103, 104, 105, 166, 107,
608        // 0,   0,   0,   0,   0,   0,   0,
609        // 100, 102, 133, 0,   155, 106, 177,
610        // 105, 115, 103, 0,   125, 126, 127,
611        // 0,   0,   0,   0,   0,   0,   0,
612
613        let (received_quorum_rounds, accepted_quorum_rounds, propagation_delay) =
614            prober.probe().await;
615
616        assert_eq!(
617            received_quorum_rounds,
618            vec![
619                (100, 105),
620                (0, 115),
621                (103, 130),
622                (0, 0),
623                (105, 150),
624                (106, 160),
625                (107, 170)
626            ]
627        );
628
629        assert_eq!(
630            core_thread_dispatcher.received_quorum_rounds(),
631            vec![
632                (100, 105),
633                (0, 115),
634                (103, 130),
635                (0, 0),
636                (105, 150),
637                (106, 160),
638                (107, 170)
639            ]
640        );
641        // 110 - 100 = 10
642        assert_eq!(propagation_delay, 10);
643        assert_eq!(core_thread_dispatcher.propagation_delay(), 10);
644
645        assert_eq!(
646            accepted_quorum_rounds,
647            vec![
648                (0, 1),
649                (0, 115),
650                (103, 130),
651                (0, 0),
652                (105, 150),
653                (106, 160),
654                (107, 170)
655            ]
656        );
657
658        assert_eq!(
659            core_thread_dispatcher.accepted_quorum_rounds(),
660            vec![
661                (0, 1),
662                (0, 115),
663                (103, 130),
664                (0, 0),
665                (105, 150),
666                (106, 160),
667                (107, 170)
668            ]
669        );
670    }
671
672    #[tokio::test]
673    async fn test_compute_quorum_round() {
674        let (context, _) = Context::new_for_test(4);
675
676        // Observe latest rounds from peers.
677        let highest_received_rounds = vec![
678            vec![10, 11, 12, 13],
679            vec![5, 2, 7, 4],
680            vec![0, 0, 0, 0],
681            vec![3, 4, 5, 6],
682        ];
683
684        let round = compute_quorum_round(
685            &context.committee,
686            AuthorityIndex::new_for_test(0),
687            &highest_received_rounds,
688        );
689        assert_eq!(round, (3, 5));
690
691        let round = compute_quorum_round(
692            &context.committee,
693            AuthorityIndex::new_for_test(1),
694            &highest_received_rounds,
695        );
696        assert_eq!(round, (2, 4));
697
698        let round = compute_quorum_round(
699            &context.committee,
700            AuthorityIndex::new_for_test(2),
701            &highest_received_rounds,
702        );
703        assert_eq!(round, (5, 7));
704
705        let round = compute_quorum_round(
706            &context.committee,
707            AuthorityIndex::new_for_test(3),
708            &highest_received_rounds,
709        );
710        assert_eq!(round, (4, 6));
711    }
712}