1use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use consensus_config::AuthorityIndex;
10use futures::{Stream, StreamExt, ready, stream, task};
11use iota_macros::fail_point_async;
12use parking_lot::RwLock;
13use tokio::{sync::broadcast, time::sleep};
14use tokio_util::sync::ReusableBoxFuture;
15use tracing::{debug, info, warn};
16
17use crate::{
18 CommitIndex, Round,
19 block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
20 block_verifier::BlockVerifier,
21 commit::{CommitAPI as _, CommitRange, TrustedCommit},
22 commit_vote_monitor::CommitVoteMonitor,
23 context::Context,
24 core_thread::CoreThreadDispatcher,
25 dag_state::DagState,
26 error::{ConsensusError, ConsensusResult},
27 network::{BlockStream, ExtendedSerializedBlock, NetworkService},
28 stake_aggregator::{QuorumThreshold, StakeAggregator},
29 storage::Store,
30 synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
31};
32
/// Multiplier applied to `commit_sync_batch_size` to derive the maximum allowed
/// lag between the local last commit index and the quorum commit index before
/// incoming blocks are rejected (see `handle_send_block`).
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
34
/// Authority-side implementation of [`NetworkService`]: accepts blocks pushed
/// by peers, serves block/commit fetch requests, and streams this authority's
/// own blocks to subscribers.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    // Observes commit votes carried in received blocks to estimate the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    // Used to fetch missing (excluded) ancestors of received blocks.
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    // Source of this authority's own blocks, fanned out to each subscriber stream.
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    // Tracks subscriber stake so the core learns when a quorum is subscribed.
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}
48
49impl<C: CoreThreadDispatcher> AuthorityService<C> {
50 pub(crate) fn new(
51 context: Arc<Context>,
52 block_verifier: Arc<dyn BlockVerifier>,
53 commit_vote_monitor: Arc<CommitVoteMonitor>,
54 synchronizer: Arc<SynchronizerHandle>,
55 core_dispatcher: Arc<C>,
56 rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
57 dag_state: Arc<RwLock<DagState>>,
58 store: Arc<dyn Store>,
59 ) -> Self {
60 let subscription_counter = Arc::new(SubscriptionCounter::new(
61 context.clone(),
62 core_dispatcher.clone(),
63 ));
64 Self {
65 context,
66 block_verifier,
67 commit_vote_monitor,
68 synchronizer,
69 core_dispatcher,
70 rx_block_broadcaster,
71 subscription_counter,
72 dag_state,
73 store,
74 }
75 }
76}
77
78#[async_trait]
79impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
80 async fn handle_send_block(
81 &self,
82 peer: AuthorityIndex,
83 serialized_block: ExtendedSerializedBlock,
84 ) -> ConsensusResult<()> {
85 fail_point_async!("consensus-rpc-response");
86 let _s = self
87 .context
88 .metrics
89 .node_metrics
90 .scope_processing_time
91 .with_label_values(&["AuthorityService::handle_stream"])
92 .start_timer();
93 let peer_hostname = &self.context.committee.authority(peer).hostname;
94
95 let signed_block: SignedBlock =
97 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
98
99 if peer != signed_block.author() {
101 self.context
102 .metrics
103 .node_metrics
104 .invalid_blocks
105 .with_label_values(&[
106 peer_hostname.as_str(),
107 "handle_send_block",
108 "UnexpectedAuthority",
109 ])
110 .inc();
111 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
112 info!("Block with wrong authority from {}: {}", peer, e);
113 return Err(e);
114 }
115 let peer_hostname = &self.context.committee.authority(peer).hostname;
116
117 if let Err(e) = self.block_verifier.verify(&signed_block) {
119 self.context
120 .metrics
121 .node_metrics
122 .invalid_blocks
123 .with_label_values(&[
124 peer_hostname.as_str(),
125 "handle_send_block",
126 e.clone().name(),
127 ])
128 .inc();
129 info!("Invalid block from {}: {}", peer, e);
130 return Err(e);
131 }
132 let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
133 let block_ref = verified_block.reference();
134 debug!("Received block {} via send block.", block_ref);
135
136 let now = self.context.clock.timestamp_utc_ms();
137 let forward_time_drift =
138 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
139 let latency_to_process_stream =
140 Duration::from_millis(now.saturating_sub(verified_block.timestamp_ms()));
141 self.context
142 .metrics
143 .node_metrics
144 .latency_to_process_stream
145 .with_label_values(&[peer_hostname.as_str()])
146 .observe(latency_to_process_stream.as_secs_f64());
147
148 if !self
149 .context
150 .protocol_config
151 .consensus_median_timestamp_with_checkpoint_enforcement()
152 {
153 if forward_time_drift > self.context.parameters.max_forward_time_drift {
155 self.context
156 .metrics
157 .node_metrics
158 .rejected_future_blocks
159 .with_label_values(&[peer_hostname])
160 .inc();
161 debug!(
162 "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
163 block_ref,
164 verified_block.timestamp_ms(),
165 now,
166 );
167 return Err(ConsensusError::BlockRejected {
168 block_ref,
169 reason: format!(
170 "Block timestamp is too far in the future: {} > {}",
171 verified_block.timestamp_ms(),
172 now
173 ),
174 });
175 }
176
177 if forward_time_drift > Duration::ZERO {
179 self.context
180 .metrics
181 .node_metrics
182 .block_timestamp_drift_ms
183 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
184 .inc_by(forward_time_drift.as_millis() as u64);
185 debug!(
186 "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
187 block_ref,
188 verified_block.timestamp_ms(),
189 now,
190 forward_time_drift.as_millis(),
191 );
192 sleep(forward_time_drift).await;
193 }
194 } else {
195 self.context
196 .metrics
197 .node_metrics
198 .block_timestamp_drift_ms
199 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
200 .inc_by(forward_time_drift.as_millis() as u64);
201 }
202
203 self.commit_vote_monitor.observe_block(&verified_block);
206
207 let last_commit_index = self.dag_state.read().last_commit_index();
216 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
217 if last_commit_index
220 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
221 < quorum_commit_index
222 {
223 self.context
224 .metrics
225 .node_metrics
226 .rejected_blocks
227 .with_label_values(&["commit_lagging"])
228 .inc();
229 debug!(
230 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
231 block_ref, last_commit_index, quorum_commit_index,
232 );
233 return Err(ConsensusError::BlockRejected {
234 block_ref,
235 reason: format!(
236 "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
237 ),
238 });
239 }
240
241 self.context
242 .metrics
243 .node_metrics
244 .verified_blocks
245 .with_label_values(&[peer_hostname])
246 .inc();
247
248 let missing_ancestors = self
249 .core_dispatcher
250 .add_blocks(vec![verified_block])
251 .await
252 .map_err(|_| ConsensusError::Shutdown)?;
253 if !missing_ancestors.is_empty() {
254 if let Err(err) = self
256 .synchronizer
257 .fetch_blocks(missing_ancestors, peer)
258 .await
259 {
260 warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
261 }
262 }
263
264 let mut excluded_ancestors = serialized_block
267 .excluded_ancestors
268 .into_iter()
269 .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
270 .collect::<Result<Vec<BlockRef>, bcs::Error>>()
271 .map_err(ConsensusError::MalformedBlock)?;
272
273 let excluded_ancestors_limit = self.context.committee.size() * 2;
274 if excluded_ancestors.len() > excluded_ancestors_limit {
275 debug!(
276 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
277 excluded_ancestors.len() - excluded_ancestors_limit,
278 peer,
279 peer_hostname,
280 );
281 excluded_ancestors.truncate(excluded_ancestors_limit);
282 }
283
284 self.context
285 .metrics
286 .node_metrics
287 .network_received_excluded_ancestors_from_authority
288 .with_label_values(&[peer_hostname])
289 .inc_by(excluded_ancestors.len() as u64);
290
291 for excluded_ancestor in &excluded_ancestors {
292 let excluded_ancestor_hostname = &self
293 .context
294 .committee
295 .authority(excluded_ancestor.author)
296 .hostname;
297 self.context
298 .metrics
299 .node_metrics
300 .network_excluded_ancestors_count_by_authority
301 .with_label_values(&[excluded_ancestor_hostname])
302 .inc();
303 }
304
305 let missing_excluded_ancestors = self
306 .core_dispatcher
307 .check_block_refs(excluded_ancestors)
308 .await
309 .map_err(|_| ConsensusError::Shutdown)?;
310
311 if !missing_excluded_ancestors.is_empty() {
312 self.context
313 .metrics
314 .node_metrics
315 .network_excluded_ancestors_sent_to_fetch
316 .with_label_values(&[peer_hostname])
317 .inc_by(missing_excluded_ancestors.len() as u64);
318
319 let synchronizer = self.synchronizer.clone();
320 tokio::spawn(async move {
321 if let Err(err) = synchronizer
323 .fetch_blocks(missing_excluded_ancestors, peer)
324 .await
325 {
326 warn!(
327 "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
328 );
329 }
330 });
331 }
332
333 Ok(())
334 }
335
    /// Streams this authority's own blocks to `peer`: first any cached own
    /// blocks with rounds above `last_received`, then live blocks from the
    /// broadcast channel.
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        // NOTE(review): this read guard lives until the end of the function,
        // i.e. across construction of both streams — confirm intentional.
        let dag_state = self.dag_state.read();
        // Own cached blocks the peer has not received yet. `get_cached_blocks`
        // is called eagerly here (under the lock); only iteration is lazy.
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        // Live blocks; constructing the stream also registers the
        // subscription with the counter (see `BroadcastStream::new`).
        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Catch-up blocks first, then the live feed.
        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }
