1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 sync::Arc,
8 time::Duration,
9};
10
11use bytes::Bytes;
12use consensus_config::AuthorityIndex;
13use futures::{StreamExt as _, stream::FuturesUnordered};
14use iota_macros::fail_point_async;
15use iota_metrics::{
16 monitored_future,
17 monitored_mpsc::{Receiver, Sender, channel},
18 monitored_scope,
19};
20use itertools::Itertools as _;
21use parking_lot::{Mutex, RwLock};
22#[cfg(not(test))]
23use rand::{prelude::SliceRandom, rngs::ThreadRng};
24use tap::TapFallible;
25use tokio::{
26 runtime::Handle,
27 sync::{mpsc::error::TrySendError, oneshot},
28 task::{JoinError, JoinSet},
29 time::{Instant, sleep, sleep_until, timeout},
30};
31use tracing::{debug, error, info, trace, warn};
32
33use crate::{
34 BlockAPI, CommitIndex, Round,
35 authority_service::COMMIT_LAG_MULTIPLIER,
36 block::{BlockRef, SignedBlock, VerifiedBlock},
37 block_verifier::BlockVerifier,
38 commit_vote_monitor::CommitVoteMonitor,
39 context::Context,
40 core_thread::CoreThreadDispatcher,
41 dag_state::DagState,
42 error::{ConsensusError, ConsensusResult},
43 network::NetworkClient,
44};
45
/// Capacity of every per-peer fetch channel, and the cap on the number of
/// requests a per-peer fetch task keeps in flight at once.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Deadline handed to each individual fetch request sent to a peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
/// Overall deadline for one periodic fetch across the selected peers.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_secs(4);

/// Upper bound on the number of block refs placed in a single fetch request.
const MAX_BLOCKS_PER_FETCH: usize = 32;

