consensus_core/
authority_service.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::SynchronizerHandle,
};

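/// Multiplier applied to `commit_sync_batch_size` to derive the commit-lag threshold:
/// blocks received via `handle_send_block` are rejected once the local commit index
/// trails the quorum commit index by more than this many batches.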
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

/// Authority's network service implementation, agnostic to the actual
/// networking stack used.
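/// Handles the consensus RPCs: receiving proposed blocks, serving block
/// subscriptions, and serving fetch requests for blocks, commits and latest rounds.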
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing validations.
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        // Reject blocks with a timestamp too far in the future.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
        if forward_time_drift > self.context.parameters.max_forward_time_drift {
            self.context
                .metrics
                .node_metrics
                .rejected_future_blocks
                .with_label_values(&[peer_hostname])
                .inc();
            debug!(
                "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                block_ref,
                verified_block.timestamp_ms(),
                now,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Block timestamp is too far in the future: {} > {}",
                    verified_block.timestamp_ms(),
                    now
                ),
            });
        }

        // Wait until the block's timestamp is current.
        if forward_time_drift > Duration::ZERO {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_wait_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
            debug!(
                "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                block_ref,
                verified_block.timestamp_ms(),
                now,
                forward_time_drift.as_millis(),
            );
            sleep(forward_time_drift).await;
        }

        // Observe the block for its commit votes. When the local commit index is
        // lagging too much, the commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Reject blocks when the local commit index is lagging too far behind the
        // quorum commit index.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // the observed quorum commit index will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage, not CPU,
        // it is ok to reject after block verification instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore a block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
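        // For example, with a (hypothetical) commit_sync_batch_size of 100 and
        // COMMIT_LAG_MULTIPLIER = 5, blocks are rejected once the local commit index
        // trails the quorum commit index by more than 500 commits.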
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            // Schedule fetching the missing ancestors from this peer.
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

        // After processing the block, process the excluded ancestors.

        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                // Schedule fetching the missing excluded ancestors from this peer in
                // the background.
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        // Find recent own blocks that have not been received by the peer.
        // If last_received is valid and more blocks have been proposed since then,
        // this call is guaranteed to return at least some recent blocks, which
        // will help with liveness.
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then
        // new blocks.
        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        const MAX_ADDITIONAL_BLOCKS: usize = 10;
        if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
            return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
        }

        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Some quick validation of the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // For now, ask the dag state directly.
        let blocks = self.dag_state.read().get_blocks(&block_refs);

        // Now check if an ancestor's round is higher than the one that the peer has.
        // If yes, then serve those ancestor blocks, up to `MAX_ADDITIONAL_BLOCKS`.
        let mut ancestor_blocks = vec![];
        if !highest_accepted_rounds.is_empty() {
            let all_ancestors = blocks
                .iter()
                .flatten()
                .flat_map(|block| block.ancestors().to_vec())
                .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                .take(MAX_ADDITIONAL_BLOCKS)
                .collect::<Vec<_>>();

            if !all_ancestors.is_empty() {
                ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
            }
        }

        // Return the serialised blocks & the ancestor blocks.
        let result = blocks
            .into_iter()
            .chain(ancestor_blocks)
            .flatten()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits
        // scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
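        // Trim uncertified commits from the tail: walk backwards until the last
        // remaining commit has votes from a quorum of stake, and keep those vote
        // block refs so the certifying blocks can be returned alongside the commits.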
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that the requested authorities are valid.
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended
        // blocks. Ideally we want to do that in the future, if we think it would
        // help tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // No reason to serve back the genesis block - it's the same as if the peer
            // has not received any block.
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks.
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        // Own blocks do not go through the core dispatcher, so they need to be set
        // separately.
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast
/// stream, and dispatches commands to core based on the changes.
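/// When the subscribed stake (including this authority's own stake) crosses the
/// quorum threshold in either direction, the core dispatcher is told to start or
/// stop proposing blocks.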
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        // Initialize the subscribed_by gauge to 0 for every authority.
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before the new peer was subscribed.
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        // If the subscribed stake reaches quorum, notify the dispatcher to get ready
        // to propose blocks.
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher.
        drop(counter);
        Ok(())
    }

    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before the peer was dropped.
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        // If the subscribed stake drops below quorum, notify the dispatcher to stop
        // proposing blocks.
        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher.
        drop(counter);
        Ok(())
    }
}

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference
/// is that this tolerates lags with only logging, without yielding errors.
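/// Creating the stream increments the peer's subscription count and dropping it
/// decrements the count, so `SubscriptionCounter` can track active subscribers.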
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

// TODO: add a unit test for BroadcastStream.
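// A possible shape (sketch only): build a `SubscriptionCounter` with a fake
// dispatcher, wrap a small `broadcast::channel` receiver in a `BroadcastStream`,
// send more items than the channel capacity, and assert the stream keeps yielding
// the newest items (logging the lag) instead of terminating with an error.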

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

    struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;
        assert!(core_dispatcher.get_blocks().is_empty());

        sleep(max_drift).await;
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities, including some equivocations, and
        // store them in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
}