1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7 num::NonZeroUsize,
8 sync::Arc,
9 time::Duration,
10};
11
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use futures::{StreamExt as _, stream::FuturesUnordered};
15use iota_macros::fail_point_async;
16use iota_metrics::{
17 monitored_future,
18 monitored_mpsc::{Receiver, Sender, channel},
19 monitored_scope,
20};
21use itertools::Itertools as _;
22use lru::LruCache;
23use parking_lot::{Mutex, RwLock};
24#[cfg(not(test))]
25use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
26use tap::TapFallible;
27use tokio::{
28 runtime::Handle,
29 sync::{mpsc::error::TrySendError, oneshot},
30 task::{JoinError, JoinSet},
31 time::{Instant, sleep, sleep_until, timeout},
32};
33use tracing::{debug, error, info, instrument, trace, warn};
34
35use crate::{
36 BlockAPI, CommitIndex, Round,
37 authority_service::COMMIT_LAG_MULTIPLIER,
38 block::{BlockDigest, BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
39 block_verifier::BlockVerifier,
40 commit_vote_monitor::CommitVoteMonitor,
41 context::Context,
42 core_thread::CoreThreadDispatcher,
43 dag_state::DagState,
44 error::{ConsensusError, ConsensusResult},
45 network::NetworkClient,
46 scoring_metrics_store::ErrorSource,
47};
48
/// Maximum number of concurrent in-flight fetch requests per peer fetch task.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// In non-batched sync a peer may return up to this many blocks beyond the
/// explicitly requested ones (e.g. ancestors of requested blocks) before the
/// response is rejected as `TooManyFetchedBlocksReturned`.
pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;

/// Capacity of the LRU cache of already-verified block digests, used to skip
/// re-verifying blocks that are fetched more than once (possibly from
/// different peers).
const VERIFIED_BLOCKS_CACHE_CAP: usize = 200_000;

/// Timeout for a single fetch-blocks network request to one peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);

// NOTE(review): not referenced in this part of the file — presumably bounds a
// multi-peer fetch elsewhere; confirm before removing.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of peers the same block may be concurrently fetched from
/// during periodic (scheduled) sync.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// Maximum number of peers the same block may be concurrently fetched from
/// during live (on-demand) sync.
const MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK: usize = 1;

/// Total number of peers targeted by one round of periodic sync.
const MAX_PERIODIC_SYNC_PEERS: usize = 4;

/// Of the periodic sync peers, how many are picked at random instead of from
/// the authorities known to hold the missing blocks.
const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
/// How a fetch was initiated: `Live` for on-demand requests triggered while
/// processing received blocks, `Periodic` for the scheduled background sync.
/// The method determines the per-block concurrent-fetcher limit applied in
/// `InflightBlocksMap::lock_blocks`.
#[derive(Clone)]
enum SyncMethod {
    Live,
    Periodic,
}
91
/// RAII guard over a set of block refs locked for fetching from `peer` in
/// `map`. Dropping the guard releases the peer's in-flight claims on all of
/// `block_refs`.
struct BlocksGuard {
    map: Arc<InflightBlocksMap>,
    block_refs: BTreeSet<BlockRef>,
    peer: AuthorityIndex,
    method: SyncMethod,
}
98
impl Drop for BlocksGuard {
    fn drop(&mut self) {
        // Release this peer's in-flight claim on every locked block.
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}
104
/// Tracks which peers each missing block is currently being fetched from, so
/// the same block is not redundantly requested from too many peers at once.
struct InflightBlocksMap {
    // Each in-flight block ref maps to the set of peers it is being fetched
    // from; entries are removed once no peer is fetching the block.
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}
113
114impl InflightBlocksMap {
115 fn new() -> Arc<Self> {
116 Arc::new(Self {
117 inner: Mutex::new(HashMap::new()),
118 })
119 }
120
121 fn lock_blocks(
135 self: &Arc<Self>,
136 missing_block_refs: BTreeSet<BlockRef>,
137 peer: AuthorityIndex,
138 method: SyncMethod,
139 ) -> Option<BlocksGuard> {
140 let mut blocks = BTreeSet::new();
141 let mut inner = self.inner.lock();
142
143 for block_ref in missing_block_refs {
144 let authorities = inner.entry(block_ref).or_default();
145
146 if authorities.contains(&peer) {
148 continue;
149 }
150
151 let total_count = authorities.len();
153
154 let max_limit = match method {
156 SyncMethod::Live => MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK,
157 SyncMethod::Periodic => MAX_AUTHORITIES_TO_FETCH_PER_BLOCK,
158 };
159
160 if total_count < max_limit {
162 assert!(authorities.insert(peer));
163 blocks.insert(block_ref);
164 }
165 }
166
167 if blocks.is_empty() {
168 None
169 } else {
170 Some(BlocksGuard {
171 map: self.clone(),
172 block_refs: blocks,
173 peer,
174 method,
175 })
176 }
177 }
178
179 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
184 let mut blocks_to_fetch = self.inner.lock();
186 for block_ref in block_refs {
187 let authorities = blocks_to_fetch
188 .get_mut(block_ref)
189 .expect("Should have found a non empty map");
190
191 assert!(authorities.remove(&peer), "Peer index should be present!");
192
193 if authorities.is_empty() {
195 blocks_to_fetch.remove(block_ref);
196 }
197 }
198 }
199
200 fn swap_locks(
205 self: &Arc<Self>,
206 blocks_guard: BlocksGuard,
207 peer: AuthorityIndex,
208 ) -> Option<BlocksGuard> {
209 let block_refs = blocks_guard.block_refs.clone();
210 let method = blocks_guard.method.clone();
211
212 drop(blocks_guard);
214
215 self.lock_blocks(block_refs, peer, method)
217 }
218
219 #[cfg(test)]
220 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
221 let inner = self.inner.lock();
222 inner.len()
223 }
224}
225
/// Commands processed by the synchronizer's main loop (`Synchronizer::run`).
enum Command {
    // Request to fetch the given missing blocks from a specific peer; the
    // scheduling outcome is reported back through `result`.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    // Trigger syncing our own last proposed block from peers.
    FetchOwnLastBlock,
    // Ask the main loop to run the periodic scheduler earlier than its next tick.
    KickOffScheduler,
}
235
/// Handle returned by `Synchronizer::start`, used to send commands to the
/// synchronizer and to stop all of its spawned tasks.
pub(crate) struct SynchronizerHandle {
    commands_sender: Sender<Command>,
    // All tasks spawned by `Synchronizer::start`; guarded by an async mutex so
    // `stop` can be called from async context.
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}
240
241impl SynchronizerHandle {
242 pub(crate) async fn fetch_blocks(
245 &self,
246 missing_block_refs: BTreeSet<BlockRef>,
247 peer_index: AuthorityIndex,
248 ) -> ConsensusResult<()> {
249 let (sender, receiver) = oneshot::channel();
250 self.commands_sender
251 .send(Command::FetchBlocks {
252 missing_block_refs,
253 peer_index,
254 result: sender,
255 })
256 .await
257 .map_err(|_err| ConsensusError::Shutdown)?;
258 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
259 }
260
261 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
262 let mut tasks = self.tasks.lock().await;
263 tasks.abort_all();
264 while let Some(result) = tasks.join_next().await {
265 result?
266 }
267 Ok(())
268 }
269}
270
/// Fetches missing blocks from peers: on demand ("live") when requested via
/// `SynchronizerHandle::fetch_blocks`, and periodically ("periodic") through a
/// scheduler driven by the blocks the core reports as missing.
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    // Receives FetchBlocks / FetchOwnLastBlock / KickOffScheduler commands.
    commands_receiver: Receiver<Command>,
    // One bounded channel per peer, feeding that peer's dedicated fetch task.
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    // Holds at most one running periodic fetch task at a time.
    fetch_blocks_scheduler_task: JoinSet<()>,
    // Holds at most one running fetch-own-last-block task at a time.
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    // Shared bookkeeping of blocks currently in flight and from which peers.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    // LRU of digests already verified, to skip redundant verification.
    verified_blocks_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
    // Lets internal tasks send commands back to the main loop.
    commands_sender: Sender<Command>,
}
312
313impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
    /// Starts the synchronizer and returns a handle for interacting with it.
    ///
    /// Spawns one dedicated fetch task per peer authority (our own index is
    /// skipped), optionally enqueues a `FetchOwnLastBlock` command when
    /// `sync_last_known_own_block` is set, and finally spawns the main loop
    /// (`run`) that processes commands and drives the periodic scheduler.
    pub fn start(
        network_client: Arc<C>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        block_verifier: Arc<V>,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Arc<SynchronizerHandle> {
        let (commands_sender, commands_receiver) =
            channel("consensus_synchronizer_commands", 1_000);
        let inflight_blocks_map = InflightBlocksMap::new();
        // Digest cache shared by all per-peer fetch tasks, so a block fetched
        // via one peer is not re-verified when returned by another.
        let verified_blocks_cache = Arc::new(Mutex::new(LruCache::new(
            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
        )));

        // One bounded channel + fetch task per peer authority.
        let mut fetch_block_senders = BTreeMap::new();
        let mut tasks = JoinSet::new();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            let (sender, receiver) =
                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
                index,
                network_client.clone(),
                block_verifier.clone(),
                verified_blocks_cache.clone(),
                commit_vote_monitor.clone(),
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                receiver,
                commands_sender.clone(),
            );
            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
            fetch_block_senders.insert(index, sender);
        }

        let commands_sender_clone = commands_sender.clone();

        if sync_last_known_own_block {
            // The channel was just created with ample capacity (1_000), so
            // try_send can only fail if something is fundamentally wrong.
            commands_sender
                .try_send(Command::FetchOwnLastBlock)
                .expect("Failed to sync our last block");
        }

        // Main loop task: processes commands and runs the periodic scheduler.
        tasks.spawn(monitored_future!(async move {
            let mut s = Self {
                context,
                commands_receiver,
                fetch_block_senders,
                core_dispatcher,
                commit_vote_monitor,
                fetch_blocks_scheduler_task: JoinSet::new(),
                fetch_own_last_block_task: JoinSet::new(),
                network_client,
                block_verifier,
                inflight_blocks_map,
                verified_blocks_cache,
                commands_sender: commands_sender_clone,
                dag_state,
            };
            s.run().await;
        }));

        Arc::new(SynchronizerHandle {
            commands_sender,
            tasks: tokio::sync::Mutex::new(tasks),
        })
    }