/// Upper bound on the number of peers a single block ref may be locked for
/// at the same time.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// While the local commit is lagging, the scheduler only keeps missing blocks
/// whose round is at most the highest accepted round plus this threshold.
const SYNC_MISSING_BLOCK_ROUND_THRESHOLD: u32 = 50;
67
68struct BlocksGuard {
69 map: Arc<InflightBlocksMap>,
70 block_refs: BTreeSet<BlockRef>,
71 peer: AuthorityIndex,
72}
73
74impl Drop for BlocksGuard {
75 fn drop(&mut self) {
76 self.map.unlock_blocks(&self.block_refs, self.peer);
77 }
78}
79
80struct InflightBlocksMap {
86 inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
87}
88
89impl InflightBlocksMap {
90 fn new() -> Arc<Self> {
91 Arc::new(Self {
92 inner: Mutex::new(HashMap::new()),
93 })
94 }
95
96 fn lock_blocks(
104 self: &Arc<Self>,
105 missing_block_refs: BTreeSet<BlockRef>,
106 peer: AuthorityIndex,
107 ) -> Option<BlocksGuard> {
108 let mut blocks = BTreeSet::new();
109 let mut inner = self.inner.lock();
110
111 for block_ref in missing_block_refs {
112 let authorities = inner.entry(block_ref).or_default();
116 if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
117 && authorities.get(&peer).is_none()
118 {
119 assert!(authorities.insert(peer));
120 blocks.insert(block_ref);
121 }
122 }
123
124 if blocks.is_empty() {
125 None
126 } else {
127 Some(BlocksGuard {
128 map: self.clone(),
129 block_refs: blocks,
130 peer,
131 })
132 }
133 }
134
135 fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
140 let mut blocks_to_fetch = self.inner.lock();
142 for block_ref in block_refs {
143 let authorities = blocks_to_fetch
144 .get_mut(block_ref)
145 .expect("Should have found a non empty map");
146
147 assert!(authorities.remove(&peer), "Peer index should be present!");
148
149 if authorities.is_empty() {
151 blocks_to_fetch.remove(block_ref);
152 }
153 }
154 }
155
156 fn swap_locks(
161 self: &Arc<Self>,
162 blocks_guard: BlocksGuard,
163 peer: AuthorityIndex,
164 ) -> Option<BlocksGuard> {
165 let block_refs = blocks_guard.block_refs.clone();
166
167 drop(blocks_guard);
169
170 self.lock_blocks(block_refs, peer)
172 }
173
174 #[cfg(test)]
175 fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
176 let inner = self.inner.lock();
177 inner.len()
178 }
179}
180
/// Commands processed by the synchronizer's main loop.
enum Command {
    /// Request to fetch `missing_block_refs` from `peer_index`; the outcome
    /// of scheduling the request is reported back through `result`.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Starts the task that fetches this node's own last block, unless one is
    /// already running.
    FetchOwnLastBlock,
    /// Brings the deadline of the periodic missing-blocks scheduler forward.
    KickOffScheduler,
}
190
191pub(crate) struct SynchronizerHandle {
192 commands_sender: Sender<Command>,
193 tasks: tokio::sync::Mutex<JoinSet<()>>,
194}
195
196impl SynchronizerHandle {
197 pub(crate) async fn fetch_blocks(
200 &self,
201 missing_block_refs: BTreeSet<BlockRef>,
202 peer_index: AuthorityIndex,
203 ) -> ConsensusResult<()> {
204 let (sender, receiver) = oneshot::channel();
205 self.commands_sender
206 .send(Command::FetchBlocks {
207 missing_block_refs,
208 peer_index,
209 result: sender,
210 })
211 .await
212 .map_err(|_err| ConsensusError::Shutdown)?;
213 receiver.await.map_err(|_err| ConsensusError::Shutdown)?
214 }
215
216 pub(crate) async fn stop(&self) -> Result<(), JoinError> {
217 let mut tasks = self.tasks.lock().await;
218 tasks.abort_all();
219 while let Some(result) = tasks.join_next().await {
220 result?
221 }
222 Ok(())
223 }
224}
225
/// State owned by the synchronizer's main loop (see `run`).
///
/// It receives [`Command`]s, forwards live fetch requests to per-peer fetch
/// tasks, and periodically spawns a scheduler task that fetches the blocks
/// the core dispatcher reports as missing.
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    /// Incoming commands for the main loop.
    commands_receiver: Receiver<Command>,
    /// One sender per peer, feeding that peer's fetch task.
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    /// Holds the periodic missing-blocks scheduler task while it runs.
    fetch_blocks_scheduler_task: JoinSet<()>,
    /// Holds the task that fetches this node's own last block while it runs.
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    /// Locks on block refs that are currently being fetched.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    /// Sender side of the command channel, handed to spawned tasks so they
    /// can send commands back to the main loop.
    commands_sender: Sender<Command>,
}
266
267impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
268 pub fn start(
271 network_client: Arc<C>,
272 context: Arc<Context>,
273 core_dispatcher: Arc<D>,
274 commit_vote_monitor: Arc<CommitVoteMonitor>,
275 block_verifier: Arc<V>,
276 dag_state: Arc<RwLock<DagState>>,
277 sync_last_known_own_block: bool,
278 ) -> Arc<SynchronizerHandle> {
279 let (commands_sender, commands_receiver) =
280 channel("consensus_synchronizer_commands", 1_000);
281 let inflight_blocks_map = InflightBlocksMap::new();
282
283 let mut fetch_block_senders = BTreeMap::new();
285 let mut tasks = JoinSet::new();
286 for (index, _) in context.committee.authorities() {
287 if index == context.own_index {
288 continue;
289 }
290 let (sender, receiver) =
291 channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
292 let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
293 index,
294 network_client.clone(),
295 block_verifier.clone(),
296 commit_vote_monitor.clone(),
297 context.clone(),
298 core_dispatcher.clone(),
299 dag_state.clone(),
300 receiver,
301 commands_sender.clone(),
302 );
303 tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
304 fetch_block_senders.insert(index, sender);
305 }
306
307 let commands_sender_clone = commands_sender.clone();
308
309 if sync_last_known_own_block {
310 commands_sender
311 .try_send(Command::FetchOwnLastBlock)
312 .expect("Failed to sync our last block");
313 }
314
315 tasks.spawn(monitored_future!(async move {
317 let mut s = Self {
318 context,
319 commands_receiver,
320 fetch_block_senders,
321 core_dispatcher,
322 commit_vote_monitor,
323 fetch_blocks_scheduler_task: JoinSet::new(),
324 fetch_own_last_block_task: JoinSet::new(),
325 network_client,
326 block_verifier,
327 inflight_blocks_map,
328 commands_sender: commands_sender_clone,
329 dag_state,
330 };
331 s.run().await;
332 }));
333
334 Arc::new(SynchronizerHandle {
335 commands_sender,
336 tasks: tokio::sync::Mutex::new(tasks),
337 })
338 }
339
    /// Main loop of the synchronizer.
    ///
    /// Handles incoming commands, joins the two helper tasks when they
    /// finish (propagating their panics), and starts the missing-blocks
    /// scheduler every time `scheduler_timeout` fires. Returns only when
    /// starting the scheduler task fails.
    async fn run(&mut self) {
        // Period of the missing-blocks scheduler.
        const SYNCHRONIZER_TIMEOUT: Duration = Duration::from_millis(500);
        let scheduler_timeout = sleep_until(Instant::now() + SYNCHRONIZER_TIMEOUT);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                // `result` is dropped here without a reply being sent.
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            // Only the first MAX_BLOCKS_PER_FETCH refs are requested.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(MAX_BLOCKS_PER_FETCH)
                                .collect();

                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
                            let Some(blocks_guard) = blocks_guard else {
                                // Nothing could be locked for this peer; report success.
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // Hand the guard to the peer's fetch task without blocking.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            // At most one such task runs at a time.
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Fire immediately when no scheduler task is running,
                            // otherwise after half the regular period.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + SYNCHRONIZER_TIMEOUT.checked_div(2).unwrap()
                            };

                            // Only ever bring the deadline forward.
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is ignored.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is ignored.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // Start a new scheduler run only when the previous one is done.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + SYNCHRONIZER_TIMEOUT);
                }
            }
        }
    }