370
371 async fn handle_fetch_blocks(
379 &self,
380 peer: AuthorityIndex,
381 mut block_refs: Vec<BlockRef>,
382 highest_accepted_rounds: Vec<Round>,
383 ) -> ConsensusResult<Vec<Bytes>> {
384 let commit_sync_handle = highest_accepted_rounds.is_empty();
388
389 fail_point_async!("consensus-rpc-response");
390
391 for block in &block_refs {
393 if !self.context.committee.is_valid_index(block.author) {
394 return Err(ConsensusError::InvalidAuthorityIndex {
395 index: block.author,
396 max: self.context.committee.size(),
397 });
398 }
399 if block.round == GENESIS_ROUND {
400 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
401 }
402 }
403
404 if !self.context.protocol_config.consensus_batched_block_sync() {
405 if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
406 return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
407 }
408
409 if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
410 {
411 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
412 highest_accepted_rounds.len(),
413 self.context.committee.size(),
414 ));
415 }
416
417 let blocks = self.dag_state.read().get_blocks(&block_refs);
419
420 let mut ancestor_blocks = vec![];
423 if !commit_sync_handle {
424 let all_ancestors = blocks
425 .iter()
426 .flatten()
427 .flat_map(|block| block.ancestors().to_vec())
428 .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
429 .take(MAX_ADDITIONAL_BLOCKS)
430 .collect::<Vec<_>>();
431
432 if !all_ancestors.is_empty() {
433 ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
434 }
435 }
436
437 let result = blocks
439 .into_iter()
440 .chain(ancestor_blocks)
441 .flatten()
442 .map(|block| block.serialized().clone())
443 .collect::<Vec<_>>();
444
445 return Ok(result);
446 }
447
448 if commit_sync_handle {
452 block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
453 } else {
454 block_refs.truncate(self.context.parameters.max_blocks_per_sync);
455 }
456
457 let blocks = if commit_sync_handle {
459 let gc_round = self.dag_state.read().gc_round();
461
462 let mut below_gc_indices = Vec::new();
464 let mut above_gc_indices = Vec::new();
465 let mut below_gc_refs = Vec::new();
466 let mut above_gc_refs = Vec::new();
467 for (i, block_ref) in block_refs.iter().enumerate() {
468 if block_ref.round < gc_round {
469 below_gc_indices.push(i);
470 below_gc_refs.push(*block_ref);
471 } else {
472 above_gc_indices.push(i);
473 above_gc_refs.push(*block_ref);
474 }
475 }
476
477 let mut blocks: Vec<Option<VerifiedBlock>> = vec![None; block_refs.len()];
478
479 if !below_gc_refs.is_empty() {
481 for (idx, block) in below_gc_indices
482 .iter()
483 .zip(self.store.read_blocks(&below_gc_refs)?)
484 {
485 blocks[*idx] = block;
486 }
487 }
488
489 if !above_gc_refs.is_empty() {
491 for (idx, block) in above_gc_indices
492 .iter()
493 .zip(self.dag_state.read().get_blocks(&above_gc_refs))
494 {
495 blocks[*idx] = block;
496 }
497 }
498
499 blocks.into_iter().flatten().collect()
500 } else {
501 block_refs.sort();
504 block_refs.dedup();
505 let dag_state = self.dag_state.read();
506 let mut blocks = dag_state
507 .get_blocks(&block_refs)
508 .into_iter()
509 .flatten()
510 .collect::<Vec<_>>();
511
512 let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
516 for block_ref in blocks.iter().map(|b| b.reference()) {
517 let entry = lowest_missing_rounds
518 .entry(block_ref.author)
519 .or_insert(block_ref.round);
520 *entry = (*entry).min(block_ref.round);
521 }
522
523 let own_index = self.context.own_index;
527
528 let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
530 ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });
531
532 for (authority, lowest_missing_round) in ordered_missing_rounds {
533 let highest_accepted_round = highest_accepted_rounds[authority];
534 if highest_accepted_round >= lowest_missing_round {
535 continue;
536 }
537
538 let missing_blocks = dag_state.get_cached_blocks_in_range(
539 authority,
540 highest_accepted_round + 1,
541 lowest_missing_round,
542 self.context
543 .parameters
544 .max_blocks_per_sync
545 .saturating_sub(blocks.len()),
546 );
547 blocks.extend(missing_blocks);
548 if blocks.len() >= self.context.parameters.max_blocks_per_sync {
549 blocks.truncate(self.context.parameters.max_blocks_per_sync);
550 break;
551 }
552 }
553
554 blocks
555 };
556
557 let bytes = blocks
559 .into_iter()
560 .map(|block| block.serialized().clone())
561 .collect::<Vec<_>>();
562 Ok(bytes)
563 }
564
    /// Serves a commit-sync request: returns a batch of commits within
    /// `commit_range` together with the blocks whose votes certify the last
    /// returned commit.
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Cap the response to at most `commit_sync_batch_size` commits.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        // Trim trailing commits until the last one has a quorum of votes, so
        // the whole returned prefix can be certified by the voter blocks.
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        // Load the blocks that carry the certifying votes.
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }
615
616 async fn handle_fetch_latest_blocks(
617 &self,
618 peer: AuthorityIndex,
619 authorities: Vec<AuthorityIndex>,
620 ) -> ConsensusResult<Vec<Bytes>> {
621 fail_point_async!("consensus-rpc-response");
622
623 if authorities.len() > self.context.committee.size() {
624 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
625 }
626
627 for authority in &authorities {
629 if !self.context.committee.is_valid_index(*authority) {
630 return Err(ConsensusError::InvalidAuthorityIndex {
631 index: *authority,
632 max: self.context.committee.size(),
633 });
634 }
635 }
636
637 let mut blocks = vec![];
642 let dag_state = self.dag_state.read();
643 for authority in authorities {
644 let block = dag_state.get_last_block_for_authority(authority);
645
646 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
647
648 if block.round() != GENESIS_ROUND {
651 blocks.push(block);
652 }
653 }
654
655 let result = blocks
657 .into_iter()
658 .map(|block| block.serialized().clone())
659 .collect::<Vec<_>>();
660
661 Ok(result)
662 }
663
664 async fn handle_get_latest_rounds(
665 &self,
666 _peer: AuthorityIndex,
667 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
668 fail_point_async!("consensus-rpc-response");
669
670 let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();
671
672 let blocks = self
673 .dag_state
674 .read()
675 .get_last_cached_block_per_authority(Round::MAX);
676 let highest_accepted_rounds = blocks
677 .into_iter()
678 .map(|(block, _)| block.round())
679 .collect::<Vec<_>>();
680
681 highest_received_rounds[self.context.own_index] =
684 highest_accepted_rounds[self.context.own_index];
685
686 Ok((highest_received_rounds, highest_accepted_rounds))
687 }
688}
689
/// Mutable state guarded by `SubscriptionCounter`'s mutex.
struct Counter {
    // Total number of active subscriptions across all authorities.
    count: usize,
    // Number of active subscriptions per authority, indexed by AuthorityIndex.
    subscriptions_by_authority: Vec<usize>,
}
694
/// Tracks block-stream subscriptions per authority and notifies the core
/// dispatcher when the subscribed stake crosses the quorum threshold in
/// either direction.
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}
702
impl SubscriptionCounter {
    /// Creates a counter with zero subscriptions and zeroes the
    /// `subscribed_by` gauge for every committee member, so all authorities
    /// are visible in metrics before any of them subscribes.
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

