use std::{pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::SynchronizerHandle,
};

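/// Multiplier applied to `commit_sync_batch_size` to bound how far the quorum
/// commit index may run ahead of the local commit index before peers' blocks
/// are rejected (see `handle_send_block`). For example, with a batch size of
/// 100 this tolerates a lag of up to 500 commits.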
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

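/// Authority-side implementation of the consensus [`NetworkService`]: handles
/// blocks sent by peers, block-stream subscriptions, and fetches of blocks,
/// commits, and round information.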
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

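        // Deserialize the block payload; malformed bytes are rejected outright.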
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

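        // Reject blocks claiming an author other than the sending peer.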
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

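        // Verify the block's signature and contents.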
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

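        // Reject blocks with timestamps too far in the future, and wait out any
        // smaller forward drift so a block is not processed before its timestamp.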
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
        if forward_time_drift > self.context.parameters.max_forward_time_drift {
            self.context
                .metrics
                .node_metrics
                .rejected_future_blocks
                .with_label_values(&[peer_hostname])
                .inc();
            debug!(
                "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                block_ref,
                verified_block.timestamp_ms(),
                now,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Block timestamp is too far in the future: {} > {}",
                    verified_block.timestamp_ms(),
                    now
                ),
            });
        }

        if forward_time_drift > Duration::ZERO {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_wait_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
            debug!(
                "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                block_ref,
                verified_block.timestamp_ms(),
                now,
                forward_time_drift.as_millis(),
            );
            sleep(forward_time_drift).await;
        }

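        // Observe the commit votes carried by the block.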
        self.commit_vote_monitor.observe_block(&verified_block);

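        // Reject the block if the local commit index lags too far behind the
        // quorum commit index: the node is expected to catch up via commit sync
        // instead of accepting blocks it cannot yet order.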
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

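        // Try to accept the block via the core thread; any ancestors the core
        // reports missing are fetched from the same peer.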
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

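        // Process the excluded ancestors advertised with the block: cap their
        // number, record metrics, and fetch any that are unknown locally.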
        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

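    // Streams this authority's own blocks to the peer: first any cached blocks
    // produced after `last_received`, then live blocks from the broadcaster.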
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

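    // Serves the requested blocks and, when the peer reports its highest
    // accepted rounds, a few additional ancestors the peer has not accepted yet.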
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        const MAX_ADDITIONAL_BLOCKS: usize = 10;
        if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
            return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
        }

        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        let blocks = self.dag_state.read().get_blocks(&block_refs);

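        // Include ancestors above the peer's highest accepted rounds, up to
        // MAX_ADDITIONAL_BLOCKS, which can save the peer follow-up fetches.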
        let mut ancestor_blocks = vec![];
        if !highest_accepted_rounds.is_empty() {
            let all_ancestors = blocks
                .iter()
                .flatten()
                .flat_map(|block| block.ancestors().to_vec())
                .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                .take(MAX_ADDITIONAL_BLOCKS)
                .collect::<Vec<_>>();

            if !all_ancestors.is_empty() {
                ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
            }
        }

        let result = blocks
            .into_iter()
            .chain(ancestor_blocks)
            .flatten()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

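    // Returns at most one batch of commits from the requested range, plus the
    // vote blocks certifying the last returned commit; trailing commits without
    // a quorum of votes are dropped from the response.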
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

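    // Returns the latest block this node holds for each requested authority,
    // skipping genesis blocks.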
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

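    // Reports the highest received and accepted round per authority. The own
    // slot of the received rounds is filled from the accepted rounds, since a
    // node does not receive its own blocks over the network.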
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

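/// Tracks open block-stream subscriptions per authority and notifies the core
/// dispatcher whenever the subscribing stake (counted together with this
/// node's own stake) reaches or falls below a quorum.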
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

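    // Registers a subscription from `peer`; when the subscribing stake first
    // reaches a quorum, the dispatcher is notified.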
    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);

        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        drop(counter);
        Ok(())
    }

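    // Unregisters a subscription from `peer`; when the subscribing stake drops
    // below a quorum, the dispatcher is notified.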
    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        drop(counter);
        Ok(())
    }
}

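/// Adapts a `broadcast::Receiver` of this authority's own blocks into a
/// `Stream` for a single peer: lagged subscribers skip missed blocks, a closed
/// channel ends the stream, and creation/drop update the `SubscriptionCounter`.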
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

struct BroadcastStream<T> {
    peer: AuthorityIndex,
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

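// Awaits one broadcast message and hands the receiver back alongside the
// result, so it can be stored in the stream's reusable future.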
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

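    // Core dispatcher stub that records every block passed to `add_blocks`,
    // letting tests assert which blocks the service accepted.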
    struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

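    // A block stamped `max_forward_time_drift` in the future must only be
    // accepted after the drift has elapsed; the paused test clock makes the
    // sleeps deterministic.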
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;
        assert!(core_dispatcher.get_blocks().is_empty());

        sleep(max_drift).await;
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

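    // Fetching latest blocks should return each requested authority's newest
    // blocks (round 10 here), including those of the equivocating authority.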
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
}