use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesUnordered};
use iota_macros::fail_point_async;
use iota_metrics::{
    monitored_future,
    monitored_mpsc::{Receiver, Sender, channel},
    monitored_scope,
};
use itertools::Itertools as _;
use parking_lot::{Mutex, RwLock};
#[cfg(not(test))]
use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
use tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{mpsc::error::TrySendError, oneshot},
    task::{JoinError, JoinSet},
    time::{Instant, sleep, sleep_until, timeout},
};
use tracing::{debug, error, info, trace, warn};

use crate::{
    BlockAPI, CommitIndex, Round,
    authority_service::COMMIT_LAG_MULTIPLIER,
    block::{BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
};
/// The maximum number of in-flight fetch requests per peer task; also used as
/// the capacity of each per-peer request channel.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Peers may return a limited number of blocks in addition to the explicitly
/// requested ones (e.g. ancestors of the requested blocks); responses with
/// more than this many extra blocks are rejected when batched block sync is
/// disabled.
pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;

/// The timeout for a single fetch blocks request to one peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);

/// The overall timeout for one round of fetching missing blocks from multiple
/// peers during periodic synchronization.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Each missing block is requested from at most this many authorities at the
/// same time.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 3;

/// The maximum number of peers contacted during one periodic synchronization
/// round.
const MAX_PERIODIC_SYNC_PEERS: usize = 4;

/// Out of the periodic sync peers, this many are picked at random instead of
/// from the authorities known to hold the missing blocks.
const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
/// Guards a set of block references that are currently being fetched from a
/// peer. On drop, the references are unlocked in the shared
/// [`InflightBlocksMap`] so they can be requested again, possibly from a
/// different peer.
struct BlocksGuard {
    map: Arc<InflightBlocksMap>,
    block_refs: BTreeSet<BlockRef>,
    peer: AuthorityIndex,
}

impl Drop for BlocksGuard {
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}

/// Keeps track of the missing blocks that are currently being fetched and the
/// authorities they are being fetched from, so the same block is not requested
/// from too many peers at the same time.
struct InflightBlocksMap {
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}

impl InflightBlocksMap {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(HashMap::new()),
        })
    }

    /// Attempts to lock the provided `missing_block_refs` for fetching from
    /// `peer`. A block is locked only if fewer than
    /// `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK` authorities are already fetching
    /// it and this peer is not one of them. Returns a [`BlocksGuard`] holding
    /// the successfully locked blocks, or `None` if no block could be locked.
    fn lock_blocks(
        self: &Arc<Self>,
        missing_block_refs: BTreeSet<BlockRef>,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let mut blocks = BTreeSet::new();
        let mut inner = self.inner.lock();

        for block_ref in missing_block_refs {
            let authorities = inner.entry(block_ref).or_default();
            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
                && authorities.get(&peer).is_none()
            {
                assert!(authorities.insert(peer));
                blocks.insert(block_ref);
            }
        }

        if blocks.is_empty() {
            None
        } else {
            Some(BlocksGuard {
                map: self.clone(),
                block_refs: blocks,
                peer,
            })
        }
    }

    /// Unlocks the provided block references for the given peer. Called from
    /// [`BlocksGuard::drop`] once a fetch attempt has completed.
    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
        let mut blocks_to_fetch = self.inner.lock();
        for block_ref in block_refs {
            let authorities = blocks_to_fetch
                .get_mut(block_ref)
                .expect("Should have found a non empty map");

            assert!(authorities.remove(&peer), "Peer index should be present!");

            if authorities.is_empty() {
                blocks_to_fetch.remove(block_ref);
            }
        }
    }

    /// Drops the provided `blocks_guard`, unlocking its blocks for the
    /// original peer, and immediately attempts to lock the same blocks for
    /// `peer`. Useful when a fetch from one peer failed and the request should
    /// be retried against another peer.
    fn swap_locks(
        self: &Arc<Self>,
        blocks_guard: BlocksGuard,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let block_refs = blocks_guard.block_refs.clone();

        drop(blocks_guard);

        self.lock_blocks(block_refs, peer)
    }

    #[cfg(test)]
    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
        let inner = self.inner.lock();
        inner.len()
    }
}

/// The commands processed by the synchronizer's main loop.
enum Command {
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    FetchOwnLastBlock,
    KickOffScheduler,
}

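/// A handle for interacting with a running [`Synchronizer`]: requesting block
/// fetches and shutting the synchronizer down.
///
/// A minimal usage sketch (assuming a started synchronizer, a set of missing
/// block references and a peer index; names are illustrative):
///
/// ```ignore
/// let handle = Synchronizer::start(/* ... */);
/// // Ask the synchronizer to fetch `missing_block_refs` from `peer_index`.
/// handle.fetch_blocks(missing_block_refs, peer_index).await?;
/// // On shutdown, abort and await the internal tasks.
/// handle.stop().await?;
/// ```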
pub(crate) struct SynchronizerHandle {
    commands_sender: Sender<Command>,
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}

impl SynchronizerHandle {
    /// Explicitly asks the synchronizer to fetch the given missing blocks from
    /// the specified peer. The request is forwarded to the synchronizer's main
    /// loop; an error is returned if the synchronizer is shutting down or the
    /// peer's fetch pipeline is saturated.
    pub(crate) async fn fetch_blocks(
        &self,
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<()> {
        let (sender, receiver) = oneshot::channel();
        self.commands_sender
            .send(Command::FetchBlocks {
                missing_block_refs,
                peer_index,
                result: sender,
            })
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;
        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
    }

    /// Aborts all spawned synchronizer tasks and waits for them to shut down.
    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
        let mut tasks = self.tasks.lock().await;
        tasks.abort_all();
        while let Some(result) = tasks.join_next().await {
            result?
        }
        Ok(())
    }
}

/// The `Synchronizer` is responsible for fetching blocks that this node is
/// missing from its peers: either on demand, when a specific peer is reported
/// to hold missing blocks, or periodically, via a scheduler that queries the
/// core for missing blocks and fetches them from a sample of peers.
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    commands_receiver: Receiver<Command>,
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    fetch_blocks_scheduler_task: JoinSet<()>,
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    inflight_blocks_map: Arc<InflightBlocksMap>,
    commands_sender: Sender<Command>,
}

impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
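    /// Starts the synchronizer: spawns one block-fetching task per peer
    /// authority plus the main loop that processes commands, and returns a
    /// [`SynchronizerHandle`] for interacting with it. When
    /// `sync_last_known_own_block` is true, fetching our own last proposed
    /// block from peers is scheduled before anything else.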
274 pub fn start(
277 network_client: Arc<C>,
278 context: Arc<Context>,
279 core_dispatcher: Arc<D>,
280 commit_vote_monitor: Arc<CommitVoteMonitor>,
281 block_verifier: Arc<V>,
282 dag_state: Arc<RwLock<DagState>>,
283 sync_last_known_own_block: bool,
284 ) -> Arc<SynchronizerHandle> {
285 let (commands_sender, commands_receiver) =
286 channel("consensus_synchronizer_commands", 1_000);
287 let inflight_blocks_map = InflightBlocksMap::new();
288
289 let mut fetch_block_senders = BTreeMap::new();
291 let mut tasks = JoinSet::new();
292 for (index, _) in context.committee.authorities() {
293 if index == context.own_index {
294 continue;
295 }
296 let (sender, receiver) =
297 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
298 let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
299 index,
300 network_client.clone(),
301 block_verifier.clone(),
302 commit_vote_monitor.clone(),
303 context.clone(),
304 core_dispatcher.clone(),
305 dag_state.clone(),
306 receiver,
307 commands_sender.clone(),
308 );
309 tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
310 fetch_block_senders.insert(index, sender);
311 }
312
313 let commands_sender_clone = commands_sender.clone();
314
315 if sync_last_known_own_block {
316 commands_sender
317 .try_send(Command::FetchOwnLastBlock)
318 .expect("Failed to sync our last block");
319 }
320
321 tasks.spawn(monitored_future!(async move {
323 let mut s = Self {
324 context,
325 commands_receiver,
326 fetch_block_senders,
327 core_dispatcher,
328 commit_vote_monitor,
329 fetch_blocks_scheduler_task: JoinSet::new(),
330 fetch_own_last_block_task: JoinSet::new(),
331 network_client,
332 block_verifier,
333 inflight_blocks_map,
334 commands_sender: commands_sender_clone,
335 dag_state,
336 };
337 s.run().await;
338 }));
339
340 Arc::new(SynchronizerHandle {
341 commands_sender,
342 tasks: tokio::sync::Mutex::new(tasks),
343 })
344 }
345
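    /// The main loop of the synchronizer: processes incoming commands
    /// (explicit fetch requests, fetching our own last block, kicking the
    /// scheduler), monitors the spawned tasks, and periodically triggers the
    /// scheduler that fetches missing blocks.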
346 async fn run(&mut self) {
348 const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
351 let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);
352
353 tokio::pin!(scheduler_timeout);
354
355 loop {
356 tokio::select! {
357 Some(command) = self.commands_receiver.recv() => {
358 match command {
359 Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
360 if peer_index == self.context.own_index {
361 error!("We should never attempt to fetch blocks from our own node");
362 continue;
363 }
364
365 let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();
366
367 let missing_block_refs = missing_block_refs
370 .into_iter()
371 .take(self.context.parameters.max_blocks_per_sync)
372 .collect();
373
374 let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
375 let Some(blocks_guard) = blocks_guard else {
376 result.send(Ok(())).ok();
377 continue;
378 };
379
380 let r = self
383 .fetch_block_senders
384 .get(&peer_index)
385 .expect("Fatal error, sender should be present")
386 .try_send(blocks_guard)
387 .map_err(|err| {
388 match err {
389 TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
390 TrySendError::Closed(_) => ConsensusError::Shutdown
391 }
392 });
393
394 result.send(r).ok();
395 }
396 Command::FetchOwnLastBlock => {
397 if self.fetch_own_last_block_task.is_empty() {
398 self.start_fetch_own_last_block_task();
399 }
400 }
401 Command::KickOffScheduler => {
402 let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
405 Instant::now()
406 } else {
407 Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
408 };
409
410 if timeout < scheduler_timeout.deadline() {
412 scheduler_timeout.as_mut().reset(timeout);
413 }
414 }
415 }
416 },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Ignore cancellation errors.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Ignore cancellation errors.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
443 () = &mut scheduler_timeout => {
444 if self.fetch_blocks_scheduler_task.is_empty() {
446 if let Err(err) = self.start_fetch_missing_blocks_task().await {
447 debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
448 return;
449 };
450 }
451
452 scheduler_timeout
453 .as_mut()
454 .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
455 }
456 }
457 }
458 }
459
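    /// A dedicated task for fetching blocks from a single peer. It receives
    /// [`BlocksGuard`]s over its channel, issues up to
    /// `FETCH_BLOCKS_CONCURRENCY` concurrent requests to the peer (retrying a
    /// failed request up to `MAX_RETRIES` times), and processes the fetched
    /// blocks as responses arrive.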
460 async fn fetch_blocks_from_authority(
461 peer_index: AuthorityIndex,
462 network_client: Arc<C>,
463 block_verifier: Arc<V>,
464 commit_vote_monitor: Arc<CommitVoteMonitor>,
465 context: Arc<Context>,
466 core_dispatcher: Arc<D>,
467 dag_state: Arc<RwLock<DagState>>,
468 mut receiver: Receiver<BlocksGuard>,
469 commands_sender: Sender<Command>,
470 ) {
471 const MAX_RETRIES: u32 = 3;
472 let peer_hostname = &context.committee.authority(peer_index).hostname;
473
474 let mut requests = FuturesUnordered::new();
475
476 loop {
477 tokio::select! {
478 Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
479 let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
481
482 let metrics = &context.metrics.node_metrics;
484 metrics
485 .synchronizer_requested_blocks_by_peer
486 .with_label_values(&[peer_hostname.as_str(), "live"])
487 .inc_by(blocks_guard.block_refs.len() as u64);
488 let mut authors = HashSet::new();
490 for block_ref in &blocks_guard.block_refs {
491 authors.insert(block_ref.author);
492 }
493 for author in authors {
494 let host = &context.committee.authority(author).hostname;
495 metrics
496 .synchronizer_requested_blocks_by_authority
497 .with_label_values(&[host.as_str(), "live"])
498 .inc();
499 }
500
501 requests.push(Self::fetch_blocks_request(
502 network_client.clone(),
503 peer_index,
504 blocks_guard,
505 highest_rounds,
506 FETCH_REQUEST_TIMEOUT,
507 1,
508 ))
509 },
510 Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
511 match response {
512 Ok(blocks) => {
513 if let Err(err) = Self::process_fetched_blocks(blocks,
514 peer_index,
515 blocks_guard,
516 core_dispatcher.clone(),
517 block_verifier.clone(),
518 commit_vote_monitor.clone(),
519 context.clone(),
520 commands_sender.clone(),
521 "live"
522 ).await {
523 context.metrics.update_scoring_metrics_on_block_receival(
524 peer_index,
525 peer_hostname,
526 err.clone(),
527 "process_fetched_blocks",
528 );
529 warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
530 context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
531 }
532 },
533 Err(_) => {
534 context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
535 if retries <= MAX_RETRIES {
536 requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
537 } else {
538 warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
539 drop(blocks_guard);
541 }
542 }
543 }
544 },
545 else => {
546 info!("Fetching blocks from authority {peer_index} task will now abort.");
547 break;
548 }
549 }
550 }
551 }
552
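    /// Processes blocks fetched from a peer: verifies them, records metrics,
    /// and sends them to the core dispatcher for acceptance. When batched
    /// block sync is disabled, it also rejects responses containing blocks
    /// that were neither requested nor ancestors of the requested ones. If the
    /// core still reports missing blocks afterwards, the scheduler is kicked.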
553 async fn process_fetched_blocks(
557 mut serialized_blocks: Vec<Bytes>,
558 peer_index: AuthorityIndex,
559 requested_blocks_guard: BlocksGuard,
560 core_dispatcher: Arc<D>,
561 block_verifier: Arc<V>,
562 commit_vote_monitor: Arc<CommitVoteMonitor>,
563 context: Arc<Context>,
564 commands_sender: Sender<Command>,
565 sync_method: &str,
566 ) -> ConsensusResult<()> {
567 if serialized_blocks.is_empty() {
568 return Ok(());
569 }
570
        if context.protocol_config.consensus_batched_block_sync() {
            serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
        } else if serialized_blocks.len()
            > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
        {
            return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
        }
583
584 let blocks = Handle::current()
586 .spawn_blocking({
587 let block_verifier = block_verifier.clone();
588 let context = context.clone();
589 move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
590 })
591 .await
592 .expect("Spawn blocking should not fail")?;
593
594 if !context.protocol_config.consensus_batched_block_sync() {
595 let ancestors = blocks
597 .iter()
598 .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
599 .flat_map(|b| b.ancestors().to_vec())
600 .collect::<BTreeSet<BlockRef>>();
601
602 for block in &blocks {
605 if !requested_blocks_guard
606 .block_refs
607 .contains(&block.reference())
608 && !ancestors.contains(&block.reference())
609 {
610 return Err(ConsensusError::UnexpectedFetchedBlock {
611 index: peer_index,
612 block_ref: block.reference(),
613 });
614 }
615 }
616 }
617
618 for block in &blocks {
620 commit_vote_monitor.observe_block(block);
621 }
622
623 let metrics = &context.metrics.node_metrics;
624 let peer_hostname = &context.committee.authority(peer_index).hostname;
625 metrics
626 .synchronizer_fetched_blocks_by_peer
627 .with_label_values(&[peer_hostname.as_str(), sync_method])
628 .inc_by(blocks.len() as u64);
629 for block in &blocks {
630 let block_hostname = &context.committee.authority(block.author()).hostname;
631 metrics
632 .synchronizer_fetched_blocks_by_authority
633 .with_label_values(&[block_hostname.as_str(), sync_method])
634 .inc();
635 }
636
637 debug!(
638 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
639 blocks.len(),
640 blocks.iter().map(|b| b.reference().to_string()).join(", "),
641 );
642
643 let missing_blocks = core_dispatcher
647 .add_blocks(blocks)
648 .await
649 .map_err(|_| ConsensusError::Shutdown)?;
650
651 drop(requested_blocks_guard);
654
655 if !missing_blocks.is_empty() {
657 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
659 {
660 warn!("Commands channel is full")
661 }
662 }
663
664 context
665 .metrics
666 .node_metrics
667 .missing_blocks_after_fetch_total
668 .inc_by(missing_blocks.len() as u64);
669
670 Ok(())
671 }
672
673 fn get_highest_accepted_rounds(
674 dag_state: Arc<RwLock<DagState>>,
675 context: &Arc<Context>,
676 ) -> Vec<Round> {
677 let blocks = dag_state
678 .read()
679 .get_last_cached_block_per_authority(Round::MAX);
680 assert_eq!(blocks.len(), context.committee.size());
681
682 blocks
683 .into_iter()
684 .map(|(block, _)| block.round())
685 .collect::<Vec<_>>()
686 }
687
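    /// Deserializes and verifies the provided serialized blocks. An invalid
    /// block causes an error to be returned, while blocks with a timestamp in
    /// the future are skipped with a warning.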
688 fn verify_blocks(
689 serialized_blocks: Vec<Bytes>,
690 block_verifier: Arc<V>,
691 context: &Context,
692 peer_index: AuthorityIndex,
693 ) -> ConsensusResult<Vec<VerifiedBlock>> {
694 let mut verified_blocks = Vec::new();
695
696 for serialized_block in serialized_blocks {
697 let signed_block: SignedBlock =
698 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
699
700 if let Err(e) = block_verifier.verify(&signed_block) {
702 let hostname = context.committee.authority(peer_index).hostname.clone();
705
706 context
707 .metrics
708 .node_metrics
709 .invalid_blocks
710 .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
711 .inc();
712 warn!("Invalid block received from {}: {}", peer_index, e);
713 return Err(e);
714 }
715 let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
716
717 let now = context.clock.timestamp_utc_ms();
721 if now < verified_block.timestamp_ms() {
722 warn!(
723 "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
724 verified_block.reference(),
725 verified_block.timestamp_ms(),
726 now
727 );
728 continue;
729 }
730
731 verified_blocks.push(verified_block);
732 }
733
734 Ok(verified_blocks)
735 }
736
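    /// Sends a single `fetch_blocks` request to `peer` under the given
    /// timeout. On failure the call waits out the remainder of the timeout
    /// before returning, so callers can resubmit the request without
    /// tight-looping. Returns the response together with the blocks guard, the
    /// updated retry count, the peer and the highest accepted rounds.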
737 async fn fetch_blocks_request(
738 network_client: Arc<C>,
739 peer: AuthorityIndex,
740 blocks_guard: BlocksGuard,
741 highest_rounds: Vec<Round>,
742 request_timeout: Duration,
743 mut retries: u32,
744 ) -> (
745 ConsensusResult<Vec<Bytes>>,
746 BlocksGuard,
747 u32,
748 AuthorityIndex,
749 Vec<Round>,
750 ) {
751 let start = Instant::now();
752 let resp = timeout(
753 request_timeout,
754 network_client.fetch_blocks(
755 peer,
756 blocks_guard
757 .block_refs
758 .clone()
759 .into_iter()
760 .collect::<Vec<_>>(),
761 highest_rounds.clone(),
762 request_timeout,
763 ),
764 )
765 .await;
766
767 fail_point_async!("consensus-delay");
768
        let resp = match resp {
            Ok(Err(err)) => {
                // The peer returned an error; wait out the remainder of the request
                // timeout so the caller does not retry the peer in a tight loop.
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(err)
            }
            Err(err) => {
                // The request timed out.
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
            }
            Ok(result) => result,
        };
785 (resp, blocks_guard, retries, peer, highest_rounds)
786 }
787
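    /// Spawns a task that fetches our own last proposed block from the other
    /// authorities. It keeps retrying with an increasing delay until a quorum
    /// of stake has responded (or we are the only node), then reports the
    /// highest observed round to the core dispatcher via
    /// `set_last_known_proposed_round`.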
788 fn start_fetch_own_last_block_task(&mut self) {
789 const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
790 const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
791
792 let context = self.context.clone();
793 let dag_state = self.dag_state.clone();
794 let network_client = self.network_client.clone();
795 let block_verifier = self.block_verifier.clone();
796 let core_dispatcher = self.core_dispatcher.clone();
797
798 self.fetch_own_last_block_task
799 .spawn(monitored_future!(async move {
800 let _scope = monitored_scope("FetchOwnLastBlockTask");
801
802 let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
803 let network_client_cloned = network_client.clone();
804 let own_index = context.own_index;
805 async move {
806 sleep(fetch_own_block_delay).await;
807 let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
808 (r, authority_index)
809 }
810 };
811
812 let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
813 let mut result = Vec::new();
814 for serialized_block in blocks {
815 let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
816 block_verifier.verify(&signed_block).tap_err(|err|{
817 let hostname = context.committee.authority(authority_index).hostname.clone();
818 context
819 .metrics
820 .node_metrics
821 .invalid_blocks
822 .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
823 .inc();
824 warn!("Invalid block received from {}: {}", authority_index, err);
825 })?;
826
827 let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
828 if verified_block.author() != context.own_index {
829 return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
830 }
831 result.push(verified_block);
832 }
833 Ok(result)
834 };
835
836 let mut highest_round = GENESIS_ROUND;
838 let mut received_response = vec![false; context.committee.size()];
840 received_response[context.own_index] = true;
842 let mut total_stake = context.committee.stake(context.own_index);
843 let mut retries = 0;
844 let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
846 if context.committee.size() == 1 {
847 highest_round = dag_state.read().get_last_proposed_block().round();
848 info!("Only one node in the network, will not try fetching own last block from peers.");
849 break 'main;
850 }
851
852 let mut results = FuturesUnordered::new();
854
855 for (authority_index, _authority) in context.committee.authorities() {
856 if !received_response[authority_index] {
858 results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
859 }
860 }
861
862 let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
864 tokio::pin!(timer);
865
866 'inner: loop {
867 tokio::select! {
868 result = results.next() => {
869 let Some((result, authority_index)) = result else {
870 break 'inner;
871 };
872 match result {
873 Ok(result) => {
874 match process_blocks(result, authority_index) {
875 Ok(blocks) => {
876 received_response[authority_index] = true;
877 let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
878 highest_round = highest_round.max(max_round);
879
880 total_stake += context.committee.stake(authority_index);
881 },
882 Err(err) => {
883 warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
884 }
885 }
886 },
887 Err(err) => {
888 warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
889 results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
890 }
891 }
892 },
893 () = &mut timer => {
894 info!("Timeout while trying to sync our own last block from peers");
895 break 'inner;
896 }
897 }
898 }
899
900 if context.committee.reached_quorum(total_stake) {
902 info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
903 break 'main;
904 } else {
905 info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
906 }
907
908 retries += 1;
909 context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
910 warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
911
912 sleep(retry_delay_step).await;
913
914 retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
915 retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
916 }
917
918 context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
920
921 if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
922 warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
923 }
924 }));
925 }
926
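    /// Spawns the periodic fetch task: asks the core for the currently missing
    /// blocks and requests them from peers. When the local commit index lags
    /// too far behind the quorum commit index, the task is skipped (with GC
    /// enabled) or limited to blocks close to the highest accepted round, so
    /// commit sync can catch up first.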
927 async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
928 let mut missing_blocks = self
929 .core_dispatcher
930 .get_missing_blocks()
931 .await
932 .map_err(|_err| ConsensusError::Shutdown)?;
933
934 if missing_blocks.is_empty() {
936 return Ok(());
937 }
938
939 let context = self.context.clone();
940 let network_client = self.network_client.clone();
941 let block_verifier = self.block_verifier.clone();
942 let commit_vote_monitor = self.commit_vote_monitor.clone();
943 let core_dispatcher = self.core_dispatcher.clone();
944 let blocks_to_fetch = self.inflight_blocks_map.clone();
945 let commands_sender = self.commands_sender.clone();
946 let dag_state = self.dag_state.clone();
947
948 let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
949 trace!(
950 "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
951 );
952 if commit_lagging {
953 if dag_state.read().gc_enabled() {
958 return Ok(());
959 }
960
961 let highest_accepted_round = dag_state.read().highest_accepted_round();
965 missing_blocks = missing_blocks
966 .into_iter()
967 .take_while(|(block_ref, _)| {
968 block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
969 })
970 .collect::<BTreeMap<_, _>>();
971
972 if missing_blocks.is_empty() {
975 trace!(
976 "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
977 );
978 self.context
979 .metrics
980 .node_metrics
981 .fetch_blocks_scheduler_skipped
982 .with_label_values(&["commit_lagging"])
983 .inc();
984 return Ok(());
985 }
986 }
987
988 self.fetch_blocks_scheduler_task
989 .spawn(monitored_future!(async move {
990 let _scope = monitored_scope("FetchMissingBlocksScheduler");
991
992 context
993 .metrics
994 .node_metrics
995 .fetch_blocks_scheduler_inflight
996 .inc();
997 let total_requested = missing_blocks.len();
998
999 fail_point_async!("consensus-delay");
1000
1001 let results = Self::fetch_blocks_from_authorities(
1003 context.clone(),
1004 blocks_to_fetch.clone(),
1005 network_client,
1006 missing_blocks,
1007 dag_state,
1008 )
1009 .await;
1010 context
1011 .metrics
1012 .node_metrics
1013 .fetch_blocks_scheduler_inflight
1014 .dec();
1015 if results.is_empty() {
1016 warn!("No results returned while requesting missing blocks");
1017 return;
1018 }
1019
1020 let mut total_fetched = 0;
1022 for (blocks_guard, fetched_blocks, peer) in results {
1023 total_fetched += fetched_blocks.len();
1024
1025 if let Err(err) = Self::process_fetched_blocks(
1026 fetched_blocks,
1027 peer,
1028 blocks_guard,
1029 core_dispatcher.clone(),
1030 block_verifier.clone(),
1031 commit_vote_monitor.clone(),
1032 context.clone(),
1033 commands_sender.clone(),
1034 "periodic",
1035 )
1036 .await
1037 {
1038 warn!(
1039 "Error occurred while processing fetched blocks from peer {peer}: {err}"
1040 );
1041 }
1042 }
1043
1044 debug!(
1045 "Total blocks requested to fetch: {}, total fetched: {}",
1046 total_requested, total_fetched
1047 );
1048 }));
1049 Ok(())
1050 }
1051
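    /// Returns whether the last locally committed index lags behind the quorum
    /// commit index by more than
    /// `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`, together with both
    /// indices.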
1052 fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1053 let last_commit_index = self.dag_state.read().last_commit_index();
1054 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1055 let commit_threshold = last_commit_index
1056 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1057 (
1058 commit_threshold < quorum_commit_index,
1059 last_commit_index,
1060 quorum_commit_index,
1061 )
1062 }
1063
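    /// How far beyond the highest accepted round a missing block may be and
    /// still get fetched by the scheduler while the local commit is lagging.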
1064 fn missing_block_round_threshold(&self) -> Round {
1071 self.context.parameters.commit_sync_batch_size
1072 }
1073
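    /// Performs one round of periodic synchronization: selects up to
    /// `MAX_PERIODIC_SYNC_PEERS` peers (authorities known to hold the missing
    /// blocks plus `MAX_PERIODIC_SYNC_RANDOM_PEERS` random ones), requests
    /// chunks of at most `max_blocks_per_sync` missing blocks from them in
    /// parallel, retries failed requests against the remaining peers, and
    /// returns whatever was fetched before `FETCH_FROM_PEERS_TIMEOUT` expires.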
1074 async fn fetch_blocks_from_authorities(
1089 context: Arc<Context>,
1090 inflight_blocks: Arc<InflightBlocksMap>,
1091 network_client: Arc<C>,
1092 missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1093 dag_state: Arc<RwLock<DagState>>,
1094 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1095 let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1097 for (missing_block_ref, authorities) in &missing_blocks {
1098 for author in authorities {
1099 if author == &context.own_index {
1100 continue;
1102 }
1103 authority_to_blocks
1104 .entry(*author)
1105 .or_default()
1106 .push(*missing_block_ref);
1107 }
1108 }
1109
1110 #[cfg(not(test))]
1114 let mut rng = StdRng::from_entropy();
1115
1116 #[cfg(not(test))]
1119 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1120 authority_to_blocks
1121 .iter()
1122 .choose_multiple(
1123 &mut rng,
1124 MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1125 )
1126 .into_iter()
1127 .map(|(&peer, blocks)| {
1128 let limited_blocks = blocks
1129 .iter()
1130 .copied()
1131 .take(context.parameters.max_blocks_per_sync)
1132 .collect();
1133 (peer, limited_blocks, "periodic_known")
1134 })
1135 .collect();
1136 #[cfg(test)]
1137 let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1139 let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1140 .iter()
1141 .map(|(&peer, blocks)| {
1142 let limited_blocks = blocks
1143 .iter()
1144 .copied()
1145 .take(context.parameters.max_blocks_per_sync)
1146 .collect();
1147 (peer, limited_blocks, "periodic_known")
1148 })
1149 .collect();
1150 items.sort_by_key(|(peer, _, _)| *peer);
1153 items
1154 .into_iter()
1155 .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1156 .collect()
1157 };
1158
1159 let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1162 .iter()
1163 .map(|(peer, _, _)| *peer)
1164 .collect();
1165
1166 let random_candidates: Vec<_> = context
1167 .committee
1168 .authorities()
1169 .filter_map(|(peer_index, _)| {
1170 (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1171 .then_some(peer_index)
1172 })
1173 .collect();
1174 #[cfg(test)]
1175 let random_peers: Vec<AuthorityIndex> = random_candidates
1176 .into_iter()
1177 .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1178 .collect();
1179 #[cfg(not(test))]
1180 let random_peers: Vec<AuthorityIndex> = random_candidates
1181 .into_iter()
1182 .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1183
1184 #[cfg_attr(test, allow(unused_mut))]
1185 let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1186 #[cfg(not(test))]
1189 all_missing_blocks.shuffle(&mut rng);
1190
1191 let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1192
1193 for peer in random_peers {
1194 if let Some(chunk) = block_chunks.next() {
1195 chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1196 } else {
1197 break;
1198 }
1199 }
1200
1201 let mut request_futures = FuturesUnordered::new();
1202
1203 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1204
1205 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1207 for block in &all_missing_blocks {
1208 missing_blocks_per_authority[block.author] += 1;
1209 }
1210 for (missing, (_, authority)) in missing_blocks_per_authority
1211 .into_iter()
1212 .zip(context.committee.authorities())
1213 {
1214 context
1215 .metrics
1216 .node_metrics
1217 .synchronizer_missing_blocks_by_authority
1218 .with_label_values(&[&authority.hostname])
1219 .inc_by(missing as u64);
1220 context
1221 .metrics
1222 .node_metrics
1223 .synchronizer_current_missing_blocks_by_authority
1224 .with_label_values(&[&authority.hostname])
1225 .set(missing as i64);
1226 }
1227
1228 #[cfg_attr(test, expect(unused_mut))]
1231 let mut remaining_peers: Vec<_> = context
1232 .committee
1233 .authorities()
1234 .filter_map(|(peer_index, _)| {
1235 if peer_index != context.own_index
1236 && !chosen_peers_with_blocks
1237 .iter()
1238 .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1239 {
1240 Some(peer_index)
1241 } else {
1242 None
1243 }
1244 })
1245 .collect();
1246
1247 #[cfg(not(test))]
1248 remaining_peers.shuffle(&mut rng);
1249 let mut remaining_peers = remaining_peers.into_iter();
1250
1251 for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1253 let peer_hostname = &context.committee.authority(peer).hostname;
1254 let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1255
1256 if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1259 info!(
1260 "Periodic sync of {} missing blocks from peer {} {}: {}",
1261 block_refs.len(),
1262 peer,
1263 peer_hostname,
1264 block_refs
1265 .iter()
1266 .map(|b| b.to_string())
1267 .collect::<Vec<_>>()
1268 .join(", ")
1269 );
1270 let metrics = &context.metrics.node_metrics;
1272 metrics
1273 .synchronizer_requested_blocks_by_peer
1274 .with_label_values(&[peer_hostname.as_str(), label])
1275 .inc_by(block_refs.len() as u64);
1276 for block_ref in &block_refs {
1277 let block_hostname = &context.committee.authority(block_ref.author).hostname;
1278 metrics
1279 .synchronizer_requested_blocks_by_authority
1280 .with_label_values(&[block_hostname.as_str(), label])
1281 .inc();
1282 }
1283 request_futures.push(Self::fetch_blocks_request(
1284 network_client.clone(),
1285 peer,
1286 blocks_guard,
1287 highest_rounds.clone(),
1288 FETCH_REQUEST_TIMEOUT,
1289 1,
1290 ));
1291 }
1292 }
1293
1294 let mut results = Vec::new();
1295 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1296
1297 tokio::pin!(fetcher_timeout);
1298
1299 loop {
1300 tokio::select! {
1301 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1302 let peer_hostname = &context.committee.authority(peer_index).hostname;
1303 match response {
1304 Ok(fetched_blocks) => {
1305 info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1306 results.push((blocks_guard, fetched_blocks, peer_index));
1307
1308 if request_futures.is_empty() {
1310 break;
1311 }
1312 },
1313 Err(_) => {
1314 context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1315 if let Some(next_peer) = remaining_peers.next() {
1317 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1319 info!(
1320 "Retrying syncing {} missing blocks from peer {}: {}",
1321 blocks_guard.block_refs.len(),
1322 peer_hostname,
1323 blocks_guard.block_refs
1324 .iter()
1325 .map(|b| b.to_string())
1326 .collect::<Vec<_>>()
1327 .join(", ")
1328 );
1329 let block_refs = blocks_guard.block_refs.clone();
1330 let metrics = &context.metrics.node_metrics;
1332 metrics
1333 .synchronizer_requested_blocks_by_peer
1334 .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1335 .inc_by(block_refs.len() as u64);
1336 for block_ref in &block_refs {
1337 let block_hostname =
1338 &context.committee.authority(block_ref.author).hostname;
1339 metrics
1340 .synchronizer_requested_blocks_by_authority
1341 .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1342 .inc();
1343 }
1344 request_futures.push(Self::fetch_blocks_request(
1345 network_client.clone(),
1346 next_peer,
1347 blocks_guard,
1348 highest_rounds,
1349 FETCH_REQUEST_TIMEOUT,
1350 1,
1351 ));
1352 } else {
1353 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1354 }
1355 } else {
1356 debug!("No more peers left to fetch blocks");
1357 }
1358 }
1359 }
1360 },
1361 _ = &mut fetcher_timeout => {
1362 debug!("Timed out while fetching missing blocks");
1363 break;
1364 }
1365 }
1366 }
1367
1368 results
1369 }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374 use std::{
1375 collections::{BTreeMap, BTreeSet},
1376 sync::Arc,
1377 time::Duration,
1378 };
1379
1380 use async_trait::async_trait;
1381 use bytes::Bytes;
1382 use consensus_config::{AuthorityIndex, Parameters};
1383 use iota_metrics::monitored_mpsc;
1384 use parking_lot::RwLock;
1385 use tokio::{sync::Mutex, time::sleep};
1386
1387 use crate::{
1388 CommitDigest, CommitIndex,
1389 authority_service::COMMIT_LAG_MULTIPLIER,
1390 block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
1391 block_verifier::NoopBlockVerifier,
1392 commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1393 commit_vote_monitor::CommitVoteMonitor,
1394 context::Context,
1395 core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1396 dag_state::DagState,
1397 error::{ConsensusError, ConsensusResult},
1398 network::{BlockStream, NetworkClient},
1399 round_prober::QuorumRound,
1400 storage::mem_store::MemStore,
1401 synchronizer::{
1402 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1403 },
1404 };
1405
1406 type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1407 type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1408 type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1409 type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1410
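    /// A test double for [`NetworkClient`]: responses to `fetch_blocks` and
    /// `fetch_latest_blocks` are stubbed per request key, optionally with an
    /// artificial latency before responding.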
1411 #[derive(Default)]
1412 struct MockNetworkClient {
1413 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1414 fetch_latest_blocks_requests:
1415 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1416 }
1417
1418 impl MockNetworkClient {
1419 async fn stub_fetch_blocks(
1420 &self,
1421 blocks: Vec<VerifiedBlock>,
1422 peer: AuthorityIndex,
1423 latency: Option<Duration>,
1424 ) {
1425 let mut lock = self.fetch_blocks_requests.lock().await;
1426 let block_refs = blocks
1427 .iter()
1428 .map(|block| block.reference())
1429 .collect::<Vec<_>>();
1430 lock.insert((block_refs, peer), (blocks, latency));
1431 }
1432
1433 async fn stub_fetch_latest_blocks(
1434 &self,
1435 blocks: Vec<VerifiedBlock>,
1436 peer: AuthorityIndex,
1437 authorities: Vec<AuthorityIndex>,
1438 latency: Option<Duration>,
1439 ) {
1440 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1441 lock.entry((peer, authorities))
1442 .or_default()
1443 .push((blocks, latency));
1444 }
1445
1446 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1447 let lock = self.fetch_latest_blocks_requests.lock().await;
1448 lock.len()
1449 }
1450 }
1451
1452 #[async_trait]
1453 impl NetworkClient for MockNetworkClient {
1454 const SUPPORT_STREAMING: bool = false;
1455
1456 async fn send_block(
1457 &self,
1458 _peer: AuthorityIndex,
1459 _serialized_block: &VerifiedBlock,
1460 _timeout: Duration,
1461 ) -> ConsensusResult<()> {
1462 unimplemented!("Unimplemented")
1463 }
1464
1465 async fn subscribe_blocks(
1466 &self,
1467 _peer: AuthorityIndex,
1468 _last_received: Round,
1469 _timeout: Duration,
1470 ) -> ConsensusResult<BlockStream> {
1471 unimplemented!("Unimplemented")
1472 }
1473
1474 async fn fetch_blocks(
1475 &self,
1476 peer: AuthorityIndex,
1477 block_refs: Vec<BlockRef>,
1478 _highest_accepted_rounds: Vec<Round>,
1479 _timeout: Duration,
1480 ) -> ConsensusResult<Vec<Bytes>> {
1481 let mut lock = self.fetch_blocks_requests.lock().await;
1482 let response = lock
1483 .remove(&(block_refs, peer))
1484 .expect("Unexpected fetch blocks request made");
1485
1486 let serialised = response
1487 .0
1488 .into_iter()
1489 .map(|block| block.serialized().clone())
1490 .collect::<Vec<_>>();
1491
1492 drop(lock);
1493
1494 if let Some(latency) = response.1 {
1495 sleep(latency).await;
1496 }
1497
1498 Ok(serialised)
1499 }
1500
1501 async fn fetch_commits(
1502 &self,
1503 _peer: AuthorityIndex,
1504 _commit_range: CommitRange,
1505 _timeout: Duration,
1506 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1507 unimplemented!("Unimplemented")
1508 }
1509
1510 async fn fetch_latest_blocks(
1511 &self,
1512 peer: AuthorityIndex,
1513 authorities: Vec<AuthorityIndex>,
1514 _timeout: Duration,
1515 ) -> ConsensusResult<Vec<Bytes>> {
1516 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1517 let mut responses = lock
1518 .remove(&(peer, authorities.clone()))
1519 .expect("Unexpected fetch blocks request made");
1520
1521 let response = responses.remove(0);
1522 let serialised = response
1523 .0
1524 .into_iter()
1525 .map(|block| block.serialized().clone())
1526 .collect::<Vec<_>>();
1527
1528 if !responses.is_empty() {
1529 lock.insert((peer, authorities), responses);
1530 }
1531
1532 drop(lock);
1533
1534 if let Some(latency) = response.1 {
1535 sleep(latency).await;
1536 }
1537
1538 Ok(serialised)
1539 }
1540
1541 async fn get_latest_rounds(
1542 &self,
1543 _peer: AuthorityIndex,
1544 _timeout: Duration,
1545 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1546 unimplemented!("Unimplemented")
1547 }
1548 }
1549
1550 #[test]
1551 fn test_inflight_blocks_map() {
1552 let map = InflightBlocksMap::new();
1554 let some_block_refs = [
1555 BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1556 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1557 BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1558 BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1559 ];
1560 let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1561
1562 {
1564 let mut all_guards = Vec::new();
1565
1566 for i in 1..=3 {
1568 let authority = AuthorityIndex::new_for_test(i);
1569
1570 let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1571 let guard = guard.expect("Guard should be created");
1572 assert_eq!(guard.block_refs.len(), 4);
1573
1574 all_guards.push(guard);
1575
1576 let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1578 assert!(guard.is_none());
1579 }
1580
1581 let authority_4 = AuthorityIndex::new_for_test(4);
1584
1585 let guard = map.lock_blocks(missing_block_refs.clone(), authority_4);
1586 assert!(guard.is_none());
1587
1588 drop(all_guards.remove(0));
1591
1592 let guard = map.lock_blocks(missing_block_refs.clone(), authority_4);
1593 let guard = guard.expect("Guard should be successfully acquired");
1594
1595 assert_eq!(guard.block_refs, missing_block_refs);
1596
1597 drop(guard);
1599 drop(all_guards);
1600
1601 assert_eq!(map.num_of_locked_blocks(), 0);
1602 }
1603
1604 {
1606 let authority_1 = AuthorityIndex::new_for_test(1);
1608 let guard = map
1609 .lock_blocks(missing_block_refs.clone(), authority_1)
1610 .unwrap();
1611
1612 let authority_2 = AuthorityIndex::new_for_test(2);
1614 let guard = map.swap_locks(guard, authority_2);
1615
1616 assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1617 }
1618 }
1619
1620 #[tokio::test]
1621 async fn test_process_fetched_blocks() {
1622 let (context, _) = Context::new_for_test(4);
1624 let context = Arc::new(context);
1625 let block_verifier = Arc::new(NoopBlockVerifier {});
1626 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1627 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1628 let (commands_sender, _commands_receiver) =
1629 monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
1630
1631 let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
1635 expected_blocks.extend(
1636 (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
1637 );
1638 assert_eq!(
1639 expected_blocks.len(),
1640 context.parameters.max_blocks_per_sync
1641 );
1642
1643 let expected_serialized_blocks = expected_blocks
1644 .iter()
1645 .map(|b| b.serialized().clone())
1646 .collect::<Vec<_>>();
1647
1648 let expected_block_refs = expected_blocks
1649 .iter()
1650 .map(|b| b.reference())
1651 .collect::<BTreeSet<_>>();
1652
1653 let peer_index = AuthorityIndex::new_for_test(2);
1655
1656 let inflight_blocks_map = InflightBlocksMap::new();
1658 let blocks_guard = inflight_blocks_map
1659 .lock_blocks(expected_block_refs.clone(), peer_index)
1660 .expect("Failed to lock blocks");
1661
1662 assert_eq!(
1663 inflight_blocks_map.num_of_locked_blocks(),
1664 expected_block_refs.len()
1665 );
1666
        let result = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
        >::process_fetched_blocks(
            expected_serialized_blocks,
            peer_index,
            blocks_guard,
            core_dispatcher.clone(),
            block_verifier,
            commit_vote_monitor,
            context.clone(),
            commands_sender,
            "test",
        )
        .await;
1684
1685 assert!(result.is_ok());
1687
1688 let added_blocks = core_dispatcher.get_add_blocks().await;
1690 assert_eq!(
1691 added_blocks
1692 .iter()
1693 .map(|b| b.reference())
1694 .collect::<BTreeSet<_>>(),
1695 expected_block_refs,
1696 );
1697
1698 assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
1700 }
1701
1702 #[tokio::test]
1703 async fn test_successful_fetch_blocks_from_peer() {
1704 let (context, _) = Context::new_for_test(4);
1706 let context = Arc::new(context);
1707 let block_verifier = Arc::new(NoopBlockVerifier {});
1708 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1709 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1710 let network_client = Arc::new(MockNetworkClient::default());
1711 let store = Arc::new(MemStore::new());
1712 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1713
1714 let handle = Synchronizer::start(
1715 network_client.clone(),
1716 context,
1717 core_dispatcher.clone(),
1718 commit_vote_monitor,
1719 block_verifier,
1720 dag_state,
1721 false,
1722 );
1723
1724 let expected_blocks = (0..10)
1726 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1727 .collect::<Vec<_>>();
1728 let missing_blocks = expected_blocks
1729 .iter()
1730 .map(|block| block.reference())
1731 .collect::<BTreeSet<_>>();
1732
1733 let peer = AuthorityIndex::new_for_test(1);
1735 network_client
1736 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1737 .await;
1738
1739 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1741
1742 sleep(Duration::from_millis(1_000)).await;
1744
1745 let added_blocks = core_dispatcher.get_add_blocks().await;
1747 assert_eq!(added_blocks, expected_blocks);
1748 }
1749
1750 #[tokio::test]
1751 async fn saturate_fetch_blocks_from_peer() {
1752 let (context, _) = Context::new_for_test(4);
1754 let context = Arc::new(context);
1755 let block_verifier = Arc::new(NoopBlockVerifier {});
1756 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1757 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1758 let network_client = Arc::new(MockNetworkClient::default());
1759 let store = Arc::new(MemStore::new());
1760 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1761
1762 let handle = Synchronizer::start(
1763 network_client.clone(),
1764 context,
1765 core_dispatcher.clone(),
1766 commit_vote_monitor,
1767 block_verifier,
1768 dag_state,
1769 false,
1770 );
1771
1772 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1774 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1775 .collect::<Vec<_>>();
1776
1777 let peer = AuthorityIndex::new_for_test(1);
1779 let mut iter = expected_blocks.iter().peekable();
1780 while let Some(block) = iter.next() {
1781 network_client
1784 .stub_fetch_blocks(
1785 vec![block.clone()],
1786 peer,
1787 Some(Duration::from_millis(5_000)),
1788 )
1789 .await;
1790
1791 let mut missing_blocks = BTreeSet::new();
1792 missing_blocks.insert(block.reference());
1793
1794 if iter.peek().is_none() {
1797 match handle.fetch_blocks(missing_blocks, peer).await {
1798 Err(ConsensusError::SynchronizerSaturated(index, _)) => {
1799 assert_eq!(index, peer);
1800 }
1801 _ => panic!("A saturated synchronizer error was expected"),
1802 }
1803 } else {
1804 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1805 }
1806 }
1807 }
1808
1809 #[tokio::test(flavor = "current_thread", start_paused = true)]
1810 async fn synchronizer_periodic_task_fetch_blocks() {
1811 let (context, _) = Context::new_for_test(4);
1813 let context = Arc::new(context);
1814 let block_verifier = Arc::new(NoopBlockVerifier {});
1815 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1816 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1817 let network_client = Arc::new(MockNetworkClient::default());
1818 let store = Arc::new(MemStore::new());
1819 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1820
1821 let expected_blocks = (0..10)
1823 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1824 .collect::<Vec<_>>();
1825 let missing_blocks = expected_blocks
1826 .iter()
1827 .map(|block| block.reference())
1828 .collect::<BTreeSet<_>>();
1829
1830 core_dispatcher
1832 .stub_missing_blocks(missing_blocks.clone())
1833 .await;
1834
1835 network_client
1839 .stub_fetch_blocks(
1840 expected_blocks.clone(),
1841 AuthorityIndex::new_for_test(1),
1842 Some(FETCH_REQUEST_TIMEOUT),
1843 )
1844 .await;
1845 network_client
1846 .stub_fetch_blocks(
1847 expected_blocks.clone(),
1848 AuthorityIndex::new_for_test(2),
1849 None,
1850 )
1851 .await;
1852
1853 let _handle = Synchronizer::start(
1855 network_client.clone(),
1856 context,
1857 core_dispatcher.clone(),
1858 commit_vote_monitor,
1859 block_verifier,
1860 dag_state,
1861 false,
1862 );
1863
1864 sleep(8 * FETCH_REQUEST_TIMEOUT).await;
1865
1866 let added_blocks = core_dispatcher.get_add_blocks().await;
1868 assert_eq!(added_blocks, expected_blocks);
1869
1870 assert!(
1872 core_dispatcher
1873 .get_missing_blocks()
1874 .await
1875 .unwrap()
1876 .is_empty()
1877 );
1878 }
1879
1880 #[tokio::test(flavor = "current_thread", start_paused = true)]
1881 async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds()
1882 {
1883 let (mut context, _) = Context::new_for_test(4);
1885
1886 context
1889 .protocol_config
1890 .set_consensus_gc_depth_for_testing(0);
1891 context
1893 .protocol_config
1894 .set_consensus_batched_block_sync_for_testing(true);
1895
1896 let context = Arc::new(context);
1897
1898 let block_verifier = Arc::new(NoopBlockVerifier {});
1899 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1900 let network_client = Arc::new(MockNetworkClient::default());
1901 let store = Arc::new(MemStore::new());
1902 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1903 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1904
1905 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
1908 let expected_blocks = (1..=sync_missing_block_round_threshold * 2)
1909 .flat_map(|round| {
1910 vec![
1911 VerifiedBlock::new_for_test(TestBlock::new(round, 1).build()),
1912 VerifiedBlock::new_for_test(TestBlock::new(round, 2).build()),
1913 VerifiedBlock::new_for_test(TestBlock::new(round, 3).build()),
1914 ]
1915 .into_iter()
1916 })
1917 .collect::<Vec<_>>();
1918
1919 let missing_blocks = expected_blocks
1920 .iter()
1921 .map(|block| block.reference())
1922 .collect::<BTreeSet<_>>();
1923 core_dispatcher.stub_missing_blocks(missing_blocks).await;
1924
1925 let stub_blocks = expected_blocks
1927 .iter()
1928 .map(|block| (block.reference(), block.clone()))
1929 .collect::<BTreeMap<_, _>>();
1930
1931 let stub_block_author_1 = stub_blocks
1933 .iter()
1934 .filter(|(block, _)| block.author == AuthorityIndex::new_for_test(1))
1935 .take(context.parameters.max_blocks_per_sync)
1936 .map(|(_, block)| block.clone())
1937 .collect::<Vec<_>>();
1938
1939 let stub_block_author_2 = stub_blocks
1940 .iter()
1941 .filter(|(block, _)| block.author == AuthorityIndex::new_for_test(2))
1942 .take(context.parameters.max_blocks_per_sync)
1943 .map(|(_, block)| block.clone())
1944 .collect::<Vec<_>>();
1945
1946 let stub_block_author_3 = stub_blocks
1948 .iter()
1949 .take(context.parameters.max_blocks_per_sync)
1950 .map(|(_, block)| block.clone())
1951 .collect::<Vec<_>>();
1952
1953 let mut expected_blocks: Vec<_> = Vec::new();
1954 expected_blocks.extend(stub_block_author_1.clone());
1955 expected_blocks.extend(stub_block_author_2.clone());
1956 expected_blocks.extend(stub_block_author_3.clone());
1957
1958 network_client
1959 .stub_fetch_blocks(stub_block_author_1, AuthorityIndex::new_for_test(1), None)
1960 .await;
1961 network_client
1962 .stub_fetch_blocks(stub_block_author_2, AuthorityIndex::new_for_test(2), None)
1963 .await;
1964 network_client
1965 .stub_fetch_blocks(stub_block_author_3, AuthorityIndex::new_for_test(3), None)
1966 .await;
1967
1968 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1970 let commit_index: CommitIndex = round - 1;
1971 let blocks = (0..4)
1972 .map(|authority| {
1973 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1974 let block = TestBlock::new(round, authority)
1975 .set_commit_votes(commit_votes)
1976 .build();
1977
1978 VerifiedBlock::new_for_test(block)
1979 })
1980 .collect::<Vec<_>>();
1981
1982 for block in blocks {
1985 commit_vote_monitor.observe_block(&block);
1986 }
1987
1988 let _handle = Synchronizer::start(
1991 network_client.clone(),
1992 context.clone(),
1993 core_dispatcher.clone(),
1994 commit_vote_monitor.clone(),
1995 block_verifier.clone(),
1996 dag_state.clone(),
1997 false,
1998 );
1999
2000 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2001
2002 let mut added_blocks = core_dispatcher.get_add_blocks().await;
2005
2006 added_blocks.sort_by_key(|block| block.reference());
2007 expected_blocks.sort_by_key(|block| block.reference());
2008
2009 assert_eq!(added_blocks, expected_blocks);
2010 }
2011
2012 #[tokio::test(flavor = "current_thread", start_paused = true)]
2013 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2014 let (mut context, _) = Context::new_for_test(4);
2016 context
2017 .protocol_config
2018 .set_consensus_batched_block_sync_for_testing(true);
2019 let context = Arc::new(context);
2020 let block_verifier = Arc::new(NoopBlockVerifier {});
2021 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2022 let network_client = Arc::new(MockNetworkClient::default());
2023 let store = Arc::new(MemStore::new());
2024 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2025 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2026
2027 let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2030 let stub_blocks = (sync_missing_block_round_threshold * 2
2031 ..sync_missing_block_round_threshold * 2
2032 + context.parameters.max_blocks_per_sync as u32)
2033 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2034 .collect::<Vec<_>>();
2035 let missing_blocks = stub_blocks
2036 .iter()
2037 .map(|block| block.reference())
2038 .collect::<BTreeSet<_>>();
2039 core_dispatcher
2040 .stub_missing_blocks(missing_blocks.clone())
2041 .await;
2042 let mut expected_blocks = stub_blocks
2046 .iter()
2047 .take(context.parameters.max_blocks_per_sync)
2048 .cloned()
2049 .collect::<Vec<_>>();
2050 network_client
2051 .stub_fetch_blocks(
2052 expected_blocks.clone(),
2053 AuthorityIndex::new_for_test(1),
2054 Some(FETCH_REQUEST_TIMEOUT),
2055 )
2056 .await;
2057 network_client
2058 .stub_fetch_blocks(
2059 expected_blocks.clone(),
2060 AuthorityIndex::new_for_test(2),
2061 None,
2062 )
2063 .await;
2064
2065 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2067 let commit_index: CommitIndex = round - 1;
2068 let blocks = (0..4)
2069 .map(|authority| {
2070 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2071 let block = TestBlock::new(round, authority)
2072 .set_commit_votes(commit_votes)
2073 .build();
2074
2075 VerifiedBlock::new_for_test(block)
2076 })
2077 .collect::<Vec<_>>();
2078
2079 for block in blocks {
2082 commit_vote_monitor.observe_block(&block);
2083 }
2084
2085 let _handle = Synchronizer::start(
2088 network_client.clone(),
2089 context.clone(),
2090 core_dispatcher.clone(),
2091 commit_vote_monitor.clone(),
2092 block_verifier,
2093 dag_state.clone(),
2094 false,
2095 );
2096
2097 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2098
2099 let added_blocks = core_dispatcher.get_add_blocks().await;
2102 assert_eq!(added_blocks, vec![]);
2103
2104 println!("Before advancing");
2105 {
            let mut d = dag_state.write();
            for index in 1..=commit_index {
                let commit =
                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);

                d.add_commit(commit);
            }

            // The locally committed index should now have caught up with the quorum commit index
            // observed from the commit votes.
            assert_eq!(
                d.last_commit_index(),
                commit_vote_monitor.quorum_commit_index()
            );
        }

        // Stub the missing blocks again and give the periodic synchronizer enough time to fetch
        // them now that the commit lag has been resolved.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // The missing blocks should now have been fetched and added to Core.
        let mut added_blocks = core_dispatcher.get_add_blocks().await;
        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_fetch_own_last_block() {
        let (context, _) = Context::new_for_test(4);
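        // Run with a custom, short `sync_last_known_own_block_timeout`.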
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let our_index = AuthorityIndex::new_for_test(0);

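        // Blocks that the peers will report as this authority's latest own blocks; round 10 is
        // the highest.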
        let mut expected_blocks = (8..=10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();

        let block_1 = expected_blocks.pop().unwrap();
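        // For each peer, the first `fetch_latest_blocks` response is stubbed with a delay that
        // exceeds the sync timeout so the initial attempt times out, while the second stub serves
        // the retry immediately.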
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1.clone()],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;

        let block_2 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2.clone()],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                None,
            )
            .await;

        let block_3 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_3.clone()],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_3],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                None,
            )
            .await;

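        // Start the synchronizer with syncing of the last known own block enabled (the final
        // `true` argument).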
        let handle = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            true,
        );

        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;

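        // The core dispatcher should have been told that round 10 is our last known proposed
        // round.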
        assert_eq!(
            core_dispatcher.get_last_own_proposed_round().await,
            vec![10]
        );

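        // All stubbed `fetch_latest_blocks` responses should have been consumed.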
        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);

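        // Exactly one retry should have been recorded, since the first round of requests timed
        // out.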
        assert_eq!(
            context
                .metrics
                .node_metrics
                .sync_last_known_own_block_retries
                .get(),
            1
        );

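        // Stop the synchronizer and resurface any panic raised by its background tasks.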
        if let Err(err) = handle.stop().await {
            if err.is_panic() {
                std::panic::resume_unwind(err.into_panic());
            }
        }
    }
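
    /// Minimal `CoreThreadDispatcher` mock used by the fetch-ordering tests below: it records the
    /// blocks passed to `add_blocks` and serves whatever missing blocks it currently holds, while
    /// the remaining methods are no-ops.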
    #[derive(Default)]
    struct SyncMockDispatcher {
        missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
        added_blocks: Mutex<Vec<VerifiedBlock>>,
    }

    #[async_trait::async_trait]
    impl CoreThreadDispatcher for SyncMockDispatcher {
        async fn get_missing_blocks(
            &self,
        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
            Ok(self.missing_blocks.lock().await.clone())
        }

        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let mut guard = self.added_blocks.lock().await;
            guard.extend(blocks.clone());
            Ok(blocks.iter().map(|b| b.reference()).collect())
        }

        async fn check_block_refs(
            &self,
            block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(block_refs.into_iter().collect())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
            Ok(())
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            _delay: Round,
            _received_quorum_rounds: Vec<QuorumRound>,
            _accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            Ok(())
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            Ok(())
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            Vec::new()
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn known_before_random_peer_fetch() {
        {
            let (ctx, _) = Context::new_for_test(10);
            let context = Arc::new(ctx);
            let store = Arc::new(MemStore::new());
            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
            let inflight = InflightBlocksMap::new();

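            // A single missing block that authorities 2, 3 and 4 are known to have.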
            let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
            let missing_ref = missing_vb.reference();
            let missing_blocks = BTreeMap::from([(
                missing_ref,
                BTreeSet::from([
                    AuthorityIndex::new_for_test(2),
                    AuthorityIndex::new_for_test(3),
                    AuthorityIndex::new_for_test(4),
                ]),
            )]);

            let network_client = Arc::new(MockNetworkClient::default());
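            // Stub responses from every peer: peers 1 and 4 respond only after twice the fetch
            // timeout, the others respond immediately.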
            for i in 1..=9 {
                let peer = AuthorityIndex::new_for_test(i);
                if i == 1 || i == 4 {
                    network_client
                        .stub_fetch_blocks(
                            vec![missing_vb.clone()],
                            peer,
                            Some(2 * FETCH_REQUEST_TIMEOUT),
                        )
                        .await;
                    continue;
                }
                network_client
                    .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
                    .await;
            }

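            // Fetch the missing block: peers known to have it should be contacted before
            // randomly chosen peers.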
            let results = Synchronizer::<
                MockNetworkClient,
                NoopBlockVerifier,
                SyncMockDispatcher,
            >::fetch_blocks_from_authorities(
                context.clone(),
                inflight.clone(),
                network_client.clone(),
                missing_blocks,
                dag_state.clone(),
            )
            .await;

            assert_eq!(results.len(), 2);

            let (_hot_guard, hot_bytes, hot_peer) = &results[0];
            assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
            let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
            assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
            let expected = missing_vb.serialized().clone();
            assert_eq!(hot_bytes, &vec![expected]);
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn known_before_periodic_peer_fetch_larger_scenario() {
        use std::{
            collections::{BTreeMap, BTreeSet},
            sync::Arc,
        };

        use parking_lot::RwLock;

        use crate::{
            block::{Round, TestBlock, VerifiedBlock},
            context::Context,
        };

        let (ctx, _) = Context::new_for_test(10);
        let context = Arc::new(ctx);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let inflight = InflightBlocksMap::new();
        let network_client = Arc::new(MockNetworkClient::default());

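        // Build 1000 missing blocks: the first `known_number_blocks` are known to authority 2,
        // the next `known_number_blocks` to authority 3, and the rest only to authority 0.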
        let mut missing_blocks = BTreeMap::new();
        let mut missing_vbs = Vec::new();
        let known_number_blocks = 10;
        for i in 0..1000 {
            let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
            let r = vb.reference();
            if i < known_number_blocks {
                missing_blocks.insert(
                    r,
                    BTreeSet::from([
                        AuthorityIndex::new_for_test(0),
                        AuthorityIndex::new_for_test(2),
                    ]),
                );
            } else if i < 2 * known_number_blocks {
                missing_blocks.insert(
                    r,
                    BTreeSet::from([
                        AuthorityIndex::new_for_test(0),
                        AuthorityIndex::new_for_test(3),
                    ]),
                );
            } else {
                missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
            }
            missing_vbs.push(vb);
        }

        let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
        let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
            .iter()
            .map(|&peer| {
                let vbs = missing_vbs
                    .iter()
                    .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
                    .take(context.parameters.max_blocks_per_sync)
                    .cloned()
                    .collect::<Vec<_>>();
                (peer, vbs)
            })
            .collect();

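        // Stub the known peers: authority 2 responds only after twice the fetch timeout, while
        // authority 3 responds immediately. Authority 5 is also stubbed with the batch known to
        // authority 2 so it can serve it instead.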
        for (peer, vbs) in known_vbs_by_peer {
            if peer == AuthorityIndex::new_for_test(2) {
                network_client
                    .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
                    .await;
                network_client
                    .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
                    .await;
            } else {
                network_client
                    .stub_fetch_blocks(vbs.clone(), peer, None)
                    .await;
            }
        }

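        // Additionally stub authority 1 with the first `max_blocks_per_sync` missing blocks and
        // authority 4 with the following batch.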
        network_client
            .stub_fetch_blocks(
                missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
                AuthorityIndex::new_for_test(1),
                None,
            )
            .await;

        network_client
            .stub_fetch_blocks(
                missing_vbs[context.parameters.max_blocks_per_sync
                    ..2 * context.parameters.max_blocks_per_sync]
                    .to_vec(),
                AuthorityIndex::new_for_test(4),
                None,
            )
            .await;

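        // Schedule fetches for all missing blocks: batches for the known peers are requested
        // first, and the remaining blocks are spread over randomly chosen peers.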
        let results = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            SyncMockDispatcher,
        >::fetch_blocks_from_authorities(
            context.clone(),
            inflight.clone(),
            network_client.clone(),
            missing_blocks,
            dag_state.clone(),
        )
        .await;

        assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");

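        // results[0] and results[1] hold the batches served by known peer 3 and by peer 1, while
        // results[2] and results[3] come from peer 4 and from peer 5 (covering for the slow
        // peer 2).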
        let (_guard3, bytes3, peer3) = &results[0];
        assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
        let expected3 = missing_vbs[known_number_blocks..2 * known_number_blocks]
            .iter()
            .map(|vb| vb.serialized().clone())
            .collect::<Vec<_>>();
        assert_eq!(bytes3, &expected3);

        let (_guard1, bytes1, peer1) = &results[1];
        assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
        let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
            .iter()
            .map(|vb| vb.serialized().clone())
            .collect::<Vec<_>>();
        assert_eq!(bytes1, &expected1);

        let (_guard4, bytes4, peer4) = &results[2];
        assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
        let expected4 = missing_vbs
            [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
            .iter()
            .map(|vb| vb.serialized().clone())
            .collect::<Vec<_>>();
        assert_eq!(bytes4, &expected4);

        let (_guard5, bytes5, peer5) = &results[3];
        assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
        let expected5 = missing_vbs[0..known_number_blocks]
            .iter()
            .map(|vb| vb.serialized().clone())
            .collect::<Vec<_>>();
        assert_eq!(bytes5, &expected5);
    }
}