use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesUnordered};
use iota_macros::fail_point_async;
use iota_metrics::{
    monitored_future,
    monitored_mpsc::{Receiver, Sender, channel},
    monitored_scope,
};
use itertools::Itertools as _;
use parking_lot::{Mutex, RwLock};
#[cfg(not(test))]
use rand::{prelude::SliceRandom, rngs::ThreadRng};
use tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{mpsc::error::TrySendError, oneshot},
    task::{JoinError, JoinSet},
    time::{Instant, sleep, sleep_until, timeout},
};
use tracing::{debug, error, info, trace, warn};

use crate::{
    BlockAPI, CommitIndex, Round,
    authority_service::COMMIT_LAG_MULTIPLIER,
    block::{BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
};

/// Maximum number of concurrent fetch-blocks requests per peer.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Timeout for a single fetch request to one peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);

/// Overall timeout when fetching missing blocks from multiple peers.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of blocks to request in a single fetch.
const MAX_BLOCKS_PER_FETCH: usize = 32;

/// Maximum number of peers a block can be concurrently fetched from.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// While the local commit is lagging, the scheduled synchronizer only fetches
/// missing blocks up to this many rounds past the highest accepted round.
const SYNC_MISSING_BLOCK_ROUND_THRESHOLD: u32 = 50;

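/// Tracks the block refs that are currently being fetched from a peer. On drop
/// the guard releases the corresponding locks in the `InflightBlocksMap`, so
/// no explicit cleanup is needed on error paths.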
struct BlocksGuard {
    map: Arc<InflightBlocksMap>,
    block_refs: BTreeSet<BlockRef>,
    peer: AuthorityIndex,
}

impl Drop for BlocksGuard {
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}

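/// Keeps a mapping between the missing blocks that have been instructed to be
/// fetched and the authorities they are currently being fetched from. It is
/// used to deduplicate requests and to cap how many peers fetch the same block
/// at the same time.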
struct InflightBlocksMap {
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}

impl InflightBlocksMap {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(HashMap::new()),
        })
    }

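    /// Attempts to lock the provided block refs for the given peer. Only refs
    /// that are not already being fetched from
    /// `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK` peers (and not already locked for
    /// this peer) are locked. Returns `None` when nothing could be locked,
    /// otherwise a `BlocksGuard` that releases the locks on drop.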
    fn lock_blocks(
        self: &Arc<Self>,
        missing_block_refs: BTreeSet<BlockRef>,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let mut blocks = BTreeSet::new();
        let mut inner = self.inner.lock();

        for block_ref in missing_block_refs {
            let authorities = inner.entry(block_ref).or_default();
            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
                && !authorities.contains(&peer)
            {
                assert!(authorities.insert(peer));
                blocks.insert(block_ref);
            }
        }

        if blocks.is_empty() {
            None
        } else {
            Some(BlocksGuard {
                map: self.clone(),
                block_refs: blocks,
                peer,
            })
        }
    }

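    /// Unlocks the provided block refs for the given peer. Called when a fetch
    /// attempt has concluded, so the blocks can be requested from another peer
    /// if they are still missing.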
    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
        let mut blocks_to_fetch = self.inner.lock();
        for block_ref in block_refs {
            let authorities = blocks_to_fetch
                .get_mut(block_ref)
                .expect("Should have found a non empty map");

            assert!(authorities.remove(&peer), "Peer index should be present!");

            if authorities.is_empty() {
                blocks_to_fetch.remove(block_ref);
            }
        }
    }

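    /// Drops the provided `blocks_guard`, unlocking its block refs, and
    /// immediately attempts to re-lock the same refs for `peer`. The swap is
    /// best effort: refs that cannot be locked for the new peer are skipped.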
    fn swap_locks(
        self: &Arc<Self>,
        blocks_guard: BlocksGuard,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let block_refs = blocks_guard.block_refs.clone();

        drop(blocks_guard);

        self.lock_blocks(block_refs, peer)
    }

    #[cfg(test)]
    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
        let inner = self.inner.lock();
        inner.len()
    }
}

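/// Commands handled by the main loop of the `Synchronizer`.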
enum Command {
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    FetchOwnLastBlock,
    KickOffScheduler,
}

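/// A handle for interacting with a running `Synchronizer`: sending it commands
/// and shutting down its background tasks.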
pub(crate) struct SynchronizerHandle {
    commands_sender: Sender<Command>,
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}

impl SynchronizerHandle {
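    /// Explicitly asks the synchronizer to fetch the given missing block refs
    /// from the specified peer. Returns an error when the synchronizer is
    /// saturated for that peer or is shutting down.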
    pub(crate) async fn fetch_blocks(
        &self,
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<()> {
        let (sender, receiver) = oneshot::channel();
        self.commands_sender
            .send(Command::FetchBlocks {
                missing_block_refs,
                peer_index,
                result: sender,
            })
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;
        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
    }

    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
        let mut tasks = self.tasks.lock().await;
        tasks.abort_all();
        while let Some(result) = tasks.join_next().await {
            result?
        }
        Ok(())
    }
}

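/// The `Synchronizer` fetches blocks that this node is missing from its peers.
/// Missing blocks are fetched in two ways:
///
/// 1. On demand: components can explicitly request missing block refs from a
///    specific peer via `SynchronizerHandle::fetch_blocks`.
/// 2. Periodically: a scheduled task asks the core for its current set of
///    missing blocks and fetches them from randomly chosen peers.
///
/// One dedicated fetch task runs per peer, so a slow or saturated peer does
/// not block progress with the others.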
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    commands_receiver: Receiver<Command>,
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    fetch_blocks_scheduler_task: JoinSet<()>,
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    inflight_blocks_map: Arc<InflightBlocksMap>,
    commands_sender: Sender<Command>,
}

impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
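    /// Starts the synchronizer, spawning the per-peer fetch tasks and the main
    /// command loop, and returns a `SynchronizerHandle` for interacting with
    /// it. When `sync_last_known_own_block` is true, the synchronizer first
    /// recovers the last block this authority proposed from its peers.
    ///
    /// A minimal usage sketch, assuming the surrounding components have
    /// already been constructed:
    ///
    /// ```ignore
    /// let handle = Synchronizer::start(
    ///     network_client,
    ///     context,
    ///     core_dispatcher,
    ///     commit_vote_monitor,
    ///     block_verifier,
    ///     dag_state,
    ///     false,
    /// );
    /// handle.fetch_blocks(missing_block_refs, peer_index).await?;
    /// ```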
    pub fn start(
        network_client: Arc<C>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        block_verifier: Arc<V>,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Arc<SynchronizerHandle> {
        let (commands_sender, commands_receiver) =
            channel("consensus_synchronizer_commands", 1_000);
        let inflight_blocks_map = InflightBlocksMap::new();

        // Spawn one fetch task per peer, so a slow peer does not block fetches
        // from the others.
        let mut fetch_block_senders = BTreeMap::new();
        let mut tasks = JoinSet::new();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            let (sender, receiver) =
                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
                index,
                network_client.clone(),
                block_verifier.clone(),
                commit_vote_monitor.clone(),
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                receiver,
                commands_sender.clone(),
            );
            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
            fetch_block_senders.insert(index, sender);
        }

        let commands_sender_clone = commands_sender.clone();

        if sync_last_known_own_block {
            commands_sender
                .try_send(Command::FetchOwnLastBlock)
                .expect("Failed to sync our last block");
        }

        // Spawn the main synchronizer loop.
        tasks.spawn(monitored_future!(async move {
            let mut s = Self {
                context,
                commands_receiver,
                fetch_block_senders,
                core_dispatcher,
                commit_vote_monitor,
                fetch_blocks_scheduler_task: JoinSet::new(),
                fetch_own_last_block_task: JoinSet::new(),
                network_client,
                block_verifier,
                inflight_blocks_map,
                commands_sender: commands_sender_clone,
                dag_state,
            };
            s.run().await;
        }));

        Arc::new(SynchronizerHandle {
            commands_sender,
            tasks: tokio::sync::Mutex::new(tasks),
        })
    }

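    /// The main loop: processes incoming commands, supervises the auxiliary
    /// tasks, and periodically kicks off the scheduled fetch of missing
    /// blocks.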
    async fn run(&mut self) {
        // Run the scheduled fetch of missing blocks every 500ms.
        const SYNCHRONIZER_TIMEOUT: Duration = Duration::from_millis(500);
        let scheduler_timeout = sleep_until(Instant::now() + SYNCHRONIZER_TIMEOUT);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks { missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            // Keep only the maximum number of block refs allowed per request.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(MAX_BLOCKS_PER_FETCH)
                                .collect();

                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // Hand the blocks guard over to the peer's fetch task; the
                            // locks are released once the guard is dropped.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown,
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Reset the scheduler timeout to run as soon as possible,
                            // or after half the period if a fetch is already running.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + SYNCHRONIZER_TIMEOUT.checked_div(2).unwrap()
                            };

                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is expected during shutdown.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is expected during shutdown.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // Run the scheduled fetch if it is not already running.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + SYNCHRONIZER_TIMEOUT);
                }
            }
        }
    }

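    /// The fetch task that runs for each peer: it listens for `BlocksGuard`s
    /// on its channel and issues fetch requests to that peer, keeping at most
    /// `FETCH_BLOCKS_CONCURRENCY` requests in flight and retrying each request
    /// up to `MAX_RETRIES` times.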
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 5;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    // Include our highest accepted round per authority, so the peer
                    // can decide which additional blocks are useful to return.
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                            }
                        },
                        Err(_) => {
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Drop the guard to release the locks, so the blocks
                                // can be requested from another peer.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }

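    /// Processes the blocks received from a fetch request: verifies them,
    /// checks that only requested blocks (or their ancestors) were returned,
    /// records metrics, and submits the blocks to the core for acceptance.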
    async fn process_fetched_blocks(
        serialized_blocks: Vec<Bytes>,
        peer_index: AuthorityIndex,
        requested_blocks_guard: BlocksGuard,
        core_dispatcher: Arc<D>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        commands_sender: Sender<Command>,
        sync_method: &str,
    ) -> ConsensusResult<()> {
        // A peer may return the requested blocks plus a small number of
        // additional blocks; anything beyond that is an error.
        const MAX_ADDITIONAL_BLOCKS: usize = 10;

        if serialized_blocks.len() > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
        {
            return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
        }

        // Verify the blocks on the blocking pool, as signature verification is
        // CPU-intensive.
        let blocks = Handle::current()
            .spawn_blocking({
                let block_verifier = block_verifier.clone();
                let context = context.clone();
                move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // Collect the ancestors of the requested blocks.
        let ancestors = blocks
            .iter()
            .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
            .flat_map(|b| b.ancestors().to_vec())
            .collect::<BTreeSet<BlockRef>>();

        // Only requested blocks or their ancestors are allowed in the response.
        for block in &blocks {
            if !requested_blocks_guard
                .block_refs
                .contains(&block.reference())
                && !ancestors.contains(&block.reference())
            {
                return Err(ConsensusError::UnexpectedFetchedBlock {
                    index: peer_index,
                    block_ref: block.reference(),
                });
            }
        }

        // Record the commit votes carried by the fetched blocks.
        for block in &blocks {
            commit_vote_monitor.observe_block(block);
        }

        let metrics = &context.metrics.node_metrics;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        metrics
            .synchronizer_fetched_blocks_by_peer
            .with_label_values(&[peer_hostname.as_str(), sync_method])
            .inc_by(blocks.len() as u64);
        for block in &blocks {
            let block_hostname = &context.committee.authority(block.author()).hostname;
            metrics
                .synchronizer_fetched_blocks_by_authority
                .with_label_values(&[block_hostname.as_str(), sync_method])
                .inc();
        }

        debug!(
            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
            blocks.len(),
            blocks.iter().map(|b| b.reference().to_string()).join(", "),
        );

        // Submit the fetched blocks to the core; it returns any block refs
        // that are still missing.
        let missing_blocks = core_dispatcher
            .add_blocks(blocks)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Release the locks for the fetched blocks.
        drop(requested_blocks_guard);

        // Kick off the scheduler to fetch the blocks that are still missing.
        if !missing_blocks.is_empty() {
            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
            {
                warn!("Commands channel is full");
            }
        }

        context
            .metrics
            .node_metrics
            .missing_blocks_after_fetch_total
            .inc_by(missing_blocks.len() as u64);

        Ok(())
    }

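    /// Returns the highest accepted round per authority, in committee order,
    /// as read from the cached blocks in the dag state.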
    fn get_highest_accepted_rounds(
        dag_state: Arc<RwLock<DagState>>,
        context: &Arc<Context>,
    ) -> Vec<Round> {
        let blocks = dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        assert_eq!(blocks.len(), context.committee.size());

        blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>()
    }

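    /// Deserializes and verifies the provided serialized blocks. Blocks with a
    /// timestamp in the future are skipped; an invalid block fails the whole
    /// batch.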
    fn verify_blocks(
        serialized_blocks: Vec<Bytes>,
        block_verifier: Arc<V>,
        context: &Context,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut verified_blocks = Vec::new();

        for serialized_block in serialized_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;

            if let Err(e) = block_verifier.verify(&signed_block) {
                let hostname = context.committee.authority(peer_index).hostname.clone();

                context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
                    .inc();
                warn!("Invalid block received from {}: {}", peer_index, e);
                return Err(e);
            }
            let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

            // Drop blocks that have a timestamp in the future; they are not accepted.
            let now = context.clock.timestamp_utc_ms();
            if now < verified_block.timestamp_ms() {
                warn!(
                    "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
                    verified_block.reference(),
                    verified_block.timestamp_ms(),
                    now
                );
                continue;
            }

            verified_blocks.push(verified_block);
        }

        Ok(verified_blocks)
    }

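    /// Performs a single fetch request against `peer` with a timeout. On
    /// failure the call waits out the remainder of `request_timeout` before
    /// returning, so retries are naturally rate limited. The guard, retry
    /// count, peer and highest rounds are returned alongside the response so
    /// the caller can retry.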
    async fn fetch_blocks_request(
        network_client: Arc<C>,
        peer: AuthorityIndex,
        blocks_guard: BlocksGuard,
        highest_rounds: Vec<Round>,
        request_timeout: Duration,
        mut retries: u32,
    ) -> (
        ConsensusResult<Vec<Bytes>>,
        BlocksGuard,
        u32,
        AuthorityIndex,
        Vec<Round>,
    ) {
        let start = Instant::now();
        let resp = timeout(
            request_timeout,
            network_client.fetch_blocks(
                peer,
                blocks_guard
                    .block_refs
                    .clone()
                    .into_iter()
                    .collect::<Vec<_>>(),
                highest_rounds.clone(),
                request_timeout,
            ),
        )
        .await;

        fail_point_async!("consensus-delay");

        let resp = match resp {
            Ok(Err(err)) => {
                // Wait out the remainder of the timeout before retrying, to
                // avoid hammering the peer.
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(err)
            }
            Err(err) => {
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
            }
            Ok(result) => result,
        };
        (resp, blocks_guard, retries, peer, highest_rounds)
    }

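    /// Spawns a task that recovers the last block this authority proposed by
    /// querying all peers, retrying with exponential backoff until a quorum of
    /// stake has responded with valid results. The highest discovered round is
    /// then reported to the core as the last known proposed round.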
    fn start_fetch_own_last_block_task(&mut self) {
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err| {
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference() });
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Track the highest discovered round and the stake of the peers
                // that responded; our own stake counts immediately.
                let mut highest_round = GENESIS_ROUND;
                let mut received_response = vec![false; context.committee.size()];
                received_response[context.own_index] = true;
                let mut total_stake = context.committee.stake(context.own_index);
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Ask all the peers that have not responded yet about our last block.
                    let mut results = FuturesUnordered::new();
                    for (authority_index, _authority) in context.committee.authorities() {
                        if !received_response[authority_index] {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results, but wait no longer than the configured timeout.
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                received_response[authority_index] = true;
                                                let max_round = blocks.into_iter().map(|b| b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Stop retrying once a quorum of stake has replied with valid results.
                    if context.committee.reached_quorum(total_stake) {
                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    } else {
                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Back off exponentially, capped at MAX_RETRY_DELAY_STEP.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                // Report the highest known round to the core.
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }

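    /// Kicks off the scheduled task that fetches the blocks currently reported
    /// as missing by the core. When the node's commits lag behind the quorum,
    /// the fetch is restricted (or skipped entirely) so commit sync can catch
    /// up first.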
    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        // No reason to kick off the scheduler if there are no missing blocks to fetch.
        if missing_blocks.is_empty() {
            return Ok(());
        }

        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        if commit_lagging {
            // If gc is enabled and the commit is lagging, skip the scheduled
            // fetch entirely; commit sync is responsible for catching up.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            // Otherwise only fetch missing blocks that are not too far ahead
            // of the highest accepted round, so the node keeps making progress
            // while its commits catch up.
            let highest_accepted_round = dag_state.read().highest_accepted_round();
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|b| {
                    b.round <= highest_accepted_round + SYNC_MISSING_BLOCK_ROUND_THRESHOLD
                })
                .collect::<BTreeSet<_>>();

            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                // Fetch blocks from peers.
                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                // Process the fetched blocks.
                let mut total_fetched = 0;
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }

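    /// Returns whether the local commit sequence lags behind the quorum commit
    /// index, along with both indices. The node counts as lagging once the
    /// quorum commit index exceeds the last local commit index by more than
    /// `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`. For instance, with a
    /// last commit index of 100, a batch size of 100 and a multiplier of 10
    /// (illustrative values), the threshold is 1100, so a quorum commit index
    /// of 1200 counts as lagging.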
    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        let commit_threshold = last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
        (
            commit_threshold < quorum_commit_index,
            last_commit_index,
            quorum_commit_index,
        )
    }

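    /// Fetches the given missing blocks from randomly chosen peers. The blocks
    /// are split into chunks of `MAX_BLOCKS_PER_FETCH` and each chunk is
    /// requested from a different peer; on failure a chunk is retried against
    /// the next available peer. Returns the fetched (still serialized) blocks
    /// together with their guards and the peer they came from.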
    async fn fetch_blocks_from_authorities(
        context: Arc<Context>,
        inflight_blocks: Arc<InflightBlocksMap>,
        network_client: Arc<C>,
        missing_blocks: BTreeSet<BlockRef>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
        const MAX_PEERS: usize = 3;

        // Attempt to fetch only up to a maximum number of blocks.
        let missing_blocks = missing_blocks
            .into_iter()
            .take(MAX_PEERS * MAX_BLOCKS_PER_FETCH)
            .collect::<Vec<_>>();

        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
        for block in &missing_blocks {
            missing_blocks_per_authority[block.author] += 1;
        }
        for (missing, (_, authority)) in missing_blocks_per_authority
            .into_iter()
            .zip(context.committee.authorities())
        {
            context
                .metrics
                .node_metrics
                .synchronizer_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .inc_by(missing as u64);
            context
                .metrics
                .node_metrics
                .synchronizer_current_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .set(missing as i64);
        }

        #[cfg_attr(test, expect(unused_mut))]
        let mut peers = context
            .committee
            .authorities()
            .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
            .collect::<Vec<_>>();

        // In test, the peer order is left unshuffled to keep tests deterministic.
        #[cfg(not(test))]
        peers.shuffle(&mut ThreadRng::default());

        let mut peers = peers.into_iter();
        let mut request_futures = FuturesUnordered::new();

        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);

        // Send the initial requests, one chunk per peer.
        for blocks in missing_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
            let peer = peers
                .next()
                .expect("Possible misconfiguration as a peer should be found");
            let peer_hostname = &context.committee.authority(peer).hostname;
            let block_refs = blocks.iter().cloned().collect::<BTreeSet<_>>();

            // Lock the blocks to be fetched; if no lock can be acquired for
            // any of them, the chunk is skipped.
            if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
                info!(
                    "Periodic sync of {} missing blocks from peer {} {}: {}",
                    block_refs.len(),
                    peer,
                    peer_hostname,
                    block_refs
                        .iter()
                        .map(|b| b.to_string())
                        .collect::<Vec<_>>()
                        .join(", ")
                );
                request_futures.push(Self::fetch_blocks_request(
                    network_client.clone(),
                    peer,
                    blocks_guard,
                    highest_rounds.clone(),
                    FETCH_REQUEST_TIMEOUT,
                    1,
                ));
            }
        }

        let mut results = Vec::new();
        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);

        tokio::pin!(fetcher_timeout);

        loop {
            tokio::select! {
                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
                    let peer_hostname = &context.committee.authority(peer_index).hostname;
                    match response {
                        Ok(fetched_blocks) => {
                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
                            results.push((blocks_guard, fetched_blocks, peer_index));

                            // No more pending requests; end the task.
                            if request_futures.is_empty() {
                                break;
                            }
                        },
                        Err(_) => {
                            // Try the next available peer.
                            if let Some(next_peer) = peers.next() {
                                // Best effort to re-lock the blocks for the next peer.
                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
                                    info!(
                                        "Retrying syncing {} missing blocks from peer {}: {}",
                                        blocks_guard.block_refs.len(),
                                        peer_hostname,
                                        blocks_guard.block_refs
                                            .iter()
                                            .map(|b| b.to_string())
                                            .collect::<Vec<_>>()
                                            .join(", ")
                                    );
                                    request_futures.push(Self::fetch_blocks_request(
                                        network_client.clone(),
                                        next_peer,
                                        blocks_guard,
                                        highest_rounds,
                                        FETCH_REQUEST_TIMEOUT,
                                        1,
                                    ));
                                } else {
                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
                                }
                            } else {
                                debug!("No more peers left to fetch blocks");
                            }
                        }
                    }
                },
                _ = &mut fetcher_timeout => {
                    debug!("Timed out while fetching missing blocks");
                    break;
                }
            }
        }

        results
    }
}

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, Parameters};
    use parking_lot::RwLock;
    use tokio::{sync::Mutex, time::sleep};

    use crate::{
        BlockAPI, CommitDigest, CommitIndex,
        authority_service::COMMIT_LAG_MULTIPLIER,
        block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::{CommitRange, CommitVote, TrustedCommit},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
        dag_state::DagState,
        error::{ConsensusError, ConsensusResult},
        network::{BlockStream, NetworkClient},
        storage::mem_store::MemStore,
        synchronizer::{
            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap,
            MAX_BLOCKS_PER_FETCH, SYNC_MISSING_BLOCK_ROUND_THRESHOLD, Synchronizer,
        },
    };

    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);

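    /// A mock `NetworkClient` that serves pre-stubbed responses for
    /// `fetch_blocks` and `fetch_latest_blocks` requests, with an optional
    /// artificial latency per response.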
    #[derive(Default)]
    struct MockNetworkClient {
        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
        fetch_latest_blocks_requests:
            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
    }

    impl MockNetworkClient {
        async fn stub_fetch_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
            peer: AuthorityIndex,
            latency: Option<Duration>,
        ) {
            let mut lock = self.fetch_blocks_requests.lock().await;
            let block_refs = blocks
                .iter()
                .map(|block| block.reference())
                .collect::<Vec<_>>();
            lock.insert((block_refs, peer), (blocks, latency));
        }

        async fn stub_fetch_latest_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
            peer: AuthorityIndex,
            authorities: Vec<AuthorityIndex>,
            latency: Option<Duration>,
        ) {
            let mut lock = self.fetch_latest_blocks_requests.lock().await;
            lock.entry((peer, authorities))
                .or_default()
                .push((blocks, latency));
        }

        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
            let lock = self.fetch_latest_blocks_requests.lock().await;
            lock.len()
        }
    }

    #[async_trait]
    impl NetworkClient for MockNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            peer: AuthorityIndex,
            block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            let mut lock = self.fetch_blocks_requests.lock().await;
            let response = lock
                .remove(&(block_refs, peer))
                .expect("Unexpected fetch blocks request made");

            let serialised = response
                .0
                .into_iter()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            drop(lock);

            if let Some(latency) = response.1 {
                sleep(latency).await;
            }

            Ok(serialised)
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            peer: AuthorityIndex,
            authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            let mut lock = self.fetch_latest_blocks_requests.lock().await;
            let mut responses = lock
                .remove(&(peer, authorities.clone()))
                .expect("Unexpected fetch blocks request made");

            let response = responses.remove(0);
            let serialised = response
                .0
                .into_iter()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            if !responses.is_empty() {
                lock.insert((peer, authorities), responses);
            }

            drop(lock);

            if let Some(latency) = response.1 {
                sleep(latency).await;
            }

            Ok(serialised)
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[test]
    fn inflight_blocks_map() {
        // GIVEN
        let map = InflightBlocksMap::new();
        let some_block_refs = [
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
        ];
        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();

        // Lock & unlock blocks.
        {
            let mut all_guards = Vec::new();

            // Acquire the block locks for authorities 1 & 2.
            for i in 1..=2 {
                let authority = AuthorityIndex::new_for_test(i);

                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
                let guard = guard.expect("Guard should be created");
                assert_eq!(guard.block_refs.len(), 4);

                all_guards.push(guard);

                // Trying to acquire the locks again for the same authority
                // should fail.
                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
                assert!(guard.is_none());
            }

            // Trying to acquire for authority 3 should fail as well, since the
            // maximum number of authorities per block has been reached.
            let authority_3 = AuthorityIndex::new_for_test(3);

            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
            assert!(guard.is_none());

            // Dropping one of the guards releases its locks, so authority 3
            // can now acquire them.
            drop(all_guards.remove(0));

            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
            let guard = guard.expect("Guard should be successfully acquired");

            assert_eq!(guard.block_refs, missing_block_refs);

            // Dropping all guards should leave the map empty.
            drop(guard);
            drop(all_guards);

            assert_eq!(map.num_of_locked_blocks(), 0);
        }

        // Swap locks from one peer to another.
        {
            // Acquire a guard for authority 1.
            let authority_1 = AuthorityIndex::new_for_test(1);
            let guard = map
                .lock_blocks(missing_block_refs.clone(), authority_1)
                .unwrap();

            // Swap the guard over to authority 2.
            let authority_2 = AuthorityIndex::new_for_test(2);
            let guard = map.swap_locks(guard, authority_2);

            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
        }
    }

    #[tokio::test]
    async fn successful_fetch_blocks_from_peer() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            false,
        );

        // Create some test blocks.
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // AND stub the fetch_blocks request from peer 1.
        let peer = AuthorityIndex::new_for_test(1);
        network_client
            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
            .await;

        // WHEN requesting the missing blocks from peer 1.
        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());

        // Wait a bit until the request has been processed.
        sleep(Duration::from_millis(1_000)).await;

        // THEN the blocks should have been fetched and passed to the core.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);
    }

    #[tokio::test]
    async fn saturate_fetch_blocks_from_peer() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            false,
        );

        // Create more blocks than the fetch concurrency allows.
        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
            .collect::<Vec<_>>();

        // Stub each block fetch with a high latency so the requests pile up
        // and saturate the peer's channel.
        let peer = AuthorityIndex::new_for_test(1);
        let mut iter = expected_blocks.iter().peekable();
        while let Some(block) = iter.next() {
            network_client
                .stub_fetch_blocks(
                    vec![block.clone()],
                    peer,
                    Some(Duration::from_millis(5_000)),
                )
                .await;

            let mut missing_blocks = BTreeSet::new();
            missing_blocks.insert(block.reference());

            // The last request should fail with a saturation error, as the
            // peer's fetch channel is full by then.
            if iter.peek().is_none() {
                match handle.fetch_blocks(missing_blocks, peer).await {
                    Err(ConsensusError::SynchronizerSaturated(index)) => {
                        assert_eq!(index, peer);
                    }
                    _ => panic!("A saturated synchronizer error was expected"),
                }
            } else {
                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
            }
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_fetch_blocks() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Create some test blocks.
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // AND stub the missing blocks on the core dispatcher.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authorities 1 & 2. Make peer 1 respond
        // slowly so peer 2 is tried as well.
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // WHEN starting the synchronizer and waiting for a couple of timeouts.
        let _handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            false,
        );

        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should have been fetched and passed to the core.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);

        // AND the stubbed missing blocks should have been consumed.
        assert!(
            core_dispatcher
                .get_missing_blocks()
                .await
                .unwrap()
                .is_empty()
        );
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds()
    {
        // GIVEN
        let (mut context, _) = Context::new_for_test(4);

        // Disable gc, so the scheduled synchronizer is not skipped entirely
        // while the commit is lagging.
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(0);

        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        // Create test blocks spanning rounds beyond the acceptable threshold.
        let expected_blocks = (0..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();

        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();
        core_dispatcher.stub_missing_blocks(missing_blocks).await;

        // Only blocks within the acceptable round threshold are expected to be
        // fetched.
        let mut expected_blocks = expected_blocks
            .into_iter()
            .filter(|block| block.round() <= SYNC_MISSING_BLOCK_ROUND_THRESHOLD)
            .collect::<Vec<_>>();

        // AND stub the requests for authorities 1 & 2.
        for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
            network_client
                .stub_fetch_blocks(
                    chunk.to_vec(),
                    AuthorityIndex::new_for_test(1),
                    Some(FETCH_REQUEST_TIMEOUT),
                )
                .await;

            network_client
                .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
                .await;
        }

        // AND create blocks with commit votes so the node appears to lag
        // behind the quorum commit index.
        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
        let commit_index: CommitIndex = round - 1;
        let blocks = (0..4)
            .map(|authority| {
                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
                let block = TestBlock::new(round, authority)
                    .set_commit_votes(commit_votes)
                    .build();

                VerifiedBlock::new_for_test(block)
            })
            .collect::<Vec<_>>();

        // Pass them through the commit vote monitor.
        for block in blocks {
            commit_vote_monitor.observe_block(&block);
        }

        // WHEN starting the synchronizer and waiting for a few timeouts.
        let _handle = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            false,
        );

        sleep(4 * FETCH_REQUEST_TIMEOUT).await;

        // THEN only the blocks within the acceptable threshold should have
        // been fetched.
        let mut added_blocks = core_dispatcher.get_add_blocks().await;

        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        // AND stub some missing blocks, all beyond the acceptable round threshold.
        let mut expected_blocks = (SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2
            ..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 3)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authorities 1 & 2. Make peer 1 respond
        // slowly so peer 2 is tried as well.
        for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
            network_client
                .stub_fetch_blocks(
                    chunk.to_vec(),
                    AuthorityIndex::new_for_test(1),
                    Some(FETCH_REQUEST_TIMEOUT),
                )
                .await;
            network_client
                .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
                .await;
        }

        // AND create blocks with commit votes so the node appears to lag
        // behind the quorum commit index.
        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
        let commit_index: CommitIndex = round - 1;
        let blocks = (0..4)
            .map(|authority| {
                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
                let block = TestBlock::new(round, authority)
                    .set_commit_votes(commit_votes)
                    .build();

                VerifiedBlock::new_for_test(block)
            })
            .collect::<Vec<_>>();

        // Pass them through the commit vote monitor.
        for block in blocks {
            commit_vote_monitor.observe_block(&block);
        }

        // WHEN starting the synchronizer and waiting for a few timeouts.
        let _handle = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier,
            dag_state.clone(),
            false,
        );

        sleep(4 * FETCH_REQUEST_TIMEOUT).await;

        // THEN no blocks should have been fetched while the node lags behind
        // the quorum commit index.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, vec![]);

        // AND advance the local commit index so the node is no longer
        // considered lagging.
        {
            let mut d = dag_state.write();
            for index in 1..=commit_index {
                let commit =
                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);

                d.add_commit(commit);
            }

            assert_eq!(
                d.last_commit_index(),
                commit_vote_monitor.quorum_commit_index()
            );
        }

        // Stub the missing blocks again, as they have been consumed.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should now be fetched.
        let mut added_blocks = core_dispatcher.get_add_blocks().await;

        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_fetch_own_last_block() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let our_index = AuthorityIndex::new_for_test(0);

        // Create some test blocks of rounds 8..=10 authored by us.
        let mut expected_blocks = (8..=10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();

        // Stub peer 1 with the round 10 block. The first response is slower
        // than the sync timeout, so the retry consumes the second stub.
        let block_1 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1.clone()],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;

        // Stub peer 2 with the round 9 block.
        let block_2 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2.clone()],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                None,
            )
            .await;

        // Stub peer 3 with the round 8 block.
        let block_3 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_3.clone()],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_3],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                None,
            )
            .await;

        // WHEN starting the synchronizer with `sync_last_known_own_block` enabled.
        let handle = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            true,
        );

        // Wait at least the sync timeout.
        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;

        // THEN the last known proposed round should point to the highest
        // fetched block round.
        assert_eq!(
            core_dispatcher.get_last_own_proposed_round().await,
            vec![10]
        );

        // AND all the stubbed responses should have been consumed.
        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);

        // AND the retry metric should have been bumped exactly once.
        assert_eq!(
            context
                .metrics
                .node_metrics
                .sync_last_known_own_block_retries
                .get(),
            1
        );

        // Shut down and surface any panics from the synchronizer tasks.
        if let Err(err) = handle.stop().await {
            if err.is_panic() {
                std::panic::resume_unwind(err.into_panic());
            }
        }
    }
}