1use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use consensus_config::AuthorityIndex;
10use futures::{Stream, StreamExt, ready, stream, task};
11use iota_macros::fail_point_async;
12use parking_lot::RwLock;
13use tokio::{sync::broadcast, time::sleep};
14use tokio_util::sync::ReusableBoxFuture;
15use tracing::{debug, info, warn};
16
17use crate::{
18 CommitIndex, Round,
19 block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
20 block_verifier::BlockVerifier,
21 commit::{CommitAPI as _, CommitRange, TrustedCommit},
22 commit_vote_monitor::CommitVoteMonitor,
23 context::Context,
24 core_thread::CoreThreadDispatcher,
25 dag_state::DagState,
26 error::{ConsensusError, ConsensusResult},
27 network::{BlockStream, ExtendedSerializedBlock, NetworkService},
28 stake_aggregator::{QuorumThreshold, StakeAggregator},
29 storage::Store,
30 synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
31};
32
/// Multiplier applied to `commit_sync_batch_size` in `handle_send_block` to obtain how many
/// commits the local node may lag behind the quorum commit index before incoming blocks are
/// rejected.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
34
/// Serves the consensus network RPCs of this authority by implementing `NetworkService`.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    /// Node-wide context; the handlers read the committee, parameters, protocol config,
    /// clock and metrics from it.
    context: Arc<Context>,
    /// Observes received blocks and exposes the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Verifies blocks received through `handle_send_block`.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Used to fetch missing ancestors and missing excluded ancestors from peers.
    synchronizer: Arc<SynchronizerHandle>,
    /// Handle used to add blocks, check block refs and read the highest received rounds.
    core_dispatcher: Arc<C>,
    /// Broadcast receiver that is resubscribed for every block subscription.
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    /// Tracks the active block subscriptions per peer.
    subscription_counter: Arc<SubscriptionCounter>,
    /// Shared DAG state used to serve cached blocks, commit index and GC round.
    dag_state: Arc<RwLock<DagState>>,
    /// Store used to read blocks, commits and commit votes.
    store: Arc<dyn Store>,
}
48
49impl<C: CoreThreadDispatcher> AuthorityService<C> {
50 pub(crate) fn new(
51 context: Arc<Context>,
52 block_verifier: Arc<dyn BlockVerifier>,
53 commit_vote_monitor: Arc<CommitVoteMonitor>,
54 synchronizer: Arc<SynchronizerHandle>,
55 core_dispatcher: Arc<C>,
56 rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
57 dag_state: Arc<RwLock<DagState>>,
58 store: Arc<dyn Store>,
59 ) -> Self {
60 let subscription_counter = Arc::new(SubscriptionCounter::new(
61 context.clone(),
62 core_dispatcher.clone(),
63 ));
64 Self {
65 context,
66 block_verifier,
67 commit_vote_monitor,
68 synchronizer,
69 core_dispatcher,
70 rx_block_broadcaster,
71 subscription_counter,
72 dag_state,
73 store,
74 }
75 }
76}
77
78#[async_trait]
79impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
80 async fn handle_send_block(
81 &self,
82 peer: AuthorityIndex,
83 serialized_block: ExtendedSerializedBlock,
84 ) -> ConsensusResult<()> {
85 fail_point_async!("consensus-rpc-response");
86 let _s = self
87 .context
88 .metrics
89 .node_metrics
90 .scope_processing_time
91 .with_label_values(&["AuthorityService::handle_stream"])
92 .start_timer();
93 let peer_hostname = &self.context.committee.authority(peer).hostname;
94
95 let signed_block: SignedBlock =
97 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
98
99 if peer != signed_block.author() {
101 self.context
102 .metrics
103 .node_metrics
104 .invalid_blocks
105 .with_label_values(&[
106 peer_hostname.as_str(),
107 "handle_send_block",
108 "UnexpectedAuthority",
109 ])
110 .inc();
111 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
112 info!("Block with wrong authority from {}: {}", peer, e);
113 return Err(e);
114 }
115 let peer_hostname = &self.context.committee.authority(peer).hostname;
116
117 if let Err(e) = self.block_verifier.verify(&signed_block) {
119 self.context
120 .metrics
121 .node_metrics
122 .invalid_blocks
123 .with_label_values(&[
124 peer_hostname.as_str(),
125 "handle_send_block",
126 e.clone().name(),
127 ])
128 .inc();
129 info!("Invalid block from {}: {}", peer, e);
130 return Err(e);
131 }
132 let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
133 let block_ref = verified_block.reference();
134 debug!("Received block {} via send block.", block_ref);
135
136 let now = self.context.clock.timestamp_utc_ms();
137 let forward_time_drift =
138 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
139 let latency_to_process_stream =
140 Duration::from_millis(now.saturating_sub(verified_block.timestamp_ms()));
141 self.context
142 .metrics
143 .node_metrics
144 .latency_to_process_stream
145 .with_label_values(&[peer_hostname.as_str()])
146 .observe(latency_to_process_stream.as_secs_f64());
147
148 if !self
149 .context
150 .protocol_config
151 .consensus_median_timestamp_with_checkpoint_enforcement()
152 {
153 if forward_time_drift > self.context.parameters.max_forward_time_drift {
155 self.context
156 .metrics
157 .node_metrics
158 .rejected_future_blocks
159 .with_label_values(&[peer_hostname])
160 .inc();
161 debug!(
162 "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
163 block_ref,
164 verified_block.timestamp_ms(),
165 now,
166 );
167 return Err(ConsensusError::BlockRejected {
168 block_ref,
169 reason: format!(
170 "Block timestamp is too far in the future: {} > {}",
171 verified_block.timestamp_ms(),
172 now
173 ),
174 });
175 }
176
177 if forward_time_drift > Duration::ZERO {
179 self.context
180 .metrics
181 .node_metrics
182 .block_timestamp_drift_ms
183 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
184 .inc_by(forward_time_drift.as_millis() as u64);
185 debug!(
186 "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
187 block_ref,
188 verified_block.timestamp_ms(),
189 now,
190 forward_time_drift.as_millis(),
191 );
192 sleep(forward_time_drift).await;
193 }
194 } else {
195 self.context
196 .metrics
197 .node_metrics
198 .block_timestamp_drift_ms
199 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
200 .inc_by(forward_time_drift.as_millis() as u64);
201 }
202
203 self.commit_vote_monitor.observe_block(&verified_block);
206
207 let last_commit_index = self.dag_state.read().last_commit_index();
216 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
217 if last_commit_index
220 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
221 < quorum_commit_index
222 {
223 self.context
224 .metrics
225 .node_metrics
226 .rejected_blocks
227 .with_label_values(&["commit_lagging"])
228 .inc();
229 debug!(
230 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
231 block_ref, last_commit_index, quorum_commit_index,
232 );
233 return Err(ConsensusError::BlockRejected {
234 block_ref,
235 reason: format!(
236 "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
237 ),
238 });
239 }
240
241 self.context
242 .metrics
243 .node_metrics
244 .verified_blocks
245 .with_label_values(&[peer_hostname])
246 .inc();
247
248 let missing_ancestors = self
249 .core_dispatcher
250 .add_blocks(vec![verified_block])
251 .await
252 .map_err(|_| ConsensusError::Shutdown)?;
253 if !missing_ancestors.is_empty() {
254 if let Err(err) = self
256 .synchronizer
257 .fetch_blocks(missing_ancestors, peer)
258 .await
259 {
260 warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
261 }
262 }
263
264 let mut excluded_ancestors = serialized_block
267 .excluded_ancestors
268 .into_iter()
269 .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
270 .collect::<Result<Vec<BlockRef>, bcs::Error>>()
271 .map_err(ConsensusError::MalformedBlock)?;
272
273 let excluded_ancestors_limit = self.context.committee.size() * 2;
274 if excluded_ancestors.len() > excluded_ancestors_limit {
275 debug!(
276 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
277 excluded_ancestors.len() - excluded_ancestors_limit,
278 peer,
279 peer_hostname,
280 );
281 excluded_ancestors.truncate(excluded_ancestors_limit);
282 }
283
284 self.context
285 .metrics
286 .node_metrics
287 .network_received_excluded_ancestors_from_authority
288 .with_label_values(&[peer_hostname])
289 .inc_by(excluded_ancestors.len() as u64);
290
291 for excluded_ancestor in &excluded_ancestors {
292 let excluded_ancestor_hostname = &self
293 .context
294 .committee
295 .authority(excluded_ancestor.author)
296 .hostname;
297 self.context
298 .metrics
299 .node_metrics
300 .network_excluded_ancestors_count_by_authority
301 .with_label_values(&[excluded_ancestor_hostname])
302 .inc();
303 }
304
305 let missing_excluded_ancestors = self
306 .core_dispatcher
307 .check_block_refs(excluded_ancestors)
308 .await
309 .map_err(|_| ConsensusError::Shutdown)?;
310
311 if !missing_excluded_ancestors.is_empty() {
312 self.context
313 .metrics
314 .node_metrics
315 .network_excluded_ancestors_sent_to_fetch
316 .with_label_values(&[peer_hostname])
317 .inc_by(missing_excluded_ancestors.len() as u64);
318
319 let synchronizer = self.synchronizer.clone();
320 tokio::spawn(async move {
321 if let Err(err) = synchronizer
323 .fetch_blocks(missing_excluded_ancestors, peer)
324 .await
325 {
326 warn!(
327 "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
328 );
329 }
330 });
331 }
332
333 Ok(())
334 }
335
336 async fn handle_subscribe_blocks(
337 &self,
338 peer: AuthorityIndex,
339 last_received: Round,
340 ) -> ConsensusResult<BlockStream> {
341 fail_point_async!("consensus-rpc-response");
342
343 let dag_state = self.dag_state.read();
344 let missed_blocks = stream::iter(
349 dag_state
350 .get_cached_blocks(self.context.own_index, last_received + 1)
351 .into_iter()
352 .map(|block| ExtendedSerializedBlock {
353 block: block.serialized().clone(),
354 excluded_ancestors: vec![],
355 }),
356 );
357
358 let broadcasted_blocks = BroadcastedBlockStream::new(
359 peer,
360 self.rx_block_broadcaster.resubscribe(),
361 self.subscription_counter.clone(),
362 );
363
364 Ok(Box::pin(missed_blocks.chain(
367 broadcasted_blocks.map(ExtendedSerializedBlock::from),
368 )))
369 }
370
371 async fn handle_fetch_blocks(
379 &self,
380 peer: AuthorityIndex,
381 mut block_refs: Vec<BlockRef>,
382 highest_accepted_rounds: Vec<Round>,
383 ) -> ConsensusResult<Vec<Bytes>> {
384 let commit_sync_handle = highest_accepted_rounds.is_empty();
388
389 fail_point_async!("consensus-rpc-response");
390
391 for block in &block_refs {
393 if !self.context.committee.is_valid_index(block.author) {
394 return Err(ConsensusError::InvalidAuthorityIndex {
395 index: block.author,
396 max: self.context.committee.size(),
397 });
398 }
399 if block.round == GENESIS_ROUND {
400 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
401 }
402 }
403
404 if !self.context.protocol_config.consensus_batched_block_sync() {
405 if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
406 return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
407 }
408
409 if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
410 {
411 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
412 highest_accepted_rounds.len(),
413 self.context.committee.size(),
414 ));
415 }
416
417 let blocks = self.dag_state.read().get_blocks(&block_refs);
419
420 let mut ancestor_blocks = vec![];
423 if !commit_sync_handle {
424 let all_ancestors = blocks
425 .iter()
426 .flatten()
427 .flat_map(|block| block.ancestors().to_vec())
428 .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
429 .take(MAX_ADDITIONAL_BLOCKS)
430 .collect::<Vec<_>>();
431
432 if !all_ancestors.is_empty() {
433 ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
434 }
435 }
436
437 let result = blocks
439 .into_iter()
440 .chain(ancestor_blocks)
441 .flatten()
442 .map(|block| block.serialized().clone())
443 .collect::<Vec<_>>();
444
445 return Ok(result);
446 }
447
448 if commit_sync_handle {
452 block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
453 } else {
454 block_refs.truncate(self.context.parameters.max_blocks_per_sync);
455 }
456
457 let blocks = if commit_sync_handle {
459 let gc_round = self.dag_state.read().gc_round();
461
462 let (below_gc, above_gc): (Vec<_>, Vec<_>) = block_refs
464 .iter()
465 .partition(|block_ref| block_ref.round < gc_round);
466
467 let mut blocks = Vec::new();
468
469 if !below_gc.is_empty() {
471 let store_blocks = self
472 .store
473 .read_blocks(&below_gc)?
474 .into_iter()
475 .flatten()
476 .collect::<Vec<_>>();
477 blocks.extend(store_blocks);
478 }
479
480 if !above_gc.is_empty() {
482 let dag_blocks = self
483 .dag_state
484 .read()
485 .get_blocks(&above_gc)
486 .into_iter()
487 .flatten()
488 .collect::<Vec<_>>();
489 blocks.extend(dag_blocks);
490 }
491
492 blocks
493 } else {
494 block_refs.sort();
497 block_refs.dedup();
498 let dag_state = self.dag_state.read();
499 let mut blocks = dag_state
500 .get_blocks(&block_refs)
501 .into_iter()
502 .flatten()
503 .collect::<Vec<_>>();
504
505 let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
509 for block_ref in blocks.iter().map(|b| b.reference()) {
510 let entry = lowest_missing_rounds
511 .entry(block_ref.author)
512 .or_insert(block_ref.round);
513 *entry = (*entry).min(block_ref.round);
514 }
515
516 let own_index = self.context.own_index;
520
521 let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
523 ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });
524
525 for (authority, lowest_missing_round) in ordered_missing_rounds {
526 let highest_accepted_round = highest_accepted_rounds[authority];
527 if highest_accepted_round >= lowest_missing_round {
528 continue;
529 }
530
531 let missing_blocks = dag_state.get_cached_blocks_in_range(
532 authority,
533 highest_accepted_round + 1,
534 lowest_missing_round,
535 self.context
536 .parameters
537 .max_blocks_per_sync
538 .saturating_sub(blocks.len()),
539 );
540 blocks.extend(missing_blocks);
541 if blocks.len() >= self.context.parameters.max_blocks_per_sync {
542 blocks.truncate(self.context.parameters.max_blocks_per_sync);
543 break;
544 }
545 }
546
547 blocks
548 };
549
550 let bytes = blocks
552 .into_iter()
553 .map(|block| block.serialized().clone())
554 .collect::<Vec<_>>();
555 Ok(bytes)
556 }
557
558 async fn handle_fetch_commits(
559 &self,
560 _peer: AuthorityIndex,
561 commit_range: CommitRange,
562 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
563 fail_point_async!("consensus-rpc-response");
564
565 let inclusive_end = commit_range.end().min(
568 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
569 - 1,
570 );
571 let mut commits = self
572 .store
573 .scan_commits((commit_range.start()..=inclusive_end).into())?;
574 let mut certifier_block_refs = vec![];
575 'commit: while let Some(c) = commits.last() {
576 let index = c.index();
577 let votes = self.store.read_commit_votes(index)?;
578 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
579 for v in &votes {
580 stake_aggregator.add(v.author, &self.context.committee);
581 }
582 if stake_aggregator.reached_threshold(&self.context.committee) {
583 certifier_block_refs = votes;
584 break 'commit;
585 } else {
586 debug!(
587 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
588 index,
589 stake_aggregator.stake(),
590 stake_aggregator.threshold(&self.context.committee)
591 );
592 self.context
593 .metrics
594 .node_metrics
595 .commit_sync_fetch_commits_handler_uncertified_skipped
596 .inc();
597 commits.pop();
598 }
599 }
600 let certifier_blocks = self
601 .store
602 .read_blocks(&certifier_block_refs)?
603 .into_iter()
604 .flatten()
605 .collect();
606 Ok((commits, certifier_blocks))
607 }
608
609 async fn handle_fetch_latest_blocks(
610 &self,
611 peer: AuthorityIndex,
612 authorities: Vec<AuthorityIndex>,
613 ) -> ConsensusResult<Vec<Bytes>> {
614 fail_point_async!("consensus-rpc-response");
615
616 if authorities.len() > self.context.committee.size() {
617 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
618 }
619
620 for authority in &authorities {
622 if !self.context.committee.is_valid_index(*authority) {
623 return Err(ConsensusError::InvalidAuthorityIndex {
624 index: *authority,
625 max: self.context.committee.size(),
626 });
627 }
628 }
629
630 let mut blocks = vec![];
635 let dag_state = self.dag_state.read();
636 for authority in authorities {
637 let block = dag_state.get_last_block_for_authority(authority);
638
639 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
640
641 if block.round() != GENESIS_ROUND {
644 blocks.push(block);
645 }
646 }
647
648 let result = blocks
650 .into_iter()
651 .map(|block| block.serialized().clone())
652 .collect::<Vec<_>>();
653
654 Ok(result)
655 }
656
657 async fn handle_get_latest_rounds(
658 &self,
659 _peer: AuthorityIndex,
660 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
661 fail_point_async!("consensus-rpc-response");
662
663 let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();
664
665 let blocks = self
666 .dag_state
667 .read()
668 .get_last_cached_block_per_authority(Round::MAX);
669 let highest_accepted_rounds = blocks
670 .into_iter()
671 .map(|(block, _)| block.round())
672 .collect::<Vec<_>>();
673
674 highest_received_rounds[self.context.own_index] =
677 highest_accepted_rounds[self.context.own_index];
678
679 Ok((highest_received_rounds, highest_accepted_rounds))
680 }
681}
682
/// Subscription bookkeeping guarded by the mutex in `SubscriptionCounter`.
struct Counter {
    /// Total number of active subscriptions across all peers.
    count: usize,
    /// Number of active subscriptions per authority, indexed by authority index.
    subscriptions_by_authority: Vec<usize>,
}
687
/// Tracks block subscriptions from peers and tells the core dispatcher when the set of
/// subscribers (plus this authority) starts or stops reaching a quorum of stake.
struct SubscriptionCounter {
    /// Provides the committee, own index and metrics.
    context: Arc<Context>,
    /// Subscription counts; locked for the whole duration of `increment` / `decrement`.
    counter: parking_lot::Mutex<Counter>,
    /// Receives `set_quorum_subscribers_exists` notifications.
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}
695
696impl SubscriptionCounter {
697 fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
698 for (_, authority) in context.committee.authorities() {
700 context
701 .metrics
702 .node_metrics
703 .subscribed_by
704 .with_label_values(&[authority.hostname.as_str()])
705 .set(0);
706 }
707
708 Self {
709 counter: parking_lot::Mutex::new(Counter {
710 count: 0,
711 subscriptions_by_authority: vec![0; context.committee.size()],
712 }),
713 dispatcher,
714 context,
715 }
716 }
717
718 fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
719 let mut counter = self.counter.lock();
720 counter.count += 1;
721 let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
722 counter.subscriptions_by_authority[peer] += 1;
723 let mut total_stake = 0;
724 for (authority_index, _) in self.context.committee.authorities() {
725 if counter.subscriptions_by_authority[authority_index] >= 1
726 || self.context.own_index == authority_index
727 {
728 total_stake += self.context.committee.stake(authority_index);
729 }
730 }
731 let previous_stake = if original_subscription_by_peer == 0 {
733 total_stake - self.context.committee.stake(peer)
734 } else {
735 total_stake
736 };
737
738 let peer_hostname = &self.context.committee.authority(peer).hostname;
739 self.context
740 .metrics
741 .node_metrics
742 .subscribed_by
743 .with_label_values(&[peer_hostname])
744 .set(1);
745 if !self.context.committee.reached_quorum(previous_stake)
748 && self.context.committee.reached_quorum(total_stake)
749 {
750 self.dispatcher
751 .set_quorum_subscribers_exists(true)
752 .map_err(|_| ConsensusError::Shutdown)?;
753 }
754 drop(counter);
756 Ok(())
757 }
758
759 fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
760 let mut counter = self.counter.lock();
761 counter.count -= 1;
762 let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
763 counter.subscriptions_by_authority[peer] -= 1;
764 let mut total_stake = 0;
765 for (authority_index, _) in self.context.committee.authorities() {
766 if counter.subscriptions_by_authority[authority_index] >= 1
767 || self.context.own_index == authority_index
768 {
769 total_stake += self.context.committee.stake(authority_index);
770 }
771 }
772 let previous_stake = if original_subscription_by_peer == 1 {
774 total_stake + self.context.committee.stake(peer)
775 } else {
776 total_stake
777 };
778
779 if counter.subscriptions_by_authority[peer] == 0 {
780 let peer_hostname = &self.context.committee.authority(peer).hostname;
781 self.context
782 .metrics
783 .node_metrics
784 .subscribed_by
785 .with_label_values(&[peer_hostname])
786 .set(0);
787 }
788
789 if self.context.committee.reached_quorum(previous_stake)
792 && !self.context.committee.reached_quorum(total_stake)
793 {
794 self.dispatcher
795 .set_quorum_subscribers_exists(false)
796 .map_err(|_| ConsensusError::Shutdown)?;
797 }
798 drop(counter);
800 Ok(())
801 }
802}
803
/// Stream of `ExtendedBlock`s received from the block broadcast channel.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
807
/// Adapts a `tokio::sync::broadcast::Receiver` into a `Stream`, and registers / unregisters
/// the subscribing peer in the `SubscriptionCounter` on creation / drop.
struct BroadcastStream<T> {
    /// Peer this stream was opened for.
    peer: AuthorityIndex,
    /// Pending `recv()` future; it hands the receiver back together with the result so the
    /// receiver can be reused for the next `recv()`.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    /// Counter that is incremented in `new` and decremented in `drop`.
    subscription_counter: Arc<SubscriptionCounter>,
}
823
824impl<T: 'static + Clone + Send> BroadcastStream<T> {
825 pub fn new(
826 peer: AuthorityIndex,
827 rx: broadcast::Receiver<T>,
828 subscription_counter: Arc<SubscriptionCounter>,
829 ) -> Self {
830 if let Err(err) = subscription_counter.increment(peer) {
831 match err {
832 ConsensusError::Shutdown => {}
833 _ => panic!("Unexpected error: {err}"),
834 }
835 }
836 Self {
837 peer,
838 inner: ReusableBoxFuture::new(make_recv_future(rx)),
839 subscription_counter,
840 }
841 }
842}
843
844impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
845 type Item = T;
846
847 fn poll_next(
848 mut self: Pin<&mut Self>,
849 cx: &mut task::Context<'_>,
850 ) -> task::Poll<Option<Self::Item>> {
851 let peer = self.peer;
852 let maybe_item = loop {
853 let (result, rx) = ready!(self.inner.poll(cx));
854 self.inner.set(make_recv_future(rx));
855
856 match result {
857 Ok(item) => break Some(item),
858 Err(broadcast::error::RecvError::Closed) => {
859 info!("Block BroadcastedBlockStream {} closed", peer);
860 break None;
861 }
862 Err(broadcast::error::RecvError::Lagged(n)) => {
863 warn!(
864 "Block BroadcastedBlockStream {} lagged by {} messages",
865 peer, n
866 );
867 continue;
868 }
869 }
870 };
871 task::Poll::Ready(maybe_item)
872 }
873}
874
875impl<T> Drop for BroadcastStream<T> {
876 fn drop(&mut self) {
877 if let Err(err) = self.subscription_counter.decrement(self.peer) {
878 match err {
879 ConsensusError::Shutdown => {}
880 _ => panic!("Unexpected error: {err}"),
881 }
882 }
883 }
884}
885
886async fn make_recv_future<T: Clone>(
887 mut rx: broadcast::Receiver<T>,
888) -> (
889 Result<T, broadcast::error::RecvError>,
890 broadcast::Receiver<T>,
891) {
892 let result = rx.recv().await;
893 (result, rx)
894}
895
896#[cfg(test)]
899pub(crate) mod tests {
900 use std::{
901 collections::{BTreeMap, BTreeSet},
902 sync::Arc,
903 time::Duration,
904 };
905
906 use async_trait::async_trait;
907 use bytes::Bytes;
908 use consensus_config::AuthorityIndex;
909 use parking_lot::{Mutex, RwLock};
910 use rstest::rstest;
911 use tokio::{sync::broadcast, time::sleep};
912
913 use crate::{
914 Round,
915 authority_service::AuthorityService,
916 block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
917 commit::{CertifiedCommits, CommitRange},
918 commit_vote_monitor::CommitVoteMonitor,
919 context::Context,
920 core_thread::{CoreError, CoreThreadDispatcher},
921 dag_state::DagState,
922 error::ConsensusResult,
923 network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
924 round_prober::QuorumRound,
925 storage::mem_store::MemStore,
926 synchronizer::Synchronizer,
927 test_dag_builder::DagBuilder,
928 };
929
/// Test double for `CoreThreadDispatcher` that records the blocks passed to `add_blocks`.
pub(crate) struct FakeCoreThreadDispatcher {
    /// Blocks received through `add_blocks`, in arrival order.
    blocks: Mutex<Vec<VerifiedBlock>>,
}
933
934 impl FakeCoreThreadDispatcher {
935 pub(crate) fn new() -> Self {
936 Self {
937 blocks: Mutex::new(vec![]),
938 }
939 }
940
941 fn get_blocks(&self) -> Vec<VerifiedBlock> {
942 self.blocks.lock().clone()
943 }
944 }
945
/// Minimal `CoreThreadDispatcher` implementation for tests; methods the tests in this module
/// do not exercise are left as `todo!()`.
#[async_trait]
impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
    /// Records the blocks and returns the references of those same blocks as the result.
    async fn add_blocks(
        &self,
        blocks: Vec<VerifiedBlock>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        let block_refs = blocks.iter().map(|b| b.reference()).collect();
        self.blocks.lock().extend(blocks);
        Ok(block_refs)
    }

    /// Always reports that no block ref is missing.
    async fn check_block_refs(
        &self,
        _block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(BTreeSet::new())
    }

    async fn add_certified_commits(
        &self,
        _commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        todo!()
    }

    /// No-op.
    async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
        Ok(())
    }

    /// Always returns an empty map.
    async fn get_missing_blocks(
        &self,
    ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
        Ok(Default::default())
    }

    fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
        todo!()
    }

    fn set_propagation_delay_and_quorum_rounds(
        &self,
        _delay: Round,
        _received_quorum_rounds: Vec<QuorumRound>,
        _accepted_quorum_rounds: Vec<QuorumRound>,
    ) -> Result<(), CoreError> {
        todo!()
    }

    fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
        todo!()
    }

    fn highest_received_rounds(&self) -> Vec<Round> {
        todo!()
    }
}
1002
/// Stateless `NetworkClient` test double; every method is unimplemented.
#[derive(Default)]
struct FakeNetworkClient {}
1005
/// `NetworkClient` stub: calling any of its methods panics via `unimplemented!`.
#[async_trait]
impl NetworkClient for FakeNetworkClient {
    const SUPPORT_STREAMING: bool = false;

