consensus_core/
authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use consensus_config::AuthorityIndex;
10use futures::{Stream, StreamExt, ready, stream, task};
11use iota_macros::fail_point_async;
12use parking_lot::RwLock;
13use tokio::{sync::broadcast, time::sleep};
14use tokio_util::sync::ReusableBoxFuture;
15use tracing::{debug, info, warn};
16
17use crate::{
18    CommitIndex, Round,
19    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
20    block_verifier::BlockVerifier,
21    commit::{CommitAPI as _, CommitRange, TrustedCommit},
22    commit_vote_monitor::CommitVoteMonitor,
23    context::Context,
24    core_thread::CoreThreadDispatcher,
25    dag_state::DagState,
26    error::{ConsensusError, ConsensusResult},
27    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
28    stake_aggregator::{QuorumThreshold, StakeAggregator},
29    storage::Store,
30    synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
31};
32
/// Multiplier applied to `commit_sync_batch_size` to derive how far the local
/// commit index may lag behind the observed quorum commit index before
/// incoming blocks are rejected (see `handle_send_block`).
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
34
/// Authority's network service implementation, agnostic to the actual
/// networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    // Shared node context: committee, parameters, protocol config, metrics, clock.
    context: Arc<Context>,
    // Records commit votes carried by received blocks; used to estimate the
    // quorum commit index for the commit-lag check.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Validates incoming signed blocks before they are accepted.
    block_verifier: Arc<dyn BlockVerifier>,
    // Handle used to schedule fetching of missing (ancestor) blocks from peers.
    synchronizer: Arc<SynchronizerHandle>,
    // Dispatches verified blocks and block-ref checks to the core thread.
    core_dispatcher: Arc<C>,
    // Receiver for locally broadcasted blocks; resubscribed per peer stream.
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    // Tracks active block-stream subscriptions and informs the core when
    // subscriber stake crosses quorum.
    subscription_counter: Arc<SubscriptionCounter>,
    // In-memory DAG state (recent blocks, commits) behind a read-write lock.
    dag_state: Arc<RwLock<DagState>>,
    // Persistent store for blocks, commits and commit votes.
    store: Arc<dyn Store>,
}
48
49impl<C: CoreThreadDispatcher> AuthorityService<C> {
50    pub(crate) fn new(
51        context: Arc<Context>,
52        block_verifier: Arc<dyn BlockVerifier>,
53        commit_vote_monitor: Arc<CommitVoteMonitor>,
54        synchronizer: Arc<SynchronizerHandle>,
55        core_dispatcher: Arc<C>,
56        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
57        dag_state: Arc<RwLock<DagState>>,
58        store: Arc<dyn Store>,
59    ) -> Self {
60        let subscription_counter = Arc::new(SubscriptionCounter::new(
61            context.clone(),
62            core_dispatcher.clone(),
63        ));
64        Self {
65            context,
66            block_verifier,
67            commit_vote_monitor,
68            synchronizer,
69            core_dispatcher,
70            rx_block_broadcaster,
71            subscription_counter,
72            dag_state,
73            store,
74        }
75    }
76}
77
78#[async_trait]
79impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
80    async fn handle_send_block(
81        &self,
82        peer: AuthorityIndex,
83        serialized_block: ExtendedSerializedBlock,
84    ) -> ConsensusResult<()> {
85        fail_point_async!("consensus-rpc-response");
86        let _s = self
87            .context
88            .metrics
89            .node_metrics
90            .scope_processing_time
91            .with_label_values(&["AuthorityService::handle_stream"])
92            .start_timer();
93        let peer_hostname = &self.context.committee.authority(peer).hostname;
94
95        // TODO: dedup block verifications, here and with fetched blocks.
96        let signed_block: SignedBlock =
97            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
98
99        // Reject blocks not produced by the peer.
100        if peer != signed_block.author() {
101            self.context
102                .metrics
103                .node_metrics
104                .invalid_blocks
105                .with_label_values(&[
106                    peer_hostname.as_str(),
107                    "handle_send_block",
108                    "UnexpectedAuthority",
109                ])
110                .inc();
111            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
112            info!("Block with wrong authority from {}: {}", peer, e);
113            return Err(e);
114        }
115        let peer_hostname = &self.context.committee.authority(peer).hostname;
116
117        // Reject blocks failing validations.
118        if let Err(e) = self.block_verifier.verify(&signed_block) {
119            self.context
120                .metrics
121                .node_metrics
122                .invalid_blocks
123                .with_label_values(&[
124                    peer_hostname.as_str(),
125                    "handle_send_block",
126                    e.clone().name(),
127                ])
128                .inc();
129            info!("Invalid block from {}: {}", peer, e);
130            return Err(e);
131        }
132        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
133        let block_ref = verified_block.reference();
134        debug!("Received block {} via send block.", block_ref);
135
136        let now = self.context.clock.timestamp_utc_ms();
137        let forward_time_drift =
138            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
139        let latency_to_process_stream =
140            Duration::from_millis(now.saturating_sub(verified_block.timestamp_ms()));
141        self.context
142            .metrics
143            .node_metrics
144            .latency_to_process_stream
145            .with_label_values(&[peer_hostname.as_str()])
146            .observe(latency_to_process_stream.as_secs_f64());
147
148        if !self
149            .context
150            .protocol_config
151            .consensus_median_timestamp_with_checkpoint_enforcement()
152        {
153            // Reject block with timestamp too far in the future.
154            if forward_time_drift > self.context.parameters.max_forward_time_drift {
155                self.context
156                    .metrics
157                    .node_metrics
158                    .rejected_future_blocks
159                    .with_label_values(&[peer_hostname])
160                    .inc();
161                debug!(
162                    "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
163                    block_ref,
164                    verified_block.timestamp_ms(),
165                    now,
166                );
167                return Err(ConsensusError::BlockRejected {
168                    block_ref,
169                    reason: format!(
170                        "Block timestamp is too far in the future: {} > {}",
171                        verified_block.timestamp_ms(),
172                        now
173                    ),
174                });
175            }
176
177            // Wait until the block's timestamp is current.
178            if forward_time_drift > Duration::ZERO {
179                self.context
180                    .metrics
181                    .node_metrics
182                    .block_timestamp_drift_ms
183                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
184                    .inc_by(forward_time_drift.as_millis() as u64);
185                debug!(
186                    "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
187                    block_ref,
188                    verified_block.timestamp_ms(),
189                    now,
190                    forward_time_drift.as_millis(),
191                );
192                sleep(forward_time_drift).await;
193            }
194        } else {
195            self.context
196                .metrics
197                .node_metrics
198                .block_timestamp_drift_ms
199                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
200                .inc_by(forward_time_drift.as_millis() as u64);
201        }
202
203        // Observe the block for the commit votes. When local commit is lagging too
204        // much, commit sync loop will trigger fetching.
205        self.commit_vote_monitor.observe_block(&verified_block);
206
207        // Reject blocks when local commit index is lagging too far from quorum commit
208        // index.
209        //
210        // IMPORTANT: this must be done after observing votes from the block, otherwise
211        // observed quorum commit will no longer progress.
212        //
213        // Since the main issue with too many suspended blocks is memory usage not CPU,
214        // it is ok to reject after block verifications instead of before.
215        let last_commit_index = self.dag_state.read().last_commit_index();
216        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
217        // The threshold to ignore block should be larger than commit_sync_batch_size,
218        // to avoid excessive block rejections and synchronizations.
219        if last_commit_index
220            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
221            < quorum_commit_index
222        {
223            self.context
224                .metrics
225                .node_metrics
226                .rejected_blocks
227                .with_label_values(&["commit_lagging"])
228                .inc();
229            debug!(
230                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
231                block_ref, last_commit_index, quorum_commit_index,
232            );
233            return Err(ConsensusError::BlockRejected {
234                block_ref,
235                reason: format!(
236                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
237                ),
238            });
239        }
240
241        self.context
242            .metrics
243            .node_metrics
244            .verified_blocks
245            .with_label_values(&[peer_hostname])
246            .inc();
247
248        let missing_ancestors = self
249            .core_dispatcher
250            .add_blocks(vec![verified_block])
251            .await
252            .map_err(|_| ConsensusError::Shutdown)?;
253        if !missing_ancestors.is_empty() {
254            // schedule the fetching of them from this peer
255            if let Err(err) = self
256                .synchronizer
257                .fetch_blocks(missing_ancestors, peer)
258                .await
259            {
260                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
261            }
262        }
263
264        // After processing the block, process the excluded ancestors
265
266        let mut excluded_ancestors = serialized_block
267            .excluded_ancestors
268            .into_iter()
269            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
270            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
271            .map_err(ConsensusError::MalformedBlock)?;
272
273        let excluded_ancestors_limit = self.context.committee.size() * 2;
274        if excluded_ancestors.len() > excluded_ancestors_limit {
275            debug!(
276                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
277                excluded_ancestors.len() - excluded_ancestors_limit,
278                peer,
279                peer_hostname,
280            );
281            excluded_ancestors.truncate(excluded_ancestors_limit);
282        }
283
284        self.context
285            .metrics
286            .node_metrics
287            .network_received_excluded_ancestors_from_authority
288            .with_label_values(&[peer_hostname])
289            .inc_by(excluded_ancestors.len() as u64);
290
291        for excluded_ancestor in &excluded_ancestors {
292            let excluded_ancestor_hostname = &self
293                .context
294                .committee
295                .authority(excluded_ancestor.author)
296                .hostname;
297            self.context
298                .metrics
299                .node_metrics
300                .network_excluded_ancestors_count_by_authority
301                .with_label_values(&[excluded_ancestor_hostname])
302                .inc();
303        }
304
305        let missing_excluded_ancestors = self
306            .core_dispatcher
307            .check_block_refs(excluded_ancestors)
308            .await
309            .map_err(|_| ConsensusError::Shutdown)?;
310
311        if !missing_excluded_ancestors.is_empty() {
312            self.context
313                .metrics
314                .node_metrics
315                .network_excluded_ancestors_sent_to_fetch
316                .with_label_values(&[peer_hostname])
317                .inc_by(missing_excluded_ancestors.len() as u64);
318
319            let synchronizer = self.synchronizer.clone();
320            tokio::spawn(async move {
321                // schedule the fetching of them from this peer in the background
322                if let Err(err) = synchronizer
323                    .fetch_blocks(missing_excluded_ancestors, peer)
324                    .await
325                {
326                    warn!(
327                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
328                    );
329                }
330            });
331        }
332
333        Ok(())
334    }
335
336    async fn handle_subscribe_blocks(
337        &self,
338        peer: AuthorityIndex,
339        last_received: Round,
340    ) -> ConsensusResult<BlockStream> {
341        fail_point_async!("consensus-rpc-response");
342
343        let dag_state = self.dag_state.read();
344        // Find recent own blocks that have not been received by the peer.
345        // If last_received is a valid and more blocks have been proposed since then,
346        // this call is guaranteed to return at least some recent blocks, which
347        // will help with liveness.
348        let missed_blocks = stream::iter(
349            dag_state
350                .get_cached_blocks(self.context.own_index, last_received + 1)
351                .into_iter()
352                .map(|block| ExtendedSerializedBlock {
353                    block: block.serialized().clone(),
354                    excluded_ancestors: vec![],
355                }),
356        );
357
358        let broadcasted_blocks = BroadcastedBlockStream::new(
359            peer,
360            self.rx_block_broadcaster.resubscribe(),
361            self.subscription_counter.clone(),
362        );
363
364        // Return a stream of blocks that first yields missed blocks as requested, then
365        // new blocks.
366        Ok(Box::pin(missed_blocks.chain(
367            broadcasted_blocks.map(ExtendedSerializedBlock::from),
368        )))
369    }
370
    // Handles two types of requests:
    // 1. Missing block for block sync:
    //    - uses highest_accepted_rounds.
    //    - at most max_blocks_per_sync blocks should be returned.
    // 2. Committed block for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - at most max_blocks_per_fetch blocks should be returned.
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        // This method is used for both commit sync and periodic/live synchronizer.
        // For commit sync, we do not use highest_accepted_rounds and the fetch size is
        // larger.
        let commit_sync_handle = highest_accepted_rounds.is_empty();

        fail_point_async!("consensus-rpc-response");

        // Some quick validation of the requested block refs
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                // Genesis blocks are derivable locally; requesting them is a
                // protocol violation.
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Legacy (non-batched) sync path: strict size validation, blocks served
        // from dag state, plus a bounded number of extra ancestor blocks.
        if !self.context.protocol_config.consensus_batched_block_sync() {
            if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
                return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
            }

            if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
            {
                return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                    highest_accepted_rounds.len(),
                    self.context.committee.size(),
                ));
            }

            // For now ask dag state directly
            let blocks = self.dag_state.read().get_blocks(&block_refs);

            // Now check if an ancestor's round is higher than the one that the peer has. If
            // yes, then serve that ancestor blocks up to `MAX_ADDITIONAL_BLOCKS`.
            let mut ancestor_blocks = vec![];
            if !commit_sync_handle {
                let all_ancestors = blocks
                    .iter()
                    .flatten()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .take(MAX_ADDITIONAL_BLOCKS)
                    .collect::<Vec<_>>();

                if !all_ancestors.is_empty() {
                    ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
                }
            }

            // Return the serialised blocks & the ancestor blocks
            let result = blocks
                .into_iter()
                .chain(ancestor_blocks)
                .flatten()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            return Ok(result);
        }

        // For commit sync, the fetch size is larger. For periodic/live synchronizer,
        // the fetch size is smaller. Instead of rejecting the request, we
        // truncate the size to allow an easy update of this parameter in the future.
        if commit_sync_handle {
            block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
        } else {
            block_refs.truncate(self.context.parameters.max_blocks_per_sync);
        }

        // Get requested blocks from store.
        let blocks = if commit_sync_handle {
            // For commit sync, optimize by fetching from store for blocks below GC round
            let gc_round = self.dag_state.read().gc_round();

            // Separate indices for below/above GC while preserving original order
            let mut below_gc_indices = Vec::new();
            let mut above_gc_indices = Vec::new();
            let mut below_gc_refs = Vec::new();
            let mut above_gc_refs = Vec::new();
            for (i, block_ref) in block_refs.iter().enumerate() {
                if block_ref.round < gc_round {
                    below_gc_indices.push(i);
                    below_gc_refs.push(*block_ref);
                } else {
                    above_gc_indices.push(i);
                    above_gc_refs.push(*block_ref);
                }
            }

            // One slot per requested ref, filled from either source below.
            let mut blocks: Vec<Option<VerifiedBlock>> = vec![None; block_refs.len()];

            // Fetch blocks below GC from store
            if !below_gc_refs.is_empty() {
                for (idx, block) in below_gc_indices
                    .iter()
                    .zip(self.store.read_blocks(&below_gc_refs)?)
                {
                    blocks[*idx] = block;
                }
            }

            // Fetch blocks at-or-above GC from dag_state
            if !above_gc_refs.is_empty() {
                for (idx, block) in above_gc_indices
                    .iter()
                    .zip(self.dag_state.read().get_blocks(&above_gc_refs))
                {
                    blocks[*idx] = block;
                }
            }

            // Drop unfound refs while keeping the original request order.
            blocks.into_iter().flatten().collect()
        } else {
            // For periodic or live synchronizer, we respond with requested blocks from the
            // store and with additional blocks from the cache
            block_refs.sort();
            block_refs.dedup();
            let dag_state = self.dag_state.read();
            let mut blocks = dag_state
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            // Get additional blocks for authorities with missing block, if they are
            // available in cache. Compute the lowest missing round per
            // requested authority.
            let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
            for block_ref in blocks.iter().map(|b| b.reference()) {
                let entry = lowest_missing_rounds
                    .entry(block_ref.author)
                    .or_insert(block_ref.round);
                *entry = (*entry).min(block_ref.round);
            }

            // Retrieve additional blocks per authority, from peer's highest accepted round
            // + 1 to lowest missing round (exclusive) per requested authority. Start with
            //   own blocks.
            let own_index = self.context.own_index;

            // Collect and sort so own_index comes first
            let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
            ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });

            for (authority, lowest_missing_round) in ordered_missing_rounds {
                // NOTE(review): this indexing assumes highest_accepted_rounds has
                // committee size on this path; the explicit size check above only
                // runs on the legacy path — confirm callers validate the length.
                let highest_accepted_round = highest_accepted_rounds[authority];
                if highest_accepted_round >= lowest_missing_round {
                    continue;
                }

                let missing_blocks = dag_state.get_cached_blocks_in_range(
                    authority,
                    highest_accepted_round + 1,
                    lowest_missing_round,
                    self.context
                        .parameters
                        .max_blocks_per_sync
                        .saturating_sub(blocks.len()),
                );
                blocks.extend(missing_blocks);
                if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                    blocks.truncate(self.context.parameters.max_blocks_per_sync);
                    break;
                }
            }

            blocks
        };

        // Return the serialized blocks
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }
564
565    async fn handle_fetch_commits(
566        &self,
567        _peer: AuthorityIndex,
568        commit_range: CommitRange,
569    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
570        fail_point_async!("consensus-rpc-response");
571
572        // Compute an inclusive end index and bound the maximum number of commits
573        // scanned.
574        let inclusive_end = commit_range.end().min(
575            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
576                - 1,
577        );
578        let mut commits = self
579            .store
580            .scan_commits((commit_range.start()..=inclusive_end).into())?;
581        let mut certifier_block_refs = vec![];
582        'commit: while let Some(c) = commits.last() {
583            let index = c.index();
584            let votes = self.store.read_commit_votes(index)?;
585            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
586            for v in &votes {
587                stake_aggregator.add(v.author, &self.context.committee);
588            }
589            if stake_aggregator.reached_threshold(&self.context.committee) {
590                certifier_block_refs = votes;
591                break 'commit;
592            } else {
593                debug!(
594                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
595                    index,
596                    stake_aggregator.stake(),
597                    stake_aggregator.threshold(&self.context.committee)
598                );
599                self.context
600                    .metrics
601                    .node_metrics
602                    .commit_sync_fetch_commits_handler_uncertified_skipped
603                    .inc();
604                commits.pop();
605            }
606        }
607        let certifier_blocks = self
608            .store
609            .read_blocks(&certifier_block_refs)?
610            .into_iter()
611            .flatten()
612            .collect();
613        Ok((commits, certifier_blocks))
614    }
615
616    async fn handle_fetch_latest_blocks(
617        &self,
618        peer: AuthorityIndex,
619        authorities: Vec<AuthorityIndex>,
620    ) -> ConsensusResult<Vec<Bytes>> {
621        fail_point_async!("consensus-rpc-response");
622
623        if authorities.len() > self.context.committee.size() {
624            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
625        }
626
627        // Ensure that those are valid authorities
628        for authority in &authorities {
629            if !self.context.committee.is_valid_index(*authority) {
630                return Err(ConsensusError::InvalidAuthorityIndex {
631                    index: *authority,
632                    max: self.context.committee.size(),
633                });
634            }
635        }
636
637        // Read from the dag state to find the latest blocks.
638        // TODO: at the moment we don't look into the block manager for suspended
639        // blocks. Ideally we want in the future if we think we would like to
640        // tackle the majority of cases.
641        let mut blocks = vec![];
642        let dag_state = self.dag_state.read();
643        for authority in authorities {
644            let block = dag_state.get_last_block_for_authority(authority);
645
646            debug!("Latest block for {authority}: {block:?} as requested from {peer}");
647
648            // no reason to serve back the genesis block - it's equal as if it has not
649            // received any block
650            if block.round() != GENESIS_ROUND {
651                blocks.push(block);
652            }
653        }
654
655        // Return the serialised blocks
656        let result = blocks
657            .into_iter()
658            .map(|block| block.serialized().clone())
659            .collect::<Vec<_>>();
660
661        Ok(result)
662    }
663
664    async fn handle_get_latest_rounds(
665        &self,
666        _peer: AuthorityIndex,
667    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
668        fail_point_async!("consensus-rpc-response");
669
670        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();
671
672        let blocks = self
673            .dag_state
674            .read()
675            .get_last_cached_block_per_authority(Round::MAX);
676        let highest_accepted_rounds = blocks
677            .into_iter()
678            .map(|(block, _)| block.round())
679            .collect::<Vec<_>>();
680
681        // Own blocks do not go through the core dispatcher, so they need to be set
682        // separately.
683        highest_received_rounds[self.context.own_index] =
684            highest_accepted_rounds[self.context.own_index];
685
686        Ok((highest_received_rounds, highest_accepted_rounds))
687    }
688}
689
/// Mutable subscription bookkeeping, guarded by `SubscriptionCounter`'s mutex.
struct Counter {
    // Total number of active block-stream subscriptions across all peers.
    count: usize,
    // Active subscription count per authority, indexed by `AuthorityIndex`.
    subscriptions_by_authority: Vec<usize>,
}
694
/// Atomically counts the number of active subscriptions to the block broadcast
/// stream, and dispatch commands to core based on the changes.
struct SubscriptionCounter {
    // Node context, used for committee/stake lookups and metrics.
    context: Arc<Context>,
    // Guarded subscription counts (total and per authority).
    counter: parking_lot::Mutex<Counter>,
    // Core dispatcher notified when subscriber stake crosses quorum.
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}
702
impl SubscriptionCounter {
    /// Creates a counter with zero subscriptions and resets the
    /// `subscribed_by` gauge for every committee member.
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        // Set the subscribed peers by default to 0
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

