consensus_core/authority_service.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
};

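// Illustrative arithmetic for the commit-lag rejection in `handle_send_block` below,
// assuming a hypothetical `commit_sync_batch_size` of 100 (not necessarily the
// configured default): incoming blocks are rejected once
// `last_commit_index + 100 * COMMIT_LAG_MULTIPLIER < quorum_commit_index`, i.e. when
// the local commit index trails the observed quorum commit index by more than 500
// commits.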
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

/// Authority's network service implementation, agnostic to the actual
/// networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks failing validations.
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        // Reject block with timestamp too far in the future.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
        if forward_time_drift > self.context.parameters.max_forward_time_drift {
            self.context
                .metrics
                .node_metrics
                .rejected_future_blocks
                .with_label_values(&[peer_hostname])
                .inc();
            debug!(
                "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                block_ref,
                verified_block.timestamp_ms(),
                now,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Block timestamp is too far in the future: {} > {}",
                    verified_block.timestamp_ms(),
                    now
                ),
            });
        }

        // Wait until the block's timestamp is current.
        if forward_time_drift > Duration::ZERO {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_wait_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
            debug!(
                "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                block_ref,
                verified_block.timestamp_ms(),
                now,
                forward_time_drift.as_millis(),
            );
            sleep(forward_time_drift).await;
        }
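        // Illustrative example (assumed values): a block timestamped 200ms ahead of the
        // local clock with `max_forward_time_drift` set to 500ms is not rejected above;
        // instead the handler sleeps for the 200ms drift, so the block's timestamp is no
        // longer in the future when processing continues.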

        // Observe the block for the commit votes. When the local commit index is lagging
        // too much, the commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Reject blocks when the local commit index is lagging too far behind the quorum
        // commit index.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // the observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage, not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore a block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            // schedule the fetching of them from this peer
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

        // After processing the block, process the excluded ancestors

        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                // schedule the fetching of them from this peer in the background
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        // Find recent own blocks that have not been received by the peer.
        // If last_received is valid and more blocks have been proposed since then,
        // this call is guaranteed to return at least some recent blocks, which
        // will help with liveness.
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then
        // new blocks.
        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles two types of requests:
    // 1. Missing blocks for block sync:
    //    - uses highest_accepted_rounds.
    //    - at most max_blocks_per_sync blocks should be returned.
    // 2. Committed blocks for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - at most max_blocks_per_fetch blocks should be returned.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        // This method is used for both commit sync and the periodic/live synchronizer.
        // For commit sync, we do not use highest_accepted_rounds and the fetch size is
        // larger.
        let commit_sync_handle = highest_accepted_rounds.is_empty();

        fail_point_async!("consensus-rpc-response");

        // Some quick validation of the requested block refs
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        if !self.context.protocol_config.consensus_batched_block_sync() {
            if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
                return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
            }

            if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
            {
                return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                    highest_accepted_rounds.len(),
                    self.context.committee.size(),
                ));
            }

            // For now ask dag state directly
            let blocks = self.dag_state.read().get_blocks(&block_refs);

            // Now check if an ancestor's round is higher than the one that the peer has. If
            // yes, then serve those ancestor blocks up to `MAX_ADDITIONAL_BLOCKS`.
            let mut ancestor_blocks = vec![];
            if !commit_sync_handle {
                let all_ancestors = blocks
                    .iter()
                    .flatten()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .take(MAX_ADDITIONAL_BLOCKS)
                    .collect::<Vec<_>>();

                if !all_ancestors.is_empty() {
                    ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
                }
            }

            // Return the serialised blocks & the ancestor blocks
            let result = blocks
                .into_iter()
                .chain(ancestor_blocks)
                .flatten()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            return Ok(result);
        }

        // For commit sync, the fetch size is larger; for the periodic/live synchronizer,
        // the fetch size is smaller. Instead of rejecting oversized requests, we
        // truncate them to allow an easy update of this parameter in the future.
        if commit_sync_handle {
            block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
        } else {
            block_refs.truncate(self.context.parameters.max_blocks_per_sync);
        }

        // Get requested blocks from store.
        let blocks = if commit_sync_handle {
            // For commit sync, we respond with all blocks from the store
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        } else {
            // For the periodic or live synchronizer, we respond with the requested blocks
            // from the store and with additional blocks from the cache.
            block_refs.sort();
            block_refs.dedup();
            let dag_state = self.dag_state.read();
            let mut blocks = dag_state
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            // Get additional blocks for authorities with missing blocks, if they are
            // available in the cache. Compute the lowest missing round per requested
            // authority.
            let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
            for block_ref in blocks.iter().map(|b| b.reference()) {
                let entry = lowest_missing_rounds
                    .entry(block_ref.author)
                    .or_insert(block_ref.round);
                *entry = (*entry).min(block_ref.round);
            }

            // Retrieve additional blocks per authority, from the peer's highest accepted
            // round + 1 to the lowest missing round (exclusive) per requested authority.
            // Start with own blocks.
            let own_index = self.context.own_index;

            // Collect and sort so own_index comes first
            let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
            ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });

            for (authority, lowest_missing_round) in ordered_missing_rounds {
                let highest_accepted_round = highest_accepted_rounds[authority];
                if highest_accepted_round >= lowest_missing_round {
                    continue;
                }

                let missing_blocks = dag_state.get_cached_blocks_in_range(
                    authority,
                    highest_accepted_round + 1,
                    lowest_missing_round,
                    self.context
                        .parameters
                        .max_blocks_per_sync
                        .saturating_sub(blocks.len()),
                );
                blocks.extend(missing_blocks);
                if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                    blocks.truncate(self.context.parameters.max_blocks_per_sync);
                    break;
                }
            }

            blocks
        };

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits
        // scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
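        // Illustrative example (assumed values): with commit_range = 100..=1000 and a
        // commit_sync_batch_size of 100, inclusive_end = min(1000, 100 + 100 - 1) = 199,
        // so a single request scans at most one batch worth of commits (100..=199).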
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that those are valid authorities
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended
        // blocks. Ideally we would want to do that in the future if we think it
        // would tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // No reason to serve back the genesis block - it's the same as if the peer
            // has not received any block.
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        // Own blocks do not go through the core dispatcher, so they need to be set
        // separately.
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast
/// stream, and dispatches commands to core based on the changes.
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}
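// Illustrative example, assuming a committee of 4 equal-stake authorities: the local
// authority's own stake always counts as subscribed, so once 2 distinct peers each hold
// at least one subscription the subscribed stake reaches 3 of 4. That crosses the 2f+1
// quorum, and `set_quorum_subscribers_exists(true)` is sent to the dispatcher; when the
// subscribed stake later falls back below quorum, `false` is sent instead.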

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        // Set the subscribed peers by default to 0
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before a new peer was subscribed
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        // If the subscription count reaches quorum, notify the dispatcher and get ready
        // to propose blocks.
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher
        drop(counter);
        Ok(())
    }

    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before a peer was dropped
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        // If the subscription count drops below quorum, notify the dispatcher to stop
        // proposing blocks.
        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher
        drop(counter);
        Ok(())
    }
}

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference
/// is that this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

// TODO: add a unit test for BroadcastStream.

#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

    pub(crate) struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        pub(crate) fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;
        assert!(core_dispatcher.get_blocks().is_empty());

        sleep(max_drift).await;
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well
        // and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
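
    // A minimal sketch toward the `BroadcastStream` TODO above, exercising only behavior
    // documented in this file: items broadcast after the stream is created are yielded,
    // and the stream ends once the sender is dropped. Test values are illustrative.
    #[tokio::test]
    async fn test_broadcast_stream_sketch() {
        use futures::StreamExt as _;

        use super::{BroadcastStream, SubscriptionCounter};

        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let subscription_counter =
            Arc::new(SubscriptionCounter::new(context.clone(), core_dispatcher));

        // A single subscription stays below quorum, so the fake dispatcher is never
        // asked to toggle `set_quorum_subscribers_exists`.
        let (tx, rx) = broadcast::channel::<u32>(8);
        let mut stream = BroadcastStream::new(
            context.committee.to_authority_index(1).unwrap(),
            rx,
            subscription_counter,
        );

        // Items broadcast after the stream is created are yielded in order.
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        assert_eq!(stream.next().await, Some(1));
        assert_eq!(stream.next().await, Some(2));

        // Dropping the sender closes the channel, which ends the stream.
        drop(tx);
        assert_eq!(stream.next().await, None);
    }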
}