390
    /// Main loop: dispatches incoming commands, reaps the helper tasks, and
    /// periodically kicks off the missing-blocks scheduler. Returns only when
    /// the core dispatcher shuts down.
    async fn run(&mut self) {
        // Interval between periodic scheduler runs.
        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();

                            // Cap the request size; refs beyond the cap are dropped
                            // and will surface again as missing later.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(self.context.parameters.max_blocks_per_sync)
                                .collect();

                            // Lock the refs so the same blocks are not fetched from too
                            // many peers concurrently. Nothing locked means everything
                            // is already in flight, so report success immediately.
                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index, SyncMethod::Live);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // Hand the guard to the peer's fetch task. A full channel
                            // surfaces to the caller as SynchronizerSaturated.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            // At most one such task at a time.
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Run immediately when idle, otherwise shortly after the
                            // currently running scheduler task completes.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
                            };

                            // Only ever pull the deadline earlier, never push it back.
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            // Cancellation is expected during shutdown; panics are
                            // re-raised, anything else is fatal.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            // Same policy as above for the scheduler task.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // Periodic tick: start a scheduler run unless one is active.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
                }
            }
        }
    }
504
    /// Dedicated fetch loop for a single peer (`peer_index`).
    ///
    /// Receives locked block sets from the main loop, issues up to
    /// `FETCH_BLOCKS_CONCURRENCY` concurrent requests, retries failed requests
    /// up to `MAX_RETRIES`, and pushes each successful response through
    /// `process_fetched_blocks`. The loop exits once the channel is closed and
    /// no requests remain in flight.
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 3;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                // Accept new work only while below the concurrency limit.
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    let metrics = &context.metrics.node_metrics;
                    metrics
                        .synchronizer_requested_blocks_by_peer
                        .with_label_values(&[peer_hostname.as_str(), "live"])
                        .inc_by(blocks_guard.block_refs.len() as u64);
                    // Count one request per distinct author of the requested blocks.
                    let mut authors = HashSet::new();
                    for block_ref in &blocks_guard.block_refs {
                        authors.insert(block_ref.author);
                    }
                    for author in authors {
                        let host = &context.committee.authority(author).hostname;
                        metrics
                            .synchronizer_requested_blocks_by_authority
                            .with_label_values(&[host.as_str(), "live"])
                            .inc();
                    }

                    requests.push(Self::fetch_blocks_request(
                        network_client.clone(),
                        peer_index,
                        blocks_guard,
                        highest_rounds,
                        FETCH_REQUEST_TIMEOUT,
                        1,
                    ))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                verified_cache.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                // Processing failures are fed into peer scoring.
                                context.scoring_metrics_store.update_scoring_metrics_on_block_receival(
                                    peer_index,
                                    peer_hostname,
                                    err.clone(),
                                    ErrorSource::Synchronizer,
                                    &context.metrics.node_metrics,
                                );
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Dropping the guard releases the locks so the blocks
                                // can be requested from other peers.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                // Both branches disabled: channel closed and nothing in flight.
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }
600
601 async fn process_fetched_blocks(
605 mut serialized_blocks: Vec<Bytes>,
606 peer_index: AuthorityIndex,
607 requested_blocks_guard: BlocksGuard,
608 core_dispatcher: Arc<D>,
609 block_verifier: Arc<V>,
610 verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
611 commit_vote_monitor: Arc<CommitVoteMonitor>,
612 context: Arc<Context>,
613 commands_sender: Sender<Command>,
614 sync_method: &str,
615 ) -> ConsensusResult<()> {
616 if serialized_blocks.is_empty() {
617 return Ok(());
618 }
619 let _s = context
620 .metrics
621 .node_metrics
622 .scope_processing_time
623 .with_label_values(&["Synchronizer::process_fetched_blocks"])
624 .start_timer();
625
626 if context.protocol_config.consensus_batched_block_sync() {
628 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
629 } else {
630 if serialized_blocks.len()
633 > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
634 {
635 return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
636 }
637 }
638
639 let blocks = Handle::current()
641 .spawn_blocking({
642 let block_verifier = block_verifier.clone();
643 let verified_cache = verified_cache.clone();
644 let context = context.clone();
645 let sync_method = sync_method.to_string();
646 move || {
647 Self::verify_blocks(
648 serialized_blocks,
649 block_verifier,
650 verified_cache,
651 &context,
652 peer_index,
653 &sync_method,
654 )
655 }
656 })
657 .await
658 .expect("Spawn blocking should not fail")?;
659
660 if !context.protocol_config.consensus_batched_block_sync() {
661 let ancestors = blocks
663 .iter()
664 .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
665 .flat_map(|b| b.ancestors().to_vec())
666 .collect::<BTreeSet<BlockRef>>();
667
668 for block in &blocks {
671 if !requested_blocks_guard
672 .block_refs
673 .contains(&block.reference())
674 && !ancestors.contains(&block.reference())
675 {
676 return Err(ConsensusError::UnexpectedFetchedBlock {
677 index: peer_index,
678 block_ref: block.reference(),
679 });
680 }
681 }
682 }
683
684 for block in &blocks {
686 commit_vote_monitor.observe_block(block);
687 }
688
689 let metrics = &context.metrics.node_metrics;
690 let peer_hostname = &context.committee.authority(peer_index).hostname;
691 metrics
692 .synchronizer_fetched_blocks_by_peer
693 .with_label_values(&[peer_hostname.as_str(), sync_method])
694 .inc_by(blocks.len() as u64);
695 for block in &blocks {
696 let block_hostname = &context.committee.authority(block.author()).hostname;
697 metrics
698 .synchronizer_fetched_blocks_by_authority
699 .with_label_values(&[block_hostname.as_str(), sync_method])
700 .inc();
701 }
702
703 debug!(
704 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
705 blocks.len(),
706 blocks.iter().map(|b| b.reference().to_string()).join(", "),
707 );
708
709 let missing_blocks = core_dispatcher
713 .add_blocks(blocks)
714 .await
715 .map_err(|_| ConsensusError::Shutdown)?;
716
717 drop(requested_blocks_guard);
720
721 if !missing_blocks.is_empty() {
723 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
725 {
726 warn!("Commands channel is full")
727 }
728 }
729
730 context
731 .metrics
732 .node_metrics
733 .missing_blocks_after_fetch_total
734 .inc_by(missing_blocks.len() as u64);
735
736 Ok(())
737 }
738
739 fn get_highest_accepted_rounds(
740 dag_state: Arc<RwLock<DagState>>,
741 context: &Arc<Context>,
742 ) -> Vec<Round> {
743 let blocks = dag_state
744 .read()
745 .get_last_cached_block_per_authority(Round::MAX);
746 assert_eq!(blocks.len(), context.committee.size());
747
748 blocks
749 .into_iter()
750 .map(|(block, _)| block.round())
751 .collect::<Vec<_>>()
752 }
753
    /// Deserializes and verifies a batch of fetched blocks, returning them as
    /// [`VerifiedBlock`]s.
    ///
    /// Blocks whose digest is already in `verified_cache` are skipped entirely
    /// (counted in the skipped-blocks metric). The first malformed or invalid
    /// block aborts the whole batch with an error. Blocks timestamped ahead of
    /// the local clock are kept when median-timestamp enforcement is enabled,
    /// and silently dropped from the result otherwise.
    #[instrument(level = "trace", skip_all)]
    fn verify_blocks(
        serialized_blocks: Vec<Bytes>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        context: &Context,
        peer_index: AuthorityIndex,
        sync_method: &str,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut verified_blocks = Vec::new();
        let mut skipped_count = 0u64;

        for serialized_block in serialized_blocks {
            let block_digest = VerifiedBlock::compute_digest(&serialized_block);

            // Already verified earlier (possibly via another peer): skip the
            // expensive verification. Note the skipped block is also excluded
            // from the returned set.
            if verified_cache.lock().get(&block_digest).is_some() {
                skipped_count += 1;
                continue;
            }

            let signed_block: SignedBlock =
                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;

            if let Err(e) = block_verifier.verify(&signed_block) {
                let hostname = context.committee.authority(peer_index).hostname.clone();

                context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[hostname.as_str(), "synchronizer", e.name()])
                    .inc();
                warn!("Invalid block received from {}: {}", peer_index, e);
                // Fail the whole batch on the first invalid block.
                return Err(e);
            }

            verified_cache.lock().put(block_digest, ());

            // Reuse the digest computed above instead of hashing again.
            let verified_block = VerifiedBlock::new_verified_with_digest(
                signed_block,
                serialized_block,
                block_digest,
            );

            // Detect blocks timestamped in the future relative to our clock.
            let now = context.clock.timestamp_utc_ms();
            let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
            if drift > 0 {
                let peer_hostname = &context
                    .committee
                    .authority(verified_block.author())
                    .hostname;
                context
                    .metrics
                    .node_metrics
                    .block_timestamp_drift_ms
                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
                    .inc_by(drift);

                if context
                    .protocol_config
                    .consensus_median_timestamp_with_checkpoint_enforcement()
                {
                    // Median-based timestamps tolerate future-dated blocks.
                    trace!(
                        "Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.",
                        verified_block.reference(),
                        verified_block.timestamp_ms(),
                        now
                    );
                } else {
                    warn!(
                        "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
                        verified_block.reference(),
                        verified_block.timestamp_ms(),
                        now
                    );
                    continue;
                }
            }

            verified_blocks.push(verified_block);
        }

        if skipped_count > 0 {
            let peer_hostname = &context.committee.authority(peer_index).hostname;
            context
                .metrics
                .node_metrics
                .synchronizer_skipped_blocks_by_peer
                .with_label_values(&[peer_hostname.as_str(), sync_method])
                .inc_by(skipped_count);
        }

        Ok(verified_blocks)
    }
