1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7 sync::Arc,
8 time::Duration,
9};
10
11use bytes::Bytes;
12use consensus_config::AuthorityIndex;
13use futures::{StreamExt as _, stream::FuturesUnordered};
14use iota_macros::fail_point_async;
15use iota_metrics::{
16 monitored_future,
17 monitored_mpsc::{Receiver, Sender, channel},
18 monitored_scope,
19};
20use itertools::Itertools as _;
21use parking_lot::{Mutex, RwLock};
22#[cfg(not(test))]
23use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
24use tap::TapFallible;
25use tokio::{
26 runtime::Handle,
27 sync::{mpsc::error::TrySendError, oneshot},
28 task::{JoinError, JoinSet},
29 time::{Instant, sleep, sleep_until, timeout},
30};
31use tracing::{debug, error, info, trace, warn};
32
33use crate::{
34 BlockAPI, CommitIndex, Round,
35 authority_service::COMMIT_LAG_MULTIPLIER,
36 block::{BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
37 block_verifier::BlockVerifier,
38 commit_vote_monitor::CommitVoteMonitor,
39 context::Context,
40 core_thread::CoreThreadDispatcher,
41 dag_state::DagState,
42 error::{ConsensusError, ConsensusResult},
43 network::NetworkClient,
44};
45
/// Capacity of each per-peer fetch channel, and the maximum number of fetch requests a
/// single peer task keeps in flight at the same time.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Number of blocks a peer may return on top of the requested ones before the response is
/// rejected as too large (only checked when batched block sync is disabled).
pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;

/// Timeout handed to every individual network fetch request.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);

/// NOTE(review): not referenced in the visible part of this file; presumably the overall
/// deadline for one periodic fetch round across peers — confirm at its use site.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of distinct peers that may hold a fetch lock on the same block at once.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 3;

/// Total number of peers a periodic sync round targets: peers associated with the missing
/// blocks plus the randomly picked ones counted by `MAX_PERIODIC_SYNC_RANDOM_PEERS`.
const MAX_PERIODIC_SYNC_PEERS: usize = 4;

/// Number of peers, out of `MAX_PERIODIC_SYNC_PEERS`, picked among the peers that were not
/// already chosen for their association with the missing blocks.
const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
73
/// Guard over a set of block refs that are locked for fetching from a single peer.
///
/// The locks are released in the owning [`InflightBlocksMap`] when the guard is dropped.
struct BlocksGuard {
    /// Map in which the locks were taken and will be released.
    map: Arc<InflightBlocksMap>,
    /// Block refs locked on behalf of `peer`.
    block_refs: BTreeSet<BlockRef>,
    /// Peer the blocks are fetched from.
    peer: AuthorityIndex,
}
79
impl Drop for BlocksGuard {
    /// Releases every lock this guard holds for its peer.
    ///
    /// Delegates to `InflightBlocksMap::unlock_blocks`, which panics if the map no longer
    /// records one of the locks.
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}
85
/// Records, for every block ref currently being fetched, the peers it is being fetched from.
struct InflightBlocksMap {
    /// Block ref -> peers holding a fetch lock on it. An entry is removed as soon as its
    /// peer set becomes empty.
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}
94
95impl InflightBlocksMap {
96 fn new() -> Arc<Self> {
97 Arc::new(Self {
98 inner: Mutex::new(HashMap::new()),
99 })
100 }
101
102 fn lock_blocks(
110 self: &Arc<Self>,
111 missing_block_refs: BTreeSet<BlockRef>,
112 peer: AuthorityIndex,
113 ) -> Option<BlocksGuard> {
114 let mut blocks = BTreeSet::new();
115 let mut inner = self.inner.lock();
116
117 for block_ref in missing_block_refs {
118 let authorities = inner.entry(block_ref).or_default();
122 if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
123 && authorities.get(&peer).is_none()
124 {
125 assert!(authorities.insert(peer));
126 blocks.insert(block_ref);
127 }
128 }
129
130 if blocks.is_empty() {
131 None
132 } else {
133 Some(BlocksGuard {
134 map: self.clone(),
135 block_refs: blocks,
136 peer,
137 })
138 }
139 }
140
141 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
146 let mut blocks_to_fetch = self.inner.lock();
148 for block_ref in block_refs {
149 let authorities = blocks_to_fetch
150 .get_mut(block_ref)
151 .expect("Should have found a non empty map");
152
153 assert!(authorities.remove(&peer), "Peer index should be present!");
154
155 if authorities.is_empty() {
157 blocks_to_fetch.remove(block_ref);
158 }
159 }
160 }
161
162 fn swap_locks(
167 self: &Arc<Self>,
168 blocks_guard: BlocksGuard,
169 peer: AuthorityIndex,
170 ) -> Option<BlocksGuard> {
171 let block_refs = blocks_guard.block_refs.clone();
172
173 drop(blocks_guard);
175
176 self.lock_blocks(block_refs, peer)
178 }
179
180 #[cfg(test)]
181 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
182 let inner = self.inner.lock();
183 inner.len()
184 }
185}
186
/// Commands handled by the synchronizer's main loop.
enum Command {
    /// Ask the synchronizer to fetch `missing_block_refs` from `peer_index`. The outcome of
    /// scheduling the request is reported back through `result`.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Start the task that fetches this node's own last block from peers, unless such a task
    /// is already running.
    FetchOwnLastBlock,
    /// Bring the next run of the periodic fetch scheduler forward.
    KickOffScheduler,
}
196
/// Handle through which the rest of the crate talks to a running synchronizer.
pub(crate) struct SynchronizerHandle {
    /// Sending side of the synchronizer's command channel.
    commands_sender: Sender<Command>,
    /// All tasks spawned by `Synchronizer::start`; aborted and joined by `stop`.
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}
201
202impl SynchronizerHandle {
203 pub(crate) async fn fetch_blocks(
206 &self,
207 missing_block_refs: BTreeSet<BlockRef>,
208 peer_index: AuthorityIndex,
209 ) -> ConsensusResult<()> {
210 let (sender, receiver) = oneshot::channel();
211 self.commands_sender
212 .send(Command::FetchBlocks {
213 missing_block_refs,
214 peer_index,
215 result: sender,
216 })
217 .await
218 .map_err(|_err| ConsensusError::Shutdown)?;
219 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
220 }
221
222 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
223 let mut tasks = self.tasks.lock().await;
224 tasks.abort_all();
225 while let Some(result) = tasks.join_next().await {
226 result?
227 }
228 Ok(())
229 }
230}
231
/// State owned by the synchronizer's main loop (`run`).
///
/// The synchronizer fetches missing blocks from peers in two ways that are visible in this
/// file: on demand, through `Command::FetchBlocks`, and periodically, through a scheduler
/// task that asks the core dispatcher for the currently missing blocks.
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    /// Shared node context (committee, parameters, protocol config, metrics, clock).
    context: Arc<Context>,
    /// Receiving side of the command channel, drained by `run`.
    commands_receiver: Receiver<Command>,
    /// One sender per remote peer, feeding that peer's dedicated fetch task.
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    /// Used to hand fetched blocks to core and to query the missing blocks.
    core_dispatcher: Arc<D>,
    /// Observes fetched blocks and provides the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Local DAG state, read for accepted rounds and commit progress.
    dag_state: Arc<RwLock<DagState>>,
    /// Holds the periodic fetch task while one is running.
    fetch_blocks_scheduler_task: JoinSet<()>,
    /// Holds the task fetching this node's own last block while one is running.
    fetch_own_last_block_task: JoinSet<()>,
    /// Client used to request blocks from peers.
    network_client: Arc<C>,
    /// Verifier applied to every fetched block.
    block_verifier: Arc<V>,
    /// Per-block fetch locks shared by the on-demand and the periodic path.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    /// Sender cloned into spawned tasks so they can send `Command::KickOffScheduler`.
    commands_sender: Sender<Command>,
}
272
273impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
274 pub fn start(
277 network_client: Arc<C>,
278 context: Arc<Context>,
279 core_dispatcher: Arc<D>,
280 commit_vote_monitor: Arc<CommitVoteMonitor>,
281 block_verifier: Arc<V>,
282 dag_state: Arc<RwLock<DagState>>,
283 sync_last_known_own_block: bool,
284 ) -> Arc<SynchronizerHandle> {
285 let (commands_sender, commands_receiver) =
286 channel("consensus_synchronizer_commands", 1_000);
287 let inflight_blocks_map = InflightBlocksMap::new();
288
289 let mut fetch_block_senders = BTreeMap::new();
291 let mut tasks = JoinSet::new();
292 for (index, _) in context.committee.authorities() {
293 if index == context.own_index {
294 continue;
295 }
296 let (sender, receiver) =
297 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
298 let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
299 index,
300 network_client.clone(),
301 block_verifier.clone(),
302 commit_vote_monitor.clone(),
303 context.clone(),
304 core_dispatcher.clone(),
305 dag_state.clone(),
306 receiver,
307 commands_sender.clone(),
308 );
309 tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
310 fetch_block_senders.insert(index, sender);
311 }
312
313 let commands_sender_clone = commands_sender.clone();
314
315 if sync_last_known_own_block {
316 commands_sender
317 .try_send(Command::FetchOwnLastBlock)
318 .expect("Failed to sync our last block");
319 }
320
321 tasks.spawn(monitored_future!(async move {
323 let mut s = Self {
324 context,
325 commands_receiver,
326 fetch_block_senders,
327 core_dispatcher,
328 commit_vote_monitor,
329 fetch_blocks_scheduler_task: JoinSet::new(),
330 fetch_own_last_block_task: JoinSet::new(),
331 network_client,
332 block_verifier,
333 inflight_blocks_map,
334 commands_sender: commands_sender_clone,
335 dag_state,
336 };
337 s.run().await;
338 }));
339
340 Arc::new(SynchronizerHandle {
341 commands_sender,
342 tasks: tokio::sync::Mutex::new(tasks),
343 })
344 }
345
    /// Main loop of the synchronizer.
    ///
    /// Multiplexes four event sources: incoming commands, completion of the "fetch own last
    /// block" task, completion of the periodic fetch task, and the periodic scheduler timer.
    /// The loop only returns when starting the periodic fetch task fails, which is treated
    /// as core shutting down.
    ///
    /// # Panics
    /// Re-raises a panic from either background task, and panics on any other non-cancelled
    /// join failure.
    async fn run(&mut self) {
        // Interval between two runs of the periodic fetch scheduler.
        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);

        // Pinned so the same timer can be polled and reset across loop iterations.
        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                // `result` is dropped without a reply here, which the requester
                                // observes as `ConsensusError::Shutdown`.
                                continue;
                            }

                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();

                            // Cap the request at `max_blocks_per_sync` block refs; the rest of
                            // the set is discarded here.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(self.context.parameters.max_blocks_per_sync)
                                .collect();

                            // Nothing could be locked for this peer: report success without
                            // sending a request.
                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // `try_send` never waits: when the peer's channel is full the
                            // request is rejected, and the guard carried inside the error is
                            // dropped, releasing the locks.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            // The requester may have gone away; a failed reply is ignored.
                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            // At most one such task runs at a time.
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Fire immediately when no periodic fetch is running, otherwise
                            // after half the regular interval.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
                            };

                            // Only ever move the deadline earlier, never later.
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                // The `if` precondition keeps this branch disabled while the set is empty.
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            // Cancellation is ignored; panics are propagated to this task.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                // Same handling as above, for the periodic fetch task.
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // Skip this tick when the previous periodic fetch is still running.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    // Re-arm the timer for the next tick.
                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
                }
            }
        }
    }