    /// Registers a new subscription from `peer`. When the stake of subscribed
    /// authorities crosses quorum for the first time, notifies the dispatcher
    /// that enough subscribers exist to start proposing.
    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        // Remember the pre-increment count to detect a 0 -> 1 transition.
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        // Total stake of authorities with at least one active subscription.
        // The own index is always counted — presumably because a node never
        // subscribes to itself; confirm against the subscriber.
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before a new peer was subscribed
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        // If the subscription count reaches quorum, notify the dispatcher and get ready
        // to propose blocks.
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher
        drop(counter);
        Ok(())
    }

    /// Unregisters a subscription from `peer`. When the stake of subscribed
    /// authorities drops below quorum, notifies the dispatcher to stop
    /// proposing. Callers must pair every `decrement` with a prior
    /// `increment`; the unsigned counters underflow otherwise.
    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        // Remember the pre-decrement count to detect a 1 -> 0 transition.
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        // Total stake of authorities still holding at least one subscription
        // (own index always included, mirroring `increment`).
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake of subscriptions before a peer was dropped
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        // Only clear the gauge once the peer's last subscription is gone.
        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        // If the subscription count drops below quorum, notify the dispatcher to stop
        // proposing blocks.
        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // Drop the counter after sending the command to the dispatcher
        drop(counter);
        Ok(())
    }
}
810
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
/// Lagging receivers log a warning and skip the missed blocks instead of
/// yielding an error (see `BroadcastStream::poll_next`).
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
814
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference
/// is that this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    // The peer this stream serves; used for log messages and as the key for
    // the subscription counter.
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    // Incremented in new() and decremented in Drop.
    subscription_counter: Arc<SubscriptionCounter>,
}
830
831impl<T: 'static + Clone + Send> BroadcastStream<T> {
832    pub fn new(
833        peer: AuthorityIndex,
834        rx: broadcast::Receiver<T>,
835        subscription_counter: Arc<SubscriptionCounter>,
836    ) -> Self {
837        if let Err(err) = subscription_counter.increment(peer) {
838            match err {
839                ConsensusError::Shutdown => {}
840                _ => panic!("Unexpected error: {err}"),
841            }
842        }
843        Self {
844            peer,
845            inner: ReusableBoxFuture::new(make_recv_future(rx)),
846            subscription_counter,
847        }
848    }
849}
850
851impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
852    type Item = T;
853
854    fn poll_next(
855        mut self: Pin<&mut Self>,
856        cx: &mut task::Context<'_>,
857    ) -> task::Poll<Option<Self::Item>> {
858        let peer = self.peer;
859        let maybe_item = loop {
860            let (result, rx) = ready!(self.inner.poll(cx));
861            self.inner.set(make_recv_future(rx));
862
863            match result {
864                Ok(item) => break Some(item),
865                Err(broadcast::error::RecvError::Closed) => {
866                    info!("Block BroadcastedBlockStream {} closed", peer);
867                    break None;
868                }
869                Err(broadcast::error::RecvError::Lagged(n)) => {
870                    warn!(
871                        "Block BroadcastedBlockStream {} lagged by {} messages",
872                        peer, n
873                    );
874                    continue;
875                }
876            }
877        };
878        task::Poll::Ready(maybe_item)
879    }
880}
881
882impl<T> Drop for BroadcastStream<T> {
883    fn drop(&mut self) {
884        if let Err(err) = self.subscription_counter.decrement(self.peer) {
885            match err {
886                ConsensusError::Shutdown => {}
887                _ => panic!("Unexpected error: {err}"),
888            }
889        }
890    }
891}
892
893async fn make_recv_future<T: Clone>(
894    mut rx: broadcast::Receiver<T>,
895) -> (
896    Result<T, broadcast::error::RecvError>,
897    broadcast::Receiver<T>,
898) {
899    let result = rx.recv().await;
900    (result, rx)
901}
902
903// TODO: add a unit test for BroadcastStream.
904
905#[cfg(test)]
906pub(crate) mod tests {
907    use std::{
908        collections::{BTreeMap, BTreeSet},
909        sync::Arc,
910        time::Duration,
911    };
912
913    use async_trait::async_trait;
914    use bytes::Bytes;
915    use consensus_config::AuthorityIndex;
916    use parking_lot::{Mutex, RwLock};
917    use rstest::rstest;
918    use tokio::{sync::broadcast, time::sleep};
919
920    use crate::{
921        Round,
922        authority_service::AuthorityService,
923        block::{BlockAPI, BlockRef, GENESIS_ROUND, SignedBlock, TestBlock, VerifiedBlock},
924        commit::{CertifiedCommits, CommitDigest, CommitRange, TrustedCommit},
925        commit_vote_monitor::CommitVoteMonitor,
926        context::Context,
927        core_thread::{CoreError, CoreThreadDispatcher},
928        dag_state::DagState,
929        error::ConsensusResult,
930        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
931        round_prober::QuorumRound,
932        storage::{Store, WriteBatch, mem_store::MemStore},
933        synchronizer::Synchronizer,
934        test_dag_builder::DagBuilder,
935    };
936
    /// Test double for `CoreThreadDispatcher` that records the blocks passed
    /// to `add_blocks` so tests can assert on what the service dispatched.
    pub(crate) struct FakeCoreThreadDispatcher {
        // Accumulated blocks: appended by add_blocks(), read by get_blocks().
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
940
941    impl FakeCoreThreadDispatcher {
942        pub(crate) fn new() -> Self {
943            Self {
944                blocks: Mutex::new(vec![]),
945            }
946        }
947
948        fn get_blocks(&self) -> Vec<VerifiedBlock> {
949            self.blocks.lock().clone()
950        }
951    }
952
    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        // Records the blocks and echoes back their references as accepted.
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        // Reports no missing ancestors for any checked refs.
        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        // Not exercised by these tests.
        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        // Accepts any new-block request without doing anything.
        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        // Reports no missing blocks.
        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        // Not exercised by these tests.
        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        // Not exercised by these tests.
        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        // Not exercised by these tests.
        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        // Not exercised by these tests.
        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }
1009
    // Minimal `NetworkClient` stub: the tests in this module never issue
    // outbound network calls, so every method is left unimplemented.
    #[derive(Default)]
    pub(crate) struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        // Advertise no streaming support.
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1071
    // Verifies that a block timestamped ahead of local time is held back
    // before reaching the core dispatcher when median-based timestamps are
    // disabled, and dispatched without the delay when they are enabled.
    #[rstest]
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block(#[values(false, true)] median_based_timestamp: bool) {
        let (mut context, _keys) = Context::new_for_test(4);
        // Parameterize the run with and without median-based timestamps.
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                median_based_timestamp,
            );
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        // The block's timestamp is exactly max_forward_time_drift in the
        // future relative to the (paused) clock.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Handle the block on a separate task, since handling may block on
        // the drift delay.
        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        // Advance the paused clock to halfway through the drift window.
        sleep(max_drift / 2).await;

        if !median_based_timestamp {
            // Without median timestamps the block must still be held back;
            // advance past the full drift so it gets released.
            assert!(core_dispatcher.get_blocks().is_empty());
            sleep(max_drift).await;
        }

        // In both modes the block eventually reaches the core dispatcher.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }
1142
    // Verifies that handle_fetch_latest_blocks returns only highest-round
    // (round 10) blocks for the requested authorities, even when one of them
    // has equivocated.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well
        // and store in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        // Every returned block must deserialize cleanly and be from the
        // highest round built above.
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1204
    /// Tests that handle_fetch_blocks preserves the original request order
    /// of block refs when they span the GC boundary — i.e. some are fetched
    /// from the persistent store (below GC) and others from in-memory
    /// dag_state (at or above GC). The interleaved input order must be
    /// maintained in the response.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks_commit_sync_order_across_gc_boundary() {
        // GIVEN
        let rounds = 20;
        let gc_depth = 5;
        let (mut context, _keys) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_batched_block_sync_for_testing(true);
        context.protocol_config.set_gc_depth_for_testing(gc_depth);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store.clone(),
        ));

        // Build DAG and persist all blocks to dag_state
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=rounds)
            .build()
            .persist_layers(dag_state.clone());

        // Also write all blocks to the store so below-GC refs can be found
        let all_blocks = dag_builder.blocks(1..=rounds);
        store
            .write(WriteBatch::new(all_blocks, vec![], vec![], vec![]))
            .expect("Failed to write blocks to store");

        // Set last_commit so gc_round() = leader_round - gc_depth = 15 - 5 = 10
        let leader_round = 15;
        let leader_ref = dag_builder
            .blocks(leader_round..=leader_round)
            .first()
            .unwrap()
            .reference();
        let commit =
            TrustedCommit::new_for_test(1, CommitDigest::MIN, 0, leader_ref, vec![leader_ref]);
        dag_state.write().set_last_commit(commit);

        // Sanity-check the GC round actually splits the built rounds in two.
        let gc_round = dag_state.read().gc_round();
        assert!(
            gc_round > GENESIS_ROUND && gc_round < rounds,
            "GC round {gc_round} should be between genesis and max round"
        );

        // Collect blocks per round for easy access
        let mut blocks_by_round: Vec<Vec<VerifiedBlock>> = vec![vec![]; (rounds + 1) as usize];
        for round in 1..=rounds {
            blocks_by_round[round as usize] = dag_builder.blocks(round..=round);
        }

        // Create interleaved block_refs that alternate between below-GC and above-GC
        // rounds, rotating the authority index so consecutive refs come from
        // different validators.
        let below_gc_rounds: Vec<Round> = (1..gc_round).collect();
        let above_gc_rounds: Vec<Round> = (gc_round..=rounds).collect();
        let validators = context.committee.size();
        let mut interleaved_refs = Vec::new();
        let max_pairs = std::cmp::min(below_gc_rounds.len(), above_gc_rounds.len());
        for i in 0..max_pairs {
            let below_round = below_gc_rounds[i];
            let auth_idx = i % validators;
            if auth_idx < blocks_by_round[below_round as usize].len() {
                interleaved_refs.push(blocks_by_round[below_round as usize][auth_idx].reference());
            }
            let above_round = above_gc_rounds[i];
            let auth_idx2 = (i + 1) % validators;
            if auth_idx2 < blocks_by_round[above_round as usize].len() {
                interleaved_refs.push(blocks_by_round[above_round as usize][auth_idx2].reference());
            }
        }

        // Verify we have refs from both sides of the GC boundary
        assert!(
            interleaved_refs.iter().any(|r| r.round < gc_round),
            "Should have refs below GC round"
        );
        assert!(
            interleaved_refs.iter().any(|r| r.round >= gc_round),
            "Should have refs above GC round"
        );

        // WHEN: call handle_fetch_blocks with empty highest_accepted_rounds (commit
        // sync path)
        let peer = context.committee.to_authority_index(1).unwrap();
        let returned_blocks = authority_service
            .handle_fetch_blocks(peer, interleaved_refs.clone(), vec![])
            .await
            .expect("Should return valid serialized blocks");

        // THEN: each returned block should match the corresponding input ref
        assert_eq!(
            returned_blocks.len(),
            interleaved_refs.len(),
            "Should receive all requested blocks"
        );
        for (i, serialized_block) in returned_blocks.into_iter().enumerate() {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialized_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
            assert_eq!(
                verified_block.reference(),
                interleaved_refs[i],
                "Block at index {i} should match requested ref. \
                 Expected {:?}, got {:?}",
                interleaved_refs[i],
                verified_block.reference()
            );
        }
    }
1341}