    /// Registers a subscription from `peer`; notifies the dispatcher when the
    /// subscribed stake crosses the quorum threshold upwards.
    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        // Stake of every authority with at least one active subscription;
        // own stake always counts.
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Reconstruct the stake before this increment: the peer only added
        // stake if this was its first subscription.
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        // Only the upward quorum transition triggers the dispatcher call.
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        // NOTE(review): the mutex is held across the dispatcher call above;
        // the explicit drop only marks the end of the critical section.
        drop(counter);
        Ok(())
    }

    /// Unregisters a subscription from `peer`; notifies the dispatcher when
    /// the subscribed stake falls below the quorum threshold.
    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        // Stake with at least one active subscription after the decrement.
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        // Stake before the decrement: the peer only lost its stake if this
        // was its last subscription.
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        // Clear the gauge only once the peer has no subscriptions left.
        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        // Only the downward quorum transition triggers the dispatcher call.
        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        drop(counter);
        Ok(())
    }
}
810
/// Stream of this authority's own blocks, backed by the block broadcast channel.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
814
/// Adapts a tokio `broadcast::Receiver` into a `Stream`, while keeping the
/// per-peer subscription count up to date (incremented on creation,
/// decremented on drop).
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Reusable future holding the receiver between polls; each completed
    // receive hands the receiver back so the future can be re-armed.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    subscription_counter: Arc<SubscriptionCounter>,
}
830
831impl<T: 'static + Clone + Send> BroadcastStream<T> {
832 pub fn new(
833 peer: AuthorityIndex,
834 rx: broadcast::Receiver<T>,
835 subscription_counter: Arc<SubscriptionCounter>,
836 ) -> Self {
837 if let Err(err) = subscription_counter.increment(peer) {
838 match err {
839 ConsensusError::Shutdown => {}
840 _ => panic!("Unexpected error: {err}"),
841 }
842 }
843 Self {
844 peer,
845 inner: ReusableBoxFuture::new(make_recv_future(rx)),
846 subscription_counter,
847 }
848 }
849}
850
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    /// Polls the next broadcast message. A lagged receiver skips the missed
    /// messages and keeps polling; a closed channel ends the stream.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            // Re-arm the reusable future with the returned receiver so the
            // next poll continues where this receive left off.
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    // Dropped messages are skipped; keep receiving.
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}
881
882impl<T> Drop for BroadcastStream<T> {
883 fn drop(&mut self) {
884 if let Err(err) = self.subscription_counter.decrement(self.peer) {
885 match err {
886 ConsensusError::Shutdown => {}
887 _ => panic!("Unexpected error: {err}"),
888 }
889 }
890 }
891}
892
893async fn make_recv_future<T: Clone>(
894 mut rx: broadcast::Receiver<T>,
895) -> (
896 Result<T, broadcast::error::RecvError>,
897 broadcast::Receiver<T>,
898) {
899 let result = rx.recv().await;
900 (result, rx)
901}
902
903#[cfg(test)]
906pub(crate) mod tests {
907 use std::{
908 collections::{BTreeMap, BTreeSet},
909 sync::Arc,
910 time::Duration,
911 };
912
913 use async_trait::async_trait;
914 use bytes::Bytes;
915 use consensus_config::AuthorityIndex;
916 use parking_lot::{Mutex, RwLock};
917 use rstest::rstest;
918 use tokio::{sync::broadcast, time::sleep};
919
920 use crate::{
921 Round,
922 authority_service::AuthorityService,
923 block::{BlockAPI, BlockRef, GENESIS_ROUND, SignedBlock, TestBlock, VerifiedBlock},
924 commit::{CertifiedCommits, CommitDigest, CommitRange, TrustedCommit},
925 commit_vote_monitor::CommitVoteMonitor,
926 context::Context,
927 core_thread::{CoreError, CoreThreadDispatcher},
928 dag_state::DagState,
929 error::ConsensusResult,
930 network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
931 round_prober::QuorumRound,
932 storage::{Store, WriteBatch, mem_store::MemStore},
933 synchronizer::Synchronizer,
934 test_dag_builder::DagBuilder,
935 };
936
    /// Test double for [`CoreThreadDispatcher`] that records added blocks.
    pub(crate) struct FakeCoreThreadDispatcher {
        // Blocks passed to `add_blocks`, in arrival order.
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
940
941 impl FakeCoreThreadDispatcher {
942 pub(crate) fn new() -> Self {
943 Self {
944 blocks: Mutex::new(vec![]),
945 }
946 }
947
948 fn get_blocks(&self) -> Vec<VerifiedBlock> {
949 self.blocks.lock().clone()
950 }
951 }
952
    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        /// Records the blocks and echoes their refs back (tests use the
        /// return value to verify which blocks were submitted).
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        /// Reports no missing refs, so no fetches are triggered in tests.
        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        // Unused by the tests in this file.
        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        // Unused by the tests in this file.
        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        // Unused by the tests in this file.
        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        // Unused by the tests in this file.
        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        // Unused by the tests in this file.
        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }
1009
    /// Test double for [`NetworkClient`]; every method is unimplemented and
    /// must not be called by the tests in this file.
    #[derive(Default)]
    pub(crate) struct FakeNetworkClient {}
1012
    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        // Streaming disabled so the synchronizer does not subscribe.
        const SUPPORT_STREAMING: bool = false;

        // All methods panic if reached: the tests exercise only the
        // service-side handlers, never outgoing network calls.
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1071
    /// Verifies `handle_send_block` accepts a block timestamped in the
    /// future: without median-based timestamps the service waits out the
    /// drift before accepting; with them the block is accepted immediately.
    #[rstest]
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block(#[values(false, true)] median_based_timestamp: bool) {
        let (mut context, _keys) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                median_based_timestamp,
            );
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        // Block timestamped exactly `max_drift` into the (paused) future.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Handle the block in the background so the paused clock can be
        // advanced from this task.
        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;

        if !median_based_timestamp {
            // Halfway through the drift the block must not be accepted yet.
            assert!(core_dispatcher.get_blocks().is_empty());
            sleep(max_drift).await;
        }

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }
1142
    /// Verifies `handle_fetch_latest_blocks` returns round-10 blocks for the
    /// requested authorities, including equivocating blocks from authority 2.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        // Build a 10-round DAG where authority 2 equivocates once per round.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // Request latest blocks for authorities 1 and 2.
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // Every returned block must be from the last built round (10).
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1204
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks_commit_sync_order_across_gc_boundary() {
        // Verifies that `handle_fetch_blocks` returns every requested block and
        // preserves the request order, even when the requested refs interleave
        // rounds below and above the GC boundary.
        let rounds = 20;
        let gc_depth = 5;
        let (mut context, _keys) = Context::new_for_test(4);
        // Enable the batched block sync path and a small GC depth so a GC
        // boundary exists inside the 20-round DAG built below.
        context
            .protocol_config
            .set_consensus_batched_block_sync_for_testing(true);
        context.protocol_config.set_gc_depth_for_testing(gc_depth);
        let context = Arc::new(context);
        // Wire up the service with fakes/no-ops: verification always passes and
        // the dispatcher/network client only record calls.
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store.clone(),
        ));

        // Build a fully-connected DAG of `rounds` layers and persist it to the
        // in-memory DAG state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=rounds)
            .build()
            .persist_layers(dag_state.clone());

        // Also write all blocks directly to the store so the fetch path can
        // serve blocks regardless of what DAG state has in memory.
        let all_blocks = dag_builder.blocks(1..=rounds);
        store
            .write(WriteBatch::new(all_blocks, vec![], vec![], vec![]))
            .expect("Failed to write blocks to store");

        // Inject a commit with a leader at round 15; with gc_depth = 5 this
        // moves the GC round into the middle of the DAG (presumably round 10 —
        // the exact value is only range-asserted below).
        let leader_round = 15;
        let leader_ref = dag_builder
            .blocks(leader_round..=leader_round)
            .first()
            .unwrap()
            .reference();
        let commit =
            TrustedCommit::new_for_test(1, CommitDigest::MIN, 0, leader_ref, vec![leader_ref]);
        dag_state.write().set_last_commit(commit);

        // Sanity check: the GC boundary must fall strictly inside the DAG so
        // the interleaving below actually crosses it.
        let gc_round = dag_state.read().gc_round();
        assert!(
            gc_round > GENESIS_ROUND && gc_round < rounds,
            "GC round {gc_round} should be between genesis and max round"
        );

        // Index blocks by round (slot 0 stays empty; rounds are 1-based).
        let mut blocks_by_round: Vec<Vec<VerifiedBlock>> = vec![vec![]; (rounds + 1) as usize];
        for round in 1..=rounds {
            blocks_by_round[round as usize] = dag_builder.blocks(round..=round);
        }

        // Build a request list that alternates one ref from below the GC round
        // with one from at-or-above it, rotating through authorities so the
        // refs span the committee.
        let below_gc_rounds: Vec<Round> = (1..gc_round).collect();
        let above_gc_rounds: Vec<Round> = (gc_round..=rounds).collect();
        let validators = context.committee.size();
        let mut interleaved_refs = Vec::new();
        let max_pairs = std::cmp::min(below_gc_rounds.len(), above_gc_rounds.len());
        for i in 0..max_pairs {
            let below_round = below_gc_rounds[i];
            let auth_idx = i % validators;
            if auth_idx < blocks_by_round[below_round as usize].len() {
                interleaved_refs.push(blocks_by_round[below_round as usize][auth_idx].reference());
            }
            let above_round = above_gc_rounds[i];
            let auth_idx2 = (i + 1) % validators;
            if auth_idx2 < blocks_by_round[above_round as usize].len() {
                interleaved_refs.push(blocks_by_round[above_round as usize][auth_idx2].reference());
            }
        }

        // Guard against a degenerate request list: the interleaving must have
        // produced refs on both sides of the GC boundary.
        assert!(
            interleaved_refs.iter().any(|r| r.round < gc_round),
            "Should have refs below GC round"
        );
        assert!(
            interleaved_refs.iter().any(|r| r.round >= gc_round),
            "Should have refs above GC round"
        );

        // Fetch the interleaved refs as peer authority 1.
        let peer = context.committee.to_authority_index(1).unwrap();
        let returned_blocks = authority_service
            .handle_fetch_blocks(peer, interleaved_refs.clone(), vec![])
            .await
            .expect("Should return valid serialized blocks");

        // Every requested block must come back, in the same order it was
        // requested — verified ref-by-ref after deserialization.
        assert_eq!(
            returned_blocks.len(),
            interleaved_refs.len(),
            "Should receive all requested blocks"
        );
        for (i, serialized_block) in returned_blocks.into_iter().enumerate() {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialized_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
            assert_eq!(
                verified_block.reference(),
                interleaved_refs[i],
                "Block at index {i} should match requested ref. \
                 Expected {:?}, got {:?}",
                interleaved_refs[i],
                verified_block.reference()
            );
        }
    }
1341}