856
857 async fn fetch_blocks_request(
858 network_client: Arc<C>,
859 peer: AuthorityIndex,
860 blocks_guard: BlocksGuard,
861 highest_rounds: Vec<Round>,
862 request_timeout: Duration,
863 mut retries: u32,
864 ) -> (
865 ConsensusResult<Vec<Bytes>>,
866 BlocksGuard,
867 u32,
868 AuthorityIndex,
869 Vec<Round>,
870 ) {
871 let start = Instant::now();
872 let resp = timeout(
873 request_timeout,
874 network_client.fetch_blocks(
875 peer,
876 blocks_guard
877 .block_refs
878 .clone()
879 .into_iter()
880 .collect::<Vec<_>>(),
881 highest_rounds.clone(),
882 request_timeout,
883 ),
884 )
885 .await;
886
887 fail_point_async!("consensus-delay");
888
889 let resp = match resp {
890 Ok(Err(err)) => {
891 sleep_until(start + request_timeout).await;
894 retries += 1;
895 Err(err)
896 } Err(err) => {
898 sleep_until(start + request_timeout).await;
900 retries += 1;
901 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
902 }
903 Ok(result) => result,
904 };
905 (resp, blocks_guard, retries, peer, highest_rounds)
906 }
907
    /// Spawns a task that asks peers for our own last proposed block, so that
    /// after a restart we do not equivocate by proposing below rounds we
    /// already used. The task keeps polling peers (with growing backoff) until
    /// a quorum of stake has answered, then reports the highest observed round
    /// to the core dispatcher.
    fn start_fetch_own_last_block_task(&mut self) {
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Asks one peer for our latest block, after an optional delay
                // (used to pace retries against the same peer).
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                // Deserializes, verifies, and sanity-checks that each returned
                // block was actually authored by us.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block: SignedBlock = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                let mut highest_round = GENESIS_ROUND;
                // Tracks which authorities have already given an acceptable
                // answer; our own slot counts as answered from the start.
                let mut received_response = vec![false; context.committee.size()];
                received_response[context.own_index] = true;
                let mut total_stake = context.committee.stake(context.own_index);
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
                    // Single-node committee: our dag state is authoritative.
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    let mut results = FuturesUnordered::new();

                    // Ask every authority that has not answered acceptably yet.
                    for (authority_index, _authority) in context.committee.authorities() {
                        if !received_response[authority_index] {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Bound how long this attempt waits for responses.
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                let Some((result, authority_index)) = result else {
                                    // All outstanding requests have completed.
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                received_response[authority_index] = true;
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        // Network error: retry the same peer after a delay.
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Done once a quorum of stake (own stake included) answered.
                    if context.committee.reached_quorum(total_stake) {
                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    } else {
                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Exponential backoff (x1.5), capped at MAX_RETRY_DELAY_STEP.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
1046
    /// One run of the periodic scheduler: asks the core for its missing
    /// blocks, applies commit-lag gating, and spawns a task that fetches the
    /// missing blocks from multiple peers and feeds results to the core.
    ///
    /// Returns `ConsensusError::Shutdown` when the core dispatcher is gone,
    /// which makes the main loop exit.
    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        // Nothing to fetch; skip this run entirely.
        if missing_blocks.is_empty() {
            return Ok(());
        }

        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let verified_cache = self.verified_blocks_cache.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        trace!(
            "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
        );
        if commit_lagging {
            // With GC enabled, commit sync is expected to catch us up instead;
            // skip block sync entirely while lagging.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            // Without GC: only fetch missing blocks that are close enough to
            // our highest accepted round; far-future blocks are left to commit
            // sync. `missing_blocks` is ordered by round, so take_while works.
            let highest_accepted_round = dag_state.read().highest_accepted_round();
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|(block_ref, _)| {
                    block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
                })
                .collect::<BTreeMap<_, _>>();

            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                // Fan the missing blocks out to several peers.
                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                // Verify each peer's response and hand blocks to the core.
                let mut total_fetched = 0;
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        verified_cache.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }
1173
1174 fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1175 let last_commit_index = self.dag_state.read().last_commit_index();
1176 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1177 let commit_threshold = last_commit_index
1178 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1179 (
1180 commit_threshold < quorum_commit_index,
1181 last_commit_index,
1182 quorum_commit_index,
1183 )
1184 }
1185
    /// Round window above our highest accepted round within which missing
    /// blocks are still fetched while the commit is lagging (see
    /// `start_fetch_missing_blocks_task`); blocks further in the future are
    /// skipped until commit sync catches up.
    fn missing_block_round_threshold(&self) -> Round {
        self.context.parameters.commit_sync_batch_size
    }
1195
1196 async fn fetch_blocks_from_authorities(
1211 context: Arc<Context>,
1212 inflight_blocks: Arc<InflightBlocksMap>,
1213 network_client: Arc<C>,
1214 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1215 dag_state: Arc<RwLock<DagState>>,
1216 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1217 let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1219 for (missing_block_ref, authorities) in &missing_blocks {
1220 for author in authorities {
1221 if author == &context.own_index {
1222 continue;
1224 }
1225 authority_to_blocks
1226 .entry(*author)
1227 .or_default()
1228 .push(*missing_block_ref);
1229 }
1230 }
1231
1232 #[cfg(not(test))]
1236 let mut rng = StdRng::from_entropy();
1237
1238 #[cfg(not(test))]
1241 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1242 authority_to_blocks
1243 .iter()
1244 .choose_multiple(
1245 &mut rng,
1246 MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1247 )
1248 .into_iter()
1249 .map(|(&peer, blocks)| {
1250 let limited_blocks = blocks
1251 .iter()
1252 .copied()
1253 .take(context.parameters.max_blocks_per_sync)
1254 .collect();
1255 (peer, limited_blocks, "periodic_known")
1256 })
1257 .collect();
1258 #[cfg(test)]
1259 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1261 let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1262 .iter()
1263 .map(|(&peer, blocks)| {
1264 let limited_blocks = blocks
1265 .iter()
1266 .copied()
1267 .take(context.parameters.max_blocks_per_sync)
1268 .collect();
1269 (peer, limited_blocks, "periodic_known")
1270 })
1271 .collect();
1272 items.sort_by_key(|(peer, _, _)| *peer);
1275 items
1276 .into_iter()
1277 .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1278 .collect()
1279 };
1280
1281 let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1284 .iter()
1285 .map(|(peer, _, _)| *peer)
1286 .collect();
1287
1288 let random_candidates: Vec<_> = context
1289 .committee
1290 .authorities()
1291 .filter_map(|(peer_index, _)| {
1292 (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1293 .then_some(peer_index)
1294 })
1295 .collect();
1296 #[cfg(test)]
1297 let random_peers: Vec<AuthorityIndex> = random_candidates
1298 .into_iter()
1299 .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1300 .collect();
1301 #[cfg(not(test))]
1302 let random_peers: Vec<AuthorityIndex> = random_candidates
1303 .into_iter()
1304 .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1305
1306 #[cfg_attr(test, allow(unused_mut))]
1307 let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1308 #[cfg(not(test))]
1311 all_missing_blocks.shuffle(&mut rng);
1312
1313 let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1314
1315 for peer in random_peers {
1316 if let Some(chunk) = block_chunks.next() {
1317 chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1318 } else {
1319 break;
1320 }
1321 }
1322
1323 let mut request_futures = FuturesUnordered::new();
1324
1325 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1326
1327 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1329 for block in &all_missing_blocks {
1330 missing_blocks_per_authority[block.author] += 1;
1331 }
1332 for (missing, (_, authority)) in missing_blocks_per_authority
1333 .into_iter()
1334 .zip(context.committee.authorities())
1335 {
1336 context
1337 .metrics
1338 .node_metrics
1339 .synchronizer_missing_blocks_by_authority
1340 .with_label_values(&[&authority.hostname])
1341 .inc_by(missing as u64);
1342 context
1343 .metrics
1344 .node_metrics
1345 .synchronizer_current_missing_blocks_by_authority
1346 .with_label_values(&[&authority.hostname])
1347 .set(missing as i64);
1348 }
1349
1350 #[cfg_attr(test, expect(unused_mut))]
1353 let mut remaining_peers: Vec<_> = context
1354 .committee
1355 .authorities()
1356 .filter_map(|(peer_index, _)| {
1357 if peer_index != context.own_index
1358 && !chosen_peers_with_blocks
1359 .iter()
1360 .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1361 {
1362 Some(peer_index)
1363 } else {
1364 None
1365 }
1366 })
1367 .collect();
1368
1369 #[cfg(not(test))]
1370 remaining_peers.shuffle(&mut rng);
1371 let mut remaining_peers = remaining_peers.into_iter();
1372
1373 for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1375 let peer_hostname = &context.committee.authority(peer).hostname;
1376 let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1377
1378 if let Some(blocks_guard) =
1381 inflight_blocks.lock_blocks(block_refs.clone(), peer, SyncMethod::Periodic)
1382 {
1383 info!(
1384 "Periodic sync of {} missing blocks from peer {} {}: {}",
1385 block_refs.len(),
1386 peer,
1387 peer_hostname,
1388 block_refs
1389 .iter()
1390 .map(|b| b.to_string())
1391 .collect::<Vec<_>>()
1392 .join(", ")
1393 );
1394 let metrics = &context.metrics.node_metrics;
1396 metrics
1397 .synchronizer_requested_blocks_by_peer
1398 .with_label_values(&[peer_hostname.as_str(), label])
1399 .inc_by(block_refs.len() as u64);
1400 for block_ref in &block_refs {
1401 let block_hostname = &context.committee.authority(block_ref.author).hostname;
1402 metrics
1403 .synchronizer_requested_blocks_by_authority
1404 .with_label_values(&[block_hostname.as_str(), label])
1405 .inc();
1406 }
1407 request_futures.push(Self::fetch_blocks_request(
1408 network_client.clone(),
1409 peer,
1410 blocks_guard,
1411 highest_rounds.clone(),
1412 FETCH_REQUEST_TIMEOUT,
1413 1,
1414 ));
1415 }
1416 }
1417
1418 let mut results = Vec::new();
1419 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1420
1421 tokio::pin!(fetcher_timeout);
1422
1423 loop {
1424 tokio::select! {
1425 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1426 let peer_hostname = &context.committee.authority(peer_index).hostname;
1427 match response {
1428 Ok(fetched_blocks) => {
1429 info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1430 results.push((blocks_guard, fetched_blocks, peer_index));
1431
1432 if request_futures.is_empty() {
1434 break;
1435 }
1436 },
1437 Err(_) => {
1438 context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1439 if let Some(next_peer) = remaining_peers.next() {
1441 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1443 info!(
1444 "Retrying syncing {} missing blocks from peer {}: {}",
1445 blocks_guard.block_refs.len(),
1446 peer_hostname,
1447 blocks_guard.block_refs
1448 .iter()
1449 .map(|b| b.to_string())
1450 .collect::<Vec<_>>()
1451 .join(", ")
1452 );
1453 let block_refs = blocks_guard.block_refs.clone();
1454 let metrics = &context.metrics.node_metrics;
1456 metrics
1457 .synchronizer_requested_blocks_by_peer
1458 .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1459 .inc_by(block_refs.len() as u64);
1460 for block_ref in &block_refs {
1461 let block_hostname =
1462 &context.committee.authority(block_ref.author).hostname;
1463 metrics
1464 .synchronizer_requested_blocks_by_authority
1465 .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1466 .inc();
1467 }
1468 request_futures.push(Self::fetch_blocks_request(
1469 network_client.clone(),
1470 next_peer,
1471 blocks_guard,
1472 highest_rounds,
1473 FETCH_REQUEST_TIMEOUT,
1474 1,
1475 ));
1476 } else {
1477 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1478 }
1479 } else {
1480 debug!("No more peers left to fetch blocks");
1481 }
1482 }
1483 }
1484 },
1485 _ = &mut fetcher_timeout => {
1486 debug!("Timed out while fetching missing blocks");
1487 break;
1488 }
1489 }
1490 }
1491
1492 results
1493 }
1494}
1495
1496#[cfg(test)]
1497mod tests {
1498 use std::{
1499 collections::{BTreeMap, BTreeSet},
1500 num::NonZeroUsize,
1501 sync::Arc,
1502 time::Duration,
1503 };
1504
1505 use async_trait::async_trait;
1506 use bytes::Bytes;
1507 use consensus_config::{AuthorityIndex, Parameters};
1508 use iota_metrics::monitored_mpsc;
1509 use lru::LruCache;
1510 use parking_lot::{Mutex as SyncMutex, RwLock};
1511 use tokio::{sync::Mutex, time::sleep};
1512
1513 use crate::{
1514 CommitDigest, CommitIndex,
1515 authority_service::COMMIT_LAG_MULTIPLIER,
1516 block::{BlockDigest, BlockRef, Round, SignedBlock, TestBlock, VerifiedBlock},
1517 block_verifier::{BlockVerifier, NoopBlockVerifier},
1518 commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1519 commit_vote_monitor::CommitVoteMonitor,
1520 context::Context,
1521 core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1522 dag_state::DagState,
1523 error::{ConsensusError, ConsensusResult},
1524 network::{BlockStream, NetworkClient},
1525 round_prober::QuorumRound,
1526 storage::mem_store::MemStore,
1527 synchronizer::{
1528 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, SyncMethod,
1529 Synchronizer, VERIFIED_BLOCKS_CACHE_CAP,
1530 },
1531 };
1532
    // Key for a stubbed `fetch_blocks` call: the exact requested refs + peer.
    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    // Stubbed `fetch_blocks` response: blocks to return + optional artificial latency.
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    // Key for a stubbed `fetch_latest_blocks` call: the peer + queried authorities.
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    // Stubbed `fetch_latest_blocks` response: blocks + optional artificial latency.
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1537
1538 struct FailingBlockVerifier;
1540
1541 impl BlockVerifier for FailingBlockVerifier {
1542 fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1543 Err(ConsensusError::WrongEpoch {
1544 expected: 1,
1545 actual: 0,
1546 })
1547 }
1548
1549 fn check_ancestors(
1550 &self,
1551 _block: &VerifiedBlock,
1552 _ancestors: &[Option<VerifiedBlock>],
1553 _gc_enabled: bool,
1554 _gc_round: Round,
1555 ) -> ConsensusResult<()> {
1556 Ok(())
1557 }
1558 }
1559
    /// Mock `NetworkClient` that serves pre-stubbed responses for
    /// `fetch_blocks` and `fetch_latest_blocks`; all other methods panic.
    #[derive(Default)]
    struct MockNetworkClient {
        // One-shot responses keyed by (requested refs, peer); removed when consumed.
        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
        // FIFO queues of responses keyed by (peer, queried authorities).
        fetch_latest_blocks_requests:
            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
    }
