consensus_core/synchronizer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesUnordered};
use iota_macros::fail_point_async;
use iota_metrics::{
    monitored_future,
    monitored_mpsc::{Receiver, Sender, channel},
    monitored_scope,
};
use itertools::Itertools as _;
use parking_lot::{Mutex, RwLock};
#[cfg(not(test))]
use rand::{prelude::SliceRandom, rngs::ThreadRng};
use tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{mpsc::error::TrySendError, oneshot},
    task::{JoinError, JoinSet},
    time::{Instant, sleep, sleep_until, timeout},
};
use tracing::{debug, error, info, trace, warn};

use crate::{
    BlockAPI, CommitIndex, Round,
    authority_service::COMMIT_LAG_MULTIPLIER,
    block::{BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
};
/// The number of concurrent fetch block requests per authority.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Timeouts when fetching blocks.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Max number of blocks to fetch per request.
/// This value should be chosen so that, even with blocks at max size, requests
/// can finish on hosts with a good network within the timeouts above.
const MAX_BLOCKS_PER_FETCH: usize = 32;

const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// The number of rounds above the highest accepted round for which the periodic
/// synchronizer is still willing to fetch missing blocks. Any missing blocks of
/// higher rounds are considered too far in the future to fetch. This property
/// is taken into account only when the node is detected to have fallen behind
/// on its commits compared to the rest of the network; otherwise the scheduler
/// will attempt to fetch any missing block.
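///
/// For example (illustrative numbers, not taken from a real run): with a highest
/// accepted round of 1_000, a commit-lagging node only attempts to fetch missing
/// blocks up to round 1_050 via the periodic synchronizer.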
const SYNC_MISSING_BLOCK_ROUND_THRESHOLD: u32 = 50;

struct BlocksGuard {
    map: Arc<InflightBlocksMap>,
    block_refs: BTreeSet<BlockRef>,
    peer: AuthorityIndex,
}

impl Drop for BlocksGuard {
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}

// Keeps a mapping between the missing blocks that have been instructed to be
// fetched and the authorities that are currently fetching them. For each block
// ref there is a maximum number of authorities that can fetch it concurrently.
// The authority ids currently fetching a block are stored in the corresponding
// `BTreeSet` and effectively act as "locks".
struct InflightBlocksMap {
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}

impl InflightBlocksMap {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(HashMap::new()),
        })
    }

    /// Locks the blocks to be fetched by the assigned `peer`. To avoid
    /// re-fetching missing blocks from too many authorities at the same time,
    /// the concurrency per block is limited by attempting to lock each block.
    /// If a block is already being fetched by the maximum allowed number of
    /// authorities, its block ref will not be included in the returned set.
    /// The method returns a guard over all the block refs that have been
    /// successfully locked and are allowed to be fetched.
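    ///
    /// Illustrative sketch (not compiled as a doctest, since the types are
    /// crate-private; `refs`, `peer_a`, `peer_b` and `peer_c` are assumed to exist):
    /// ```ignore
    /// let map = InflightBlocksMap::new();
    /// // With MAX_AUTHORITIES_TO_FETCH_PER_BLOCK == 2, two peers can lock the refs...
    /// let _guard_a = map.lock_blocks(refs.clone(), peer_a).unwrap();
    /// let _guard_b = map.lock_blocks(refs.clone(), peer_b).unwrap();
    /// // ...and a third peer is refused until one of the guards is dropped.
    /// assert!(map.lock_blocks(refs.clone(), peer_c).is_none());
    /// ```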
    fn lock_blocks(
        self: &Arc<Self>,
        missing_block_refs: BTreeSet<BlockRef>,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let mut blocks = BTreeSet::new();
        let mut inner = self.inner.lock();

        for block_ref in missing_block_refs {
            // Check that the number of authorities already instructed to fetch the
            // block is not higher than allowed, and that `peer` has not already been
            // instructed to do so.
            let authorities = inner.entry(block_ref).or_default();
            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
                && authorities.get(&peer).is_none()
            {
                assert!(authorities.insert(peer));
                blocks.insert(block_ref);
            }
        }

        if blocks.is_empty() {
            None
        } else {
            Some(BlocksGuard {
                map: self.clone(),
                block_refs: blocks,
                peer,
            })
        }
    }

    /// Unlocks the provided block references for the given `peer`. The
    /// unlocking is strict, meaning that if this method is called for a
    /// specific block ref and peer more times than the corresponding lock
    /// has been taken, it will panic.
    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
        // Mark all the blocks as no longer being fetched by `peer`.
        let mut blocks_to_fetch = self.inner.lock();
        for block_ref in block_refs {
            let authorities = blocks_to_fetch
                .get_mut(block_ref)
                .expect("Should have found a non empty map");

            assert!(authorities.remove(&peer), "Peer index should be present!");

            // If this was the last one, clean up the entry.
            if authorities.is_empty() {
                blocks_to_fetch.remove(block_ref);
            }
        }
    }

    /// Drops the provided `blocks_guard`, which unlocks its blocks, and then
    /// attempts to lock the referenced block refs again for the new `peer`.
    /// The swap is best effort and there is no guarantee that the `peer` will
    /// be able to acquire the new locks.
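    ///
    /// Illustrative sketch (not compiled as a doctest; `map`, `refs`, `peer_a`
    /// and `peer_b` are assumed to exist):
    /// ```ignore
    /// let guard = map.lock_blocks(refs, peer_a).unwrap();
    /// // The request to `peer_a` failed; retry the same refs against `peer_b`.
    /// if let Some(guard) = map.swap_locks(guard, peer_b) {
    ///     // The refs are now locked for `peer_b` instead.
    /// }
    /// ```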
    fn swap_locks(
        self: &Arc<Self>,
        blocks_guard: BlocksGuard,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let block_refs = blocks_guard.block_refs.clone();

        // Explicitly drop the guard
        drop(blocks_guard);

        // Now create the new guard
        self.lock_blocks(block_refs, peer)
    }

    #[cfg(test)]
    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
        let inner = self.inner.lock();
        inner.len()
    }
}

enum Command {
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    FetchOwnLastBlock,
    KickOffScheduler,
}

pub(crate) struct SynchronizerHandle {
    commands_sender: Sender<Command>,
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}

impl SynchronizerHandle {
    /// Explicitly asks the synchronizer to fetch the blocks in the provided
    /// `missing_block_refs` set from the peer authority.
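    ///
    /// Illustrative sketch (not compiled as a doctest; assumes a running
    /// synchronizer `handle`, a `missing_block_refs` set and a `peer_index`):
    /// ```ignore
    /// if let Err(err) = handle.fetch_blocks(missing_block_refs, peer_index).await {
    ///     debug!("Could not schedule block fetch from {peer_index}: {err}");
    /// }
    /// ```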
    pub(crate) async fn fetch_blocks(
        &self,
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<()> {
        let (sender, receiver) = oneshot::channel();
        self.commands_sender
            .send(Command::FetchBlocks {
                missing_block_refs,
                peer_index,
                result: sender,
            })
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;
        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
    }

    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
        let mut tasks = self.tasks.lock().await;
        tasks.abort_all();
        while let Some(result) = tasks.join_next().await {
            result?
        }
        Ok(())
    }
}

/// `Synchronizer` oversees live block synchronization, crucial for node
/// progress. Live synchronization refers to the process of retrieving missing
/// blocks, particularly those essential for advancing a node when data from
/// only a few rounds is absent. If a node significantly lags behind the
/// network, `commit_syncer` handles fetching missing blocks via a more
/// efficient approach. `Synchronizer` aims for swift catch-up by employing two
/// mechanisms:
///
/// 1. Explicitly requesting missing blocks from designated authorities via the
///    "block send" path. This includes attempting to fetch any missing
///    ancestors necessary for processing a received block. Such requests
///    prioritize the block author, maximizing the chance of prompt retrieval. A
///    locking mechanism allows concurrent requests for missing blocks from up
///    to two authorities simultaneously, enhancing the chances of timely
///    retrieval. Notably, if additional missing blocks arise during block
///    processing, requests to the same authority are deferred to the scheduler.
///
/// 2. Periodically requesting missing blocks via a scheduler. This primarily
///    serves to retrieve missing blocks that were not ancestors of a received
///    block via the "block send" path. The scheduler operates on either a fixed
///    periodic basis or is triggered immediately after explicit fetches
///    described in (1), ensuring continued block retrieval if gaps persist.
///
/// In addition to the above, the synchronizer can fetch the node's own last
/// proposed block from the network peers as a best-effort approach to recover
/// the node from amnesia and avoid equivocation.
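///
/// Illustrative start-up sketch (not compiled as a doctest; construction of the
/// dependencies is elided and the variable names are assumptions):
/// ```ignore
/// let handle = Synchronizer::start(
///     network_client,
///     context,
///     core_dispatcher,
///     commit_vote_monitor,
///     block_verifier,
///     dag_state,
///     /* sync_last_known_own_block */ true,
/// );
/// // ... later, on shutdown:
/// handle.stop().await.ok();
/// ```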
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    commands_receiver: Receiver<Command>,
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    fetch_blocks_scheduler_task: JoinSet<()>,
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    inflight_blocks_map: Arc<InflightBlocksMap>,
    commands_sender: Sender<Command>,
}

impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
    /// Starts the synchronizer, which is responsible for fetching blocks from
    /// other authorities and managing block synchronization tasks.
    pub fn start(
        network_client: Arc<C>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        block_verifier: Arc<V>,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Arc<SynchronizerHandle> {
        let (commands_sender, commands_receiver) =
            channel("consensus_synchronizer_commands", 1_000);
        let inflight_blocks_map = InflightBlocksMap::new();

        // Spawn the tasks to fetch the blocks from the others
        let mut fetch_block_senders = BTreeMap::new();
        let mut tasks = JoinSet::new();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            let (sender, receiver) =
                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
                index,
                network_client.clone(),
                block_verifier.clone(),
                commit_vote_monitor.clone(),
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                receiver,
                commands_sender.clone(),
            );
            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
            fetch_block_senders.insert(index, sender);
        }

        let commands_sender_clone = commands_sender.clone();

        if sync_last_known_own_block {
            commands_sender
                .try_send(Command::FetchOwnLastBlock)
                .expect("Failed to sync our last block");
        }

        // Spawn the task to listen to the requests & periodic runs
        tasks.spawn(monitored_future!(async move {
            let mut s = Self {
                context,
                commands_receiver,
                fetch_block_senders,
                core_dispatcher,
                commit_vote_monitor,
                fetch_blocks_scheduler_task: JoinSet::new(),
                fetch_own_last_block_task: JoinSet::new(),
                network_client,
                block_verifier,
                inflight_blocks_map,
                commands_sender: commands_sender_clone,
                dag_state,
            };
            s.run().await;
        }));

        Arc::new(SynchronizerHandle {
            commands_sender,
            tasks: tokio::sync::Mutex::new(tasks),
        })
    }

    // The main loop to listen for the submitted commands.
    async fn run(&mut self) {
        // We want the synchronizer to run periodically every 500ms to fetch any missing
        // blocks.
        const SYNCHRONIZER_TIMEOUT: Duration = Duration::from_millis(500);
        let scheduler_timeout = sleep_until(Instant::now() + SYNCHRONIZER_TIMEOUT);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            // Keep only the max allowed blocks to request. It is ok to reduce here as the scheduler
                            // task will take care of syncing whatever is left over.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(MAX_BLOCKS_PER_FETCH)
                                .collect();

                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // We don't block if the corresponding peer task is saturated; we drop the request instead.
                            // That's ok as the periodic synchronization task will handle any still-missing blocks in its next run.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Just reset the scheduler timeout timer to run immediately if the scheduler is not already running.
                            // If it is already running, then just reduce the remaining time to run.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + SYNCHRONIZER_TIMEOUT.checked_div(2).unwrap()
                            };

                            // Only reset if it is earlier than the next deadline.
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // Start a new task only if the previous one has already finished.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + SYNCHRONIZER_TIMEOUT);
                }
            }
        }
    }

    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 5;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    // Get the highest accepted rounds.
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                            }
                        },
                        Err(_) => {
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Not strictly necessary, but drop the guard explicitly here to unlock the blocks.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }

    /// Processes the raw fetched blocks received from peer `peer_index`. If no
    /// error is returned, the verified blocks are immediately sent to Core for
    /// processing.
    async fn process_fetched_blocks(
        serialized_blocks: Vec<Bytes>,
        peer_index: AuthorityIndex,
        requested_blocks_guard: BlocksGuard,
        core_dispatcher: Arc<D>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        commands_sender: Sender<Command>,
        sync_method: &str,
    ) -> ConsensusResult<()> {
        // The maximum number of blocks that can be returned in addition to the ones
        // requested - those are potentially missing ancestors.
        const MAX_ADDITIONAL_BLOCKS: usize = 10;

        // Ensure that the number of returned blocks does not exceed the total max
        // allowed.
        if serialized_blocks.len() > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
        {
            return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
        }

        // Verify all the fetched blocks
        let blocks = Handle::current()
            .spawn_blocking({
                let block_verifier = block_verifier.clone();
                let context = context.clone();
                move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // Get all the ancestors of the requested blocks only
        let ancestors = blocks
            .iter()
            .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
            .flat_map(|b| b.ancestors().to_vec())
            .collect::<BTreeSet<BlockRef>>();

        // Now confirm that the blocks are either among the ones requested, or they
        // are ancestors of the requested blocks.
        for block in &blocks {
            if !requested_blocks_guard
                .block_refs
                .contains(&block.reference())
                && !ancestors.contains(&block.reference())
            {
                return Err(ConsensusError::UnexpectedFetchedBlock {
                    index: peer_index,
                    block_ref: block.reference(),
                });
            }
        }

        // Record commit votes from the verified blocks.
        for block in &blocks {
            commit_vote_monitor.observe_block(block);
        }

        let metrics = &context.metrics.node_metrics;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        metrics
            .synchronizer_fetched_blocks_by_peer
            .with_label_values(&[peer_hostname.as_str(), sync_method])
            .inc_by(blocks.len() as u64);
        for block in &blocks {
            let block_hostname = &context.committee.authority(block.author()).hostname;
            metrics
                .synchronizer_fetched_blocks_by_authority
                .with_label_values(&[block_hostname.as_str(), sync_method])
                .inc();
        }

        debug!(
            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
            blocks.len(),
            blocks.iter().map(|b| b.reference().to_string()).join(", "),
        );

        // Now send them to Core for processing. Ignore the returned missing blocks, as
        // we don't want this mechanism to keep feedback-looping into fetching more
        // blocks. The periodic synchronization will take care of that.
        let missing_blocks = core_dispatcher
            .add_blocks(blocks)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Now release all the locked blocks as they have been fetched, verified &
        // processed.
        drop(requested_blocks_guard);

        // Immediately kick off the scheduled synchronizer.
        if !missing_blocks.is_empty() {
            // Do not block here, to avoid any possible cycles.
            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
            {
                warn!("Commands channel is full")
            }
        }

        context
            .metrics
            .node_metrics
            .missing_blocks_after_fetch_total
            .inc_by(missing_blocks.len() as u64);

        Ok(())
    }

    fn get_highest_accepted_rounds(
        dag_state: Arc<RwLock<DagState>>,
        context: &Arc<Context>,
    ) -> Vec<Round> {
        let blocks = dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        assert_eq!(blocks.len(), context.committee.size());

        blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>()
    }

    fn verify_blocks(
        serialized_blocks: Vec<Bytes>,
        block_verifier: Arc<V>,
        context: &Context,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut verified_blocks = Vec::new();

        for serialized_block in serialized_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;

            // TODO: dedup block verifications, here and with fetched blocks.
            if let Err(e) = block_verifier.verify(&signed_block) {
                // TODO: we might want to use a different metric to distinguish the invalid
                // "served" blocks from the invalid "proposed" ones.
                let hostname = context.committee.authority(peer_index).hostname.clone();

                context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
                    .inc();
                warn!("Invalid block received from {}: {}", peer_index, e);
                return Err(e);
            }
            let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

            // Dropping the block is ok because it will be re-fetched if still needed.
            // TODO: improve efficiency, maybe suspend and continue processing the block
            // asynchronously.
            let now = context.clock.timestamp_utc_ms();
            if now < verified_block.timestamp_ms() {
                warn!(
                    "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
                    verified_block.reference(),
                    verified_block.timestamp_ms(),
                    now
                );
                continue;
            }

            verified_blocks.push(verified_block);
        }

        Ok(verified_blocks)
    }

    async fn fetch_blocks_request(
        network_client: Arc<C>,
        peer: AuthorityIndex,
        blocks_guard: BlocksGuard,
        highest_rounds: Vec<Round>,
        request_timeout: Duration,
        mut retries: u32,
    ) -> (
        ConsensusResult<Vec<Bytes>>,
        BlocksGuard,
        u32,
        AuthorityIndex,
        Vec<Round>,
    ) {
        let start = Instant::now();
        let resp = timeout(
            request_timeout,
            network_client.fetch_blocks(
                peer,
                blocks_guard
                    .block_refs
                    .clone()
                    .into_iter()
                    .collect::<Vec<_>>(),
                highest_rounds.clone(),
                request_timeout,
            ),
        )
        .await;

        fail_point_async!("consensus-delay");

        let resp = match resp {
            Ok(Err(err)) => {
                // Add a delay before retrying, if needed. If the request has already timed
                // out, this is effectively a no-op.
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(err)
            } // network error
            Err(err) => {
                // timeout
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
            }
            Ok(result) => result,
        };
        (resp, blocks_guard, retries, peer, highest_rounds)
    }

    fn start_fetch_own_last_block_task(&mut self) {
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err| {
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Get the highest round from all the results. Retry until responses covering a quorum of stake have been gathered.
                let mut highest_round = GENESIS_ROUND;
                // Keep track of the received responses to avoid fetching our own block from the same peer again.
                let mut received_response = vec![false; context.committee.size()];
                // Assume that our node is not Byzantine.
                received_response[context.own_index] = true;
                let mut total_stake = context.committee.stake(context.own_index);
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Ask all the other peers about our last block
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        // Skip our own index and the ones that have already responded
                        if !received_response[authority_index] {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results, but also wait for the timeout
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                received_response[authority_index] = true;
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Require at least a quorum of 2f+1 stake to have replied back.
                    if context.committee.reached_quorum(total_stake) {
                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    } else {
                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                // Update the Core with the highest detected round
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }

    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        // No reason to kick off the scheduler if there are no missing blocks to fetch
        if missing_blocks.is_empty() {
            return Ok(());
        }

        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        if commit_lagging {
            // If gc is enabled and we are commit-lagging, then we don't want to enable the
            // scheduler. Since the new logic of processing certified commits is in place,
            // we are guaranteed that commits will happen for all certified commits.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            // As the node is commit-lagging, try to sync only the missing blocks that are
            // within the acceptable round threshold. The rest are not attempted yet.
            let highest_accepted_round = dag_state.read().highest_accepted_round();
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|b| {
                    b.round <= highest_accepted_round + SYNC_MISSING_BLOCK_ROUND_THRESHOLD
                })
                .collect::<BTreeSet<_>>();

            // If no missing blocks are within the acceptable threshold while we are
            // commit-lagging, then disable the scheduler completely for this run.
            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                // Fetch blocks from peers
                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                // Now process the returned results
                let mut total_fetched = 0;
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }

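    /// Returns whether the node's local commit index is lagging behind the
    /// quorum commit index, along with both indices.
    ///
    /// Worked example with illustrative numbers (assumptions, not the
    /// configured defaults): with `last_commit_index = 100`,
    /// `commit_sync_batch_size = 100` and `COMMIT_LAG_MULTIPLIER = 32`, the
    /// node is considered lagging only once the quorum commit index exceeds
    /// `100 + 100 * 32 = 3_300`.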
    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        let commit_threshold = last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
        (
            commit_threshold < quorum_commit_index,
            last_commit_index,
            quorum_commit_index,
        )
    }

    /// Fetches the `missing_blocks` from available peers. The method attempts
    /// to split the load among multiple (random) peers.
    /// It returns a vector with the fetched blocks from each peer that
    /// successfully responded, plus any corresponding additional ancestor
    /// blocks. Each element of the vector is a tuple containing the guard over
    /// the requested missing block refs, the returned serialized blocks, and
    /// the peer authority index.
    async fn fetch_blocks_from_authorities(
        context: Arc<Context>,
        inflight_blocks: Arc<InflightBlocksMap>,
        network_client: Arc<C>,
        missing_blocks: BTreeSet<BlockRef>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
        const MAX_PEERS: usize = 3;

        // Attempt to fetch only up to a maximum of `MAX_PEERS * MAX_BLOCKS_PER_FETCH` blocks.
        let missing_blocks = missing_blocks
            .into_iter()
            .take(MAX_PEERS * MAX_BLOCKS_PER_FETCH)
            .collect::<Vec<_>>();

        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
        for block in &missing_blocks {
            missing_blocks_per_authority[block.author] += 1;
        }
        for (missing, (_, authority)) in missing_blocks_per_authority
            .into_iter()
            .zip(context.committee.authorities())
        {
            context
                .metrics
                .node_metrics
                .synchronizer_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .inc_by(missing as u64);
            context
                .metrics
                .node_metrics
                .synchronizer_current_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .set(missing as i64);
        }

        #[cfg_attr(test, expect(unused_mut))]
        let mut peers = context
            .committee
            .authorities()
            .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
            .collect::<Vec<_>>();

        // In tests, the order is not randomized
        #[cfg(not(test))]
        peers.shuffle(&mut ThreadRng::default());

        let mut peers = peers.into_iter();
        let mut request_futures = FuturesUnordered::new();

        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);

        // Send the initial requests
        for blocks in missing_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
            let peer = peers
                .next()
                .expect("Possible misconfiguration as a peer should be found");
            let peer_hostname = &context.committee.authority(peer).hostname;
            let block_refs = blocks.iter().cloned().collect::<BTreeSet<_>>();

            // Lock the blocks to be fetched. If no lock can be acquired for any of the
            // blocks, then don't bother.
            if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
                info!(
                    "Periodic sync of {} missing blocks from peer {} {}: {}",
                    block_refs.len(),
                    peer,
                    peer_hostname,
                    block_refs
                        .iter()
                        .map(|b| b.to_string())
                        .collect::<Vec<_>>()
                        .join(", ")
                );
                request_futures.push(Self::fetch_blocks_request(
                    network_client.clone(),
                    peer,
                    blocks_guard,
                    highest_rounds.clone(),
                    FETCH_REQUEST_TIMEOUT,
                    1,
                ));
            }
        }

        let mut results = Vec::new();
        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);

        tokio::pin!(fetcher_timeout);

        loop {
            tokio::select! {
                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
                    let peer_hostname = &context.committee.authority(peer_index).hostname;
                    match response {
                        Ok(fetched_blocks) => {
                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
                            results.push((blocks_guard, fetched_blocks, peer_index));

                            // No more pending requests are left, just break the loop.
                            if request_futures.is_empty() {
                                break;
                            }
                        },
                        Err(_) => {
                            // Try again if there is any peer left.
                            if let Some(next_peer) = peers.next() {
                                // Make a best effort to lock the guards. If we can't lock, then don't bother in this run.
                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
                                    info!(
                                        "Retrying syncing {} missing blocks from peer {}: {}",
                                        blocks_guard.block_refs.len(),
                                        peer_hostname,
                                        blocks_guard.block_refs
                                            .iter()
                                            .map(|b| b.to_string())
                                            .collect::<Vec<_>>()
                                            .join(", ")
                                    );
                                    request_futures.push(Self::fetch_blocks_request(
                                        network_client.clone(),
                                        next_peer,
                                        blocks_guard,
                                        highest_rounds,
                                        FETCH_REQUEST_TIMEOUT,
                                        1,
                                    ));
                                } else {
                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
                                }
                            } else {
                                debug!("No more peers left to fetch blocks");
                            }
                        }
                    }
                },
                _ = &mut fetcher_timeout => {
                    debug!("Timed out while fetching missing blocks");
                    break;
                }
            }
        }

        results
    }
}
1164
1165#[cfg(test)]
1166mod tests {
1167    use std::{
1168        collections::{BTreeMap, BTreeSet},
1169        sync::Arc,
1170        time::Duration,
1171    };
1172
1173    use async_trait::async_trait;
1174    use bytes::Bytes;
1175    use consensus_config::{AuthorityIndex, Parameters};
1176    use parking_lot::RwLock;
1177    use tokio::{sync::Mutex, time::sleep};
1178
1179    use crate::{
1180        BlockAPI, CommitDigest, CommitIndex,
1181        authority_service::COMMIT_LAG_MULTIPLIER,
1182        block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
1183        block_verifier::NoopBlockVerifier,
1184        commit::{CommitRange, CommitVote, TrustedCommit},
1185        commit_vote_monitor::CommitVoteMonitor,
1186        context::Context,
1187        core_thread::{CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1188        dag_state::DagState,
1189        error::{ConsensusError, ConsensusResult},
1190        network::{BlockStream, NetworkClient},
1191        storage::mem_store::MemStore,
1192        synchronizer::{
1193            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap,
1194            MAX_BLOCKS_PER_FETCH, SYNC_MISSING_BLOCK_ROUND_THRESHOLD, Synchronizer,
1195        },
1196    };
1197
1198    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1199    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1200    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1201    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1202
1203    #[derive(Default)]
1204    struct MockNetworkClient {
1205        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1206        fetch_latest_blocks_requests:
1207            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1208    }
1209
1210    impl MockNetworkClient {
1211        async fn stub_fetch_blocks(
1212            &self,
1213            blocks: Vec<VerifiedBlock>,
1214            peer: AuthorityIndex,
1215            latency: Option<Duration>,
1216        ) {
1217            let mut lock = self.fetch_blocks_requests.lock().await;
1218            let block_refs = blocks
1219                .iter()
1220                .map(|block| block.reference())
1221                .collect::<Vec<_>>();
1222            lock.insert((block_refs, peer), (blocks, latency));
1223        }
1224
1225        async fn stub_fetch_latest_blocks(
1226            &self,
1227            blocks: Vec<VerifiedBlock>,
1228            peer: AuthorityIndex,
1229            authorities: Vec<AuthorityIndex>,
1230            latency: Option<Duration>,
1231        ) {
1232            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1233            lock.entry((peer, authorities))
1234                .or_default()
1235                .push((blocks, latency));
1236        }
1237
1238        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1239            let lock = self.fetch_latest_blocks_requests.lock().await;
1240            lock.len()
1241        }
1242    }
1243
1244    #[async_trait]
1245    impl NetworkClient for MockNetworkClient {
1246        const SUPPORT_STREAMING: bool = false;
1247
1248        async fn send_block(
1249            &self,
1250            _peer: AuthorityIndex,
1251            _serialized_block: &VerifiedBlock,
1252            _timeout: Duration,
1253        ) -> ConsensusResult<()> {
1254            unimplemented!("Unimplemented")
1255        }
1256
1257        async fn subscribe_blocks(
1258            &self,
1259            _peer: AuthorityIndex,
1260            _last_received: Round,
1261            _timeout: Duration,
1262        ) -> ConsensusResult<BlockStream> {
1263            unimplemented!("Unimplemented")
1264        }
1265
1266        async fn fetch_blocks(
1267            &self,
1268            peer: AuthorityIndex,
1269            block_refs: Vec<BlockRef>,
1270            _highest_accepted_rounds: Vec<Round>,
1271            _timeout: Duration,
1272        ) -> ConsensusResult<Vec<Bytes>> {
1273            let mut lock = self.fetch_blocks_requests.lock().await;
1274            let response = lock
1275                .remove(&(block_refs, peer))
1276                .expect("Unexpected fetch blocks request made");
1277
1278            let serialised = response
1279                .0
1280                .into_iter()
1281                .map(|block| block.serialized().clone())
1282                .collect::<Vec<_>>();
1283
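            // Release the lock before simulating latency, so a slow response
            // does not serialize other stubbed requests behind it.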
1284            drop(lock);
1285
1286            if let Some(latency) = response.1 {
1287                sleep(latency).await;
1288            }
1289
1290            Ok(serialised)
1291        }
1292
1293        async fn fetch_commits(
1294            &self,
1295            _peer: AuthorityIndex,
1296            _commit_range: CommitRange,
1297            _timeout: Duration,
1298        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1299            unimplemented!("Unimplemented")
1300        }
1301
1302        async fn fetch_latest_blocks(
1303            &self,
1304            peer: AuthorityIndex,
1305            authorities: Vec<AuthorityIndex>,
1306            _timeout: Duration,
1307        ) -> ConsensusResult<Vec<Bytes>> {
1308            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1309            let mut responses = lock
1310                .remove(&(peer, authorities.clone()))
1311                .expect("Unexpected fetch latest blocks request made");
1312
1313            let response = responses.remove(0);
1314            let serialised = response
1315                .0
1316                .into_iter()
1317                .map(|block| block.serialized().clone())
1318                .collect::<Vec<_>>();
1319
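            // Put back any remaining stubbed responses so that retries for the
            // same (peer, authorities) key can still be served.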
1320            if !responses.is_empty() {
1321                lock.insert((peer, authorities), responses);
1322            }
1323
1324            drop(lock);
1325
1326            if let Some(latency) = response.1 {
1327                sleep(latency).await;
1328            }
1329
1330            Ok(serialised)
1331        }
1332
1333        async fn get_latest_rounds(
1334            &self,
1335            _peer: AuthorityIndex,
1336            _timeout: Duration,
1337        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1338            unimplemented!("Unimplemented")
1339        }
1340    }
1341
1342    #[test]
1343    fn inflight_blocks_map() {
1344        // GIVEN
1345        let map = InflightBlocksMap::new();
1346        let some_block_refs = [
1347            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1348            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1349            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1350            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1351        ];
1352        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1353
1354        // Lock & unlock blocks
1355        {
1356            let mut all_guards = Vec::new();
1357
1358            // Try to acquire the block locks for authorities 1 & 2
1359            for i in 1..=2 {
1360                let authority = AuthorityIndex::new_for_test(i);
1361
1362                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1363                let guard = guard.expect("Guard should be created");
1364                assert_eq!(guard.block_refs.len(), 4);
1365
1366                all_guards.push(guard);
1367
1368                // trying to acquire any of them again will not succeed
1369                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1370                assert!(guard.is_none());
1371            }
1372
1373            // Trying to acquire the locks for authority 3 will fail, as we have maxed
1374            // out the number of allowed peers per block (MAX_AUTHORITIES_TO_FETCH_PER_BLOCK)
1375            let authority_3 = AuthorityIndex::new_for_test(3);
1376
1377            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1378            assert!(guard.is_none());
1379
1380            // Explicitly drop the guard of authority 1 and try for authority 3 again - it
1381            // will now succeed
1382            drop(all_guards.remove(0));
1383
1384            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1385            let guard = guard.expect("Guard should be successfully acquired");
1386
1387            assert_eq!(guard.block_refs, missing_block_refs);
1388
1389            // Dropping all the guards should unlock all the block refs
1390            drop(guard);
1391            drop(all_guards);
1392
1393            assert_eq!(map.num_of_locked_blocks(), 0);
1394        }
1395
1396        // Swap locks
1397        {
1398            // acquire a lock for authority 1
1399            let authority_1 = AuthorityIndex::new_for_test(1);
1400            let guard = map
1401                .lock_blocks(missing_block_refs.clone(), authority_1)
1402                .unwrap();
1403
1404            // Now swap the locks for authority 2
1405            let authority_2 = AuthorityIndex::new_for_test(2);
1406            let guard = map.swap_locks(guard, authority_2);
1407
1408            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1409        }
1410    }
1411
1412    #[tokio::test]
1413    async fn successful_fetch_blocks_from_peer() {
1414        // GIVEN
1415        let (context, _) = Context::new_for_test(4);
1416        let context = Arc::new(context);
1417        let block_verifier = Arc::new(NoopBlockVerifier {});
1418        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1419        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1420        let network_client = Arc::new(MockNetworkClient::default());
1421        let store = Arc::new(MemStore::new());
1422        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1423
1424        let handle = Synchronizer::start(
1425            network_client.clone(),
1426            context,
1427            core_dispatcher.clone(),
1428            commit_vote_monitor,
1429            block_verifier,
1430            dag_state,
1431            false,
1432        );
1433
1434        // Create some test blocks
1435        let expected_blocks = (0..10)
1436            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1437            .collect::<Vec<_>>();
1438        let missing_blocks = expected_blocks
1439            .iter()
1440            .map(|block| block.reference())
1441            .collect::<BTreeSet<_>>();
1442
1443        // AND stub the fetch_blocks request from peer 1
1444        let peer = AuthorityIndex::new_for_test(1);
1445        network_client
1446            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1447            .await;
1448
1449        // WHEN requesting the missing blocks from peer 1
1450        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1451
1452        // Wait a little bit until those have been added to core
1453        sleep(Duration::from_millis(1_000)).await;
1454
1455        // THEN ensure those ended up in Core
1456        let added_blocks = core_dispatcher.get_add_blocks().await;
1457        assert_eq!(added_blocks, expected_blocks);
1458    }
1459
1460    #[tokio::test]
1461    async fn saturate_fetch_blocks_from_peer() {
1462        // GIVEN
1463        let (context, _) = Context::new_for_test(4);
1464        let context = Arc::new(context);
1465        let block_verifier = Arc::new(NoopBlockVerifier {});
1466        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1467        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1468        let network_client = Arc::new(MockNetworkClient::default());
1469        let store = Arc::new(MemStore::new());
1470        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1471
1472        let handle = Synchronizer::start(
1473            network_client.clone(),
1474            context,
1475            core_dispatcher.clone(),
1476            commit_vote_monitor,
1477            block_verifier,
1478            dag_state,
1479            false,
1480        );
1481
1482        // Create some test blocks
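        // 2 * FETCH_BLOCKS_CONCURRENCY + 1 requests, each stalled by the stubbed
        // latency below, are expected to be enough to saturate the peer's fetch
        // task so that the final request gets rejected.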
1483        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1484            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1485            .collect::<Vec<_>>();
1486
1487        // Now start sending requests to fetch blocks, trying to saturate the peer 1 task
1488        let peer = AuthorityIndex::new_for_test(1);
1489        let mut iter = expected_blocks.iter().peekable();
1490        while let Some(block) = iter.next() {
1491            // Stub the fetch_blocks request from peer 1 with a high response latency
1492            // so requests start blocking the peer task.
1493            network_client
1494                .stub_fetch_blocks(
1495                    vec![block.clone()],
1496                    peer,
1497                    Some(Duration::from_millis(5_000)),
1498                )
1499                .await;
1500
1501            let mut missing_blocks = BTreeSet::new();
1502            missing_blocks.insert(block.reference());
1503
1504            // WHEN requesting to fetch the blocks, the last request should not succeed
1505            // and should instead return a "synchronizer saturated" error
1506            if iter.peek().is_none() {
1507                match handle.fetch_blocks(missing_blocks, peer).await {
1508                    Err(ConsensusError::SynchronizerSaturated(index)) => {
1509                        assert_eq!(index, peer);
1510                    }
1511                    _ => panic!("A saturated synchronizer error was expected"),
1512                }
1513            } else {
1514                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1515            }
1516        }
1517    }
1518
1519    #[tokio::test(flavor = "current_thread", start_paused = true)]
1520    async fn synchronizer_periodic_task_fetch_blocks() {
1521        // GIVEN
1522        let (context, _) = Context::new_for_test(4);
1523        let context = Arc::new(context);
1524        let block_verifier = Arc::new(NoopBlockVerifier {});
1525        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1526        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1527        let network_client = Arc::new(MockNetworkClient::default());
1528        let store = Arc::new(MemStore::new());
1529        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1530
1531        // Create some test blocks
1532        let expected_blocks = (0..10)
1533            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1534            .collect::<Vec<_>>();
1535        let missing_blocks = expected_blocks
1536            .iter()
1537            .map(|block| block.reference())
1538            .collect::<BTreeSet<_>>();
1539
1540        // AND stub the missing blocks
1541        core_dispatcher
1542            .stub_missing_blocks(missing_blocks.clone())
1543            .await;
1544
1545        // AND stub the requests for authority 1 & 2
1546        // Make the first authority time out, so the second one will be called. "We" are
1547        // authority 0, so we are skipped anyway.
1548        network_client
1549            .stub_fetch_blocks(
1550                expected_blocks.clone(),
1551                AuthorityIndex::new_for_test(1),
1552                Some(FETCH_REQUEST_TIMEOUT),
1553            )
1554            .await;
1555        network_client
1556            .stub_fetch_blocks(
1557                expected_blocks.clone(),
1558                AuthorityIndex::new_for_test(2),
1559                None,
1560            )
1561            .await;
1562
1563        // WHEN starting the synchronizer and waiting for a couple of seconds
1564        let _handle = Synchronizer::start(
1565            network_client.clone(),
1566            context,
1567            core_dispatcher.clone(),
1568            commit_vote_monitor,
1569            block_verifier,
1570            dag_state,
1571            false,
1572        );
1573
1574        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1575
1576        // THEN the missing blocks should now be fetched and added to core
1577        let added_blocks = core_dispatcher.get_add_blocks().await;
1578        assert_eq!(added_blocks, expected_blocks);
1579
1580        // AND missing blocks should have been consumed by the stub
1581        assert!(
1582            core_dispatcher
1583                .get_missing_blocks()
1584                .await
1585                .unwrap()
1586                .is_empty()
1587        );
1588    }
1589
1590    #[tokio::test(flavor = "current_thread", start_paused = true)]
1591    async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds()
1592     {
1593        // GIVEN
1594        let (mut context, _) = Context::new_for_test(4);
1595
1596        // We want to run this test only when GC is disabled. Once GC gets enabled, this
1597        // logic won't execute anymore.
1598        context
1599            .protocol_config
1600            .set_consensus_gc_depth_for_testing(0);
1601
1602        let context = Arc::new(context);
1603        let block_verifier = Arc::new(NoopBlockVerifier {});
1604        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1605        let network_client = Arc::new(MockNetworkClient::default());
1606        let store = Arc::new(MemStore::new());
1607        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1608        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1609
1610        // AND stub some missing blocks. The highest accepted round is 0. Create some
1611        // blocks that are below and above the sync threshold.
1612        let expected_blocks = (0..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2)
1613            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1614            .collect::<Vec<_>>();
1615
1616        let missing_blocks = expected_blocks
1617            .iter()
1618            .map(|block| block.reference())
1619            .collect::<BTreeSet<_>>();
1620        core_dispatcher.stub_missing_blocks(missing_blocks).await;
1621
1622        // AND stub the requests for authority 1 & 2
1623        // Make the first authority time out, so the second one will be called. "We" are
1624        // authority 0, so we are skipped anyway.
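        // While commit-lagging, only missing blocks up to
        // SYNC_MISSING_BLOCK_ROUND_THRESHOLD rounds above the highest accepted
        // round (0 here) should be fetched, so only those are expected below.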
1625        let mut expected_blocks = expected_blocks
1626            .into_iter()
1627            .filter(|block| block.round() <= SYNC_MISSING_BLOCK_ROUND_THRESHOLD)
1628            .collect::<Vec<_>>();
1629
1630        for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
1631            network_client
1632                .stub_fetch_blocks(
1633                    chunk.to_vec(),
1634                    AuthorityIndex::new_for_test(1),
1635                    Some(FETCH_REQUEST_TIMEOUT),
1636                )
1637                .await;
1638
1639            network_client
1640                .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
1641                .await;
1642        }
1643
1644        // Now create some blocks to simulate a commit lag
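        // The quorum will vote on a commit index of roughly twice
        // commit_sync_batch_size * COMMIT_LAG_MULTIPLIER while the local commit
        // index is still 0, which should put this node well into commit lag.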
1645        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1646        let commit_index: CommitIndex = round - 1;
1647        let blocks = (0..4)
1648            .map(|authority| {
1649                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1650                let block = TestBlock::new(round, authority)
1651                    .set_commit_votes(commit_votes)
1652                    .build();
1653
1654                VerifiedBlock::new_for_test(block)
1655            })
1656            .collect::<Vec<_>>();
1657
1658        // Pass them through the commit vote monitor - now there will be a big enough
1659        // commit lag to prevent the scheduled synchronizer from running
1660        for block in blocks {
1661            commit_vote_monitor.observe_block(&block);
1662        }
1663
1664        // WHEN starting the synchronizer and waiting for a couple of seconds, during
1665        // which the periodic sync should normally have kicked in.
1666        let _handle = Synchronizer::start(
1667            network_client.clone(),
1668            context.clone(),
1669            core_dispatcher.clone(),
1670            commit_vote_monitor.clone(),
1671            block_verifier.clone(),
1672            dag_state.clone(),
1673            false,
1674        );
1675
1676        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1677
1678        // We should be in commit lag mode, but since there are missing blocks within
1679        // the acceptable round threshold, those should still be fetched - and nothing above it.
1680        let mut added_blocks = core_dispatcher.get_add_blocks().await;
1681
1682        added_blocks.sort_by_key(|block| block.reference());
1683        expected_blocks.sort_by_key(|block| block.reference());
1684
1685        assert_eq!(added_blocks, expected_blocks);
1686    }
1687
1688    #[tokio::test(flavor = "current_thread", start_paused = true)]
1689    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1690        // GIVEN
1691        let (context, _) = Context::new_for_test(4);
1692        let context = Arc::new(context);
1693        let block_verifier = Arc::new(NoopBlockVerifier {});
1694        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1695        let network_client = Arc::new(MockNetworkClient::default());
1696        let store = Arc::new(MemStore::new());
1697        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1698        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1699
1700        // AND stub some missing blocks. The highest accepted round is 0. Create blocks
1701        // that are above the sync threshold.
1702        let mut expected_blocks = (SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2
1703            ..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 3)
1704            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1705            .collect::<Vec<_>>();
1706        let missing_blocks = expected_blocks
1707            .iter()
1708            .map(|block| block.reference())
1709            .collect::<BTreeSet<_>>();
1710        core_dispatcher
1711            .stub_missing_blocks(missing_blocks.clone())
1712            .await;
1713
1714        // AND stub the requests for authority 1 & 2
1715        // Make the first authority time out, so the second one will be called. "We" are
1716        // authority 0, so we are skipped anyway.
1717        for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
1718            network_client
1719                .stub_fetch_blocks(
1720                    chunk.to_vec(),
1721                    AuthorityIndex::new_for_test(1),
1722                    Some(FETCH_REQUEST_TIMEOUT),
1723                )
1724                .await;
1725            network_client
1726                .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
1727                .await;
1728        }
1729
1730        // Now create some blocks to simulate a commit lag
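        // As in the test above, a quorum voting on a commit index of about twice
        // commit_sync_batch_size * COMMIT_LAG_MULTIPLIER while the local commit
        // index is 0 should trigger commit-lag mode.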
1731        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1732        let commit_index: CommitIndex = round - 1;
1733        let blocks = (0..4)
1734            .map(|authority| {
1735                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1736                let block = TestBlock::new(round, authority)
1737                    .set_commit_votes(commit_votes)
1738                    .build();
1739
1740                VerifiedBlock::new_for_test(block)
1741            })
1742            .collect::<Vec<_>>();
1743
1744        // Pass them through the commit vote monitor - now there will be a big enough
1745        // commit lag to prevent the scheduled synchronizer from running
1746        for block in blocks {
1747            commit_vote_monitor.observe_block(&block);
1748        }
1749
1750        // WHEN starting the synchronizer and waiting for a couple of seconds, during
1751        // which the periodic sync should normally have kicked in.
1752        let _handle = Synchronizer::start(
1753            network_client.clone(),
1754            context.clone(),
1755            core_dispatcher.clone(),
1756            commit_vote_monitor.clone(),
1757            block_verifier,
1758            dag_state.clone(),
1759            false,
1760        );
1761
1762        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1763
1764        // Since we should be in commit lag mode, none of the missing blocks should have
1765        // been fetched - hence nothing should be sent to core for processing.
1766        let added_blocks = core_dispatcher.get_add_blocks().await;
1767        assert_eq!(added_blocks, vec![]);
1768
1769        // AND now advance the local commit index by adding commits up to the quorum
1770        // commit index
1771        {
1772            let mut d = dag_state.write();
1773            for index in 1..=commit_index {
1774                let commit =
1775                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
1776
1777                d.add_commit(commit);
1778            }
1779
1780            assert_eq!(
1781                d.last_commit_index(),
1782                commit_vote_monitor.quorum_commit_index()
1783            );
1784        }
1785
1786        // Now stub again the missing blocks to fetch the exact same ones.
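        // With the commit lag now resolved, the round threshold no longer
        // applies, so even these far-ahead blocks should be fetched on the
        // next periodic sync cycle.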
1787        core_dispatcher
1788            .stub_missing_blocks(missing_blocks.clone())
1789            .await;
1790
1791        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1792
1793        // THEN the missing blocks should now be fetched and added to core
1794        let mut added_blocks = core_dispatcher.get_add_blocks().await;
1795
1796        added_blocks.sort_by_key(|block| block.reference());
1797        expected_blocks.sort_by_key(|block| block.reference());
1798
1799        assert_eq!(added_blocks, expected_blocks);
1800    }
1801
1802    #[tokio::test(flavor = "current_thread", start_paused = true)]
1803    async fn synchronizer_fetch_own_last_block() {
1804        // GIVEN
1805        let (context, _) = Context::new_for_test(4);
1806        let context = Arc::new(context.with_parameters(Parameters {
1807            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1808            ..Default::default()
1809        }));
1810        let block_verifier = Arc::new(NoopBlockVerifier {});
1811        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1812        let network_client = Arc::new(MockNetworkClient::default());
1813        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1814        let store = Arc::new(MemStore::new());
1815        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1816        let our_index = AuthorityIndex::new_for_test(0);
1817
1818        // Create some test blocks
1819        let mut expected_blocks = (8..=10)
1820            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1821            .collect::<Vec<_>>();
1822
1823        // Now set different latest blocks for the peers
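        // Each peer gets two stubs: the first reply is delayed well past the
        // 2s sync_last_known_own_block_timeout so the synchronizer retries,
        // and the second reply returns immediately. This is where the single
        // retry asserted at the end comes from.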
1824        // For peer 1 we give the block of round 10 (highest)
1825        let block_1 = expected_blocks.pop().unwrap();
1826        network_client
1827            .stub_fetch_latest_blocks(
1828                vec![block_1.clone()],
1829                AuthorityIndex::new_for_test(1),
1830                vec![our_index],
1831                Some(Duration::from_secs(10)),
1832            )
1833            .await;
1834        network_client
1835            .stub_fetch_latest_blocks(
1836                vec![block_1],
1837                AuthorityIndex::new_for_test(1),
1838                vec![our_index],
1839                None,
1840            )
1841            .await;
1842
1843        // For peer 2 we give the block of round 9
1844        let block_2 = expected_blocks.pop().unwrap();
1845        network_client
1846            .stub_fetch_latest_blocks(
1847                vec![block_2.clone()],
1848                AuthorityIndex::new_for_test(2),
1849                vec![our_index],
1850                Some(Duration::from_secs(10)),
1851            )
1852            .await;
1853        network_client
1854            .stub_fetch_latest_blocks(
1855                vec![block_2],
1856                AuthorityIndex::new_for_test(2),
1857                vec![our_index],
1858                None,
1859            )
1860            .await;
1861
1862        // For peer 3 we give the block with the lowest round (8)
1863        let block_3 = expected_blocks.pop().unwrap();
1864        network_client
1865            .stub_fetch_latest_blocks(
1866                vec![block_3.clone()],
1867                AuthorityIndex::new_for_test(3),
1868                vec![our_index],
1869                Some(Duration::from_secs(10)),
1870            )
1871            .await;
1872        network_client
1873            .stub_fetch_latest_blocks(
1874                vec![block_3],
1875                AuthorityIndex::new_for_test(3),
1876                vec![our_index],
1877                None,
1878            )
1879            .await;
1880
1881        // WHEN starting the synchronizer and waiting for a couple of seconds
1882        let handle = Synchronizer::start(
1883            network_client.clone(),
1884            context.clone(),
1885            core_dispatcher.clone(),
1886            commit_vote_monitor,
1887            block_verifier,
1888            dag_state,
1889            true,
1890        );
1891
1892        // Wait for at least the configured timeout duration
1893        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
1894
1895        // Assert that core has been called to set the min propose round
1896        assert_eq!(
1897            core_dispatcher.get_last_own_proposed_round().await,
1898            vec![10]
1899        );
1900
1901        // Ensure that all the stubbed requests have been consumed
1902        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
1903
1904        // And we got one retry
1905        assert_eq!(
1906            context
1907                .metrics
1908                .node_metrics
1909                .sync_last_known_own_block_retries
1910                .get(),
1911            1
1912        );
1913
1914        // Ensure that no panic occurred
1915        if let Err(err) = handle.stop().await {
1916            if err.is_panic() {
1917                std::panic::resume_unwind(err.into_panic());
1918            }
1919        }
1920    }
1921}