consensus_core/
authority_service.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{pin::Pin, sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use consensus_config::AuthorityIndex;
10use futures::{Stream, StreamExt, ready, stream, task};
11use iota_macros::fail_point_async;
12use parking_lot::RwLock;
13use tokio::{sync::broadcast, time::sleep};
14use tokio_util::sync::ReusableBoxFuture;
15use tracing::{debug, info, warn};
16
17use crate::{
18    CommitIndex, Round,
19    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
20    block_verifier::BlockVerifier,
21    commit::{CommitAPI as _, CommitRange, TrustedCommit},
22    commit_vote_monitor::CommitVoteMonitor,
23    context::Context,
24    core_thread::CoreThreadDispatcher,
25    dag_state::DagState,
26    error::{ConsensusError, ConsensusResult},
27    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
28    stake_aggregator::{QuorumThreshold, StakeAggregator},
29    storage::Store,
30    synchronizer::SynchronizerHandle,
31};
32
33pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
34
35/// Authority's network service implementation, agnostic to the actual
36/// networking stack used.
37pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
38    context: Arc<Context>,
39    commit_vote_monitor: Arc<CommitVoteMonitor>,
40    block_verifier: Arc<dyn BlockVerifier>,
41    synchronizer: Arc<SynchronizerHandle>,
42    core_dispatcher: Arc<C>,
43    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
44    subscription_counter: Arc<SubscriptionCounter>,
45    dag_state: Arc<RwLock<DagState>>,
46    store: Arc<dyn Store>,
47}
48
49impl<C: CoreThreadDispatcher> AuthorityService<C> {
50    pub(crate) fn new(
51        context: Arc<Context>,
52        block_verifier: Arc<dyn BlockVerifier>,
53        commit_vote_monitor: Arc<CommitVoteMonitor>,
54        synchronizer: Arc<SynchronizerHandle>,
55        core_dispatcher: Arc<C>,
56        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
57        dag_state: Arc<RwLock<DagState>>,
58        store: Arc<dyn Store>,
59    ) -> Self {
60        let subscription_counter = Arc::new(SubscriptionCounter::new(
61            context.clone(),
62            core_dispatcher.clone(),
63        ));
64        Self {
65            context,
66            block_verifier,
67            commit_vote_monitor,
68            synchronizer,
69            core_dispatcher,
70            rx_block_broadcaster,
71            subscription_counter,
72            dag_state,
73            store,
74        }
75    }
76}
77
78#[async_trait]
79impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
80    async fn handle_send_block(
81        &self,
82        peer: AuthorityIndex,
83        serialized_block: ExtendedSerializedBlock,
84    ) -> ConsensusResult<()> {
85        fail_point_async!("consensus-rpc-response");
86
87        let peer_hostname = &self.context.committee.authority(peer).hostname;
88
89        // TODO: dedup block verifications, here and with fetched blocks.
90        let signed_block: SignedBlock =
91            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
92
93        // Reject blocks not produced by the peer.
94        if peer != signed_block.author() {
95            self.context
96                .metrics
97                .node_metrics
98                .invalid_blocks
99                .with_label_values(&[
100                    peer_hostname.as_str(),
101                    "handle_send_block",
102                    "UnexpectedAuthority",
103                ])
104                .inc();
105            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
106            info!("Block with wrong authority from {}: {}", peer, e);
107            return Err(e);
108        }
109        let peer_hostname = &self.context.committee.authority(peer).hostname;
110
111        // Reject blocks failing validations.
112        if let Err(e) = self.block_verifier.verify(&signed_block) {
113            self.context
114                .metrics
115                .node_metrics
116                .invalid_blocks
117                .with_label_values(&[
118                    peer_hostname.as_str(),
119                    "handle_send_block",
120                    e.clone().name(),
121                ])
122                .inc();
123            info!("Invalid block from {}: {}", peer, e);
124            return Err(e);
125        }
126        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
127        let block_ref = verified_block.reference();
128        debug!("Received block {} via send block.", block_ref);
129
130        // Reject block with timestamp too far in the future.
131        let now = self.context.clock.timestamp_utc_ms();
132        let forward_time_drift =
133            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
134        if forward_time_drift > self.context.parameters.max_forward_time_drift {
135            self.context
136                .metrics
137                .node_metrics
138                .rejected_future_blocks
139                .with_label_values(&[peer_hostname])
140                .inc();
141            debug!(
142                "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
143                block_ref,
144                verified_block.timestamp_ms(),
145                now,
146            );
147            return Err(ConsensusError::BlockRejected {
148                block_ref,
149                reason: format!(
150                    "Block timestamp is too far in the future: {} > {}",
151                    verified_block.timestamp_ms(),
152                    now
153                ),
154            });
155        }
156
157        // Wait until the block's timestamp is current.
158        if forward_time_drift > Duration::ZERO {
159            self.context
160                .metrics
161                .node_metrics
162                .block_timestamp_drift_wait_ms
163                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
164                .inc_by(forward_time_drift.as_millis() as u64);
165            debug!(
166                "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
167                block_ref,
168                verified_block.timestamp_ms(),
169                now,
170                forward_time_drift.as_millis(),
171            );
172            sleep(forward_time_drift).await;
173        }
174
175        // Observe the block for the commit votes. When local commit is lagging too
176        // much, commit sync loop will trigger fetching.
177        self.commit_vote_monitor.observe_block(&verified_block);
178
179        // Reject blocks when local commit index is lagging too far from quorum commit
180        // index.
181        //
182        // IMPORTANT: this must be done after observing votes from the block, otherwise
183        // observed quorum commit will no longer progress.
184        //
185        // Since the main issue with too many suspended blocks is memory usage not CPU,
186        // it is ok to reject after block verifications instead of before.
187        let last_commit_index = self.dag_state.read().last_commit_index();
188        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
189        // The threshold to ignore block should be larger than commit_sync_batch_size,
190        // to avoid excessive block rejections and synchronizations.
191        if last_commit_index
192            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
193            < quorum_commit_index
194        {
195            self.context
196                .metrics
197                .node_metrics
198                .rejected_blocks
199                .with_label_values(&["commit_lagging"])
200                .inc();
201            debug!(
202                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
203                block_ref, last_commit_index, quorum_commit_index,
204            );
205            return Err(ConsensusError::BlockRejected {
206                block_ref,
207                reason: format!(
208                    "Last commit index is lagging quorum commit index too much ({} < {})",
209                    last_commit_index, quorum_commit_index,
210                ),
211            });
212        }
213
214        self.context
215            .metrics
216            .node_metrics
217            .verified_blocks
218            .with_label_values(&[peer_hostname])
219            .inc();
220
221        let missing_ancestors = self
222            .core_dispatcher
223            .add_blocks(vec![verified_block])
224            .await
225            .map_err(|_| ConsensusError::Shutdown)?;
226        if !missing_ancestors.is_empty() {
227            // schedule the fetching of them from this peer
228            if let Err(err) = self
229                .synchronizer
230                .fetch_blocks(missing_ancestors, peer)
231                .await
232            {
233                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
234            }
235        }
236
237        // After processing the block, process the excluded ancestors
238
239        let mut excluded_ancestors = serialized_block
240            .excluded_ancestors
241            .into_iter()
242            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
243            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
244            .map_err(ConsensusError::MalformedBlock)?;
245
246        let excluded_ancestors_limit = self.context.committee.size() * 2;
247        if excluded_ancestors.len() > excluded_ancestors_limit {
248            debug!(
249                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
250                excluded_ancestors.len() - excluded_ancestors_limit,
251                peer,
252                peer_hostname,
253            );
254            excluded_ancestors.truncate(excluded_ancestors_limit);
255        }
256
257        self.context
258            .metrics
259            .node_metrics
260            .network_received_excluded_ancestors_from_authority
261            .with_label_values(&[peer_hostname])
262            .inc_by(excluded_ancestors.len() as u64);
263
264        for excluded_ancestor in &excluded_ancestors {
265            let excluded_ancestor_hostname = &self
266                .context
267                .committee
268                .authority(excluded_ancestor.author)
269                .hostname;
270            self.context
271                .metrics
272                .node_metrics
273                .network_excluded_ancestors_count_by_authority
274                .with_label_values(&[excluded_ancestor_hostname])
275                .inc();
276        }
277
278        let missing_excluded_ancestors = self
279            .core_dispatcher
280            .check_block_refs(excluded_ancestors)
281            .await
282            .map_err(|_| ConsensusError::Shutdown)?;
283
284        if !missing_excluded_ancestors.is_empty() {
285            self.context
286                .metrics
287                .node_metrics
288                .network_excluded_ancestors_sent_to_fetch
289                .with_label_values(&[peer_hostname])
290                .inc_by(missing_excluded_ancestors.len() as u64);
291
292            let synchronizer = self.synchronizer.clone();
293            tokio::spawn(async move {
294                // schedule the fetching of them from this peer in the background
295                if let Err(err) = synchronizer
296                    .fetch_blocks(missing_excluded_ancestors, peer)
297                    .await
298                {
299                    warn!(
300                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
301                    );
302                }
303            });
304        }
305
306        Ok(())
307    }
308
309    async fn handle_subscribe_blocks(
310        &self,
311        peer: AuthorityIndex,
312        last_received: Round,
313    ) -> ConsensusResult<BlockStream> {
314        fail_point_async!("consensus-rpc-response");
315
316        let dag_state = self.dag_state.read();
317        // Find recent own blocks that have not been received by the peer.
318        // If last_received is a valid and more blocks have been proposed since then,
319        // this call is guaranteed to return at least some recent blocks, which
320        // will help with liveness.
321        let missed_blocks = stream::iter(
322            dag_state
323                .get_cached_blocks(self.context.own_index, last_received + 1)
324                .into_iter()
325                .map(|block| ExtendedSerializedBlock {
326                    block: block.serialized().clone(),
327                    excluded_ancestors: vec![],
328                }),
329        );
330
331        let broadcasted_blocks = BroadcastedBlockStream::new(
332            peer,
333            self.rx_block_broadcaster.resubscribe(),
334            self.subscription_counter.clone(),
335        );
336
337        // Return a stream of blocks that first yields missed blocks as requested, then
338        // new blocks.
339        Ok(Box::pin(missed_blocks.chain(
340            broadcasted_blocks.map(ExtendedSerializedBlock::from),
341        )))
342    }
343
344    async fn handle_fetch_blocks(
345        &self,
346        peer: AuthorityIndex,
347        block_refs: Vec<BlockRef>,
348        highest_accepted_rounds: Vec<Round>,
349    ) -> ConsensusResult<Vec<Bytes>> {
350        fail_point_async!("consensus-rpc-response");
351
352        const MAX_ADDITIONAL_BLOCKS: usize = 10;
353        if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
354            return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
355        }
356
357        if !highest_accepted_rounds.is_empty()
358            && highest_accepted_rounds.len() != self.context.committee.size()
359        {
360            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
361                highest_accepted_rounds.len(),
362                self.context.committee.size(),
363            ));
364        }
365
366        // Some quick validation of the requested block refs
367        for block in &block_refs {
368            if !self.context.committee.is_valid_index(block.author) {
369                return Err(ConsensusError::InvalidAuthorityIndex {
370                    index: block.author,
371                    max: self.context.committee.size(),
372                });
373            }
374            if block.round == GENESIS_ROUND {
375                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
376            }
377        }
378
379        // For now ask dag state directly
380        let blocks = self.dag_state.read().get_blocks(&block_refs);
381
382        // Now check if an ancestor's round is higher than the one that the peer has. If
383        // yes, then serve that ancestor blocks up to `MAX_ADDITIONAL_BLOCKS`.
384        let mut ancestor_blocks = vec![];
385        if !highest_accepted_rounds.is_empty() {
386            let all_ancestors = blocks
387                .iter()
388                .flatten()
389                .flat_map(|block| block.ancestors().to_vec())
390                .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
391                .take(MAX_ADDITIONAL_BLOCKS)
392                .collect::<Vec<_>>();
393
394            if !all_ancestors.is_empty() {
395                ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
396            }
397        }
398
399        // Return the serialised blocks & the ancestor blocks
400        let result = blocks
401            .into_iter()
402            .chain(ancestor_blocks)
403            .flatten()
404            .map(|block| block.serialized().clone())
405            .collect::<Vec<_>>();
406
407        Ok(result)
408    }
409
410    async fn handle_fetch_commits(
411        &self,
412        _peer: AuthorityIndex,
413        commit_range: CommitRange,
414    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
415        fail_point_async!("consensus-rpc-response");
416
417        // Compute an inclusive end index and bound the maximum number of commits
418        // scanned.
419        let inclusive_end = commit_range.end().min(
420            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
421                - 1,
422        );
423        let mut commits = self
424            .store
425            .scan_commits((commit_range.start()..=inclusive_end).into())?;
426        let mut certifier_block_refs = vec![];
427        'commit: while let Some(c) = commits.last() {
428            let index = c.index();
429            let votes = self.store.read_commit_votes(index)?;
430            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
431            for v in &votes {
432                stake_aggregator.add(v.author, &self.context.committee);
433            }
434            if stake_aggregator.reached_threshold(&self.context.committee) {
435                certifier_block_refs = votes;
436                break 'commit;
437            } else {
438                debug!(
439                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
440                    index,
441                    stake_aggregator.stake(),
442                    stake_aggregator.threshold(&self.context.committee)
443                );
444                self.context
445                    .metrics
446                    .node_metrics
447                    .commit_sync_fetch_commits_handler_uncertified_skipped
448                    .inc();
449                commits.pop();
450            }
451        }
452        let certifier_blocks = self
453            .store
454            .read_blocks(&certifier_block_refs)?
455            .into_iter()
456            .flatten()
457            .collect();
458        Ok((commits, certifier_blocks))
459    }
460
461    async fn handle_fetch_latest_blocks(
462        &self,
463        peer: AuthorityIndex,
464        authorities: Vec<AuthorityIndex>,
465    ) -> ConsensusResult<Vec<Bytes>> {
466        fail_point_async!("consensus-rpc-response");
467
468        if authorities.len() > self.context.committee.size() {
469            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
470        }
471
472        // Ensure that those are valid authorities
473        for authority in &authorities {
474            if !self.context.committee.is_valid_index(*authority) {
475                return Err(ConsensusError::InvalidAuthorityIndex {
476                    index: *authority,
477                    max: self.context.committee.size(),
478                });
479            }
480        }
481
482        // Read from the dag state to find the latest blocks.
483        // TODO: at the moment we don't look into the block manager for suspended
484        // blocks. Ideally we want in the future if we think we would like to
485        // tackle the majority of cases.
486        let mut blocks = vec![];
487        let dag_state = self.dag_state.read();
488        for authority in authorities {
489            let block = dag_state.get_last_block_for_authority(authority);
490
491            debug!("Latest block for {authority}: {block:?} as requested from {peer}");
492
493            // no reason to serve back the genesis block - it's equal as if it has not
494            // received any block
495            if block.round() != GENESIS_ROUND {
496                blocks.push(block);
497            }
498        }
499
500        // Return the serialised blocks
501        let result = blocks
502            .into_iter()
503            .map(|block| block.serialized().clone())
504            .collect::<Vec<_>>();
505
506        Ok(result)
507    }
508
509    async fn handle_get_latest_rounds(
510        &self,
511        _peer: AuthorityIndex,
512    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
513        fail_point_async!("consensus-rpc-response");
514
515        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();
516
517        let blocks = self
518            .dag_state
519            .read()
520            .get_last_cached_block_per_authority(Round::MAX);
521        let highest_accepted_rounds = blocks
522            .into_iter()
523            .map(|(block, _)| block.round())
524            .collect::<Vec<_>>();
525
526        // Own blocks do not go through the core dispatcher, so they need to be set
527        // separately.
528        highest_received_rounds[self.context.own_index] =
529            highest_accepted_rounds[self.context.own_index];
530
531        Ok((highest_received_rounds, highest_accepted_rounds))
532    }
533}
534
535struct Counter {
536    count: usize,
537    subscriptions_by_authority: Vec<usize>,
538}
539
540/// Atomically counts the number of active subscriptions to the block broadcast
541/// stream, and dispatch commands to core based on the changes.
542struct SubscriptionCounter {
543    context: Arc<Context>,
544    counter: parking_lot::Mutex<Counter>,
545    dispatcher: Arc<dyn CoreThreadDispatcher>,
546}
547
548impl SubscriptionCounter {
549    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
550        // Set the subscribed peers by default to 0
551        for (_, authority) in context.committee.authorities() {
552            context
553                .metrics
554                .node_metrics
555                .subscribed_by
556                .with_label_values(&[authority.hostname.as_str()])
557                .set(0);
558        }
559
560        Self {
561            counter: parking_lot::Mutex::new(Counter {
562                count: 0,
563                subscriptions_by_authority: vec![0; context.committee.size()],
564            }),
565            dispatcher,
566            context,
567        }
568    }
569
570    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
571        let mut counter = self.counter.lock();
572        counter.count += 1;
573        counter.subscriptions_by_authority[peer] += 1;
574
575        let peer_hostname = &self.context.committee.authority(peer).hostname;
576        self.context
577            .metrics
578            .node_metrics
579            .subscribed_by
580            .with_label_values(&[peer_hostname])
581            .set(1);
582
583        if counter.count == 1 {
584            self.dispatcher
585                .set_subscriber_exists(true)
586                .map_err(|_| ConsensusError::Shutdown)?;
587        }
588        Ok(())
589    }
590
591    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
592        let mut counter = self.counter.lock();
593        counter.count -= 1;
594        counter.subscriptions_by_authority[peer] -= 1;
595
596        if counter.subscriptions_by_authority[peer] == 0 {
597            let peer_hostname = &self.context.committee.authority(peer).hostname;
598            self.context
599                .metrics
600                .node_metrics
601                .subscribed_by
602                .with_label_values(&[peer_hostname])
603                .set(0);
604        }
605
606        if counter.count == 0 {
607            self.dispatcher
608                .set_subscriber_exists(false)
609                .map_err(|_| ConsensusError::Shutdown)?;
610        }
611        Ok(())
612    }
613}
614
615/// Each broadcasted block stream wraps a broadcast receiver for blocks.
616/// It yields blocks that are broadcasted after the stream is created.
617type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
618
619/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference
620/// is that this tolerates lags with only logging, without yielding errors.
621struct BroadcastStream<T> {
622    peer: AuthorityIndex,
623    // Stores the receiver across poll_next() calls.
624    inner: ReusableBoxFuture<
625        'static,
626        (
627            Result<T, broadcast::error::RecvError>,
628            broadcast::Receiver<T>,
629        ),
630    >,
631    // Counts total subscriptions / active BroadcastStreams.
632    subscription_counter: Arc<SubscriptionCounter>,
633}
634
635impl<T: 'static + Clone + Send> BroadcastStream<T> {
636    pub fn new(
637        peer: AuthorityIndex,
638        rx: broadcast::Receiver<T>,
639        subscription_counter: Arc<SubscriptionCounter>,
640    ) -> Self {
641        if let Err(err) = subscription_counter.increment(peer) {
642            match err {
643                ConsensusError::Shutdown => {}
644                _ => panic!("Unexpected error: {err}"),
645            }
646        }
647        Self {
648            peer,
649            inner: ReusableBoxFuture::new(make_recv_future(rx)),
650            subscription_counter,
651        }
652    }
653}
654
655impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
656    type Item = T;
657
658    fn poll_next(
659        mut self: Pin<&mut Self>,
660        cx: &mut task::Context<'_>,
661    ) -> task::Poll<Option<Self::Item>> {
662        let peer = self.peer;
663        let maybe_item = loop {
664            let (result, rx) = ready!(self.inner.poll(cx));
665            self.inner.set(make_recv_future(rx));
666
667            match result {
668                Ok(item) => break Some(item),
669                Err(broadcast::error::RecvError::Closed) => {
670                    info!("Block BroadcastedBlockStream {} closed", peer);
671                    break None;
672                }
673                Err(broadcast::error::RecvError::Lagged(n)) => {
674                    warn!(
675                        "Block BroadcastedBlockStream {} lagged by {} messages",
676                        peer, n
677                    );
678                    continue;
679                }
680            }
681        };
682        task::Poll::Ready(maybe_item)
683    }
684}
685
686impl<T> Drop for BroadcastStream<T> {
687    fn drop(&mut self) {
688        if let Err(err) = self.subscription_counter.decrement(self.peer) {
689            match err {
690                ConsensusError::Shutdown => {}
691                _ => panic!("Unexpected error: {err}"),
692            }
693        }
694    }
695}
696
697async fn make_recv_future<T: Clone>(
698    mut rx: broadcast::Receiver<T>,
699) -> (
700    Result<T, broadcast::error::RecvError>,
701    broadcast::Receiver<T>,
702) {
703    let result = rx.recv().await;
704    (result, rx)
705}
706
707// TODO: add a unit test for BroadcastStream.
708
709#[cfg(test)]
710mod tests {
711    use std::{collections::BTreeSet, sync::Arc, time::Duration};
712
713    use async_trait::async_trait;
714    use bytes::Bytes;
715    use consensus_config::AuthorityIndex;
716    use parking_lot::{Mutex, RwLock};
717    use tokio::{sync::broadcast, time::sleep};
718
719    use crate::{
720        Round,
721        authority_service::AuthorityService,
722        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
723        commit::{CertifiedCommits, CommitRange},
724        commit_vote_monitor::CommitVoteMonitor,
725        context::Context,
726        core_thread::{CoreError, CoreThreadDispatcher},
727        dag_state::DagState,
728        error::ConsensusResult,
729        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
730        round_prober::QuorumRound,
731        storage::mem_store::MemStore,
732        synchronizer::Synchronizer,
733        test_dag_builder::DagBuilder,
734    };
735
736    struct FakeCoreThreadDispatcher {
737        blocks: Mutex<Vec<VerifiedBlock>>,
738    }
739
740    impl FakeCoreThreadDispatcher {
741        fn new() -> Self {
742            Self {
743                blocks: Mutex::new(vec![]),
744            }
745        }
746
747        fn get_blocks(&self) -> Vec<VerifiedBlock> {
748            self.blocks.lock().clone()
749        }
750    }
751
752    #[async_trait]
753    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
754        async fn add_blocks(
755            &self,
756            blocks: Vec<VerifiedBlock>,
757        ) -> Result<BTreeSet<BlockRef>, CoreError> {
758            let block_refs = blocks.iter().map(|b| b.reference()).collect();
759            self.blocks.lock().extend(blocks);
760            Ok(block_refs)
761        }
762
763        async fn check_block_refs(
764            &self,
765            _block_refs: Vec<BlockRef>,
766        ) -> Result<BTreeSet<BlockRef>, CoreError> {
767            Ok(BTreeSet::new())
768        }
769
770        async fn add_certified_commits(
771            &self,
772            _commits: CertifiedCommits,
773        ) -> Result<BTreeSet<BlockRef>, CoreError> {
774            todo!()
775        }
776
777        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
778            Ok(())
779        }
780
781        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
782            Ok(Default::default())
783        }
784
785        fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
786            todo!()
787        }
788
789        fn set_propagation_delay_and_quorum_rounds(
790            &self,
791            _delay: Round,
792            _received_quorum_rounds: Vec<QuorumRound>,
793            _accepted_quorum_rounds: Vec<QuorumRound>,
794        ) -> Result<(), CoreError> {
795            todo!()
796        }
797
798        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
799            todo!()
800        }
801
802        fn highest_received_rounds(&self) -> Vec<Round> {
803            todo!()
804        }
805    }
806
807    #[derive(Default)]
808    struct FakeNetworkClient {}
809
810    #[async_trait]
811    impl NetworkClient for FakeNetworkClient {
812        const SUPPORT_STREAMING: bool = false;
813
814        async fn send_block(
815            &self,
816            _peer: AuthorityIndex,
817            _block: &VerifiedBlock,
818            _timeout: Duration,
819        ) -> ConsensusResult<()> {
820            unimplemented!("Unimplemented")
821        }
822
823        async fn subscribe_blocks(
824            &self,
825            _peer: AuthorityIndex,
826            _last_received: Round,
827            _timeout: Duration,
828        ) -> ConsensusResult<BlockStream> {
829            unimplemented!("Unimplemented")
830        }
831
832        async fn fetch_blocks(
833            &self,
834            _peer: AuthorityIndex,
835            _block_refs: Vec<BlockRef>,
836            _highest_accepted_rounds: Vec<Round>,
837            _timeout: Duration,
838        ) -> ConsensusResult<Vec<Bytes>> {
839            unimplemented!("Unimplemented")
840        }
841
842        async fn fetch_commits(
843            &self,
844            _peer: AuthorityIndex,
845            _commit_range: CommitRange,
846            _timeout: Duration,
847        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
848            unimplemented!("Unimplemented")
849        }
850
851        async fn fetch_latest_blocks(
852            &self,
853            _peer: AuthorityIndex,
854            _authorities: Vec<AuthorityIndex>,
855            _timeout: Duration,
856        ) -> ConsensusResult<Vec<Bytes>> {
857            unimplemented!("Unimplemented")
858        }
859
860        async fn get_latest_rounds(
861            &self,
862            _peer: AuthorityIndex,
863            _timeout: Duration,
864        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
865            unimplemented!("Unimplemented")
866        }
867    }
868
869    #[tokio::test(flavor = "current_thread", start_paused = true)]
870    async fn test_handle_send_block() {
871        let (context, _keys) = Context::new_for_test(4);
872        let context = Arc::new(context);
873        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
874        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
875        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
876        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
877        let network_client = Arc::new(FakeNetworkClient::default());
878        let store = Arc::new(MemStore::new());
879        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
880        let synchronizer = Synchronizer::start(
881            network_client,
882            context.clone(),
883            core_dispatcher.clone(),
884            commit_vote_monitor.clone(),
885            block_verifier.clone(),
886            dag_state.clone(),
887            false,
888        );
889        let authority_service = Arc::new(AuthorityService::new(
890            context.clone(),
891            block_verifier,
892            commit_vote_monitor,
893            synchronizer,
894            core_dispatcher.clone(),
895            rx_block_broadcast,
896            dag_state,
897            store,
898        ));
899
900        // Test delaying blocks with time drift.
901        let now = context.clock.timestamp_utc_ms();
902        let max_drift = context.parameters.max_forward_time_drift;
903        let input_block = VerifiedBlock::new_for_test(
904            TestBlock::new(9, 0)
905                .set_timestamp_ms(now + max_drift.as_millis() as u64)
906                .build(),
907        );
908
909        let service = authority_service.clone();
910        let serialized = ExtendedSerializedBlock {
911            block: input_block.serialized().clone(),
912            excluded_ancestors: vec![],
913        };
914
915        tokio::spawn(async move {
916            service
917                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
918                .await
919                .unwrap();
920        });
921
922        sleep(max_drift / 2).await;
923        assert!(core_dispatcher.get_blocks().is_empty());
924
925        sleep(max_drift).await;
926        let blocks = core_dispatcher.get_blocks();
927        assert_eq!(blocks.len(), 1);
928        assert_eq!(blocks[0], input_block);
929    }
930
931    #[tokio::test(flavor = "current_thread", start_paused = true)]
932    async fn test_handle_fetch_latest_blocks() {
933        // GIVEN
934        let (context, _keys) = Context::new_for_test(4);
935        let context = Arc::new(context);
936        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
937        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
938        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
939        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
940        let network_client = Arc::new(FakeNetworkClient::default());
941        let store = Arc::new(MemStore::new());
942        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
943        let synchronizer = Synchronizer::start(
944            network_client,
945            context.clone(),
946            core_dispatcher.clone(),
947            commit_vote_monitor.clone(),
948            block_verifier.clone(),
949            dag_state.clone(),
950            true,
951        );
952        let authority_service = Arc::new(AuthorityService::new(
953            context.clone(),
954            block_verifier,
955            commit_vote_monitor,
956            synchronizer,
957            core_dispatcher.clone(),
958            rx_block_broadcast,
959            dag_state.clone(),
960            store,
961        ));
962
963        // Create some blocks for a few authorities. Create some equivocations as well
964        // and store in dag state.
965        let mut dag_builder = DagBuilder::new(context.clone());
966        dag_builder
967            .layers(1..=10)
968            .authorities(vec![AuthorityIndex::new_for_test(2)])
969            .equivocate(1)
970            .build()
971            .persist_layers(dag_state);
972
973        // WHEN
974        let authorities_to_request = vec![
975            AuthorityIndex::new_for_test(1),
976            AuthorityIndex::new_for_test(2),
977        ];
978        let results = authority_service
979            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
980            .await;
981
982        // THEN
983        let serialised_blocks = results.unwrap();
984        for serialised_block in serialised_blocks {
985            let signed_block: SignedBlock =
986                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
987            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
988
989            assert_eq!(verified_block.round(), 10);
990        }
991    }
992}