1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7 num::NonZeroUsize,
8 sync::Arc,
9 time::Duration,
10};
11
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use futures::{StreamExt as _, stream::FuturesUnordered};
15use iota_macros::fail_point_async;
16use iota_metrics::{
17 monitored_future,
18 monitored_mpsc::{Receiver, Sender, channel},
19 monitored_scope,
20};
21use itertools::Itertools as _;
22use lru::LruCache;
23use parking_lot::{Mutex, RwLock};
24#[cfg(not(test))]
25use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
26use tap::TapFallible;
27use tokio::{
28 runtime::Handle,
29 sync::{mpsc::error::TrySendError, oneshot},
30 task::{JoinError, JoinSet},
31 time::{Instant, sleep, sleep_until, timeout},
32};
33use tracing::{debug, error, info, trace, warn};
34
35use crate::{
36 BlockAPI, CommitIndex, Round,
37 authority_service::COMMIT_LAG_MULTIPLIER,
38 block::{BlockDigest, BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
39 block_verifier::BlockVerifier,
40 commit_vote_monitor::CommitVoteMonitor,
41 context::Context,
42 core_thread::CoreThreadDispatcher,
43 dag_state::DagState,
44 error::{ConsensusError, ConsensusResult},
45 network::NetworkClient,
46 scoring_metrics_store::ErrorSource,
47};
48
49const FETCH_BLOCKS_CONCURRENCY: usize = 5;
51
52pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;
56
57const VERIFIED_BLOCKS_CACHE_CAP: usize = 200_000;
59
60const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
62
63const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
65
66const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
70
71const MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK: usize = 1;
75
76const MAX_PERIODIC_SYNC_PEERS: usize = 4;
79
80const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
84
85#[derive(Clone)]
87enum SyncMethod {
88 Live,
89 Periodic,
90}
91
92struct BlocksGuard {
93 map: Arc<InflightBlocksMap>,
94 block_refs: BTreeSet<BlockRef>,
95 peer: AuthorityIndex,
96 method: SyncMethod,
97}
98
99impl Drop for BlocksGuard {
100 fn drop(&mut self) {
101 self.map.unlock_blocks(&self.block_refs, self.peer);
102 }
103}
104
105struct InflightBlocksMap {
111 inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
112}
113
114impl InflightBlocksMap {
115 fn new() -> Arc<Self> {
116 Arc::new(Self {
117 inner: Mutex::new(HashMap::new()),
118 })
119 }
120
121 fn lock_blocks(
135 self: &Arc<Self>,
136 missing_block_refs: BTreeSet<BlockRef>,
137 peer: AuthorityIndex,
138 method: SyncMethod,
139 ) -> Option<BlocksGuard> {
140 let mut blocks = BTreeSet::new();
141 let mut inner = self.inner.lock();
142
143 for block_ref in missing_block_refs {
144 let authorities = inner.entry(block_ref).or_default();
145
146 if authorities.contains(&peer) {
148 continue;
149 }
150
151 let total_count = authorities.len();
153
154 let max_limit = match method {
156 SyncMethod::Live => MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK,
157 SyncMethod::Periodic => MAX_AUTHORITIES_TO_FETCH_PER_BLOCK,
158 };
159
160 if total_count < max_limit {
162 assert!(authorities.insert(peer));
163 blocks.insert(block_ref);
164 }
165 }
166
167 if blocks.is_empty() {
168 None
169 } else {
170 Some(BlocksGuard {
171 map: self.clone(),
172 block_refs: blocks,
173 peer,
174 method,
175 })
176 }
177 }
178
179 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
184 let mut blocks_to_fetch = self.inner.lock();
186 for block_ref in block_refs {
187 let authorities = blocks_to_fetch
188 .get_mut(block_ref)
189 .expect("Should have found a non empty map");
190
191 assert!(authorities.remove(&peer), "Peer index should be present!");
192
193 if authorities.is_empty() {
195 blocks_to_fetch.remove(block_ref);
196 }
197 }
198 }
199
200 fn swap_locks(
205 self: &Arc<Self>,
206 blocks_guard: BlocksGuard,
207 peer: AuthorityIndex,
208 ) -> Option<BlocksGuard> {
209 let block_refs = blocks_guard.block_refs.clone();
210 let method = blocks_guard.method.clone();
211
212 drop(blocks_guard);
214
215 self.lock_blocks(block_refs, peer, method)
217 }
218
219 #[cfg(test)]
220 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
221 let inner = self.inner.lock();
222 inner.len()
223 }
224}
225
226enum Command {
227 FetchBlocks {
228 missing_block_refs: BTreeSet<BlockRef>,
229 peer_index: AuthorityIndex,
230 result: oneshot::Sender<Result<(), ConsensusError>>,
231 },
232 FetchOwnLastBlock,
233 KickOffScheduler,
234}
235
236pub(crate) struct SynchronizerHandle {
237 commands_sender: Sender<Command>,
238 tasks: tokio::sync::Mutex<JoinSet<()>>,
239}
240
241impl SynchronizerHandle {
242 pub(crate) async fn fetch_blocks(
245 &self,
246 missing_block_refs: BTreeSet<BlockRef>,
247 peer_index: AuthorityIndex,
248 ) -> ConsensusResult<()> {
249 let (sender, receiver) = oneshot::channel();
250 self.commands_sender
251 .send(Command::FetchBlocks {
252 missing_block_refs,
253 peer_index,
254 result: sender,
255 })
256 .await
257 .map_err(|_err| ConsensusError::Shutdown)?;
258 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
259 }
260
261 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
262 let mut tasks = self.tasks.lock().await;
263 tasks.abort_all();
264 while let Some(result) = tasks.join_next().await {
265 result?
266 }
267 Ok(())
268 }
269}
270
271pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
298 context: Arc<Context>,
299 commands_receiver: Receiver<Command>,
300 fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
301 core_dispatcher: Arc<D>,
302 commit_vote_monitor: Arc<CommitVoteMonitor>,
303 dag_state: Arc<RwLock<DagState>>,
304 fetch_blocks_scheduler_task: JoinSet<()>,
305 fetch_own_last_block_task: JoinSet<()>,
306 network_client: Arc<C>,
307 block_verifier: Arc<V>,
308 inflight_blocks_map: Arc<InflightBlocksMap>,
309 verified_blocks_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
310 commands_sender: Sender<Command>,
311}
312
313impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
314 pub fn start(
317 network_client: Arc<C>,
318 context: Arc<Context>,
319 core_dispatcher: Arc<D>,
320 commit_vote_monitor: Arc<CommitVoteMonitor>,
321 block_verifier: Arc<V>,
322 dag_state: Arc<RwLock<DagState>>,
323 sync_last_known_own_block: bool,
324 ) -> Arc<SynchronizerHandle> {
325 let (commands_sender, commands_receiver) =
326 channel("consensus_synchronizer_commands", 1_000);
327 let inflight_blocks_map = InflightBlocksMap::new();
328 let verified_blocks_cache = Arc::new(Mutex::new(LruCache::new(
329 NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
330 )));
331
332 let mut fetch_block_senders = BTreeMap::new();
334 let mut tasks = JoinSet::new();
335 for (index, _) in context.committee.authorities() {
336 if index == context.own_index {
337 continue;
338 }
339 let (sender, receiver) =
340 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
341 let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
342 index,
343 network_client.clone(),
344 block_verifier.clone(),
345 verified_blocks_cache.clone(),
346 commit_vote_monitor.clone(),
347 context.clone(),
348 core_dispatcher.clone(),
349 dag_state.clone(),
350 receiver,
351 commands_sender.clone(),
352 );
353 tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
354 fetch_block_senders.insert(index, sender);
355 }
356
357 let commands_sender_clone = commands_sender.clone();
358
359 if sync_last_known_own_block {
360 commands_sender
361 .try_send(Command::FetchOwnLastBlock)
362 .expect("Failed to sync our last block");
363 }
364
365 tasks.spawn(monitored_future!(async move {
367 let mut s = Self {
368 context,
369 commands_receiver,
370 fetch_block_senders,
371 core_dispatcher,
372 commit_vote_monitor,
373 fetch_blocks_scheduler_task: JoinSet::new(),
374 fetch_own_last_block_task: JoinSet::new(),
375 network_client,
376 block_verifier,
377 inflight_blocks_map,
378 verified_blocks_cache,
379 commands_sender: commands_sender_clone,
380 dag_state,
381 };
382 s.run().await;
383 }));
384
385 Arc::new(SynchronizerHandle {
386 commands_sender,
387 tasks: tokio::sync::Mutex::new(tasks),
388 })
389 }
390
391 async fn run(&mut self) {
393 const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
396 let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);
397
398 tokio::pin!(scheduler_timeout);
399
400 loop {
401 tokio::select! {
402 Some(command) = self.commands_receiver.recv() => {
403 match command {
404 Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
405 if peer_index == self.context.own_index {
406 error!("We should never attempt to fetch blocks from our own node");
407 continue;
408 }
409
410 let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();
411
412 let missing_block_refs = missing_block_refs
415 .into_iter()
416 .take(self.context.parameters.max_blocks_per_sync)
417 .collect();
418
419 let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index, SyncMethod::Live);
420 let Some(blocks_guard) = blocks_guard else {
421 result.send(Ok(())).ok();
422 continue;
423 };
424
425 let r = self
428 .fetch_block_senders
429 .get(&peer_index)
430 .expect("Fatal error, sender should be present")
431 .try_send(blocks_guard)
432 .map_err(|err| {
433 match err {
434 TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
435 TrySendError::Closed(_) => ConsensusError::Shutdown
436 }
437 });
438
439 result.send(r).ok();
440 }
441 Command::FetchOwnLastBlock => {
442 if self.fetch_own_last_block_task.is_empty() {
443 self.start_fetch_own_last_block_task();
444 }
445 }
446 Command::KickOffScheduler => {
447 let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
450 Instant::now()
451 } else {
452 Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
453 };
454
455 if timeout < scheduler_timeout.deadline() {
457 scheduler_timeout.as_mut().reset(timeout);
458 }
459 }
460 }
461 },
462 Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
463 match result {
464 Ok(()) => {},
465 Err(e) => {
466 if e.is_cancelled() {
467 } else if e.is_panic() {
468 std::panic::resume_unwind(e.into_panic());
469 } else {
470 panic!("fetch our last block task failed: {e}");
471 }
472 },
473 };
474 },
475 Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
476 match result {
477 Ok(()) => {},
478 Err(e) => {
479 if e.is_cancelled() {
480 } else if e.is_panic() {
481 std::panic::resume_unwind(e.into_panic());
482 } else {
483 panic!("fetch blocks scheduler task failed: {e}");
484 }
485 },
486 };
487 },
488 () = &mut scheduler_timeout => {
489 if self.fetch_blocks_scheduler_task.is_empty() {
491 if let Err(err) = self.start_fetch_missing_blocks_task().await {
492 debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
493 return;
494 };
495 }
496
497 scheduler_timeout
498 .as_mut()
499 .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
500 }
501 }
502 }
503 }
504
505 async fn fetch_blocks_from_authority(
506 peer_index: AuthorityIndex,
507 network_client: Arc<C>,
508 block_verifier: Arc<V>,
509 verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
510 commit_vote_monitor: Arc<CommitVoteMonitor>,
511 context: Arc<Context>,
512 core_dispatcher: Arc<D>,
513 dag_state: Arc<RwLock<DagState>>,
514 mut receiver: Receiver<BlocksGuard>,
515 commands_sender: Sender<Command>,
516 ) {
517 const MAX_RETRIES: u32 = 3;
518 let peer_hostname = &context.committee.authority(peer_index).hostname;
519
520 let mut requests = FuturesUnordered::new();
521
522 loop {
523 tokio::select! {
524 Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
525 let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
527
528 let metrics = &context.metrics.node_metrics;
530 metrics
531 .synchronizer_requested_blocks_by_peer
532 .with_label_values(&[peer_hostname.as_str(), "live"])
533 .inc_by(blocks_guard.block_refs.len() as u64);
534 let mut authors = HashSet::new();
536 for block_ref in &blocks_guard.block_refs {
537 authors.insert(block_ref.author);
538 }
539 for author in authors {
540 let host = &context.committee.authority(author).hostname;
541 metrics
542 .synchronizer_requested_blocks_by_authority
543 .with_label_values(&[host.as_str(), "live"])
544 .inc();
545 }
546
547 requests.push(Self::fetch_blocks_request(
548 network_client.clone(),
549 peer_index,
550 blocks_guard,
551 highest_rounds,
552 FETCH_REQUEST_TIMEOUT,
553 1,
554 ))
555 },
556 Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
557 match response {
558 Ok(blocks) => {
559 if let Err(err) = Self::process_fetched_blocks(blocks,
560 peer_index,
561 blocks_guard,
562 core_dispatcher.clone(),
563 block_verifier.clone(),
564 verified_cache.clone(),
565 commit_vote_monitor.clone(),
566 context.clone(),
567 commands_sender.clone(),
568 "live"
569 ).await {
570 context.scoring_metrics_store.update_scoring_metrics_on_block_receival(
571 peer_index,
572 peer_hostname,
573 err.clone(),
574 ErrorSource::Synchronizer,
575 &context.metrics.node_metrics,
576 );
577 warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
578 context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
579 }
580 },
581 Err(_) => {
582 context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
583 if retries <= MAX_RETRIES {
584 requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
585 } else {
586 warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
587 drop(blocks_guard);
589 }
590 }
591 }
592 },
593 else => {
594 info!("Fetching blocks from authority {peer_index} task will now abort.");
595 break;
596 }
597 }
598 }
599 }
600
601 async fn process_fetched_blocks(
605 mut serialized_blocks: Vec<Bytes>,
606 peer_index: AuthorityIndex,
607 requested_blocks_guard: BlocksGuard,
608 core_dispatcher: Arc<D>,
609 block_verifier: Arc<V>,
610 verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
611 commit_vote_monitor: Arc<CommitVoteMonitor>,
612 context: Arc<Context>,
613 commands_sender: Sender<Command>,
614 sync_method: &str,
615 ) -> ConsensusResult<()> {
616 if serialized_blocks.is_empty() {
617 return Ok(());
618 }
619 let _s = context
620 .metrics
621 .node_metrics
622 .scope_processing_time
623 .with_label_values(&["Synchronizer::process_fetched_blocks"])
624 .start_timer();
625
626 if context.protocol_config.consensus_batched_block_sync() {
628 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
629 } else {
630 if serialized_blocks.len()
633 > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
634 {
635 return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
636 }
637 }
638
639 let blocks = Handle::current()
641 .spawn_blocking({
642 let block_verifier = block_verifier.clone();
643 let verified_cache = verified_cache.clone();
644 let context = context.clone();
645 let sync_method = sync_method.to_string();
646 move || {
647 Self::verify_blocks(
648 serialized_blocks,
649 block_verifier,
650 verified_cache,
651 &context,
652 peer_index,
653 &sync_method,
654 )
655 }
656 })
657 .await
658 .expect("Spawn blocking should not fail")?;
659
660 if !context.protocol_config.consensus_batched_block_sync() {
661 let ancestors = blocks
663 .iter()
664 .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
665 .flat_map(|b| b.ancestors().to_vec())
666 .collect::<BTreeSet<BlockRef>>();
667
668 for block in &blocks {
671 if !requested_blocks_guard
672 .block_refs
673 .contains(&block.reference())
674 && !ancestors.contains(&block.reference())
675 {
676 return Err(ConsensusError::UnexpectedFetchedBlock {
677 index: peer_index,
678 block_ref: block.reference(),
679 });
680 }
681 }
682 }
683
684 for block in &blocks {
686 commit_vote_monitor.observe_block(block);
687 }
688
689 let metrics = &context.metrics.node_metrics;
690 let peer_hostname = &context.committee.authority(peer_index).hostname;
691 metrics
692 .synchronizer_fetched_blocks_by_peer
693 .with_label_values(&[peer_hostname.as_str(), sync_method])
694 .inc_by(blocks.len() as u64);
695 for block in &blocks {
696 let block_hostname = &context.committee.authority(block.author()).hostname;
697 metrics
698 .synchronizer_fetched_blocks_by_authority
699 .with_label_values(&[block_hostname.as_str(), sync_method])
700 .inc();
701 }
702
703 debug!(
704 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
705 blocks.len(),
706 blocks.iter().map(|b| b.reference().to_string()).join(", "),
707 );
708
709 let missing_blocks = core_dispatcher
713 .add_blocks(blocks)
714 .await
715 .map_err(|_| ConsensusError::Shutdown)?;
716
717 drop(requested_blocks_guard);
720
721 if !missing_blocks.is_empty() {
723 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
725 {
726 warn!("Commands channel is full")
727 }
728 }
729
730 context
731 .metrics
732 .node_metrics
733 .missing_blocks_after_fetch_total
734 .inc_by(missing_blocks.len() as u64);
735
736 Ok(())
737 }
738
739 fn get_highest_accepted_rounds(
740 dag_state: Arc<RwLock<DagState>>,
741 context: &Arc<Context>,
742 ) -> Vec<Round> {
743 let blocks = dag_state
744 .read()
745 .get_last_cached_block_per_authority(Round::MAX);
746 assert_eq!(blocks.len(), context.committee.size());
747
748 blocks
749 .into_iter()
750 .map(|(block, _)| block.round())
751 .collect::<Vec<_>>()
752 }
753
754 fn verify_blocks(
755 serialized_blocks: Vec<Bytes>,
756 block_verifier: Arc<V>,
757 verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
758 context: &Context,
759 peer_index: AuthorityIndex,
760 sync_method: &str,
761 ) -> ConsensusResult<Vec<VerifiedBlock>> {
762 let mut verified_blocks = Vec::new();
763 let mut skipped_count = 0u64;
764
765 for serialized_block in serialized_blocks {
766 let block_digest = VerifiedBlock::compute_digest(&serialized_block);
767
768 if verified_cache.lock().get(&block_digest).is_some() {
770 skipped_count += 1;
771 continue; }
773
774 let signed_block: SignedBlock =
775 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
776
777 if let Err(e) = block_verifier.verify(&signed_block) {
778 let hostname = context.committee.authority(peer_index).hostname.clone();
781
782 context
783 .metrics
784 .node_metrics
785 .invalid_blocks
786 .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
787 .inc();
788 warn!("Invalid block received from {}: {}", peer_index, e);
789 return Err(e);
790 }
791
792 verified_cache.lock().put(block_digest, ());
794
795 let verified_block = VerifiedBlock::new_verified_with_digest(
796 signed_block,
797 serialized_block,
798 block_digest,
799 );
800
801 let now = context.clock.timestamp_utc_ms();
805 let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
806 if drift > 0 {
807 let peer_hostname = &context
808 .committee
809 .authority(verified_block.author())
810 .hostname;
811 context
812 .metrics
813 .node_metrics
814 .block_timestamp_drift_ms
815 .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
816 .inc_by(drift);
817
818 if context
819 .protocol_config
820 .consensus_median_timestamp_with_checkpoint_enforcement()
821 {
822 trace!(
823 "Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.",
824 verified_block.reference(),
825 verified_block.timestamp_ms(),
826 now
827 );
828 } else {
829 warn!(
830 "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
831 verified_block.reference(),
832 verified_block.timestamp_ms(),
833 now
834 );
835 continue;
836 }
837 }
838
839 verified_blocks.push(verified_block);
840 }
841
842 if skipped_count > 0 {
844 let peer_hostname = &context.committee.authority(peer_index).hostname;
845 context
846 .metrics
847 .node_metrics
848 .synchronizer_skipped_blocks_by_peer
849 .with_label_values(&[peer_hostname.as_str(), sync_method])
850 .inc_by(skipped_count);
851 }
852
853 Ok(verified_blocks)
854 }
855
856 async fn fetch_blocks_request(
857 network_client: Arc<C>,
858 peer: AuthorityIndex,
859 blocks_guard: BlocksGuard,
860 highest_rounds: Vec<Round>,
861 request_timeout: Duration,
862 mut retries: u32,
863 ) -> (
864 ConsensusResult<Vec<Bytes>>,
865 BlocksGuard,
866 u32,
867 AuthorityIndex,
868 Vec<Round>,
869 ) {
870 let start = Instant::now();
871 let resp = timeout(
872 request_timeout,
873 network_client.fetch_blocks(
874 peer,
875 blocks_guard
876 .block_refs
877 .clone()
878 .into_iter()
879 .collect::<Vec<_>>(),
880 highest_rounds.clone(),
881 request_timeout,
882 ),
883 )
884 .await;
885
886 fail_point_async!("consensus-delay");
887
888 let resp = match resp {
889 Ok(Err(err)) => {
890 sleep_until(start + request_timeout).await;
893 retries += 1;
894 Err(err)
895 } Err(err) => {
897 sleep_until(start + request_timeout).await;
899 retries += 1;
900 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
901 }
902 Ok(result) => result,
903 };
904 (resp, blocks_guard, retries, peer, highest_rounds)
905 }
906
907 fn start_fetch_own_last_block_task(&mut self) {
908 const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
909 const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
910
911 let context = self.context.clone();
912 let dag_state = self.dag_state.clone();
913 let network_client = self.network_client.clone();
914 let block_verifier = self.block_verifier.clone();
915 let core_dispatcher = self.core_dispatcher.clone();
916
917 self.fetch_own_last_block_task
918 .spawn(monitored_future!(async move {
919 let _scope = monitored_scope("FetchOwnLastBlockTask");
920
921 let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
922 let network_client_cloned = network_client.clone();
923 let own_index = context.own_index;
924 async move {
925 sleep(fetch_own_block_delay).await;
926 let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
927 (r, authority_index)
928 }
929 };
930
931 let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
932 let mut result = Vec::new();
933 for serialized_block in blocks {
934 let signed_block: SignedBlock = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
935 block_verifier.verify(&signed_block).tap_err(|err|{
936 let hostname = context.committee.authority(authority_index).hostname.clone();
937 context
938 .metrics
939 .node_metrics
940 .invalid_blocks
941 .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
942 .inc();
943 warn!("Invalid block received from {}: {}", authority_index, err);
944 })?;
945
946 let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
947 if verified_block.author() != context.own_index {
948 return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
949 }
950 result.push(verified_block);
951 }
952 Ok(result)
953 };
954
955 let mut highest_round = GENESIS_ROUND;
957 let mut received_response = vec![false; context.committee.size()];
959 received_response[context.own_index] = true;
961 let mut total_stake = context.committee.stake(context.own_index);
962 let mut retries = 0;
963 let mut retry_delay_step = Duration::from_millis(500);
964 'main:loop {
965 if context.committee.size() == 1 {
966 highest_round = dag_state.read().get_last_proposed_block().round();
967 info!("Only one node in the network, will not try fetching own last block from peers.");
968 break 'main;
969 }
970
971 let mut results = FuturesUnordered::new();
973
974 for (authority_index, _authority) in context.committee.authorities() {
975 if !received_response[authority_index] {
977 results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
978 }
979 }
980
981 let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
983 tokio::pin!(timer);
984
985 'inner: loop {
986 tokio::select! {
987 result = results.next() => {
988 let Some((result, authority_index)) = result else {
989 break 'inner;
990 };
991 match result {
992 Ok(result) => {
993 match process_blocks(result, authority_index) {
994 Ok(blocks) => {
995 received_response[authority_index] = true;
996 let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
997 highest_round = highest_round.max(max_round);
998
999 total_stake += context.committee.stake(authority_index);
1000 },
1001 Err(err) => {
1002 warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
1003 }
1004 }
1005 },
1006 Err(err) => {
1007 warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
1008 results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
1009 }
1010 }
1011 },
1012 () = &mut timer => {
1013 info!("Timeout while trying to sync our own last block from peers");
1014 break 'inner;
1015 }
1016 }
1017 }
1018
1019 if context.committee.reached_quorum(total_stake) {
1021 info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
1022 break 'main;
1023 } else {
1024 info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
1025 }
1026
1027 retries += 1;
1028 context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
1029 warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
1030
1031 sleep(retry_delay_step).await;
1032
1033 retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
1034 retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
1035 }
1036
1037 context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
1039
1040 if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
1041 warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
1042 }
1043 }));
1044 }
1045
1046 async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
1047 let mut missing_blocks = self
1048 .core_dispatcher
1049 .get_missing_blocks()
1050 .await
1051 .map_err(|_err| ConsensusError::Shutdown)?;
1052
1053 if missing_blocks.is_empty() {
1055 return Ok(());
1056 }
1057
1058 let context = self.context.clone();
1059 let network_client = self.network_client.clone();
1060 let block_verifier = self.block_verifier.clone();
1061 let verified_cache = self.verified_blocks_cache.clone();
1062 let commit_vote_monitor = self.commit_vote_monitor.clone();
1063 let core_dispatcher = self.core_dispatcher.clone();
1064 let blocks_to_fetch = self.inflight_blocks_map.clone();
1065 let commands_sender = self.commands_sender.clone();
1066 let dag_state = self.dag_state.clone();
1067
1068 let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
1069 trace!(
1070 "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
1071 );
1072 if commit_lagging {
1073 if dag_state.read().gc_enabled() {
1078 return Ok(());
1079 }
1080
1081 let highest_accepted_round = dag_state.read().highest_accepted_round();
1085 missing_blocks = missing_blocks
1086 .into_iter()
1087 .take_while(|(block_ref, _)| {
1088 block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
1089 })
1090 .collect::<BTreeMap<_, _>>();
1091
1092 if missing_blocks.is_empty() {
1095 trace!(
1096 "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
1097 );
1098 self.context
1099 .metrics
1100 .node_metrics
1101 .fetch_blocks_scheduler_skipped
1102 .with_label_values(&["commit_lagging"])
1103 .inc();
1104 return Ok(());
1105 }
1106 }
1107
1108 self.fetch_blocks_scheduler_task
1109 .spawn(monitored_future!(async move {
1110 let _scope = monitored_scope("FetchMissingBlocksScheduler");
1111
1112 context
1113 .metrics
1114 .node_metrics
1115 .fetch_blocks_scheduler_inflight
1116 .inc();
1117 let total_requested = missing_blocks.len();
1118
1119 fail_point_async!("consensus-delay");
1120
1121 let results = Self::fetch_blocks_from_authorities(
1123 context.clone(),
1124 blocks_to_fetch.clone(),
1125 network_client,
1126 missing_blocks,
1127 dag_state,
1128 )
1129 .await;
1130 context
1131 .metrics
1132 .node_metrics
1133 .fetch_blocks_scheduler_inflight
1134 .dec();
1135 if results.is_empty() {
1136 warn!("No results returned while requesting missing blocks");
1137 return;
1138 }
1139
1140 let mut total_fetched = 0;
1142 for (blocks_guard, fetched_blocks, peer) in results {
1143 total_fetched += fetched_blocks.len();
1144
1145 if let Err(err) = Self::process_fetched_blocks(
1146 fetched_blocks,
1147 peer,
1148 blocks_guard,
1149 core_dispatcher.clone(),
1150 block_verifier.clone(),
1151 verified_cache.clone(),
1152 commit_vote_monitor.clone(),
1153 context.clone(),
1154 commands_sender.clone(),
1155 "periodic",
1156 )
1157 .await
1158 {
1159 warn!(
1160 "Error occurred while processing fetched blocks from peer {peer}: {err}"
1161 );
1162 }
1163 }
1164
1165 debug!(
1166 "Total blocks requested to fetch: {}, total fetched: {}",
1167 total_requested, total_fetched
1168 );
1169 }));
1170 Ok(())
1171 }
1172
1173 fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1174 let last_commit_index = self.dag_state.read().last_commit_index();
1175 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1176 let commit_threshold = last_commit_index
1177 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1178 (
1179 commit_threshold < quorum_commit_index,
1180 last_commit_index,
1181 quorum_commit_index,
1182 )
1183 }
1184
1185 fn missing_block_round_threshold(&self) -> Round {
1192 self.context.parameters.commit_sync_batch_size
1193 }
1194
1195 async fn fetch_blocks_from_authorities(
1210 context: Arc<Context>,
1211 inflight_blocks: Arc<InflightBlocksMap>,
1212 network_client: Arc<C>,
1213 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1214 dag_state: Arc<RwLock<DagState>>,
1215 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1216 let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1218 for (missing_block_ref, authorities) in &missing_blocks {
1219 for author in authorities {
1220 if author == &context.own_index {
1221 continue;
1223 }
1224 authority_to_blocks
1225 .entry(*author)
1226 .or_default()
1227 .push(*missing_block_ref);
1228 }
1229 }
1230
1231 #[cfg(not(test))]
1235 let mut rng = StdRng::from_entropy();
1236
1237 #[cfg(not(test))]
1240 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1241 authority_to_blocks
1242 .iter()
1243 .choose_multiple(
1244 &mut rng,
1245 MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1246 )
1247 .into_iter()
1248 .map(|(&peer, blocks)| {
1249 let limited_blocks = blocks
1250 .iter()
1251 .copied()
1252 .take(context.parameters.max_blocks_per_sync)
1253 .collect();
1254 (peer, limited_blocks, "periodic_known")
1255 })
1256 .collect();
1257 #[cfg(test)]
1258 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1260 let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1261 .iter()
1262 .map(|(&peer, blocks)| {
1263 let limited_blocks = blocks
1264 .iter()
1265 .copied()
1266 .take(context.parameters.max_blocks_per_sync)
1267 .collect();
1268 (peer, limited_blocks, "periodic_known")
1269 })
1270 .collect();
1271 items.sort_by_key(|(peer, _, _)| *peer);
1274 items
1275 .into_iter()
1276 .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1277 .collect()
1278 };
1279
1280 let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1283 .iter()
1284 .map(|(peer, _, _)| *peer)
1285 .collect();
1286
1287 let random_candidates: Vec<_> = context
1288 .committee
1289 .authorities()
1290 .filter_map(|(peer_index, _)| {
1291 (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1292 .then_some(peer_index)
1293 })
1294 .collect();
1295 #[cfg(test)]
1296 let random_peers: Vec<AuthorityIndex> = random_candidates
1297 .into_iter()
1298 .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1299 .collect();
1300 #[cfg(not(test))]
1301 let random_peers: Vec<AuthorityIndex> = random_candidates
1302 .into_iter()
1303 .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1304
1305 #[cfg_attr(test, allow(unused_mut))]
1306 let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1307 #[cfg(not(test))]
1310 all_missing_blocks.shuffle(&mut rng);
1311
1312 let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1313
1314 for peer in random_peers {
1315 if let Some(chunk) = block_chunks.next() {
1316 chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1317 } else {
1318 break;
1319 }
1320 }
1321
1322 let mut request_futures = FuturesUnordered::new();
1323
1324 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1325
1326 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1328 for block in &all_missing_blocks {
1329 missing_blocks_per_authority[block.author] += 1;
1330 }
1331 for (missing, (_, authority)) in missing_blocks_per_authority
1332 .into_iter()
1333 .zip(context.committee.authorities())
1334 {
1335 context
1336 .metrics
1337 .node_metrics
1338 .synchronizer_missing_blocks_by_authority
1339 .with_label_values(&[&authority.hostname])
1340 .inc_by(missing as u64);
1341 context
1342 .metrics
1343 .node_metrics
1344 .synchronizer_current_missing_blocks_by_authority
1345 .with_label_values(&[&authority.hostname])
1346 .set(missing as i64);
1347 }
1348
1349 #[cfg_attr(test, expect(unused_mut))]
1352 let mut remaining_peers: Vec<_> = context
1353 .committee
1354 .authorities()
1355 .filter_map(|(peer_index, _)| {
1356 if peer_index != context.own_index
1357 && !chosen_peers_with_blocks
1358 .iter()
1359 .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1360 {
1361 Some(peer_index)
1362 } else {
1363 None
1364 }
1365 })
1366 .collect();
1367
1368 #[cfg(not(test))]
1369 remaining_peers.shuffle(&mut rng);
1370 let mut remaining_peers = remaining_peers.into_iter();
1371
1372 for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1374 let peer_hostname = &context.committee.authority(peer).hostname;
1375 let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1376
1377 if let Some(blocks_guard) =
1380 inflight_blocks.lock_blocks(block_refs.clone(), peer, SyncMethod::Periodic)
1381 {
1382 info!(
1383 "Periodic sync of {} missing blocks from peer {} {}: {}",
1384 block_refs.len(),
1385 peer,
1386 peer_hostname,
1387 block_refs
1388 .iter()
1389 .map(|b| b.to_string())
1390 .collect::<Vec<_>>()
1391 .join(", ")
1392 );
1393 let metrics = &context.metrics.node_metrics;
1395 metrics
1396 .synchronizer_requested_blocks_by_peer
1397 .with_label_values(&[peer_hostname.as_str(), label])
1398 .inc_by(block_refs.len() as u64);
1399 for block_ref in &block_refs {
1400 let block_hostname = &context.committee.authority(block_ref.author).hostname;
1401 metrics
1402 .synchronizer_requested_blocks_by_authority
1403 .with_label_values(&[block_hostname.as_str(), label])
1404 .inc();
1405 }
1406 request_futures.push(Self::fetch_blocks_request(
1407 network_client.clone(),
1408 peer,
1409 blocks_guard,
1410 highest_rounds.clone(),
1411 FETCH_REQUEST_TIMEOUT,
1412 1,
1413 ));
1414 }
1415 }
1416
1417 let mut results = Vec::new();
1418 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1419
1420 tokio::pin!(fetcher_timeout);
1421
1422 loop {
1423 tokio::select! {
1424 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1425 let peer_hostname = &context.committee.authority(peer_index).hostname;
1426 match response {
1427 Ok(fetched_blocks) => {
1428 info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1429 results.push((blocks_guard, fetched_blocks, peer_index));
1430
1431 if request_futures.is_empty() {
1433 break;
1434 }
1435 },
1436 Err(_) => {
1437 context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1438 if let Some(next_peer) = remaining_peers.next() {
1440 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1442 info!(
1443 "Retrying syncing {} missing blocks from peer {}: {}",
1444 blocks_guard.block_refs.len(),
1445 peer_hostname,
1446 blocks_guard.block_refs
1447 .iter()
1448 .map(|b| b.to_string())
1449 .collect::<Vec<_>>()
1450 .join(", ")
1451 );
1452 let block_refs = blocks_guard.block_refs.clone();
1453 let metrics = &context.metrics.node_metrics;
1455 metrics
1456 .synchronizer_requested_blocks_by_peer
1457 .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1458 .inc_by(block_refs.len() as u64);
1459 for block_ref in &block_refs {
1460 let block_hostname =
1461 &context.committee.authority(block_ref.author).hostname;
1462 metrics
1463 .synchronizer_requested_blocks_by_authority
1464 .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1465 .inc();
1466 }
1467 request_futures.push(Self::fetch_blocks_request(
1468 network_client.clone(),
1469 next_peer,
1470 blocks_guard,
1471 highest_rounds,
1472 FETCH_REQUEST_TIMEOUT,
1473 1,
1474 ));
1475 } else {
1476 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1477 }
1478 } else {
1479 debug!("No more peers left to fetch blocks");
1480 }
1481 }
1482 }
1483 },
1484 _ = &mut fetcher_timeout => {
1485 debug!("Timed out while fetching missing blocks");
1486 break;
1487 }
1488 }
1489 }
1490
1491 results
1492 }
1493}
1494
1495#[cfg(test)]
1496mod tests {
1497 use std::{
1498 collections::{BTreeMap, BTreeSet},
1499 num::NonZeroUsize,
1500 sync::Arc,
1501 time::Duration,
1502 };
1503
1504 use async_trait::async_trait;
1505 use bytes::Bytes;
1506 use consensus_config::{AuthorityIndex, Parameters};
1507 use iota_metrics::monitored_mpsc;
1508 use lru::LruCache;
1509 use parking_lot::{Mutex as SyncMutex, RwLock};
1510 use tokio::{sync::Mutex, time::sleep};
1511
1512 use crate::{
1513 CommitDigest, CommitIndex,
1514 authority_service::COMMIT_LAG_MULTIPLIER,
1515 block::{BlockDigest, BlockRef, Round, SignedBlock, TestBlock, VerifiedBlock},
1516 block_verifier::{BlockVerifier, NoopBlockVerifier},
1517 commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1518 commit_vote_monitor::CommitVoteMonitor,
1519 context::Context,
1520 core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1521 dag_state::DagState,
1522 error::{ConsensusError, ConsensusResult},
1523 network::{BlockStream, NetworkClient},
1524 round_prober::QuorumRound,
1525 storage::mem_store::MemStore,
1526 synchronizer::{
1527 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, SyncMethod,
1528 Synchronizer, VERIFIED_BLOCKS_CACHE_CAP,
1529 },
1530 };
1531
1532 type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1533 type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1534 type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1535 type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1536
1537 struct FailingBlockVerifier;
1539
1540 impl BlockVerifier for FailingBlockVerifier {
1541 fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1542 Err(ConsensusError::WrongEpoch {
1543 expected: 1,
1544 actual: 0,
1545 })
1546 }
1547
1548 fn check_ancestors(
1549 &self,
1550 _block: &VerifiedBlock,
1551 _ancestors: &[Option<VerifiedBlock>],
1552 _gc_enabled: bool,
1553 _gc_round: Round,
1554 ) -> ConsensusResult<()> {
1555 Ok(())
1556 }
1557 }
1558
1559 #[derive(Default)]
1560 struct MockNetworkClient {
1561 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1562 fetch_latest_blocks_requests:
1563 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1564 }
1565
1566 impl MockNetworkClient {
1567 async fn stub_fetch_blocks(
1568 &self,
1569 blocks: Vec<VerifiedBlock>,
1570 peer: AuthorityIndex,
1571 latency: Option<Duration>,
1572 ) {
1573 let mut lock = self.fetch_blocks_requests.lock().await;
1574 let block_refs = blocks
1575 .iter()
1576 .map(|block| block.reference())
1577 .collect::<Vec<_>>();
1578 lock.insert((block_refs, peer), (blocks, latency));
1579 }
1580
1581 async fn stub_fetch_latest_blocks(
1582 &self,
1583 blocks: Vec<VerifiedBlock>,
1584 peer: AuthorityIndex,
1585 authorities: Vec<AuthorityIndex>,
1586 latency: Option<Duration>,
1587 ) {
1588 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1589 lock.entry((peer, authorities))
1590 .or_default()
1591 .push((blocks, latency));
1592 }
1593
1594 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1595 let lock = self.fetch_latest_blocks_requests.lock().await;
1596 lock.len()
1597 }
1598 }
1599
1600 #[async_trait]
1601 impl NetworkClient for MockNetworkClient {
1602 const SUPPORT_STREAMING: bool = false;
1603
1604 async fn send_block(
1605 &self,
1606 _peer: AuthorityIndex,
1607 _serialized_block: &VerifiedBlock,
1608 _timeout: Duration,
1609 ) -> ConsensusResult<()> {
1610 unimplemented!("Unimplemented")
1611 }
1612
1613 async fn subscribe_blocks(
1614 &self,
1615 _peer: AuthorityIndex,
1616 _last_received: Round,
1617 _timeout: Duration,
1618 ) -> ConsensusResult<BlockStream> {
1619 unimplemented!("Unimplemented")
1620 }
1621
1622 async fn fetch_blocks(
1623 &self,
1624 peer: AuthorityIndex,
1625 block_refs: Vec<BlockRef>,
1626 _highest_accepted_rounds: Vec<Round>,
1627 _timeout: Duration,
1628 ) -> ConsensusResult<Vec<Bytes>> {
1629 let mut lock = self.fetch_blocks_requests.lock().await;
1630 let response = lock
1631 .remove(&(block_refs, peer))
1632 .expect("Unexpected fetch blocks request made");
1633
1634 let serialised = response
1635 .0
1636 .into_iter()
1637 .map(|block| block.serialized().clone())
1638 .collect::<Vec<_>>();
1639
1640 drop(lock);
1641
1642 if let Some(latency) = response.1 {
1643 sleep(latency).await;
1644 }
1645
1646 Ok(serialised)
1647 }
1648
1649 async fn fetch_commits(
1650 &self,
1651 _peer: AuthorityIndex,
1652 _commit_range: CommitRange,
1653 _timeout: Duration,
1654 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1655 unimplemented!("Unimplemented")
1656 }
1657
1658 async fn fetch_latest_blocks(
1659 &self,
1660 peer: AuthorityIndex,
1661 authorities: Vec<AuthorityIndex>,
1662 _timeout: Duration,
1663 ) -> ConsensusResult<Vec<Bytes>> {
1664 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1665 let mut responses = lock
1666 .remove(&(peer, authorities.clone()))
1667 .expect("Unexpected fetch blocks request made");
1668
1669 let response = responses.remove(0);
1670 let serialised = response
1671 .0
1672 .into_iter()
1673 .map(|block| block.serialized().clone())
1674 .collect::<Vec<_>>();
1675
1676 if !responses.is_empty() {
1677 lock.insert((peer, authorities), responses);
1678 }
1679
1680 drop(lock);
1681
1682 if let Some(latency) = response.1 {
1683 sleep(latency).await;
1684 }
1685
1686 Ok(serialised)
1687 }
1688
1689 async fn get_latest_rounds(
1690 &self,
1691 _peer: AuthorityIndex,
1692 _timeout: Duration,
1693 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1694 unimplemented!("Unimplemented")
1695 }
1696 }
1697
1698 #[test]
1699 fn test_inflight_blocks_map() {
1700 let map = InflightBlocksMap::new();
1702 let some_block_refs = [
1703 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1704 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1705 BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1706 BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1707 ];
1708 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1709
1710 {
1712 let mut all_guards = Vec::new();
1713
1714 for i in 1..=2 {
1716 let authority = AuthorityIndex::new_for_test(i);
1717
1718 let guard =
1719 map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1720 let guard = guard.expect("Guard should be created");
1721 assert_eq!(guard.block_refs.len(), 4);
1722
1723 all_guards.push(guard);
1724
1725 let guard =
1727 map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1728 assert!(guard.is_none());
1729 }
1730
1731 let authority_3 = AuthorityIndex::new_for_test(3);
1734
1735 let guard = map.lock_blocks(
1736 missing_block_refs.clone(),
1737 authority_3,
1738 SyncMethod::Periodic,
1739 );
1740 assert!(guard.is_none());
1741
1742 drop(all_guards.remove(0));
1745
1746 let guard = map.lock_blocks(
1747 missing_block_refs.clone(),
1748 authority_3,
1749 SyncMethod::Periodic,
1750 );
1751 let guard = guard.expect("Guard should be successfully acquired");
1752
1753 assert_eq!(guard.block_refs, missing_block_refs);
1754
1755 drop(guard);
1757 drop(all_guards);
1758
1759 assert_eq!(map.num_of_locked_blocks(), 0);
1760 }
1761
1762 {
1764 let authority_1 = AuthorityIndex::new_for_test(1);
1766 let guard = map
1767 .lock_blocks(
1768 missing_block_refs.clone(),
1769 authority_1,
1770 SyncMethod::Periodic,
1771 )
1772 .unwrap();
1773
1774 let authority_2 = AuthorityIndex::new_for_test(2);
1776 let guard = map.swap_locks(guard, authority_2);
1777
1778 assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1779 }
1780 }
1781
1782 #[test]
1783 fn test_inflight_blocks_map_live_sync_limit() {
1784 let map = InflightBlocksMap::new();
1786 let some_block_refs = [
1787 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1788 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1789 ];
1790 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1791
1792 let authority_1 = AuthorityIndex::new_for_test(1);
1794 let guard_1 = map
1795 .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1796 .expect("Should successfully lock with Live sync");
1797
1798 assert_eq!(guard_1.block_refs.len(), 2);
1799
1800 let authority_2 = AuthorityIndex::new_for_test(2);
1802 let guard_2 = map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1803
1804 assert!(
1805 guard_2.is_none(),
1806 "Should fail to lock - Live limit of 1 reached"
1807 );
1808
1809 drop(guard_1);
1811
1812 let guard_2 = map
1814 .lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live)
1815 .expect("Should successfully lock after authority 1 released");
1816
1817 assert_eq!(guard_2.block_refs.len(), 2);
1818 }
1819
1820 #[test]
1821 fn test_inflight_blocks_map_periodic_allows_more_concurrency() {
1822 let map = InflightBlocksMap::new();
1824 let some_block_refs = [
1825 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1826 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1827 ];
1828 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1829
1830 let authority_1 = AuthorityIndex::new_for_test(1);
1832 let guard_1 = map
1833 .lock_blocks(
1834 missing_block_refs.clone(),
1835 authority_1,
1836 SyncMethod::Periodic,
1837 )
1838 .expect("Should successfully lock with Periodic sync");
1839
1840 assert_eq!(guard_1.block_refs.len(), 2);
1841
1842 let authority_2 = AuthorityIndex::new_for_test(2);
1844 let guard_2 = map
1845 .lock_blocks(
1846 missing_block_refs.clone(),
1847 authority_2,
1848 SyncMethod::Periodic,
1849 )
1850 .expect("Should successfully lock - Periodic allows 2 authorities");
1851
1852 assert_eq!(guard_2.block_refs.len(), 2);
1853
1854 let authority_3 = AuthorityIndex::new_for_test(3);
1856 let guard_3 = map.lock_blocks(
1857 missing_block_refs.clone(),
1858 authority_3,
1859 SyncMethod::Periodic,
1860 );
1861
1862 assert!(
1863 guard_3.is_none(),
1864 "Should fail to lock - Periodic limit of 2 reached"
1865 );
1866
1867 drop(guard_1);
1869
1870 let guard_3 = map
1872 .lock_blocks(
1873 missing_block_refs.clone(),
1874 authority_3,
1875 SyncMethod::Periodic,
1876 )
1877 .expect("Should successfully lock after authority 1 released");
1878
1879 assert_eq!(guard_3.block_refs.len(), 2);
1880 }
1881
1882 #[test]
1883 fn test_inflight_blocks_map_periodic_blocks_live_when_at_live_limit() {
1884 let map = InflightBlocksMap::new();
1886 let some_block_refs = [
1887 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1888 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1889 ];
1890 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1891
1892 let authority_1 = AuthorityIndex::new_for_test(1);
1894 let guard_1 = map
1895 .lock_blocks(
1896 missing_block_refs.clone(),
1897 authority_1,
1898 SyncMethod::Periodic,
1899 )
1900 .expect("Should successfully lock with Periodic sync");
1901
1902 assert_eq!(guard_1.block_refs.len(), 2);
1903
1904 let authority_2 = AuthorityIndex::new_for_test(2);
1907 let guard_2_live =
1908 map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1909
1910 assert!(
1911 guard_2_live.is_none(),
1912 "Should fail to lock with Live - total already at Live limit of 1"
1913 );
1914
1915 let guard_2_periodic = map
1918 .lock_blocks(
1919 missing_block_refs.clone(),
1920 authority_2,
1921 SyncMethod::Periodic,
1922 )
1923 .expect("Should successfully lock with Periodic - under Periodic limit of 2");
1924
1925 assert_eq!(guard_2_periodic.block_refs.len(), 2);
1926 }
1927
1928 #[test]
1929 fn test_inflight_blocks_map_live_then_periodic_interaction() {
1930 let map = InflightBlocksMap::new();
1932 let some_block_refs = [
1933 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1934 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1935 ];
1936 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1937
1938 let authority_1 = AuthorityIndex::new_for_test(1);
1940 let guard_1 = map
1941 .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1942 .expect("Should successfully lock with Live sync");
1943
1944 assert_eq!(guard_1.block_refs.len(), 2);
1945
1946 let authority_2 = AuthorityIndex::new_for_test(2);
1948 let guard_2_live =
1949 map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1950
1951 assert!(
1952 guard_2_live.is_none(),
1953 "Should fail to lock with Live - would exceed Live limit of 1"
1954 );
1955
1956 let guard_2 = map
1958 .lock_blocks(
1959 missing_block_refs.clone(),
1960 authority_2,
1961 SyncMethod::Periodic,
1962 )
1963 .expect("Should successfully lock with Periodic - total 2 is at Periodic limit");
1964
1965 assert_eq!(guard_2.block_refs.len(), 2);
1966
1967 let authority_3 = AuthorityIndex::new_for_test(3);
1970 let guard_3 = map.lock_blocks(
1971 missing_block_refs.clone(),
1972 authority_3,
1973 SyncMethod::Periodic,
1974 );
1975
1976 assert!(
1977 guard_3.is_none(),
1978 "Should fail to lock with Periodic - would exceed Periodic limit of 2"
1979 );
1980 }
1981
1982 #[test]
1983 fn test_inflight_blocks_map_partial_locks_mixed_methods() {
1984 let map = InflightBlocksMap::new();
1986 let block_a = BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1987 let block_b = BlockRef::new(2, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1988 let block_c = BlockRef::new(3, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1989 let block_d = BlockRef::new(4, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1990
1991 let guard_a = map
1993 .lock_blocks(
1994 [block_a].into(),
1995 AuthorityIndex::new_for_test(1),
1996 SyncMethod::Live,
1997 )
1998 .expect("Should lock block A");
1999 assert_eq!(guard_a.block_refs.len(), 1);
2000
2001 let guard_b1 = map
2003 .lock_blocks(
2004 [block_b].into(),
2005 AuthorityIndex::new_for_test(1),
2006 SyncMethod::Periodic,
2007 )
2008 .expect("Should lock block B");
2009 let guard_b2 = map
2010 .lock_blocks(
2011 [block_b].into(),
2012 AuthorityIndex::new_for_test(2),
2013 SyncMethod::Periodic,
2014 )
2015 .expect("Should lock block B again");
2016 assert_eq!(guard_b1.block_refs.len(), 1);
2017 assert_eq!(guard_b2.block_refs.len(), 1);
2018
2019 let guard_c = map
2021 .lock_blocks(
2022 [block_c].into(),
2023 AuthorityIndex::new_for_test(1),
2024 SyncMethod::Periodic,
2025 )
2026 .expect("Should lock block C");
2027 assert_eq!(guard_c.block_refs.len(), 1);
2028
2029 let all_blocks = [block_a, block_b, block_c, block_d].into();
2033 let guard_3 = map
2034 .lock_blocks(
2035 all_blocks,
2036 AuthorityIndex::new_for_test(3),
2037 SyncMethod::Periodic,
2038 )
2039 .expect("Should get partial lock");
2040
2041 assert_eq!(
2048 guard_3.block_refs.len(),
2049 3,
2050 "Should lock blocks A, C, and D"
2051 );
2052 assert!(
2053 guard_3.block_refs.contains(&block_a),
2054 "Should contain block A"
2055 );
2056 assert!(
2057 !guard_3.block_refs.contains(&block_b),
2058 "Should NOT contain block B (at limit)"
2059 );
2060 assert!(
2061 guard_3.block_refs.contains(&block_c),
2062 "Should contain block C"
2063 );
2064 assert!(
2065 guard_3.block_refs.contains(&block_d),
2066 "Should contain block D"
2067 );
2068 }
2069
2070 #[test]
2071 fn test_inflight_blocks_map_swap_locks_preserves_method() {
2072 let map = InflightBlocksMap::new();
2074 let some_block_refs = [
2075 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2076 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2077 ];
2078 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
2079
2080 let authority_1 = AuthorityIndex::new_for_test(1);
2082 let guard_1 = map
2083 .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
2084 .expect("Should lock with Live sync");
2085
2086 assert_eq!(guard_1.block_refs.len(), 2);
2087
2088 let authority_2 = AuthorityIndex::new_for_test(2);
2090 let guard_2 = map
2091 .swap_locks(guard_1, authority_2)
2092 .expect("Should swap locks");
2093
2094 assert_eq!(guard_2.block_refs, missing_block_refs);
2096
2097 let authority_3 = AuthorityIndex::new_for_test(3);
2099 let guard_3 = map.lock_blocks(missing_block_refs.clone(), authority_3, SyncMethod::Live);
2100 assert!(guard_3.is_none(), "Should fail - Live limit reached");
2101
2102 let guard_3_periodic = map
2104 .lock_blocks(
2105 missing_block_refs.clone(),
2106 authority_3,
2107 SyncMethod::Periodic,
2108 )
2109 .expect("Should lock with Periodic");
2110 assert_eq!(guard_3_periodic.block_refs.len(), 2);
2111 }
2112
2113 #[tokio::test]
2114 async fn test_process_fetched_blocks() {
2115 let (context, _) = Context::new_for_test(4);
2117 let context = Arc::new(context);
2118 let block_verifier = Arc::new(NoopBlockVerifier {});
2119 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2120 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2121 let (commands_sender, _commands_receiver) =
2122 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2123
2124 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2128 expected_blocks.extend(
2129 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2130 );
2131 assert_eq!(
2132 expected_blocks.len(),
2133 context.parameters.max_blocks_per_sync
2134 );
2135
2136 let expected_serialized_blocks = expected_blocks
2137 .iter()
2138 .map(|b| b.serialized().clone())
2139 .collect::<Vec<_>>();
2140
2141 let expected_block_refs = expected_blocks
2142 .iter()
2143 .map(|b| b.reference())
2144 .collect::<BTreeSet<_>>();
2145
2146 let peer_index = AuthorityIndex::new_for_test(2);
2148
2149 let inflight_blocks_map = InflightBlocksMap::new();
2151 let blocks_guard = inflight_blocks_map
2152 .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2153 .expect("Failed to lock blocks");
2154
2155 assert_eq!(
2156 inflight_blocks_map.num_of_locked_blocks(),
2157 expected_block_refs.len()
2158 );
2159
2160 let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
2162 NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
2163 )));
2164 let result = Synchronizer::<
2165 MockNetworkClient,
2166 NoopBlockVerifier,
2167 MockCoreThreadDispatcher,
2168 >::process_fetched_blocks(
2169 expected_serialized_blocks,
2170 peer_index,
2171 blocks_guard, core_dispatcher.clone(),
2173 block_verifier,
2174 verified_cache,
2175 commit_vote_monitor,
2176 context.clone(),
2177 commands_sender,
2178 "test",
2179 )
2180 .await;
2181
2182 assert!(result.is_ok());
2184
2185 let added_blocks = core_dispatcher.get_add_blocks().await;
2187 assert_eq!(
2188 added_blocks
2189 .iter()
2190 .map(|b| b.reference())
2191 .collect::<BTreeSet<_>>(),
2192 expected_block_refs,
2193 );
2194
2195 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2197 }
2198
2199 #[tokio::test]
2200 async fn test_process_fetched_blocks_duplicates() {
2201 let (context, _) = Context::new_for_test(4);
2203 let context = Arc::new(context);
2204 let block_verifier = Arc::new(NoopBlockVerifier {});
2205 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2206 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2207 let (commands_sender, _commands_receiver) =
2208 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2209
2210 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2214 expected_blocks.extend(
2215 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2216 );
2217 assert_eq!(
2218 expected_blocks.len(),
2219 context.parameters.max_blocks_per_sync
2220 );
2221
2222 let expected_serialized_blocks = expected_blocks
2223 .iter()
2224 .map(|b| b.serialized().clone())
2225 .collect::<Vec<_>>();
2226
2227 let expected_block_refs = expected_blocks
2228 .iter()
2229 .map(|b| b.reference())
2230 .collect::<BTreeSet<_>>();
2231
2232 let peer_index = AuthorityIndex::new_for_test(2);
2234
2235 let inflight_blocks_map = InflightBlocksMap::new();
2237 let blocks_guard = inflight_blocks_map
2238 .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2239 .expect("Failed to lock blocks");
2240
2241 assert_eq!(
2242 inflight_blocks_map.num_of_locked_blocks(),
2243 expected_block_refs.len()
2244 );
2245
2246 let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
2248 NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
2249 )));
2250
2251 let result = Synchronizer::<
2253 MockNetworkClient,
2254 NoopBlockVerifier,
2255 MockCoreThreadDispatcher,
2256 >::process_fetched_blocks(
2257 expected_serialized_blocks.clone(),
2258 peer_index,
2259 blocks_guard,
2260 core_dispatcher.clone(),
2261 block_verifier.clone(),
2262 verified_cache.clone(),
2263 commit_vote_monitor.clone(),
2264 context.clone(),
2265 commands_sender.clone(),
2266 "test",
2267 )
2268 .await;
2269
2270 assert!(result.is_ok());
2272
2273 let added_blocks = core_dispatcher.get_add_blocks().await;
2275 assert_eq!(
2276 added_blocks
2277 .iter()
2278 .map(|b| b.reference())
2279 .collect::<BTreeSet<_>>(),
2280 expected_block_refs,
2281 );
2282
2283 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2285
2286 let blocks_guard_second = inflight_blocks_map
2289 .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2290 .expect("Failed to lock blocks for second call");
2291
2292 let result_second = Synchronizer::<
2293 MockNetworkClient,
2294 NoopBlockVerifier,
2295 MockCoreThreadDispatcher,
2296 >::process_fetched_blocks(
2297 expected_serialized_blocks,
2298 peer_index,
2299 blocks_guard_second,
2300 core_dispatcher.clone(),
2301 block_verifier,
2302 verified_cache.clone(),
2303 commit_vote_monitor,
2304 context.clone(),
2305 commands_sender,
2306 "test",
2307 )
2308 .await;
2309
2310 assert!(result_second.is_ok());
2311
2312 let added_blocks_second_call = core_dispatcher.get_add_blocks().await;
2315 assert!(
2316 added_blocks_second_call.is_empty(),
2317 "Expected no blocks to be added on second call due to LruCache, but got {} blocks",
2318 added_blocks_second_call.len()
2319 );
2320
2321 let cache_size = verified_cache.lock().len();
2323 assert_eq!(
2324 cache_size,
2325 expected_block_refs.len(),
2326 "Expected {} entries in the LruCache, but got {}",
2327 expected_block_refs.len(),
2328 cache_size
2329 );
2330 }
2331
2332 #[tokio::test]
2333 async fn test_successful_fetch_blocks_from_peer() {
2334 let (context, _) = Context::new_for_test(4);
2336 let context = Arc::new(context);
2337 let block_verifier = Arc::new(NoopBlockVerifier {});
2338 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2339 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2340 let network_client = Arc::new(MockNetworkClient::default());
2341 let store = Arc::new(MemStore::new());
2342 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2343
2344 let handle = Synchronizer::start(
2345 network_client.clone(),
2346 context,
2347 core_dispatcher.clone(),
2348 commit_vote_monitor,
2349 block_verifier,
2350 dag_state,
2351 false,
2352 );
2353
2354 let expected_blocks = (0..10)
2356 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2357 .collect::<Vec<_>>();
2358 let missing_blocks = expected_blocks
2359 .iter()
2360 .map(|block| block.reference())
2361 .collect::<BTreeSet<_>>();
2362
2363 let peer = AuthorityIndex::new_for_test(1);
2365 network_client
2366 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
2367 .await;
2368
2369 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2371
2372 sleep(Duration::from_millis(1_000)).await;
2374
2375 let added_blocks = core_dispatcher.get_add_blocks().await;
2377 assert_eq!(added_blocks, expected_blocks);
2378 }
2379
2380 #[tokio::test]
2381 async fn saturate_fetch_blocks_from_peer() {
2382 let (context, _) = Context::new_for_test(4);
2384 let context = Arc::new(context);
2385 let block_verifier = Arc::new(NoopBlockVerifier {});
2386 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2387 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2388 let network_client = Arc::new(MockNetworkClient::default());
2389 let store = Arc::new(MemStore::new());
2390 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2391
2392 let handle = Synchronizer::start(
2393 network_client.clone(),
2394 context,
2395 core_dispatcher.clone(),
2396 commit_vote_monitor,
2397 block_verifier,
2398 dag_state,
2399 false,
2400 );
2401
2402 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
2404 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
2405 .collect::<Vec<_>>();
2406
2407 let peer = AuthorityIndex::new_for_test(1);
2409 let mut iter = expected_blocks.iter().peekable();
2410 while let Some(block) = iter.next() {
2411 network_client
2414 .stub_fetch_blocks(
2415 vec![block.clone()],
2416 peer,
2417 Some(Duration::from_millis(5_000)),
2418 )
2419 .await;
2420
2421 let mut missing_blocks = BTreeSet::new();
2422 missing_blocks.insert(block.reference());
2423
2424 if iter.peek().is_none() {
2427 match handle.fetch_blocks(missing_blocks, peer).await {
2428 Err(ConsensusError::SynchronizerSaturated(index, _)) => {
2429 assert_eq!(index, peer);
2430 }
2431 _ => panic!("A saturated synchronizer error was expected"),
2432 }
2433 } else {
2434 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2435 }
2436 }
2437 }
2438
2439 #[tokio::test(flavor = "current_thread", start_paused = true)]
2440 async fn synchronizer_periodic_task_fetch_blocks() {
2441 let (context, _) = Context::new_for_test(4);
2443 let context = Arc::new(context);
2444 let block_verifier = Arc::new(NoopBlockVerifier {});
2445 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2446 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2447 let network_client = Arc::new(MockNetworkClient::default());
2448 let store = Arc::new(MemStore::new());
2449 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2450
2451 let expected_blocks = (0..10)
2453 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2454 .collect::<Vec<_>>();
2455 let missing_blocks = expected_blocks
2456 .iter()
2457 .map(|block| block.reference())
2458 .collect::<BTreeSet<_>>();
2459
2460 core_dispatcher
2462 .stub_missing_blocks(missing_blocks.clone())
2463 .await;
2464
2465 network_client
2469 .stub_fetch_blocks(
2470 expected_blocks.clone(),
2471 AuthorityIndex::new_for_test(1),
2472 Some(FETCH_REQUEST_TIMEOUT),
2473 )
2474 .await;
2475 network_client
2476 .stub_fetch_blocks(
2477 expected_blocks.clone(),
2478 AuthorityIndex::new_for_test(2),
2479 None,
2480 )
2481 .await;
2482
2483 let _handle = Synchronizer::start(
2485 network_client.clone(),
2486 context,
2487 core_dispatcher.clone(),
2488 commit_vote_monitor,
2489 block_verifier,
2490 dag_state,
2491 false,
2492 );
2493
2494 sleep(8 * FETCH_REQUEST_TIMEOUT).await;
2495
2496 let added_blocks = core_dispatcher.get_add_blocks().await;
2498 assert_eq!(added_blocks, expected_blocks);
2499
2500 assert!(
2502 core_dispatcher
2503 .get_missing_blocks()
2504 .await
2505 .unwrap()
2506 .is_empty()
2507 );
2508 }
2509
2510 #[tokio::test(flavor = "current_thread", start_paused = true)]
2511 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2512 let (mut context, _) = Context::new_for_test(4);
2514 context
2515 .protocol_config
2516 .set_consensus_batched_block_sync_for_testing(true);
2517 let context = Arc::new(context);
2518 let block_verifier = Arc::new(NoopBlockVerifier {});
2519 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2520 let network_client = Arc::new(MockNetworkClient::default());
2521 let store = Arc::new(MemStore::new());
2522 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2523 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2524
2525 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2528 let stub_blocks = (sync_missing_block_round_threshold * 2
2529 ..sync_missing_block_round_threshold * 2
2530 + context.parameters.max_blocks_per_sync as u32)
2531 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2532 .collect::<Vec<_>>();
2533 let missing_blocks = stub_blocks
2534 .iter()
2535 .map(|block| block.reference())
2536 .collect::<BTreeSet<_>>();
2537 core_dispatcher
2538 .stub_missing_blocks(missing_blocks.clone())
2539 .await;
2540 let mut expected_blocks = stub_blocks
2544 .iter()
2545 .take(context.parameters.max_blocks_per_sync)
2546 .cloned()
2547 .collect::<Vec<_>>();
2548 network_client
2549 .stub_fetch_blocks(
2550 expected_blocks.clone(),
2551 AuthorityIndex::new_for_test(1),
2552 Some(FETCH_REQUEST_TIMEOUT),
2553 )
2554 .await;
2555 network_client
2556 .stub_fetch_blocks(
2557 expected_blocks.clone(),
2558 AuthorityIndex::new_for_test(2),
2559 None,
2560 )
2561 .await;
2562
2563 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2565 let commit_index: CommitIndex = round - 1;
2566 let blocks = (0..4)
2567 .map(|authority| {
2568 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2569 let block = TestBlock::new(round, authority)
2570 .set_commit_votes(commit_votes)
2571 .build();
2572
2573 VerifiedBlock::new_for_test(block)
2574 })
2575 .collect::<Vec<_>>();
2576
2577 for block in blocks {
2580 commit_vote_monitor.observe_block(&block);
2581 }
2582
2583 let _handle = Synchronizer::start(
2586 network_client.clone(),
2587 context.clone(),
2588 core_dispatcher.clone(),
2589 commit_vote_monitor.clone(),
2590 block_verifier,
2591 dag_state.clone(),
2592 false,
2593 );
2594
2595 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2596
2597 let added_blocks = core_dispatcher.get_add_blocks().await;
2600 assert_eq!(added_blocks, vec![]);
2601
2602 println!("Before advancing");
2603 {
2606 let mut d = dag_state.write();
2607 for index in 1..=commit_index {
2608 let commit =
2609 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2610
2611 d.add_commit(commit);
2612 }
2613
2614 println!("Once advanced");
2615 assert_eq!(
2616 d.last_commit_index(),
2617 commit_vote_monitor.quorum_commit_index()
2618 );
2619 }
2620
2621 core_dispatcher
2623 .stub_missing_blocks(missing_blocks.clone())
2624 .await;
2625
2626 println!("Final sleep");
2627 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2628
2629 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2631 println!("Final await");
2632 added_blocks.sort_by_key(|block| block.reference());
2633 expected_blocks.sort_by_key(|block| block.reference());
2634
2635 assert_eq!(added_blocks, expected_blocks);
2636 }
2637
2638 #[tokio::test(flavor = "current_thread", start_paused = true)]
2639 async fn synchronizer_fetch_own_last_block() {
2640 let (context, _) = Context::new_for_test(4);
2642 let context = Arc::new(context.with_parameters(Parameters {
2643 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2644 ..Default::default()
2645 }));
2646 let block_verifier = Arc::new(NoopBlockVerifier {});
2647 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2648 let network_client = Arc::new(MockNetworkClient::default());
2649 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2650 let store = Arc::new(MemStore::new());
2651 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2652 let our_index = AuthorityIndex::new_for_test(0);
2653
2654 let mut expected_blocks = (8..=10)
2656 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2657 .collect::<Vec<_>>();
2658
2659 let block_1 = expected_blocks.pop().unwrap();
2662 network_client
2663 .stub_fetch_latest_blocks(
2664 vec![block_1.clone()],
2665 AuthorityIndex::new_for_test(1),
2666 vec![our_index],
2667 Some(Duration::from_secs(10)),
2668 )
2669 .await;
2670 network_client
2671 .stub_fetch_latest_blocks(
2672 vec![block_1],
2673 AuthorityIndex::new_for_test(1),
2674 vec![our_index],
2675 None,
2676 )
2677 .await;
2678
2679 let block_2 = expected_blocks.pop().unwrap();
2681 network_client
2682 .stub_fetch_latest_blocks(
2683 vec![block_2.clone()],
2684 AuthorityIndex::new_for_test(2),
2685 vec![our_index],
2686 Some(Duration::from_secs(10)),
2687 )
2688 .await;
2689 network_client
2690 .stub_fetch_latest_blocks(
2691 vec![block_2],
2692 AuthorityIndex::new_for_test(2),
2693 vec![our_index],
2694 None,
2695 )
2696 .await;
2697
2698 let block_3 = expected_blocks.pop().unwrap();
2700 network_client
2701 .stub_fetch_latest_blocks(
2702 vec![block_3.clone()],
2703 AuthorityIndex::new_for_test(3),
2704 vec![our_index],
2705 Some(Duration::from_secs(10)),
2706 )
2707 .await;
2708 network_client
2709 .stub_fetch_latest_blocks(
2710 vec![block_3],
2711 AuthorityIndex::new_for_test(3),
2712 vec![our_index],
2713 None,
2714 )
2715 .await;
2716
2717 let handle = Synchronizer::start(
2719 network_client.clone(),
2720 context.clone(),
2721 core_dispatcher.clone(),
2722 commit_vote_monitor,
2723 block_verifier,
2724 dag_state,
2725 true,
2726 );
2727
2728 sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2730
2731 assert_eq!(
2733 core_dispatcher.get_last_own_proposed_round().await,
2734 vec![10]
2735 );
2736
2737 assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2739
2740 assert_eq!(
2742 context
2743 .metrics
2744 .node_metrics
2745 .sync_last_known_own_block_retries
2746 .get(),
2747 1
2748 );
2749
2750 if let Err(err) = handle.stop().await {
2752 if err.is_panic() {
2753 std::panic::resume_unwind(err.into_panic());
2754 }
2755 }
2756 }
2757 #[derive(Default)]
2758 struct SyncMockDispatcher {
2759 missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2760 added_blocks: Mutex<Vec<VerifiedBlock>>,
2761 }
2762
2763 #[async_trait::async_trait]
2764 impl CoreThreadDispatcher for SyncMockDispatcher {
2765 async fn get_missing_blocks(
2766 &self,
2767 ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2768 Ok(self.missing_blocks.lock().await.clone())
2769 }
2770 async fn add_blocks(
2771 &self,
2772 blocks: Vec<VerifiedBlock>,
2773 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2774 let mut guard = self.added_blocks.lock().await;
2775 guard.extend(blocks.clone());
2776 Ok(blocks.iter().map(|b| b.reference()).collect())
2777 }
2778
2779 async fn check_block_refs(
2782 &self,
2783 block_refs: Vec<BlockRef>,
2784 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2785 Ok(block_refs.into_iter().collect())
2787 }
2788
2789 async fn add_certified_commits(
2790 &self,
2791 _commits: CertifiedCommits,
2792 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2793 Ok(BTreeSet::new())
2795 }
2796
2797 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2798 Ok(())
2799 }
2800
2801 fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2802 Ok(())
2803 }
2804
2805 fn set_propagation_delay_and_quorum_rounds(
2806 &self,
2807 _delay: Round,
2808 _received_quorum_rounds: Vec<QuorumRound>,
2809 _accepted_quorum_rounds: Vec<QuorumRound>,
2810 ) -> Result<(), CoreError> {
2811 Ok(())
2812 }
2813
2814 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2815 Ok(())
2816 }
2817
2818 fn highest_received_rounds(&self) -> Vec<Round> {
2819 Vec::new()
2820 }
2821 }
2822
2823 #[tokio::test(flavor = "current_thread")]
2824 async fn known_before_random_peer_fetch() {
2825 {
2826 let (ctx, _) = Context::new_for_test(10);
2828 let context = Arc::new(ctx);
2829 let store = Arc::new(MemStore::new());
2830 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2831 let inflight = InflightBlocksMap::new();
2832
2833 let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2835 let missing_ref = missing_vb.reference();
2836 let missing_blocks = BTreeMap::from([(
2837 missing_ref,
2838 BTreeSet::from([
2839 AuthorityIndex::new_for_test(2),
2840 AuthorityIndex::new_for_test(3),
2841 AuthorityIndex::new_for_test(4),
2842 ]),
2843 )]);
2844
2845 let network_client = Arc::new(MockNetworkClient::default());
2847 for i in 1..=9 {
2849 let peer = AuthorityIndex::new_for_test(i);
2850 if i == 1 || i == 4 {
2851 network_client
2852 .stub_fetch_blocks(
2853 vec![missing_vb.clone()],
2854 peer,
2855 Some(2 * FETCH_REQUEST_TIMEOUT),
2856 )
2857 .await;
2858 continue;
2859 }
2860 network_client
2861 .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2862 .await;
2863 }
2864
2865 let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2868 ::fetch_blocks_from_authorities(
2869 context.clone(),
2870 inflight.clone(),
2871 network_client.clone(),
2872 missing_blocks,
2873 dag_state.clone(),
2874 )
2875 .await;
2876
2877 assert_eq!(results.len(), 2);
2880
2881 let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2883 assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2884 let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
2885 assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
2886 let expected = missing_vb.serialized().clone();
2888 assert_eq!(hot_bytes, &vec![expected]);
2889 }
2890 }
2891
2892 #[tokio::test(flavor = "current_thread")]
2893 async fn known_before_periodic_peer_fetch_larger_scenario() {
2894 use std::{
2895 collections::{BTreeMap, BTreeSet},
2896 sync::Arc,
2897 };
2898
2899 use parking_lot::RwLock;
2900
2901 use crate::{
2902 block::{Round, TestBlock, VerifiedBlock},
2903 context::Context,
2904 };
2905
2906 let (ctx, _) = Context::new_for_test(10);
2908 let context = Arc::new(ctx);
2909 let store = Arc::new(MemStore::new());
2910 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2911 let inflight = InflightBlocksMap::new();
2912 let network_client = Arc::new(MockNetworkClient::default());
2913
2914 let mut missing_blocks = BTreeMap::new();
2916 let mut missing_vbs = Vec::new();
2917 let known_number_blocks = 10;
2918 for i in 0..1000 {
2919 let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2920 let r = vb.reference();
2921 if i < known_number_blocks {
2922 missing_blocks.insert(
2924 r,
2925 BTreeSet::from([
2926 AuthorityIndex::new_for_test(0),
2927 AuthorityIndex::new_for_test(2),
2928 ]),
2929 );
2930 } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2931 missing_blocks.insert(
2933 r,
2934 BTreeSet::from([
2935 AuthorityIndex::new_for_test(0),
2936 AuthorityIndex::new_for_test(3),
2937 ]),
2938 );
2939 } else {
2940 missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2942 }
2943 missing_vbs.push(vb);
2944 }
2945
2946 let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2948 let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2949 .iter()
2950 .map(|&peer| {
2951 let vbs = missing_vbs
2952 .iter()
2953 .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2954 .take(context.parameters.max_blocks_per_sync)
2955 .cloned()
2956 .collect::<Vec<_>>();
2957 (peer, vbs)
2958 })
2959 .collect();
2960
2961 for (peer, vbs) in known_vbs_by_peer {
2962 if peer == AuthorityIndex::new_for_test(2) {
2963 network_client
2965 .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2966 .await;
2967 network_client
2968 .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2969 .await;
2970 } else {
2971 network_client
2972 .stub_fetch_blocks(vbs.clone(), peer, None)
2973 .await;
2974 }
2975 }
2976
2977 network_client
2979 .stub_fetch_blocks(
2980 missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2981 AuthorityIndex::new_for_test(1),
2982 None,
2983 )
2984 .await;
2985
2986 network_client
2987 .stub_fetch_blocks(
2988 missing_vbs[context.parameters.max_blocks_per_sync
2989 ..2 * context.parameters.max_blocks_per_sync]
2990 .to_vec(),
2991 AuthorityIndex::new_for_test(4),
2992 None,
2993 )
2994 .await;
2995
2996 let results = Synchronizer::<
2998 MockNetworkClient,
2999 NoopBlockVerifier,
3000 SyncMockDispatcher,
3001 >::fetch_blocks_from_authorities(
3002 context.clone(),
3003 inflight.clone(),
3004 network_client.clone(),
3005 missing_blocks,
3006 dag_state.clone(),
3007 )
3008 .await;
3009
3010 assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");
3013
3014 let (_guard3, bytes3, peer3) = &results[0];
3016 assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
3017 let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
3018 .iter()
3019 .map(|vb| vb.serialized().clone())
3020 .collect::<Vec<_>>();
3021 assert_eq!(bytes3, &expected2);
3022
3023 let (_guard1, bytes1, peer1) = &results[1];
3025 assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
3026 let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
3027 .iter()
3028 .map(|vb| vb.serialized().clone())
3029 .collect::<Vec<_>>();
3030 assert_eq!(bytes1, &expected1);
3031
3032 let (_guard4, bytes4, peer4) = &results[2];
3034 assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
3035 let expected4 = missing_vbs
3036 [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
3037 .iter()
3038 .map(|vb| vb.serialized().clone())
3039 .collect::<Vec<_>>();
3040 assert_eq!(bytes4, &expected4);
3041
3042 let (_guard5, bytes5, peer5) = &results[3];
3044 assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
3045 let expected5 = missing_vbs[0..known_number_blocks]
3046 .iter()
3047 .map(|vb| vb.serialized().clone())
3048 .collect::<Vec<_>>();
3049 assert_eq!(bytes5, &expected5);
3050 }
3051
3052 #[tokio::test(flavor = "current_thread")]
3053 async fn test_verify_blocks_deduplication() {
3054 let (context, _keys) = Context::new_for_test(4);
3055 let context = Arc::new(context);
3056 let block_verifier = Arc::new(NoopBlockVerifier {});
3057 let failing_verifier = Arc::new(FailingBlockVerifier);
3058 let peer1 = AuthorityIndex::new_for_test(1);
3059 let peer2 = AuthorityIndex::new_for_test(2);
3060
3061 let cache = Arc::new(SyncMutex::new(LruCache::new(NonZeroUsize::new(5).unwrap())));
3063
3064 let block1 = VerifiedBlock::new_for_test(TestBlock::new(10, 0).build());
3066 let serialized1 = vec![block1.serialized().clone()];
3067
3068 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3070 serialized1.clone(), block_verifier.clone(), cache.clone(), &context, peer1, "live",
3071 );
3072 assert_eq!(result.unwrap().len(), 1);
3073
3074 let peer1_hostname = &context.committee.authority(peer1).hostname;
3075 assert_eq!(
3076 context
3077 .metrics
3078 .node_metrics
3079 .synchronizer_skipped_blocks_by_peer
3080 .with_label_values(&[peer1_hostname.as_str(), "live"])
3081 .get(),
3082 0
3083 );
3084
3085 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3087 serialized1, block_verifier.clone(), cache.clone(), &context, peer2, "periodic",
3088 );
3089 assert_eq!(result.unwrap().len(), 0, "Should skip cached block");
3090
3091 let peer2_hostname = &context.committee.authority(peer2).hostname;
3092 assert_eq!(
3093 context
3094 .metrics
3095 .node_metrics
3096 .synchronizer_skipped_blocks_by_peer
3097 .with_label_values(&[peer2_hostname.as_str(), "periodic"])
3098 .get(),
3099 1
3100 );
3101
3102 let invalid_block = VerifiedBlock::new_for_test(TestBlock::new(20, 0).build());
3104 let invalid_serialized = vec![invalid_block.serialized().clone()];
3105
3106 assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3107 invalid_serialized.clone(), failing_verifier.clone(), cache.clone(), &context, peer1, "test",
3108 ).is_err());
3109 assert_eq!(cache.lock().len(), 1, "Invalid block should not be cached");
3110
3111 assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3113 invalid_serialized, failing_verifier, cache.clone(), &context, peer1, "test",
3114 ).is_err());
3115
3116 let blocks: Vec<_> = (0..5)
3118 .map(|i| VerifiedBlock::new_for_test(TestBlock::new(30 + i, 0).build()))
3119 .collect();
3120
3121 for block in &blocks {
3123 Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3124 vec![block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3125 ).unwrap();
3126 }
3127 assert_eq!(cache.lock().len(), 5);
3128
3129 let new_block = VerifiedBlock::new_for_test(TestBlock::new(99, 0).build());
3131 Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3132 vec![new_block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3133 ).unwrap();
3134
3135 let block1_serialized = vec![block1.serialized().clone()];
3138 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3139 block1_serialized, block_verifier.clone(), cache.clone(), &context, peer1, "test",
3140 );
3141 assert_eq!(
3142 result.unwrap().len(),
3143 1,
3144 "Evicted block should be re-verified"
3145 );
3146
3147 let new_block_serialized = vec![new_block.serialized().clone()];
3149 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3150 new_block_serialized, block_verifier, cache, &context, peer1, "test",
3151 );
3152 assert_eq!(result.unwrap().len(), 0, "New block should be cached");
3153 }
3154}