1566
1567 impl MockNetworkClient {
1568 async fn stub_fetch_blocks(
1569 &self,
1570 blocks: Vec<VerifiedBlock>,
1571 peer: AuthorityIndex,
1572 latency: Option<Duration>,
1573 ) {
1574 let mut lock = self.fetch_blocks_requests.lock().await;
1575 let block_refs = blocks
1576 .iter()
1577 .map(|block| block.reference())
1578 .collect::<Vec<_>>();
1579 lock.insert((block_refs, peer), (blocks, latency));
1580 }
1581
1582 async fn stub_fetch_latest_blocks(
1583 &self,
1584 blocks: Vec<VerifiedBlock>,
1585 peer: AuthorityIndex,
1586 authorities: Vec<AuthorityIndex>,
1587 latency: Option<Duration>,
1588 ) {
1589 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1590 lock.entry((peer, authorities))
1591 .or_default()
1592 .push((blocks, latency));
1593 }
1594
1595 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1596 let lock = self.fetch_latest_blocks_requests.lock().await;
1597 lock.len()
1598 }
1599 }
1600
    #[async_trait]
    impl NetworkClient for MockNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        /// Not exercised by these tests; panics if called.
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Not exercised by these tests; panics if called.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        /// Serves (and consumes) the one-shot stub registered via
        /// `stub_fetch_blocks`; panics on an un-stubbed request so tests fail
        /// loudly on unexpected calls.
        async fn fetch_blocks(
            &self,
            peer: AuthorityIndex,
            block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            let mut lock = self.fetch_blocks_requests.lock().await;
            let response = lock
                .remove(&(block_refs, peer))
                .expect("Unexpected fetch blocks request made");

            let serialised = response
                .0
                .into_iter()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            // Release the map lock before simulating latency so concurrent
            // requests aren't serialized behind the sleep.
            drop(lock);

            if let Some(latency) = response.1 {
                sleep(latency).await;
            }

            Ok(serialised)
        }

        /// Not exercised by these tests; panics if called.
        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        /// Pops the next queued stub (FIFO) for this (peer, authorities) key;
        /// the key is re-inserted while responses remain. Panics on an
        /// un-stubbed request.
        async fn fetch_latest_blocks(
            &self,
            peer: AuthorityIndex,
            authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            let mut lock = self.fetch_latest_blocks_requests.lock().await;
            let mut responses = lock
                .remove(&(peer, authorities.clone()))
                .expect("Unexpected fetch blocks request made");

            let response = responses.remove(0);
            let serialised = response
                .0
                .into_iter()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            if !responses.is_empty() {
                lock.insert((peer, authorities), responses);
            }

            // Release the map lock before simulating latency.
            drop(lock);

            if let Some(latency) = response.1 {
                sleep(latency).await;
            }

            Ok(serialised)
        }

        /// Not exercised by these tests; panics if called.
        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1698
    // End-to-end behavior of `InflightBlocksMap` under Periodic sync: lock,
    // duplicate-lock rejection, saturation, release-on-drop, and lock swapping.
    #[test]
    fn test_inflight_blocks_map() {
        // GIVEN a fresh map and four distinct missing block refs.
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        {
            let mut all_guards = Vec::new();

            // Two different authorities can each lock all refs (Periodic limit
            // is 2), but the same authority cannot lock the same refs twice.
            for i in 1..=2 {
                let authority = AuthorityIndex::new_for_test(i);

                let guard =
                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
                let guard = guard.expect("Guard should be created");
                assert_eq!(guard.block_refs.len(), 4);

                all_guards.push(guard);

                // Duplicate lock attempt by the same authority must fail.
                let guard =
                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
                assert!(guard.is_none());
            }

            // A third authority is rejected while both guards are held.
            let authority_3 = AuthorityIndex::new_for_test(3);

            let guard = map.lock_blocks(
                missing_block_refs.clone(),
                authority_3,
                SyncMethod::Periodic,
            );
            assert!(guard.is_none());

            // Dropping one guard frees a slot per block...
            drop(all_guards.remove(0));

            // ...so authority 3 can now lock every ref.
            let guard = map.lock_blocks(
                missing_block_refs.clone(),
                authority_3,
                SyncMethod::Periodic,
            );
            let guard = guard.expect("Guard should be successfully acquired");

            assert_eq!(guard.block_refs, missing_block_refs);

            // Dropping all guards must leave the map empty.
            drop(guard);
            drop(all_guards);

            assert_eq!(map.num_of_locked_blocks(), 0);
        }

        {
            // Swapping transfers all locks from one authority to another.
            let authority_1 = AuthorityIndex::new_for_test(1);
            let guard = map
                .lock_blocks(
                    missing_block_refs.clone(),
                    authority_1,
                    SyncMethod::Periodic,
                )
                .unwrap();

            let authority_2 = AuthorityIndex::new_for_test(2);
            let guard = map.swap_locks(guard, authority_2);

            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
        }
    }
