consensus_core/authority_service.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
};

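/// Multiplier applied to `commit_sync_batch_size` when checking whether the local commit
/// index lags the quorum commit index too far to keep accepting blocks in
/// `handle_send_block()`.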
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

/// Authority's network service implementation, agnostic to the actual
/// networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["AuthorityService::handle_send_block"])
            .start_timer();
        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }
        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // Reject blocks failing validations.
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

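        // Compare the block's timestamp against the local clock: forward_time_drift is how
        // far the block's timestamp lies in the future, latency_to_process_stream how far
        // in the past.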
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
        let latency_to_process_stream =
            Duration::from_millis(now.saturating_sub(verified_block.timestamp_ms()));
        self.context
            .metrics
            .node_metrics
            .latency_to_process_stream
            .with_label_values(&[peer_hostname.as_str()])
            .observe(latency_to_process_stream.as_secs_f64());

        if !self
            .context
            .protocol_config
            .consensus_median_timestamp_with_checkpoint_enforcement()
        {
            // Reject blocks with timestamps too far in the future.
            if forward_time_drift > self.context.parameters.max_forward_time_drift {
                self.context
                    .metrics
                    .node_metrics
                    .rejected_future_blocks
                    .with_label_values(&[peer_hostname])
                    .inc();
                debug!(
                    "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                    block_ref,
                    verified_block.timestamp_ms(),
                    now,
                );
                return Err(ConsensusError::BlockRejected {
                    block_ref,
                    reason: format!(
                        "Block timestamp is too far in the future: {} > {}",
                        verified_block.timestamp_ms(),
                        now
                    ),
                });
            }

            // Wait until the block's timestamp is current.
            if forward_time_drift > Duration::ZERO {
                self.context
                    .metrics
                    .node_metrics
                    .block_timestamp_drift_ms
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                    .inc_by(forward_time_drift.as_millis() as u64);
                debug!(
                    "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                    block_ref,
                    verified_block.timestamp_ms(),
                    now,
                    forward_time_drift.as_millis(),
                );
                sleep(forward_time_drift).await;
            }
        } else {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
        }

        // Observe the block for its commit votes. When the local commit is lagging too
        // much, the commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Reject blocks when the local commit index is lagging too far behind the
        // quorum commit index.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // the observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage not CPU,
        // it is ok to reject after block verifications instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore a block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

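        // Forward the verified block to the core thread for acceptance into the DAG.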
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            // Schedule fetching the missing ancestors from this peer.
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

        // After processing the block, process the excluded ancestors.

        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

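        // Cap how many excluded ancestors are processed per block; anything beyond the
        // limit is dropped.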
        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

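        // Ask the core which excluded ancestors are unknown locally; only those are
        // scheduled for fetching.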
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                // Schedule fetching the missing excluded ancestors from this peer in the
                // background.
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        // Find recent own blocks that have not been received by the peer.
        // If last_received is valid and more blocks have been proposed since then,
        // this call is guaranteed to return at least some recent blocks, which
        // helps with liveness.
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

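        // Subscribe to future block broadcasts. The subscription counter tracks whether a
        // quorum of peers (by stake) is subscribed, which gates block proposals in the core.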
        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then
        // new blocks.
        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles two types of requests:
    // 1. Missing block for block sync:
    //    - uses highest_accepted_rounds.
    //    - at most max_blocks_per_sync blocks should be returned.
    // 2. Committed block for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - at most max_blocks_per_fetch blocks should be returned.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        // This method is used for both commit sync and the periodic/live synchronizer.
        // For commit sync, we do not use highest_accepted_rounds and the fetch size is
        // larger.
        let commit_sync_handle = highest_accepted_rounds.is_empty();

        fail_point_async!("consensus-rpc-response");

        // Some quick validation of the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        if !self.context.protocol_config.consensus_batched_block_sync() {
            if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
                return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
            }

            if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
            {
                return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                    highest_accepted_rounds.len(),
                    self.context.committee.size(),
                ));
            }

            // For now, ask the dag state directly.
            let blocks = self.dag_state.read().get_blocks(&block_refs);

            // Now check if an ancestor's round is higher than the one that the peer has. If
            // so, serve those ancestor blocks, up to `MAX_ADDITIONAL_BLOCKS`.
            let mut ancestor_blocks = vec![];
            if !commit_sync_handle {
                let all_ancestors = blocks
                    .iter()
                    .flatten()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .take(MAX_ADDITIONAL_BLOCKS)
                    .collect::<Vec<_>>();

                if !all_ancestors.is_empty() {
                    ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
                }
            }

            // Return the serialized blocks & the ancestor blocks.
            let result = blocks
                .into_iter()
                .chain(ancestor_blocks)
                .flatten()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            return Ok(result);
        }

        // For commit sync, the fetch size is larger. For the periodic/live synchronizer,
        // the fetch size is smaller. Instead of rejecting an oversized request, we
        // truncate it, which allows an easy update of this parameter in the future.
        if commit_sync_handle {
            block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
        } else {
            block_refs.truncate(self.context.parameters.max_blocks_per_sync);
        }

        // Get requested blocks from store.
        let blocks = if commit_sync_handle {
            // For commit sync, optimize by fetching blocks below the GC round from the store.
            let gc_round = self.dag_state.read().gc_round();

            // Partition block_refs into those below and at-or-above the GC round.
            let (below_gc, above_gc): (Vec<_>, Vec<_>) = block_refs
                .iter()
                .partition(|block_ref| block_ref.round < gc_round);

            let mut blocks = Vec::new();

            // Fetch blocks below the GC round from the store.
            if !below_gc.is_empty() {
                let store_blocks = self
                    .store
                    .read_blocks(&below_gc)?
                    .into_iter()
                    .flatten()
                    .collect::<Vec<_>>();
                blocks.extend(store_blocks);
            }

            // Fetch blocks at or above the GC round from the dag state.
            if !above_gc.is_empty() {
                let dag_blocks = self
                    .dag_state
                    .read()
                    .get_blocks(&above_gc)
                    .into_iter()
                    .flatten()
                    .collect::<Vec<_>>();
                blocks.extend(dag_blocks);
            }

            blocks
        } else {
            // For the periodic or live synchronizer, we respond with the requested blocks
            // from the dag state and with additional blocks from the cache.
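            // Deduplicate the requested refs before querying the dag state.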
            block_refs.sort();
            block_refs.dedup();
            let dag_state = self.dag_state.read();
            let mut blocks = dag_state
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            // Get additional blocks for authorities with missing blocks, if they are
            // available in the cache. Compute the lowest missing round per
            // requested authority.
            let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
            for block_ref in blocks.iter().map(|b| b.reference()) {
                let entry = lowest_missing_rounds
                    .entry(block_ref.author)
                    .or_insert(block_ref.round);
                *entry = (*entry).min(block_ref.round);
            }

            // Retrieve additional blocks per authority, from the peer's highest accepted
            // round + 1 up to the lowest missing round (exclusive) per requested
            // authority. Start with own blocks.
            let own_index = self.context.own_index;

            // Collect and sort so own_index comes first.
            let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
            ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });

            for (authority, lowest_missing_round) in ordered_missing_rounds {
                let highest_accepted_round = highest_accepted_rounds[authority];
                if highest_accepted_round >= lowest_missing_round {
                    continue;
                }

                let missing_blocks = dag_state.get_cached_blocks_in_range(
                    authority,
                    highest_accepted_round + 1,
                    lowest_missing_round,
                    self.context
                        .parameters
                        .max_blocks_per_sync
                        .saturating_sub(blocks.len()),
                );
                blocks.extend(missing_blocks);
                if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                    blocks.truncate(self.context.parameters.max_blocks_per_sync);
                    break;
                }
            }

            blocks
        };

        // Return the serialized blocks.
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits
        // scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
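        // Walk backwards from the newest scanned commit, dropping commits whose votes do
        // not reach a quorum; the votes of the newest certified commit become the
        // certifier blocks returned to the peer.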
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that these are valid authorities.
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended
        // blocks. Ideally we would do that in the future, if we think it would help
        // tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // No reason to serve back the genesis block - it's the same as if the peer
            // had not received any block.
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialized blocks.
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

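        // Highest accepted rounds are derived from the latest cached block per authority
        // in the dag state.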
        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        // Own blocks do not go through the core dispatcher, so they need to be set
        // separately.
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast
/// stream, and dispatches commands to the core based on the changes.
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        // Initialize the subscribed-by gauge to 0 for every peer.
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
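        // Compute the total stake of authorities with at least one active subscription;
        // own stake always counts as subscribed.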
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before the new peer was subscribed.
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        // If the subscribed stake reaches quorum, notify the dispatcher to get ready
        // to propose blocks.
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher.
        drop(counter);
        Ok(())
    }

    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before the peer was dropped.
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        // If the subscribed stake drops below quorum, notify the dispatcher to stop
        // proposing blocks.
        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher.
        drop(counter);
        Ok(())
    }
}

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference
/// is that this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

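// Decrement the subscription count when the peer's stream is dropped, so the quorum of
// subscribers is tracked accurately.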
impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

// TODO: add a unit test for BroadcastStream.

#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use rstest::rstest;
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

    pub(crate) struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        pub(crate) fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block(#[values(false, true)] median_based_timestamp: bool) {
        let (mut context, _keys) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                median_based_timestamp,
            );
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

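        // Time is paused in this test, so sleeping advances the virtual clock
        // deterministically relative to the block's forward time drift.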
        sleep(max_drift / 2).await;

        if !median_based_timestamp {
            assert!(core_dispatcher.get_blocks().is_empty());
            sleep(max_drift).await;
        }

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well
        // and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
}