1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7 num::NonZeroUsize,
8 sync::Arc,
9 time::Duration,
10};
11
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use futures::{StreamExt as _, stream::FuturesUnordered};
15use iota_macros::fail_point_async;
16use iota_metrics::{
17 monitored_future,
18 monitored_mpsc::{Receiver, Sender, channel},
19 monitored_scope,
20};
21use itertools::Itertools as _;
22use lru::LruCache;
23use parking_lot::{Mutex, RwLock};
24#[cfg(not(test))]
25use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
26use tap::TapFallible;
27use tokio::{
28 runtime::Handle,
29 sync::{mpsc::error::TrySendError, oneshot},
30 task::{JoinError, JoinSet},
31 time::{Instant, sleep, sleep_until, timeout},
32};
33use tracing::{debug, error, info, trace, warn};
34
35use crate::{
36 BlockAPI, CommitIndex, Round,
37 authority_service::COMMIT_LAG_MULTIPLIER,
38 block::{BlockDigest, BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
39 block_verifier::BlockVerifier,
40 commit_vote_monitor::CommitVoteMonitor,
41 context::Context,
42 core_thread::CoreThreadDispatcher,
43 dag_state::DagState,
44 error::{ConsensusError, ConsensusResult},
45 network::NetworkClient,
46};
47
/// Capacity of each per-peer fetch channel, and the maximum number of fetch requests a single
/// per-peer fetch task keeps in flight at the same time.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Number of blocks, on top of the requested ones, that a peer may return before its response
/// is rejected as too large. Only enforced when batched block sync is disabled.
pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;

/// Capacity of the LRU cache holding the digests of blocks that already passed verification.
const VERIFIED_BLOCKS_CACHE_CAP: usize = 200_000;

/// Timeout applied to a single fetch request sent to a peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);

// NOTE(review): presumably the overall deadline for one periodic fetch round across all the
// selected peers — confirm against its use site.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of peers that may be fetching the same block concurrently when the block is
/// locked with `SyncMethod::Periodic`.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// Maximum number of peers that may be fetching the same block concurrently when the block is
/// locked with `SyncMethod::Live`.
const MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK: usize = 1;

/// Budget of peers for one periodic sync round. The peers picked among the authorities listed
/// for the missing blocks number at most this value minus `MAX_PERIODIC_SYNC_RANDOM_PEERS`.
const MAX_PERIODIC_SYNC_PEERS: usize = 4;

/// Maximum number of additional peers per periodic sync round that are picked among the
/// remaining committee members, irrespective of which blocks they are listed for.
const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
83
/// The way a fetch was initiated. It selects how many peers may fetch the same block
/// concurrently (see `InflightBlocksMap::lock_blocks`).
#[derive(Clone)]
enum SyncMethod {
    /// Fetch requested through `Command::FetchBlocks`; limited by
    /// `MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK`.
    Live,
    /// Limited by `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK`.
    // NOTE(review): presumably used by the periodic missing-blocks scheduler — confirm.
    Periodic,
}
90
/// Guard over a set of block refs for which `peer` has been registered as a fetcher in an
/// `InflightBlocksMap`. Dropping the guard removes those registrations again.
struct BlocksGuard {
    /// The map in which the registrations were made.
    map: Arc<InflightBlocksMap>,
    /// The block refs registered for `peer`.
    block_refs: BTreeSet<BlockRef>,
    /// The peer the blocks are fetched from.
    peer: AuthorityIndex,
    /// The sync method the registrations were made with.
    method: SyncMethod,
}
97
98impl Drop for BlocksGuard {
99 fn drop(&mut self) {
100 self.map.unlock_blocks(&self.block_refs, self.peer);
101 }
102}
103
/// Tracks, for every block that is currently being fetched, the set of peers it is being
/// fetched from.
struct InflightBlocksMap {
    /// Block ref -> peers registered as fetching that block. An entry is removed by
    /// `unlock_blocks` as soon as its peer set becomes empty.
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}
112
113impl InflightBlocksMap {
114 fn new() -> Arc<Self> {
115 Arc::new(Self {
116 inner: Mutex::new(HashMap::new()),
117 })
118 }
119
120 fn lock_blocks(
134 self: &Arc<Self>,
135 missing_block_refs: BTreeSet<BlockRef>,
136 peer: AuthorityIndex,
137 method: SyncMethod,
138 ) -> Option<BlocksGuard> {
139 let mut blocks = BTreeSet::new();
140 let mut inner = self.inner.lock();
141
142 for block_ref in missing_block_refs {
143 let authorities = inner.entry(block_ref).or_default();
144
145 if authorities.contains(&peer) {
147 continue;
148 }
149
150 let total_count = authorities.len();
152
153 let max_limit = match method {
155 SyncMethod::Live => MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK,
156 SyncMethod::Periodic => MAX_AUTHORITIES_TO_FETCH_PER_BLOCK,
157 };
158
159 if total_count < max_limit {
161 assert!(authorities.insert(peer));
162 blocks.insert(block_ref);
163 }
164 }
165
166 if blocks.is_empty() {
167 None
168 } else {
169 Some(BlocksGuard {
170 map: self.clone(),
171 block_refs: blocks,
172 peer,
173 method,
174 })
175 }
176 }
177
178 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
183 let mut blocks_to_fetch = self.inner.lock();
185 for block_ref in block_refs {
186 let authorities = blocks_to_fetch
187 .get_mut(block_ref)
188 .expect("Should have found a non empty map");
189
190 assert!(authorities.remove(&peer), "Peer index should be present!");
191
192 if authorities.is_empty() {
194 blocks_to_fetch.remove(block_ref);
195 }
196 }
197 }
198
199 fn swap_locks(
204 self: &Arc<Self>,
205 blocks_guard: BlocksGuard,
206 peer: AuthorityIndex,
207 ) -> Option<BlocksGuard> {
208 let block_refs = blocks_guard.block_refs.clone();
209 let method = blocks_guard.method.clone();
210
211 drop(blocks_guard);
213
214 self.lock_blocks(block_refs, peer, method)
216 }
217
218 #[cfg(test)]
219 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
220 let inner = self.inner.lock();
221 inner.len()
222 }
223}
224
/// Commands handled by the synchronizer's main loop.
enum Command {
    /// Schedule a fetch of `missing_block_refs` from `peer_index`. The outcome of the
    /// scheduling attempt is reported through `result`.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Start the task that fetches this authority's own last block from the peers, unless
    /// such a task is already present.
    FetchOwnLastBlock,
    /// Bring the next run of the periodic fetch scheduler forward.
    KickOffScheduler,
}
234
/// Handle used to send commands to a running synchronizer and to stop its tasks.
pub(crate) struct SynchronizerHandle {
    /// Sends commands to the synchronizer's main loop.
    commands_sender: Sender<Command>,
    /// The main loop task and the per-peer fetch tasks spawned by `Synchronizer::start`.
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}
239
240impl SynchronizerHandle {
241 pub(crate) async fn fetch_blocks(
244 &self,
245 missing_block_refs: BTreeSet<BlockRef>,
246 peer_index: AuthorityIndex,
247 ) -> ConsensusResult<()> {
248 let (sender, receiver) = oneshot::channel();
249 self.commands_sender
250 .send(Command::FetchBlocks {
251 missing_block_refs,
252 peer_index,
253 result: sender,
254 })
255 .await
256 .map_err(|_err| ConsensusError::Shutdown)?;
257 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
258 }
259
260 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
261 let mut tasks = self.tasks.lock().await;
262 tasks.abort_all();
263 while let Some(result) = tasks.join_next().await {
264 result?
265 }
266 Ok(())
267 }
268}
269
/// State owned by the synchronizer's main loop task (see `run`), which handles `Command`s and
/// periodically starts a task that fetches the blocks reported missing by the core.
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    /// Receives the commands processed by `run`.
    commands_receiver: Receiver<Command>,
    /// One channel per peer (own index excluded), feeding that peer's fetch task.
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    /// Holds the periodic fetch scheduler task; `run` only starts one when the set is empty.
    fetch_blocks_scheduler_task: JoinSet<()>,
    /// Holds the task fetching our own last block; `run` only starts one when the set is empty.
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    /// Registry of the blocks currently being fetched and the peers they are fetched from.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    /// Digests of the blocks that already passed verification.
    verified_blocks_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
    /// Sender side of `commands_receiver`, handed to spawned tasks so that they can send
    /// commands (e.g. `Command::KickOffScheduler`) back to the main loop.
    commands_sender: Sender<Command>,
}
311
312impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
313 pub fn start(
316 network_client: Arc<C>,
317 context: Arc<Context>,
318 core_dispatcher: Arc<D>,
319 commit_vote_monitor: Arc<CommitVoteMonitor>,
320 block_verifier: Arc<V>,
321 dag_state: Arc<RwLock<DagState>>,
322 sync_last_known_own_block: bool,
323 ) -> Arc<SynchronizerHandle> {
324 let (commands_sender, commands_receiver) =
325 channel("consensus_synchronizer_commands", 1_000);
326 let inflight_blocks_map = InflightBlocksMap::new();
327 let verified_blocks_cache = Arc::new(Mutex::new(LruCache::new(
328 NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
329 )));
330
331 let mut fetch_block_senders = BTreeMap::new();
333 let mut tasks = JoinSet::new();
334 for (index, _) in context.committee.authorities() {
335 if index == context.own_index {
336 continue;
337 }
338 let (sender, receiver) =
339 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
340 let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
341 index,
342 network_client.clone(),
343 block_verifier.clone(),
344 verified_blocks_cache.clone(),
345 commit_vote_monitor.clone(),
346 context.clone(),
347 core_dispatcher.clone(),
348 dag_state.clone(),
349 receiver,
350 commands_sender.clone(),
351 );
352 tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
353 fetch_block_senders.insert(index, sender);
354 }
355
356 let commands_sender_clone = commands_sender.clone();
357
358 if sync_last_known_own_block {
359 commands_sender
360 .try_send(Command::FetchOwnLastBlock)
361 .expect("Failed to sync our last block");
362 }
363
364 tasks.spawn(monitored_future!(async move {
366 let mut s = Self {
367 context,
368 commands_receiver,
369 fetch_block_senders,
370 core_dispatcher,
371 commit_vote_monitor,
372 fetch_blocks_scheduler_task: JoinSet::new(),
373 fetch_own_last_block_task: JoinSet::new(),
374 network_client,
375 block_verifier,
376 inflight_blocks_map,
377 verified_blocks_cache,
378 commands_sender: commands_sender_clone,
379 dag_state,
380 };
381 s.run().await;
382 }));
383
384 Arc::new(SynchronizerHandle {
385 commands_sender,
386 tasks: tokio::sync::Mutex::new(tasks),
387 })
388 }
389
    /// Main loop of the synchronizer.
    ///
    /// It multiplexes four event sources: incoming commands, completion of the "fetch own last
    /// block" task, completion of the periodic scheduler task, and the periodic scheduler
    /// timer. The loop only returns when starting the scheduler task fails.
    async fn run(&mut self) {
        // Period of the scheduler that fetches the blocks reported missing by the core.
        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);

        // Pinned so that the same timer can be polled, and reset, on every loop iteration.
        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            // Note: `result` is dropped here without a reply, so the requester's
                            // receiver resolves with an error.
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();

                            // Cap the number of blocks requested in a single fetch.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(self.context.parameters.max_blocks_per_sync)
                                .collect();

                            // Nothing could be locked for this peer: there is nothing to fetch,
                            // which is reported as success.
                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index, SyncMethod::Live);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // Hand the guard over to the peer's fetch task without blocking. If the
                            // send fails the guard comes back inside the error and is dropped,
                            // which releases the locks.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            // Start the task only if one is not already present.
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Fire immediately when no scheduler task is present, otherwise after
                            // half of the regular period.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
                            };

                            // Only ever move the deadline earlier, never later.
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    // Cancellation is ignored; a panic of the task is propagated.
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    // Cancellation is ignored; a panic of the task is propagated.
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // Start a new scheduler task only if the previous one has completed.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    // Re-arm the timer for the next period.
                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
                }
            }
        }
    }