1782
    // Live sync allows only ONE concurrent in-flight fetch per block.
    #[test]
    fn test_inflight_blocks_map_live_sync_limit() {
        // GIVEN two missing block refs.
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        // First Live lock succeeds and covers both refs.
        let authority_1 = AuthorityIndex::new_for_test(1);
        let guard_1 = map
            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
            .expect("Should successfully lock with Live sync");

        assert_eq!(guard_1.block_refs.len(), 2);

        // A second Live lock on the same refs is rejected.
        let authority_2 = AuthorityIndex::new_for_test(2);
        let guard_2 = map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);

        assert!(
            guard_2.is_none(),
            "Should fail to lock - Live limit of 1 reached"
        );

        // Releasing the first guard frees the slot for authority 2.
        drop(guard_1);

        let guard_2 = map
            .lock_blocks(missing_block_refs, authority_2, SyncMethod::Live)
            .expect("Should successfully lock after authority 1 released");

        assert_eq!(guard_2.block_refs.len(), 2);
    }
1820
    // Periodic sync allows up to TWO concurrent in-flight fetches per block.
    #[test]
    fn test_inflight_blocks_map_periodic_allows_more_concurrency() {
        // GIVEN two missing block refs.
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        // First Periodic lock succeeds.
        let authority_1 = AuthorityIndex::new_for_test(1);
        let guard_1 = map
            .lock_blocks(
                missing_block_refs.clone(),
                authority_1,
                SyncMethod::Periodic,
            )
            .expect("Should successfully lock with Periodic sync");

        assert_eq!(guard_1.block_refs.len(), 2);

        // A second authority may lock the same refs concurrently.
        let authority_2 = AuthorityIndex::new_for_test(2);
        let guard_2 = map
            .lock_blocks(
                missing_block_refs.clone(),
                authority_2,
                SyncMethod::Periodic,
            )
            .expect("Should successfully lock - Periodic allows 2 authorities");

        assert_eq!(guard_2.block_refs.len(), 2);

        // A third authority is rejected while both guards are held.
        let authority_3 = AuthorityIndex::new_for_test(3);
        let guard_3 = map.lock_blocks(
            missing_block_refs.clone(),
            authority_3,
            SyncMethod::Periodic,
        );

        assert!(
            guard_3.is_none(),
            "Should fail to lock - Periodic limit of 2 reached"
        );

        // Dropping one guard frees a slot for authority 3.
        drop(guard_1);

        let guard_3 = map
            .lock_blocks(missing_block_refs, authority_3, SyncMethod::Periodic)
            .expect("Should successfully lock after authority 1 released");

        assert_eq!(guard_3.block_refs.len(), 2);
    }
1878
    // A single Periodic holder already satisfies the Live limit of 1, so a
    // Live lock is rejected while a second Periodic lock still succeeds.
    #[test]
    fn test_inflight_blocks_map_periodic_blocks_live_when_at_live_limit() {
        // GIVEN refs locked once via Periodic sync.
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        let authority_1 = AuthorityIndex::new_for_test(1);
        let guard_1 = map
            .lock_blocks(
                missing_block_refs.clone(),
                authority_1,
                SyncMethod::Periodic,
            )
            .expect("Should successfully lock with Periodic sync");

        assert_eq!(guard_1.block_refs.len(), 2);

        // Live lock fails: total in-flight count is already at the Live limit.
        let authority_2 = AuthorityIndex::new_for_test(2);
        let guard_2_live =
            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);

        assert!(
            guard_2_live.is_none(),
            "Should fail to lock with Live - total already at Live limit of 1"
        );

        // But the same authority may still lock via Periodic (limit 2).
        let guard_2_periodic = map
            .lock_blocks(missing_block_refs, authority_2, SyncMethod::Periodic)
            .expect("Should successfully lock with Periodic - under Periodic limit of 2");

        assert_eq!(guard_2_periodic.block_refs.len(), 2);
    }
1920
    // With a Live holder in place, a second Live lock is rejected but a
    // Periodic lock brings the total to the Periodic limit of 2; a third
    // authority is then rejected regardless of method.
    #[test]
    fn test_inflight_blocks_map_live_then_periodic_interaction() {
        // GIVEN refs locked once via Live sync.
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        let authority_1 = AuthorityIndex::new_for_test(1);
        let guard_1 = map
            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
            .expect("Should successfully lock with Live sync");

        assert_eq!(guard_1.block_refs.len(), 2);

        // Second Live lock rejected — Live limit is 1.
        let authority_2 = AuthorityIndex::new_for_test(2);
        let guard_2_live =
            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);

        assert!(
            guard_2_live.is_none(),
            "Should fail to lock with Live - would exceed Live limit of 1"
        );

        // Periodic lock succeeds, bringing the total to 2.
        let guard_2 = map
            .lock_blocks(
                missing_block_refs.clone(),
                authority_2,
                SyncMethod::Periodic,
            )
            .expect("Should successfully lock with Periodic - total 2 is at Periodic limit");

        assert_eq!(guard_2.block_refs.len(), 2);

        // A third authority is rejected even via Periodic — limit of 2 reached.
        let authority_3 = AuthorityIndex::new_for_test(3);
        let guard_3 = map.lock_blocks(missing_block_refs, authority_3, SyncMethod::Periodic);

        assert!(
            guard_3.is_none(),
            "Should fail to lock with Periodic - would exceed Periodic limit of 2"
        );
    }
