consensus_core/synchronizer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    num::NonZeroUsize,
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesUnordered};
use iota_macros::fail_point_async;
use iota_metrics::{
    monitored_future,
    monitored_mpsc::{Receiver, Sender, channel},
    monitored_scope,
};
use itertools::Itertools as _;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
#[cfg(not(test))]
use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
use tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{mpsc::error::TrySendError, oneshot},
    task::{JoinError, JoinSet},
    time::{Instant, sleep, sleep_until, timeout},
};
use tracing::{debug, error, info, trace, warn};

use crate::{
    BlockAPI, CommitIndex, Round,
    authority_service::COMMIT_LAG_MULTIPLIER,
    block::{BlockDigest, BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
};

/// The number of concurrent fetch blocks requests per authority.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// The maximum number of additional blocks (parents) that can be fetched.
// TODO: This is a temporary value, and should be removed once the protocol
// version is updated to support batching.
pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;

/// The maximum number of verified block references to cache for deduplication.
const VERIFIED_BLOCKS_CACHE_CAP: usize = 200_000;

/// The timeout for the synchronizer to fetch blocks from a given peer authority.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);

/// The timeout for the periodic synchronizer to fetch blocks from the peers.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// The maximum number of authorities from which we will try to periodically
/// fetch a given block at the same moment. The lock guard ensures that we do
/// not ask more than this number of authorities for the same block at once.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// The maximum number of authorities from which the live synchronizer will try
/// to fetch a given block at the same moment. This is lower than the periodic
/// sync limit to prioritize periodic sync.
const MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK: usize = 1;

/// The maximum number of peers from which the periodic synchronizer will
/// request blocks.
const MAX_PERIODIC_SYNC_PEERS: usize = 4;

/// The maximum number of peers in the periodic synchronizer which are chosen
/// fully at random to fetch blocks from. The other peers will be chosen based
/// on their knowledge of the DAG.
const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
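
// Taken together, the constants above mean that each periodic run contacts up
// to MAX_PERIODIC_SYNC_PEERS (4) peers in total: up to 2 peers chosen among
// those known to hold some missing block, plus up to
// MAX_PERIODIC_SYNC_RANDOM_PEERS (2) peers picked at random from the rest of
// the committee.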

/// Represents the different methods used for synchronization.
#[derive(Clone)]
enum SyncMethod {
    Live,
    Periodic,
}

struct BlocksGuard {
    map: Arc<InflightBlocksMap>,
    block_refs: BTreeSet<BlockRef>,
    peer: AuthorityIndex,
    method: SyncMethod,
}

impl Drop for BlocksGuard {
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}

// Keeps a mapping between the missing blocks that have been instructed to be
// fetched and the authorities that are currently fetching them. For a block ref
// there is a maximum number of authorities that can concurrently fetch it. The
// authority ids that are currently fetching a block are stored in the
// corresponding `BTreeSet` and effectively act as "locks".
struct InflightBlocksMap {
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}

impl InflightBlocksMap {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(HashMap::new()),
        })
    }

    /// Locks the blocks to be fetched for the assigned `peer`. We want to
    /// avoid re-fetching the missing blocks from too many authorities at the
    /// same time, so we limit the concurrency per block by attempting to lock
    /// each block. If a block is already being fetched by the maximum allowed
    /// number of authorities, then its block ref will not be included in the
    /// returned set. The method returns all the block refs that have been
    /// successfully locked and are allowed to be fetched.
    ///
    /// Different limits apply based on the sync method:
    /// - Periodic sync: can lock if total authorities <
    ///   MAX_AUTHORITIES_TO_FETCH_PER_BLOCK (2)
    /// - Live sync: can lock if total authorities <
    ///   MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK (1)
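    ///
    /// # Example (illustrative sketch)
    ///
    /// ```ignore
    /// // Hypothetical refs `a` and `b`; `peer_1` and `peer_2` are committee indices.
    /// let map = InflightBlocksMap::new();
    ///
    /// // First caller locks both blocks for peer_1 (live sync allows 1 authority per block).
    /// let guard_1 = map
    ///     .lock_blocks([a, b].into(), peer_1, SyncMethod::Live)
    ///     .expect("nothing is inflight yet");
    ///
    /// // A second live request for the same blocks from peer_2 cannot acquire any lock.
    /// assert!(map.lock_blocks([a, b].into(), peer_2, SyncMethod::Live).is_none());
    ///
    /// // Dropping the guard releases the locks, so peer_2 can now fetch them.
    /// drop(guard_1);
    /// let guard_2 = map.lock_blocks([a, b].into(), peer_2, SyncMethod::Live).unwrap();
    /// ```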
    fn lock_blocks(
        self: &Arc<Self>,
        missing_block_refs: BTreeSet<BlockRef>,
        peer: AuthorityIndex,
        method: SyncMethod,
    ) -> Option<BlocksGuard> {
        let mut blocks = BTreeSet::new();
        let mut inner = self.inner.lock();

        for block_ref in missing_block_refs {
            let authorities = inner.entry(block_ref).or_default();

            // Check if this peer is already fetching this block
            if authorities.contains(&peer) {
                continue;
            }

            // Count total authorities currently fetching this block
            let total_count = authorities.len();

            // Determine the limit based on the sync method
            let max_limit = match method {
                SyncMethod::Live => MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK,
                SyncMethod::Periodic => MAX_AUTHORITIES_TO_FETCH_PER_BLOCK,
            };

            // Check if we can acquire the lock
            if total_count < max_limit {
                assert!(authorities.insert(peer));
                blocks.insert(block_ref);
            }
        }

        if blocks.is_empty() {
            None
        } else {
            Some(BlocksGuard {
                map: self.clone(),
                block_refs: blocks,
                peer,
                method,
            })
        }
    }

    /// Unlocks the provided block references for the given `peer`. The
    /// unlocking is strict, meaning that if this method is called for a
    /// specific block ref and peer more times than the corresponding lock
    /// has been called, it will panic.
    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
        // Now mark all the blocks as fetched from the map
        let mut blocks_to_fetch = self.inner.lock();
        for block_ref in block_refs {
            let authorities = blocks_to_fetch
                .get_mut(block_ref)
                .expect("Should have found a non empty map");

            assert!(authorities.remove(&peer), "Peer index should be present!");

            // if the last one then just clean up
            if authorities.is_empty() {
                blocks_to_fetch.remove(block_ref);
            }
        }
    }

    /// Drops the provided `blocks_guard`, which forces the blocks to be
    /// unlocked, and then attempts to lock the referenced block refs again for
    /// the new `peer`. The swap is best effort and there is no guarantee that
    /// the `peer` will be able to acquire the new locks.
    fn swap_locks(
        self: &Arc<Self>,
        blocks_guard: BlocksGuard,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let block_refs = blocks_guard.block_refs.clone();
        let method = blocks_guard.method.clone();

        // Explicitly drop the guard to release the existing locks
        drop(blocks_guard);

        // Now create a new guard with the same sync method
        self.lock_blocks(block_refs, peer, method)
    }

    #[cfg(test)]
    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
        let inner = self.inner.lock();
        inner.len()
    }
}

enum Command {
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    FetchOwnLastBlock,
    KickOffScheduler,
}

pub(crate) struct SynchronizerHandle {
    commands_sender: Sender<Command>,
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}

impl SynchronizerHandle {
    /// Explicitly asks the synchronizer to fetch the blocks in the provided
    /// `missing_block_refs` set from the peer authority.
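    ///
    /// # Example (illustrative sketch)
    ///
    /// ```ignore
    /// // Hypothetical variables: `handle` is the Arc<SynchronizerHandle> returned by
    /// // Synchronizer::start, `missing` is a BTreeSet<BlockRef> and `peer` an AuthorityIndex.
    /// match handle.fetch_blocks(missing, peer).await {
    ///     Ok(()) => { /* the request was queued to the per-peer fetch task */ }
    ///     Err(ConsensusError::SynchronizerSaturated(..)) => {
    ///         // The peer's fetch channel is full; the periodic scheduler will retry later.
    ///     }
    ///     Err(err) => tracing::debug!("fetch_blocks failed: {err}"),
    /// }
    /// ```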
    pub(crate) async fn fetch_blocks(
        &self,
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<()> {
        let (sender, receiver) = oneshot::channel();
        self.commands_sender
            .send(Command::FetchBlocks {
                missing_block_refs,
                peer_index,
                result: sender,
            })
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;
        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
    }

    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
        let mut tasks = self.tasks.lock().await;
        tasks.abort_all();
        while let Some(result) = tasks.join_next().await {
            result?
        }
        Ok(())
    }
}

/// `Synchronizer` oversees live block synchronization, crucial for node
/// progress. Live synchronization refers to the process of retrieving missing
/// blocks, particularly those essential for advancing a node when data from
/// only a few rounds is absent. If a node significantly lags behind the
/// network, `commit_syncer` handles fetching missing blocks via a more
/// efficient approach. `Synchronizer` aims for swift catch-up employing two
/// mechanisms:
///
/// 1. Explicitly requesting missing blocks from designated authorities via the
///    "block send" path. This includes attempting to fetch any missing
///    ancestors necessary for processing a received block. Such requests
///    prioritize the block author, maximizing the chance of prompt retrieval. A
///    locking mechanism allows concurrent requests for missing blocks from up
///    to two authorities simultaneously, enhancing the chances of timely
///    retrieval. Notably, if additional missing blocks arise during block
///    processing, requests to the same authority are deferred to the scheduler.
///
/// 2. Periodically requesting missing blocks via a scheduler. This primarily
///    serves to retrieve missing blocks that were not ancestors of a received
///    block via the "block send" path. The scheduler operates on either a fixed
///    periodic basis or is triggered immediately after explicit fetches
///    described in (1), ensuring continued block retrieval if gaps persist.
///
/// In addition to the above, the synchronizer can fetch the node's own last
/// proposed block from the network peers as a best-effort approach to recover
/// the node from amnesia and avoid making the node equivocate.
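///
/// # Example (illustrative sketch)
///
/// ```ignore
/// // Hypothetical setup: the Arc-wrapped dependencies below are assumed to be
/// // constructed elsewhere during authority start-up.
/// let handle = Synchronizer::start(
///     network_client,
///     context,
///     core_dispatcher,
///     commit_vote_monitor,
///     block_verifier,
///     dag_state,
///     /* sync_last_known_own_block */ false,
/// );
///
/// // Ask for specific missing blocks from a peer (the "live" path) ...
/// handle.fetch_blocks(missing_block_refs, peer_index).await?;
///
/// // ... and shut the synchronizer down when the node stops.
/// let _ = handle.stop().await;
/// ```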
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    commands_receiver: Receiver<Command>,
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    fetch_blocks_scheduler_task: JoinSet<()>,
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    inflight_blocks_map: Arc<InflightBlocksMap>,
    verified_blocks_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
    commands_sender: Sender<Command>,
}

impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
    /// Starts the synchronizer, which is responsible for fetching blocks from
    /// other authorities and managing block synchronization tasks.
    pub fn start(
        network_client: Arc<C>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        block_verifier: Arc<V>,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Arc<SynchronizerHandle> {
        let (commands_sender, commands_receiver) =
            channel("consensus_synchronizer_commands", 1_000);
        let inflight_blocks_map = InflightBlocksMap::new();
        let verified_blocks_cache = Arc::new(Mutex::new(LruCache::new(
            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
        )));

        // Spawn the tasks to fetch the blocks from the others
        let mut fetch_block_senders = BTreeMap::new();
        let mut tasks = JoinSet::new();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            let (sender, receiver) =
                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
                index,
                network_client.clone(),
                block_verifier.clone(),
                verified_blocks_cache.clone(),
                commit_vote_monitor.clone(),
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                receiver,
                commands_sender.clone(),
            );
            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
            fetch_block_senders.insert(index, sender);
        }

        let commands_sender_clone = commands_sender.clone();

        if sync_last_known_own_block {
            commands_sender
                .try_send(Command::FetchOwnLastBlock)
                .expect("Failed to sync our last block");
        }

        // Spawn the task to listen to the requests & periodic runs
        tasks.spawn(monitored_future!(async move {
            let mut s = Self {
                context,
                commands_receiver,
                fetch_block_senders,
                core_dispatcher,
                commit_vote_monitor,
                fetch_blocks_scheduler_task: JoinSet::new(),
                fetch_own_last_block_task: JoinSet::new(),
                network_client,
                block_verifier,
                inflight_blocks_map,
                verified_blocks_cache,
                commands_sender: commands_sender_clone,
                dag_state,
            };
            s.run().await;
        }));

        Arc::new(SynchronizerHandle {
            commands_sender,
            tasks: tokio::sync::Mutex::new(tasks),
        })
    }

    // The main loop to listen for the submitted commands.
    async fn run(&mut self) {
        // We want the synchronizer to run periodically every 200ms to fetch any missing
        // blocks.
        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();

                            // Keep only the max allowed blocks to request. It is ok to reduce here as the scheduler
                            // task will take care of syncing whatever is left over.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(self.context.parameters.max_blocks_per_sync)
                                .collect();

                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index, SyncMethod::Live);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // We don't block if the corresponding peer task is saturated, but rather drop the request. That's ok as the periodic
                            // synchronization task will handle any still missing blocks in its next run.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index, peer_hostname),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // Just reset the scheduler timeout timer to run immediately if not already running.
                            // If the scheduler is already running then just reduce the remaining time to run.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
                            };

                            // Only reset if it is earlier than the next deadline
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // We want to start a new task only if the previous one has already finished.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
                }
            }
        }
    }

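    /// Dedicated per-peer task for the "live" sync path: waits for `BlocksGuard`s
    /// on `receiver`, requests the locked block refs from `peer_index` (retrying
    /// up to `MAX_RETRIES` times) and forwards the verified results to Core via
    /// `process_fetched_blocks`. At most `FETCH_BLOCKS_CONCURRENCY` requests are
    /// kept in flight towards the peer at any time.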
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 3;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    // Get the highest accepted rounds
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    // Record metrics for live synchronizer requests
                    let metrics = &context.metrics.node_metrics;
                    metrics
                        .synchronizer_requested_blocks_by_peer
                        .with_label_values(&[peer_hostname.as_str(), "live"])
                        .inc_by(blocks_guard.block_refs.len() as u64);
                    // Count requested blocks per authority and increment the metric by one per authority
                    let mut authors = HashSet::new();
                    for block_ref in &blocks_guard.block_refs {
                        authors.insert(block_ref.author);
                    }
                    for author in authors {
                        let host = &context.committee.authority(author).hostname;
                        metrics
                            .synchronizer_requested_blocks_by_authority
                            .with_label_values(&[host.as_str(), "live"])
                            .inc();
                    }

                    requests.push(Self::fetch_blocks_request(
                        network_client.clone(),
                        peer_index,
                        blocks_guard,
                        highest_rounds,
                        FETCH_REQUEST_TIMEOUT,
                        1,
                    ))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                verified_cache.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                context.metrics.update_scoring_metrics_on_block_receival(
                                    peer_index,
                                    peer_hostname,
                                    err.clone(),
                                    "process_fetched_blocks",
                                );
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // We don't strictly have to, but drop the guard here explicitly to unlock the blocks
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }

    /// Processes the requested raw fetched blocks from peer `peer_index`. If no
    /// error is returned then the verified blocks are immediately sent to
    /// Core for processing.
    async fn process_fetched_blocks(
        mut serialized_blocks: Vec<Bytes>,
        peer_index: AuthorityIndex,
        requested_blocks_guard: BlocksGuard,
        core_dispatcher: Arc<D>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        commands_sender: Sender<Command>,
        sync_method: &str,
    ) -> ConsensusResult<()> {
        if serialized_blocks.is_empty() {
            return Ok(());
        }
        let _s = context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Synchronizer::process_fetched_blocks"])
            .start_timer();

        // Limit the number of the returned blocks that get processed.
        if context.protocol_config.consensus_batched_block_sync() {
            serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
        } else {
            // Ensure that the returned blocks do not exceed the total maximum allowed
            // number of returned blocks
            if serialized_blocks.len()
                > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
            {
                return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
            }
        }

        // Verify all the fetched blocks
        let blocks = Handle::current()
            .spawn_blocking({
                let block_verifier = block_verifier.clone();
                let verified_cache = verified_cache.clone();
                let context = context.clone();
                let sync_method = sync_method.to_string();
                move || {
                    Self::verify_blocks(
                        serialized_blocks,
                        block_verifier,
                        verified_cache,
                        &context,
                        peer_index,
                        &sync_method,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        if !context.protocol_config.consensus_batched_block_sync() {
            // Get all the ancestors of the requested blocks only
            let ancestors = blocks
                .iter()
                .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
                .flat_map(|b| b.ancestors().to_vec())
                .collect::<BTreeSet<BlockRef>>();

            // Now confirm that the returned blocks are either among the ones requested, or
            // they are ancestors of the requested blocks
            for block in &blocks {
                if !requested_blocks_guard
                    .block_refs
                    .contains(&block.reference())
                    && !ancestors.contains(&block.reference())
                {
                    return Err(ConsensusError::UnexpectedFetchedBlock {
                        index: peer_index,
                        block_ref: block.reference(),
                    });
                }
            }
        }

        // Record commit votes from the verified blocks.
        for block in &blocks {
            commit_vote_monitor.observe_block(block);
        }

        let metrics = &context.metrics.node_metrics;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        metrics
            .synchronizer_fetched_blocks_by_peer
            .with_label_values(&[peer_hostname.as_str(), sync_method])
            .inc_by(blocks.len() as u64);
        for block in &blocks {
            let block_hostname = &context.committee.authority(block.author()).hostname;
            metrics
                .synchronizer_fetched_blocks_by_authority
                .with_label_values(&[block_hostname.as_str(), sync_method])
                .inc();
        }

        debug!(
            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
            blocks.len(),
            blocks.iter().map(|b| b.reference().to_string()).join(", "),
        );

        // Now send them to core for processing. Ignore the returned missing blocks as
        // we don't want this mechanism to keep feedback looping on fetching
        // more blocks. The periodic synchronization will take care of that.
        let missing_blocks = core_dispatcher
            .add_blocks(blocks)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Now release all the locked blocks as they have been fetched, verified &
        // processed
        drop(requested_blocks_guard);

        // Kick off the scheduled synchronizer immediately
        if !missing_blocks.is_empty() {
            // Do not block here, so we avoid any possible cycles.
            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
            {
                warn!("Commands channel is full")
            }
        }

        context
            .metrics
            .node_metrics
            .missing_blocks_after_fetch_total
            .inc_by(missing_blocks.len() as u64);

        Ok(())
    }

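    /// Returns the round of the last cached block per authority (one entry per
    /// committee member, in authority order). These rounds are passed along
    /// with the block fetch requests sent to peers.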
    fn get_highest_accepted_rounds(
        dag_state: Arc<RwLock<DagState>>,
        context: &Arc<Context>,
    ) -> Vec<Round> {
        let blocks = dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        assert_eq!(blocks.len(), context.committee.size());

        blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>()
    }

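    /// Deserializes and verifies the given serialized blocks from `peer_index`.
    /// Blocks whose digest is already in `verified_cache` are skipped (and
    /// counted in a metric), newly verified blocks are added to the cache, and
    /// any verification failure aborts the whole batch with an error. Blocks
    /// with a timestamp in the future are dropped unless median-based
    /// timestamps are enforced by the protocol config.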
    fn verify_blocks(
        serialized_blocks: Vec<Bytes>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        context: &Context,
        peer_index: AuthorityIndex,
        sync_method: &str,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut verified_blocks = Vec::new();
        let mut skipped_count = 0u64;

        for serialized_block in serialized_blocks {
            let block_digest = VerifiedBlock::compute_digest(&serialized_block);

            // Check if this block has already been verified
            if verified_cache.lock().get(&block_digest).is_some() {
                skipped_count += 1;
                continue; // Skip already verified blocks
            }

            let signed_block: SignedBlock =
                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;

            if let Err(e) = block_verifier.verify(&signed_block) {
                // TODO: we might want to use a different metric to track the invalid "served"
                // blocks from the invalid "proposed" ones.
                let hostname = context.committee.authority(peer_index).hostname.clone();

                context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
                    .inc();
                warn!("Invalid block received from {}: {}", peer_index, e);
                return Err(e);
            }

            // Add block to verified cache after successful verification
            verified_cache.lock().put(block_digest, ());

            let verified_block = VerifiedBlock::new_verified_with_digest(
                signed_block,
                serialized_block,
                block_digest,
            );

            // Dropping is ok because the block will be refetched.
            // TODO: improve efficiency, maybe suspend and continue processing the block
            // asynchronously.
            let now = context.clock.timestamp_utc_ms();
            let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
            if drift > 0 {
                let peer_hostname = &context
                    .committee
                    .authority(verified_block.author())
                    .hostname;
                context
                    .metrics
                    .node_metrics
                    .block_timestamp_drift_ms
                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
                    .inc_by(drift);

                if context
                    .protocol_config
                    .consensus_median_timestamp_with_checkpoint_enforcement()
                {
                    trace!(
                        "Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.",
                        verified_block.reference(),
                        verified_block.timestamp_ms(),
                        now
                    );
                } else {
                    warn!(
                        "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
                        verified_block.reference(),
                        verified_block.timestamp_ms(),
                        now
                    );
                    continue;
                }
            }

            verified_blocks.push(verified_block);
        }

        // Record skipped blocks metric
        if skipped_count > 0 {
            let peer_hostname = &context.committee.authority(peer_index).hostname;
            context
                .metrics
                .node_metrics
                .synchronizer_skipped_blocks_by_peer
                .with_label_values(&[peer_hostname.as_str(), sync_method])
                .inc_by(skipped_count);
        }

        Ok(verified_blocks)
    }

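    /// Sends a single `fetch_blocks` request for the guarded block refs to
    /// `peer`, bounded by `request_timeout`. On a network error or timeout the
    /// function sleeps out the remainder of the timeout window and bumps the
    /// retry counter; the caller decides whether to retry. The guard, retry
    /// count, peer and highest rounds are returned alongside the result so the
    /// caller can reuse them.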
    async fn fetch_blocks_request(
        network_client: Arc<C>,
        peer: AuthorityIndex,
        blocks_guard: BlocksGuard,
        highest_rounds: Vec<Round>,
        request_timeout: Duration,
        mut retries: u32,
    ) -> (
        ConsensusResult<Vec<Bytes>>,
        BlocksGuard,
        u32,
        AuthorityIndex,
        Vec<Round>,
    ) {
        let start = Instant::now();
        let resp = timeout(
            request_timeout,
            network_client.fetch_blocks(
                peer,
                blocks_guard
                    .block_refs
                    .clone()
                    .into_iter()
                    .collect::<Vec<_>>(),
                highest_rounds.clone(),
                request_timeout,
            ),
        )
        .await;

        fail_point_async!("consensus-delay");

        let resp = match resp {
            Ok(Err(err)) => {
                // Add a delay before retrying - if that is needed. If the request has timed
                // out then eventually this will be a no-op.
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(err)
            } // network error
            Err(err) => {
                // timeout
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
            }
            Ok(result) => result,
        };
        (resp, blocks_guard, retries, peer, highest_rounds)
    }

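    /// Spawns a task that asks the network peers for this node's own last
    /// proposed block (amnesia recovery). The task retries with exponential
    /// backoff until a quorum of stake has responded, then reports the highest
    /// observed round to Core via `set_last_known_proposed_round`.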
    fn start_fetch_own_last_block_task(&mut self) {
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block: SignedBlock = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Get the highest round of all the results. Retry until a quorum (2f+1 stake) of results has been gathered.
                let mut highest_round = GENESIS_ROUND;
                // Keep track of the received responses to avoid fetching our own last block from the same peer again
                let mut received_response = vec![false; context.committee.size()];
                // Assume that our node is not Byzantine
                received_response[context.own_index] = true;
                let mut total_stake = context.committee.stake(context.own_index);
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Ask all the other peers about our last block
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        // Skip our own index and the ones that have already responded
                        if !received_response[authority_index] {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results, but also respect the timeout
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                received_response[authority_index] = true;
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Require at least a quorum of 2f+1 stake to have replied back.
                    if context.committee.reached_quorum(total_stake) {
                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    } else {
                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
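                    // With the initial 500ms step and the 1.5x growth factor above, the retry
                    // delays are roughly 500ms, 750ms, 1.13s, 1.69s, 2.53s, 3.80s and then stay
                    // capped at MAX_RETRY_DELAY_STEP (4s).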
                }

                // Update the Core with the highest detected round
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }

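    /// Kicks off one periodic synchronization run: asks Core for the currently
    /// missing blocks and, unless the node is commit lagging (in which case the
    /// run may be skipped or trimmed), spawns a task that fetches them from a
    /// small set of peers and feeds the results back through
    /// `process_fetched_blocks`.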
    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        // No reason to kick off the scheduler if there are no missing blocks to fetch
        if missing_blocks.is_empty() {
            return Ok(());
        }

        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let verified_cache = self.verified_blocks_cache.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        trace!(
            "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
        );
        if commit_lagging {
            // If gc is enabled and we are commit lagging, then we don't want to enable the
            // scheduler. Since the new logic of processing the certified commits is in
            // place, we are guaranteed that commits will happen for all the certified
            // commits.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            // As the node is commit lagging, try to sync only the missing blocks that are
            // within the acceptable round threshold. The rest we don't attempt to sync
            // yet.
            let highest_accepted_round = dag_state.read().highest_accepted_round();
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|(block_ref, _)| {
                    block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
                })
                .collect::<BTreeMap<_, _>>();

            // If no missing blocks are within the acceptable threshold to sync while we
            // are commit lagging, then we disable the scheduler completely for this run.
            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                // Fetch blocks from peers
                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                // Now process the returned results
                let mut total_fetched = 0;
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        verified_cache.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }

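    /// Returns whether the local commit index is lagging behind the quorum
    /// commit index observed from peers, together with both indices. The node
    /// is considered lagging when the quorum commit index exceeds the local one
    /// by more than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`.
    ///
    /// For example (hypothetical numbers): with a local commit index of 100, a
    /// batch size of 50 and a multiplier of 10, the threshold is 100 + 500 =
    /// 600, so a quorum commit index above 600 marks the node as lagging.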
    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        let commit_threshold = last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
        (
            commit_threshold < quorum_commit_index,
            last_commit_index,
            quorum_commit_index,
        )
    }

    /// The number of rounds above the highest accepted round to allow fetching
    /// missing blocks via the periodic synchronization. Any missing blocks
    /// of higher rounds are considered too far in the future to fetch. This
    /// property is used only when it's detected that the node has fallen
    /// behind on its commit compared to the rest of the network; otherwise the
    /// scheduler will attempt to fetch any missing block.
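    ///
    /// For example (hypothetical numbers): if `commit_sync_batch_size` is 100
    /// and the highest accepted round is 1,000, then while the node is commit
    /// lagging only missing blocks with rounds up to 1,100 are fetched by the
    /// periodic synchronizer.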
    fn missing_block_round_threshold(&self) -> Round {
        self.context.parameters.commit_sync_batch_size
    }

    /// Fetches the given `missing_blocks` from up to `MAX_PERIODIC_SYNC_PEERS`
    /// authorities in parallel:
    ///
    /// Randomly select `MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS`
    /// peers from those who are known to hold some missing block, requesting up
    /// to `max_blocks_per_sync` block refs per peer.
    ///
    /// Randomly select `MAX_PERIODIC_SYNC_RANDOM_PEERS` additional peers from
    /// the committee (excluding self and those already selected).
    ///
    /// The method returns a vector with the fetched blocks from each peer that
    /// successfully responded and any corresponding additional ancestor blocks.
    /// Each element of the vector is a tuple which contains the requested
    /// missing block refs, the returned blocks and the peer authority index.
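    ///
    /// For example, in a hypothetical committee of 7 authorities where 3 peers
    /// are known to hold some of the missing blocks, a periodic run picks 2 of
    /// those 3 peers plus 2 random peers from the remaining committee members
    /// (excluding ourselves), for 4 peers in total.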
1207    async fn fetch_blocks_from_authorities(
1208        context: Arc<Context>,
1209        inflight_blocks: Arc<InflightBlocksMap>,
1210        network_client: Arc<C>,
1211        missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1212        dag_state: Arc<RwLock<DagState>>,
1213    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1214        // Step 1: Map authorities to missing blocks that they are aware of
1215        let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1216        for (missing_block_ref, authorities) in &missing_blocks {
1217            for author in authorities {
1218                if author == &context.own_index {
1219                    // Skip our own index as we don't want to fetch blocks from ourselves
1220                    continue;
1221                }
1222                authority_to_blocks
1223                    .entry(*author)
1224                    .or_default()
1225                    .push(*missing_block_ref);
1226            }
1227        }
1228
1229        // Step 2: Choose at most MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1230        // peers from those who are aware of some missing blocks
1231
1232        #[cfg(not(test))]
1233        let mut rng = StdRng::from_entropy();
1234
1235        // Randomly pick MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1236        // authorities that are aware of missing blocks
1237        #[cfg(not(test))]
1238        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1239            authority_to_blocks
1240                .iter()
1241                .choose_multiple(
1242                    &mut rng,
1243                    MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1244                )
1245                .into_iter()
1246                .map(|(&peer, blocks)| {
1247                    let limited_blocks = blocks
1248                        .iter()
1249                        .copied()
1250                        .take(context.parameters.max_blocks_per_sync)
1251                        .collect();
1252                    (peer, limited_blocks, "periodic_known")
1253                })
1254                .collect();
1255        #[cfg(test)]
1256        // Deterministically pick the smallest MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS authority indices
1257        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1258            let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1259                .iter()
1260                .map(|(&peer, blocks)| {
1261                    let limited_blocks = blocks
1262                        .iter()
1263                        .copied()
1264                        .take(context.parameters.max_blocks_per_sync)
1265                        .collect();
1266                    (peer, limited_blocks, "periodic_known")
1267                })
1268                .collect();
1269            // Sort by AuthorityIndex (natural order), then take the first
1270            // MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1271            items.sort_by_key(|(peer, _, _)| *peer);
1272            items
1273                .into_iter()
1274                .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1275                .collect()
1276        };
1277
1278        // Step 3: Choose up to MAX_PERIODIC_SYNC_RANDOM_PEERS random peers not known
1279        // to be aware of the missing blocks
1280        let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1281            .iter()
1282            .map(|(peer, _, _)| *peer)
1283            .collect();
1284
1285        let random_candidates: Vec<_> = context
1286            .committee
1287            .authorities()
1288            .filter_map(|(peer_index, _)| {
1289                (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1290                    .then_some(peer_index)
1291            })
1292            .collect();
1293        #[cfg(test)]
1294        let random_peers: Vec<AuthorityIndex> = random_candidates
1295            .into_iter()
1296            .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1297            .collect();
1298        #[cfg(not(test))]
1299        let random_peers: Vec<AuthorityIndex> = random_candidates
1300            .into_iter()
1301            .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1302
1303        #[cfg_attr(test, allow(unused_mut))]
1304        let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1305        // Shuffle the missing blocks in case the first ones are held up by
1306        // unresponsive peers
1307        #[cfg(not(test))]
1308        all_missing_blocks.shuffle(&mut rng);
1309
1310        let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1311
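        // Hand each randomly chosen peer a distinct chunk of up to
        // `max_blocks_per_sync` missing blocks; if there are fewer chunks than
        // random peers, the surplus peers are simply not used this time.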
1312        for peer in random_peers {
1313            if let Some(chunk) = block_chunks.next() {
1314                chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1315            } else {
1316                break;
1317            }
1318        }
1319
1320        let mut request_futures = FuturesUnordered::new();
1321
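        // The requester's highest accepted round per authority is sent with each
        // request; the serving peer can use it to decide which additional ancestor
        // blocks (if any) to attach to the response (see the method docs and
        // `MAX_ADDITIONAL_BLOCKS`).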
1322        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1323
1324        // Record the missing blocks per authority for metrics
1325        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1326        for block in &all_missing_blocks {
1327            missing_blocks_per_authority[block.author] += 1;
1328        }
1329        for (missing, (_, authority)) in missing_blocks_per_authority
1330            .into_iter()
1331            .zip(context.committee.authorities())
1332        {
1333            context
1334                .metrics
1335                .node_metrics
1336                .synchronizer_missing_blocks_by_authority
1337                .with_label_values(&[&authority.hostname])
1338                .inc_by(missing as u64);
1339            context
1340                .metrics
1341                .node_metrics
1342                .synchronizer_current_missing_blocks_by_authority
1343                .with_label_values(&[&authority.hostname])
1344                .set(missing as i64);
1345        }
1346
1347        // Keep track of the peers that have not been chosen yet, so that failed
1348        // requests can be retried against them later
1349        #[cfg_attr(test, expect(unused_mut))]
1350        let mut remaining_peers: Vec<_> = context
1351            .committee
1352            .authorities()
1353            .filter_map(|(peer_index, _)| {
1354                if peer_index != context.own_index
1355                    && !chosen_peers_with_blocks
1356                        .iter()
1357                        .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1358                {
1359                    Some(peer_index)
1360                } else {
1361                    None
1362                }
1363            })
1364            .collect();
1365
1366        #[cfg(not(test))]
1367        remaining_peers.shuffle(&mut rng);
1368        let mut remaining_peers = remaining_peers.into_iter();
1369
1370        // Send the initial requests
1371        for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1372            let peer_hostname = &context.committee.authority(peer).hostname;
1373            let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1374
1375            // Lock the blocks to be fetched. If none of the blocks can be locked,
1376            // then don't bother with this peer.
1377            if let Some(blocks_guard) =
1378                inflight_blocks.lock_blocks(block_refs.clone(), peer, SyncMethod::Periodic)
1379            {
1380                info!(
1381                    "Periodic sync of {} missing blocks from peer {} {}: {}",
1382                    block_refs.len(),
1383                    peer,
1384                    peer_hostname,
1385                    block_refs
1386                        .iter()
1387                        .map(|b| b.to_string())
1388                        .collect::<Vec<_>>()
1389                        .join(", ")
1390                );
1391                // Record metrics about requested blocks
1392                let metrics = &context.metrics.node_metrics;
1393                metrics
1394                    .synchronizer_requested_blocks_by_peer
1395                    .with_label_values(&[peer_hostname.as_str(), label])
1396                    .inc_by(block_refs.len() as u64);
1397                for block_ref in &block_refs {
1398                    let block_hostname = &context.committee.authority(block_ref.author).hostname;
1399                    metrics
1400                        .synchronizer_requested_blocks_by_authority
1401                        .with_label_values(&[block_hostname.as_str(), label])
1402                        .inc();
1403                }
1404                request_futures.push(Self::fetch_blocks_request(
1405                    network_client.clone(),
1406                    peer,
1407                    blocks_guard,
1408                    highest_rounds.clone(),
1409                    FETCH_REQUEST_TIMEOUT,
1410                    1,
1411                ));
1412            }
1413        }
1414
1415        let mut results = Vec::new();
1416        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1417
1418        tokio::pin!(fetcher_timeout);
1419
1420        loop {
1421            tokio::select! {
1422                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1423                    let peer_hostname = &context.committee.authority(peer_index).hostname;
1424                    match response {
1425                        Ok(fetched_blocks) => {
1426                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1427                            results.push((blocks_guard, fetched_blocks, peer_index));
1428
1429                            // no more pending requests are left, just break the loop
1430                            if request_futures.is_empty() {
1431                                break;
1432                            }
1433                        },
1434                        Err(_) => {
1435                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1436                            // try again if there is any peer left
1437                            if let Some(next_peer) = remaining_peers.next() {
1438                                // Best effort to swap the locks over to the next peer. If the locks can't be acquired then skip this run.
1439                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1440                                    info!(
1441                                        "Retrying syncing {} missing blocks from peer {}: {}",
1442                                        blocks_guard.block_refs.len(),
1443                                        peer_hostname,
1444                                        blocks_guard.block_refs
1445                                            .iter()
1446                                            .map(|b| b.to_string())
1447                                            .collect::<Vec<_>>()
1448                                            .join(", ")
1449                                    );
1450                                    let block_refs = blocks_guard.block_refs.clone();
1451                                    // Record metrics about requested blocks
1452                                    let metrics = &context.metrics.node_metrics;
1453                                    metrics
1454                                        .synchronizer_requested_blocks_by_peer
1455                                        .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1456                                        .inc_by(block_refs.len() as u64);
1457                                    for block_ref in &block_refs {
1458                                        let block_hostname =
1459                                            &context.committee.authority(block_ref.author).hostname;
1460                                        metrics
1461                                            .synchronizer_requested_blocks_by_authority
1462                                            .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1463                                            .inc();
1464                                    }
1465                                    request_futures.push(Self::fetch_blocks_request(
1466                                        network_client.clone(),
1467                                        next_peer,
1468                                        blocks_guard,
1469                                        highest_rounds,
1470                                        FETCH_REQUEST_TIMEOUT,
1471                                        1,
1472                                    ));
1473                                } else {
1474                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1475                                }
1476                            } else {
1477                                debug!("No more peers left to fetch blocks");
1478                            }
1479                        }
1480                    }
1481                },
1482                _ = &mut fetcher_timeout => {
1483                    debug!("Timed out while fetching missing blocks");
1484                    break;
1485                }
1486            }
1487        }
1488
1489        results
1490    }
1491}
1492
1493#[cfg(test)]
1494mod tests {
1495    use std::{
1496        collections::{BTreeMap, BTreeSet},
1497        num::NonZeroUsize,
1498        sync::Arc,
1499        time::Duration,
1500    };
1501
1502    use async_trait::async_trait;
1503    use bytes::Bytes;
1504    use consensus_config::{AuthorityIndex, Parameters};
1505    use iota_metrics::monitored_mpsc;
1506    use lru::LruCache;
1507    use parking_lot::{Mutex as SyncMutex, RwLock};
1508    use tokio::{sync::Mutex, time::sleep};
1509
1510    use crate::{
1511        CommitDigest, CommitIndex,
1512        authority_service::COMMIT_LAG_MULTIPLIER,
1513        block::{BlockDigest, BlockRef, Round, SignedBlock, TestBlock, VerifiedBlock},
1514        block_verifier::{BlockVerifier, NoopBlockVerifier},
1515        commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1516        commit_vote_monitor::CommitVoteMonitor,
1517        context::Context,
1518        core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1519        dag_state::DagState,
1520        error::{ConsensusError, ConsensusResult},
1521        network::{BlockStream, NetworkClient},
1522        round_prober::QuorumRound,
1523        storage::mem_store::MemStore,
1524        synchronizer::{
1525            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, SyncMethod,
1526            Synchronizer, VERIFIED_BLOCKS_CACHE_CAP,
1527        },
1528    };
1529
1530    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1531    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1532    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1533    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1534
1535    // Mock verifier that always fails verification
1536    struct FailingBlockVerifier;
1537
1538    impl BlockVerifier for FailingBlockVerifier {
1539        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1540            Err(ConsensusError::WrongEpoch {
1541                expected: 1,
1542                actual: 0,
1543            })
1544        }
1545
1546        fn check_ancestors(
1547            &self,
1548            _block: &VerifiedBlock,
1549            _ancestors: &[Option<VerifiedBlock>],
1550            _gc_enabled: bool,
1551            _gc_round: Round,
1552        ) -> ConsensusResult<()> {
1553            Ok(())
1554        }
1555    }
1556
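    /// Test double for `NetworkClient`. Responses are stubbed per request key and
    /// consumed as the corresponding calls are made (optionally after an artificial
    /// latency), so each stubbed response serves exactly one request.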
1557    #[derive(Default)]
1558    struct MockNetworkClient {
1559        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1560        fetch_latest_blocks_requests:
1561            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1562    }
1563
1564    impl MockNetworkClient {
1565        async fn stub_fetch_blocks(
1566            &self,
1567            blocks: Vec<VerifiedBlock>,
1568            peer: AuthorityIndex,
1569            latency: Option<Duration>,
1570        ) {
1571            let mut lock = self.fetch_blocks_requests.lock().await;
1572            let block_refs = blocks
1573                .iter()
1574                .map(|block| block.reference())
1575                .collect::<Vec<_>>();
1576            lock.insert((block_refs, peer), (blocks, latency));
1577        }
1578
1579        async fn stub_fetch_latest_blocks(
1580            &self,
1581            blocks: Vec<VerifiedBlock>,
1582            peer: AuthorityIndex,
1583            authorities: Vec<AuthorityIndex>,
1584            latency: Option<Duration>,
1585        ) {
1586            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1587            lock.entry((peer, authorities))
1588                .or_default()
1589                .push((blocks, latency));
1590        }
1591
1592        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1593            let lock = self.fetch_latest_blocks_requests.lock().await;
1594            lock.len()
1595        }
1596    }
1597
1598    #[async_trait]
1599    impl NetworkClient for MockNetworkClient {
1600        const SUPPORT_STREAMING: bool = false;
1601
1602        async fn send_block(
1603            &self,
1604            _peer: AuthorityIndex,
1605            _serialized_block: &VerifiedBlock,
1606            _timeout: Duration,
1607        ) -> ConsensusResult<()> {
1608            unimplemented!("Unimplemented")
1609        }
1610
1611        async fn subscribe_blocks(
1612            &self,
1613            _peer: AuthorityIndex,
1614            _last_received: Round,
1615            _timeout: Duration,
1616        ) -> ConsensusResult<BlockStream> {
1617            unimplemented!("Unimplemented")
1618        }
1619
1620        async fn fetch_blocks(
1621            &self,
1622            peer: AuthorityIndex,
1623            block_refs: Vec<BlockRef>,
1624            _highest_accepted_rounds: Vec<Round>,
1625            _timeout: Duration,
1626        ) -> ConsensusResult<Vec<Bytes>> {
1627            let mut lock = self.fetch_blocks_requests.lock().await;
1628            let response = lock
1629                .remove(&(block_refs, peer))
1630                .expect("Unexpected fetch blocks request made");
1631
1632            let serialised = response
1633                .0
1634                .into_iter()
1635                .map(|block| block.serialized().clone())
1636                .collect::<Vec<_>>();
1637
1638            drop(lock);
1639
1640            if let Some(latency) = response.1 {
1641                sleep(latency).await;
1642            }
1643
1644            Ok(serialised)
1645        }
1646
1647        async fn fetch_commits(
1648            &self,
1649            _peer: AuthorityIndex,
1650            _commit_range: CommitRange,
1651            _timeout: Duration,
1652        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1653            unimplemented!("Unimplemented")
1654        }
1655
1656        async fn fetch_latest_blocks(
1657            &self,
1658            peer: AuthorityIndex,
1659            authorities: Vec<AuthorityIndex>,
1660            _timeout: Duration,
1661        ) -> ConsensusResult<Vec<Bytes>> {
1662            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1663            let mut responses = lock
1664                .remove(&(peer, authorities.clone()))
1665                .expect("Unexpected fetch latest blocks request made");
1666
1667            let response = responses.remove(0);
1668            let serialised = response
1669                .0
1670                .into_iter()
1671                .map(|block| block.serialized().clone())
1672                .collect::<Vec<_>>();
1673
1674            if !responses.is_empty() {
1675                lock.insert((peer, authorities), responses);
1676            }
1677
1678            drop(lock);
1679
1680            if let Some(latency) = response.1 {
1681                sleep(latency).await;
1682            }
1683
1684            Ok(serialised)
1685        }
1686
1687        async fn get_latest_rounds(
1688            &self,
1689            _peer: AuthorityIndex,
1690            _timeout: Duration,
1691        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1692            unimplemented!("Unimplemented")
1693        }
1694    }
1695
1696    #[test]
1697    fn test_inflight_blocks_map() {
1698        // GIVEN
1699        let map = InflightBlocksMap::new();
1700        let some_block_refs = [
1701            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1702            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1703            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1704            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1705        ];
1706        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1707
1708        // Lock & unlock blocks
1709        {
1710            let mut all_guards = Vec::new();
1711
1712            // Try to acquire the block locks for authorities 1 & 2 (Periodic limit is 2)
1713            for i in 1..=2 {
1714                let authority = AuthorityIndex::new_for_test(i);
1715
1716                let guard =
1717                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1718                let guard = guard.expect("Guard should be created");
1719                assert_eq!(guard.block_refs.len(), 4);
1720
1721                all_guards.push(guard);
1722
1723                // trying to acquire any of them again will not succeed
1724                let guard =
1725                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1726                assert!(guard.is_none());
1727            }
1728
1729            // Trying to acquire the locks for authority 3 will fail - as we have maxed
1730            // out the number of allowed peers (Periodic limit is 2)
1731            let authority_3 = AuthorityIndex::new_for_test(3);
1732
1733            let guard = map.lock_blocks(
1734                missing_block_refs.clone(),
1735                authority_3,
1736                SyncMethod::Periodic,
1737            );
1738            assert!(guard.is_none());
1739
1740            // Explicitly drop the guard of authority 1 and try for authority 3 again - it
1741            // will now succeed
1742            drop(all_guards.remove(0));
1743
1744            let guard = map.lock_blocks(
1745                missing_block_refs.clone(),
1746                authority_3,
1747                SyncMethod::Periodic,
1748            );
1749            let guard = guard.expect("Guard should be successfully acquired");
1750
1751            assert_eq!(guard.block_refs, missing_block_refs);
1752
1753            // Dropping all guards should unlock the block refs
1754            drop(guard);
1755            drop(all_guards);
1756
1757            assert_eq!(map.num_of_locked_blocks(), 0);
1758        }
1759
1760        // Swap locks
1761        {
1762            // acquire a lock for authority 1
1763            let authority_1 = AuthorityIndex::new_for_test(1);
1764            let guard = map
1765                .lock_blocks(
1766                    missing_block_refs.clone(),
1767                    authority_1,
1768                    SyncMethod::Periodic,
1769                )
1770                .unwrap();
1771
1772            // Now swap the locks for authority 2
1773            let authority_2 = AuthorityIndex::new_for_test(2);
1774            let guard = map.swap_locks(guard, authority_2);
1775
1776            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1777        }
1778    }
1779
1780    #[test]
1781    fn test_inflight_blocks_map_live_sync_limit() {
1782        // GIVEN
1783        let map = InflightBlocksMap::new();
1784        let some_block_refs = [
1785            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1786            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1787        ];
1788        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1789
1790        // WHEN authority 1 locks with Live sync
1791        let authority_1 = AuthorityIndex::new_for_test(1);
1792        let guard_1 = map
1793            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1794            .expect("Should successfully lock with Live sync");
1795
1796        assert_eq!(guard_1.block_refs.len(), 2);
1797
1798        // THEN authority 2 cannot lock with Live sync (limit of 1 reached)
1799        let authority_2 = AuthorityIndex::new_for_test(2);
1800        let guard_2 = map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1801
1802        assert!(
1803            guard_2.is_none(),
1804            "Should fail to lock - Live limit of 1 reached"
1805        );
1806
1807        // WHEN authority 1 releases the lock
1808        drop(guard_1);
1809
1810        // THEN authority 2 can now lock with Live sync
1811        let guard_2 = map
1812            .lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live)
1813            .expect("Should successfully lock after authority 1 released");
1814
1815        assert_eq!(guard_2.block_refs.len(), 2);
1816    }
1817
1818    #[test]
1819    fn test_inflight_blocks_map_periodic_allows_more_concurrency() {
1820        // GIVEN
1821        let map = InflightBlocksMap::new();
1822        let some_block_refs = [
1823            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1824            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1825        ];
1826        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1827
1828        // WHEN authority 1 locks with Periodic sync
1829        let authority_1 = AuthorityIndex::new_for_test(1);
1830        let guard_1 = map
1831            .lock_blocks(
1832                missing_block_refs.clone(),
1833                authority_1,
1834                SyncMethod::Periodic,
1835            )
1836            .expect("Should successfully lock with Periodic sync");
1837
1838        assert_eq!(guard_1.block_refs.len(), 2);
1839
1840        // THEN authority 2 can also lock with Periodic sync (limit is 2)
1841        let authority_2 = AuthorityIndex::new_for_test(2);
1842        let guard_2 = map
1843            .lock_blocks(
1844                missing_block_refs.clone(),
1845                authority_2,
1846                SyncMethod::Periodic,
1847            )
1848            .expect("Should successfully lock - Periodic allows 2 authorities");
1849
1850        assert_eq!(guard_2.block_refs.len(), 2);
1851
1852        // BUT authority 3 cannot lock with Periodic sync (limit of 2 reached)
1853        let authority_3 = AuthorityIndex::new_for_test(3);
1854        let guard_3 = map.lock_blocks(
1855            missing_block_refs.clone(),
1856            authority_3,
1857            SyncMethod::Periodic,
1858        );
1859
1860        assert!(
1861            guard_3.is_none(),
1862            "Should fail to lock - Periodic limit of 2 reached"
1863        );
1864
1865        // WHEN authority 1 releases the lock
1866        drop(guard_1);
1867
1868        // THEN authority 3 can now lock with Periodic sync
1869        let guard_3 = map
1870            .lock_blocks(
1871                missing_block_refs.clone(),
1872                authority_3,
1873                SyncMethod::Periodic,
1874            )
1875            .expect("Should successfully lock after authority 1 released");
1876
1877        assert_eq!(guard_3.block_refs.len(), 2);
1878    }
1879
1880    #[test]
1881    fn test_inflight_blocks_map_periodic_blocks_live_when_at_live_limit() {
1882        // GIVEN
1883        let map = InflightBlocksMap::new();
1884        let some_block_refs = [
1885            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1886            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1887        ];
1888        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1889
1890        // WHEN authority 1 locks with Periodic sync (total=1, at Live's limit)
1891        let authority_1 = AuthorityIndex::new_for_test(1);
1892        let guard_1 = map
1893            .lock_blocks(
1894                missing_block_refs.clone(),
1895                authority_1,
1896                SyncMethod::Periodic,
1897            )
1898            .expect("Should successfully lock with Periodic sync");
1899
1900        assert_eq!(guard_1.block_refs.len(), 2);
1901
1902        // THEN authority 2 cannot lock with Live sync (total already at Live limit of
1903        // 1)
1904        let authority_2 = AuthorityIndex::new_for_test(2);
1905        let guard_2_live =
1906            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1907
1908        assert!(
1909            guard_2_live.is_none(),
1910            "Should fail to lock with Live - total already at Live limit of 1"
1911        );
1912
1913        // BUT authority 2 CAN lock with Periodic sync (total would be 2, at Periodic
1914        // limit)
1915        let guard_2_periodic = map
1916            .lock_blocks(
1917                missing_block_refs.clone(),
1918                authority_2,
1919                SyncMethod::Periodic,
1920            )
1921            .expect("Should successfully lock with Periodic - under Periodic limit of 2");
1922
1923        assert_eq!(guard_2_periodic.block_refs.len(), 2);
1924    }
1925
1926    #[test]
1927    fn test_inflight_blocks_map_live_then_periodic_interaction() {
1928        // GIVEN
1929        let map = InflightBlocksMap::new();
1930        let some_block_refs = [
1931            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1932            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1933        ];
1934        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1935
1936        // WHEN authority 1 locks with Live sync (total=1, at Live limit)
1937        let authority_1 = AuthorityIndex::new_for_test(1);
1938        let guard_1 = map
1939            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1940            .expect("Should successfully lock with Live sync");
1941
1942        assert_eq!(guard_1.block_refs.len(), 2);
1943
1944        // THEN authority 2 cannot lock with Live sync (would exceed Live limit of 1)
1945        let authority_2 = AuthorityIndex::new_for_test(2);
1946        let guard_2_live =
1947            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1948
1949        assert!(
1950            guard_2_live.is_none(),
1951            "Should fail to lock with Live - would exceed Live limit of 1"
1952        );
1953
1954        // BUT authority 2 CAN lock with Periodic sync (total=2, at Periodic limit)
1955        let guard_2 = map
1956            .lock_blocks(
1957                missing_block_refs.clone(),
1958                authority_2,
1959                SyncMethod::Periodic,
1960            )
1961            .expect("Should successfully lock with Periodic - total 2 is at Periodic limit");
1962
1963        assert_eq!(guard_2.block_refs.len(), 2);
1964
1965        // AND authority 3 cannot lock with Periodic sync (would exceed Periodic limit
1966        // of 2)
1967        let authority_3 = AuthorityIndex::new_for_test(3);
1968        let guard_3 = map.lock_blocks(
1969            missing_block_refs.clone(),
1970            authority_3,
1971            SyncMethod::Periodic,
1972        );
1973
1974        assert!(
1975            guard_3.is_none(),
1976            "Should fail to lock with Periodic - would exceed Periodic limit of 2"
1977        );
1978    }
1979
1980    #[test]
1981    fn test_inflight_blocks_map_partial_locks_mixed_methods() {
1982        // GIVEN 4 blocks
1983        let map = InflightBlocksMap::new();
1984        let block_a = BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1985        let block_b = BlockRef::new(2, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1986        let block_c = BlockRef::new(3, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1987        let block_d = BlockRef::new(4, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1988
1989        // Lock block A with authority 1 using Live (A at limit for Live)
1990        let guard_a = map
1991            .lock_blocks(
1992                [block_a].into(),
1993                AuthorityIndex::new_for_test(1),
1994                SyncMethod::Live,
1995            )
1996            .expect("Should lock block A");
1997        assert_eq!(guard_a.block_refs.len(), 1);
1998
1999        // Lock block B with authorities 1 & 2 using Periodic (B at limit for Periodic)
2000        let guard_b1 = map
2001            .lock_blocks(
2002                [block_b].into(),
2003                AuthorityIndex::new_for_test(1),
2004                SyncMethod::Periodic,
2005            )
2006            .expect("Should lock block B");
2007        let guard_b2 = map
2008            .lock_blocks(
2009                [block_b].into(),
2010                AuthorityIndex::new_for_test(2),
2011                SyncMethod::Periodic,
2012            )
2013            .expect("Should lock block B again");
2014        assert_eq!(guard_b1.block_refs.len(), 1);
2015        assert_eq!(guard_b2.block_refs.len(), 1);
2016
2017        // Lock block C with authority 1 using Periodic (C has 1 lock)
2018        let guard_c = map
2019            .lock_blocks(
2020                [block_c].into(),
2021                AuthorityIndex::new_for_test(1),
2022                SyncMethod::Periodic,
2023            )
2024            .expect("Should lock block C");
2025        assert_eq!(guard_c.block_refs.len(), 1);
2026
2027        // Block D is unlocked
2028
2029        // WHEN authority 3 requests all 4 blocks with Periodic
2030        let all_blocks = [block_a, block_b, block_c, block_d].into();
2031        let guard_3 = map
2032            .lock_blocks(
2033                all_blocks,
2034                AuthorityIndex::new_for_test(3),
2035                SyncMethod::Periodic,
2036            )
2037            .expect("Should get partial lock");
2038
2039        // THEN should successfully lock A, C and D only
2040        // - A: total=1 (at Live limit), authority 3 can still add since using Periodic
2041        //   and total < 2
2042        // - B: total=2 (at Periodic limit), cannot lock
2043        // - C: total=1, can lock (under limit)
2044        // - D: total=0, can lock
2045        assert_eq!(
2046            guard_3.block_refs.len(),
2047            3,
2048            "Should lock blocks A, C, and D"
2049        );
2050        assert!(
2051            guard_3.block_refs.contains(&block_a),
2052            "Should contain block A"
2053        );
2054        assert!(
2055            !guard_3.block_refs.contains(&block_b),
2056            "Should NOT contain block B (at limit)"
2057        );
2058        assert!(
2059            guard_3.block_refs.contains(&block_c),
2060            "Should contain block C"
2061        );
2062        assert!(
2063            guard_3.block_refs.contains(&block_d),
2064            "Should contain block D"
2065        );
2066    }
2067
2068    #[test]
2069    fn test_inflight_blocks_map_swap_locks_preserves_method() {
2070        // GIVEN
2071        let map = InflightBlocksMap::new();
2072        let some_block_refs = [
2073            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2074            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2075        ];
2076        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
2077
2078        // WHEN authority 1 locks with Live sync
2079        let authority_1 = AuthorityIndex::new_for_test(1);
2080        let guard_1 = map
2081            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
2082            .expect("Should lock with Live sync");
2083
2084        assert_eq!(guard_1.block_refs.len(), 2);
2085
2086        // AND we swap to authority 2
2087        let authority_2 = AuthorityIndex::new_for_test(2);
2088        let guard_2 = map
2089            .swap_locks(guard_1, authority_2)
2090            .expect("Should swap locks");
2091
2092        // THEN the new guard should preserve the block refs
2093        assert_eq!(guard_2.block_refs, missing_block_refs);
2094
2095        // AND authority 3 cannot lock with Live sync (limit of 1 reached)
2096        let authority_3 = AuthorityIndex::new_for_test(3);
2097        let guard_3 = map.lock_blocks(missing_block_refs.clone(), authority_3, SyncMethod::Live);
2098        assert!(guard_3.is_none(), "Should fail - Live limit reached");
2099
2100        // BUT authority 3 CAN lock with Periodic sync
2101        let guard_3_periodic = map
2102            .lock_blocks(
2103                missing_block_refs.clone(),
2104                authority_3,
2105                SyncMethod::Periodic,
2106            )
2107            .expect("Should lock with Periodic");
2108        assert_eq!(guard_3_periodic.block_refs.len(), 2);
2109    }
2110
2111    #[tokio::test]
2112    async fn test_process_fetched_blocks() {
2113        // GIVEN
2114        let (context, _) = Context::new_for_test(4);
2115        let context = Arc::new(context);
2116        let block_verifier = Arc::new(NoopBlockVerifier {});
2117        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2118        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2119        let (commands_sender, _commands_receiver) =
2120            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2121
2122        // Create input test blocks:
2123        // - Authority 0 block at round 60.
2124        // - Authority 1 blocks from rounds 30 to 60 (inclusive).
2125        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2126        expected_blocks.extend(
2127            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2128        );
2129        assert_eq!(
2130            expected_blocks.len(),
2131            context.parameters.max_blocks_per_sync
2132        );
2133
2134        let expected_serialized_blocks = expected_blocks
2135            .iter()
2136            .map(|b| b.serialized().clone())
2137            .collect::<Vec<_>>();
2138
2139        let expected_block_refs = expected_blocks
2140            .iter()
2141            .map(|b| b.reference())
2142            .collect::<BTreeSet<_>>();
2143
2144        // GIVEN peer to fetch blocks from
2145        let peer_index = AuthorityIndex::new_for_test(2);
2146
2147        // Create blocks_guard
2148        let inflight_blocks_map = InflightBlocksMap::new();
2149        let blocks_guard = inflight_blocks_map
2150            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2151            .expect("Failed to lock blocks");
2152
2153        assert_eq!(
2154            inflight_blocks_map.num_of_locked_blocks(),
2155            expected_block_refs.len()
2156        );
2157
2158        // Create the verified blocks cache and call `process_fetched_blocks` directly
2159        let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
2160            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
2161        )));
2162        let result = Synchronizer::<
2163            MockNetworkClient,
2164            NoopBlockVerifier,
2165            MockCoreThreadDispatcher,
2166        >::process_fetched_blocks(
2167            expected_serialized_blocks,
2168            peer_index,
2169            blocks_guard, // The guard is consumed here
2170            core_dispatcher.clone(),
2171            block_verifier,
2172            verified_cache,
2173            commit_vote_monitor,
2174            context.clone(),
2175            commands_sender,
2176            "test",
2177        )
2178            .await;
2179
2180        // THEN
2181        assert!(result.is_ok());
2182
2183        // Check blocks were sent to core
2184        let added_blocks = core_dispatcher.get_add_blocks().await;
2185        assert_eq!(
2186            added_blocks
2187                .iter()
2188                .map(|b| b.reference())
2189                .collect::<BTreeSet<_>>(),
2190            expected_block_refs,
2191        );
2192
2193        // Check blocks were unlocked
2194        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2195    }
2196
2197    #[tokio::test]
2198    async fn test_successful_fetch_blocks_from_peer() {
2199        // GIVEN
2200        let (context, _) = Context::new_for_test(4);
2201        let context = Arc::new(context);
2202        let block_verifier = Arc::new(NoopBlockVerifier {});
2203        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2204        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2205        let network_client = Arc::new(MockNetworkClient::default());
2206        let store = Arc::new(MemStore::new());
2207        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2208
2209        let handle = Synchronizer::start(
2210            network_client.clone(),
2211            context,
2212            core_dispatcher.clone(),
2213            commit_vote_monitor,
2214            block_verifier,
2215            dag_state,
2216            false,
2217        );
2218
2219        // Create some test blocks
2220        let expected_blocks = (0..10)
2221            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2222            .collect::<Vec<_>>();
2223        let missing_blocks = expected_blocks
2224            .iter()
2225            .map(|block| block.reference())
2226            .collect::<BTreeSet<_>>();
2227
2228        // AND stub the fetch_blocks request from peer 1
2229        let peer = AuthorityIndex::new_for_test(1);
2230        network_client
2231            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
2232            .await;
2233
2234        // WHEN requesting the missing blocks from peer 1
2235        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2236
2237        // Wait a little bit until those have been added to core
2238        sleep(Duration::from_millis(1_000)).await;
2239
2240        // THEN ensure those ended up in Core
2241        let added_blocks = core_dispatcher.get_add_blocks().await;
2242        assert_eq!(added_blocks, expected_blocks);
2243    }
2244
2245    #[tokio::test]
2246    async fn saturate_fetch_blocks_from_peer() {
2247        // GIVEN
2248        let (context, _) = Context::new_for_test(4);
2249        let context = Arc::new(context);
2250        let block_verifier = Arc::new(NoopBlockVerifier {});
2251        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2252        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2253        let network_client = Arc::new(MockNetworkClient::default());
2254        let store = Arc::new(MemStore::new());
2255        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2256
2257        let handle = Synchronizer::start(
2258            network_client.clone(),
2259            context,
2260            core_dispatcher.clone(),
2261            commit_vote_monitor,
2262            block_verifier,
2263            dag_state,
2264            false,
2265        );
2266
2267        // Create some test blocks
2268        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
2269            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
2270            .collect::<Vec<_>>();
2271
2272        // Now start sending requests to fetch blocks, trying to saturate the peer 1 task
2273        let peer = AuthorityIndex::new_for_test(1);
2274        let mut iter = expected_blocks.iter().peekable();
2275        while let Some(block) = iter.next() {
2276            // stub the fetch_blocks request from peer 1 and give some high response latency
2277            // so requests can start blocking the peer task.
2278            network_client
2279                .stub_fetch_blocks(
2280                    vec![block.clone()],
2281                    peer,
2282                    Some(Duration::from_millis(5_000)),
2283                )
2284                .await;
2285
2286            let mut missing_blocks = BTreeSet::new();
2287            missing_blocks.insert(block.reference());
2288
2289            // WHEN requesting to fetch the blocks, every request should succeed except
2290            // the last one, which should fail with a "saturated synchronizer" error
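            // With FETCH_BLOCKS_CONCURRENCY == 5 and every stubbed response delayed
            // by 5 seconds, the per-peer fetch task and its command buffer are assumed
            // to fill up before this 11th request is submitted; the exact internal
            // buffer sizes are not asserted here.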
2291            if iter.peek().is_none() {
2292                match handle.fetch_blocks(missing_blocks, peer).await {
2293                    Err(ConsensusError::SynchronizerSaturated(index, _)) => {
2294                        assert_eq!(index, peer);
2295                    }
2296                    _ => panic!("A saturated synchronizer error was expected"),
2297                }
2298            } else {
2299                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2300            }
2301        }
2302    }
2303
2304    #[tokio::test(flavor = "current_thread", start_paused = true)]
2305    async fn synchronizer_periodic_task_fetch_blocks() {
2306        // GIVEN
2307        let (context, _) = Context::new_for_test(4);
2308        let context = Arc::new(context);
2309        let block_verifier = Arc::new(NoopBlockVerifier {});
2310        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2311        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2312        let network_client = Arc::new(MockNetworkClient::default());
2313        let store = Arc::new(MemStore::new());
2314        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2315
2316        // Create some test blocks
2317        let expected_blocks = (0..10)
2318            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2319            .collect::<Vec<_>>();
2320        let missing_blocks = expected_blocks
2321            .iter()
2322            .map(|block| block.reference())
2323            .collect::<BTreeSet<_>>();
2324
2325        // AND stub the missing blocks
2326        core_dispatcher
2327            .stub_missing_blocks(missing_blocks.clone())
2328            .await;
2329
2330        // AND stub the requests for authority 1 & 2
2331        // Make the first authority time out, so the second will be called. "We" are
2332        // authority 0, so we are skipped anyway.
2333        network_client
2334            .stub_fetch_blocks(
2335                expected_blocks.clone(),
2336                AuthorityIndex::new_for_test(1),
2337                Some(FETCH_REQUEST_TIMEOUT),
2338            )
2339            .await;
2340        network_client
2341            .stub_fetch_blocks(
2342                expected_blocks.clone(),
2343                AuthorityIndex::new_for_test(2),
2344                None,
2345            )
2346            .await;
2347
2348        // WHEN starting the synchronizer and waiting for a couple of seconds
2349        let _handle = Synchronizer::start(
2350            network_client.clone(),
2351            context,
2352            core_dispatcher.clone(),
2353            commit_vote_monitor,
2354            block_verifier,
2355            dag_state,
2356            false,
2357        );
2358
2359        sleep(8 * FETCH_REQUEST_TIMEOUT).await;
2360
2361        // THEN the missing blocks should now be fetched and added to core
2362        let added_blocks = core_dispatcher.get_add_blocks().await;
2363        assert_eq!(added_blocks, expected_blocks);
2364
2365        // AND missing blocks should have been consumed by the stub
2366        assert!(
2367            core_dispatcher
2368                .get_missing_blocks()
2369                .await
2370                .unwrap()
2371                .is_empty()
2372        );
2373    }
2374
2375    #[tokio::test(flavor = "current_thread", start_paused = true)]
2376    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2377        // GIVEN
2378        let (mut context, _) = Context::new_for_test(4);
2379        context
2380            .protocol_config
2381            .set_consensus_batched_block_sync_for_testing(true);
2382        let context = Arc::new(context);
2383        let block_verifier = Arc::new(NoopBlockVerifier {});
2384        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2385        let network_client = Arc::new(MockNetworkClient::default());
2386        let store = Arc::new(MemStore::new());
2387        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2388        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2389
2390        // AND stub some missing blocks. The highest accepted round is 0. Create blocks
2391        // that are above the sync threshold.
2392        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2393        let stub_blocks = (sync_missing_block_round_threshold * 2
2394            ..sync_missing_block_round_threshold * 2
2395                + context.parameters.max_blocks_per_sync as u32)
2396            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2397            .collect::<Vec<_>>();
2398        let missing_blocks = stub_blocks
2399            .iter()
2400            .map(|block| block.reference())
2401            .collect::<BTreeSet<_>>();
2402        core_dispatcher
2403            .stub_missing_blocks(missing_blocks.clone())
2404            .await;
2405        // AND stub the requests for authority 1 & 2
2406        // Make the first authority time out, so the second will be called. "We" are
2407        // authority 0, so we are skipped anyway.
2408        let mut expected_blocks = stub_blocks
2409            .iter()
2410            .take(context.parameters.max_blocks_per_sync)
2411            .cloned()
2412            .collect::<Vec<_>>();
2413        network_client
2414            .stub_fetch_blocks(
2415                expected_blocks.clone(),
2416                AuthorityIndex::new_for_test(1),
2417                Some(FETCH_REQUEST_TIMEOUT),
2418            )
2419            .await;
2420        network_client
2421            .stub_fetch_blocks(
2422                expected_blocks.clone(),
2423                AuthorityIndex::new_for_test(2),
2424                None,
2425            )
2426            .await;
2427
2428        // Now create some blocks to simulate a commit lag
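        // Voting for `commit_index = round - 1` while the local last commit index is
        // still 0 pushes the quorum commit index well past
        // `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`, which is the lag check
        // performed by `is_commit_lagging` (illustrative reading; concrete values
        // come from the test context parameters).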
2429        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2430        let commit_index: CommitIndex = round - 1;
2431        let blocks = (0..4)
2432            .map(|authority| {
2433                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2434                let block = TestBlock::new(round, authority)
2435                    .set_commit_votes(commit_votes)
2436                    .build();
2437
2438                VerifiedBlock::new_for_test(block)
2439            })
2440            .collect::<Vec<_>>();
2441
2442        // Pass them through the commit vote monitor - so now there will be a big commit
2443        // lag to prevent the scheduled synchronizer from running
2444        for block in blocks {
2445            commit_vote_monitor.observe_block(&block);
2446        }
2447
2448        // Start the synchronizer and wait for a couple of seconds where normally
2449        // the synchronizer should have kicked in.
2450        let _handle = Synchronizer::start(
2451            network_client.clone(),
2452            context.clone(),
2453            core_dispatcher.clone(),
2454            commit_vote_monitor.clone(),
2455            block_verifier,
2456            dag_state.clone(),
2457            false,
2458        );
2459
2460        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2461
2462        // Since we should be in commit lag mode, none of the missing blocks should
2463        // have been fetched - hence nothing should be sent to core for processing.
2464        let added_blocks = core_dispatcher.get_add_blocks().await;
2465        assert_eq!(added_blocks, vec![]);
2466
2467        println!("Before advancing");
2468        // AND now advance the local commit index by adding commits up to the commit
2469        // index of the quorum
2470        {
2471            let mut d = dag_state.write();
2472            for index in 1..=commit_index {
2473                let commit =
2474                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2475
2476                d.add_commit(commit);
2477            }
2478
2479            println!("Once advanced");
2480            assert_eq!(
2481                d.last_commit_index(),
2482                commit_vote_monitor.quorum_commit_index()
2483            );
2484        }
2485
2486        // Now stub again the missing blocks to fetch the exact same ones.
2487        core_dispatcher
2488            .stub_missing_blocks(missing_blocks.clone())
2489            .await;
2490
2491        println!("Final sleep");
2492        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2493
2494        // THEN the missing blocks should now be fetched and added to core
2495        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2496        println!("Final await");
2497        added_blocks.sort_by_key(|block| block.reference());
2498        expected_blocks.sort_by_key(|block| block.reference());
2499
2500        assert_eq!(added_blocks, expected_blocks);
2501    }
2502
2503    #[tokio::test(flavor = "current_thread", start_paused = true)]
2504    async fn synchronizer_fetch_own_last_block() {
2505        // GIVEN
2506        let (context, _) = Context::new_for_test(4);
2507        let context = Arc::new(context.with_parameters(Parameters {
2508            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2509            ..Default::default()
2510        }));
2511        let block_verifier = Arc::new(NoopBlockVerifier {});
2512        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2513        let network_client = Arc::new(MockNetworkClient::default());
2514        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2515        let store = Arc::new(MemStore::new());
2516        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2517        let our_index = AuthorityIndex::new_for_test(0);
2518
2519        // Create some test blocks
2520        let mut expected_blocks = (8..=10)
2521            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2522            .collect::<Vec<_>>();
2523
2524        // Now set different latest blocks for the peers
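        // Two responses are stubbed per peer: the first is delayed so that the
        // initial attempt times out and a retry is triggered, and the second
        // responds immediately so the retry succeeds.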
2525        // For peer 1 we give the block of round 10 (highest)
2526        let block_1 = expected_blocks.pop().unwrap();
2527        network_client
2528            .stub_fetch_latest_blocks(
2529                vec![block_1.clone()],
2530                AuthorityIndex::new_for_test(1),
2531                vec![our_index],
2532                Some(Duration::from_secs(10)),
2533            )
2534            .await;
2535        network_client
2536            .stub_fetch_latest_blocks(
2537                vec![block_1],
2538                AuthorityIndex::new_for_test(1),
2539                vec![our_index],
2540                None,
2541            )
2542            .await;
2543
2544        // For peer 2 we give the block of round 9
2545        let block_2 = expected_blocks.pop().unwrap();
2546        network_client
2547            .stub_fetch_latest_blocks(
2548                vec![block_2.clone()],
2549                AuthorityIndex::new_for_test(2),
2550                vec![our_index],
2551                Some(Duration::from_secs(10)),
2552            )
2553            .await;
2554        network_client
2555            .stub_fetch_latest_blocks(
2556                vec![block_2],
2557                AuthorityIndex::new_for_test(2),
2558                vec![our_index],
2559                None,
2560            )
2561            .await;
2562
2563        // For peer 3 we give the block with the lowest round (8)
2564        let block_3 = expected_blocks.pop().unwrap();
2565        network_client
2566            .stub_fetch_latest_blocks(
2567                vec![block_3.clone()],
2568                AuthorityIndex::new_for_test(3),
2569                vec![our_index],
2570                Some(Duration::from_secs(10)),
2571            )
2572            .await;
2573        network_client
2574            .stub_fetch_latest_blocks(
2575                vec![block_3],
2576                AuthorityIndex::new_for_test(3),
2577                vec![our_index],
2578                None,
2579            )
2580            .await;
2581
2582        // WHEN starting the synchronizer and waiting for a couple of seconds
2583        let handle = Synchronizer::start(
2584            network_client.clone(),
2585            context.clone(),
2586            core_dispatcher.clone(),
2587            commit_vote_monitor,
2588            block_verifier,
2589            dag_state,
2590            true,
2591        );
2592
2593        // Wait for at least twice the timeout so a retry can happen
2594        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2595
2596        // Assert that core has been called to set the last known proposed round
2597        assert_eq!(
2598            core_dispatcher.get_last_own_proposed_round().await,
2599            vec![10]
2600        );
2601
2602        // Ensure that all the stubbed requests have been consumed
2603        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2604
2605        // And we got one retry
2606        assert_eq!(
2607            context
2608                .metrics
2609                .node_metrics
2610                .sync_last_known_own_block_retries
2611                .get(),
2612            1
2613        );
2614
2615        // Ensure that no panic occurred
2616        if let Err(err) = handle.stop().await {
2617            if err.is_panic() {
2618                std::panic::resume_unwind(err.into_panic());
2619            }
2620        }
2621    }
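
    /// Minimal `CoreThreadDispatcher` mock used by the peer-selection tests below:
    /// it records the blocks passed to `add_blocks`, serves `get_missing_blocks`
    /// from its `missing_blocks` map, and stubs the remaining trait methods with
    /// default results.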
2622    #[derive(Default)]
2623    struct SyncMockDispatcher {
2624        missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2625        added_blocks: Mutex<Vec<VerifiedBlock>>,
2626    }
2627
2628    #[async_trait::async_trait]
2629    impl CoreThreadDispatcher for SyncMockDispatcher {
2630        async fn get_missing_blocks(
2631            &self,
2632        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2633            Ok(self.missing_blocks.lock().await.clone())
2634        }
2635        async fn add_blocks(
2636            &self,
2637            blocks: Vec<VerifiedBlock>,
2638        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2639            let mut guard = self.added_blocks.lock().await;
2640            guard.extend(blocks.clone());
2641            Ok(blocks.iter().map(|b| b.reference()).collect())
2642        }
2643
2644        // Stub out the remaining CoreThreadDispatcher methods with defaults:
2645
2646        async fn check_block_refs(
2647            &self,
2648            block_refs: Vec<BlockRef>,
2649        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2650            // Echo back the requested refs by default
2651            Ok(block_refs.into_iter().collect())
2652        }
2653
2654        async fn add_certified_commits(
2655            &self,
2656            _commits: CertifiedCommits,
2657        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2658            // No additional certified-commit logic in tests
2659            Ok(BTreeSet::new())
2660        }
2661
2662        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2663            Ok(())
2664        }
2665
2666        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2667            Ok(())
2668        }
2669
2670        fn set_propagation_delay_and_quorum_rounds(
2671            &self,
2672            _delay: Round,
2673            _received_quorum_rounds: Vec<QuorumRound>,
2674            _accepted_quorum_rounds: Vec<QuorumRound>,
2675        ) -> Result<(), CoreError> {
2676            Ok(())
2677        }
2678
2679        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2680            Ok(())
2681        }
2682
2683        fn highest_received_rounds(&self) -> Vec<Round> {
2684            Vec::new()
2685        }
2686    }
2687
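    // Verifies that peers which are known to hold the missing block are contacted
    // before randomly selected peers: both fetches should go to authorities 2 and 3.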
2688    #[tokio::test(flavor = "current_thread")]
2689    async fn known_before_random_peer_fetch() {
2690        {
2691            // 1) Set up a 10-node context and an in-memory DAG
2692            let (ctx, _) = Context::new_for_test(10);
2693            let context = Arc::new(ctx);
2694            let store = Arc::new(MemStore::new());
2695            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2696            let inflight = InflightBlocksMap::new();
2697
2698            // 2) One missing block
2699            let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2700            let missing_ref = missing_vb.reference();
2701            let missing_blocks = BTreeMap::from([(
2702                missing_ref,
2703                BTreeSet::from([
2704                    AuthorityIndex::new_for_test(2),
2705                    AuthorityIndex::new_for_test(3),
2706                    AuthorityIndex::new_for_test(4),
2707                ]),
2708            )]);
2709
2710            // 3) Prepare mocks and stubs
2711            let network_client = Arc::new(MockNetworkClient::default());
2712            // Stub all authorities so that none of them panics:
2713            for i in 1..=9 {
2714                let peer = AuthorityIndex::new_for_test(i);
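                // Peers 1 and 4 are stubbed with a delay longer than the fetch
                // timeout, so a request routed to them would time out; the
                // remaining peers respond promptly.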
2715                if i == 1 || i == 4 {
2716                    network_client
2717                        .stub_fetch_blocks(
2718                            vec![missing_vb.clone()],
2719                            peer,
2720                            Some(2 * FETCH_REQUEST_TIMEOUT),
2721                        )
2722                        .await;
2723                    continue;
2724                }
2725                network_client
2726                    .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2727                    .await;
2728            }
2729
2730            // 4) Invoke knowledge-based fetch and random fallback selection
2731            //    deterministically
2732            let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2733                ::fetch_blocks_from_authorities(
2734                    context.clone(),
2735                    inflight.clone(),
2736                    network_client.clone(),
2737                    missing_blocks,
2738                    dag_state.clone(),
2739                )
2740                .await;
2741
2742            // 5) Assert we got exactly two fetches, both from peers that are aware of the
2743            //    missing block (authorities 2 and 3)
2744            assert_eq!(results.len(), 2);
2745
2746            // 6) Both knowledge-based fetches went to peers 2 and 3
2747            let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2748            assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2749            let (_second_guard, _second_bytes, second_peer) = &results[1];
2750            assert_eq!(*second_peer, AuthorityIndex::new_for_test(3));
2751            // 7) Verify the returned bytes correspond to that block
2752            let expected = missing_vb.serialized().clone();
2753            assert_eq!(hot_bytes, &vec![expected]);
2754        }
2755    }
2756
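    // Larger scenario mixing knowledge-based and periodic peer selection: peer 3
    // serves the blocks it knows about, peers 1 and 4 serve the periodic requests,
    // and peer 5 acts as the fallback after peer 2 times out.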
2757    #[tokio::test(flavor = "current_thread")]
2758    async fn known_before_periodic_peer_fetch_larger_scenario() {
2759        use std::{
2760            collections::{BTreeMap, BTreeSet},
2761            sync::Arc,
2762        };
2763
2764        use parking_lot::RwLock;
2765
2766        use crate::{
2767            block::{Round, TestBlock, VerifiedBlock},
2768            context::Context,
2769        };
2770
2771        // 1) Set up a 10-node context, an in-memory DAG, and the inflight map
2772        let (ctx, _) = Context::new_for_test(10);
2773        let context = Arc::new(ctx);
2774        let store = Arc::new(MemStore::new());
2775        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2776        let inflight = InflightBlocksMap::new();
2777        let network_client = Arc::new(MockNetworkClient::default());
2778
2779        // 2) Create 1000 missing blocks, each known by a subset of authorities 0, 2, and 3
2780        let mut missing_blocks = BTreeMap::new();
2781        let mut missing_vbs = Vec::new();
2782        let known_number_blocks = 10;
2783        for i in 0..1000 {
2784            let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2785            let r = vb.reference();
2786            if i < known_number_blocks {
2787                // First 10 blocks are known by authorities 0, 2
2788                missing_blocks.insert(
2789                    r,
2790                    BTreeSet::from([
2791                        AuthorityIndex::new_for_test(0),
2792                        AuthorityIndex::new_for_test(2),
2793                    ]),
2794                );
2795            } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2796                // Second 10 blocks are known by authorities 0, 3
2797                missing_blocks.insert(
2798                    r,
2799                    BTreeSet::from([
2800                        AuthorityIndex::new_for_test(0),
2801                        AuthorityIndex::new_for_test(3),
2802                    ]),
2803                );
2804            } else {
2805                // The rest are only known by authority 0
2806                missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2807            }
2808            missing_vbs.push(vb);
2809        }
2810
2811        // 3) Stub fetches for knowledge-based peers (2 and 3)
2812        let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2813        let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2814            .iter()
2815            .map(|&peer| {
2816                let vbs = missing_vbs
2817                    .iter()
2818                    .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2819                    .take(context.parameters.max_blocks_per_sync)
2820                    .cloned()
2821                    .collect::<Vec<_>>();
2822                (peer, vbs)
2823            })
2824            .collect();
2825
2826        for (peer, vbs) in known_vbs_by_peer {
2827            if peer == AuthorityIndex::new_for_test(2) {
2828                // Simulate a timeout for peer 2, then fall back to peer 5
2829                network_client
2830                    .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2831                    .await;
2832                network_client
2833                    .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2834                    .await;
2835            } else {
2836                network_client
2837                    .stub_fetch_blocks(vbs.clone(), peer, None)
2838                    .await;
2839            }
2840        }
2841
2842        // 4) Stub fetches from periodic path peers (1 and 4)
2843        network_client
2844            .stub_fetch_blocks(
2845                missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2846                AuthorityIndex::new_for_test(1),
2847                None,
2848            )
2849            .await;
2850
2851        network_client
2852            .stub_fetch_blocks(
2853                missing_vbs[context.parameters.max_blocks_per_sync
2854                    ..2 * context.parameters.max_blocks_per_sync]
2855                    .to_vec(),
2856                AuthorityIndex::new_for_test(4),
2857                None,
2858            )
2859            .await;
2860
2861        // 5) Execute the fetch logic
2862        let results = Synchronizer::<
2863            MockNetworkClient,
2864            NoopBlockVerifier,
2865            SyncMockDispatcher,
2866        >::fetch_blocks_from_authorities(
2867            context.clone(),
2868            inflight.clone(),
2869            network_client.clone(),
2870            missing_blocks,
2871            dag_state.clone(),
2872        )
2873        .await;
2874
2875        // 6) Assert we got 4 fetches: peer 3 (knowledge-based), periodic fetches from
2876        //    peers 1 and 4, and peer 5 as the fallback after peer 2 timed out
2877        assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");
2878
2879        // 7) First fetch from peer 3 (knowledge-based)
2880        let (_guard3, bytes3, peer3) = &results[0];
2881        assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
2882        let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
2883            .iter()
2884            .map(|vb| vb.serialized().clone())
2885            .collect::<Vec<_>>();
2886        assert_eq!(bytes3, &expected2);
2887
2888        // 8) Second fetch from peer 1 (periodic)
2889        let (_guard1, bytes1, peer1) = &results[1];
2890        assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
2891        let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
2892            .iter()
2893            .map(|vb| vb.serialized().clone())
2894            .collect::<Vec<_>>();
2895        assert_eq!(bytes1, &expected1);
2896
2897        // 9) Third fetch from peer 4 (periodic)
2898        let (_guard4, bytes4, peer4) = &results[2];
2899        assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
2900        let expected4 = missing_vbs
2901            [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
2902            .iter()
2903            .map(|vb| vb.serialized().clone())
2904            .collect::<Vec<_>>();
2905        assert_eq!(bytes4, &expected4);
2906
2907        // 10) Fourth fetch from peer 5 (fallback after peer 2 timeout)
2908        let (_guard5, bytes5, peer5) = &results[3];
2909        assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
2910        let expected5 = missing_vbs[0..known_number_blocks]
2911            .iter()
2912            .map(|vb| vb.serialized().clone())
2913            .collect::<Vec<_>>();
2914        assert_eq!(bytes5, &expected5);
2915    }
2916
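    // Covers the verified-blocks deduplication cache: per-peer skip metrics,
    // invalid blocks not being cached, and LRU eviction once capacity is reached.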
2917    #[tokio::test(flavor = "current_thread")]
2918    async fn test_verify_blocks_deduplication() {
2919        let (context, _keys) = Context::new_for_test(4);
2920        let context = Arc::new(context);
2921        let block_verifier = Arc::new(NoopBlockVerifier {});
2922        let failing_verifier = Arc::new(FailingBlockVerifier);
2923        let peer1 = AuthorityIndex::new_for_test(1);
2924        let peer2 = AuthorityIndex::new_for_test(2);
2925
2926        // Create cache with capacity of 5 for eviction testing
2927        let cache = Arc::new(SyncMutex::new(LruCache::new(NonZeroUsize::new(5).unwrap())));
2928
2929        // Test 1: Per-peer metric tracking
2930        let block1 = VerifiedBlock::new_for_test(TestBlock::new(10, 0).build());
2931        let serialized1 = vec![block1.serialized().clone()];
2932
2933        // Verify from peer1 (cache miss)
2934        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2935            serialized1.clone(), block_verifier.clone(), cache.clone(), &context, peer1, "live",
2936        );
2937        assert_eq!(result.unwrap().len(), 1);
2938
2939        let peer1_hostname = &context.committee.authority(peer1).hostname;
2940        assert_eq!(
2941            context
2942                .metrics
2943                .node_metrics
2944                .synchronizer_skipped_blocks_by_peer
2945                .with_label_values(&[peer1_hostname.as_str(), "live"])
2946                .get(),
2947            0
2948        );
2949
2950        // Verify same block from peer2 with different sync method (cache hit)
2951        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2952            serialized1, block_verifier.clone(), cache.clone(), &context, peer2, "periodic",
2953        );
2954        assert_eq!(result.unwrap().len(), 0, "Should skip cached block");
2955
2956        let peer2_hostname = &context.committee.authority(peer2).hostname;
2957        assert_eq!(
2958            context
2959                .metrics
2960                .node_metrics
2961                .synchronizer_skipped_blocks_by_peer
2962                .with_label_values(&[peer2_hostname.as_str(), "periodic"])
2963                .get(),
2964            1
2965        );
2966
2967        // Test 2: Invalid blocks not cached
2968        let invalid_block = VerifiedBlock::new_for_test(TestBlock::new(20, 0).build());
2969        let invalid_serialized = vec![invalid_block.serialized().clone()];
2970
2971        assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2972            invalid_serialized.clone(), failing_verifier.clone(), cache.clone(), &context, peer1, "test",
2973        ).is_err());
2974        assert_eq!(cache.lock().len(), 1, "Invalid block should not be cached");
2975
2976        // Verify invalid block fails again (not from cache)
2977        assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2978            invalid_serialized, failing_verifier, cache.clone(), &context, peer1, "test",
2979        ).is_err());
2980
2981        // Test 3: Cache eviction
2982        let blocks: Vec<_> = (0..5)
2983            .map(|i| VerifiedBlock::new_for_test(TestBlock::new(30 + i, 0).build()))
2984            .collect();
2985
2986        // Fill cache to capacity
2987        for block in &blocks {
2988            Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2989                vec![block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
2990            ).unwrap();
2991        }
2992        assert_eq!(cache.lock().len(), 5);
2993
2994        // Add one more block; the oldest entry (block1) should already be evicted
2995        let new_block = VerifiedBlock::new_for_test(TestBlock::new(99, 0).build());
2996        Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
2997            vec![new_block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
2998        ).unwrap();
2999
3000        // First block (block1) should be evicted, so re-verifying it should not be a
3001        // cache hit
3002        let block1_serialized = vec![block1.serialized().clone()];
3003        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3004            block1_serialized, block_verifier.clone(), cache.clone(), &context, peer1, "test",
3005        );
3006        assert_eq!(
3007            result.unwrap().len(),
3008            1,
3009            "Evicted block should be re-verified"
3010        );
3011
3012        // New block should still be in cache
3013        let new_block_serialized = vec![new_block.serialized().clone()];
3014        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3015            new_block_serialized, block_verifier, cache, &context, peer1, "test",
3016        );
3017        assert_eq!(result.unwrap().len(), 0, "New block should be cached");
3018    }
3019}