451
    /// Per-peer fetch task.
    ///
    /// Receives guards over locked block refs from `receiver`, requests the
    /// blocks from `peer_index`, and processes the responses. At most
    /// `FETCH_BLOCKS_CONCURRENCY` requests are kept in flight. A failed
    /// request is re-issued while its retry counter is at most
    /// `MAX_RETRIES`; after that the guard is dropped. The task ends when no
    /// `select!` branch can make progress any more.
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 5;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                // Accept new work only while below the concurrency cap.
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    // The retry counter starts at 1 for the first attempt.
                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                            }
                        },
                        Err(_) => {
                            // `retries` was already incremented by the failed request.
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Dropping the guard releases the locks on the block refs.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }
510
511 async fn process_fetched_blocks(
515 serialized_blocks: Vec<Bytes>,
516 peer_index: AuthorityIndex,
517 requested_blocks_guard: BlocksGuard,
518 core_dispatcher: Arc<D>,
519 block_verifier: Arc<V>,
520 commit_vote_monitor: Arc<CommitVoteMonitor>,
521 context: Arc<Context>,
522 commands_sender: Sender<Command>,
523 sync_method: &str,
524 ) -> ConsensusResult<()> {
525 const MAX_ADDITIONAL_BLOCKS: usize = 10;
528
529 if serialized_blocks.len() > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
532 {
533 return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
534 }
535
536 let blocks = Handle::current()
538 .spawn_blocking({
539 let block_verifier = block_verifier.clone();
540 let context = context.clone();
541 move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
542 })
543 .await
544 .expect("Spawn blocking should not fail")?;
545
546 let ancestors = blocks
548 .iter()
549 .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
550 .flat_map(|b| b.ancestors().to_vec())
551 .collect::<BTreeSet<BlockRef>>();
552
553 for block in &blocks {
556 if !requested_blocks_guard
557 .block_refs
558 .contains(&block.reference())
559 && !ancestors.contains(&block.reference())
560 {
561 return Err(ConsensusError::UnexpectedFetchedBlock {
562 index: peer_index,
563 block_ref: block.reference(),
564 });
565 }
566 }
567
568 for block in &blocks {
570 commit_vote_monitor.observe_block(block);
571 }
572
573 let metrics = &context.metrics.node_metrics;
574 let peer_hostname = &context.committee.authority(peer_index).hostname;
575 metrics
576 .synchronizer_fetched_blocks_by_peer
577 .with_label_values(&[peer_hostname.as_str(), sync_method])
578 .inc_by(blocks.len() as u64);
579 for block in &blocks {
580 let block_hostname = &context.committee.authority(block.author()).hostname;
581 metrics
582 .synchronizer_fetched_blocks_by_authority
583 .with_label_values(&[block_hostname.as_str(), sync_method])
584 .inc();
585 }
586
587 debug!(
588 "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
589 blocks.len(),
590 blocks.iter().map(|b| b.reference().to_string()).join(", "),
591 );
592
593 let missing_blocks = core_dispatcher
597 .add_blocks(blocks)
598 .await
599 .map_err(|_| ConsensusError::Shutdown)?;
600
601 drop(requested_blocks_guard);
604
605 if !missing_blocks.is_empty() {
607 if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
609 {
610 warn!("Commands channel is full")
611 }
612 }
613
614 context
615 .metrics
616 .node_metrics
617 .missing_blocks_after_fetch_total
618 .inc_by(missing_blocks.len() as u64);
619
620 Ok(())
621 }
622
623 fn get_highest_accepted_rounds(
624 dag_state: Arc<RwLock<DagState>>,
625 context: &Arc<Context>,
626 ) -> Vec<Round> {
627 let blocks = dag_state
628 .read()
629 .get_last_cached_block_per_authority(Round::MAX);
630 assert_eq!(blocks.len(), context.committee.size());
631
632 blocks
633 .into_iter()
634 .map(|(block, _)| block.round())
635 .collect::<Vec<_>>()
636 }
637
638 fn verify_blocks(
639 serialized_blocks: Vec<Bytes>,
640 block_verifier: Arc<V>,
641 context: &Context,
642 peer_index: AuthorityIndex,
643 ) -> ConsensusResult<Vec<VerifiedBlock>> {
644 let mut verified_blocks = Vec::new();
645
646 for serialized_block in serialized_blocks {
647 let signed_block: SignedBlock =
648 bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
649
650 if let Err(e) = block_verifier.verify(&signed_block) {
652 let hostname = context.committee.authority(peer_index).hostname.clone();
655
656 context
657 .metrics
658 .node_metrics
659 .invalid_blocks
660 .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
661 .inc();
662 warn!("Invalid block received from {}: {}", peer_index, e);
663 return Err(e);
664 }
665 let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
666
667 let now = context.clock.timestamp_utc_ms();
671 if now < verified_block.timestamp_ms() {
672 warn!(
673 "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
674 verified_block.reference(),
675 verified_block.timestamp_ms(),
676 now
677 );
678 continue;
679 }
680
681 verified_blocks.push(verified_block);
682 }
683
684 Ok(verified_blocks)
685 }
686
687 async fn fetch_blocks_request(
688 network_client: Arc<C>,
689 peer: AuthorityIndex,
690 blocks_guard: BlocksGuard,
691 highest_rounds: Vec<Round>,
692 request_timeout: Duration,
693 mut retries: u32,
694 ) -> (
695 ConsensusResult<Vec<Bytes>>,
696 BlocksGuard,
697 u32,
698 AuthorityIndex,
699 Vec<Round>,
700 ) {
701 let start = Instant::now();
702 let resp = timeout(
703 request_timeout,
704 network_client.fetch_blocks(
705 peer,
706 blocks_guard
707 .block_refs
708 .clone()
709 .into_iter()
710 .collect::<Vec<_>>(),
711 highest_rounds.clone(),
712 request_timeout,
713 ),
714 )
715 .await;
716
717 fail_point_async!("consensus-delay");
718
719 let resp = match resp {
720 Ok(Err(err)) => {
721 sleep_until(start + request_timeout).await;
724 retries += 1;
725 Err(err)
726 } Err(err) => {
728 sleep_until(start + request_timeout).await;
730 retries += 1;
731 Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
732 }
733 Ok(result) => result,
734 };
735 (resp, blocks_guard, retries, peer, highest_rounds)
736 }
737
    /// Spawns the task that asks every peer for this node's own latest block
    /// and reports the highest round found to the core dispatcher.
    ///
    /// Each attempt queries all peers and stops when every request has
    /// resolved or `sync_last_known_own_block_timeout` expires. The attempt
    /// is accepted when the stake of the peers that returned valid results
    /// passes `reached_validity`; otherwise the whole attempt is repeated
    /// after a delay that grows by 1.5x up to `MAX_RETRY_DELAY_STEP`.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before re-querying a peer whose request failed.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Cap for the delay between whole attempts.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Builds a future that waits `fetch_own_block_delay` and then asks
                // `authority_index` for our latest block.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                // Deserializes and verifies a response; every block must be authored by us.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                let mut highest_round;
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main:loop {
                    // With a single-node committee there is nobody to ask; use the
                    // locally known last proposed block.
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Both accumulators are reset for every attempt.
                    let mut total_stake = 0;
                    highest_round = 0;

                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if authority_index != context.own_index {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                // `None` means every request has resolved.
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                // An empty response counts as round 0 and still adds the peer's stake.
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                // Invalid responses are not retried within this attempt.
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    if context.committee.reached_validity(total_stake) {
                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Grow the delay by 1.5x, capped at MAX_RETRY_DELAY_STEP.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
870
    /// Spawns one run of the periodic missing-blocks scheduler.
    ///
    /// Asks the core dispatcher for the missing blocks and returns without
    /// spawning anything when there are none. While the local commit is
    /// lagging, nothing is spawned if GC is enabled; otherwise only missing
    /// blocks within `SYNC_MISSING_BLOCK_ROUND_THRESHOLD` rounds of the
    /// highest accepted round are kept. The spawned task fetches the blocks
    /// from peers and processes every response.
    ///
    /// Returns `ConsensusError::Shutdown` when the core dispatcher cannot be
    /// queried.
    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        if missing_blocks.is_empty() {
            // Nothing to fetch in this run.
            return Ok(());
        }

        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        if commit_lagging {
            // With GC enabled the scheduler is skipped entirely while lagging.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            let highest_accepted_round = dag_state.read().highest_accepted_round();
            // `take_while` stops at the first ref beyond the threshold, so it
            // relies on the set's iteration order.
            // NOTE(review): presumably BlockRef orders by round first - confirm.
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|b| {
                    b.round <= highest_accepted_round + SYNC_MISSING_BLOCK_ROUND_THRESHOLD
                })
                .collect::<BTreeSet<_>>();

            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                // The inflight gauge brackets the network fetch only.
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                let mut total_fetched = 0;
                // A failure for one peer's response does not stop the others
                // from being processed.
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }
992
993 fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
994 let last_commit_index = self.dag_state.read().last_commit_index();
995 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
996 let commit_threshold = last_commit_index
997 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
998 (
999 commit_threshold < quorum_commit_index,
1000 last_commit_index,
1001 quorum_commit_index,
1002 )
1003 }
1004
1005 async fn fetch_blocks_from_authorities(
1012 context: Arc<Context>,
1013 inflight_blocks: Arc<InflightBlocksMap>,
1014 network_client: Arc<C>,
1015 missing_blocks: BTreeSet<BlockRef>,
1016 dag_state: Arc<RwLock<DagState>>,
1017 ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1018 const MAX_PEERS: usize = 3;
1019
1020 let missing_blocks = missing_blocks
1022 .into_iter()
1023 .take(MAX_PEERS * MAX_BLOCKS_PER_FETCH)
1024 .collect::<Vec<_>>();
1025
1026 let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1027 for block in &missing_blocks {
1028 missing_blocks_per_authority[block.author] += 1;
1029 }
1030 for (missing, (_, authority)) in missing_blocks_per_authority
1031 .into_iter()
1032 .zip(context.committee.authorities())
1033 {
1034 context
1035 .metrics
1036 .node_metrics
1037 .synchronizer_missing_blocks_by_authority
1038 .with_label_values(&[&authority.hostname])
1039 .inc_by(missing as u64);
1040 context
1041 .metrics
1042 .node_metrics
1043 .synchronizer_current_missing_blocks_by_authority
1044 .with_label_values(&[&authority.hostname])
1045 .set(missing as i64);
1046 }
1047
1048 #[cfg_attr(test, expect(unused_mut))]
1049 let mut peers = context
1050 .committee
1051 .authorities()
1052 .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
1053 .collect::<Vec<_>>();
1054
1055 #[cfg(not(test))]
1057 peers.shuffle(&mut ThreadRng::default());
1058
1059 let mut peers = peers.into_iter();
1060 let mut request_futures = FuturesUnordered::new();
1061
1062 let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1063
1064 for blocks in missing_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
1066 let peer = peers
1067 .next()
1068 .expect("Possible misconfiguration as a peer should be found");
1069 let peer_hostname = &context.committee.authority(peer).hostname;
1070 let block_refs = blocks.iter().cloned().collect::<BTreeSet<_>>();
1071
1072 if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1075 info!(
1076 "Periodic sync of {} missing blocks from peer {} {}: {}",
1077 block_refs.len(),
1078 peer,
1079 peer_hostname,
1080 block_refs
1081 .iter()
1082 .map(|b| b.to_string())
1083 .collect::<Vec<_>>()
1084 .join(", ")
1085 );
1086 request_futures.push(Self::fetch_blocks_request(
1087 network_client.clone(),
1088 peer,
1089 blocks_guard,
1090 highest_rounds.clone(),
1091 FETCH_REQUEST_TIMEOUT,
1092 1,
1093 ));
1094 }
1095 }
1096
1097 let mut results = Vec::new();
1098 let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1099
1100 tokio::pin!(fetcher_timeout);
1101
1102 loop {
1103 tokio::select! {
1104 Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1105 let peer_hostname = &context.committee.authority(peer_index).hostname;
1106 match response {
1107 Ok(fetched_blocks) => {
1108 info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1109 results.push((blocks_guard, fetched_blocks, peer_index));
1110
1111 if request_futures.is_empty() {
1113 break;
1114 }
1115 },
1116 Err(_) => {
1117 if let Some(next_peer) = peers.next() {
1119 if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1121 info!(
1122 "Retrying syncing {} missing blocks from peer {}: {}",
1123 blocks_guard.block_refs.len(),
1124 peer_hostname,
1125 blocks_guard.block_refs
1126 .iter()
1127 .map(|b| b.to_string())
1128 .collect::<Vec<_>>()
1129 .join(", ")
1130 );
1131 request_futures.push(Self::fetch_blocks_request(
1132 network_client.clone(),
1133 next_peer,
1134 blocks_guard,
1135 highest_rounds,
1136 FETCH_REQUEST_TIMEOUT,
1137 1,
1138 ));
1139 } else {
1140 debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1141 }
1142 } else {
1143 debug!("No more peers left to fetch blocks");
1144 }
1145 }
1146 }
1147 },
1148 _ = &mut fetcher_timeout => {
1149 debug!("Timed out while fetching missing blocks");
1150 break;
1151 }
1152 }
1153 }
1154
1155 results
1156 }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161 use std::{
1162 collections::{BTreeMap, BTreeSet},
1163 sync::Arc,
1164 time::Duration,
1165 };
1166
1167 use async_trait::async_trait;
1168 use bytes::Bytes;
1169 use consensus_config::{AuthorityIndex, Parameters};
1170 use parking_lot::RwLock;
1171 use tokio::{sync::Mutex, time::sleep};
1172
1173 use crate::{
1174 BlockAPI, CommitDigest, CommitIndex,
1175 authority_service::COMMIT_LAG_MULTIPLIER,
1176 block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
1177 block_verifier::NoopBlockVerifier,
1178 commit::{CommitRange, CommitVote, TrustedCommit},
1179 commit_vote_monitor::CommitVoteMonitor,
1180 context::Context,
1181 core_thread::{CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1182 dag_state::DagState,
1183 error::{ConsensusError, ConsensusResult},
1184 network::{BlockStream, NetworkClient},
1185 storage::mem_store::MemStore,
1186 synchronizer::{
1187 FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap,
1188 MAX_BLOCKS_PER_FETCH, SYNC_MISSING_BLOCK_ROUND_THRESHOLD, Synchronizer,
1189 },
1190 };
1191
    // Key of a stubbed `fetch_blocks` call: the requested block refs and the peer.
    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    // Stubbed `fetch_blocks` response: the blocks to return and an optional latency.
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    // Key of a stubbed `fetch_latest_blocks` call: the peer and the requested authorities.
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    // Stubbed `fetch_latest_blocks` response: the blocks to return and an optional latency.
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1196
1197 #[derive(Default)]
1198 struct MockNetworkClient {
1199 fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1200 fetch_latest_blocks_requests:
1201 Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1202 }
1203
1204 impl MockNetworkClient {
1205 async fn stub_fetch_blocks(
1206 &self,
1207 blocks: Vec<VerifiedBlock>,
1208 peer: AuthorityIndex,
1209 latency: Option<Duration>,
1210 ) {
1211 let mut lock = self.fetch_blocks_requests.lock().await;
1212 let block_refs = blocks
1213 .iter()
1214 .map(|block| block.reference())
1215 .collect::<Vec<_>>();
1216 lock.insert((block_refs, peer), (blocks, latency));
1217 }
1218
1219 async fn stub_fetch_latest_blocks(
1220 &self,
1221 blocks: Vec<VerifiedBlock>,
1222 peer: AuthorityIndex,
1223 authorities: Vec<AuthorityIndex>,
1224 latency: Option<Duration>,
1225 ) {
1226 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1227 lock.entry((peer, authorities))
1228 .or_default()
1229 .push((blocks, latency));
1230 }
1231
1232 async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1233 let lock = self.fetch_latest_blocks_requests.lock().await;
1234 lock.len()
1235 }
1236 }
1237
1238 #[async_trait]
1239 impl NetworkClient for MockNetworkClient {
1240 const SUPPORT_STREAMING: bool = false;
1241
1242 async fn send_block(
1243 &self,
1244 _peer: AuthorityIndex,
1245 _serialized_block: &VerifiedBlock,
1246 _timeout: Duration,
1247 ) -> ConsensusResult<()> {
1248 unimplemented!("Unimplemented")
1249 }
1250
1251 async fn subscribe_blocks(
1252 &self,
1253 _peer: AuthorityIndex,
1254 _last_received: Round,
1255 _timeout: Duration,
1256 ) -> ConsensusResult<BlockStream> {
1257 unimplemented!("Unimplemented")
1258 }
1259
1260 async fn fetch_blocks(
1261 &self,
1262 peer: AuthorityIndex,
1263 block_refs: Vec<BlockRef>,
1264 _highest_accepted_rounds: Vec<Round>,
1265 _timeout: Duration,
1266 ) -> ConsensusResult<Vec<Bytes>> {
1267 let mut lock = self.fetch_blocks_requests.lock().await;
1268 let response = lock
1269 .remove(&(block_refs, peer))
1270 .expect("Unexpected fetch blocks request made");
1271
1272 let serialised = response
1273 .0
1274 .into_iter()
1275 .map(|block| block.serialized().clone())
1276 .collect::<Vec<_>>();
1277
1278 drop(lock);
1279
1280 if let Some(latency) = response.1 {
1281 sleep(latency).await;
1282 }
1283
1284 Ok(serialised)
1285 }
1286
1287 async fn fetch_commits(
1288 &self,
1289 _peer: AuthorityIndex,
1290 _commit_range: CommitRange,
1291 _timeout: Duration,
1292 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1293 unimplemented!("Unimplemented")
1294 }
1295
1296 async fn fetch_latest_blocks(
1297 &self,
1298 peer: AuthorityIndex,
1299 authorities: Vec<AuthorityIndex>,
1300 _timeout: Duration,
1301 ) -> ConsensusResult<Vec<Bytes>> {
1302 let mut lock = self.fetch_latest_blocks_requests.lock().await;
1303 let mut responses = lock
1304 .remove(&(peer, authorities.clone()))
1305 .expect("Unexpected fetch blocks request made");
1306
1307 let response = responses.remove(0);
1308 let serialised = response
1309 .0
1310 .into_iter()
1311 .map(|block| block.serialized().clone())
1312 .collect::<Vec<_>>();
1313
1314 if !responses.is_empty() {
1315 lock.insert((peer, authorities), responses);
1316 }
1317
1318 drop(lock);
1319
1320 if let Some(latency) = response.1 {
1321 sleep(latency).await;
1322 }
1323
1324 Ok(serialised)
1325 }
1326
1327 async fn get_latest_rounds(
1328 &self,
1329 _peer: AuthorityIndex,
1330 _timeout: Duration,
1331 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1332 unimplemented!("Unimplemented")
1333 }
1334 }
1335
/// Exercises `InflightBlocksMap`: repeated locking by the same authority is
/// refused, a third authority is refused while two others hold the blocks,
/// dropping a guard frees a slot, and `swap_locks` hands the blocks over to
/// another authority.
#[test]
fn inflight_blocks_map() {
    let map = InflightBlocksMap::new();
    let missing_block_refs = BTreeSet::from(
        [(1, 0), (10, 0), (12, 3), (15, 2)].map(|(round, author)| {
            BlockRef::new(round, AuthorityIndex::new_for_test(author), BlockDigest::MIN)
        }),
    );

    // Locking and releasing.
    {
        let mut held_guards = Vec::new();

        for authority in (1..=2).map(AuthorityIndex::new_for_test) {
            let acquired = map
                .lock_blocks(missing_block_refs.clone(), authority)
                .expect("Guard should be created");
            assert_eq!(acquired.block_refs.len(), 4);
            held_guards.push(acquired);

            // The same authority cannot lock the same blocks a second time.
            assert!(
                map.lock_blocks(missing_block_refs.clone(), authority)
                    .is_none()
            );
        }

        // Two authorities already hold every block, so a third one is refused.
        let authority_3 = AuthorityIndex::new_for_test(3);
        assert!(
            map.lock_blocks(missing_block_refs.clone(), authority_3)
                .is_none()
        );

        // Releasing one holder makes room for the third authority.
        drop(held_guards.remove(0));

        let reacquired = map
            .lock_blocks(missing_block_refs.clone(), authority_3)
            .expect("Guard should be successfully acquired");
        assert_eq!(reacquired.block_refs, missing_block_refs);

        // Once every guard is gone nothing must remain locked.
        drop(reacquired);
        drop(held_guards);
        assert_eq!(map.num_of_locked_blocks(), 0);
    }

    // Swapping locks between authorities.
    {
        let original_guard = map
            .lock_blocks(missing_block_refs.clone(), AuthorityIndex::new_for_test(1))
            .unwrap();

        let swapped_guard = map.swap_locks(original_guard, AuthorityIndex::new_for_test(2));

        assert_eq!(swapped_guard.unwrap().block_refs, missing_block_refs);
    }
}
1405
1406 #[tokio::test]
1407 async fn successful_fetch_blocks_from_peer() {
1408 let (context, _) = Context::new_for_test(4);
1410 let context = Arc::new(context);
1411 let block_verifier = Arc::new(NoopBlockVerifier {});
1412 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1413 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1414 let network_client = Arc::new(MockNetworkClient::default());
1415 let store = Arc::new(MemStore::new());
1416 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1417
1418 let handle = Synchronizer::start(
1419 network_client.clone(),
1420 context,
1421 core_dispatcher.clone(),
1422 commit_vote_monitor,
1423 block_verifier,
1424 dag_state,
1425 false,
1426 );
1427
1428 let expected_blocks = (0..10)
1430 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1431 .collect::<Vec<_>>();
1432 let missing_blocks = expected_blocks
1433 .iter()
1434 .map(|block| block.reference())
1435 .collect::<BTreeSet<_>>();
1436
1437 let peer = AuthorityIndex::new_for_test(1);
1439 network_client
1440 .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1441 .await;
1442
1443 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1445
1446 sleep(Duration::from_millis(1_000)).await;
1448
1449 let added_blocks = core_dispatcher.get_add_blocks().await;
1451 assert_eq!(added_blocks, expected_blocks);
1452 }
1453
1454 #[tokio::test]
1455 async fn saturate_fetch_blocks_from_peer() {
1456 let (context, _) = Context::new_for_test(4);
1458 let context = Arc::new(context);
1459 let block_verifier = Arc::new(NoopBlockVerifier {});
1460 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1461 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1462 let network_client = Arc::new(MockNetworkClient::default());
1463 let store = Arc::new(MemStore::new());
1464 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1465
1466 let handle = Synchronizer::start(
1467 network_client.clone(),
1468 context,
1469 core_dispatcher.clone(),
1470 commit_vote_monitor,
1471 block_verifier,
1472 dag_state,
1473 false,
1474 );
1475
1476 let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1478 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1479 .collect::<Vec<_>>();
1480
1481 let peer = AuthorityIndex::new_for_test(1);
1483 let mut iter = expected_blocks.iter().peekable();
1484 while let Some(block) = iter.next() {
1485 network_client
1488 .stub_fetch_blocks(
1489 vec![block.clone()],
1490 peer,
1491 Some(Duration::from_millis(5_000)),
1492 )
1493 .await;
1494
1495 let mut missing_blocks = BTreeSet::new();
1496 missing_blocks.insert(block.reference());
1497
1498 if iter.peek().is_none() {
1501 match handle.fetch_blocks(missing_blocks, peer).await {
1502 Err(ConsensusError::SynchronizerSaturated(index)) => {
1503 assert_eq!(index, peer);
1504 }
1505 _ => panic!("A saturated synchronizer error was expected"),
1506 }
1507 } else {
1508 assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1509 }
1510 }
1511 }
1512
1513 #[tokio::test(flavor = "current_thread", start_paused = true)]
1514 async fn synchronizer_periodic_task_fetch_blocks() {
1515 let (context, _) = Context::new_for_test(4);
1517 let context = Arc::new(context);
1518 let block_verifier = Arc::new(NoopBlockVerifier {});
1519 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1520 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1521 let network_client = Arc::new(MockNetworkClient::default());
1522 let store = Arc::new(MemStore::new());
1523 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1524
1525 let expected_blocks = (0..10)
1527 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1528 .collect::<Vec<_>>();
1529 let missing_blocks = expected_blocks
1530 .iter()
1531 .map(|block| block.reference())
1532 .collect::<BTreeSet<_>>();
1533
1534 core_dispatcher
1536 .stub_missing_blocks(missing_blocks.clone())
1537 .await;
1538
1539 network_client
1543 .stub_fetch_blocks(
1544 expected_blocks.clone(),
1545 AuthorityIndex::new_for_test(1),
1546 Some(FETCH_REQUEST_TIMEOUT),
1547 )
1548 .await;
1549 network_client
1550 .stub_fetch_blocks(
1551 expected_blocks.clone(),
1552 AuthorityIndex::new_for_test(2),
1553 None,
1554 )
1555 .await;
1556
1557 let _handle = Synchronizer::start(
1559 network_client.clone(),
1560 context,
1561 core_dispatcher.clone(),
1562 commit_vote_monitor,
1563 block_verifier,
1564 dag_state,
1565 false,
1566 );
1567
1568 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1569
1570 let added_blocks = core_dispatcher.get_add_blocks().await;
1572 assert_eq!(added_blocks, expected_blocks);
1573
1574 assert!(
1576 core_dispatcher
1577 .get_missing_blocks()
1578 .await
1579 .unwrap()
1580 .is_empty()
1581 );
1582 }
1583
1584 #[tokio::test(flavor = "current_thread", start_paused = true)]
1585 async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds()
1586 {
1587 let (mut context, _) = Context::new_for_test(4);
1589
1590 context
1593 .protocol_config
1594 .set_consensus_gc_depth_for_testing(0);
1595
1596 let context = Arc::new(context);
1597 let block_verifier = Arc::new(NoopBlockVerifier {});
1598 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1599 let network_client = Arc::new(MockNetworkClient::default());
1600 let store = Arc::new(MemStore::new());
1601 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1602 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1603
1604 let expected_blocks = (0..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2)
1607 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1608 .collect::<Vec<_>>();
1609
1610 let missing_blocks = expected_blocks
1611 .iter()
1612 .map(|block| block.reference())
1613 .collect::<BTreeSet<_>>();
1614 core_dispatcher.stub_missing_blocks(missing_blocks).await;
1615
1616 let mut expected_blocks = expected_blocks
1620 .into_iter()
1621 .filter(|block| block.round() <= SYNC_MISSING_BLOCK_ROUND_THRESHOLD)
1622 .collect::<Vec<_>>();
1623
1624 for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
1625 network_client
1626 .stub_fetch_blocks(
1627 chunk.to_vec(),
1628 AuthorityIndex::new_for_test(1),
1629 Some(FETCH_REQUEST_TIMEOUT),
1630 )
1631 .await;
1632
1633 network_client
1634 .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
1635 .await;
1636 }
1637
1638 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1640 let commit_index: CommitIndex = round - 1;
1641 let blocks = (0..4)
1642 .map(|authority| {
1643 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1644 let block = TestBlock::new(round, authority)
1645 .set_commit_votes(commit_votes)
1646 .build();
1647
1648 VerifiedBlock::new_for_test(block)
1649 })
1650 .collect::<Vec<_>>();
1651
1652 for block in blocks {
1655 commit_vote_monitor.observe_block(&block);
1656 }
1657
1658 let _handle = Synchronizer::start(
1661 network_client.clone(),
1662 context.clone(),
1663 core_dispatcher.clone(),
1664 commit_vote_monitor.clone(),
1665 block_verifier.clone(),
1666 dag_state.clone(),
1667 false,
1668 );
1669
1670 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1671
1672 let mut added_blocks = core_dispatcher.get_add_blocks().await;
1675
1676 added_blocks.sort_by_key(|block| block.reference());
1677 expected_blocks.sort_by_key(|block| block.reference());
1678
1679 assert_eq!(added_blocks, expected_blocks);
1680 }
1681
1682 #[tokio::test(flavor = "current_thread", start_paused = true)]
1683 async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1684 let (context, _) = Context::new_for_test(4);
1686 let context = Arc::new(context);
1687 let block_verifier = Arc::new(NoopBlockVerifier {});
1688 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1689 let network_client = Arc::new(MockNetworkClient::default());
1690 let store = Arc::new(MemStore::new());
1691 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1692 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1693
1694 let mut expected_blocks = (SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2
1697 ..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 3)
1698 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1699 .collect::<Vec<_>>();
1700 let missing_blocks = expected_blocks
1701 .iter()
1702 .map(|block| block.reference())
1703 .collect::<BTreeSet<_>>();
1704 core_dispatcher
1705 .stub_missing_blocks(missing_blocks.clone())
1706 .await;
1707
1708 for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
1712 network_client
1713 .stub_fetch_blocks(
1714 chunk.to_vec(),
1715 AuthorityIndex::new_for_test(1),
1716 Some(FETCH_REQUEST_TIMEOUT),
1717 )
1718 .await;
1719 network_client
1720 .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
1721 .await;
1722 }
1723
1724 let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1726 let commit_index: CommitIndex = round - 1;
1727 let blocks = (0..4)
1728 .map(|authority| {
1729 let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1730 let block = TestBlock::new(round, authority)
1731 .set_commit_votes(commit_votes)
1732 .build();
1733
1734 VerifiedBlock::new_for_test(block)
1735 })
1736 .collect::<Vec<_>>();
1737
1738 for block in blocks {
1741 commit_vote_monitor.observe_block(&block);
1742 }
1743
1744 let _handle = Synchronizer::start(
1747 network_client.clone(),
1748 context.clone(),
1749 core_dispatcher.clone(),
1750 commit_vote_monitor.clone(),
1751 block_verifier,
1752 dag_state.clone(),
1753 false,
1754 );
1755
1756 sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1757
1758 let added_blocks = core_dispatcher.get_add_blocks().await;
1761 assert_eq!(added_blocks, vec![]);
1762
1763 {
1766 let mut d = dag_state.write();
1767 for index in 1..=commit_index {
1768 let commit =
1769 TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
1770
1771 d.add_commit(commit);
1772 }
1773
1774 assert_eq!(
1775 d.last_commit_index(),
1776 commit_vote_monitor.quorum_commit_index()
1777 );
1778 }
1779
1780 core_dispatcher
1782 .stub_missing_blocks(missing_blocks.clone())
1783 .await;
1784
1785 sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1786
1787 let mut added_blocks = core_dispatcher.get_add_blocks().await;
1789
1790 added_blocks.sort_by_key(|block| block.reference());
1791 expected_blocks.sort_by_key(|block| block.reference());
1792
1793 assert_eq!(added_blocks, expected_blocks);
1794 }
1795
1796 #[tokio::test(flavor = "current_thread", start_paused = true)]
1797 async fn synchronizer_fetch_own_last_block() {
1798 let (context, _) = Context::new_for_test(4);
1800 let context = Arc::new(context.with_parameters(Parameters {
1801 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1802 ..Default::default()
1803 }));
1804 let block_verifier = Arc::new(NoopBlockVerifier {});
1805 let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1806 let network_client = Arc::new(MockNetworkClient::default());
1807 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1808 let store = Arc::new(MemStore::new());
1809 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1810 let our_index = AuthorityIndex::new_for_test(0);
1811
1812 let mut expected_blocks = (9..=10)
1814 .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1815 .collect::<Vec<_>>();
1816
1817 let block_1 = expected_blocks.pop().unwrap();
1820 network_client
1821 .stub_fetch_latest_blocks(
1822 vec![block_1.clone()],
1823 AuthorityIndex::new_for_test(1),
1824 vec![our_index],
1825 None,
1826 )
1827 .await;
1828 network_client
1829 .stub_fetch_latest_blocks(
1830 vec![block_1],
1831 AuthorityIndex::new_for_test(1),
1832 vec![our_index],
1833 None,
1834 )
1835 .await;
1836
1837 let block_2 = expected_blocks.pop().unwrap();
1839 network_client
1840 .stub_fetch_latest_blocks(
1841 vec![block_2.clone()],
1842 AuthorityIndex::new_for_test(2),
1843 vec![our_index],
1844 Some(Duration::from_secs(10)),
1845 )
1846 .await;
1847 network_client
1848 .stub_fetch_latest_blocks(
1849 vec![block_2],
1850 AuthorityIndex::new_for_test(2),
1851 vec![our_index],
1852 None,
1853 )
1854 .await;
1855
1856 network_client
1858 .stub_fetch_latest_blocks(
1859 vec![],
1860 AuthorityIndex::new_for_test(3),
1861 vec![our_index],
1862 Some(Duration::from_secs(10)),
1863 )
1864 .await;
1865 network_client
1866 .stub_fetch_latest_blocks(
1867 vec![],
1868 AuthorityIndex::new_for_test(3),
1869 vec![our_index],
1870 None,
1871 )
1872 .await;
1873
1874 let handle = Synchronizer::start(
1876 network_client.clone(),
1877 context.clone(),
1878 core_dispatcher.clone(),
1879 commit_vote_monitor,
1880 block_verifier,
1881 dag_state,
1882 true,
1883 );
1884
1885 sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
1887
1888 assert_eq!(
1890 core_dispatcher.get_last_own_proposed_round().await,
1891 vec![10]
1892 );
1893
1894 assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
1896
1897 assert_eq!(
1899 context
1900 .metrics
1901 .node_metrics
1902 .sync_last_known_own_block_retries
1903 .get(),
1904 1
1905 );
1906
1907 if let Err(err) = handle.stop().await {
1909 if err.is_panic() {
1910 std::panic::resume_unwind(err.into_panic());
1911 }
1912 }
1913 }
1914}