1970
    // `lock_blocks` grants a PARTIAL guard: blocks already at their limit are
    // silently skipped while the remaining blocks are locked.
    #[test]
    fn test_inflight_blocks_map_partial_locks_mixed_methods() {
        let map = InflightBlocksMap::new();
        // Four blocks with different pre-existing lock states.
        let block_a = BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
        let block_b = BlockRef::new(2, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
        let block_c = BlockRef::new(3, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
        let block_d = BlockRef::new(4, AuthorityIndex::new_for_test(0), BlockDigest::MIN);

        // A: one Live holder.
        let guard_a = map
            .lock_blocks(
                [block_a].into(),
                AuthorityIndex::new_for_test(1),
                SyncMethod::Live,
            )
            .expect("Should lock block A");
        assert_eq!(guard_a.block_refs.len(), 1);

        // B: two Periodic holders — saturated at the Periodic limit of 2.
        let guard_b1 = map
            .lock_blocks(
                [block_b].into(),
                AuthorityIndex::new_for_test(1),
                SyncMethod::Periodic,
            )
            .expect("Should lock block B");
        let guard_b2 = map
            .lock_blocks(
                [block_b].into(),
                AuthorityIndex::new_for_test(2),
                SyncMethod::Periodic,
            )
            .expect("Should lock block B again");
        assert_eq!(guard_b1.block_refs.len(), 1);
        assert_eq!(guard_b2.block_refs.len(), 1);

        // C: one Periodic holder.
        let guard_c = map
            .lock_blocks(
                [block_c].into(),
                AuthorityIndex::new_for_test(1),
                SyncMethod::Periodic,
            )
            .expect("Should lock block C");
        assert_eq!(guard_c.block_refs.len(), 1);

        // WHEN a new authority requests all four via Periodic, only B (at its
        // limit) is excluded; A, C and D are locked.
        let all_blocks = [block_a, block_b, block_c, block_d].into();
        let guard_3 = map
            .lock_blocks(
                all_blocks,
                AuthorityIndex::new_for_test(3),
                SyncMethod::Periodic,
            )
            .expect("Should get partial lock");

        assert_eq!(
            guard_3.block_refs.len(),
            3,
            "Should lock blocks A, C, and D"
        );
        assert!(
            guard_3.block_refs.contains(&block_a),
            "Should contain block A"
        );
        assert!(
            !guard_3.block_refs.contains(&block_b),
            "Should NOT contain block B (at limit)"
        );
        assert!(
            guard_3.block_refs.contains(&block_c),
            "Should contain block C"
        );
        assert!(
            guard_3.block_refs.contains(&block_d),
            "Should contain block D"
        );
    }
2058
    // `swap_locks` keeps the original sync method: a Live guard moved to a new
    // authority still counts against the Live limit of 1.
    #[test]
    fn test_inflight_blocks_map_swap_locks_preserves_method() {
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        // Lock via Live sync for authority 1.
        let authority_1 = AuthorityIndex::new_for_test(1);
        let guard_1 = map
            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
            .expect("Should lock with Live sync");

        assert_eq!(guard_1.block_refs.len(), 2);

        // Move the locks to authority 2.
        let authority_2 = AuthorityIndex::new_for_test(2);
        let guard_2 = map
            .swap_locks(guard_1, authority_2)
            .expect("Should swap locks");

        assert_eq!(guard_2.block_refs, missing_block_refs);

        // The swapped guard still occupies the Live slot, so another Live lock
        // fails while a Periodic lock is still admitted.
        let authority_3 = AuthorityIndex::new_for_test(3);
        let guard_3 = map.lock_blocks(missing_block_refs.clone(), authority_3, SyncMethod::Live);
        assert!(guard_3.is_none(), "Should fail - Live limit reached");

        let guard_3_periodic = map
            .lock_blocks(missing_block_refs, authority_3, SyncMethod::Periodic)
            .expect("Should lock with Periodic");
        assert_eq!(guard_3_periodic.block_refs.len(), 2);
    }
2097
    // Happy path of `process_fetched_blocks`: verified fetched blocks are
    // forwarded to the core dispatcher and the inflight locks are released.
    #[tokio::test]
    async fn test_process_fetched_blocks() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let (commands_sender, _commands_receiver) =
            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);

        // Build exactly `max_blocks_per_sync` blocks: one from authority 0
        // (round 60) plus rounds 30..=60 from authority 1.
        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
        expected_blocks.extend(
            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
        );
        assert_eq!(
            expected_blocks.len(),
            context.parameters.max_blocks_per_sync
        );

        let expected_serialized_blocks = expected_blocks
            .iter()
            .map(|b| b.serialized().clone())
            .collect::<Vec<_>>();

        let expected_block_refs = expected_blocks
            .iter()
            .map(|b| b.reference())
            .collect::<BTreeSet<_>>();

        let peer_index = AuthorityIndex::new_for_test(2);

        // Lock the blocks as the synchronizer would before fetching.
        let inflight_blocks_map = InflightBlocksMap::new();
        let blocks_guard = inflight_blocks_map
            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
            .expect("Failed to lock blocks");

        assert_eq!(
            inflight_blocks_map.num_of_locked_blocks(),
            expected_block_refs.len()
        );

        // Fresh verified-blocks cache (nothing deduplicated yet).
        let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
        )));
        let result = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
        >::process_fetched_blocks(
            expected_serialized_blocks,
            peer_index,
            blocks_guard, core_dispatcher.clone(),
            block_verifier,
            verified_cache,
            commit_vote_monitor,
            context.clone(),
            commands_sender,
            "test",
        )
        .await;

        assert!(result.is_ok());

        // All fetched blocks must have reached the core dispatcher.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(
            added_blocks
                .iter()
                .map(|b| b.reference())
                .collect::<BTreeSet<_>>(),
            expected_block_refs,
        );

        // Processing consumed the guard, so every lock must be released.
        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
    }
2183
    // Deduplication path: a second `process_fetched_blocks` call with the same
    // payload must not re-submit blocks because the verified-blocks LruCache
    // already contains them.
    #[tokio::test]
    async fn test_process_fetched_blocks_duplicates() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let (commands_sender, _commands_receiver) =
            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);

        // Build exactly `max_blocks_per_sync` blocks: one from authority 0
        // (round 60) plus rounds 30..=60 from authority 1.
        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
        expected_blocks.extend(
            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
        );
        assert_eq!(
            expected_blocks.len(),
            context.parameters.max_blocks_per_sync
        );

        let expected_serialized_blocks = expected_blocks
            .iter()
            .map(|b| b.serialized().clone())
            .collect::<Vec<_>>();

        let expected_block_refs = expected_blocks
            .iter()
            .map(|b| b.reference())
            .collect::<BTreeSet<_>>();

        let peer_index = AuthorityIndex::new_for_test(2);

        let inflight_blocks_map = InflightBlocksMap::new();
        let blocks_guard = inflight_blocks_map
            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
            .expect("Failed to lock blocks");

        assert_eq!(
            inflight_blocks_map.num_of_locked_blocks(),
            expected_block_refs.len()
        );

        // Shared cache reused across both calls — this is what deduplicates.
        let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
        )));

        // First call: all blocks are new and must be forwarded.
        let result = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
        >::process_fetched_blocks(
            expected_serialized_blocks.clone(),
            peer_index,
            blocks_guard,
            core_dispatcher.clone(),
            block_verifier.clone(),
            verified_cache.clone(),
            commit_vote_monitor.clone(),
            context.clone(),
            commands_sender.clone(),
            "test",
        )
        .await;

        assert!(result.is_ok());

        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(
            added_blocks
                .iter()
                .map(|b| b.reference())
                .collect::<BTreeSet<_>>(),
            expected_block_refs,
        );

        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);

        // Second call with identical payload: re-lock and process again.
        let blocks_guard_second = inflight_blocks_map
            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
            .expect("Failed to lock blocks for second call");

        let result_second = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
        >::process_fetched_blocks(
            expected_serialized_blocks,
            peer_index,
            blocks_guard_second,
            core_dispatcher.clone(),
            block_verifier,
            verified_cache.clone(),
            commit_vote_monitor,
            context.clone(),
            commands_sender,
            "test",
        )
        .await;

        assert!(result_second.is_ok());

        // The cache must have filtered out every already-seen block.
        let added_blocks_second_call = core_dispatcher.get_add_blocks().await;
        assert!(
            added_blocks_second_call.is_empty(),
            "Expected no blocks to be added on second call due to LruCache, but got {} blocks",
            added_blocks_second_call.len()
        );

        // And the cache holds exactly one entry per distinct block.
        let cache_size = verified_cache.lock().len();
        assert_eq!(
            cache_size,
            expected_block_refs.len(),
            "Expected {} entries in the LruCache, but got {}",
            expected_block_refs.len(),
            cache_size
        );
    }
2316
    // End-to-end live fetch: `handle.fetch_blocks` triggers a network fetch
    // from the stubbed peer and the blocks end up at the core dispatcher.
    #[tokio::test]
    async fn test_successful_fetch_blocks_from_peer() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            false,
        );

        // Ten blocks from authority 0 that we will report as missing.
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // Stub peer 1 to serve exactly those blocks with no latency.
        let peer = AuthorityIndex::new_for_test(1);
        network_client
            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
            .await;

        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());

        // Give the background fetch task time to complete.
        sleep(Duration::from_millis(1_000)).await;

        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);
    }
2364
    // Backpressure: flooding a single peer with slow fetch requests must make
    // the final request fail with `SynchronizerSaturated` for that peer.
    #[tokio::test]
    async fn saturate_fetch_blocks_from_peer() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            false,
        );

        // 2 * FETCH_BLOCKS_CONCURRENCY + 1 requests: one more than the
        // synchronizer can absorb for a single peer.
        // NOTE(review): assumes the per-peer capacity is 2x
        // FETCH_BLOCKS_CONCURRENCY — confirm against the Synchronizer's
        // per-peer channel size.
        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
            .collect::<Vec<_>>();

        let peer = AuthorityIndex::new_for_test(1);
        let mut iter = expected_blocks.iter().peekable();
        while let Some(block) = iter.next() {
            // Each stub responds only after 5s, so requests pile up in-flight.
            network_client
                .stub_fetch_blocks(
                    vec![block.clone()],
                    peer,
                    Some(Duration::from_millis(5_000)),
                )
                .await;

            let mut missing_blocks = BTreeSet::new();
            missing_blocks.insert(block.reference());

            if iter.peek().is_none() {
                // The last request overflows the peer's capacity.
                match handle.fetch_blocks(missing_blocks, peer).await {
                    Err(ConsensusError::SynchronizerSaturated(index, _)) => {
                        assert_eq!(index, peer);
                    }
                    _ => panic!("A saturated synchronizer error was expected"),
                }
            } else {
                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
            }
        }
    }
