use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex, Round,
    block::{BlockAPI as _, BlockRef, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::{MAX_ADDITIONAL_BLOCKS, SynchronizerHandle},
};

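// Multiplier applied to `commit_sync_batch_size` when checking whether the local commit index
// lags too far behind the quorum commit index to keep accepting new blocks.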
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

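/// Network service handling requests from peer authorities: processes submitted blocks, serves
/// block-stream subscriptions, and answers block, commit, and round fetch requests.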
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(
            context.clone(),
            core_dispatcher.clone(),
        ));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcaster,
            subscription_counter,
            dag_state,
            store,
        }
    }
}

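// `NetworkService` implementation: one handler per consensus RPC exposed to peer authorities.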
#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

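        // Reject blocks whose claimed author does not match the sending peer.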
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

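        // Reject blocks that fail verification.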
        if let Err(e) = self.block_verifier.verify(&signed_block) {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    e.clone().name(),
                ])
                .inc();
            info!("Invalid block from {}: {}", peer, e);
            return Err(e);
        }
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
        if forward_time_drift > self.context.parameters.max_forward_time_drift {
            self.context
                .metrics
                .node_metrics
                .rejected_future_blocks
                .with_label_values(&[peer_hostname])
                .inc();
            debug!(
                "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
                block_ref,
                verified_block.timestamp_ms(),
                now,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Block timestamp is too far in the future: {} > {}",
                    verified_block.timestamp_ms(),
                    now
                ),
            });
        }

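        // If the block is only slightly ahead of the local clock, wait out the drift so its
        // timestamp is no longer in the future when it is accepted.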
        if forward_time_drift > Duration::ZERO {
            self.context
                .metrics
                .node_metrics
                .block_timestamp_drift_wait_ms
                .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
                .inc_by(forward_time_drift.as_millis() as u64);
            debug!(
                "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
                block_ref,
                verified_block.timestamp_ms(),
                now,
                forward_time_drift.as_millis(),
            );
            sleep(forward_time_drift).await;
        }

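        // Observe the block's commit votes, then reject the block if the local commit index lags
        // the quorum commit index by more than COMMIT_LAG_MULTIPLIER commit-sync batches; commit
        // sync is expected to catch up before more blocks are accepted.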
        self.commit_vote_monitor.observe_block(&verified_block);

        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({last_commit_index} < {quorum_commit_index})",
                ),
            });
        }

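        // The block passed all checks: count it as verified and hand it to the core thread,
        // asking the synchronizer to fetch any ancestors the core reports as missing.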
        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_ancestors.is_empty() {
            if let Err(err) = self
                .synchronizer
                .fetch_blocks(missing_ancestors, peer)
                .await
            {
                warn!("Errored while trying to fetch missing ancestors via synchronizer: {err}");
            }
        }

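        // Decode the excluded ancestors advertised alongside the block, cap them to twice the
        // committee size, and schedule fetches for any that are not known locally.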
        let mut excluded_ancestors = serialized_block
            .excluded_ancestors
            .into_iter()
            .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
            .collect::<Result<Vec<BlockRef>, bcs::Error>>()
            .map_err(ConsensusError::MalformedBlock)?;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            tokio::spawn(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    warn!(
                        "Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
                    );
                }
            });
        }

        Ok(())
    }

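    // Streams this authority's own blocks to the subscribing peer: first the cached blocks
    // produced after `last_received`, then blocks from the live broadcast channel.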
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcaster.resubscribe(),
            self.subscription_counter.clone(),
        );

        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

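    // Serves block fetch requests from peers. An empty `highest_accepted_rounds` is treated as a
    // commit sync request (see `commit_sync_handle` below), in which case no additional ancestor
    // or cached blocks are attached to the response.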
    async fn handle_fetch_blocks(
        &self,
        peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
    ) -> ConsensusResult<Vec<Bytes>> {
        let commit_sync_handle = highest_accepted_rounds.is_empty();

        fail_point_async!("consensus-rpc-response");

        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

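        // Pre-batching protocol: serve the requested refs plus a bounded number of their missing
        // ancestors, and return early.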
        if !self.context.protocol_config.consensus_batched_block_sync() {
            if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
                return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
            }

            if !commit_sync_handle && highest_accepted_rounds.len() != self.context.committee.size()
            {
                return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                    highest_accepted_rounds.len(),
                    self.context.committee.size(),
                ));
            }

            let blocks = self.dag_state.read().get_blocks(&block_refs);

            let mut ancestor_blocks = vec![];
            if !commit_sync_handle {
                let all_ancestors = blocks
                    .iter()
                    .flatten()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .take(MAX_ADDITIONAL_BLOCKS)
                    .collect::<Vec<_>>();

                if !all_ancestors.is_empty() {
                    ancestor_blocks = self.dag_state.read().get_blocks(&all_ancestors);
                }
            }

            let result = blocks
                .into_iter()
                .chain(ancestor_blocks)
                .flatten()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            return Ok(result);
        }

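        // Batched sync: cap the request size, then (for synchronizer requests) pad the response
        // with cached blocks between the peer's highest accepted round and the lowest requested
        // round per authority, prioritizing this authority's own blocks.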
        if commit_sync_handle {
            block_refs.truncate(self.context.parameters.max_blocks_per_fetch);
        } else {
            block_refs.truncate(self.context.parameters.max_blocks_per_sync);
        }

        let blocks = if commit_sync_handle {
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        } else {
            block_refs.sort();
            block_refs.dedup();
            let dag_state = self.dag_state.read();
            let mut blocks = dag_state
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
            for block_ref in blocks.iter().map(|b| b.reference()) {
                let entry = lowest_missing_rounds
                    .entry(block_ref.author)
                    .or_insert(block_ref.round);
                *entry = (*entry).min(block_ref.round);
            }

            let own_index = self.context.own_index;

            let mut ordered_missing_rounds: Vec<_> = lowest_missing_rounds.into_iter().collect();
            ordered_missing_rounds.sort_by_key(|(auth, _)| if *auth == own_index { 0 } else { 1 });

            for (authority, lowest_missing_round) in ordered_missing_rounds {
                let highest_accepted_round = highest_accepted_rounds[authority];
                if highest_accepted_round >= lowest_missing_round {
                    continue;
                }

                let missing_blocks = dag_state.get_cached_blocks_in_range(
                    authority,
                    highest_accepted_round + 1,
                    lowest_missing_round,
                    self.context
                        .parameters
                        .max_blocks_per_sync
                        .saturating_sub(blocks.len()),
                );
                blocks.extend(missing_blocks);
                if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                    blocks.truncate(self.context.parameters.max_blocks_per_sync);
                    break;
                }
            }

            blocks
        };

        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

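    // Returns the commits in the requested range, truncated to one commit sync batch, plus the
    // blocks whose votes certify the last returned commit. Trailing commits without a quorum of
    // votes are dropped from the response.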
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

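    // Returns the latest known block of each requested authority, skipping genesis blocks.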
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

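    // Reports, for every authority, the highest round received over the network and the highest
    // round accepted into the local DAG, with this authority's own received entry taken from the
    // accepted rounds.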
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

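/// Number of active block-stream subscriptions, in total and per subscribing authority.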
struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

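/// Tracks block-stream subscriptions under a mutex and notifies the core dispatcher whenever the
/// subscribed stake crosses the quorum threshold in either direction.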
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
    dispatcher: Arc<dyn CoreThreadDispatcher>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            dispatcher,
            context,
        }
    }

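    // Registers a subscription from `peer`; when the subscribed stake first reaches a quorum,
    // notifies the dispatcher that a quorum of subscribers exists.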
    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] += 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        let previous_stake = if original_subscription_by_peer == 0 {
            total_stake - self.context.committee.stake(peer)
        } else {
            total_stake
        };

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);
        if !self.context.committee.reached_quorum(previous_stake)
            && self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(true)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        drop(counter);
        Ok(())
    }

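    // Unregisters a subscription from `peer`; when the subscribed stake drops below a quorum,
    // notifies the dispatcher that the quorum of subscribers is gone.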
    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        let original_subscription_by_peer = counter.subscriptions_by_authority[peer];
        counter.subscriptions_by_authority[peer] -= 1;
        let mut total_stake = 0;
        for (authority_index, _) in self.context.committee.authorities() {
            if counter.subscriptions_by_authority[authority_index] >= 1
                || self.context.own_index == authority_index
            {
                total_stake += self.context.committee.stake(authority_index);
            }
        }
        let previous_stake = if original_subscription_by_peer == 1 {
            total_stake + self.context.committee.stake(peer)
        } else {
            total_stake
        };

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        if self.context.committee.reached_quorum(previous_stake)
            && !self.context.committee.reached_quorum(total_stake)
        {
            self.dispatcher
                .set_quorum_subscribers_exists(false)
                .map_err(|_| ConsensusError::Shutdown)?;
        }
        drop(counter);
        Ok(())
    }
}

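// A stream of newly broadcasted blocks for a single subscribed peer. The subscription is counted
// on creation and released when the stream is dropped.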
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

struct BroadcastStream<T> {
    peer: AuthorityIndex,
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

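// Polls the reusable receive future: lagged gaps in the broadcast channel are skipped with a
// warning, and the stream ends once the channel is closed.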
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

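// Dropping the stream releases the peer's subscription.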
impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

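// Awaits the next broadcast item and hands the receiver back, so the future can be re-armed
// inside the `ReusableBoxFuture`.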
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        Round,
        authority_service::AuthorityService,
        block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_prober::QuorumRound,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
    };

    pub(crate) struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        pub(crate) fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(Default::default())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            todo!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

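    // A block stamped `max_forward_time_drift` in the future must not reach the core before the
    // drift has been waited out.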
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state,
            store,
        ));

        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn(async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        });

        sleep(max_drift / 2).await;
        assert!(core_dispatcher.get_blocks().is_empty());

        sleep(max_drift).await;
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);
    }

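    // Builds a 10-round DAG where authority 2 equivocates, then checks that the latest block
    // returned for each requested authority is from round 10.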
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            true,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            dag_state.clone(),
            store,
        ));

        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
}