459
    /// Fetch loop dedicated to a single peer.
    ///
    /// Receives [`BlocksGuard`]s over `receiver`, sends one fetch request per guard to
    /// `peer_index` while fewer than `FETCH_BLOCKS_CONCURRENCY` requests are in flight, and
    /// passes successful responses to `process_fetched_blocks`. A failed request is sent
    /// again until its retry counter exceeds `MAX_RETRIES`, at which point the guard is
    /// dropped and its blocks are unlocked.
    ///
    /// The loop ends once no branch of the `select!` can make progress, i.e. `receiver` is
    /// closed and no request is pending.
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        // The counter starts at 1 and is incremented by `fetch_blocks_request` on failure.
        const MAX_RETRIES: u32 = 3;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        // Requests currently in flight to this peer.
        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                // New work is only accepted while below the concurrency limit.
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    let metrics = &context.metrics.node_metrics;
                    metrics
                        .synchronizer_requested_blocks_by_peer
                        .with_label_values(&[peer_hostname.as_str(), "live"])
                        .inc_by(blocks_guard.block_refs.len() as u64);
                    // Count each distinct block author once per request.
                    let mut authors = HashSet::new();
                    for block_ref in &blocks_guard.block_refs {
                        authors.insert(block_ref.author);
                    }
                    for author in authors {
                        let host = &context.committee.authority(author).hostname;
                        metrics
                            .synchronizer_requested_blocks_by_authority
                            .with_label_values(&[host.as_str(), "live"])
                            .inc();
                    }

                    requests.push(Self::fetch_blocks_request(
                        network_client.clone(),
                        peer_index,
                        blocks_guard,
                        highest_rounds,
                        FETCH_REQUEST_TIMEOUT,
                        1,
                    ))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            // The guard is handed over; processing failures are recorded in
                            // metrics and logged, and the loop keeps running.
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                context.metrics.update_scoring_metrics_on_block_receival(
                                    peer_index,
                                    peer_hostname,
                                    err.clone(),
                                    "process_fetched_blocks",
                                );
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            // Re-issue the same request (same guard and rounds) until the
                            // counter passes the limit.
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Explicit drop: unlocks the blocks held by the guard.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }
552
553 async fn process_fetched_blocks(
557 mut serialized_blocks: Vec<Bytes>,
558 peer_index: AuthorityIndex,
559 requested_blocks_guard: BlocksGuard,
560 core_dispatcher: Arc<D>,
561 block_verifier: Arc<V>,
562 commit_vote_monitor: Arc<CommitVoteMonitor>,
563 context: Arc<Context>,
564 commands_sender: Sender<Command>,
565 sync_method: &str,
566 ) -> ConsensusResult<()> {
567 if serialized_blocks.is_empty() {
568 return Ok(());
569 }
570
571 if context.protocol_config.consensus_batched_block_sync() {
573 serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
574 } else {
575 if serialized_blocks.len()
578 > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
579 {
580 return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
581 }
582 }
583
584 let blocks = Handle::current()
586 .spawn_blocking({
587 let block_verifier = block_verifier.clone();
588 let context = context.clone();
589 move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
590 })
591 .await
592 .expect("Spawn blocking should not fail")?;
593
594 if !context.protocol_config.consensus_batched_block_sync() {
595 let ancestors = blocks
597 .iter()
598 .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
599 .flat_map(|b| b.ancestors().to_vec())
600 .collect::<BTreeSet<BlockRef>>();
601
602 for block in &blocks {
605 if !requested_blocks_guard
606 .block_refs
607 .contains(&block.reference())
608 && !ancestors.contains(&block.reference())
609 {
610 return Err(ConsensusError::UnexpectedFetchedBlock {
611 index: peer_index,
612 block_ref: block.reference(),
613 });
614 }
615 }
616 }
617
618 for block in &blocks {
620 commit_vote_monitor.observe_block(block);
621 }
622
623 let metrics = &context.metrics.node_metrics;
624 let peer_hostname = &context.committee.authority(peer_index).hostname;
625 metrics
626 .synchronizer_fetched_blocks_by_peer
627 .with_label_values(&[peer_hostname.as_str(), sync_method])
628 .inc_by(blocks.len() as u64);
629 for block in &blocks {
630 let block_hostname = &context.committee.authority(block.author()).hostname;
631 metrics
632 .synchronizer_fetched_blocks_by_authority
633 .with_label_values(&[block_hostname.as_str(), sync_method])
634 .inc();
635 }
636
637 debug!(
638 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
639 blocks.len(),
640 blocks.iter().map(|b| b.reference().to_string()).join(", "),
641 );
642
643 let missing_blocks = core_dispatcher
647 .add_blocks(blocks)
648 .await
649 .map_err(|_| ConsensusError::Shutdown)?;
650
651 drop(requested_blocks_guard);
654
655 if !missing_blocks.is_empty() {
657 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
659 {
660 warn!("Commands channel is full")
661 }
662 }
663
664 context
665 .metrics
666 .node_metrics
667 .missing_blocks_after_fetch_total
668 .inc_by(missing_blocks.len() as u64);
669
670 Ok(())
671 }
672
673 fn get_highest_accepted_rounds(
674 dag_state: Arc<RwLock<DagState>>,
675 context: &Arc<Context>,
676 ) -> Vec<Round> {
677 let blocks = dag_state
678 .read()
679 .get_last_cached_block_per_authority(Round::MAX);
680 assert_eq!(blocks.len(), context.committee.size());
681
682 blocks
683 .into_iter()
684 .map(|(block, _)| block.round())
685 .collect::<Vec<_>>()
686 }
687
688 fn verify_blocks(
689 serialized_blocks: Vec<Bytes>,
690 block_verifier: Arc<V>,
691 context: &Context,
692 peer_index: AuthorityIndex,
693 ) -> ConsensusResult<Vec<VerifiedBlock>> {
694 let mut verified_blocks = Vec::new();
695
696 for serialized_block in serialized_blocks {
697 let signed_block: SignedBlock =
698 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
699
700 if let Err(e) = block_verifier.verify(&signed_block) {
702 let hostname = context.committee.authority(peer_index).hostname.clone();
705
706 context
707 .metrics
708 .node_metrics
709 .invalid_blocks
710 .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
711 .inc();
712 warn!("Invalid block received from {}: {}", peer_index, e);
713 return Err(e);
714 }
715 let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
716
717 let now = context.clock.timestamp_utc_ms();
721 let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
722 if drift > 0 {
723 let peer_hostname = &context
724 .committee
725 .authority(verified_block.author())
726 .hostname;
727 context
728 .metrics
729 .node_metrics
730 .block_timestamp_drift_ms
731 .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
732 .inc_by(drift);
733
734 if context
735 .protocol_config
736 .consensus_median_timestamp_with_checkpoint_enforcement()
737 {
738 trace!(
739 "Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.",
740 verified_block.reference(),
741 verified_block.timestamp_ms(),
742 now
743 );
744 } else {
745 warn!(
746 "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
747 verified_block.reference(),
748 verified_block.timestamp_ms(),
749 now
750 );
751 continue;
752 }
753 }
754
755 verified_blocks.push(verified_block);
756 }
757
758 Ok(verified_blocks)
759 }
760
761 async fn fetch_blocks_request(
762 network_client: Arc<C>,
763 peer: AuthorityIndex,
764 blocks_guard: BlocksGuard,
765 highest_rounds: Vec<Round>,
766 request_timeout: Duration,
767 mut retries: u32,
768 ) -> (
769 ConsensusResult<Vec<Bytes>>,
770 BlocksGuard,
771 u32,
772 AuthorityIndex,
773 Vec<Round>,
774 ) {
775 let start = Instant::now();
776 let resp = timeout(
777 request_timeout,
778 network_client.fetch_blocks(
779 peer,
780 blocks_guard
781 .block_refs
782 .clone()
783 .into_iter()
784 .collect::<Vec<_>>(),
785 highest_rounds.clone(),
786 request_timeout,
787 ),
788 )
789 .await;
790
791 fail_point_async!("consensus-delay");
792
793 let resp = match resp {
794 Ok(Err(err)) => {
795 sleep_until(start + request_timeout).await;
798 retries += 1;
799 Err(err)
800 } Err(err) => {
802 sleep_until(start + request_timeout).await;
804 retries += 1;
805 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
806 }
807 Ok(result) => result,
808 };
809 (resp, blocks_guard, retries, peer, highest_rounds)
810 }
811
    /// Spawns the task that learns this node's own highest proposed round from its peers.
    ///
    /// The task repeatedly asks every peer that has not yet returned a valid answer for this
    /// node's latest blocks, until the stake of the peers that answered (plus this node's
    /// own stake) reaches quorum. The highest round seen is then published to metrics and
    /// passed to the core dispatcher. With a single-node committee no peer is asked and the
    /// round of the last proposed block in the DAG state is used instead.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before re-asking a peer whose request failed at the network level.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Upper bound of the back-off between two rounds of requests.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Builds a request future: wait `fetch_own_block_delay`, then ask
                // `authority_index` for this node's latest blocks.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                // Deserializes and verifies a response; every block must be authored by
                // this node, otherwise the whole response is rejected.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                let mut highest_round = GENESIS_ROUND;
                // Peers that already returned a valid answer are not asked again.
                let mut received_response = vec![false; context.committee.size()];
                // This node is marked as answered and its stake is counted upfront.
                received_response[context.own_index] = true;
                let mut total_stake = context.committee.stake(context.own_index);
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main:loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // One immediate request per peer that has not answered yet.
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if !received_response[authority_index] {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Deadline for collecting the answers of this round.
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                // `None`: every request of this round has completed.
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                received_response[authority_index] = true;
                                                // An empty answer contributes round 0.
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                // Not re-asked within this round; the peer stays
                                                // unanswered and is asked again in the next one.
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        // Network failure: re-ask the same peer after a delay.
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    if context.committee.reached_quorum(total_stake) {
                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    } else {
                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Back off by a factor of 1.5 per round, capped at `MAX_RETRY_DELAY_STEP`.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
950
    /// Starts one run of the periodic fetch: asks core for the missing blocks and, unless
    /// the run is skipped, spawns a task that fetches them from peers and processes the
    /// responses.
    ///
    /// The run is skipped (returning `Ok(())` without spawning anything) when nothing is
    /// missing, or when the node is commit-lagging and either GC is enabled or none of the
    /// missing blocks is within the allowed round distance.
    ///
    /// # Errors
    /// Returns `ConsensusError::Shutdown` when the missing blocks cannot be obtained from
    /// the core dispatcher.
    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        if missing_blocks.is_empty() {
            return Ok(());
        }

        // Handles moved into the spawned task below.
        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        trace!(
            "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
        );
        if commit_lagging {
            // With GC enabled the periodic fetch is skipped entirely while lagging.
            // NOTE(review): presumably another sync path is relied upon to catch up in
            // that case — confirm.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            let highest_accepted_round = dag_state.read().highest_accepted_round();
            // Keep the leading run of entries within the round threshold; `take_while`
            // stops at the first entry beyond it.
            // NOTE(review): relies on the map's key order putting lower rounds first —
            // confirm `BlockRef`'s ordering.
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|(block_ref, _)| {
                    block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
                })
                .collect::<BTreeMap<_, _>>();

            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                // Incremented here and decremented once the fetch below has returned.
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                let mut total_fetched = 0;
                // Each peer's response is processed independently; a failure for one peer
                // is logged and does not stop the others.
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }
1075
1076 fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1077 let last_commit_index = self.dag_state.read().last_commit_index();
1078 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1079 let commit_threshold = last_commit_index
1080 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1081 (
1082 commit_threshold < quorum_commit_index,
1083 last_commit_index,
1084 quorum_commit_index,
1085 )
1086 }
1087
    /// Number of rounds above the highest accepted round within which missing blocks are
    /// still fetched while the node is commit-lagging. Currently equal to the
    /// `commit_sync_batch_size` parameter.
    fn missing_block_round_threshold(&self) -> Round {
        self.context.parameters.commit_sync_batch_size
    }
