consensus_core/authority_service.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
};
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

/// Authority's network service implementation, agnostic to the actual
/// networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
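    // Handles a block sent directly by `peer`: verifies authorship and validity,
    // delays or rejects blocks with timestamps too far in the future (when median
    // timestamp enforcement is off), rejects blocks while the local commit index
    // lags the quorum commit index too much, forwards the block to the core, and
    // schedules fetching of missing ancestors and excluded ancestors from the peer.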
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }
        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // Reject blocks failing validations.
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        if !self
            .context
            .protocol_config
            .consensus_median_timestamp_with_checkpoint_enforcement()
        {
            // Reject block with timestamp too far in the future.
            if forward_time_drift > self.context.parameters.max_forward_time_drift {
                self.context
                    .metrics
                    .node_metrics
                    .rejected_future_blocks
                    .with_label_values(&[peer_hostname])
                    .inc();
                debug!(
                    "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                    block_ref,
                    verified_block.timestamp_ms(),
                    now,
                );
                return Err(ConsensusError::BlockRejected {
                    block_ref,
                    reason: format!(
                        "Block timestamp is too far in the future: {} > {}",
                        verified_block.timestamp_ms(),
                        now
                    ),
                });
            }

            // Wait until the block's timestamp is current.
            if forward_time_drift > Duration::ZERO {
                self.context
                    .metrics
                    .node_metrics
                    .block_timestamp_drift_ms
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                    .inc_by(forward_time_drift.as_millis() as u64);
                debug!(
                    "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                    block_ref,
                    verified_block.timestamp_ms(),
                    now,
                    forward_time_drift.as_millis(),
                );
                sleep(forward_time_drift).await;
            }
        } else {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
        }

        // Observe the block for the commit votes. When the local commit is lagging too
        // much, the commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Reject blocks when the local commit index is lagging too far behind the
        // quorum commit index.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // the observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore blocks should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            // schedule the fetching of them from this peer
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

        // After processing the block, process the excluded ancestors

        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                // schedule the fetching of them from this peer in the background
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

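    // Streams this node's own blocks to `peer`: first the cached blocks proposed
    // after `last_received`, then blocks broadcasted after the subscription started.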
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        // Find recent own blocks that have not been received by the peer.
        // If last_received is valid and more blocks have been proposed since then,
        // this call is guaranteed to return at least some recent blocks, which
        // will help with liveness.
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then
        // new blocks.
        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles two types of requests:
    // 1. Missing blocks for block sync:
    //    - uses highest_accepted_rounds.
    //    - at most max_blocks_per_sync blocks should be returned.
    // 2. Committed blocks for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - at most max_blocks_per_fetch blocks should be returned.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        // This method is used for both commit sync and periodic/live synchronizer.
        // For commit sync, we do not use highest_accepted_rounds and the fetch size is
        // larger.
        let commit_sync_handle = highest_accepted_rounds.is_empty();

        fail_point_async!("consensus-rpc-response");

        // Some quick validation of the requested block refs
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        if !self.context.protocol_config.consensus_batched_block_sync() {
            if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
                return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
            }

            if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
            {
                return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                    highest_accepted_rounds.len(),
                    self.context.committee.size(),
                ));
            }

            // For now ask dag state directly
            let blocks = self.dag_state.read().get_blocks(&block_refs);

            // Now check if an ancestor's round is higher than the one that the peer has.
            // If yes, then serve those ancestor blocks, up to `MAX_ADDITIONAL_BLOCKS`.
            let mut ancestor_blocks = vec![];
            if !commit_sync_handle {
                let all_ancestors = blocks
                    .iter()
                    .flatten()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .take(MAX_ADDITIONAL_BLOCKS)
                    .collect::<Vec<_>>();

                if !all_ancestors.is_empty() {
                    ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
                }
            }

            // Return the serialised blocks & the ancestor blocks
            let result = blocks
                .into_iter()
                .chain(ancestor_blocks)
                .flatten()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            return Ok(result);
        }

        // For commit sync, the fetch size is larger. For the periodic/live synchronizer,
        // the fetch size is smaller. Instead of rejecting the request, we truncate its
        // size to allow an easy update of this parameter in the future.
        if commit_sync_handle {
            block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
        } else {
            block_refs.truncate(self.context.parameters.max_blocks_per_sync);
        }

        // Get requested blocks from store.
        let blocks = if commit_sync_handle {
            // For commit sync, we respond with all blocks from the store
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        } else {
            // For periodic or live synchronizer, we respond with requested blocks from the
            // store and with additional blocks from the cache
            block_refs.sort();
            block_refs.dedup();
            let dag_state = self.dag_state.read();
            let mut blocks = dag_state
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            // Get additional blocks for authorities with missing blocks, if they are
            // available in the cache. Compute the lowest missing round per
            // requested authority.
            let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
            for block_ref in blocks.iter().map(|b| b.reference()) {
                let entry = lowest_missing_rounds
                    .entry(block_ref.author)
                    .or_insert(block_ref.round);
                *entry = (*entry).min(block_ref.round);
            }

            // Retrieve additional blocks per authority, from the peer's highest accepted
            // round + 1 to the lowest missing round (exclusive) per requested authority.
            // Start with own blocks.
            let own_index = self.context.own_index;

            // Collect and sort so own_index comes first
            let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
            ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });

            for (authority, lowest_missing_round) in ordered_missing_rounds {
                let highest_accepted_round = highest_accepted_rounds[authority];
                if highest_accepted_round >= lowest_missing_round {
                    continue;
                }

                let missing_blocks = dag_state.get_cached_blocks_in_range(
                    authority,
                    highest_accepted_round + 1,
                    lowest_missing_round,
                    self.context
                        .parameters
                        .max_blocks_per_sync
                        .saturating_sub(blocks.len()),
                );
                blocks.extend(missing_blocks);
                if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                    blocks.truncate(self.context.parameters.max_blocks_per_sync);
                    break;
                }
            }

            blocks
        };

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

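    // Serves commits in the requested range, bounded by commit_sync_batch_size, and
    // returns them together with the blocks whose votes certify the last returned
    // commit. Commits without a quorum of votes are trimmed from the end of the range.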
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits
        // scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

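    // Returns the latest known block of each requested authority, as stored in the
    // dag state. Genesis blocks are skipped since they carry no information.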
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that those are valid authorities
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended
        // blocks. Ideally we would do that in the future, if we decide to tackle
        // the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // no reason to serve back the genesis block - it's the same as if no block
            // had been received
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

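    // Returns, per authority, the highest received round (as tracked by the core
    // dispatcher) and the highest accepted round (from the dag state cache), with
    // the own-authority entry taken from the dag state in both vectors.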
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        // Own blocks do not go through the core dispatcher, so they need to be set
        // separately.
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

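/// Tracks the total number of active subscriptions and the number of
/// subscriptions per authority.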
struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast
/// stream, and dispatches commands to core based on the changes.
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        // Set the subscribed peers by default to 0
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

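    // Registers a new subscription from `peer`. When the stake of subscribed
    // authorities (plus our own) first reaches quorum, notifies the core dispatcher
    // so it can start proposing blocks.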
    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before a new peer was subscribed
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        // If the stake of subscribed authorities reaches quorum, notify the dispatcher
        // to get ready to propose blocks.
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher
        drop(counter);
        Ok(())
    }

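    // Unregisters a subscription from `peer`. When the stake of subscribed
    // authorities drops below quorum, notifies the core dispatcher to stop
    // proposing blocks.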
    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before a peer was dropped
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        // If the stake of subscribed authorities drops below quorum, notify the
        // dispatcher to stop proposing blocks.
        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher
        drop(counter);
        Ok(())
    }
}

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference
/// is that this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

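// Polls the stored receive future, skipping over lag errors (with a warning) and
// ending the stream when the broadcast channel is closed.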
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

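// Unregisters the subscription when the stream is dropped, so the counter stays
// in sync with the set of active subscribers.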
impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

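// Awaits one message from the broadcast receiver and returns it together with the
// receiver, so the receiver can be stored back into the ReusableBoxFuture.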
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

// TODO: add a unit test for BroadcastStream.

#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use rstest::rstest;
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

    pub(crate) struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        pub(crate) fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block(#[values(false, true)] median_based_timestamp: bool) {
        let (mut context, _keys) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                median_based_timestamp,
            );
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;

        if !median_based_timestamp {
            assert!(core_dispatcher.get_blocks().is_empty());
            sleep(max_drift).await;
        }

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well
        // and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
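
    // A minimal sketch of a unit test for BroadcastStream, addressing the TODO above.
    // It uses only helpers already defined in this module; the main assumption is
    // tokio's broadcast lag behavior (oldest messages dropped and reported as Lagged).
    #[tokio::test]
    async fn test_broadcast_stream_tolerates_lag() {
        use futures::StreamExt as _;

        use super::{BroadcastStream, SubscriptionCounter};

        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher,
        ));

        // Use a small channel capacity so the receiver can lag behind the sender.
        let (tx, rx) = broadcast::channel::<usize>(2);
        let mut stream = BroadcastStream::new(
            context.committee.to_authority_index(1).unwrap(),
            rx,
            subscription_counter,
        );

        // Overflow the channel: the two oldest items are dropped and reported as lag.
        for i in 0..4usize {
            tx.send(i).unwrap();
        }

        // The stream should log and skip the lagged items, then yield the rest.
        assert_eq!(stream.next().await, Some(2));
        assert_eq!(stream.next().await, Some(3));
    }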
}