    async fn send_block(
        &self,
        _peer: AuthorityIndex,
        _block: &VerifiedBlock,
        _timeout: Duration,
    ) -> ConsensusResult<()> {
        unimplemented!("Unimplemented")
    }

    async fn subscribe_blocks(
        &self,
        _peer: AuthorityIndex,
        _last_received: Round,
        _timeout: Duration,
    ) -> ConsensusResult<BlockStream> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        _block_refs: Vec<BlockRef>,
        _highest_accepted_rounds: Vec<Round>,
        _timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_commits(
        &self,
        _peer: AuthorityIndex,
        _commit_range: CommitRange,
        _timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_latest_blocks(
        &self,
        _peer: AuthorityIndex,
        _authorities: Vec<AuthorityIndex>,
        _timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>> {
        unimplemented!("Unimplemented")
    }

    async fn get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
        _timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        unimplemented!("Unimplemented")
    }
}
1064
/// Sends a block whose timestamp is exactly `max_forward_time_drift` ahead of the clock and
/// checks when it reaches the core dispatcher: only after the drift has elapsed when
/// median-based timestamps are disabled, and without waiting when they are enabled.
#[rstest]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_handle_send_block(#[values(false, true)] median_based_timestamp: bool) {
    let (mut context, _keys) = Context::new_for_test(4);
    context
        .protocol_config
        .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
            median_based_timestamp,
        );
    let context = Arc::new(context);
    let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
    let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
    let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
    let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
    let network_client = Arc::new(FakeNetworkClient::default());
    let store = Arc::new(MemStore::new());
    let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
    let synchronizer = Synchronizer::start(
        network_client,
        context.clone(),
        core_dispatcher.clone(),
        commit_vote_monitor.clone(),
        block_verifier.clone(),
        dag_state.clone(),
        false,
    );
    let authority_service = Arc::new(AuthorityService::new(
        context.clone(),
        block_verifier,
        commit_vote_monitor,
        synchronizer,
        core_dispatcher.clone(),
        rx_block_broadcast,
        dag_state,
        store,
    ));

    // Build a block timestamped exactly at the maximum allowed forward drift.
    let now = context.clock.timestamp_utc_ms();
    let max_drift = context.parameters.max_forward_time_drift;
    let input_block = VerifiedBlock::new_for_test(
        TestBlock::new(9, 0)
            .set_timestamp_ms(now + max_drift.as_millis() as u64)
            .build(),
    );

    let service = authority_service.clone();
    let serialized = ExtendedSerializedBlock {
        block: input_block.serialized().clone(),
        excluded_ancestors: vec![],
    };

    // Send the block from a separate task so the test can observe intermediate state.
    tokio::spawn(async move {
        service
            .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
            .await
            .unwrap();
    });

    sleep(max_drift / 2).await;

    if !median_based_timestamp {
        // Half-way through the drift the block must still be held back.
        assert!(core_dispatcher.get_blocks().is_empty());
        sleep(max_drift).await;
    }

    let blocks = core_dispatcher.get_blocks();
    assert_eq!(blocks.len(), 1);
    assert_eq!(blocks[0], input_block);
}
1135
/// Persists ten DAG layers (with equivocation configured for authority 2) and checks that
/// every block returned by `handle_fetch_latest_blocks` is from round 10.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_handle_fetch_latest_blocks() {
    let (context, _keys) = Context::new_for_test(4);
    let context = Arc::new(context);
    let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
    let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
    let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
    let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
    let network_client = Arc::new(FakeNetworkClient::default());
    let store = Arc::new(MemStore::new());
    let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
    let synchronizer = Synchronizer::start(
        network_client,
        context.clone(),
        core_dispatcher.clone(),
        commit_vote_monitor.clone(),
        block_verifier.clone(),
        dag_state.clone(),
        true,
    );
    let authority_service = Arc::new(AuthorityService::new(
        context.clone(),
        block_verifier,
        commit_vote_monitor,
        synchronizer,
        core_dispatcher.clone(),
        rx_block_broadcast,
        dag_state.clone(),
        store,
    ));

    // Build rounds 1..=10 and store them in the DAG state.
    let mut dag_builder = DagBuilder::new(context.clone());
    dag_builder
        .layers(1..=10)
        .authorities(vec![AuthorityIndex::new_for_test(2)])
        .equivocate(1)
        .build()
        .persist_layers(dag_state);

    // Request the latest blocks of authorities 1 and 2.
    let authorities_to_request = vec![
        AuthorityIndex::new_for_test(1),
        AuthorityIndex::new_for_test(2),
    ];
    let results = authority_service
        .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
        .await;

    // Every returned block must deserialize and belong to the last built round.
    let serialised_blocks = results.unwrap();
    for serialised_block in serialised_blocks {
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
        let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

        assert_eq!(verified_block.round(), 10);
    }
}
1197}