2423
2424 #[tokio::test(flavor = "current_thread", start_paused = true)]
2425 async fn synchronizer_periodic_task_fetch_blocks() {
2426 let (context, _) = Context::new_for_test(4);
2428 let context = Arc::new(context);
2429 let block_verifier = Arc::new(NoopBlockVerifier {});
2430 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2431 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2432 let network_client = Arc::new(MockNetworkClient::default());
2433 let store = Arc::new(MemStore::new());
2434 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2435
2436 let expected_blocks = (0..10)
2438 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2439 .collect::<Vec<_>>();
2440 let missing_blocks = expected_blocks
2441 .iter()
2442 .map(|block| block.reference())
2443 .collect::<BTreeSet<_>>();
2444
2445 core_dispatcher
2447 .stub_missing_blocks(missing_blocks.clone())
2448 .await;
2449
2450 network_client
2454 .stub_fetch_blocks(
2455 expected_blocks.clone(),
2456 AuthorityIndex::new_for_test(1),
2457 Some(FETCH_REQUEST_TIMEOUT),
2458 )
2459 .await;
2460 network_client
2461 .stub_fetch_blocks(
2462 expected_blocks.clone(),
2463 AuthorityIndex::new_for_test(2),
2464 None,
2465 )
2466 .await;
2467
2468 let _handle = Synchronizer::start(
2470 network_client.clone(),
2471 context,
2472 core_dispatcher.clone(),
2473 commit_vote_monitor,
2474 block_verifier,
2475 dag_state,
2476 false,
2477 );
2478
2479 sleep(8 * FETCH_REQUEST_TIMEOUT).await;
2480
2481 let added_blocks = core_dispatcher.get_add_blocks().await;
2483 assert_eq!(added_blocks, expected_blocks);
2484
2485 assert!(
2487 core_dispatcher
2488 .get_missing_blocks()
2489 .await
2490 .unwrap()
2491 .is_empty()
2492 );
2493 }
2494
2495 #[tokio::test(flavor = "current_thread", start_paused = true)]
2496 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2497 let (mut context, _) = Context::new_for_test(4);
2499 context
2500 .protocol_config
2501 .set_consensus_batched_block_sync_for_testing(true);
2502 let context = Arc::new(context);
2503 let block_verifier = Arc::new(NoopBlockVerifier {});
2504 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2505 let network_client = Arc::new(MockNetworkClient::default());
2506 let store = Arc::new(MemStore::new());
2507 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2508 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2509
2510 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2513 let stub_blocks = (sync_missing_block_round_threshold * 2
2514 ..sync_missing_block_round_threshold * 2
2515 + context.parameters.max_blocks_per_sync as u32)
2516 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2517 .collect::<Vec<_>>();
2518 let missing_blocks = stub_blocks
2519 .iter()
2520 .map(|block| block.reference())
2521 .collect::<BTreeSet<_>>();
2522 core_dispatcher
2523 .stub_missing_blocks(missing_blocks.clone())
2524 .await;
2525 let mut expected_blocks = stub_blocks
2529 .iter()
2530 .take(context.parameters.max_blocks_per_sync)
2531 .cloned()
2532 .collect::<Vec<_>>();
2533 network_client
2534 .stub_fetch_blocks(
2535 expected_blocks.clone(),
2536 AuthorityIndex::new_for_test(1),
2537 Some(FETCH_REQUEST_TIMEOUT),
2538 )
2539 .await;
2540 network_client
2541 .stub_fetch_blocks(
2542 expected_blocks.clone(),
2543 AuthorityIndex::new_for_test(2),
2544 None,
2545 )
2546 .await;
2547
2548 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2550 let commit_index: CommitIndex = round - 1;
2551 let blocks = (0..4)
2552 .map(|authority| {
2553 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2554 let block = TestBlock::new(round, authority)
2555 .set_commit_votes(commit_votes)
2556 .build();
2557
2558 VerifiedBlock::new_for_test(block)
2559 })
2560 .collect::<Vec<_>>();
2561
2562 for block in blocks {
2565 commit_vote_monitor.observe_block(&block);
2566 }
2567
2568 let _handle = Synchronizer::start(
2571 network_client.clone(),
2572 context.clone(),
2573 core_dispatcher.clone(),
2574 commit_vote_monitor.clone(),
2575 block_verifier,
2576 dag_state.clone(),
2577 false,
2578 );
2579
2580 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2581
2582 let added_blocks = core_dispatcher.get_add_blocks().await;
2585 assert_eq!(added_blocks, vec![]);
2586
2587 println!("Before advancing");
2588 {
2591 let mut d = dag_state.write();
2592 for index in 1..=commit_index {
2593 let commit =
2594 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2595
2596 d.add_commit(commit);
2597 }
2598
2599 println!("Once advanced");
2600 assert_eq!(
2601 d.last_commit_index(),
2602 commit_vote_monitor.quorum_commit_index()
2603 );
2604 }
2605
2606 core_dispatcher
2608 .stub_missing_blocks(missing_blocks.clone())
2609 .await;
2610
2611 println!("Final sleep");
2612 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2613
2614 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2616 println!("Final await");
2617 added_blocks.sort_by_key(|block| block.reference());
2618 expected_blocks.sort_by_key(|block| block.reference());
2619
2620 assert_eq!(added_blocks, expected_blocks);
2621 }
2622
2623 #[tokio::test(flavor = "current_thread", start_paused = true)]
2624 async fn synchronizer_fetch_own_last_block() {
2625 let (context, _) = Context::new_for_test(4);
2627 let context = Arc::new(context.with_parameters(Parameters {
2628 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2629 ..Default::default()
2630 }));
2631 let block_verifier = Arc::new(NoopBlockVerifier {});
2632 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2633 let network_client = Arc::new(MockNetworkClient::default());
2634 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2635 let store = Arc::new(MemStore::new());
2636 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2637 let our_index = AuthorityIndex::new_for_test(0);
2638
2639 let mut expected_blocks = (8..=10)
2641 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2642 .collect::<Vec<_>>();
2643
2644 let block_1 = expected_blocks.pop().unwrap();
2647 network_client
2648 .stub_fetch_latest_blocks(
2649 vec![block_1.clone()],
2650 AuthorityIndex::new_for_test(1),
2651 vec![our_index],
2652 Some(Duration::from_secs(10)),
2653 )
2654 .await;
2655 network_client
2656 .stub_fetch_latest_blocks(
2657 vec![block_1],
2658 AuthorityIndex::new_for_test(1),
2659 vec![our_index],
2660 None,
2661 )
2662 .await;
2663
2664 let block_2 = expected_blocks.pop().unwrap();
2666 network_client
2667 .stub_fetch_latest_blocks(
2668 vec![block_2.clone()],
2669 AuthorityIndex::new_for_test(2),
2670 vec![our_index],
2671 Some(Duration::from_secs(10)),
2672 )
2673 .await;
2674 network_client
2675 .stub_fetch_latest_blocks(
2676 vec![block_2],
2677 AuthorityIndex::new_for_test(2),
2678 vec![our_index],
2679 None,
2680 )
2681 .await;
2682
2683 let block_3 = expected_blocks.pop().unwrap();
2685 network_client
2686 .stub_fetch_latest_blocks(
2687 vec![block_3.clone()],
2688 AuthorityIndex::new_for_test(3),
2689 vec![our_index],
2690 Some(Duration::from_secs(10)),
2691 )
2692 .await;
2693 network_client
2694 .stub_fetch_latest_blocks(
2695 vec![block_3],
2696 AuthorityIndex::new_for_test(3),
2697 vec![our_index],
2698 None,
2699 )
2700 .await;
2701
2702 let handle = Synchronizer::start(
2704 network_client.clone(),
2705 context.clone(),
2706 core_dispatcher.clone(),
2707 commit_vote_monitor,
2708 block_verifier,
2709 dag_state,
2710 true,
2711 );
2712
2713 sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2715
2716 assert_eq!(
2718 core_dispatcher.get_last_own_proposed_round().await,
2719 vec![10]
2720 );
2721
2722 assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2724
2725 assert_eq!(
2727 context
2728 .metrics
2729 .node_metrics
2730 .sync_last_known_own_block_retries
2731 .get(),
2732 1
2733 );
2734
2735 if let Err(err) = handle.stop().await {
2737 if err.is_panic() {
2738 std::panic::resume_unwind(err.into_panic());
2739 }
2740 }
2741 }
2742 #[derive(Default)]
2743 struct SyncMockDispatcher {
2744 missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2745 added_blocks: Mutex<Vec<VerifiedBlock>>,
2746 }
2747
2748 #[async_trait::async_trait]
2749 impl CoreThreadDispatcher for SyncMockDispatcher {
2750 async fn get_missing_blocks(
2751 &self,
2752 ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2753 Ok(self.missing_blocks.lock().await.clone())
2754 }
2755 async fn add_blocks(
2756 &self,
2757 blocks: Vec<VerifiedBlock>,
2758 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2759 let mut guard = self.added_blocks.lock().await;
2760 guard.extend(blocks.clone());
2761 Ok(blocks.iter().map(|b| b.reference()).collect())
2762 }
2763
2764 async fn check_block_refs(
2767 &self,
2768 block_refs: Vec<BlockRef>,
2769 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2770 Ok(block_refs.into_iter().collect())
2772 }
2773
2774 async fn add_certified_commits(
2775 &self,
2776 _commits: CertifiedCommits,
2777 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2778 Ok(BTreeSet::new())
2780 }
2781
2782 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2783 Ok(())
2784 }
2785
2786 fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2787 Ok(())
2788 }
2789
2790 fn set_propagation_delay_and_quorum_rounds(
2791 &self,
2792 _delay: Round,
2793 _received_quorum_rounds: Vec<QuorumRound>,
2794 _accepted_quorum_rounds: Vec<QuorumRound>,
2795 ) -> Result<(), CoreError> {
2796 Ok(())
2797 }
2798
2799 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2800 Ok(())
2801 }
2802
2803 fn highest_received_rounds(&self) -> Vec<Round> {
2804 Vec::new()
2805 }
2806 }
2807
2808 #[tokio::test(flavor = "current_thread")]
2809 async fn known_before_random_peer_fetch() {
2810 {
2811 let (ctx, _) = Context::new_for_test(10);
2813 let context = Arc::new(ctx);
2814 let store = Arc::new(MemStore::new());
2815 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2816 let inflight = InflightBlocksMap::new();
2817
2818 let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2820 let missing_ref = missing_vb.reference();
2821 let missing_blocks = BTreeMap::from([(
2822 missing_ref,
2823 BTreeSet::from([
2824 AuthorityIndex::new_for_test(2),
2825 AuthorityIndex::new_for_test(3),
2826 AuthorityIndex::new_for_test(4),
2827 ]),
2828 )]);
2829
2830 let network_client = Arc::new(MockNetworkClient::default());
2832 for i in 1..=9 {
2834 let peer = AuthorityIndex::new_for_test(i);
2835 if i == 1 || i == 4 {
2836 network_client
2837 .stub_fetch_blocks(
2838 vec![missing_vb.clone()],
2839 peer,
2840 Some(2 * FETCH_REQUEST_TIMEOUT),
2841 )
2842 .await;
2843 continue;
2844 }
2845 network_client
2846 .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2847 .await;
2848 }
2849
2850 let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2853 ::fetch_blocks_from_authorities(
2854 context.clone(),
2855 inflight.clone(),
2856 network_client.clone(),
2857 missing_blocks,
2858 dag_state.clone(),
2859 )
2860 .await;
2861
2862 assert_eq!(results.len(), 2);
2865
2866 let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2868 assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2869 let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
2870 assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
2871 let expected = missing_vb.serialized().clone();
2873 assert_eq!(hot_bytes, &vec![expected]);
2874 }
2875 }
2876
2877 #[tokio::test(flavor = "current_thread")]
2878 async fn known_before_periodic_peer_fetch_larger_scenario() {
2879 use std::{
2880 collections::{BTreeMap, BTreeSet},
2881 sync::Arc,
2882 };
2883
2884 use parking_lot::RwLock;
2885
2886 use crate::{
2887 block::{Round, TestBlock, VerifiedBlock},
2888 context::Context,
2889 };
2890
2891 let (ctx, _) = Context::new_for_test(10);
2893 let context = Arc::new(ctx);
2894 let store = Arc::new(MemStore::new());
2895 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2896 let inflight = InflightBlocksMap::new();
2897 let network_client = Arc::new(MockNetworkClient::default());
2898
2899 let mut missing_blocks = BTreeMap::new();
2901 let mut missing_vbs = Vec::new();
2902 let known_number_blocks = 10;
2903 for i in 0..1000 {
2904 let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2905 let r = vb.reference();
2906 if i < known_number_blocks {
2907 missing_blocks.insert(
2909 r,
2910 BTreeSet::from([
2911 AuthorityIndex::new_for_test(0),
2912 AuthorityIndex::new_for_test(2),
2913 ]),
2914 );
2915 } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2916 missing_blocks.insert(
2918 r,
2919 BTreeSet::from([
2920 AuthorityIndex::new_for_test(0),
2921 AuthorityIndex::new_for_test(3),
2922 ]),
2923 );
2924 } else {
2925 missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2927 }
2928 missing_vbs.push(vb);
2929 }
2930
2931 let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2933 let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2934 .iter()
2935 .map(|&peer| {
2936 let vbs = missing_vbs
2937 .iter()
2938 .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2939 .take(context.parameters.max_blocks_per_sync)
2940 .cloned()
2941 .collect::<Vec<_>>();
2942 (peer, vbs)
2943 })
2944 .collect();
2945
2946 for (peer, vbs) in known_vbs_by_peer {
2947 if peer == AuthorityIndex::new_for_test(2) {
2948 network_client
2950 .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2951 .await;
2952 network_client
2953 .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2954 .await;
2955 } else {
2956 network_client
2957 .stub_fetch_blocks(vbs.clone(), peer, None)
2958 .await;
2959 }
2960 }
2961
2962 network_client
2964 .stub_fetch_blocks(
2965 missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2966 AuthorityIndex::new_for_test(1),
2967 None,
2968 )
2969 .await;
2970
2971 network_client
2972 .stub_fetch_blocks(
2973 missing_vbs[context.parameters.max_blocks_per_sync
2974 ..2 * context.parameters.max_blocks_per_sync]
2975 .to_vec(),
2976 AuthorityIndex::new_for_test(4),
2977 None,
2978 )
2979 .await;
2980
2981 let results = Synchronizer::<
2983 MockNetworkClient,
2984 NoopBlockVerifier,
2985 SyncMockDispatcher,
2986 >::fetch_blocks_from_authorities(
2987 context.clone(),
2988 inflight.clone(),
2989 network_client.clone(),
2990 missing_blocks,
2991 dag_state.clone(),
2992 )
2993 .await;
2994
2995 assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");
2998
2999 let (_guard3, bytes3, peer3) = &results[0];
3001 assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
3002 let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
3003 .iter()
3004 .map(|vb| vb.serialized().clone())
3005 .collect::<Vec<_>>();
3006 assert_eq!(bytes3, &expected2);
3007
3008 let (_guard1, bytes1, peer1) = &results[1];
3010 assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
3011 let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
3012 .iter()
3013 .map(|vb| vb.serialized().clone())
3014 .collect::<Vec<_>>();
3015 assert_eq!(bytes1, &expected1);
3016
3017 let (_guard4, bytes4, peer4) = &results[2];
3019 assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
3020 let expected4 = missing_vbs
3021 [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
3022 .iter()
3023 .map(|vb| vb.serialized().clone())
3024 .collect::<Vec<_>>();
3025 assert_eq!(bytes4, &expected4);
3026
3027 let (_guard5, bytes5, peer5) = &results[3];
3029 assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
3030 let expected5 = missing_vbs[0..known_number_blocks]
3031 .iter()
3032 .map(|vb| vb.serialized().clone())
3033 .collect::<Vec<_>>();
3034 assert_eq!(bytes5, &expected5);
3035 }
3036
3037 #[tokio::test(flavor = "current_thread")]
3038 async fn test_verify_blocks_deduplication() {
3039 let (context, _keys) = Context::new_for_test(4);
3040 let context = Arc::new(context);
3041 let block_verifier = Arc::new(NoopBlockVerifier {});
3042 let failing_verifier = Arc::new(FailingBlockVerifier);
3043 let peer1 = AuthorityIndex::new_for_test(1);
3044 let peer2 = AuthorityIndex::new_for_test(2);
3045
3046 let cache = Arc::new(SyncMutex::new(LruCache::new(NonZeroUsize::new(5).unwrap())));
3048
3049 let block1 = VerifiedBlock::new_for_test(TestBlock::new(10, 0).build());
3051 let serialized1 = vec![block1.serialized().clone()];
3052
3053 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3055 serialized1.clone(), block_verifier.clone(), cache.clone(), &context, peer1, "live",
3056 );
3057 assert_eq!(result.unwrap().len(), 1);
3058
3059 let peer1_hostname = &context.committee.authority(peer1).hostname;
3060 assert_eq!(
3061 context
3062 .metrics
3063 .node_metrics
3064 .synchronizer_skipped_blocks_by_peer
3065 .with_label_values(&[peer1_hostname.as_str(), "live"])
3066 .get(),
3067 0
3068 );
3069
3070 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3072 serialized1, block_verifier.clone(), cache.clone(), &context, peer2, "periodic",
3073 );
3074 assert_eq!(result.unwrap().len(), 0, "Should skip cached block");
3075
3076 let peer2_hostname = &context.committee.authority(peer2).hostname;
3077 assert_eq!(
3078 context
3079 .metrics
3080 .node_metrics
3081 .synchronizer_skipped_blocks_by_peer
3082 .with_label_values(&[peer2_hostname.as_str(), "periodic"])
3083 .get(),
3084 1
3085 );
3086
3087 let invalid_block = VerifiedBlock::new_for_test(TestBlock::new(20, 0).build());
3089 let invalid_serialized = vec![invalid_block.serialized().clone()];
3090
3091 assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3092 invalid_serialized.clone(), failing_verifier.clone(), cache.clone(), &context, peer1, "test",
3093 ).is_err());
3094 assert_eq!(cache.lock().len(), 1, "Invalid block should not be cached");
3095
3096 assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3098 invalid_serialized, failing_verifier, cache.clone(), &context, peer1, "test",
3099 ).is_err());
3100
3101 let blocks: Vec<_> = (0..5)
3103 .map(|i| VerifiedBlock::new_for_test(TestBlock::new(30 + i, 0).build()))
3104 .collect();
3105
3106 for block in &blocks {
3108 Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3109 vec![block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3110 ).unwrap();
3111 }
3112 assert_eq!(cache.lock().len(), 5);
3113
3114 let new_block = VerifiedBlock::new_for_test(TestBlock::new(99, 0).build());
3116 Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3117 vec![new_block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3118 ).unwrap();
3119
3120 let block1_serialized = vec![block1.serialized().clone()];
3123 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3124 block1_serialized, block_verifier.clone(), cache.clone(), &context, peer1, "test",
3125 );
3126 assert_eq!(
3127 result.unwrap().len(),
3128 1,
3129 "Evicted block should be re-verified"
3130 );
3131
3132 let new_block_serialized = vec![new_block.serialized().clone()];
3134 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3135 new_block_serialized, block_verifier, cache, &context, peer1, "test",
3136 );
3137 assert_eq!(result.unwrap().len(), 0, "New block should be cached");
3138 }
3139}