use std::{pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::SynchronizerHandle,
};

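/// Multiplier applied to the commit sync batch size to derive how far the local
/// commit index may lag behind the quorum commit index before incoming blocks
/// are rejected (see `handle_send_block`).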
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

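/// Authority's network service implementation, serving blocks, commits and
/// block-stream subscriptions to peers.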
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

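        // Reject blocks claiming an author other than the sending peer.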
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

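        // Reject blocks that fail verification.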
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

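        // Reject blocks with timestamps too far in the future.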
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
        if forward_time_drift > self.context.parameters.max_forward_time_drift {
            self.context
                .metrics
                .node_metrics
                .rejected_future_blocks
                .with_label_values(&[peer_hostname])
                .inc();
            debug!(
                "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                block_ref,
                verified_block.timestamp_ms(),
                now,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Block timestamp is too far in the future: {} > {}",
                    verified_block.timestamp_ms(),
                    now
                ),
            });
        }

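        // For small drifts, wait until the block's timestamp is current before accepting it.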
        if forward_time_drift > Duration::ZERO {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_wait_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
            debug!(
                "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                block_ref,
                verified_block.timestamp_ms(),
                now,
                forward_time_drift.as_millis(),
            );
            sleep(forward_time_drift).await;
        }

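        // Observe the block's commit votes before the commit lag check below, so the
        // quorum commit index keeps advancing even while blocks are being rejected.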
        self.commit_vote_monitor.observe_block(&verified_block);

        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

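        // Forward the block to the core thread for acceptance; any ancestors reported
        // missing are fetched from the peer via the synchronizer.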
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

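        // Also process the block's excluded ancestors: deserialize them (bounded to
        // twice the committee size), record metrics, and fetch any that are unknown.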
        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

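        // First serve the own blocks the peer missed since `last_received`, then tail
        // the live broadcast stream.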
        let dag_state = self.dag_state.read();
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

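        // Bound the request: at most `max_blocks_per_fetch` explicitly requested
        // blocks, plus up to `MAX_ADDITIONAL_BLOCKS` ancestors served opportunistically.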
        const MAX_ADDITIONAL_BLOCKS: usize = 10;
        if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
            return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
        }

        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

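        // Validate the requested block refs before touching storage.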
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        let blocks = self.dag_state.read().get_blocks(&block_refs);

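        // If the peer provided its highest accepted rounds, opportunistically include
        // ancestors above those rounds, since the peer is likely missing them.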
        let mut ancestor_blocks = vec![];
        if !highest_accepted_rounds.is_empty() {
            let all_ancestors = blocks
                .iter()
                .flatten()
                .flat_map(|block| block.ancestors().to_vec())
                .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                .take(MAX_ADDITIONAL_BLOCKS)
                .collect::<Vec<_>>();

            if !all_ancestors.is_empty() {
                ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
            }
        }

        let result = blocks
            .into_iter()
            .chain(ancestor_blocks)
            .flatten()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

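        // Serve at most one batch of commits. The last returned commit must be
        // certified by a quorum of commit votes, so trailing commits without a
        // certifying quorum are dropped, and the voters' blocks are returned as proof.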
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

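        // Collect the latest cached block per requested authority, skipping genesis
        // blocks since those are the same for everyone.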
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

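        // The node does not receive its own blocks over the network, so report the
        // highest accepted round in its own slot.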
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

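/// Tracks the number of active block-stream subscriptions, in total and per peer,
/// and notifies the core dispatcher when the subscriber count changes between
/// zero and non-zero.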
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        counter.subscriptions_by_authority[peer] += 1;

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);

        if counter.count == 1 {
            self.dispatcher
                .set_subscriber_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        Ok(())
    }

    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        counter.subscriptions_by_authority[peer] -= 1;

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        if counter.count == 0 {
            self.dispatcher
                .set_subscriber_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        Ok(())
    }
}

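/// Stream of blocks broadcast to a subscribed peer, backed by the block
/// broadcast channel.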
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

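/// Stream adapter over a `broadcast::Receiver`, in the spirit of
/// `tokio_stream::wrappers::BroadcastStream`, except that a lagging receiver is
/// tolerated: lags are logged and skipped rather than yielded as errors. It also
/// maintains the subscription counter for the peer it serves.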
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

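/// Awaits the next broadcast message and returns it together with the receiver,
/// so the receiver can be moved back into the reusable future.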
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, sync::Arc, time::Duration};

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

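    /// Core thread dispatcher stub that records added blocks for assertions.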
    struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(Default::default())
        }

        fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

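        // Send a block stamped at the maximum allowed forward drift; the handler
        // should delay acceptance until the timestamp is no longer in the future.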
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;
        assert!(core_dispatcher.get_blocks().is_empty());

        sleep(max_drift).await;
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

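        // Populate the DAG with ten rounds of blocks, including equivocations from
        // authority 2, and persist them so the latest blocks can be served.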
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
}