1097
1098 async fn fetch_blocks_from_authorities(
1113 context: Arc<Context>,
1114 inflight_blocks: Arc<InflightBlocksMap>,
1115 network_client: Arc<C>,
1116 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1117 dag_state: Arc<RwLock<DagState>>,
1118 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1119 let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1121 for (missing_block_ref, authorities) in &missing_blocks {
1122 for author in authorities {
1123 if author == &context.own_index {
1124 continue;
1126 }
1127 authority_to_blocks
1128 .entry(*author)
1129 .or_default()
1130 .push(*missing_block_ref);
1131 }
1132 }
1133
1134 #[cfg(not(test))]
1138 let mut rng = StdRng::from_entropy();
1139
1140 #[cfg(not(test))]
1143 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1144 authority_to_blocks
1145 .iter()
1146 .choose_multiple(
1147 &mut rng,
1148 MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1149 )
1150 .into_iter()
1151 .map(|(&peer, blocks)| {
1152 let limited_blocks = blocks
1153 .iter()
1154 .copied()
1155 .take(context.parameters.max_blocks_per_sync)
1156 .collect();
1157 (peer, limited_blocks, "periodic_known")
1158 })
1159 .collect();
1160 #[cfg(test)]
1161 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1163 let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1164 .iter()
1165 .map(|(&peer, blocks)| {
1166 let limited_blocks = blocks
1167 .iter()
1168 .copied()
1169 .take(context.parameters.max_blocks_per_sync)
1170 .collect();
1171 (peer, limited_blocks, "periodic_known")
1172 })
1173 .collect();
1174 items.sort_by_key(|(peer, _, _)| *peer);
1177 items
1178 .into_iter()
1179 .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1180 .collect()
1181 };
1182
1183 let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1186 .iter()
1187 .map(|(peer, _, _)| *peer)
1188 .collect();
1189
1190 let random_candidates: Vec<_> = context
1191 .committee
1192 .authorities()
1193 .filter_map(|(peer_index, _)| {
1194 (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1195 .then_some(peer_index)
1196 })
1197 .collect();
1198 #[cfg(test)]
1199 let random_peers: Vec<AuthorityIndex> = random_candidates
1200 .into_iter()
1201 .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1202 .collect();
1203 #[cfg(not(test))]
1204 let random_peers: Vec<AuthorityIndex> = random_candidates
1205 .into_iter()
1206 .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1207
1208 #[cfg_attr(test, allow(unused_mut))]
1209 let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1210 #[cfg(not(test))]
1213 all_missing_blocks.shuffle(&mut rng);
1214
1215 let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1216
1217 for peer in random_peers {
1218 if let Some(chunk) = block_chunks.next() {
1219 chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1220 } else {
1221 break;
1222 }
1223 }
1224
1225 let mut request_futures = FuturesUnordered::new();
1226
1227 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1228
1229 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1231 for block in &all_missing_blocks {
1232 missing_blocks_per_authority[block.author] += 1;
1233 }
1234 for (missing, (_, authority)) in missing_blocks_per_authority
1235 .into_iter()
1236 .zip(context.committee.authorities())
1237 {
1238 context
1239 .metrics
1240 .node_metrics
1241 .synchronizer_missing_blocks_by_authority
1242 .with_label_values(&[&authority.hostname])
1243 .inc_by(missing as u64);
1244 context
1245 .metrics
1246 .node_metrics
1247 .synchronizer_current_missing_blocks_by_authority
1248 .with_label_values(&[&authority.hostname])
1249 .set(missing as i64);
1250 }
1251
1252 #[cfg_attr(test, expect(unused_mut))]
1255 let mut remaining_peers: Vec<_> = context
1256 .committee
1257 .authorities()
1258 .filter_map(|(peer_index, _)| {
1259 if peer_index != context.own_index
1260 && !chosen_peers_with_blocks
1261 .iter()
1262 .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1263 {
1264 Some(peer_index)
1265 } else {
1266 None
1267 }
1268 })
1269 .collect();
1270
1271 #[cfg(not(test))]
1272 remaining_peers.shuffle(&mut rng);
1273 let mut remaining_peers = remaining_peers.into_iter();
1274
1275 for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1277 let peer_hostname = &context.committee.authority(peer).hostname;
1278 let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1279
1280 if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1283 info!(
1284 "Periodic sync of {} missing blocks from peer {} {}: {}",
1285 block_refs.len(),
1286 peer,
1287 peer_hostname,
1288 block_refs
1289 .iter()
1290 .map(|b| b.to_string())
1291 .collect::<Vec<_>>()
1292 .join(", ")
1293 );
1294 let metrics = &context.metrics.node_metrics;
1296 metrics
1297 .synchronizer_requested_blocks_by_peer
1298 .with_label_values(&[peer_hostname.as_str(), label])
1299 .inc_by(block_refs.len() as u64);
1300 for block_ref in &block_refs {
1301 let block_hostname = &context.committee.authority(block_ref.author).hostname;
1302 metrics
1303 .synchronizer_requested_blocks_by_authority
1304 .with_label_values(&[block_hostname.as_str(), label])
1305 .inc();
1306 }
1307 request_futures.push(Self::fetch_blocks_request(
1308 network_client.clone(),
1309 peer,
1310 blocks_guard,
1311 highest_rounds.clone(),
1312 FETCH_REQUEST_TIMEOUT,
1313 1,
1314 ));
1315 }
1316 }
1317
1318 let mut results = Vec::new();
1319 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1320
1321 tokio::pin!(fetcher_timeout);
1322
1323 loop {
1324 tokio::select! {
1325 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1326 let peer_hostname = &context.committee.authority(peer_index).hostname;
1327 match response {
1328 Ok(fetched_blocks) => {
1329 info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1330 results.push((blocks_guard, fetched_blocks, peer_index));
1331
1332 if request_futures.is_empty() {
1334 break;
1335 }
1336 },
1337 Err(_) => {
1338 context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1339 if let Some(next_peer) = remaining_peers.next() {
1341 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1343 info!(
1344 "Retrying syncing {} missing blocks from peer {}: {}",
1345 blocks_guard.block_refs.len(),
1346 peer_hostname,
1347 blocks_guard.block_refs
1348 .iter()
1349 .map(|b| b.to_string())
1350 .collect::<Vec<_>>()
1351 .join(", ")
1352 );
1353 let block_refs = blocks_guard.block_refs.clone();
1354 let metrics = &context.metrics.node_metrics;
1356 metrics
1357 .synchronizer_requested_blocks_by_peer
1358 .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1359 .inc_by(block_refs.len() as u64);
1360 for block_ref in &block_refs {
1361 let block_hostname =
1362 &context.committee.authority(block_ref.author).hostname;
1363 metrics
1364 .synchronizer_requested_blocks_by_authority
1365 .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1366 .inc();
1367 }
1368 request_futures.push(Self::fetch_blocks_request(
1369 network_client.clone(),
1370 next_peer,
1371 blocks_guard,
1372 highest_rounds,
1373 FETCH_REQUEST_TIMEOUT,
1374 1,
1375 ));
1376 } else {
1377 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1378 }
1379 } else {
1380 debug!("No more peers left to fetch blocks");
1381 }
1382 }
1383 }
1384 },
1385 _ = &mut fetcher_timeout => {
1386 debug!("Timed out while fetching missing blocks");
1387 break;
1388 }
1389 }
1390 }
1391
1392 results
1393 }
1394}
1395
1396#[cfg(test)]
1397mod tests {
1398 use std::{
1399 collections::{BTreeMap, BTreeSet},
1400 sync::Arc,
1401 time::Duration,
1402 };
1403
1404 use async_trait::async_trait;
1405 use bytes::Bytes;
1406 use consensus_config::{AuthorityIndex, Parameters};
1407 use iota_metrics::monitored_mpsc;
1408 use parking_lot::RwLock;
1409 use tokio::{sync::Mutex, time::sleep};
1410
1411 use crate::{
1412 CommitDigest, CommitIndex,
1413 authority_service::COMMIT_LAG_MULTIPLIER,
1414 block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
1415 block_verifier::NoopBlockVerifier,
1416 commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1417 commit_vote_monitor::CommitVoteMonitor,
1418 context::Context,
1419 core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1420 dag_state::DagState,
1421 error::{ConsensusError, ConsensusResult},
1422 network::{BlockStream, NetworkClient},
1423 round_prober::QuorumRound,
1424 storage::mem_store::MemStore,
1425 synchronizer::{
1426 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1427 },
1428 };
1429
1430 type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1431 type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1432 type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1433 type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1434
1435 #[derive(Default)]
1436 struct MockNetworkClient {
1437 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1438 fetch_latest_blocks_requests:
1439 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1440 }
1441
1442 impl MockNetworkClient {
1443 async fn stub_fetch_blocks(
1444 &self,
1445 blocks: Vec<VerifiedBlock>,
1446 peer: AuthorityIndex,
1447 latency: Option<Duration>,
1448 ) {
1449 let mut lock = self.fetch_blocks_requests.lock().await;
1450 let block_refs = blocks
1451 .iter()
1452 .map(|block| block.reference())
1453 .collect::<Vec<_>>();
1454 lock.insert((block_refs, peer), (blocks, latency));
1455 }
1456
1457 async fn stub_fetch_latest_blocks(
1458 &self,
1459 blocks: Vec<VerifiedBlock>,
1460 peer: AuthorityIndex,
1461 authorities: Vec<AuthorityIndex>,
1462 latency: Option<Duration>,
1463 ) {
1464 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1465 lock.entry((peer, authorities))
1466 .or_default()
1467 .push((blocks, latency));
1468 }
1469
1470 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1471 let lock = self.fetch_latest_blocks_requests.lock().await;
1472 lock.len()
1473 }
1474 }
1475
1476 #[async_trait]
1477 impl NetworkClient for MockNetworkClient {
1478 const SUPPORT_STREAMING: bool = false;
1479
1480 async fn send_block(
1481 &self,
1482 _peer: AuthorityIndex,
1483 _serialized_block: &VerifiedBlock,
1484 _timeout: Duration,
1485 ) -> ConsensusResult<()> {
1486 unimplemented!("Unimplemented")
1487 }
1488
1489 async fn subscribe_blocks(
1490 &self,
1491 _peer: AuthorityIndex,
1492 _last_received: Round,
1493 _timeout: Duration,
1494 ) -> ConsensusResult<BlockStream> {
1495 unimplemented!("Unimplemented")
1496 }
1497
1498 async fn fetch_blocks(
1499 &self,
1500 peer: AuthorityIndex,
1501 block_refs: Vec<BlockRef>,
1502 _highest_accepted_rounds: Vec<Round>,
1503 _timeout: Duration,
1504 ) -> ConsensusResult<Vec<Bytes>> {
1505 let mut lock = self.fetch_blocks_requests.lock().await;
1506 let response = lock
1507 .remove(&(block_refs, peer))
1508 .expect("Unexpected fetch blocks request made");
1509
1510 let serialised = response
1511 .0
1512 .into_iter()
1513 .map(|block| block.serialized().clone())
1514 .collect::<Vec<_>>();
1515
1516 drop(lock);
1517
1518 if let Some(latency) = response.1 {
1519 sleep(latency).await;
1520 }
1521
1522 Ok(serialised)
1523 }
1524
1525 async fn fetch_commits(
1526 &self,
1527 _peer: AuthorityIndex,
1528 _commit_range: CommitRange,
1529 _timeout: Duration,
1530 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1531 unimplemented!("Unimplemented")
1532 }
1533
1534 async fn fetch_latest_blocks(
1535 &self,
1536 peer: AuthorityIndex,
1537 authorities: Vec<AuthorityIndex>,
1538 _timeout: Duration,
1539 ) -> ConsensusResult<Vec<Bytes>> {
1540 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1541 let mut responses = lock
1542 .remove(&(peer, authorities.clone()))
1543 .expect("Unexpected fetch blocks request made");
1544
1545 let response = responses.remove(0);
1546 let serialised = response
1547 .0
1548 .into_iter()
1549 .map(|block| block.serialized().clone())
1550 .collect::<Vec<_>>();
1551
1552 if !responses.is_empty() {
1553 lock.insert((peer, authorities), responses);
1554 }
1555
1556 drop(lock);
1557
1558 if let Some(latency) = response.1 {
1559 sleep(latency).await;
1560 }
1561
1562 Ok(serialised)
1563 }
1564
1565 async fn get_latest_rounds(
1566 &self,
1567 _peer: AuthorityIndex,
1568 _timeout: Duration,
1569 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1570 unimplemented!("Unimplemented")
1571 }
1572 }
1573
/// Exercises the lock bookkeeping of [`InflightBlocksMap`]: repeated locking by the same
/// authority, rejection of a further authority while three others hold the blocks, release on
/// guard drop, and handing locks over with `swap_locks`.
#[test]
fn test_inflight_blocks_map() {
    let map = InflightBlocksMap::new();
    let missing_block_refs: BTreeSet<BlockRef> = [(1, 0), (10, 0), (12, 3), (15, 2)]
        .into_iter()
        .map(|(round, author)| {
            BlockRef::new(round, AuthorityIndex::new_for_test(author), BlockDigest::MIN)
        })
        .collect();

    // Scenario 1: locking, rejection and release.
    {
        let mut held_guards = Vec::new();

        for index in 1..=3 {
            let authority = AuthorityIndex::new_for_test(index);

            // The first attempt of each authority locks all four refs.
            let first_attempt = map
                .lock_blocks(missing_block_refs.clone(), authority)
                .expect("Guard should be created");
            assert_eq!(first_attempt.block_refs.len(), 4);
            held_guards.push(first_attempt);

            // A second attempt by the same authority yields nothing.
            assert!(
                map.lock_blocks(missing_block_refs.clone(), authority)
                    .is_none()
            );
        }

        // With three authorities holding the refs, a fourth one is turned away
        // (presumably the MAX_AUTHORITIES_TO_FETCH_PER_BLOCK limit).
        let authority_4 = AuthorityIndex::new_for_test(4);
        assert!(
            map.lock_blocks(missing_block_refs.clone(), authority_4)
                .is_none()
        );

        // Dropping one of the held guards makes room for the fourth authority.
        drop(held_guards.remove(0));
        let late_guard = map
            .lock_blocks(missing_block_refs.clone(), authority_4)
            .expect("Guard should be successfully acquired");
        assert_eq!(late_guard.block_refs, missing_block_refs);

        // Once every guard is gone, nothing remains locked.
        drop(late_guard);
        drop(held_guards);
        assert_eq!(map.num_of_locked_blocks(), 0);
    }

    // Scenario 2: a guard held by one authority can be swapped over to another one.
    {
        let authority_1 = AuthorityIndex::new_for_test(1);
        let authority_2 = AuthorityIndex::new_for_test(2);

        let original = map
            .lock_blocks(missing_block_refs.clone(), authority_1)
            .unwrap();
        let swapped = map.swap_locks(original, authority_2).unwrap();

        assert_eq!(swapped.block_refs, missing_block_refs);
    }
}
1643
1644 #[tokio::test]
1645 async fn test_process_fetched_blocks() {
1646 let (context, _) = Context::new_for_test(4);
1648 let context = Arc::new(context);
1649 let block_verifier = Arc::new(NoopBlockVerifier {});
1650 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1651 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1652 let (commands_sender, _commands_receiver) =
1653 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
1654
1655 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
1659 expected_blocks.extend(
1660 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
1661 );
1662 assert_eq!(
1663 expected_blocks.len(),
1664 context.parameters.max_blocks_per_sync
1665 );
1666
1667 let expected_serialized_blocks = expected_blocks
1668 .iter()
1669 .map(|b| b.serialized().clone())
1670 .collect::<Vec<_>>();
1671
1672 let expected_block_refs = expected_blocks
1673 .iter()
1674 .map(|b| b.reference())
1675 .collect::<BTreeSet<_>>();
1676
1677 let peer_index = AuthorityIndex::new_for_test(2);
1679
1680 let inflight_blocks_map = InflightBlocksMap::new();
1682 let blocks_guard = inflight_blocks_map
1683 .lock_blocks(expected_block_refs.clone(), peer_index)
1684 .expect("Failed to lock blocks");
1685
1686 assert_eq!(
1687 inflight_blocks_map.num_of_locked_blocks(),
1688 expected_block_refs.len()
1689 );
1690
1691 let result = Synchronizer::<
1693 MockNetworkClient,
1694 NoopBlockVerifier,
1695 MockCoreThreadDispatcher,
1696 >::process_fetched_blocks(
1697 expected_serialized_blocks,
1698 peer_index,
1699 blocks_guard, core_dispatcher.clone(),
1701 block_verifier,
1702 commit_vote_monitor,
1703 context.clone(),
1704 commands_sender,
1705 "test",
1706 )
1707 .await;
1708
1709 assert!(result.is_ok());
1711
1712 let added_blocks = core_dispatcher.get_add_blocks().await;
1714 assert_eq!(
1715 added_blocks
1716 .iter()
1717 .map(|b| b.reference())
1718 .collect::<BTreeSet<_>>(),
1719 expected_block_refs,
1720 );
1721
1722 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
1724 }
1725
1726 #[tokio::test]
1727 async fn test_successful_fetch_blocks_from_peer() {
1728 let (context, _) = Context::new_for_test(4);
1730 let context = Arc::new(context);
1731 let block_verifier = Arc::new(NoopBlockVerifier {});
1732 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1733 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1734 let network_client = Arc::new(MockNetworkClient::default());
1735 let store = Arc::new(MemStore::new());
1736 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1737
1738 let handle = Synchronizer::start(
1739 network_client.clone(),
1740 context,
1741 core_dispatcher.clone(),
1742 commit_vote_monitor,
1743 block_verifier,
1744 dag_state,
1745 false,
1746 );
1747
1748 let expected_blocks = (0..10)
1750 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1751 .collect::<Vec<_>>();
1752 let missing_blocks = expected_blocks
1753 .iter()
1754 .map(|block| block.reference())
1755 .collect::<BTreeSet<_>>();
1756
1757 let peer = AuthorityIndex::new_for_test(1);
1759 network_client
1760 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1761 .await;
1762
1763 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1765
1766 sleep(Duration::from_millis(1_000)).await;
1768
1769 let added_blocks = core_dispatcher.get_add_blocks().await;
1771 assert_eq!(added_blocks, expected_blocks);
1772 }
1773
1774 #[tokio::test]
1775 async fn saturate_fetch_blocks_from_peer() {
1776 let (context, _) = Context::new_for_test(4);
1778 let context = Arc::new(context);
1779 let block_verifier = Arc::new(NoopBlockVerifier {});
1780 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1781 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1782 let network_client = Arc::new(MockNetworkClient::default());
1783 let store = Arc::new(MemStore::new());
1784 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1785
1786 let handle = Synchronizer::start(
1787 network_client.clone(),
1788 context,
1789 core_dispatcher.clone(),
1790 commit_vote_monitor,
1791 block_verifier,
1792 dag_state,
1793 false,
1794 );
1795
1796 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1798 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1799 .collect::<Vec<_>>();
1800
1801 let peer = AuthorityIndex::new_for_test(1);
1803 let mut iter = expected_blocks.iter().peekable();
1804 while let Some(block) = iter.next() {
1805 network_client
1808 .stub_fetch_blocks(
1809 vec![block.clone()],
1810 peer,
1811 Some(Duration::from_millis(5_000)),
1812 )
1813 .await;
1814
1815 let mut missing_blocks = BTreeSet::new();
1816 missing_blocks.insert(block.reference());
1817
1818 if iter.peek().is_none() {
1821 match handle.fetch_blocks(missing_blocks, peer).await {
1822 Err(ConsensusError::SynchronizerSaturated(index, _)) => {
1823 assert_eq!(index, peer);
1824 }
1825 _ => panic!("A saturated synchronizer error was expected"),
1826 }
1827 } else {
1828 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1829 }
1830 }
1831 }
1832
1833 #[tokio::test(flavor = "current_thread", start_paused = true)]
1834 async fn synchronizer_periodic_task_fetch_blocks() {
1835 let (context, _) = Context::new_for_test(4);
1837 let context = Arc::new(context);
1838 let block_verifier = Arc::new(NoopBlockVerifier {});
1839 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1840 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1841 let network_client = Arc::new(MockNetworkClient::default());
1842 let store = Arc::new(MemStore::new());
1843 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1844
1845 let expected_blocks = (0..10)
1847 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1848 .collect::<Vec<_>>();
1849 let missing_blocks = expected_blocks
1850 .iter()
1851 .map(|block| block.reference())
1852 .collect::<BTreeSet<_>>();
1853
1854 core_dispatcher
1856 .stub_missing_blocks(missing_blocks.clone())
1857 .await;
1858
1859 network_client
1863 .stub_fetch_blocks(
1864 expected_blocks.clone(),
1865 AuthorityIndex::new_for_test(1),
1866 Some(FETCH_REQUEST_TIMEOUT),
1867 )
1868 .await;
1869 network_client
1870 .stub_fetch_blocks(
1871 expected_blocks.clone(),
1872 AuthorityIndex::new_for_test(2),
1873 None,
1874 )
1875 .await;
1876
1877 let _handle = Synchronizer::start(
1879 network_client.clone(),
1880 context,
1881 core_dispatcher.clone(),
1882 commit_vote_monitor,
1883 block_verifier,
1884 dag_state,
1885 false,
1886 );
1887
1888 sleep(8 * FETCH_REQUEST_TIMEOUT).await;
1889
1890 let added_blocks = core_dispatcher.get_add_blocks().await;
1892 assert_eq!(added_blocks, expected_blocks);
1893
1894 assert!(
1896 core_dispatcher
1897 .get_missing_blocks()
1898 .await
1899 .unwrap()
1900 .is_empty()
1901 );
1902 }
1903
1904 #[tokio::test(flavor = "current_thread", start_paused = true)]
1905 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1906 let (mut context, _) = Context::new_for_test(4);
1908 context
1909 .protocol_config
1910 .set_consensus_batched_block_sync_for_testing(true);
1911 let context = Arc::new(context);
1912 let block_verifier = Arc::new(NoopBlockVerifier {});
1913 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1914 let network_client = Arc::new(MockNetworkClient::default());
1915 let store = Arc::new(MemStore::new());
1916 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1917 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1918
1919 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
1922 let stub_blocks = (sync_missing_block_round_threshold * 2
1923 ..sync_missing_block_round_threshold * 2
1924 + context.parameters.max_blocks_per_sync as u32)
1925 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1926 .collect::<Vec<_>>();
1927 let missing_blocks = stub_blocks
1928 .iter()
1929 .map(|block| block.reference())
1930 .collect::<BTreeSet<_>>();
1931 core_dispatcher
1932 .stub_missing_blocks(missing_blocks.clone())
1933 .await;
1934 let mut expected_blocks = stub_blocks
1938 .iter()
1939 .take(context.parameters.max_blocks_per_sync)
1940 .cloned()
1941 .collect::<Vec<_>>();
1942 network_client
1943 .stub_fetch_blocks(
1944 expected_blocks.clone(),
1945 AuthorityIndex::new_for_test(1),
1946 Some(FETCH_REQUEST_TIMEOUT),
1947 )
1948 .await;
1949 network_client
1950 .stub_fetch_blocks(
1951 expected_blocks.clone(),
1952 AuthorityIndex::new_for_test(2),
1953 None,
1954 )
1955 .await;
1956
1957 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1959 let commit_index: CommitIndex = round - 1;
1960 let blocks = (0..4)
1961 .map(|authority| {
1962 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1963 let block = TestBlock::new(round, authority)
1964 .set_commit_votes(commit_votes)
1965 .build();
1966
1967 VerifiedBlock::new_for_test(block)
1968 })
1969 .collect::<Vec<_>>();
1970
1971 for block in blocks {
1974 commit_vote_monitor.observe_block(&block);
1975 }
1976
1977 let _handle = Synchronizer::start(
1980 network_client.clone(),
1981 context.clone(),
1982 core_dispatcher.clone(),
1983 commit_vote_monitor.clone(),
1984 block_verifier,
1985 dag_state.clone(),
1986 false,
1987 );
1988
1989 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1990
1991 let added_blocks = core_dispatcher.get_add_blocks().await;
1994 assert_eq!(added_blocks, vec![]);
1995
1996 println!("Before advancing");
1997 {
2000 let mut d = dag_state.write();
2001 for index in 1..=commit_index {
2002 let commit =
2003 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2004
2005 d.add_commit(commit);
2006 }
2007
2008 println!("Once advanced");
2009 assert_eq!(
2010 d.last_commit_index(),
2011 commit_vote_monitor.quorum_commit_index()
2012 );
2013 }
2014
2015 core_dispatcher
2017 .stub_missing_blocks(missing_blocks.clone())
2018 .await;
2019
2020 println!("Final sleep");
2021 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2022
2023 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2025 println!("Final await");
2026 added_blocks.sort_by_key(|block| block.reference());
2027 expected_blocks.sort_by_key(|block| block.reference());
2028
2029 assert_eq!(added_blocks, expected_blocks);
2030 }
2031
2032 #[tokio::test(flavor = "current_thread", start_paused = true)]
2033 async fn synchronizer_fetch_own_last_block() {
2034 let (context, _) = Context::new_for_test(4);
2036 let context = Arc::new(context.with_parameters(Parameters {
2037 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2038 ..Default::default()
2039 }));
2040 let block_verifier = Arc::new(NoopBlockVerifier {});
2041 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2042 let network_client = Arc::new(MockNetworkClient::default());
2043 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2044 let store = Arc::new(MemStore::new());
2045 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2046 let our_index = AuthorityIndex::new_for_test(0);
2047
2048 let mut expected_blocks = (8..=10)
2050 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2051 .collect::<Vec<_>>();
2052
2053 let block_1 = expected_blocks.pop().unwrap();
2056 network_client
2057 .stub_fetch_latest_blocks(
2058 vec![block_1.clone()],
2059 AuthorityIndex::new_for_test(1),
2060 vec![our_index],
2061 Some(Duration::from_secs(10)),
2062 )
2063 .await;
2064 network_client
2065 .stub_fetch_latest_blocks(
2066 vec![block_1],
2067 AuthorityIndex::new_for_test(1),
2068 vec![our_index],
2069 None,
2070 )
2071 .await;
2072
2073 let block_2 = expected_blocks.pop().unwrap();
2075 network_client
2076 .stub_fetch_latest_blocks(
2077 vec![block_2.clone()],
2078 AuthorityIndex::new_for_test(2),
2079 vec![our_index],
2080 Some(Duration::from_secs(10)),
2081 )
2082 .await;
2083 network_client
2084 .stub_fetch_latest_blocks(
2085 vec![block_2],
2086 AuthorityIndex::new_for_test(2),
2087 vec![our_index],
2088 None,
2089 )
2090 .await;
2091
2092 let block_3 = expected_blocks.pop().unwrap();
2094 network_client
2095 .stub_fetch_latest_blocks(
2096 vec![block_3.clone()],
2097 AuthorityIndex::new_for_test(3),
2098 vec![our_index],
2099 Some(Duration::from_secs(10)),
2100 )
2101 .await;
2102 network_client
2103 .stub_fetch_latest_blocks(
2104 vec![block_3],
2105 AuthorityIndex::new_for_test(3),
2106 vec![our_index],
2107 None,
2108 )
2109 .await;
2110
2111 let handle = Synchronizer::start(
2113 network_client.clone(),
2114 context.clone(),
2115 core_dispatcher.clone(),
2116 commit_vote_monitor,
2117 block_verifier,
2118 dag_state,
2119 true,
2120 );
2121
2122 sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2124
2125 assert_eq!(
2127 core_dispatcher.get_last_own_proposed_round().await,
2128 vec![10]
2129 );
2130
2131 assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2133
2134 assert_eq!(
2136 context
2137 .metrics
2138 .node_metrics
2139 .sync_last_known_own_block_retries
2140 .get(),
2141 1
2142 );
2143
2144 if let Err(err) = handle.stop().await {
2146 if err.is_panic() {
2147 std::panic::resume_unwind(err.into_panic());
2148 }
2149 }
2150 }
2151 #[derive(Default)]
2152 struct SyncMockDispatcher {
2153 missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2154 added_blocks: Mutex<Vec<VerifiedBlock>>,
2155 }
2156
2157 #[async_trait::async_trait]
2158 impl CoreThreadDispatcher for SyncMockDispatcher {
2159 async fn get_missing_blocks(
2160 &self,
2161 ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2162 Ok(self.missing_blocks.lock().await.clone())
2163 }
2164 async fn add_blocks(
2165 &self,
2166 blocks: Vec<VerifiedBlock>,
2167 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2168 let mut guard = self.added_blocks.lock().await;
2169 guard.extend(blocks.clone());
2170 Ok(blocks.iter().map(|b| b.reference()).collect())
2171 }
2172
2173 async fn check_block_refs(
2176 &self,
2177 block_refs: Vec<BlockRef>,
2178 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2179 Ok(block_refs.into_iter().collect())
2181 }
2182
2183 async fn add_certified_commits(
2184 &self,
2185 _commits: CertifiedCommits,
2186 ) -> Result<BTreeSet<BlockRef>, CoreError> {
2187 Ok(BTreeSet::new())
2189 }
2190
2191 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2192 Ok(())
2193 }
2194
2195 fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2196 Ok(())
2197 }
2198
2199 fn set_propagation_delay_and_quorum_rounds(
2200 &self,
2201 _delay: Round,
2202 _received_quorum_rounds: Vec<QuorumRound>,
2203 _accepted_quorum_rounds: Vec<QuorumRound>,
2204 ) -> Result<(), CoreError> {
2205 Ok(())
2206 }
2207
2208 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2209 Ok(())
2210 }
2211
2212 fn highest_received_rounds(&self) -> Vec<Round> {
2213 Vec::new()
2214 }
2215 }
2216
2217 #[tokio::test(flavor = "current_thread")]
2218 async fn known_before_random_peer_fetch() {
2219 {
2220 let (ctx, _) = Context::new_for_test(10);
2222 let context = Arc::new(ctx);
2223 let store = Arc::new(MemStore::new());
2224 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2225 let inflight = InflightBlocksMap::new();
2226
2227 let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2229 let missing_ref = missing_vb.reference();
2230 let missing_blocks = BTreeMap::from([(
2231 missing_ref,
2232 BTreeSet::from([
2233 AuthorityIndex::new_for_test(2),
2234 AuthorityIndex::new_for_test(3),
2235 AuthorityIndex::new_for_test(4),
2236 ]),
2237 )]);
2238
2239 let network_client = Arc::new(MockNetworkClient::default());
2241 for i in 1..=9 {
2243 let peer = AuthorityIndex::new_for_test(i);
2244 if i == 1 || i == 4 {
2245 network_client
2246 .stub_fetch_blocks(
2247 vec![missing_vb.clone()],
2248 peer,
2249 Some(2 * FETCH_REQUEST_TIMEOUT),
2250 )
2251 .await;
2252 continue;
2253 }
2254 network_client
2255 .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2256 .await;
2257 }
2258
2259 let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2262 ::fetch_blocks_from_authorities(
2263 context.clone(),
2264 inflight.clone(),
2265 network_client.clone(),
2266 missing_blocks,
2267 dag_state.clone(),
2268 )
2269 .await;
2270
2271 assert_eq!(results.len(), 2);
2274
2275 let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2277 assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2278 let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
2279 assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
2280 let expected = missing_vb.serialized().clone();
2282 assert_eq!(hot_bytes, &vec![expected]);
2283 }
2284 }
2285
2286 #[tokio::test(flavor = "current_thread")]
2287 async fn known_before_periodic_peer_fetch_larger_scenario() {
2288 use std::{
2289 collections::{BTreeMap, BTreeSet},
2290 sync::Arc,
2291 };
2292
2293 use parking_lot::RwLock;
2294
2295 use crate::{
2296 block::{Round, TestBlock, VerifiedBlock},
2297 context::Context,
2298 };
2299
2300 let (ctx, _) = Context::new_for_test(10);
2302 let context = Arc::new(ctx);
2303 let store = Arc::new(MemStore::new());
2304 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2305 let inflight = InflightBlocksMap::new();
2306 let network_client = Arc::new(MockNetworkClient::default());
2307
2308 let mut missing_blocks = BTreeMap::new();
2310 let mut missing_vbs = Vec::new();
2311 let known_number_blocks = 10;
2312 for i in 0..1000 {
2313 let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2314 let r = vb.reference();
2315 if i < known_number_blocks {
2316 missing_blocks.insert(
2318 r,
2319 BTreeSet::from([
2320 AuthorityIndex::new_for_test(0),
2321 AuthorityIndex::new_for_test(2),
2322 ]),
2323 );
2324 } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2325 missing_blocks.insert(
2327 r,
2328 BTreeSet::from([
2329 AuthorityIndex::new_for_test(0),
2330 AuthorityIndex::new_for_test(3),
2331 ]),
2332 );
2333 } else {
2334 missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2336 }
2337 missing_vbs.push(vb);
2338 }
2339
2340 let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2342 let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2343 .iter()
2344 .map(|&peer| {
2345 let vbs = missing_vbs
2346 .iter()
2347 .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2348 .take(context.parameters.max_blocks_per_sync)
2349 .cloned()
2350 .collect::<Vec<_>>();
2351 (peer, vbs)
2352 })
2353 .collect();
2354
2355 for (peer, vbs) in known_vbs_by_peer {
2356 if peer == AuthorityIndex::new_for_test(2) {
2357 network_client
2359 .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2360 .await;
2361 network_client
2362 .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2363 .await;
2364 } else {
2365 network_client
2366 .stub_fetch_blocks(vbs.clone(), peer, None)
2367 .await;
2368 }
2369 }
2370
2371 network_client
2373 .stub_fetch_blocks(
2374 missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2375 AuthorityIndex::new_for_test(1),
2376 None,
2377 )
2378 .await;
2379
2380 network_client
2381 .stub_fetch_blocks(
2382 missing_vbs[context.parameters.max_blocks_per_sync
2383 ..2 * context.parameters.max_blocks_per_sync]
2384 .to_vec(),
2385 AuthorityIndex::new_for_test(4),
2386 None,
2387 )
2388 .await;
2389
2390 let results = Synchronizer::<
2392 MockNetworkClient,
2393 NoopBlockVerifier,
2394 SyncMockDispatcher,
2395 >::fetch_blocks_from_authorities(
2396 context.clone(),
2397 inflight.clone(),
2398 network_client.clone(),
2399 missing_blocks,
2400 dag_state.clone(),
2401 )
2402 .await;
2403
2404 assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");
2407
2408 let (_guard3, bytes3, peer3) = &results[0];
2410 assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
2411 let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
2412 .iter()
2413 .map(|vb| vb.serialized().clone())
2414 .collect::<Vec<_>>();
2415 assert_eq!(bytes3, &expected2);
2416
2417 let (_guard1, bytes1, peer1) = &results[1];
2419 assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
2420 let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
2421 .iter()
2422 .map(|vb| vb.serialized().clone())
2423 .collect::<Vec<_>>();
2424 assert_eq!(bytes1, &expected1);
2425
2426 let (_guard4, bytes4, peer4) = &results[2];
2428 assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
2429 let expected4 = missing_vbs
2430 [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
2431 .iter()
2432 .map(|vb| vb.serialized().clone())
2433 .collect::<Vec<_>>();
2434 assert_eq!(bytes4, &expected4);
2435
2436 let (_guard5, bytes5, peer5) = &results[3];
2438 assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
2439 let expected5 = missing_vbs[0..known_number_blocks]
2440 .iter()
2441 .map(|vb| vb.serialized().clone())
2442 .collect::<Vec<_>>();
2443 assert_eq!(bytes5, &expected5);
2444 }
2445}