use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
};

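/// Multiplier on the commit sync batch size: when the local last commit index
/// lags the quorum commit index by more than this many batches, incoming
/// blocks are rejected (see `handle_send_block`).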
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

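/// Network service handling requests from peer authorities: submitted blocks,
/// block subscriptions, and fetches of blocks, commits and latest rounds.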
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
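    // Handles a block sent by `peer`: the block is deserialized, verified,
    // checked against timestamp and commit-lag bounds, then forwarded to the
    // core dispatcher; any ancestors the core reports missing are fetched via
    // the synchronizer.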
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

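        // Reject blocks not authored by the sending peer.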
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

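        // Reject blocks that fail verification.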
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

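        // Handle forward time drift: without median-based timestamps, a block too
        // far in the future is rejected and a small drift is absorbed by waiting
        // it out; with median-based timestamps the drift is only recorded.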
        if !self
            .context
            .protocol_config
            .consensus_median_timestamp_with_checkpoint_enforcement()
        {
            if forward_time_drift > self.context.parameters.max_forward_time_drift {
                self.context
                    .metrics
                    .node_metrics
                    .rejected_future_blocks
                    .with_label_values(&[peer_hostname])
                    .inc();
                debug!(
                    "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                    block_ref,
                    verified_block.timestamp_ms(),
                    now,
                );
                return Err(ConsensusError::BlockRejected {
                    block_ref,
                    reason: format!(
                        "Block timestamp is too far in the future: {} > {}",
                        verified_block.timestamp_ms(),
                        now
                    ),
                });
            }

            if forward_time_drift > Duration::ZERO {
                self.context
                    .metrics
                    .node_metrics
                    .block_timestamp_drift_ms
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                    .inc_by(forward_time_drift.as_millis() as u64);
                debug!(
                    "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                    block_ref,
                    verified_block.timestamp_ms(),
                    now,
                    forward_time_drift.as_millis(),
                );
                sleep(forward_time_drift).await;
            }
        } else {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
        }

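        // Observe the block's commit votes, and reject the block if the local
        // commit index lags the quorum commit index by more than
        // COMMIT_LAG_MULTIPLIER commit sync batches, relying on commit sync to
        // catch up instead.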
        self.commit_vote_monitor.observe_block(&verified_block);

        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

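        // Decode the excluded ancestors advertised with the block, cap them to a
        // size limit, and report any that are missing locally to the synchronizer.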
        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

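    // Streams this authority's own blocks to the subscribing peer: first any
    // cached blocks proposed after `last_received`, then newly broadcasted
    // blocks for as long as the subscription stays open.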
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

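    // Returns the requested blocks. When `highest_accepted_rounds` is empty the
    // request is treated as coming from commit sync; otherwise it comes from the
    // synchronizer, and the response may be topped up with additional blocks the
    // requester appears to be missing.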
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        let commit_sync_handle = highest_accepted_rounds.is_empty();

        fail_point_async!("consensus-rpc-response");

        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

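        // When batched block sync is disabled, serve at most `max_blocks_per_fetch`
        // requested blocks plus a bounded number of ancestors above the requester's
        // highest accepted rounds.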
        if !self.context.protocol_config.consensus_batched_block_sync() {
            if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
                return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
            }

            if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
            {
                return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                    highest_accepted_rounds.len(),
                    self.context.committee.size(),
                ));
            }

            let blocks = self.dag_state.read().get_blocks(&block_refs);

            let mut ancestor_blocks = vec![];
            if !commit_sync_handle {
                let all_ancestors = blocks
                    .iter()
                    .flatten()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .take(MAX_ADDITIONAL_BLOCKS)
                    .collect::<Vec<_>>();

                if !all_ancestors.is_empty() {
                    ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
                }
            }

            let result = blocks
                .into_iter()
                .chain(ancestor_blocks)
                .flatten()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            return Ok(result);
        }

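        // Cap the number of requested blocks based on the requester type.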
        if commit_sync_handle {
            block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
        } else {
            block_refs.truncate(self.context.parameters.max_blocks_per_sync);
        }

        let blocks = if commit_sync_handle {
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        } else {
            block_refs.sort();
            block_refs.dedup();
            let dag_state = self.dag_state.read();
            let mut blocks = dag_state
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

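            // Find the lowest requested round per authority among the returned
            // blocks; rounds between the requester's highest accepted round and
            // these are likely missing on the requester.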
            let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
            for block_ref in blocks.iter().map(|b| b.reference()) {
                let entry = lowest_missing_rounds
                    .entry(block_ref.author)
                    .or_insert(block_ref.round);
                *entry = (*entry).min(block_ref.round);
            }

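            // Top up the response with cached blocks the requester has not
            // accepted yet, serving this authority's own blocks first.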
            let own_index = self.context.own_index;

            let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
            ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });

            for (authority, lowest_missing_round) in ordered_missing_rounds {
                let highest_accepted_round = highest_accepted_rounds[authority];
                if highest_accepted_round >= lowest_missing_round {
                    continue;
                }

                let missing_blocks = dag_state.get_cached_blocks_in_range(
                    authority,
                    highest_accepted_round + 1,
                    lowest_missing_round,
                    self.context
                        .parameters
                        .max_blocks_per_sync
                        .saturating_sub(blocks.len()),
                );
                blocks.extend(missing_blocks);
                if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                    blocks.truncate(self.context.parameters.max_blocks_per_sync);
                    break;
                }
            }

            blocks
        };

        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

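    // Returns stored commits in the requested range (capped to one commit sync
    // batch), together with the blocks whose votes certify the last returned
    // commit.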
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

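        // Compress the requested range to the configured batch size.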
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
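        // Walk back from the last fetched commit until one is certified by a
        // quorum of votes; trailing uncertified commits are dropped from the
        // response.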
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

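    // Returns the latest block of each requested authority, after validating
    // the request.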
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

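        // Read the latest blocks from the dag state; genesis blocks are omitted.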
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

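    // Returns the highest received and accepted rounds across all authorities.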
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

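        // Own blocks do not arrive over the network, so report the highest
        // accepted round as the highest received round for this authority.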
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

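/// Counts active subscriptions to the block broadcast stream per authority, and
/// notifies the core dispatcher when the stake of subscribed authorities
/// reaches or drops below a quorum.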
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

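    // Registers a subscription from `peer`; when subscriber stake first reaches
    // a quorum, the dispatcher is notified.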
    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        drop(counter);
        Ok(())
    }

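    // Unregisters a subscription from `peer`; when subscriber stake drops below
    // a quorum, the dispatcher is notified.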
    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        drop(counter);
        Ok(())
    }
}

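/// The stream of blocks broadcasted to a subscribed peer.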
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

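/// Adapts a broadcast receiver into a `Stream` that yields broadcasted items,
/// skipping (and logging) lagged messages and ending when the channel closes.
/// Subscription counts are maintained via `SubscriptionCounter`.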
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

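/// Awaits the next item from the broadcast receiver and returns it together
/// with the receiver, so the future can be recreated inside `ReusableBoxFuture`.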
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use rstest::rstest;
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

    pub(crate) struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        pub(crate) fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block(#[values(false, true)] median_based_timestamp: bool) {
        let (mut context, _keys) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                median_based_timestamp,
            );
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

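        // Create a block whose timestamp sits at the maximum allowed forward
        // drift, so the handler has to wait it out unless median-based
        // timestamps are enabled.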
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;

        if !median_based_timestamp {
            assert!(core_dispatcher.get_blocks().is_empty());
            sleep(max_drift).await;
        }

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

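        // Build a dag of 10 rounds, where authority 2 equivocates.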
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
}