503
    /// Per-peer fetch task: receives `BlocksGuard`s over `receiver`, requests the guarded
    /// blocks from `peer_index` and processes the responses.
    ///
    /// At most `FETCH_BLOCKS_CONCURRENCY` requests are kept in flight. A failed request is
    /// re-issued until its attempt counter exceeds `MAX_RETRIES`; after that the guard is
    /// dropped, which releases the locked blocks. The task ends once no branch of the
    /// `select!` can make progress anymore.
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        // The counter starts at 1 and `fetch_blocks_request` increments it on every failure, so
        // a request is attempted at most `MAX_RETRIES` times in total.
        const MAX_RETRIES: u32 = 3;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        // The fetch requests currently in flight for this peer.
        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                // Accept new work only while below the concurrency limit.
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    // Count the requested blocks per peer, and one request per distinct author.
                    let metrics = &context.metrics.node_metrics;
                    metrics
                        .synchronizer_requested_blocks_by_peer
                        .with_label_values(&[peer_hostname.as_str(), "live"])
                        .inc_by(blocks_guard.block_refs.len() as u64);
                    let mut authors = HashSet::new();
                    for block_ref in &blocks_guard.block_refs {
                        authors.insert(block_ref.author);
                    }
                    for author in authors {
                        let host = &context.committee.authority(author).hostname;
                        metrics
                            .synchronizer_requested_blocks_by_authority
                            .with_label_values(&[host.as_str(), "live"])
                            .inc();
                    }

                    requests.push(Self::fetch_blocks_request(
                        network_client.clone(),
                        peer_index,
                        blocks_guard,
                        highest_rounds,
                        FETCH_REQUEST_TIMEOUT,
                        1,
                    ))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            // The guard is handed over to `process_fetched_blocks`.
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                verified_cache.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                context.metrics.update_scoring_metrics_on_block_receival(
                                    peer_index,
                                    peer_hostname,
                                    err.clone(),
                                    "process_fetched_blocks",
                                );
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            // Re-issue the request with the same guard and the same rounds.
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Give up: dropping the guard releases the locked blocks.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                // Reached when both branches above are disabled, i.e. the channel is closed and
                // no request is in flight.
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }
598
599 async fn process_fetched_blocks(
603 mut serialized_blocks: Vec<Bytes>,
604 peer_index: AuthorityIndex,
605 requested_blocks_guard: BlocksGuard,
606 core_dispatcher: Arc<D>,
607 block_verifier: Arc<V>,
608 verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
609 commit_vote_monitor: Arc<CommitVoteMonitor>,
610 context: Arc<Context>,
611 commands_sender: Sender<Command>,
612 sync_method: &str,
613 ) -> ConsensusResult<()> {
614 if serialized_blocks.is_empty() {
615 return Ok(());
616 }
617 let _s = context
618 .metrics
619 .node_metrics
620 .scope_processing_time
621 .with_label_values(&["Synchronizer::process_fetched_blocks"])
622 .start_timer();
623
624 if context.protocol_config.consensus_batched_block_sync() {
626 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
627 } else {
628 if serialized_blocks.len()
631 > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
632 {
633 return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
634 }
635 }
636
637 let blocks = Handle::current()
639 .spawn_blocking({
640 let block_verifier = block_verifier.clone();
641 let verified_cache = verified_cache.clone();
642 let context = context.clone();
643 let sync_method = sync_method.to_string();
644 move || {
645 Self::verify_blocks(
646 serialized_blocks,
647 block_verifier,
648 verified_cache,
649 &context,
650 peer_index,
651 &sync_method,
652 )
653 }
654 })
655 .await
656 .expect("Spawn blocking should not fail")?;
657
658 if !context.protocol_config.consensus_batched_block_sync() {
659 let ancestors = blocks
661 .iter()
662 .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
663 .flat_map(|b| b.ancestors().to_vec())
664 .collect::<BTreeSet<BlockRef>>();
665
666 for block in &blocks {
669 if !requested_blocks_guard
670 .block_refs
671 .contains(&block.reference())
672 && !ancestors.contains(&block.reference())
673 {
674 return Err(ConsensusError::UnexpectedFetchedBlock {
675 index: peer_index,
676 block_ref: block.reference(),
677 });
678 }
679 }
680 }
681
682 for block in &blocks {
684 commit_vote_monitor.observe_block(block);
685 }
686
687 let metrics = &context.metrics.node_metrics;
688 let peer_hostname = &context.committee.authority(peer_index).hostname;
689 metrics
690 .synchronizer_fetched_blocks_by_peer
691 .with_label_values(&[peer_hostname.as_str(), sync_method])
692 .inc_by(blocks.len() as u64);
693 for block in &blocks {
694 let block_hostname = &context.committee.authority(block.author()).hostname;
695 metrics
696 .synchronizer_fetched_blocks_by_authority
697 .with_label_values(&[block_hostname.as_str(), sync_method])
698 .inc();
699 }
700
701 debug!(
702 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
703 blocks.len(),
704 blocks.iter().map(|b| b.reference().to_string()).join(", "),
705 );
706
707 let missing_blocks = core_dispatcher
711 .add_blocks(blocks)
712 .await
713 .map_err(|_| ConsensusError::Shutdown)?;
714
715 drop(requested_blocks_guard);
718
719 if !missing_blocks.is_empty() {
721 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
723 {
724 warn!("Commands channel is full")
725 }
726 }
727
728 context
729 .metrics
730 .node_metrics
731 .missing_blocks_after_fetch_total
732 .inc_by(missing_blocks.len() as u64);
733
734 Ok(())
735 }
736
737 fn get_highest_accepted_rounds(
738 dag_state: Arc<RwLock<DagState>>,
739 context: &Arc<Context>,
740 ) -> Vec<Round> {
741 let blocks = dag_state
742 .read()
743 .get_last_cached_block_per_authority(Round::MAX);
744 assert_eq!(blocks.len(), context.committee.size());
745
746 blocks
747 .into_iter()
748 .map(|(block, _)| block.round())
749 .collect::<Vec<_>>()
750 }
751
752 fn verify_blocks(
753 serialized_blocks: Vec<Bytes>,
754 block_verifier: Arc<V>,
755 verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
756 context: &Context,
757 peer_index: AuthorityIndex,
758 sync_method: &str,
759 ) -> ConsensusResult<Vec<VerifiedBlock>> {
760 let mut verified_blocks = Vec::new();
761 let mut skipped_count = 0u64;
762
763 for serialized_block in serialized_blocks {
764 let block_digest = VerifiedBlock::compute_digest(&serialized_block);
765
766 if verified_cache.lock().get(&block_digest).is_some() {
768 skipped_count += 1;
769 continue; }
771
772 let signed_block: SignedBlock =
773 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
774
775 if let Err(e) = block_verifier.verify(&signed_block) {
776 let hostname = context.committee.authority(peer_index).hostname.clone();
779
780 context
781 .metrics
782 .node_metrics
783 .invalid_blocks
784 .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
785 .inc();
786 warn!("Invalid block received from {}: {}", peer_index, e);
787 return Err(e);
788 }
789
790 verified_cache.lock().put(block_digest, ());
792
793 let verified_block = VerifiedBlock::new_verified_with_digest(
794 signed_block,
795 serialized_block,
796 block_digest,
797 );
798
799 let now = context.clock.timestamp_utc_ms();
803 let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
804 if drift > 0 {
805 let peer_hostname = &context
806 .committee
807 .authority(verified_block.author())
808 .hostname;
809 context
810 .metrics
811 .node_metrics
812 .block_timestamp_drift_ms
813 .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
814 .inc_by(drift);
815
816 if context
817 .protocol_config
818 .consensus_median_timestamp_with_checkpoint_enforcement()
819 {
820 trace!(
821 "Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.",
822 verified_block.reference(),
823 verified_block.timestamp_ms(),
824 now
825 );
826 } else {
827 warn!(
828 "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
829 verified_block.reference(),
830 verified_block.timestamp_ms(),
831 now
832 );
833 continue;
834 }
835 }
836
837 verified_blocks.push(verified_block);
838 }
839
840 if skipped_count > 0 {
842 let peer_hostname = &context.committee.authority(peer_index).hostname;
843 context
844 .metrics
845 .node_metrics
846 .synchronizer_skipped_blocks_by_peer
847 .with_label_values(&[peer_hostname.as_str(), sync_method])
848 .inc_by(skipped_count);
849 }
850
851 Ok(verified_blocks)
852 }
853
854 async fn fetch_blocks_request(
855 network_client: Arc<C>,
856 peer: AuthorityIndex,
857 blocks_guard: BlocksGuard,
858 highest_rounds: Vec<Round>,
859 request_timeout: Duration,
860 mut retries: u32,
861 ) -> (
862 ConsensusResult<Vec<Bytes>>,
863 BlocksGuard,
864 u32,
865 AuthorityIndex,
866 Vec<Round>,
867 ) {
868 let start = Instant::now();
869 let resp = timeout(
870 request_timeout,
871 network_client.fetch_blocks(
872 peer,
873 blocks_guard
874 .block_refs
875 .clone()
876 .into_iter()
877 .collect::<Vec<_>>(),
878 highest_rounds.clone(),
879 request_timeout,
880 ),
881 )
882 .await;
883
884 fail_point_async!("consensus-delay");
885
886 let resp = match resp {
887 Ok(Err(err)) => {
888 sleep_until(start + request_timeout).await;
891 retries += 1;
892 Err(err)
893 } Err(err) => {
895 sleep_until(start + request_timeout).await;
897 retries += 1;
898 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
899 }
900 Ok(result) => result,
901 };
902 (resp, blocks_guard, retries, peer, highest_rounds)
903 }
904
    /// Spawns the task that asks the peers for the latest blocks authored by this authority
    /// and reports the highest round found to the core.
    ///
    /// The task works in rounds. In each round every peer that has not yet returned an
    /// acceptable response is queried, until all of them have answered or
    /// `sync_last_known_own_block_timeout` expires. Rounds are repeated, with a growing delay
    /// in between, until the stake of the peers with acceptable responses (plus our own)
    /// reaches a quorum. With a single-node committee the round of the last proposed block in
    /// the DAG state is used instead.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before a failed request to a peer is sent again within the same round.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Upper bound for the delay between two rounds.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Builds the future that, after `fetch_own_block_delay`, asks `authority_index`
                // for our own latest blocks and yields the response together with the peer.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                // Deserializes and verifies the returned blocks. Fails on the first block that is
                // malformed, invalid, or not authored by us.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block: SignedBlock = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Highest round of our own blocks seen in any acceptable response so far.
                let mut highest_round = GENESIS_ROUND;
                // Marks the authorities that do not need to be queried (again). We count as
                // having responded ourselves.
                let mut received_response = vec![false; context.committee.size()];
                received_response[context.own_index] = true;
                // Stake of the authorities with an acceptable response, our own included.
                let mut total_stake = context.committee.stake(context.own_index);
                // Number of completed rounds that did not reach a quorum.
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    let mut results = FuturesUnordered::new();

                    // Query every authority that has not returned an acceptable response yet.
                    for (authority_index, _authority) in context.committee.authorities() {
                        if !received_response[authority_index] {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Deadline for the whole round.
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                // No request left in this round.
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                received_response[authority_index] = true;
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            // The peer is not asked again in this round, but it stays
                                            // unmarked and is therefore queried again in the next one.
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    // Request failures are retried within the same round, after a delay.
                                    Err(err) => {
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Done as soon as the responding authorities (us included) form a quorum.
                    if context.committee.reached_quorum(total_stake) {
                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    } else {
                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Grow the delay by 50% per round, capped at `MAX_RETRY_DELAY_STEP`.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                // Report the highest round found to the core.
                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
1043
1044 async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
1045 let mut missing_blocks = self
1046 .core_dispatcher
1047 .get_missing_blocks()
1048 .await
1049 .map_err(|_err| ConsensusError::Shutdown)?;
1050
1051 if missing_blocks.is_empty() {
1053 return Ok(());
1054 }
1055
1056 let context = self.context.clone();
1057 let network_client = self.network_client.clone();
1058 let block_verifier = self.block_verifier.clone();
1059 let verified_cache = self.verified_blocks_cache.clone();
1060 let commit_vote_monitor = self.commit_vote_monitor.clone();
1061 let core_dispatcher = self.core_dispatcher.clone();
1062 let blocks_to_fetch = self.inflight_blocks_map.clone();
1063 let commands_sender = self.commands_sender.clone();
1064 let dag_state = self.dag_state.clone();
1065
1066 let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
1067 trace!(
1068 "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
1069 );
1070 if commit_lagging {
1071 if dag_state.read().gc_enabled() {
1076 return Ok(());
1077 }
1078
1079 let highest_accepted_round = dag_state.read().highest_accepted_round();
1083 missing_blocks = missing_blocks
1084 .into_iter()
1085 .take_while(|(block_ref, _)| {
1086 block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
1087 })
1088 .collect::<BTreeMap<_, _>>();
1089
1090 if missing_blocks.is_empty() {
1093 trace!(
1094 "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
1095 );
1096 self.context
1097 .metrics
1098 .node_metrics
1099 .fetch_blocks_scheduler_skipped
1100 .with_label_values(&["commit_lagging"])
1101 .inc();
1102 return Ok(());
1103 }
1104 }
1105
1106 self.fetch_blocks_scheduler_task
1107 .spawn(monitored_future!(async move {
1108 let _scope = monitored_scope("FetchMissingBlocksScheduler");
1109
1110 context
1111 .metrics
1112 .node_metrics
1113 .fetch_blocks_scheduler_inflight
1114 .inc();
1115 let total_requested = missing_blocks.len();
1116
1117 fail_point_async!("consensus-delay");
1118
1119 let results = Self::fetch_blocks_from_authorities(
1121 context.clone(),
1122 blocks_to_fetch.clone(),
1123 network_client,
1124 missing_blocks,
1125 dag_state,
1126 )
1127 .await;
1128 context
1129 .metrics
1130 .node_metrics
1131 .fetch_blocks_scheduler_inflight
1132 .dec();
1133 if results.is_empty() {
1134 warn!("No results returned while requesting missing blocks");
1135 return;
1136 }
1137
1138 let mut total_fetched = 0;
1140 for (blocks_guard, fetched_blocks, peer) in results {
1141 total_fetched += fetched_blocks.len();
1142
1143 if let Err(err) = Self::process_fetched_blocks(
1144 fetched_blocks,
1145 peer,
1146 blocks_guard,
1147 core_dispatcher.clone(),
1148 block_verifier.clone(),
1149 verified_cache.clone(),
1150 commit_vote_monitor.clone(),
1151 context.clone(),
1152 commands_sender.clone(),
1153 "periodic",
1154 )
1155 .await
1156 {
1157 warn!(
1158 "Error occurred while processing fetched blocks from peer {peer}: {err}"
1159 );
1160 }
1161 }
1162
1163 debug!(
1164 "Total blocks requested to fetch: {}, total fetched: {}",
1165 total_requested, total_fetched
1166 );
1167 }));
1168 Ok(())
1169 }
1170
1171 fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1172 let last_commit_index = self.dag_state.read().last_commit_index();
1173 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1174 let commit_threshold = last_commit_index
1175 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1176 (
1177 commit_threshold < quorum_commit_index,
1178 last_commit_index,
1179 quorum_commit_index,
1180 )
1181 }
1182
    /// Maximum number of rounds a missing block may be ahead of the highest accepted round to
    /// still be fetched by the periodic scheduler while the local commit is lagging.
    /// Currently this is the `commit_sync_batch_size` parameter.
    fn missing_block_round_threshold(&self) -> Round {
        self.context.parameters.commit_sync_batch_size
    }
1192
1193 async fn fetch_blocks_from_authorities(
1208 context: Arc<Context>,
1209 inflight_blocks: Arc<InflightBlocksMap>,
1210 network_client: Arc<C>,
1211 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1212 dag_state: Arc<RwLock<DagState>>,
1213 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1214 let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1216 for (missing_block_ref, authorities) in &missing_blocks {
1217 for author in authorities {
1218 if author == &context.own_index {
1219 continue;
1221 }
1222 authority_to_blocks
1223 .entry(*author)
1224 .or_default()
1225 .push(*missing_block_ref);
1226 }
1227 }
1228
1229 #[cfg(not(test))]
1233 let mut rng = StdRng::from_entropy();
1234
1235 #[cfg(not(test))]
1238 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1239 authority_to_blocks
1240 .iter()
1241 .choose_multiple(
1242 &mut rng,
1243 MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1244 )
1245 .into_iter()
1246 .map(|(&peer, blocks)| {
1247 let limited_blocks = blocks
1248 .iter()
1249 .copied()
1250 .take(context.parameters.max_blocks_per_sync)
1251 .collect();
1252 (peer, limited_blocks, "periodic_known")
1253 })
1254 .collect();
1255 #[cfg(test)]
1256 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1258 let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1259 .iter()
1260 .map(|(&peer, blocks)| {
1261 let limited_blocks = blocks
1262 .iter()
1263 .copied()
1264 .take(context.parameters.max_blocks_per_sync)
1265 .collect();
1266 (peer, limited_blocks, "periodic_known")
1267 })
1268 .collect();
1269 items.sort_by_key(|(peer, _, _)| *peer);
1272 items
1273 .into_iter()
1274 .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1275 .collect()
1276 };
1277
1278 let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1281 .iter()
1282 .map(|(peer, _, _)| *peer)
1283 .collect();
1284
1285 let random_candidates: Vec<_> = context
1286 .committee
1287 .authorities()
1288 .filter_map(|(peer_index, _)| {
1289 (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1290 .then_some(peer_index)
1291 })
1292 .collect();
1293 #[cfg(test)]
1294 let random_peers: Vec<AuthorityIndex> = random_candidates
1295 .into_iter()
1296 .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1297 .collect();
1298 #[cfg(not(test))]
1299 let random_peers: Vec<AuthorityIndex> = random_candidates
1300 .into_iter()
1301 .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1302
1303 #[cfg_attr(test, allow(unused_mut))]
1304 let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1305 #[cfg(not(test))]
1308 all_missing_blocks.shuffle(&mut rng);
1309
1310 let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1311
1312 for peer in random_peers {
1313 if let Some(chunk) = block_chunks.next() {
1314 chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1315 } else {
1316 break;
1317 }
1318 }
1319
1320 let mut request_futures = FuturesUnordered::new();
1321
1322 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1323
1324 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1326 for block in &all_missing_blocks {
1327 missing_blocks_per_authority[block.author] += 1;
1328 }
1329 for (missing, (_, authority)) in missing_blocks_per_authority
1330 .into_iter()
1331 .zip(context.committee.authorities())
1332 {
1333 context
1334 .metrics
1335 .node_metrics
1336 .synchronizer_missing_blocks_by_authority
1337 .with_label_values(&[&authority.hostname])
1338 .inc_by(missing as u64);
1339 context
1340 .metrics
1341 .node_metrics
1342 .synchronizer_current_missing_blocks_by_authority
1343 .with_label_values(&[&authority.hostname])
1344 .set(missing as i64);
1345 }
1346
1347 #[cfg_attr(test, expect(unused_mut))]
1350 let mut remaining_peers: Vec<_> = context
1351 .committee
1352 .authorities()
1353 .filter_map(|(peer_index, _)| {
1354 if peer_index != context.own_index
1355 && !chosen_peers_with_blocks
1356 .iter()
1357 .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1358 {
1359 Some(peer_index)
1360 } else {
1361 None
1362 }
1363 })
1364 .collect();
1365
1366 #[cfg(not(test))]
1367 remaining_peers.shuffle(&mut rng);
1368 let mut remaining_peers = remaining_peers.into_iter();
1369
1370 for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1372 let peer_hostname = &context.committee.authority(peer).hostname;
1373 let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1374
1375 if let Some(blocks_guard) =
1378 inflight_blocks.lock_blocks(block_refs.clone(), peer, SyncMethod::Periodic)
1379 {
1380 info!(
1381 "Periodic sync of {} missing blocks from peer {} {}: {}",
1382 block_refs.len(),
1383 peer,
1384 peer_hostname,
1385 block_refs
1386 .iter()
1387 .map(|b| b.to_string())
1388 .collect::<Vec<_>>()
1389 .join(", ")
1390 );
1391 let metrics = &context.metrics.node_metrics;
1393 metrics
1394 .synchronizer_requested_blocks_by_peer
1395 .with_label_values(&[peer_hostname.as_str(), label])
1396 .inc_by(block_refs.len() as u64);
1397 for block_ref in &block_refs {
1398 let block_hostname = &context.committee.authority(block_ref.author).hostname;
1399 metrics
1400 .synchronizer_requested_blocks_by_authority
1401 .with_label_values(&[block_hostname.as_str(), label])
1402 .inc();
1403 }
1404 request_futures.push(Self::fetch_blocks_request(
1405 network_client.clone(),
1406 peer,
1407 blocks_guard,
1408 highest_rounds.clone(),
1409 FETCH_REQUEST_TIMEOUT,
1410 1,
1411 ));
1412 }
1413 }
1414
1415 let mut results = Vec::new();
1416 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1417
1418 tokio::pin!(fetcher_timeout);
1419
1420 loop {
1421 tokio::select! {
1422 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1423 let peer_hostname = &context.committee.authority(peer_index).hostname;
1424 match response {
1425 Ok(fetched_blocks) => {
1426 info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1427 results.push((blocks_guard, fetched_blocks, peer_index));
1428
1429 if request_futures.is_empty() {
1431 break;
1432 }
1433 },
1434 Err(_) => {
1435 context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1436 if let Some(next_peer) = remaining_peers.next() {
1438 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1440 info!(
1441 "Retrying syncing {} missing blocks from peer {}: {}",
1442 blocks_guard.block_refs.len(),
1443 peer_hostname,
1444 blocks_guard.block_refs
1445 .iter()
1446 .map(|b| b.to_string())
1447 .collect::<Vec<_>>()
1448 .join(", ")
1449 );
1450 let block_refs = blocks_guard.block_refs.clone();
1451 let metrics = &context.metrics.node_metrics;
1453 metrics
1454 .synchronizer_requested_blocks_by_peer
1455 .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1456 .inc_by(block_refs.len() as u64);
1457 for block_ref in &block_refs {
1458 let block_hostname =
1459 &context.committee.authority(block_ref.author).hostname;
1460 metrics
1461 .synchronizer_requested_blocks_by_authority
1462 .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1463 .inc();
1464 }
1465 request_futures.push(Self::fetch_blocks_request(
1466 network_client.clone(),
1467 next_peer,
1468 blocks_guard,
1469 highest_rounds,
1470 FETCH_REQUEST_TIMEOUT,
1471 1,
1472 ));
1473 } else {
1474 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1475 }
1476 } else {
1477 debug!("No more peers left to fetch blocks");
1478 }
1479 }
1480 }
1481 },
1482 _ = &mut fetcher_timeout => {
1483 debug!("Timed out while fetching missing blocks");
1484 break;
1485 }
1486 }
1487 }
1488
1489 results
1490 }
1491}
1492
1493#[cfg(test)]
1494mod tests {
1495 use std::{
1496 collections::{BTreeMap, BTreeSet},
1497 num::NonZeroUsize,
1498 sync::Arc,
1499 time::Duration,
1500 };
1501
1502 use async_trait::async_trait;
1503 use bytes::Bytes;
1504 use consensus_config::{AuthorityIndex, Parameters};
1505 use iota_metrics::monitored_mpsc;
1506 use lru::LruCache;
1507 use parking_lot::{Mutex as SyncMutex, RwLock};
1508 use tokio::{sync::Mutex, time::sleep};
1509
1510 use crate::{
1511 CommitDigest, CommitIndex,
1512 authority_service::COMMIT_LAG_MULTIPLIER,
1513 block::{BlockDigest, BlockRef, Round, SignedBlock, TestBlock, VerifiedBlock},
1514 block_verifier::{BlockVerifier, NoopBlockVerifier},
1515 commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1516 commit_vote_monitor::CommitVoteMonitor,
1517 context::Context,
1518 core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1519 dag_state::DagState,
1520 error::{ConsensusError, ConsensusResult},
1521 network::{BlockStream, NetworkClient},
1522 round_prober::QuorumRound,
1523 storage::mem_store::MemStore,
1524 synchronizer::{
1525 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, SyncMethod,
1526 Synchronizer, VERIFIED_BLOCKS_CACHE_CAP,
1527 },
1528 };
1529
1530 type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1531 type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1532 type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1533 type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1534
1535 struct FailingBlockVerifier;
1537
1538 impl BlockVerifier for FailingBlockVerifier {
1539 fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1540 Err(ConsensusError::WrongEpoch {
1541 expected: 1,
1542 actual: 0,
1543 })
1544 }
1545
1546 fn check_ancestors(
1547 &self,
1548 _block: &VerifiedBlock,
1549 _ancestors: &[Option<VerifiedBlock>],
1550 _gc_enabled: bool,
1551 _gc_round: Round,
1552 ) -> ConsensusResult<()> {
1553 Ok(())
1554 }
1555 }
1556
1557 #[derive(Default)]
1558 struct MockNetworkClient {
1559 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1560 fetch_latest_blocks_requests:
1561 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1562 }
1563
1564 impl MockNetworkClient {
1565 async fn stub_fetch_blocks(
1566 &self,
1567 blocks: Vec<VerifiedBlock>,
1568 peer: AuthorityIndex,
1569 latency: Option<Duration>,
1570 ) {
1571 let mut lock = self.fetch_blocks_requests.lock().await;
1572 let block_refs = blocks
1573 .iter()
1574 .map(|block| block.reference())
1575 .collect::<Vec<_>>();
1576 lock.insert((block_refs, peer), (blocks, latency));
1577 }
1578
1579 async fn stub_fetch_latest_blocks(
1580 &self,
1581 blocks: Vec<VerifiedBlock>,
1582 peer: AuthorityIndex,
1583 authorities: Vec<AuthorityIndex>,
1584 latency: Option<Duration>,
1585 ) {
1586 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1587 lock.entry((peer, authorities))
1588 .or_default()
1589 .push((blocks, latency));
1590 }
1591
1592 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1593 let lock = self.fetch_latest_blocks_requests.lock().await;
1594 lock.len()
1595 }
1596 }
1597
1598 #[async_trait]
1599 impl NetworkClient for MockNetworkClient {
1600 const SUPPORT_STREAMING: bool = false;
1601
1602 async fn send_block(
1603 &self,
1604 _peer: AuthorityIndex,
1605 _serialized_block: &VerifiedBlock,
1606 _timeout: Duration,
1607 ) -> ConsensusResult<()> {
1608 unimplemented!("Unimplemented")
1609 }
1610
1611 async fn subscribe_blocks(
1612 &self,
1613 _peer: AuthorityIndex,
1614 _last_received: Round,
1615 _timeout: Duration,
1616 ) -> ConsensusResult<BlockStream> {
1617 unimplemented!("Unimplemented")
1618 }
1619
1620 async fn fetch_blocks(
1621 &self,
1622 peer: AuthorityIndex,
1623 block_refs: Vec<BlockRef>,
1624 _highest_accepted_rounds: Vec<Round>,
1625 _timeout: Duration,
1626 ) -> ConsensusResult<Vec<Bytes>> {
1627 let mut lock = self.fetch_blocks_requests.lock().await;
1628 let response = lock
1629 .remove(&(block_refs, peer))
1630 .expect("Unexpected fetch blocks request made");
1631
1632 let serialised = response
1633 .0
1634 .into_iter()
1635 .map(|block| block.serialized().clone())
1636 .collect::<Vec<_>>();
1637
1638 drop(lock);
1639
1640 if let Some(latency) = response.1 {
1641 sleep(latency).await;
1642 }
1643
1644 Ok(serialised)
1645 }
1646
1647 async fn fetch_commits(
1648 &self,
1649 _peer: AuthorityIndex,
1650 _commit_range: CommitRange,
1651 _timeout: Duration,
1652 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1653 unimplemented!("Unimplemented")
1654 }
1655
1656 async fn fetch_latest_blocks(
1657 &self,
1658 peer: AuthorityIndex,
1659 authorities: Vec<AuthorityIndex>,
1660 _timeout: Duration,
1661 ) -> ConsensusResult<Vec<Bytes>> {
1662 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1663 let mut responses = lock
1664 .remove(&(peer, authorities.clone()))
1665 .expect("Unexpected fetch blocks request made");
1666
1667 let response = responses.remove(0);
1668 let serialised = response
1669 .0
1670 .into_iter()
1671 .map(|block| block.serialized().clone())
1672 .collect::<Vec<_>>();
1673
1674 if !responses.is_empty() {
1675 lock.insert((peer, authorities), responses);
1676 }
1677
1678 drop(lock);
1679
1680 if let Some(latency) = response.1 {
1681 sleep(latency).await;
1682 }
1683
1684 Ok(serialised)
1685 }
1686
1687 async fn get_latest_rounds(
1688 &self,
1689 _peer: AuthorityIndex,
1690 _timeout: Duration,
1691 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1692 unimplemented!("Unimplemented")
1693 }
1694 }
1695
/// Exercises locking, releasing and swapping of periodic locks on a set of
/// four block refs.
#[test]
fn test_inflight_blocks_map() {
    // GIVEN a fresh map and four missing block refs.
    let inflight = InflightBlocksMap::new();
    let wanted: BTreeSet<BlockRef> = BTreeSet::from([
        BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
        BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
    ]);
    let lock_periodic =
        |peer: AuthorityIndex| inflight.lock_blocks(wanted.clone(), peer, SyncMethod::Periodic);

    // Scenario 1: lock from two peers, get rejected for a third one, then
    // succeed for it once one of the earlier guards is released.
    {
        let mut held = Vec::new();

        for index in 1..=2 {
            let peer = AuthorityIndex::new_for_test(index);

            let first = lock_periodic(peer).expect("Guard should be created");
            assert_eq!(first.block_refs.len(), 4);
            held.push(first);

            // The same peer cannot lock the same blocks a second time.
            assert!(lock_periodic(peer).is_none());
        }

        // Both slots are taken, so a third peer gets nothing.
        let third = AuthorityIndex::new_for_test(3);
        assert!(lock_periodic(third).is_none());

        // Release the first peer's guard...
        drop(held.remove(0));

        // ...and the third peer can now lock every block.
        let regained = lock_periodic(third).expect("Guard should be successfully acquired");
        assert_eq!(regained.block_refs, wanted);

        // Dropping all guards leaves nothing locked.
        drop(regained);
        drop(held);

        assert_eq!(inflight.num_of_locked_blocks(), 0);
    }

    // Scenario 2: swapping a guard over to another peer keeps the same blocks.
    {
        let original = lock_periodic(AuthorityIndex::new_for_test(1)).unwrap();

        let swapped = inflight.swap_locks(original, AuthorityIndex::new_for_test(2));

        assert_eq!(swapped.unwrap().block_refs, wanted);
    }
}
1779
/// With `SyncMethod::Live` only one peer at a time may hold a lock on a block.
#[test]
fn test_inflight_blocks_map_live_sync_limit() {
    // GIVEN
    let inflight = InflightBlocksMap::new();
    let wanted: BTreeSet<BlockRef> = BTreeSet::from([
        BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
    ]);
    let lock_live =
        |peer: AuthorityIndex| inflight.lock_blocks(wanted.clone(), peer, SyncMethod::Live);
    let peer_1 = AuthorityIndex::new_for_test(1);
    let peer_2 = AuthorityIndex::new_for_test(2);

    // WHEN the first peer locks with Live sync, it gets both blocks.
    let guard_1 = lock_live(peer_1).expect("Should successfully lock with Live sync");
    assert_eq!(guard_1.block_refs.len(), 2);

    // THEN a second Live lock on the same blocks is refused.
    let refused = lock_live(peer_2);
    assert!(
        refused.is_none(),
        "Should fail to lock - Live limit of 1 reached"
    );

    // WHEN the first guard is released
    drop(guard_1);

    // THEN the second peer can lock.
    let guard_2 = lock_live(peer_2).expect("Should successfully lock after authority 1 released");
    assert_eq!(guard_2.block_refs.len(), 2);
}
1817
/// With `SyncMethod::Periodic` two peers may hold a lock on the same block,
/// a third one may not.
#[test]
fn test_inflight_blocks_map_periodic_allows_more_concurrency() {
    // GIVEN
    let inflight = InflightBlocksMap::new();
    let wanted: BTreeSet<BlockRef> = BTreeSet::from([
        BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
    ]);
    let lock_periodic =
        |peer: AuthorityIndex| inflight.lock_blocks(wanted.clone(), peer, SyncMethod::Periodic);
    let peer_1 = AuthorityIndex::new_for_test(1);
    let peer_2 = AuthorityIndex::new_for_test(2);
    let peer_3 = AuthorityIndex::new_for_test(3);

    // First periodic lock.
    let guard_1 = lock_periodic(peer_1).expect("Should successfully lock with Periodic sync");
    assert_eq!(guard_1.block_refs.len(), 2);

    // Second periodic lock from another peer is still allowed.
    let guard_2 = lock_periodic(peer_2)
        .expect("Should successfully lock - Periodic allows 2 authorities");
    assert_eq!(guard_2.block_refs.len(), 2);

    // Third periodic lock is refused.
    let refused = lock_periodic(peer_3);
    assert!(
        refused.is_none(),
        "Should fail to lock - Periodic limit of 2 reached"
    );

    // Releasing the first guard frees a slot for the third peer.
    drop(guard_1);

    let guard_3 =
        lock_periodic(peer_3).expect("Should successfully lock after authority 1 released");
    assert_eq!(guard_3.block_refs.len(), 2);
}
1879
/// A block already locked once (periodically) cannot be locked with Live
/// sync by another peer, but can still be locked periodically.
#[test]
fn test_inflight_blocks_map_periodic_blocks_live_when_at_live_limit() {
    // GIVEN
    let inflight = InflightBlocksMap::new();
    let wanted: BTreeSet<BlockRef> = BTreeSet::from([
        BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
    ]);
    let lock = |peer: AuthorityIndex, method: SyncMethod| {
        inflight.lock_blocks(wanted.clone(), peer, method)
    };
    let peer_1 = AuthorityIndex::new_for_test(1);
    let peer_2 = AuthorityIndex::new_for_test(2);

    // Peer 1 takes a periodic lock.
    let guard_1 = lock(peer_1, SyncMethod::Periodic)
        .expect("Should successfully lock with Periodic sync");
    assert_eq!(guard_1.block_refs.len(), 2);

    // Peer 2 cannot lock with Live: one holder already exists.
    let live_attempt = lock(peer_2, SyncMethod::Live);
    assert!(
        live_attempt.is_none(),
        "Should fail to lock with Live - total already at Live limit of 1"
    );

    // Peer 2 can however lock periodically.
    let periodic_guard = lock(peer_2, SyncMethod::Periodic)
        .expect("Should successfully lock with Periodic - under Periodic limit of 2");
    assert_eq!(periodic_guard.block_refs.len(), 2);
}
1925
/// A Live lock followed by a Periodic lock fills both slots; any further
/// Live or Periodic lock on the same blocks is refused.
#[test]
fn test_inflight_blocks_map_live_then_periodic_interaction() {
    // GIVEN
    let inflight = InflightBlocksMap::new();
    let wanted: BTreeSet<BlockRef> = BTreeSet::from([
        BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
    ]);
    let lock = |peer: AuthorityIndex, method: SyncMethod| {
        inflight.lock_blocks(wanted.clone(), peer, method)
    };
    let peer_1 = AuthorityIndex::new_for_test(1);
    let peer_2 = AuthorityIndex::new_for_test(2);
    let peer_3 = AuthorityIndex::new_for_test(3);

    // Peer 1 locks with Live sync.
    let guard_1 = lock(peer_1, SyncMethod::Live).expect("Should successfully lock with Live sync");
    assert_eq!(guard_1.block_refs.len(), 2);

    // A second Live lock is refused.
    let live_attempt = lock(peer_2, SyncMethod::Live);
    assert!(
        live_attempt.is_none(),
        "Should fail to lock with Live - would exceed Live limit of 1"
    );

    // The same peer succeeds with Periodic.
    let guard_2 = lock(peer_2, SyncMethod::Periodic)
        .expect("Should successfully lock with Periodic - total 2 is at Periodic limit");
    assert_eq!(guard_2.block_refs.len(), 2);

    // With two holders, even a Periodic lock from a third peer is refused.
    let periodic_attempt = lock(peer_3, SyncMethod::Periodic);
    assert!(
        periodic_attempt.is_none(),
        "Should fail to lock with Periodic - would exceed Periodic limit of 2"
    );
}
1979
/// When only some of the requested blocks can still be locked, the returned
/// guard covers exactly that subset.
#[test]
fn test_inflight_blocks_map_partial_locks_mixed_methods() {
    // GIVEN four blocks from the same author at rounds 1..=4.
    let inflight = InflightBlocksMap::new();
    let [block_a, block_b, block_c, block_d] = [1, 2, 3, 4]
        .map(|round| BlockRef::new(round, AuthorityIndex::new_for_test(0), BlockDigest::MIN));
    let peer_1 = AuthorityIndex::new_for_test(1);
    let peer_2 = AuthorityIndex::new_for_test(2);
    let peer_3 = AuthorityIndex::new_for_test(3);

    // Block A: one Live holder.
    let guard_a = inflight
        .lock_blocks(BTreeSet::from([block_a]), peer_1, SyncMethod::Live)
        .expect("Should lock block A");
    assert_eq!(guard_a.block_refs.len(), 1);

    // Block B: two Periodic holders.
    let guard_b1 = inflight
        .lock_blocks(BTreeSet::from([block_b]), peer_1, SyncMethod::Periodic)
        .expect("Should lock block B");
    let guard_b2 = inflight
        .lock_blocks(BTreeSet::from([block_b]), peer_2, SyncMethod::Periodic)
        .expect("Should lock block B again");
    assert_eq!(guard_b1.block_refs.len(), 1);
    assert_eq!(guard_b2.block_refs.len(), 1);

    // Block C: one Periodic holder.
    let guard_c = inflight
        .lock_blocks(BTreeSet::from([block_c]), peer_1, SyncMethod::Periodic)
        .expect("Should lock block C");
    assert_eq!(guard_c.block_refs.len(), 1);

    // WHEN a third peer asks for all four blocks periodically
    let everything = BTreeSet::from([block_a, block_b, block_c, block_d]);
    let partial = inflight
        .lock_blocks(everything, peer_3, SyncMethod::Periodic)
        .expect("Should get partial lock");

    // THEN it obtains every block except B, which already has two holders.
    assert_eq!(
        partial.block_refs.len(),
        3,
        "Should lock blocks A, C, and D"
    );
    assert!(
        partial.block_refs.contains(&block_a),
        "Should contain block A"
    );
    assert!(
        !partial.block_refs.contains(&block_b),
        "Should NOT contain block B (at limit)"
    );
    assert!(
        partial.block_refs.contains(&block_c),
        "Should contain block C"
    );
    assert!(
        partial.block_refs.contains(&block_d),
        "Should contain block D"
    );
}
2067
/// After swapping a Live guard to another peer, the blocks still count as
/// locked once: a new Live lock is refused while a Periodic one succeeds.
#[test]
fn test_inflight_blocks_map_swap_locks_preserves_method() {
    // GIVEN
    let inflight = InflightBlocksMap::new();
    let wanted: BTreeSet<BlockRef> = BTreeSet::from([
        BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
    ]);
    let peer_1 = AuthorityIndex::new_for_test(1);
    let peer_2 = AuthorityIndex::new_for_test(2);
    let peer_3 = AuthorityIndex::new_for_test(3);

    // Peer 1 locks with Live sync.
    let live_guard = inflight
        .lock_blocks(wanted.clone(), peer_1, SyncMethod::Live)
        .expect("Should lock with Live sync");
    assert_eq!(live_guard.block_refs.len(), 2);

    // WHEN the guard is swapped over to peer 2
    let swapped_guard = inflight
        .swap_locks(live_guard, peer_2)
        .expect("Should swap locks");

    // THEN the swapped guard covers the very same blocks.
    assert_eq!(swapped_guard.block_refs, wanted);

    // A further Live lock is refused...
    let live_attempt = inflight.lock_blocks(wanted.clone(), peer_3, SyncMethod::Live);
    assert!(live_attempt.is_none(), "Should fail - Live limit reached");

    // ...whereas a Periodic lock is granted.
    let periodic_guard = inflight
        .lock_blocks(wanted.clone(), peer_3, SyncMethod::Periodic)
        .expect("Should lock with Periodic");
    assert_eq!(periodic_guard.block_refs.len(), 2);
}
2110
2111 #[tokio::test]
2112 async fn test_process_fetched_blocks() {
2113 let (context, _) = Context::new_for_test(4);
2115 let context = Arc::new(context);
2116 let block_verifier = Arc::new(NoopBlockVerifier {});
2117 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2118 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2119 let (commands_sender, _commands_receiver) =
2120 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2121
2122 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2126 expected_blocks.extend(
2127 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2128 );
2129 assert_eq!(
2130 expected_blocks.len(),
2131 context.parameters.max_blocks_per_sync
2132 );
2133
2134 let expected_serialized_blocks = expected_blocks
2135 .iter()
2136 .map(|b| b.serialized().clone())
2137 .collect::<Vec<_>>();
2138
2139 let expected_block_refs = expected_blocks
2140 .iter()
2141 .map(|b| b.reference())
2142 .collect::<BTreeSet<_>>();
2143
2144 let peer_index = AuthorityIndex::new_for_test(2);
2146
2147 let inflight_blocks_map = InflightBlocksMap::new();
2149 let blocks_guard = inflight_blocks_map
2150 .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2151 .expect("Failed to lock blocks");
2152
2153 assert_eq!(
2154 inflight_blocks_map.num_of_locked_blocks(),
2155 expected_block_refs.len()
2156 );
2157
2158 let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
2160 NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
2161 )));
2162 let result = Synchronizer::<
2163 MockNetworkClient,
2164 NoopBlockVerifier,
2165 MockCoreThreadDispatcher,
2166 >::process_fetched_blocks(
2167 expected_serialized_blocks,
2168 peer_index,
2169 blocks_guard, core_dispatcher.clone(),
2171 block_verifier,
2172 verified_cache,
2173 commit_vote_monitor,
2174 context.clone(),
2175 commands_sender,
2176 "test",
2177 )
2178 .await;
2179
2180 assert!(result.is_ok());
2182
2183 let added_blocks = core_dispatcher.get_add_blocks().await;
2185 assert_eq!(
2186 added_blocks
2187 .iter()
2188 .map(|b| b.reference())
2189 .collect::<BTreeSet<_>>(),
2190 expected_block_refs,
2191 );
2192
2193 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2195 }
2196
2197 #[tokio::test]
2198 async fn test_successful_fetch_blocks_from_peer() {
2199 let (context, _) = Context::new_for_test(4);
2201 let context = Arc::new(context);
2202 let block_verifier = Arc::new(NoopBlockVerifier {});
2203 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2204 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2205 let network_client = Arc::new(MockNetworkClient::default());
2206 let store = Arc::new(MemStore::new());
2207 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2208
2209 let handle = Synchronizer::start(
2210 network_client.clone(),
2211 context,
2212 core_dispatcher.clone(),
2213 commit_vote_monitor,
2214 block_verifier,
2215 dag_state,
2216 false,
2217 );
2218
2219 let expected_blocks = (0..10)
2221 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2222 .collect::<Vec<_>>();
2223 let missing_blocks = expected_blocks
2224 .iter()
2225 .map(|block| block.reference())
2226 .collect::<BTreeSet<_>>();
2227
2228 let peer = AuthorityIndex::new_for_test(1);
2230 network_client
2231 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
2232 .await;
2233
2234 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2236
2237 sleep(Duration::from_millis(1_000)).await;
2239
2240 let added_blocks = core_dispatcher.get_add_blocks().await;
2242 assert_eq!(added_blocks, expected_blocks);
2243 }
2244
2245 #[tokio::test]
2246 async fn saturate_fetch_blocks_from_peer() {
2247 let (context, _) = Context::new_for_test(4);
2249 let context = Arc::new(context);
2250 let block_verifier = Arc::new(NoopBlockVerifier {});
2251 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2252 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2253 let network_client = Arc::new(MockNetworkClient::default());
2254 let store = Arc::new(MemStore::new());
2255 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2256
2257 let handle = Synchronizer::start(
2258 network_client.clone(),
2259 context,
2260 core_dispatcher.clone(),
2261 commit_vote_monitor,
2262 block_verifier,
2263 dag_state,
2264 false,
2265 );
2266
2267 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
2269 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
2270 .collect::<Vec<_>>();
2271
2272 let peer = AuthorityIndex::new_for_test(1);
2274 let mut iter = expected_blocks.iter().peekable();
2275 while let Some(block) = iter.next() {
2276 network_client
2279 .stub_fetch_blocks(
2280 vec![block.clone()],
2281 peer,
2282 Some(Duration::from_millis(5_000)),
2283 )
2284 .await;
2285
2286 let mut missing_blocks = BTreeSet::new();
2287 missing_blocks.insert(block.reference());
2288
2289 if iter.peek().is_none() {
2292 match handle.fetch_blocks(missing_blocks, peer).await {
2293 Err(ConsensusError::SynchronizerSaturated(index, _)) => {
2294 assert_eq!(index, peer);
2295 }
2296 _ => panic!("A saturated synchronizer error was expected"),
2297 }
2298 } else {
2299 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2300 }
2301 }
2302 }
2303
2304 #[tokio::test(flavor = "current_thread", start_paused = true)]
2305 async fn synchronizer_periodic_task_fetch_blocks() {
2306 let (context, _) = Context::new_for_test(4);
2308 let context = Arc::new(context);
2309 let block_verifier = Arc::new(NoopBlockVerifier {});
2310 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2311 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2312 let network_client = Arc::new(MockNetworkClient::default());
2313 let store = Arc::new(MemStore::new());
2314 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2315
2316 let expected_blocks = (0..10)
2318 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2319 .collect::<Vec<_>>();
2320 let missing_blocks = expected_blocks
2321 .iter()
2322 .map(|block| block.reference())
2323 .collect::<BTreeSet<_>>();
2324
2325 core_dispatcher
2327 .stub_missing_blocks(missing_blocks.clone())
2328 .await;
2329
2330 network_client
2334 .stub_fetch_blocks(
2335 expected_blocks.clone(),
2336 AuthorityIndex::new_for_test(1),
2337 Some(FETCH_REQUEST_TIMEOUT),
2338 )
2339 .await;
2340 network_client
2341 .stub_fetch_blocks(
2342 expected_blocks.clone(),
2343 AuthorityIndex::new_for_test(2),
2344 None,
2345 )
2346 .await;
2347
2348 let _handle = Synchronizer::start(
2350 network_client.clone(),
2351 context,
2352 core_dispatcher.clone(),
2353 commit_vote_monitor,
2354 block_verifier,
2355 dag_state,
2356 false,
2357 );
2358
2359 sleep(8 * FETCH_REQUEST_TIMEOUT).await;
2360
2361 let added_blocks = core_dispatcher.get_add_blocks().await;
2363 assert_eq!(added_blocks, expected_blocks);
2364
2365 assert!(
2367 core_dispatcher
2368 .get_missing_blocks()
2369 .await
2370 .unwrap()
2371 .is_empty()
2372 );
2373 }
2374
2375 #[tokio::test(flavor = "current_thread", start_paused = true)]
2376 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2377 let (mut context, _) = Context::new_for_test(4);
2379 context
2380 .protocol_config
2381 .set_consensus_batched_block_sync_for_testing(true);
2382 let context = Arc::new(context);
2383 let block_verifier = Arc::new(NoopBlockVerifier {});
2384 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2385 let network_client = Arc::new(MockNetworkClient::default());
2386 let store = Arc::new(MemStore::new());
2387 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2388 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2389
2390 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2393 let stub_blocks = (sync_missing_block_round_threshold * 2
2394 ..sync_missing_block_round_threshold * 2
2395 + context.parameters.max_blocks_per_sync as u32)
2396 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2397 .collect::<Vec<_>>();
2398 let missing_blocks = stub_blocks
2399 .iter()
2400 .map(|block| block.reference())
2401 .collect::<BTreeSet<_>>();
2402 core_dispatcher
2403 .stub_missing_blocks(missing_blocks.clone())
2404 .await;
2405 let mut expected_blocks = stub_blocks
2409 .iter()
2410 .take(context.parameters.max_blocks_per_sync)
2411 .cloned()
2412 .collect::<Vec<_>>();
2413 network_client
2414 .stub_fetch_blocks(
2415 expected_blocks.clone(),
2416 AuthorityIndex::new_for_test(1),
2417 Some(FETCH_REQUEST_TIMEOUT),
2418 )
2419 .await;
2420 network_client
2421 .stub_fetch_blocks(
2422 expected_blocks.clone(),
2423 AuthorityIndex::new_for_test(2),
2424 None,
2425 )
2426 .await;
2427
2428 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2430 let commit_index: CommitIndex = round - 1;
2431 let blocks = (0..4)
2432 .map(|authority| {
2433 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2434 let block = TestBlock::new(round, authority)
2435 .set_commit_votes(commit_votes)
2436 .build();
2437
2438 VerifiedBlock::new_for_test(block)
2439 })
2440 .collect::<Vec<_>>();
2441
2442 for block in blocks {
2445 commit_vote_monitor.observe_block(&block);
2446 }
2447
2448 let _handle = Synchronizer::start(
2451 network_client.clone(),
2452 context.clone(),
2453 core_dispatcher.clone(),
2454 commit_vote_monitor.clone(),
2455 block_verifier,
2456 dag_state.clone(),
2457 false,
2458 );
2459
2460 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2461
2462 let added_blocks = core_dispatcher.get_add_blocks().await;
2465 assert_eq!(added_blocks, vec![]);
2466
2467 println!("Before advancing");
2468 {
2471 let mut d = dag_state.write();
2472 for index in 1..=commit_index {
2473 let commit =
2474 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2475
2476 d.add_commit(commit);
2477 }
2478
2479 println!("Once advanced");
2480 assert_eq!(
2481 d.last_commit_index(),
2482 commit_vote_monitor.quorum_commit_index()
2483 );
2484 }
2485
2486 core_dispatcher
2488 .stub_missing_blocks(missing_blocks.clone())
2489 .await;
2490
2491 println!("Final sleep");
2492 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2493
2494 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2496 println!("Final await");
2497 added_blocks.sort_by_key(|block| block.reference());
2498 expected_blocks.sort_by_key(|block| block.reference());
2499
2500 assert_eq!(added_blocks, expected_blocks);
2501 }
2502
2503 #[tokio::test(flavor = "current_thread", start_paused = true)]
2504 async fn synchronizer_fetch_own_last_block() {
2505 let (context, _) = Context::new_for_test(4);
2507 let context = Arc::new(context.with_parameters(Parameters {
2508 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2509 ..Default::default()
2510 }));
2511 let block_verifier = Arc::new(NoopBlockVerifier {});
2512 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2513 let network_client = Arc::new(MockNetworkClient::default());
2514 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2515 let store = Arc::new(MemStore::new());
2516 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2517 let our_index = AuthorityIndex::new_for_test(0);
2518
2519 let mut expected_blocks = (8..=10)
2521 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2522 .collect::<Vec<_>>();
2523
2524 let block_1 = expected_blocks.pop().unwrap();
2527 network_client
2528 .stub_fetch_latest_blocks(
2529 vec![block_1.clone()],
2530 AuthorityIndex::new_for_test(1),
2531 vec![our_index],
2532 Some(Duration::from_secs(10)),
2533 )
2534 .await;
2535 network_client
2536 .stub_fetch_latest_blocks(
2537 vec![block_1],
2538 AuthorityIndex::new_for_test(1),
2539 vec![our_index],
2540 None,
2541 )
2542 .await;
2543
2544 let block_2 = expected_blocks.pop().unwrap();
2546 network_client
2547 .stub_fetch_latest_blocks(
2548 vec![block_2.clone()],
2549 AuthorityIndex::new_for_test(2),
2550 vec![our_index],
2551 Some(Duration::from_secs(10)),
2552 )
2553 .await;
2554 network_client
2555 .stub_fetch_latest_blocks(
2556 vec![block_2],
2557 AuthorityIndex::new_for_test(2),
2558 vec![our_index],
2559 None,
2560 )
2561 .await;
2562
2563 let block_3 = expected_blocks.pop().unwrap();
2565 network_client
2566 .stub_fetch_latest_blocks(
2567 vec![block_3.clone()],
2568 AuthorityIndex::new_for_test(3),
2569 vec![our_index],
2570 Some(Duration::from_secs(10)),
2571 )
2572 .await;
2573 network_client
2574 .stub_fetch_latest_blocks(
2575 vec![block_3],
2576 AuthorityIndex::new_for_test(3),
2577 vec![our_index],
2578 None,
2579 )
2580 .await;
2581
2582 let handle = Synchronizer::start(
2584 network_client.clone(),
2585 context.clone(),
2586 core_dispatcher.clone(),
2587 commit_vote_monitor,
2588 block_verifier,
2589 dag_state,
2590 true,
2591 );
2592
2593 sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2595
2596 assert_eq!(
2598 core_dispatcher.get_last_own_proposed_round().await,
2599 vec![10]
2600 );
2601
2602 assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2604
2605 assert_eq!(
2607 context
2608 .metrics
2609 .node_metrics
2610 .sync_last_known_own_block_retries
2611 .get(),
2612 1
2613 );
2614
2615 if let Err(err) = handle.stop().await {
2617 if err.is_panic() {
2618 std::panic::resume_unwind(err.into_panic());
2619 }
2620 }
2621 }
2622 #[derive(Default)]
2623 struct SyncMockDispatcher {
2624 missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2625 added_blocks: Mutex<Vec<VerifiedBlock>>,
2626 }
2627
2628 #[async_trait::async_trait]
2629 impl CoreThreadDispatcher for SyncMockDispatcher {
2630 async fn get_missing_blocks(
2631 &self,
2632 ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2633 Ok(self.missing_blocks.lock().await.clone())
2634 }
2635 async fn add_blocks(
2636 &self,
2637 blocks: Vec<VerifiedBlock>,
2638 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2639 let mut guard = self.added_blocks.lock().await;
2640 guard.extend(blocks.clone());
2641 Ok(blocks.iter().map(|b| b.reference()).collect())
2642 }
2643
2644 async fn check_block_refs(
2647 &self,
2648 block_refs: Vec<BlockRef>,
2649 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2650 Ok(block_refs.into_iter().collect())
2652 }
2653
2654 async fn add_certified_commits(
2655 &self,
2656 _commits: CertifiedCommits,
2657 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2658 Ok(BTreeSet::new())
2660 }
2661
2662 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2663 Ok(())
2664 }
2665
2666 fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2667 Ok(())
2668 }
2669
2670 fn set_propagation_delay_and_quorum_rounds(
2671 &self,
2672 _delay: Round,
2673 _received_quorum_rounds: Vec<QuorumRound>,
2674 _accepted_quorum_rounds: Vec<QuorumRound>,
2675 ) -> Result<(), CoreError> {
2676 Ok(())
2677 }
2678
2679 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2680 Ok(())
2681 }
2682
2683 fn highest_received_rounds(&self) -> Vec<Round> {
2684 Vec::new()
2685 }
2686 }
2687
2688 #[tokio::test(flavor = "current_thread")]
2689 async fn known_before_random_peer_fetch() {
2690 {
2691 let (ctx, _) = Context::new_for_test(10);
2693 let context = Arc::new(ctx);
2694 let store = Arc::new(MemStore::new());
2695 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2696 let inflight = InflightBlocksMap::new();
2697
2698 let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2700 let missing_ref = missing_vb.reference();
2701 let missing_blocks = BTreeMap::from([(
2702 missing_ref,
2703 BTreeSet::from([
2704 AuthorityIndex::new_for_test(2),
2705 AuthorityIndex::new_for_test(3),
2706 AuthorityIndex::new_for_test(4),
2707 ]),
2708 )]);
2709
2710 let network_client = Arc::new(MockNetworkClient::default());
2712 for i in 1..=9 {
2714 let peer = AuthorityIndex::new_for_test(i);
2715 if i == 1 || i == 4 {
2716 network_client
2717 .stub_fetch_blocks(
2718 vec![missing_vb.clone()],
2719 peer,
2720 Some(2 * FETCH_REQUEST_TIMEOUT),
2721 )
2722 .await;
2723 continue;
2724 }
2725 network_client
2726 .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2727 .await;
2728 }
2729
2730 let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2733 ::fetch_blocks_from_authorities(
2734 context.clone(),
2735 inflight.clone(),
2736 network_client.clone(),
2737 missing_blocks,
2738 dag_state.clone(),
2739 )
2740 .await;
2741
2742 assert_eq!(results.len(), 2);
2745
2746 let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2748 assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2749 let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
2750 assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
2751 let expected = missing_vb.serialized().clone();
2753 assert_eq!(hot_bytes, &vec![expected]);
2754 }
2755 }
2756
2757 #[tokio::test(flavor = "current_thread")]
2758 async fn known_before_periodic_peer_fetch_larger_scenario() {
2759 use std::{
2760 collections::{BTreeMap, BTreeSet},
2761 sync::Arc,
2762 };
2763
2764 use parking_lot::RwLock;
2765
2766 use crate::{
2767 block::{Round, TestBlock, VerifiedBlock},
2768 context::Context,
2769 };
2770
2771 let (ctx, _) = Context::new_for_test(10);
2773 let context = Arc::new(ctx);
2774 let store = Arc::new(MemStore::new());
2775 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2776 let inflight = InflightBlocksMap::new();
2777 let network_client = Arc::new(MockNetworkClient::default());
2778
2779 let mut missing_blocks = BTreeMap::new();
2781 let mut missing_vbs = Vec::new();
2782 let known_number_blocks = 10;
2783 for i in 0..1000 {
2784 let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2785 let r = vb.reference();
2786 if i < known_number_blocks {
2787 missing_blocks.insert(
2789 r,
2790 BTreeSet::from([
2791 AuthorityIndex::new_for_test(0),
2792 AuthorityIndex::new_for_test(2),
2793 ]),
2794 );
2795 } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2796 missing_blocks.insert(
2798 r,
2799 BTreeSet::from([
2800 AuthorityIndex::new_for_test(0),
2801 AuthorityIndex::new_for_test(3),
2802 ]),
2803 );
2804 } else {
2805 missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2807 }
2808 missing_vbs.push(vb);
2809 }
2810
2811 let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2813 let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2814 .iter()
2815 .map(|&peer| {
2816 let vbs = missing_vbs
2817 .iter()
2818 .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2819 .take(context.parameters.max_blocks_per_sync)
2820 .cloned()
2821 .collect::<Vec<_>>();
2822 (peer, vbs)
2823 })
2824 .collect();
2825
2826 for (peer, vbs) in known_vbs_by_peer {
2827 if peer == AuthorityIndex::new_for_test(2) {
2828 network_client
2830 .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2831 .await;
2832 network_client
2833 .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2834 .await;
2835 } else {
2836 network_client
2837 .stub_fetch_blocks(vbs.clone(), peer, None)
2838 .await;
2839 }
2840 }
2841
2842 network_client
2844 .stub_fetch_blocks(
2845 missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2846 AuthorityIndex::new_for_test(1),
2847 None,
2848 )
2849 .await;
2850
2851 network_client
2852 .stub_fetch_blocks(
2853 missing_vbs[context.parameters.max_blocks_per_sync
2854 ..2 * context.parameters.max_blocks_per_sync]
2855 .to_vec(),
2856 AuthorityIndex::new_for_test(4),
2857 None,
2858 )
2859 .await;
2860
2861 let results = Synchronizer::<
2863 MockNetworkClient,
2864 NoopBlockVerifier,
2865 SyncMockDispatcher,
2866 >::fetch_blocks_from_authorities(
2867 context.clone(),
2868 inflight.clone(),
2869 network_client.clone(),
2870 missing_blocks,
2871 dag_state.clone(),
2872 )
2873 .await;
2874
2875 assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");
2878
2879 let (_guard3, bytes3, peer3) = &results[0];
2881 assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
2882 let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
2883 .iter()
2884 .map(|vb| vb.serialized().clone())
2885 .collect::<Vec<_>>();
2886 assert_eq!(bytes3, &expected2);
2887
2888 let (_guard1, bytes1, peer1) = &results[1];
2890 assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
2891 let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
2892 .iter()
2893 .map(|vb| vb.serialized().clone())
2894 .collect::<Vec<_>>();
2895 assert_eq!(bytes1, &expected1);
2896
2897 let (_guard4, bytes4, peer4) = &results[2];
2899 assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
2900 let expected4 = missing_vbs
2901 [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
2902 .iter()
2903 .map(|vb| vb.serialized().clone())
2904 .collect::<Vec<_>>();
2905 assert_eq!(bytes4, &expected4);
2906
2907 let (_guard5, bytes5, peer5) = &results[3];
2909 assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
2910 let expected5 = missing_vbs[0..known_number_blocks]
2911 .iter()
2912 .map(|vb| vb.serialized().clone())
2913 .collect::<Vec<_>>();
2914 assert_eq!(bytes5, &expected5);
2915 }
2916
2917 #[tokio::test(flavor = "current_thread")]
2918 async fn test_verify_blocks_deduplication() {
2919 let (context, _keys) = Context::new_for_test(4);
2920 let context = Arc::new(context);
2921 let block_verifier = Arc::new(NoopBlockVerifier {});
2922 let failing_verifier = Arc::new(FailingBlockVerifier);
2923 let peer1 = AuthorityIndex::new_for_test(1);
2924 let peer2 = AuthorityIndex::new_for_test(2);
2925
2926 let cache = Arc::new(SyncMutex::new(LruCache::new(NonZeroUsize::new(5).unwrap())));
2928
2929 let block1 = VerifiedBlock::new_for_test(TestBlock::new(10, 0).build());
2931 let serialized1 = vec![block1.serialized().clone()];
2932
2933 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2935 serialized1.clone(), block_verifier.clone(), cache.clone(), &context, peer1, "live",
2936 );
2937 assert_eq!(result.unwrap().len(), 1);
2938
2939 let peer1_hostname = &context.committee.authority(peer1).hostname;
2940 assert_eq!(
2941 context
2942 .metrics
2943 .node_metrics
2944 .synchronizer_skipped_blocks_by_peer
2945 .with_label_values(&[peer1_hostname.as_str(), "live"])
2946 .get(),
2947 0
2948 );
2949
2950 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2952 serialized1, block_verifier.clone(), cache.clone(), &context, peer2, "periodic",
2953 );
2954 assert_eq!(result.unwrap().len(), 0, "Should skip cached block");
2955
2956 let peer2_hostname = &context.committee.authority(peer2).hostname;
2957 assert_eq!(
2958 context
2959 .metrics
2960 .node_metrics
2961 .synchronizer_skipped_blocks_by_peer
2962 .with_label_values(&[peer2_hostname.as_str(), "periodic"])
2963 .get(),
2964 1
2965 );
2966
2967 let invalid_block = VerifiedBlock::new_for_test(TestBlock::new(20, 0).build());
2969 let invalid_serialized = vec![invalid_block.serialized().clone()];
2970
2971 assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2972 invalid_serialized.clone(), failing_verifier.clone(), cache.clone(), &context, peer1, "test",
2973 ).is_err());
2974 assert_eq!(cache.lock().len(), 1, "Invalid block should not be cached");
2975
2976 assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2978 invalid_serialized, failing_verifier, cache.clone(), &context, peer1, "test",
2979 ).is_err());
2980
2981 let blocks: Vec<_> = (0..5)
2983 .map(|i| VerifiedBlock::new_for_test(TestBlock::new(30 + i, 0).build()))
2984 .collect();
2985
2986 for block in &blocks {
2988 Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2989 vec![block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
2990 ).unwrap();
2991 }
2992 assert_eq!(cache.lock().len(), 5);
2993
2994 let new_block = VerifiedBlock::new_for_test(TestBlock::new(99, 0).build());
2996 Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2997 vec![new_block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
2998 ).unwrap();
2999
3000 let block1_serialized = vec![block1.serialized().clone()];
3003 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3004 block1_serialized, block_verifier.clone(), cache.clone(), &context, peer1, "test",
3005 );
3006 assert_eq!(
3007 result.unwrap().len(),
3008 1,
3009 "Evicted block should be re-verified"
3010 );
3011
3012 let new_block_serialized = vec![new_block.serialized().clone()];
3014 let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3015 new_block_serialized, block_verifier, cache, &context, peer1, "test",
3016 );
3017 assert_eq!(result.unwrap().len(), 0, "New block should be cached");
3018 }
3019}