consensus_core/
synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7    num::NonZeroUsize,
8    sync::Arc,
9    time::Duration,
10};
11
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use futures::{StreamExt as _, stream::FuturesUnordered};
15use iota_macros::fail_point_async;
16use iota_metrics::{
17    monitored_future,
18    monitored_mpsc::{Receiver, Sender, channel},
19    monitored_scope,
20};
21use itertools::Itertools as _;
22use lru::LruCache;
23use parking_lot::{Mutex, RwLock};
24#[cfg(not(test))]
25use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
26use tap::TapFallible;
27use tokio::{
28    runtime::Handle,
29    sync::{mpsc::error::TrySendError, oneshot},
30    task::{JoinError, JoinSet},
31    time::{Instant, sleep, sleep_until, timeout},
32};
33use tracing::{debug, error, info, instrument, trace, warn};
34
35use crate::{
36    BlockAPI, CommitIndex, Round,
37    authority_service::COMMIT_LAG_MULTIPLIER,
38    block::{BlockDigest, BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
39    block_verifier::BlockVerifier,
40    commit_vote_monitor::CommitVoteMonitor,
41    context::Context,
42    core_thread::CoreThreadDispatcher,
43    dag_state::DagState,
44    error::{ConsensusError, ConsensusResult},
45    network::NetworkClient,
46    scoring_metrics_store::ErrorSource,
47};
48
/// The number of concurrent fetch blocks requests per authority.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// The maximal additional blocks (parents) that can be fetched on top of the
/// explicitly requested ones.
// TODO: This is a temporary value, and should be removed once the protocol
// version is updated to support batching
pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;

/// The maximum number of verified block references to cache for deduplication.
const VERIFIED_BLOCKS_CACHE_CAP: usize = 200_000;

/// The timeout for synchronizer to fetch blocks from a given peer authority.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);

/// The timeout for periodic synchronizer to fetch blocks from the peers.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// The maximum number of authorities from which we will try to periodically
/// fetch blocks at the same moment. The guard will protect that we will not ask
/// from more than this number of authorities at the same time.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// The maximum number of authorities from which the live synchronizer will try
/// to fetch blocks at the same moment. This is lower than the periodic sync
/// limit to prioritize periodic sync.
const MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK: usize = 1;

/// The maximum number of peers from which the periodic synchronizer will
/// request blocks.
const MAX_PERIODIC_SYNC_PEERS: usize = 4;

/// The maximum number of peers in the periodic synchronizer which are chosen
/// totally random to fetch blocks from. The other peers will be chosen based on
/// their knowledge of the DAG.
const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
84
/// Represents the different methods used for synchronization.
///
/// The enum is a plain two-variant tag, so `Copy` is derived to make it
/// trivially duplicable (e.g. when a guard is re-created in `swap_locks`);
/// `Debug` is derived for diagnostics.
#[derive(Clone, Copy, Debug)]
enum SyncMethod {
    /// Targeted fetch triggered through the "block send" path for missing
    /// ancestors of received blocks.
    Live,
    /// Scheduled background fetch of any still-missing blocks.
    Periodic,
}
91
/// Guard over a set of block refs that are currently being fetched from
/// `peer`. Its `Drop` impl releases the per-block locks held in `map`, so the
/// blocks become available for other fetch attempts once the guard goes away.
struct BlocksGuard {
    // Shared map tracking which authorities are fetching which blocks.
    map: Arc<InflightBlocksMap>,
    // The block refs this guard holds locks for.
    block_refs: BTreeSet<BlockRef>,
    // The peer authority the locked blocks are being fetched from.
    peer: AuthorityIndex,
    // The sync method (live or periodic) under which the locks were taken.
    method: SyncMethod,
}
98
// Releases the per-block locks when the guard goes out of scope, making the
// blocks fetchable by other authorities again.
impl Drop for BlocksGuard {
    fn drop(&mut self) {
        // Strict unlock: panics if the locks were already released (see
        // `InflightBlocksMap::unlock_blocks`).
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}
104
// Keeps a mapping between the missing blocks that have been instructed to be
// fetched and the authorities that are currently fetching them. For a block ref
// there is a maximum number of authorities that can concurrently fetch it. The
// authority ids that are currently fetching a block are set on the
// corresponding `BTreeSet` and basically they act as "locks".
struct InflightBlocksMap {
    // For each in-flight block ref, the set of authorities currently fetching
    // it. An entry is removed once no authority is fetching the block anymore.
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}
113
impl InflightBlocksMap {
    /// Creates an empty map, wrapped in an `Arc` so it can be shared with the
    /// `BlocksGuard`s it hands out (which unlock entries on drop).
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(HashMap::new()),
        })
    }

    /// Locks the blocks to be fetched for the assigned `peer_index`. We want to
    /// avoid re-fetching the missing blocks from too many authorities at
    /// the same time, thus we limit the concurrency per block by attempting
    /// to lock per block. If a block is already fetched by the maximum allowed
    /// number of authorities, then the block ref will not be included in the
    /// returned set. The method returns all the block refs that have been
    /// successfully locked and allowed to be fetched.
    ///
    /// Different limits apply based on the sync method:
    /// - Periodic sync: Can lock if total authorities <
    ///   MAX_AUTHORITIES_TO_FETCH_PER_BLOCK (2)
    /// - Live sync: Can lock if total authorities <
    ///   MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK (1)
    ///
    /// Returns `None` when no block at all could be locked for this peer.
    fn lock_blocks(
        self: &Arc<Self>,
        missing_block_refs: BTreeSet<BlockRef>,
        peer: AuthorityIndex,
        method: SyncMethod,
    ) -> Option<BlocksGuard> {
        let mut blocks = BTreeSet::new();
        let mut inner = self.inner.lock();

        for block_ref in missing_block_refs {
            let authorities = inner.entry(block_ref).or_default();

            // Check if this peer is already fetching this block
            if authorities.contains(&peer) {
                continue;
            }

            // Count total authorities currently fetching this block
            let total_count = authorities.len();

            // Determine the limit based on the sync method
            let max_limit = match method {
                SyncMethod::Live => MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK,
                SyncMethod::Periodic => MAX_AUTHORITIES_TO_FETCH_PER_BLOCK,
            };

            // Check if we can acquire the lock
            if total_count < max_limit {
                assert!(authorities.insert(peer));
                blocks.insert(block_ref);
            }
        }

        if blocks.is_empty() {
            None
        } else {
            // The guard releases the just-acquired locks on drop.
            Some(BlocksGuard {
                map: self.clone(),
                block_refs: blocks,
                peer,
                method,
            })
        }
    }

    /// Unlocks the provided block references for the given `peer`. The
    /// unlocking is strict, meaning that if this method is called for a
    /// specific block ref and peer more times than the corresponding lock
    /// has been called, it will panic.
    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
        // Now mark all the blocks as fetched from the map
        let mut blocks_to_fetch = self.inner.lock();
        for block_ref in block_refs {
            let authorities = blocks_to_fetch
                .get_mut(block_ref)
                .expect("Should have found a non empty map");

            assert!(authorities.remove(&peer), "Peer index should be present!");

            // if the last one then just clean up
            if authorities.is_empty() {
                blocks_to_fetch.remove(block_ref);
            }
        }
    }

    /// Drops the provided `blocks_guard` which will force to unlock the blocks,
    /// and lock now again the referenced block refs. The swap is best
    /// effort and there is no guarantee that the `peer` will be able to
    /// acquire the new locks.
    fn swap_locks(
        self: &Arc<Self>,
        blocks_guard: BlocksGuard,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let block_refs = blocks_guard.block_refs.clone();
        let method = blocks_guard.method.clone();

        // Explicitly drop the guard
        drop(blocks_guard);

        // Now create a new guard with the same sync method
        self.lock_blocks(block_refs, peer, method)
    }

    /// Returns the number of block refs that currently have at least one lock
    /// holder. Test-only helper.
    #[cfg(test)]
    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
        let inner = self.inner.lock();
        inner.len()
    }
}
225
/// Commands accepted by the synchronizer's main loop (see `Synchronizer::run`).
enum Command {
    /// Fetch the given missing blocks from the specified peer; the outcome is
    /// reported back over the `result` one-shot channel.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Fetch this node's own last proposed block from peers (best-effort
    /// recovery from amnesia, to avoid equivocation).
    FetchOwnLastBlock,
    /// Bring the periodic scheduler's next run forward so missing blocks are
    /// fetched sooner.
    KickOffScheduler,
}
235
/// Handle through which callers interact with the spawned `Synchronizer`:
/// submitting fetch commands and stopping the background tasks.
pub(crate) struct SynchronizerHandle {
    // Channel into the synchronizer's main command loop.
    commands_sender: Sender<Command>,
    // All spawned synchronizer tasks; behind an async mutex so `stop` can
    // abort and join them.
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}
240
241impl SynchronizerHandle {
242    /// Explicitly asks from the synchronizer to fetch the blocks - provided the
243    /// block_refs set - from the peer authority.
244    pub(crate) async fn fetch_blocks(
245        &self,
246        missing_block_refs: BTreeSet<BlockRef>,
247        peer_index: AuthorityIndex,
248    ) -> ConsensusResult<()> {
249        let (sender, receiver) = oneshot::channel();
250        self.commands_sender
251            .send(Command::FetchBlocks {
252                missing_block_refs,
253                peer_index,
254                result: sender,
255            })
256            .await
257            .map_err(|_err| ConsensusError::Shutdown)?;
258        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
259    }
260
261    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
262        let mut tasks = self.tasks.lock().await;
263        tasks.abort_all();
264        while let Some(result) = tasks.join_next().await {
265            result?
266        }
267        Ok(())
268    }
269}
270
/// `Synchronizer` oversees live block synchronization, crucial for node
/// progress. Live synchronization refers to the process of retrieving missing
/// blocks, particularly those essential for advancing a node when data from
/// only a few rounds is absent. If a node significantly lags behind the
/// network, `commit_syncer` handles fetching missing blocks via a more
/// efficient approach. `Synchronizer` aims for swift catch-up employing two
/// mechanisms:
///
/// 1. Explicitly requesting missing blocks from designated authorities via the
///    "block send" path. This includes attempting to fetch any missing
///    ancestors necessary for processing a received block. Such requests
///    prioritize the block author, maximizing the chance of prompt retrieval. A
///    locking mechanism allows concurrent requests for missing blocks from up
///    to two authorities simultaneously, enhancing the chances of timely
///    retrieval. Notably, if additional missing blocks arise during block
///    processing, requests to the same authority are deferred to the scheduler.
///
/// 2. Periodically requesting missing blocks via a scheduler. This primarily
///    serves to retrieve missing blocks that were not ancestors of a received
///    block via the "block send" path. The scheduler operates on either a fixed
///    periodic basis or is triggered immediately after explicit fetches
///    described in (1), ensuring continued block retrieval if gaps persist.
///
/// Additionally to the above, the synchronizer can synchronize and fetch the
/// last own proposed block from the network peers as best effort approach to
/// recover node from amnesia and avoid making the node equivocate.
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    // Receives commands submitted through the `SynchronizerHandle`.
    commands_receiver: Receiver<Command>,
    // One sender per peer authority, feeding that peer's dedicated fetch task.
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    // Holds the currently running periodic fetch task (at most one at a time).
    fetch_blocks_scheduler_task: JoinSet<()>,
    // Holds the currently running "fetch own last block" task (at most one).
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    // Tracks which peers are fetching which missing blocks (per-block locks).
    inflight_blocks_map: Arc<InflightBlocksMap>,
    // LRU cache keyed by block digest — presumably to deduplicate verification
    // of already-seen blocks (see VERIFIED_BLOCKS_CACHE_CAP).
    verified_blocks_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
    // Sender side of the commands channel, used by internal tasks (e.g. to
    // kick off the scheduler after a fetch).
    commands_sender: Sender<Command>,
}
312
313impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
    /// Starts the synchronizer, which is responsible for fetching blocks from
    /// other authorities and managing block synchronization tasks.
    ///
    /// Spawns one dedicated fetch task per peer authority plus the main
    /// command loop, and returns a handle for submitting commands and
    /// stopping the tasks. When `sync_last_known_own_block` is true, a
    /// `FetchOwnLastBlock` command is enqueued immediately.
    pub fn start(
        network_client: Arc<C>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        block_verifier: Arc<V>,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Arc<SynchronizerHandle> {
        let (commands_sender, commands_receiver) =
            channel("consensus_synchronizer_commands", 1_000);
        let inflight_blocks_map = InflightBlocksMap::new();
        let verified_blocks_cache = Arc::new(Mutex::new(LruCache::new(
            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
        )));

        // Spawn the tasks to fetch the blocks from the others
        let mut fetch_block_senders = BTreeMap::new();
        let mut tasks = JoinSet::new();
        for (index, _) in context.committee.authorities() {
            // No fetch task is needed for our own authority.
            if index == context.own_index {
                continue;
            }
            let (sender, receiver) =
                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
                index,
                network_client.clone(),
                block_verifier.clone(),
                verified_blocks_cache.clone(),
                commit_vote_monitor.clone(),
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                receiver,
                commands_sender.clone(),
            );
            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
            fetch_block_senders.insert(index, sender);
        }

        let commands_sender_clone = commands_sender.clone();

        if sync_last_known_own_block {
            // The channel was just created with ample capacity, so this
            // try_send is expected to succeed.
            commands_sender
                .try_send(Command::FetchOwnLastBlock)
                .expect("Failed to sync our last block");
        }

        // Spawn the task to listen to the requests & periodic runs
        tasks.spawn(monitored_future!(async move {
            let mut s = Self {
                context,
                commands_receiver,
                fetch_block_senders,
                core_dispatcher,
                commit_vote_monitor,
                fetch_blocks_scheduler_task: JoinSet::new(),
                fetch_own_last_block_task: JoinSet::new(),
                network_client,
                block_verifier,
                inflight_blocks_map,
                verified_blocks_cache,
                commands_sender: commands_sender_clone,
                dag_state,
            };
            s.run().await;
        }));

        Arc::new(SynchronizerHandle {
            commands_sender,
            tasks: tokio::sync::Mutex::new(tasks),
        })
    }
390
    // The main loop to listen for the submitted commands, join the spawned
    // helper tasks, and drive the periodic fetch scheduler.
    async fn run(&mut self) {
        // We want the synchronizer to run periodically every 200ms to fetch any missing
        // blocks.
        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();

                            // Keep only the max allowed blocks to request. It is ok to reduce here as the scheduler
                            // task will take care syncing whatever is leftover.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(self.context.parameters.max_blocks_per_sync)
                                .collect();

                            // If no block could be locked (others already fetching them), report
                            // success — nothing to do for this request.
                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index, SyncMethod::Live);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // We don't block if the corresponding peer task is saturated - but we rather drop the request. That's ok as the periodic
                            // synchronization task will handle any still missing blocks in next run.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            // Only one such task at a time.
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // just reset the scheduler timeout timer to run immediately if not already running.
                            // If the scheduler is already running then just reduce the remaining time to run.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
                            };

                            // only reset if it is earlier than the next deadline
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            // Cancellation is expected during shutdown; panics are re-raised.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            // Cancellation is expected during shutdown; panics are re-raised.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // we want to start a new task only if the previous one has already finished.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    // Re-arm the periodic timer for the next run.
                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
                }
            }
        }
    }
504
    /// Dedicated task that fetches blocks from a single peer authority.
    /// Serves `BlocksGuard` requests from `receiver` while keeping at most
    /// `FETCH_BLOCKS_CONCURRENCY` requests in flight, retries failed fetches
    /// up to `MAX_RETRIES` times, and hands successful responses to
    /// `process_fetched_blocks`. Exits when the channel closes and all
    /// in-flight requests have completed.
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 3;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    // get the highest accepted rounds
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    // Record metrics for live synchronizer requests
                    let metrics = &context.metrics.node_metrics;
                    metrics
                        .synchronizer_requested_blocks_by_peer
                        .with_label_values(&[peer_hostname.as_str(), "live"])
                        .inc_by(blocks_guard.block_refs.len() as u64);
                    // Count requested blocks per authority and increment metric by one per authority
                    let mut authors = HashSet::new();
                    for block_ref in &blocks_guard.block_refs {
                        authors.insert(block_ref.author);
                    }
                    for author in authors {
                        let host = &context.committee.authority(author).hostname;
                        metrics
                            .synchronizer_requested_blocks_by_authority
                            .with_label_values(&[host.as_str(), "live"])
                            .inc();
                    }

                    // First attempt (retry counter starts at 1).
                    requests.push(Self::fetch_blocks_request(
                        network_client.clone(),
                        peer_index,
                        blocks_guard,
                        highest_rounds,
                        FETCH_REQUEST_TIMEOUT,
                        1,
                    ))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                verified_cache.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                // Feed the failure into peer scoring before logging it.
                                context.scoring_metrics_store.update_scoring_metrics_on_block_receival(
                                    peer_index,
                                    peer_hostname,
                                    err.clone(),
                                    ErrorSource::Synchronizer,
                                    &context.metrics.node_metrics,
                                );
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // we don't necessarily need to do, but dropping the guard here to unlock the blocks
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    // Channel closed and no requests remain in flight.
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }
600
    /// Processes the requested raw fetched blocks from peer `peer_index`. If no
    /// error is returned then the verified blocks are immediately sent to
    /// Core for processing.
    ///
    /// Steps: cap the number of returned blocks, verify them on a blocking
    /// thread, (for non-batched sync) reject blocks that were neither
    /// requested nor ancestors of requested blocks, record commit votes and
    /// metrics, then hand the blocks to the core dispatcher. On success the
    /// fetch locks are released and the scheduler is kicked if blocks remain
    /// missing.
    async fn process_fetched_blocks(
        mut serialized_blocks: Vec<Bytes>,
        peer_index: AuthorityIndex,
        requested_blocks_guard: BlocksGuard,
        core_dispatcher: Arc<D>,
        block_verifier: Arc<V>,
        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        commands_sender: Sender<Command>,
        sync_method: &str,
    ) -> ConsensusResult<()> {
        if serialized_blocks.is_empty() {
            return Ok(());
        }
        let _s = context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Synchronizer::process_fetched_blocks"])
            .start_timer();

        // Limit the number of the returned blocks processed.
        if context.protocol_config.consensus_batched_block_sync() {
            serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
        } else {
            // Ensure that all the returned blocks do not go over the total max allowed
            // returned blocks
            if serialized_blocks.len()
                > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
            {
                return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
            }
        }

        // Verify all the fetched blocks. Verification is CPU-bound, so it is
        // moved off the async runtime via spawn_blocking.
        let blocks = Handle::current()
            .spawn_blocking({
                let block_verifier = block_verifier.clone();
                let verified_cache = verified_cache.clone();
                let context = context.clone();
                let sync_method = sync_method.to_string();
                move || {
                    Self::verify_blocks(
                        serialized_blocks,
                        block_verifier,
                        verified_cache,
                        &context,
                        peer_index,
                        &sync_method,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        if !context.protocol_config.consensus_batched_block_sync() {
            // Get all the ancestors of the requested blocks only
            let ancestors = blocks
                .iter()
                .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
                .flat_map(|b| b.ancestors().to_vec())
                .collect::<BTreeSet<BlockRef>>();

            // Now confirm that the blocks are either between the ones requested, or they
            // are parents of the requested blocks
            for block in &blocks {
                if !requested_blocks_guard
                    .block_refs
                    .contains(&block.reference())
                    && !ancestors.contains(&block.reference())
                {
                    return Err(ConsensusError::UnexpectedFetchedBlock {
                        index: peer_index,
                        block_ref: block.reference(),
                    });
                }
            }
        }

        // Record commit votes from the verified blocks.
        for block in &blocks {
            commit_vote_monitor.observe_block(block);
        }

        let metrics = &context.metrics.node_metrics;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        metrics
            .synchronizer_fetched_blocks_by_peer
            .with_label_values(&[peer_hostname.as_str(), sync_method])
            .inc_by(blocks.len() as u64);
        for block in &blocks {
            let block_hostname = &context.committee.authority(block.author()).hostname;
            metrics
                .synchronizer_fetched_blocks_by_authority
                .with_label_values(&[block_hostname.as_str(), sync_method])
                .inc();
        }

        debug!(
            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
            blocks.len(),
            blocks.iter().map(|b| b.reference().to_string()).join(", "),
        );

        // Now send them to core for processing. Ignore the returned missing blocks as
        // we don't want this mechanism to keep feedback looping on fetching
        // more blocks. The periodic synchronization will take care of that.
        let missing_blocks = core_dispatcher
            .add_blocks(blocks)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // now release all the locked blocks as they have been fetched, verified &
        // processed
        drop(requested_blocks_guard);

        // kick off immediately the scheduled synchronizer
        if !missing_blocks.is_empty() {
            // do not block here, so we avoid any possible cycles.
            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
            {
                warn!("Commands channel is full")
            }
        }

        context
            .metrics
            .node_metrics
            .missing_blocks_after_fetch_total
            .inc_by(missing_blocks.len() as u64);

        Ok(())
    }
738
739    fn get_highest_accepted_rounds(
740        dag_state: Arc<RwLock<DagState>>,
741        context: &Arc<Context>,
742    ) -> Vec<Round> {
743        let blocks = dag_state
744            .read()
745            .get_last_cached_block_per_authority(Round::MAX);
746        assert_eq!(blocks.len(), context.committee.size());
747
748        blocks
749            .into_iter()
750            .map(|(block, _)| block.round())
751            .collect::<Vec<_>>()
752    }
753
754    #[instrument(level = "trace", skip_all)]
755    fn verify_blocks(
756        serialized_blocks: Vec<Bytes>,
757        block_verifier: Arc<V>,
758        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
759        context: &Context,
760        peer_index: AuthorityIndex,
761        sync_method: &str,
762    ) -> ConsensusResult<Vec<VerifiedBlock>> {
763        let mut verified_blocks = Vec::new();
764        let mut skipped_count = 0u64;
765
766        for serialized_block in serialized_blocks {
767            let block_digest = VerifiedBlock::compute_digest(&serialized_block);
768
769            // Check if this block has already been verified
770            if verified_cache.lock().get(&block_digest).is_some() {
771                skipped_count += 1;
772                continue; // Skip already verified blocks
773            }
774
775            let signed_block: SignedBlock =
776                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
777
778            if let Err(e) = block_verifier.verify(&signed_block) {
779                // TODO: we might want to use a different metric to track the invalid "served"
780                // blocks from the invalid "proposed" ones.
781                let hostname = context.committee.authority(peer_index).hostname.clone();
782
783                context
784                    .metrics
785                    .node_metrics
786                    .invalid_blocks
787                    .with_label_values(&[hostname.as_str(), "synchronizer", e.name()])
788                    .inc();
789                warn!("Invalid block received from {}: {}", peer_index, e);
790                return Err(e);
791            }
792
793            // Add block to verified cache after successful verification
794            verified_cache.lock().put(block_digest, ());
795
796            let verified_block = VerifiedBlock::new_verified_with_digest(
797                signed_block,
798                serialized_block,
799                block_digest,
800            );
801
802            // Dropping is ok because the block will be refetched.
803            // TODO: improve efficiency, maybe suspend and continue processing the block
804            // asynchronously.
805            let now = context.clock.timestamp_utc_ms();
806            let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
807            if drift > 0 {
808                let peer_hostname = &context
809                    .committee
810                    .authority(verified_block.author())
811                    .hostname;
812                context
813                    .metrics
814                    .node_metrics
815                    .block_timestamp_drift_ms
816                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
817                    .inc_by(drift);
818
819                if context
820                    .protocol_config
821                    .consensus_median_timestamp_with_checkpoint_enforcement()
822                {
823                    trace!(
824                        "Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.",
825                        verified_block.reference(),
826                        verified_block.timestamp_ms(),
827                        now
828                    );
829                } else {
830                    warn!(
831                        "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
832                        verified_block.reference(),
833                        verified_block.timestamp_ms(),
834                        now
835                    );
836                    continue;
837                }
838            }
839
840            verified_blocks.push(verified_block);
841        }
842
843        // Record skipped blocks metric
844        if skipped_count > 0 {
845            let peer_hostname = &context.committee.authority(peer_index).hostname;
846            context
847                .metrics
848                .node_metrics
849                .synchronizer_skipped_blocks_by_peer
850                .with_label_values(&[peer_hostname.as_str(), sync_method])
851                .inc_by(skipped_count);
852        }
853
854        Ok(verified_blocks)
855    }
856
857    async fn fetch_blocks_request(
858        network_client: Arc<C>,
859        peer: AuthorityIndex,
860        blocks_guard: BlocksGuard,
861        highest_rounds: Vec<Round>,
862        request_timeout: Duration,
863        mut retries: u32,
864    ) -> (
865        ConsensusResult<Vec<Bytes>>,
866        BlocksGuard,
867        u32,
868        AuthorityIndex,
869        Vec<Round>,
870    ) {
871        let start = Instant::now();
872        let resp = timeout(
873            request_timeout,
874            network_client.fetch_blocks(
875                peer,
876                blocks_guard
877                    .block_refs
878                    .clone()
879                    .into_iter()
880                    .collect::<Vec<_>>(),
881                highest_rounds.clone(),
882                request_timeout,
883            ),
884        )
885        .await;
886
887        fail_point_async!("consensus-delay");
888
889        let resp = match resp {
890            Ok(Err(err)) => {
891                // Add a delay before retrying - if that is needed. If request has timed out
892                // then eventually this will be a no-op.
893                sleep_until(start + request_timeout).await;
894                retries += 1;
895                Err(err)
896            } // network error
897            Err(err) => {
898                // timeout
899                sleep_until(start + request_timeout).await;
900                retries += 1;
901                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
902            }
903            Ok(result) => result,
904        };
905        (resp, blocks_guard, retries, peer, highest_rounds)
906    }
907
    /// Spawns a background task that determines the highest round of a block
    /// this authority has previously proposed, by asking peers for our own
    /// last block.
    ///
    /// The task keeps polling peers (retrying failed ones with a delay, and
    /// whole rounds with exponential backoff capped at `MAX_RETRY_DELAY_STEP`)
    /// until a quorum of stake has returned acceptable results, then reports
    /// the highest observed round to core via `set_last_known_proposed_round`.
    /// In a single-node committee the local last proposed block is used and no
    /// peers are contacted.
    fn start_fetch_own_last_block_task(&mut self) {
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Future factory: after an optional delay, asks `authority_index` for
                // this node's latest block(s) and tags the response with the peer index.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                // Deserializes and verifies each returned block, and rejects any block
                // not authored by this node (`UnexpectedLastOwnBlock`).
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                                    let mut result = Vec::new();
                                    for serialized_block in blocks {
                                        let signed_block: SignedBlock = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                                        block_verifier.verify(&signed_block).tap_err(|err|{
                                            let hostname = context.committee.authority(authority_index).hostname.clone();
                                            context
                                                .metrics
                                                .node_metrics
                                                .invalid_blocks
                                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                                .inc();
                                            warn!("Invalid block received from {}: {}", authority_index, err);
                                        })?;

                                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                                        if verified_block.author() != context.own_index {
                                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                                        }
                                        result.push(verified_block);
                                    }
                                    Ok(result)
                };

                // Get the highest of all the results. Retry until at least `f+1` results have been gathered.
                let mut highest_round = GENESIS_ROUND;
                // Keep track of the received responses to avoid fetching the own block header from same peer
                let mut received_response = vec![false; context.committee.size()];
                // Assume that our node is not Byzantine
                received_response[context.own_index] = true;
                let mut total_stake = context.committee.stake(context.own_index);
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main:loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Ask all the other peers about our last block
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        // Skip our own index and the ones that have already responded
                        if !received_response[authority_index] {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results but wait to timeout as well
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                // Acceptable response: mark the peer as answered and
                                                // account its stake towards the quorum.
                                                received_response[authority_index] = true;
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        // Network failure: re-enqueue the same peer with a delay.
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Request at least a quorum of 2f+1 stake to have replied back.
                    if context.committee.reached_quorum(total_stake) {
                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    } else {
                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Exponential backoff (x1.5) between full retry rounds, capped.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                // Update the Core with the highest detected round
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
1046
    /// Spawns a one-shot task that fetches the blocks currently reported as
    /// missing by core and feeds the fetched blocks back to core.
    ///
    /// Returns early without spawning when there are no missing blocks, or
    /// when the node is commit-lagging and either gc is enabled or all missing
    /// blocks are above the acceptable round threshold. Returns
    /// `ConsensusError::Shutdown` when the core dispatcher has shut down.
    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        // No reason to kick off the scheduler if there are no missing blocks to fetch
        if missing_blocks.is_empty() {
            return Ok(());
        }

        // Clone everything the spawned task needs, since it is `async move`.
        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let verified_cache = self.verified_blocks_cache.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        trace!(
            "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
        );
        if commit_lagging {
            // If gc is enabled and we are commit lagging, then we don't want to enable the
            // scheduler. As the new logic of processing the certified commits
            // takes place we are guaranteed that commits will happen for all the certified
            // commits.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            // As node is commit lagging try to sync only the missing blocks that are within
            // the acceptable round thresholds to sync. The rest we don't attempt to
            // sync yet.
            let highest_accepted_round = dag_state.read().highest_accepted_round();
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|(block_ref, _)| {
                    block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
                })
                .collect::<BTreeMap<_, _>>();

            // If no missing blocks are within the acceptable thresholds to sync while we
            // commit lag, then we disable the scheduler completely for this run.
            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                // Fetch blocks from peers
                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                // Now process the returned results
                let mut total_fetched = 0;
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    // Per-peer failures are logged but do not abort processing of the
                    // remaining peers' results.
                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        verified_cache.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }
1173
1174    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1175        let last_commit_index = self.dag_state.read().last_commit_index();
1176        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1177        let commit_threshold = last_commit_index
1178            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1179        (
1180            commit_threshold < quorum_commit_index,
1181            last_commit_index,
1182            quorum_commit_index,
1183        )
1184    }
1185
1186    /// The number of rounds above the highest accepted round to allow fetching
1187    /// missing blocks via the periodic synchronization. Any missing blocks
1188    /// of higher rounds are considered too far in the future to fetch. This
1189    /// property is used only when it's detected that the node has fallen
1190    /// behind on its commit compared to the rest of the network,
1191    /// otherwise scheduler will attempt to fetch any missing block.
1192    fn missing_block_round_threshold(&self) -> Round {
1193        self.context.parameters.commit_sync_batch_size
1194    }
1195
1196    /// Fetches the given `missing_blocks` from up to `MAX_PEERS` authorities in
1197    /// parallel:
1198    ///
1199    /// Randomly select `MAX_PEERS - MAX_RANDOM_PEERS` peers from those who
1200    /// are known to hold some missing block, requesting up to
1201    /// `MAX_BLOCKS_PER_FETCH` block refs per peer.
1202    ///
1203    /// Randomly select `MAX_RANDOM_PEERS` additional peers from the
1204    ///  committee (excluding self and those already selected).
1205    ///
1206    /// The method returns a vector with the fetched blocks from each peer that
1207    /// successfully responded and any corresponding additional ancestor blocks.
1208    /// Each element of the vector is a tuple which contains the requested
1209    /// missing block refs, the returned blocks and the peer authority index.
1210    async fn fetch_blocks_from_authorities(
1211        context: Arc<Context>,
1212        inflight_blocks: Arc<InflightBlocksMap>,
1213        network_client: Arc<C>,
1214        missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1215        dag_state: Arc<RwLock<DagState>>,
1216    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1217        // Step 1: Map authorities to missing blocks that they are aware of
1218        let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1219        for (missing_block_ref, authorities) in &missing_blocks {
1220            for author in authorities {
1221                if author == &context.own_index {
1222                    // Skip our own index as we don't want to fetch blocks from ourselves
1223                    continue;
1224                }
1225                authority_to_blocks
1226                    .entry(*author)
1227                    .or_default()
1228                    .push(*missing_block_ref);
1229            }
1230        }
1231
1232        // Step 2: Choose at most MAX_PEERS-MAX_RANDOM_PEERS peers from those who are
1233        // aware of some missing blocks
1234
1235        #[cfg(not(test))]
1236        let mut rng = StdRng::from_entropy();
1237
1238        // Randomly pick up MAX_PEERS - MAX_RANDOM_PEERS authorities that are aware of
1239        // missing blocks
1240        #[cfg(not(test))]
1241        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1242            authority_to_blocks
1243                .iter()
1244                .choose_multiple(
1245                    &mut rng,
1246                    MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1247                )
1248                .into_iter()
1249                .map(|(&peer, blocks)| {
1250                    let limited_blocks = blocks
1251                        .iter()
1252                        .copied()
1253                        .take(context.parameters.max_blocks_per_sync)
1254                        .collect();
1255                    (peer, limited_blocks, "periodic_known")
1256                })
1257                .collect();
1258        #[cfg(test)]
1259        // Deterministically pick the smallest (MAX_PEERS - MAX_RANDOM_PEERS) authority indices
1260        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1261            let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1262                .iter()
1263                .map(|(&peer, blocks)| {
1264                    let limited_blocks = blocks
1265                        .iter()
1266                        .copied()
1267                        .take(context.parameters.max_blocks_per_sync)
1268                        .collect();
1269                    (peer, limited_blocks, "periodic_known")
1270                })
1271                .collect();
1272            // Sort by AuthorityIndex (natural order), then take the first MAX_PEERS -
1273            // MAX_RANDOM_PEERS
1274            items.sort_by_key(|(peer, _, _)| *peer);
1275            items
1276                .into_iter()
1277                .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1278                .collect()
1279        };
1280
1281        // Step 3: Choose at most two random peers not known to be aware of the missing
1282        // blocks
1283        let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1284            .iter()
1285            .map(|(peer, _, _)| *peer)
1286            .collect();
1287
1288        let random_candidates: Vec<_> = context
1289            .committee
1290            .authorities()
1291            .filter_map(|(peer_index, _)| {
1292                (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1293                    .then_some(peer_index)
1294            })
1295            .collect();
1296        #[cfg(test)]
1297        let random_peers: Vec<AuthorityIndex> = random_candidates
1298            .into_iter()
1299            .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1300            .collect();
1301        #[cfg(not(test))]
1302        let random_peers: Vec<AuthorityIndex> = random_candidates
1303            .into_iter()
1304            .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1305
1306        #[cfg_attr(test, allow(unused_mut))]
1307        let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1308        // Shuffle the missing blocks in case the first ones are blocked by irresponsive
1309        // peers
1310        #[cfg(not(test))]
1311        all_missing_blocks.shuffle(&mut rng);
1312
1313        let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1314
1315        for peer in random_peers {
1316            if let Some(chunk) = block_chunks.next() {
1317                chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1318            } else {
1319                break;
1320            }
1321        }
1322
1323        let mut request_futures = FuturesUnordered::new();
1324
1325        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1326
1327        // Record the missing blocks per authority for metrics
1328        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1329        for block in &all_missing_blocks {
1330            missing_blocks_per_authority[block.author] += 1;
1331        }
1332        for (missing, (_, authority)) in missing_blocks_per_authority
1333            .into_iter()
1334            .zip(context.committee.authorities())
1335        {
1336            context
1337                .metrics
1338                .node_metrics
1339                .synchronizer_missing_blocks_by_authority
1340                .with_label_values(&[&authority.hostname])
1341                .inc_by(missing as u64);
1342            context
1343                .metrics
1344                .node_metrics
1345                .synchronizer_current_missing_blocks_by_authority
1346                .with_label_values(&[&authority.hostname])
1347                .set(missing as i64);
1348        }
1349
1350        // Look at peers that were not chosen yet and try to fetch blocks from them if
1351        // needed later
1352        #[cfg_attr(test, expect(unused_mut))]
1353        let mut remaining_peers: Vec<_> = context
1354            .committee
1355            .authorities()
1356            .filter_map(|(peer_index, _)| {
1357                if peer_index != context.own_index
1358                    && !chosen_peers_with_blocks
1359                        .iter()
1360                        .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1361                {
1362                    Some(peer_index)
1363                } else {
1364                    None
1365                }
1366            })
1367            .collect();
1368
1369        #[cfg(not(test))]
1370        remaining_peers.shuffle(&mut rng);
1371        let mut remaining_peers = remaining_peers.into_iter();
1372
1373        // Send the initial requests
1374        for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1375            let peer_hostname = &context.committee.authority(peer).hostname;
1376            let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1377
1378            // Lock the blocks to be fetched. If no lock can be acquired for any of the
1379            // blocks then don't bother.
1380            if let Some(blocks_guard) =
1381                inflight_blocks.lock_blocks(block_refs.clone(), peer, SyncMethod::Periodic)
1382            {
1383                info!(
1384                    "Periodic sync of {} missing blocks from peer {} {}: {}",
1385                    block_refs.len(),
1386                    peer,
1387                    peer_hostname,
1388                    block_refs
1389                        .iter()
1390                        .map(|b| b.to_string())
1391                        .collect::<Vec<_>>()
1392                        .join(", ")
1393                );
1394                // Record metrics about requested blocks
1395                let metrics = &context.metrics.node_metrics;
1396                metrics
1397                    .synchronizer_requested_blocks_by_peer
1398                    .with_label_values(&[peer_hostname.as_str(), label])
1399                    .inc_by(block_refs.len() as u64);
1400                for block_ref in &block_refs {
1401                    let block_hostname = &context.committee.authority(block_ref.author).hostname;
1402                    metrics
1403                        .synchronizer_requested_blocks_by_authority
1404                        .with_label_values(&[block_hostname.as_str(), label])
1405                        .inc();
1406                }
1407                request_futures.push(Self::fetch_blocks_request(
1408                    network_client.clone(),
1409                    peer,
1410                    blocks_guard,
1411                    highest_rounds.clone(),
1412                    FETCH_REQUEST_TIMEOUT,
1413                    1,
1414                ));
1415            }
1416        }
1417
1418        let mut results = Vec::new();
1419        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1420
1421        tokio::pin!(fetcher_timeout);
1422
1423        loop {
1424            tokio::select! {
1425                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1426                    let peer_hostname = &context.committee.authority(peer_index).hostname;
1427                    match response {
1428                        Ok(fetched_blocks) => {
1429                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1430                            results.push((blocks_guard, fetched_blocks, peer_index));
1431
1432                            // no more pending requests are left, just break the loop
1433                            if request_futures.is_empty() {
1434                                break;
1435                            }
1436                        },
1437                        Err(_) => {
1438                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1439                            // try again if there is any peer left
1440                            if let Some(next_peer) = remaining_peers.next() {
1441                                // do best effort to lock guards. If we can't lock then don't bother at this run.
1442                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1443                                    info!(
1444                                        "Retrying syncing {} missing blocks from peer {}: {}",
1445                                        blocks_guard.block_refs.len(),
1446                                        peer_hostname,
1447                                        blocks_guard.block_refs
1448                                            .iter()
1449                                            .map(|b| b.to_string())
1450                                            .collect::<Vec<_>>()
1451                                            .join(", ")
1452                                    );
1453                                    let block_refs = blocks_guard.block_refs.clone();
1454                                    // Record metrics about requested blocks
1455                                    let metrics = &context.metrics.node_metrics;
1456                                    metrics
1457                                        .synchronizer_requested_blocks_by_peer
1458                                        .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1459                                        .inc_by(block_refs.len() as u64);
1460                                    for block_ref in &block_refs {
1461                                        let block_hostname =
1462                                            &context.committee.authority(block_ref.author).hostname;
1463                                        metrics
1464                                            .synchronizer_requested_blocks_by_authority
1465                                            .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1466                                            .inc();
1467                                    }
1468                                    request_futures.push(Self::fetch_blocks_request(
1469                                        network_client.clone(),
1470                                        next_peer,
1471                                        blocks_guard,
1472                                        highest_rounds,
1473                                        FETCH_REQUEST_TIMEOUT,
1474                                        1,
1475                                    ));
1476                                } else {
1477                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1478                                }
1479                            } else {
1480                                debug!("No more peers left to fetch blocks");
1481                            }
1482                        }
1483                    }
1484                },
1485                _ = &mut fetcher_timeout => {
1486                    debug!("Timed out while fetching missing blocks");
1487                    break;
1488                }
1489            }
1490        }
1491
1492        results
1493    }
1494}
1495
1496#[cfg(test)]
1497mod tests {
1498    use std::{
1499        collections::{BTreeMap, BTreeSet},
1500        num::NonZeroUsize,
1501        sync::Arc,
1502        time::Duration,
1503    };
1504
1505    use async_trait::async_trait;
1506    use bytes::Bytes;
1507    use consensus_config::{AuthorityIndex, Parameters};
1508    use iota_metrics::monitored_mpsc;
1509    use lru::LruCache;
1510    use parking_lot::{Mutex as SyncMutex, RwLock};
1511    use tokio::{sync::Mutex, time::sleep};
1512
1513    use crate::{
1514        CommitDigest, CommitIndex,
1515        authority_service::COMMIT_LAG_MULTIPLIER,
1516        block::{BlockDigest, BlockRef, Round, SignedBlock, TestBlock, VerifiedBlock},
1517        block_verifier::{BlockVerifier, NoopBlockVerifier},
1518        commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1519        commit_vote_monitor::CommitVoteMonitor,
1520        context::Context,
1521        core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1522        dag_state::DagState,
1523        error::{ConsensusError, ConsensusResult},
1524        network::{BlockStream, NetworkClient},
1525        round_prober::QuorumRound,
1526        storage::mem_store::MemStore,
1527        synchronizer::{
1528            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, SyncMethod,
1529            Synchronizer, VERIFIED_BLOCKS_CACHE_CAP,
1530        },
1531    };
1532
    // Key/response pair for stubbing `fetch_blocks`: a request is matched on the
    // exact list of requested block refs and the peer it is sent to; the
    // response carries the blocks to return plus an optional simulated latency.
    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    // Key/response pair for stubbing `fetch_latest_blocks`, matched on the peer
    // and the exact list of authorities whose latest blocks are requested.
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1537
1538    // Mock verifier that always fails verification
1539    struct FailingBlockVerifier;
1540
1541    impl BlockVerifier for FailingBlockVerifier {
1542        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1543            Err(ConsensusError::WrongEpoch {
1544                expected: 1,
1545                actual: 0,
1546            })
1547        }
1548
1549        fn check_ancestors(
1550            &self,
1551            _block: &VerifiedBlock,
1552            _ancestors: &[Option<VerifiedBlock>],
1553            _gc_enabled: bool,
1554            _gc_round: Round,
1555        ) -> ConsensusResult<()> {
1556            Ok(())
1557        }
1558    }
1559
    // Test double for a network client that serves pre-stubbed responses.
    #[derive(Default)]
    struct MockNetworkClient {
        // Stubbed `fetch_blocks` responses, keyed by the exact requested
        // block refs and the target peer. Each stub is consumed once.
        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
        // Stubbed `fetch_latest_blocks` responses; multiple stubs for the same
        // (peer, authorities) key are queued and served in insertion order.
        fetch_latest_blocks_requests:
            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
    }
1566
1567    impl MockNetworkClient {
1568        async fn stub_fetch_blocks(
1569            &self,
1570            blocks: Vec<VerifiedBlock>,
1571            peer: AuthorityIndex,
1572            latency: Option<Duration>,
1573        ) {
1574            let mut lock = self.fetch_blocks_requests.lock().await;
1575            let block_refs = blocks
1576                .iter()
1577                .map(|block| block.reference())
1578                .collect::<Vec<_>>();
1579            lock.insert((block_refs, peer), (blocks, latency));
1580        }
1581
1582        async fn stub_fetch_latest_blocks(
1583            &self,
1584            blocks: Vec<VerifiedBlock>,
1585            peer: AuthorityIndex,
1586            authorities: Vec<AuthorityIndex>,
1587            latency: Option<Duration>,
1588        ) {
1589            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1590            lock.entry((peer, authorities))
1591                .or_default()
1592                .push((blocks, latency));
1593        }
1594
1595        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1596            let lock = self.fetch_latest_blocks_requests.lock().await;
1597            lock.len()
1598        }
1599    }
1600
1601    #[async_trait]
1602    impl NetworkClient for MockNetworkClient {
1603        const SUPPORT_STREAMING: bool = false;
1604
1605        async fn send_block(
1606            &self,
1607            _peer: AuthorityIndex,
1608            _serialized_block: &VerifiedBlock,
1609            _timeout: Duration,
1610        ) -> ConsensusResult<()> {
1611            unimplemented!("Unimplemented")
1612        }
1613
1614        async fn subscribe_blocks(
1615            &self,
1616            _peer: AuthorityIndex,
1617            _last_received: Round,
1618            _timeout: Duration,
1619        ) -> ConsensusResult<BlockStream> {
1620            unimplemented!("Unimplemented")
1621        }
1622
1623        async fn fetch_blocks(
1624            &self,
1625            peer: AuthorityIndex,
1626            block_refs: Vec<BlockRef>,
1627            _highest_accepted_rounds: Vec<Round>,
1628            _timeout: Duration,
1629        ) -> ConsensusResult<Vec<Bytes>> {
1630            let mut lock = self.fetch_blocks_requests.lock().await;
1631            let response = lock
1632                .remove(&(block_refs, peer))
1633                .expect("Unexpected fetch blocks request made");
1634
1635            let serialised = response
1636                .0
1637                .into_iter()
1638                .map(|block| block.serialized().clone())
1639                .collect::<Vec<_>>();
1640
1641            drop(lock);
1642
1643            if let Some(latency) = response.1 {
1644                sleep(latency).await;
1645            }
1646
1647            Ok(serialised)
1648        }
1649
1650        async fn fetch_commits(
1651            &self,
1652            _peer: AuthorityIndex,
1653            _commit_range: CommitRange,
1654            _timeout: Duration,
1655        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1656            unimplemented!("Unimplemented")
1657        }
1658
1659        async fn fetch_latest_blocks(
1660            &self,
1661            peer: AuthorityIndex,
1662            authorities: Vec<AuthorityIndex>,
1663            _timeout: Duration,
1664        ) -> ConsensusResult<Vec<Bytes>> {
1665            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1666            let mut responses = lock
1667                .remove(&(peer, authorities.clone()))
1668                .expect("Unexpected fetch blocks request made");
1669
1670            let response = responses.remove(0);
1671            let serialised = response
1672                .0
1673                .into_iter()
1674                .map(|block| block.serialized().clone())
1675                .collect::<Vec<_>>();
1676
1677            if !responses.is_empty() {
1678                lock.insert((peer, authorities), responses);
1679            }
1680
1681            drop(lock);
1682
1683            if let Some(latency) = response.1 {
1684                sleep(latency).await;
1685            }
1686
1687            Ok(serialised)
1688        }
1689
1690        async fn get_latest_rounds(
1691            &self,
1692            _peer: AuthorityIndex,
1693            _timeout: Duration,
1694        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1695            unimplemented!("Unimplemented")
1696        }
1697    }
1698
1699    #[test]
1700    fn test_inflight_blocks_map() {
1701        // GIVEN
1702        let map = InflightBlocksMap::new();
1703        let some_block_refs = [
1704            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1705            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1706            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1707            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1708        ];
1709        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1710
1711        // Lock & unlock blocks
1712        {
1713            let mut all_guards = Vec::new();
1714
1715            // Try to acquire the block locks for authorities 1 & 2 (Periodic limit is 2)
1716            for i in 1..=2 {
1717                let authority = AuthorityIndex::new_for_test(i);
1718
1719                let guard =
1720                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1721                let guard = guard.expect("Guard should be created");
1722                assert_eq!(guard.block_refs.len(), 4);
1723
1724                all_guards.push(guard);
1725
1726                // trying to acquire any of them again will not succeed
1727                let guard =
1728                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1729                assert!(guard.is_none());
1730            }
1731
1732            // Trying to acquire for authority 3 it will fail - as we have maxed out the
1733            // number of allowed peers (Periodic limit is 2)
1734            let authority_3 = AuthorityIndex::new_for_test(3);
1735
1736            let guard = map.lock_blocks(
1737                missing_block_refs.clone(),
1738                authority_3,
1739                SyncMethod::Periodic,
1740            );
1741            assert!(guard.is_none());
1742
1743            // Explicitly drop the guard of authority 1 and try for authority 3 again - it
1744            // will now succeed
1745            drop(all_guards.remove(0));
1746
1747            let guard = map.lock_blocks(
1748                missing_block_refs.clone(),
1749                authority_3,
1750                SyncMethod::Periodic,
1751            );
1752            let guard = guard.expect("Guard should be successfully acquired");
1753
1754            assert_eq!(guard.block_refs, missing_block_refs);
1755
1756            // Dropping all guards should unlock on the block refs
1757            drop(guard);
1758            drop(all_guards);
1759
1760            assert_eq!(map.num_of_locked_blocks(), 0);
1761        }
1762
1763        // Swap locks
1764        {
1765            // acquire a lock for authority 1
1766            let authority_1 = AuthorityIndex::new_for_test(1);
1767            let guard = map
1768                .lock_blocks(
1769                    missing_block_refs.clone(),
1770                    authority_1,
1771                    SyncMethod::Periodic,
1772                )
1773                .unwrap();
1774
1775            // Now swap the locks for authority 2
1776            let authority_2 = AuthorityIndex::new_for_test(2);
1777            let guard = map.swap_locks(guard, authority_2);
1778
1779            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1780        }
1781    }
1782
1783    #[test]
1784    fn test_inflight_blocks_map_live_sync_limit() {
1785        // GIVEN
1786        let map = InflightBlocksMap::new();
1787        let some_block_refs = [
1788            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1789            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1790        ];
1791        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1792
1793        // WHEN authority 1 locks with Live sync
1794        let authority_1 = AuthorityIndex::new_for_test(1);
1795        let guard_1 = map
1796            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1797            .expect("Should successfully lock with Live sync");
1798
1799        assert_eq!(guard_1.block_refs.len(), 2);
1800
1801        // THEN authority 2 cannot lock with Live sync (limit of 1 reached)
1802        let authority_2 = AuthorityIndex::new_for_test(2);
1803        let guard_2 = map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1804
1805        assert!(
1806            guard_2.is_none(),
1807            "Should fail to lock - Live limit of 1 reached"
1808        );
1809
1810        // WHEN authority 1 releases the lock
1811        drop(guard_1);
1812
1813        // THEN authority 2 can now lock with Live sync
1814        let guard_2 = map
1815            .lock_blocks(missing_block_refs, authority_2, SyncMethod::Live)
1816            .expect("Should successfully lock after authority 1 released");
1817
1818        assert_eq!(guard_2.block_refs.len(), 2);
1819    }
1820
1821    #[test]
1822    fn test_inflight_blocks_map_periodic_allows_more_concurrency() {
1823        // GIVEN
1824        let map = InflightBlocksMap::new();
1825        let some_block_refs = [
1826            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1827            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1828        ];
1829        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1830
1831        // WHEN authority 1 locks with Periodic sync
1832        let authority_1 = AuthorityIndex::new_for_test(1);
1833        let guard_1 = map
1834            .lock_blocks(
1835                missing_block_refs.clone(),
1836                authority_1,
1837                SyncMethod::Periodic,
1838            )
1839            .expect("Should successfully lock with Periodic sync");
1840
1841        assert_eq!(guard_1.block_refs.len(), 2);
1842
1843        // THEN authority 2 can also lock with Periodic sync (limit is 2)
1844        let authority_2 = AuthorityIndex::new_for_test(2);
1845        let guard_2 = map
1846            .lock_blocks(
1847                missing_block_refs.clone(),
1848                authority_2,
1849                SyncMethod::Periodic,
1850            )
1851            .expect("Should successfully lock - Periodic allows 2 authorities");
1852
1853        assert_eq!(guard_2.block_refs.len(), 2);
1854
1855        // BUT authority 3 cannot lock with Periodic sync (limit of 2 reached)
1856        let authority_3 = AuthorityIndex::new_for_test(3);
1857        let guard_3 = map.lock_blocks(
1858            missing_block_refs.clone(),
1859            authority_3,
1860            SyncMethod::Periodic,
1861        );
1862
1863        assert!(
1864            guard_3.is_none(),
1865            "Should fail to lock - Periodic limit of 2 reached"
1866        );
1867
1868        // WHEN authority 1 releases the lock
1869        drop(guard_1);
1870
1871        // THEN authority 3 can now lock with Periodic sync
1872        let guard_3 = map
1873            .lock_blocks(missing_block_refs, authority_3, SyncMethod::Periodic)
1874            .expect("Should successfully lock after authority 1 released");
1875
1876        assert_eq!(guard_3.block_refs.len(), 2);
1877    }
1878
1879    #[test]
1880    fn test_inflight_blocks_map_periodic_blocks_live_when_at_live_limit() {
1881        // GIVEN
1882        let map = InflightBlocksMap::new();
1883        let some_block_refs = [
1884            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1885            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1886        ];
1887        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1888
1889        // WHEN authority 1 locks with Periodic sync (total=1, at Live's limit)
1890        let authority_1 = AuthorityIndex::new_for_test(1);
1891        let guard_1 = map
1892            .lock_blocks(
1893                missing_block_refs.clone(),
1894                authority_1,
1895                SyncMethod::Periodic,
1896            )
1897            .expect("Should successfully lock with Periodic sync");
1898
1899        assert_eq!(guard_1.block_refs.len(), 2);
1900
1901        // THEN authority 2 cannot lock with Live sync (total already at Live limit of
1902        // 1)
1903        let authority_2 = AuthorityIndex::new_for_test(2);
1904        let guard_2_live =
1905            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1906
1907        assert!(
1908            guard_2_live.is_none(),
1909            "Should fail to lock with Live - total already at Live limit of 1"
1910        );
1911
1912        // BUT authority 2 CAN lock with Periodic sync (total would be 2, at Periodic
1913        // limit)
1914        let guard_2_periodic = map
1915            .lock_blocks(missing_block_refs, authority_2, SyncMethod::Periodic)
1916            .expect("Should successfully lock with Periodic - under Periodic limit of 2");
1917
1918        assert_eq!(guard_2_periodic.block_refs.len(), 2);
1919    }
1920
1921    #[test]
1922    fn test_inflight_blocks_map_live_then_periodic_interaction() {
1923        // GIVEN
1924        let map = InflightBlocksMap::new();
1925        let some_block_refs = [
1926            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1927            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1928        ];
1929        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1930
1931        // WHEN authority 1 locks with Live sync (total=1, at Live limit)
1932        let authority_1 = AuthorityIndex::new_for_test(1);
1933        let guard_1 = map
1934            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1935            .expect("Should successfully lock with Live sync");
1936
1937        assert_eq!(guard_1.block_refs.len(), 2);
1938
1939        // THEN authority 2 cannot lock with Live sync (would exceed Live limit of 1)
1940        let authority_2 = AuthorityIndex::new_for_test(2);
1941        let guard_2_live =
1942            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1943
1944        assert!(
1945            guard_2_live.is_none(),
1946            "Should fail to lock with Live - would exceed Live limit of 1"
1947        );
1948
1949        // BUT authority 2 CAN lock with Periodic sync (total=2, at Periodic limit)
1950        let guard_2 = map
1951            .lock_blocks(
1952                missing_block_refs.clone(),
1953                authority_2,
1954                SyncMethod::Periodic,
1955            )
1956            .expect("Should successfully lock with Periodic - total 2 is at Periodic limit");
1957
1958        assert_eq!(guard_2.block_refs.len(), 2);
1959
1960        // AND authority 3 cannot lock with Periodic sync (would exceed Periodic limit
1961        // of 2)
1962        let authority_3 = AuthorityIndex::new_for_test(3);
1963        let guard_3 = map.lock_blocks(missing_block_refs, authority_3, SyncMethod::Periodic);
1964
1965        assert!(
1966            guard_3.is_none(),
1967            "Should fail to lock with Periodic - would exceed Periodic limit of 2"
1968        );
1969    }
1970
1971    #[test]
1972    fn test_inflight_blocks_map_partial_locks_mixed_methods() {
1973        // GIVEN 4 blocks
1974        let map = InflightBlocksMap::new();
1975        let block_a = BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1976        let block_b = BlockRef::new(2, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1977        let block_c = BlockRef::new(3, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1978        let block_d = BlockRef::new(4, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1979
1980        // Lock block A with authority 1 using Live (A at limit for Live)
1981        let guard_a = map
1982            .lock_blocks(
1983                [block_a].into(),
1984                AuthorityIndex::new_for_test(1),
1985                SyncMethod::Live,
1986            )
1987            .expect("Should lock block A");
1988        assert_eq!(guard_a.block_refs.len(), 1);
1989
1990        // Lock block B with authorities 1 & 2 using Periodic (B at limit for Periodic)
1991        let guard_b1 = map
1992            .lock_blocks(
1993                [block_b].into(),
1994                AuthorityIndex::new_for_test(1),
1995                SyncMethod::Periodic,
1996            )
1997            .expect("Should lock block B");
1998        let guard_b2 = map
1999            .lock_blocks(
2000                [block_b].into(),
2001                AuthorityIndex::new_for_test(2),
2002                SyncMethod::Periodic,
2003            )
2004            .expect("Should lock block B again");
2005        assert_eq!(guard_b1.block_refs.len(), 1);
2006        assert_eq!(guard_b2.block_refs.len(), 1);
2007
2008        // Lock block C with authority 1 using Periodic (C has 1 lock)
2009        let guard_c = map
2010            .lock_blocks(
2011                [block_c].into(),
2012                AuthorityIndex::new_for_test(1),
2013                SyncMethod::Periodic,
2014            )
2015            .expect("Should lock block C");
2016        assert_eq!(guard_c.block_refs.len(), 1);
2017
2018        // Block D is unlocked
2019
2020        // WHEN authority 3 requests all 4 blocks with Periodic
2021        let all_blocks = [block_a, block_b, block_c, block_d].into();
2022        let guard_3 = map
2023            .lock_blocks(
2024                all_blocks,
2025                AuthorityIndex::new_for_test(3),
2026                SyncMethod::Periodic,
2027            )
2028            .expect("Should get partial lock");
2029
2030        // THEN should successfully lock C and D only
2031        // - A: total=1 (at Live limit), authority 3 can still add since using Periodic
2032        //   and total < 2
2033        // - B: total=2 (at Periodic limit), cannot lock
2034        // - C: total=1, can lock (under limit)
2035        // - D: total=0, can lock
2036        assert_eq!(
2037            guard_3.block_refs.len(),
2038            3,
2039            "Should lock blocks A, C, and D"
2040        );
2041        assert!(
2042            guard_3.block_refs.contains(&block_a),
2043            "Should contain block A"
2044        );
2045        assert!(
2046            !guard_3.block_refs.contains(&block_b),
2047            "Should NOT contain block B (at limit)"
2048        );
2049        assert!(
2050            guard_3.block_refs.contains(&block_c),
2051            "Should contain block C"
2052        );
2053        assert!(
2054            guard_3.block_refs.contains(&block_d),
2055            "Should contain block D"
2056        );
2057    }
2058
2059    #[test]
2060    fn test_inflight_blocks_map_swap_locks_preserves_method() {
2061        // GIVEN
2062        let map = InflightBlocksMap::new();
2063        let some_block_refs = [
2064            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2065            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2066        ];
2067        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
2068
2069        // WHEN authority 1 locks with Live sync
2070        let authority_1 = AuthorityIndex::new_for_test(1);
2071        let guard_1 = map
2072            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
2073            .expect("Should lock with Live sync");
2074
2075        assert_eq!(guard_1.block_refs.len(), 2);
2076
2077        // AND we swap to authority 2
2078        let authority_2 = AuthorityIndex::new_for_test(2);
2079        let guard_2 = map
2080            .swap_locks(guard_1, authority_2)
2081            .expect("Should swap locks");
2082
2083        // THEN the new guard should preserve the block refs
2084        assert_eq!(guard_2.block_refs, missing_block_refs);
2085
2086        // AND authority 3 cannot lock with Live sync (limit of 1 reached)
2087        let authority_3 = AuthorityIndex::new_for_test(3);
2088        let guard_3 = map.lock_blocks(missing_block_refs.clone(), authority_3, SyncMethod::Live);
2089        assert!(guard_3.is_none(), "Should fail - Live limit reached");
2090
2091        // BUT authority 3 CAN lock with Periodic sync
2092        let guard_3_periodic = map
2093            .lock_blocks(missing_block_refs, authority_3, SyncMethod::Periodic)
2094            .expect("Should lock with Periodic");
2095        assert_eq!(guard_3_periodic.block_refs.len(), 2);
2096    }
2097
    #[tokio::test]
    async fn test_process_fetched_blocks() {
        // GIVEN a 4-authority context and the collaborators that
        // `process_fetched_blocks` needs.
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let (commands_sender, _commands_receiver) =
            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);

        // Create input test blocks:
        // - Authority 0 block at round 60.
        // - Authority 1 blocks from round 30 to 60.
        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
        expected_blocks.extend(
            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
        );
        // Sanity-check: the batch fills `max_blocks_per_sync` exactly.
        assert_eq!(
            expected_blocks.len(),
            context.parameters.max_blocks_per_sync
        );

        // The serialized form is what the synchronizer receives from a peer.
        let expected_serialized_blocks = expected_blocks
            .iter()
            .map(|b| b.serialized().clone())
            .collect::<Vec<_>>();

        let expected_block_refs = expected_blocks
            .iter()
            .map(|b| b.reference())
            .collect::<BTreeSet<_>>();

        // GIVEN peer to fetch blocks from
        let peer_index = AuthorityIndex::new_for_test(2);

        // Create blocks_guard: lock all the expected refs as if a live-sync
        // request to `peer_index` were in flight.
        let inflight_blocks_map = InflightBlocksMap::new();
        let blocks_guard = inflight_blocks_map
            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
            .expect("Failed to lock blocks");

        assert_eq!(
            inflight_blocks_map.num_of_locked_blocks(),
            expected_block_refs.len()
        );

        // Create a Synchronizer verified-blocks cache and process the payload.
        let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
        )));
        let result = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
        >::process_fetched_blocks(
            expected_serialized_blocks,
            peer_index,
            blocks_guard, // The guard is consumed here
            core_dispatcher.clone(),
            block_verifier,
            verified_cache,
            commit_vote_monitor,
            context.clone(),
            commands_sender,
            "test",
        )
            .await;

        // THEN
        assert!(result.is_ok());

        // Check blocks were sent to core
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(
            added_blocks
                .iter()
                .map(|b| b.reference())
                .collect::<BTreeSet<_>>(),
            expected_block_refs,
        );

        // Check blocks were unlocked (the consumed guard released them).
        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
    }
2183
    #[tokio::test]
    async fn test_process_fetched_blocks_duplicates() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let (commands_sender, _commands_receiver) =
            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);

        // Create input test blocks:
        // - Authority 0 block at round 60.
        // - Authority 1 blocks from round 30 to 60.
        // Together they add up to exactly `max_blocks_per_sync`, as asserted below.
        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
        expected_blocks.extend(
            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
        );
        assert_eq!(
            expected_blocks.len(),
            context.parameters.max_blocks_per_sync
        );

        let expected_serialized_blocks = expected_blocks
            .iter()
            .map(|b| b.serialized().clone())
            .collect::<Vec<_>>();

        let expected_block_refs = expected_blocks
            .iter()
            .map(|b| b.reference())
            .collect::<BTreeSet<_>>();

        // GIVEN peer to fetch blocks from
        let peer_index = AuthorityIndex::new_for_test(2);

        // Create blocks_guard, as the live sync path would before a fetch.
        let inflight_blocks_map = InflightBlocksMap::new();
        let blocks_guard = inflight_blocks_map
            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
            .expect("Failed to lock blocks");

        assert_eq!(
            inflight_blocks_map.num_of_locked_blocks(),
            expected_block_refs.len()
        );

        // Create a shared LruCache that will be reused to verify duplicate prevention
        // across the two `process_fetched_blocks` calls below.
        let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
        )));

        // WHEN process fetched blocks for the first time
        let result = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
        >::process_fetched_blocks(
            expected_serialized_blocks.clone(),
            peer_index,
            blocks_guard,
            core_dispatcher.clone(),
            block_verifier.clone(),
            verified_cache.clone(),
            commit_vote_monitor.clone(),
            context.clone(),
            commands_sender.clone(),
            "test",
        )
        .await;

        // THEN
        assert!(result.is_ok());

        // Check blocks were sent to core (first delivery: everything goes through).
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(
            added_blocks
                .iter()
                .map(|b| b.reference())
                .collect::<BTreeSet<_>>(),
            expected_block_refs,
        );

        // Check blocks were unlocked
        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);

        // PART 2: Verify LruCache prevents duplicate processing
        // Try to process the same blocks again (simulating duplicate fetch).
        // A fresh guard is needed since the first call consumed and released its own.
        let blocks_guard_second = inflight_blocks_map
            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
            .expect("Failed to lock blocks for second call");

        let result_second = Synchronizer::<
            MockNetworkClient,
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
        >::process_fetched_blocks(
            expected_serialized_blocks,
            peer_index,
            blocks_guard_second,
            core_dispatcher.clone(),
            block_verifier,
            verified_cache.clone(),
            commit_vote_monitor,
            context.clone(),
            commands_sender,
            "test",
        )
        .await;

        assert!(result_second.is_ok());

        // Verify NO blocks were sent to core on the second call
        // because they were already in the LruCache
        let added_blocks_second_call = core_dispatcher.get_add_blocks().await;
        assert!(
            added_blocks_second_call.is_empty(),
            "Expected no blocks to be added on second call due to LruCache, but got {} blocks",
            added_blocks_second_call.len()
        );

        // Verify the cache contains all the block digests
        let cache_size = verified_cache.lock().len();
        assert_eq!(
            cache_size,
            expected_block_refs.len(),
            "Expected {} entries in the LruCache, but got {}",
            expected_block_refs.len(),
            cache_size
        );
    }
2316
2317    #[tokio::test]
2318    async fn test_successful_fetch_blocks_from_peer() {
2319        // GIVEN
2320        let (context, _) = Context::new_for_test(4);
2321        let context = Arc::new(context);
2322        let block_verifier = Arc::new(NoopBlockVerifier {});
2323        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2324        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2325        let network_client = Arc::new(MockNetworkClient::default());
2326        let store = Arc::new(MemStore::new());
2327        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2328
2329        let handle = Synchronizer::start(
2330            network_client.clone(),
2331            context,
2332            core_dispatcher.clone(),
2333            commit_vote_monitor,
2334            block_verifier,
2335            dag_state,
2336            false,
2337        );
2338
2339        // Create some test blocks
2340        let expected_blocks = (0..10)
2341            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2342            .collect::<Vec<_>>();
2343        let missing_blocks = expected_blocks
2344            .iter()
2345            .map(|block| block.reference())
2346            .collect::<BTreeSet<_>>();
2347
2348        // AND stub the fetch_blocks request from peer 1
2349        let peer = AuthorityIndex::new_for_test(1);
2350        network_client
2351            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
2352            .await;
2353
2354        // WHEN request missing blocks from peer 1
2355        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2356
2357        // Wait a little bit until those have been added in core
2358        sleep(Duration::from_millis(1_000)).await;
2359
2360        // THEN ensure those ended up in Core
2361        let added_blocks = core_dispatcher.get_add_blocks().await;
2362        assert_eq!(added_blocks, expected_blocks);
2363    }
2364
    #[tokio::test]
    async fn saturate_fetch_blocks_from_peer() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            false,
        );

        // Create some test blocks: one more request than twice the per-authority
        // fetch concurrency, so the final request cannot be enqueued.
        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
            .collect::<Vec<_>>();

        // Now start sending requests to fetch blocks by trying to saturate peer 1 task
        let peer = AuthorityIndex::new_for_test(1);
        let mut iter = expected_blocks.iter().peekable();
        while let Some(block) = iter.next() {
            // stub the fetch_blocks request from peer 1 and give some high response latency
            // so requests can start blocking the peer task.
            network_client
                .stub_fetch_blocks(
                    vec![block.clone()],
                    peer,
                    Some(Duration::from_millis(5_000)),
                )
                .await;

            let mut missing_blocks = BTreeSet::new();
            missing_blocks.insert(block.reference());

            // WHEN requesting to fetch the blocks, it should not succeed for the last
            // request and get an error with "saturated" synchronizer
            if iter.peek().is_none() {
                // Last request: the peer's queue is full of slow in-flight stubs.
                match handle.fetch_blocks(missing_blocks, peer).await {
                    Err(ConsensusError::SynchronizerSaturated(index, _)) => {
                        // The error must identify the saturated peer.
                        assert_eq!(index, peer);
                    }
                    _ => panic!("A saturated synchronizer error was expected"),
                }
            } else {
                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
            }
        }
    }
2423
    // `start_paused` lets tokio auto-advance the mock clock, so the sleeps below
    // complete virtually instantly.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_fetch_blocks() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Create some test blocks
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // AND stub the missing blocks so the periodic task discovers them.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authority 1 & 2
        // Make the first authority timeout, so the second will be called. "We" are
        // authority = 0, so we are skipped anyways.
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        network_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // WHEN start the synchronizer and wait for a couple of seconds
        let _handle = Synchronizer::start(
            network_client.clone(),
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            false,
        );

        // Enough (virtual) time for the periodic task to time out on authority 1
        // and succeed against authority 2.
        sleep(8 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should now be fetched and added to core
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);

        // AND missing blocks should have been consumed by the stub
        assert!(
            core_dispatcher
                .get_missing_blocks()
                .await
                .unwrap()
                .is_empty()
        );
    }
2494
2495    #[tokio::test(flavor = "current_thread", start_paused = true)]
2496    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2497        // GIVEN
2498        let (mut context, _) = Context::new_for_test(4);
2499        context
2500            .protocol_config
2501            .set_consensus_batched_block_sync_for_testing(true);
2502        let context = Arc::new(context);
2503        let block_verifier = Arc::new(NoopBlockVerifier {});
2504        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2505        let network_client = Arc::new(MockNetworkClient::default());
2506        let store = Arc::new(MemStore::new());
2507        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2508        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2509
2510        // AND stub some missing blocks. The highest accepted round is 0. Create blocks
2511        // that are above the sync threshold.
2512        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2513        let stub_blocks = (sync_missing_block_round_threshold * 2
2514            ..sync_missing_block_round_threshold * 2
2515                + context.parameters.max_blocks_per_sync as u32)
2516            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2517            .collect::<Vec<_>>();
2518        let missing_blocks = stub_blocks
2519            .iter()
2520            .map(|block| block.reference())
2521            .collect::<BTreeSet<_>>();
2522        core_dispatcher
2523            .stub_missing_blocks(missing_blocks.clone())
2524            .await;
2525        // AND stub the requests for authority 1 & 2
2526        // Make the first authority timeout, so the second will be called. "We" are
2527        // authority = 0, so we are skipped anyways.
2528        let mut expected_blocks = stub_blocks
2529            .iter()
2530            .take(context.parameters.max_blocks_per_sync)
2531            .cloned()
2532            .collect::<Vec<_>>();
2533        network_client
2534            .stub_fetch_blocks(
2535                expected_blocks.clone(),
2536                AuthorityIndex::new_for_test(1),
2537                Some(FETCH_REQUEST_TIMEOUT),
2538            )
2539            .await;
2540        network_client
2541            .stub_fetch_blocks(
2542                expected_blocks.clone(),
2543                AuthorityIndex::new_for_test(2),
2544                None,
2545            )
2546            .await;
2547
2548        // Now create some blocks to simulate a commit lag
2549        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2550        let commit_index: CommitIndex = round - 1;
2551        let blocks = (0..4)
2552            .map(|authority| {
2553                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2554                let block = TestBlock::new(round, authority)
2555                    .set_commit_votes(commit_votes)
2556                    .build();
2557
2558                VerifiedBlock::new_for_test(block)
2559            })
2560            .collect::<Vec<_>>();
2561
2562        // Pass them through the commit vote monitor - so now there will be a big commit
2563        // lag to prevent the scheduled synchronizer from running
2564        for block in blocks {
2565            commit_vote_monitor.observe_block(&block);
2566        }
2567
2568        // Start the synchronizer and wait for a couple of seconds where normally
2569        // the synchronizer should have kicked in.
2570        let _handle = Synchronizer::start(
2571            network_client.clone(),
2572            context.clone(),
2573            core_dispatcher.clone(),
2574            commit_vote_monitor.clone(),
2575            block_verifier,
2576            dag_state.clone(),
2577            false,
2578        );
2579
2580        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2581
2582        // Since we should be in commit lag mode none of the missed blocks should have
2583        // been fetched - hence nothing should be sent to core for processing.
2584        let added_blocks = core_dispatcher.get_add_blocks().await;
2585        assert_eq!(added_blocks, vec![]);
2586
2587        println!("Before advancing");
2588        // AND advance now the local commit index by adding a new commit that matches
2589        // the commit index of quorum
2590        {
2591            let mut d = dag_state.write();
2592            for index in 1..=commit_index {
2593                let commit =
2594                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2595
2596                d.add_commit(commit);
2597            }
2598
2599            println!("Once advanced");
2600            assert_eq!(
2601                d.last_commit_index(),
2602                commit_vote_monitor.quorum_commit_index()
2603            );
2604        }
2605
2606        // Now stub again the missing blocks to fetch the exact same ones.
2607        core_dispatcher
2608            .stub_missing_blocks(missing_blocks.clone())
2609            .await;
2610
2611        println!("Final sleep");
2612        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2613
2614        // THEN the missing blocks should now be fetched and added to core
2615        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2616        println!("Final await");
2617        added_blocks.sort_by_key(|block| block.reference());
2618        expected_blocks.sort_by_key(|block| block.reference());
2619
2620        assert_eq!(added_blocks, expected_blocks);
2621    }
2622
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_fetch_own_last_block() {
        // GIVEN a short timeout so the first (slow) stubbed response per peer
        // times out and triggers exactly one retry round.
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(MockNetworkClient::default());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let our_index = AuthorityIndex::new_for_test(0);

        // Create some test blocks (rounds 8, 9, 10 of our own authority).
        let mut expected_blocks = (8..=10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();

        // Now set different latest blocks for the peers.
        // Each peer gets two stubs: the first delays 10s (past the 2s timeout),
        // the second responds immediately, so the retry succeeds.
        // For peer 1 we give the block of round 10 (highest)
        let block_1 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1.clone()],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_1],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;

        // For peer 2 we give the block of round 9
        let block_2 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2.clone()],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_2],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                None,
            )
            .await;

        // For peer 3 we give a block with lowest round (8)
        let block_3 = expected_blocks.pop().unwrap();
        network_client
            .stub_fetch_latest_blocks(
                vec![block_3.clone()],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        network_client
            .stub_fetch_latest_blocks(
                vec![block_3],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                None,
            )
            .await;

        // WHEN start the synchronizer (with `sync_last_known_own_block` = true)
        // and wait for a couple of seconds
        let handle = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            dag_state,
            true,
        );

        // Wait at least for the timeout time
        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;

        // Assert that core has been called to set the min propose round:
        // the highest round any peer reported for us is 10 (from peer 1).
        assert_eq!(
            core_dispatcher.get_last_own_proposed_round().await,
            vec![10]
        );

        // Ensure that all the requests have been called
        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);

        // And we got one retry (the first attempt against every peer timed out).
        assert_eq!(
            context
                .metrics
                .node_metrics
                .sync_last_known_own_block_retries
                .get(),
            1
        );

        // Ensure that no panic occurred inside the synchronizer tasks.
        if let Err(err) = handle.stop().await {
            if err.is_panic() {
                std::panic::resume_unwind(err.into_panic());
            }
        }
    }
2742    #[derive(Default)]
2743    struct SyncMockDispatcher {
2744        missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2745        added_blocks: Mutex<Vec<VerifiedBlock>>,
2746    }
2747
2748    #[async_trait::async_trait]
2749    impl CoreThreadDispatcher for SyncMockDispatcher {
2750        async fn get_missing_blocks(
2751            &self,
2752        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2753            Ok(self.missing_blocks.lock().await.clone())
2754        }
2755        async fn add_blocks(
2756            &self,
2757            blocks: Vec<VerifiedBlock>,
2758        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2759            let mut guard = self.added_blocks.lock().await;
2760            guard.extend(blocks.clone());
2761            Ok(blocks.iter().map(|b| b.reference()).collect())
2762        }
2763
2764        // Stub out the remaining CoreThreadDispatcher methods with defaults:
2765
2766        async fn check_block_refs(
2767            &self,
2768            block_refs: Vec<BlockRef>,
2769        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2770            // Echo back the requested refs by default
2771            Ok(block_refs.into_iter().collect())
2772        }
2773
2774        async fn add_certified_commits(
2775            &self,
2776            _commits: CertifiedCommits,
2777        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2778            // No additional certified-commit logic in tests
2779            Ok(BTreeSet::new())
2780        }
2781
2782        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2783            Ok(())
2784        }
2785
2786        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2787            Ok(())
2788        }
2789
2790        fn set_propagation_delay_and_quorum_rounds(
2791            &self,
2792            _delay: Round,
2793            _received_quorum_rounds: Vec<QuorumRound>,
2794            _accepted_quorum_rounds: Vec<QuorumRound>,
2795        ) -> Result<(), CoreError> {
2796            Ok(())
2797        }
2798
2799        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2800            Ok(())
2801        }
2802
2803        fn highest_received_rounds(&self) -> Vec<Round> {
2804            Vec::new()
2805        }
2806    }
2807
2808    #[tokio::test(flavor = "current_thread")]
2809    async fn known_before_random_peer_fetch() {
2810        {
2811            // 1) Setup 10‐node context and in‐mem DAG
2812            let (ctx, _) = Context::new_for_test(10);
2813            let context = Arc::new(ctx);
2814            let store = Arc::new(MemStore::new());
2815            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2816            let inflight = InflightBlocksMap::new();
2817
2818            // 2) One missing block
2819            let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2820            let missing_ref = missing_vb.reference();
2821            let missing_blocks = BTreeMap::from([(
2822                missing_ref,
2823                BTreeSet::from([
2824                    AuthorityIndex::new_for_test(2),
2825                    AuthorityIndex::new_for_test(3),
2826                    AuthorityIndex::new_for_test(4),
2827                ]),
2828            )]);
2829
2830            // 3) Prepare mocks and stubs
2831            let network_client = Arc::new(MockNetworkClient::default());
2832            // Stub *all*  authorities so none panic:
2833            for i in 1..=9 {
2834                let peer = AuthorityIndex::new_for_test(i);
2835                if i == 1 || i == 4 {
2836                    network_client
2837                        .stub_fetch_blocks(
2838                            vec![missing_vb.clone()],
2839                            peer,
2840                            Some(2 * FETCH_REQUEST_TIMEOUT),
2841                        )
2842                        .await;
2843                    continue;
2844                }
2845                network_client
2846                    .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2847                    .await;
2848            }
2849
2850            // 4) Invoke knowledge-based fetch and random fallback selection
2851            //    deterministically
2852            let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2853        ::fetch_blocks_from_authorities(
2854            context.clone(),
2855            inflight.clone(),
2856            network_client.clone(),
2857            missing_blocks,
2858            dag_state.clone(),
2859        )
2860            .await;
2861
2862            // 5) Assert we got exactly two fetches - two from the first two who are aware
2863            //    of the missing block (authority 2 and 3)
2864            assert_eq!(results.len(), 2);
2865
2866            // 6) The  knowledge-based‐fetch went to peer 2 and 3
2867            let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2868            assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2869            let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
2870            assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
2871            // 7) Verify the returned bytes correspond to that block
2872            let expected = missing_vb.serialized().clone();
2873            assert_eq!(hot_bytes, &vec![expected]);
2874        }
2875    }
2876
2877    #[tokio::test(flavor = "current_thread")]
2878    async fn known_before_periodic_peer_fetch_larger_scenario() {
2879        use std::{
2880            collections::{BTreeMap, BTreeSet},
2881            sync::Arc,
2882        };
2883
2884        use parking_lot::RwLock;
2885
2886        use crate::{
2887            block::{Round, TestBlock, VerifiedBlock},
2888            context::Context,
2889        };
2890
2891        // 1) Setup a 10-node context, in-memory DAG, and inflight map
2892        let (ctx, _) = Context::new_for_test(10);
2893        let context = Arc::new(ctx);
2894        let store = Arc::new(MemStore::new());
2895        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2896        let inflight = InflightBlocksMap::new();
2897        let network_client = Arc::new(MockNetworkClient::default());
2898
2899        // 2) Create 1000 missing blocks known by authorities 0, 2, and 3
2900        let mut missing_blocks = BTreeMap::new();
2901        let mut missing_vbs = Vec::new();
2902        let known_number_blocks = 10;
2903        for i in 0..1000 {
2904            let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2905            let r = vb.reference();
2906            if i < known_number_blocks {
2907                // First 10 blocks are known by authorities 0, 2
2908                missing_blocks.insert(
2909                    r,
2910                    BTreeSet::from([
2911                        AuthorityIndex::new_for_test(0),
2912                        AuthorityIndex::new_for_test(2),
2913                    ]),
2914                );
2915            } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2916                // Second 10 blocks are known by authorities 0, 3
2917                missing_blocks.insert(
2918                    r,
2919                    BTreeSet::from([
2920                        AuthorityIndex::new_for_test(0),
2921                        AuthorityIndex::new_for_test(3),
2922                    ]),
2923                );
2924            } else {
2925                // The rest are only known by authority 0
2926                missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2927            }
2928            missing_vbs.push(vb);
2929        }
2930
2931        // 3) Stub fetches for knowledge-based peers (2 and 3)
2932        let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2933        let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2934            .iter()
2935            .map(|&peer| {
2936                let vbs = missing_vbs
2937                    .iter()
2938                    .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2939                    .take(context.parameters.max_blocks_per_sync)
2940                    .cloned()
2941                    .collect::<Vec<_>>();
2942                (peer, vbs)
2943            })
2944            .collect();
2945
2946        for (peer, vbs) in known_vbs_by_peer {
2947            if peer == AuthorityIndex::new_for_test(2) {
2948                // Simulate timeout for peer 2, then fallback to peer 5
2949                network_client
2950                    .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2951                    .await;
2952                network_client
2953                    .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2954                    .await;
2955            } else {
2956                network_client
2957                    .stub_fetch_blocks(vbs.clone(), peer, None)
2958                    .await;
2959            }
2960        }
2961
2962        // 4) Stub fetches from periodic path peers (1 and 4)
2963        network_client
2964            .stub_fetch_blocks(
2965                missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2966                AuthorityIndex::new_for_test(1),
2967                None,
2968            )
2969            .await;
2970
2971        network_client
2972            .stub_fetch_blocks(
2973                missing_vbs[context.parameters.max_blocks_per_sync
2974                    ..2 * context.parameters.max_blocks_per_sync]
2975                    .to_vec(),
2976                AuthorityIndex::new_for_test(4),
2977                None,
2978            )
2979            .await;
2980
2981        // 5) Execute the fetch logic
2982        let results = Synchronizer::<
2983            MockNetworkClient,
2984            NoopBlockVerifier,
2985            SyncMockDispatcher,
2986        >::fetch_blocks_from_authorities(
2987            context.clone(),
2988            inflight.clone(),
2989            network_client.clone(),
2990            missing_blocks,
2991            dag_state.clone(),
2992        )
2993            .await;
2994
2995        // 6) Assert we got 4 fetches: peer 2 (timed out), fallback to 5, then periodic
2996        //    from 1 and 4
2997        assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");
2998
2999        // 7) First fetch from peer 3 (knowledge-based)
3000        let (_guard3, bytes3, peer3) = &results[0];
3001        assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
3002        let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
3003            .iter()
3004            .map(|vb| vb.serialized().clone())
3005            .collect::<Vec<_>>();
3006        assert_eq!(bytes3, &expected2);
3007
3008        // 8) Second fetch from peer 1 (periodic)
3009        let (_guard1, bytes1, peer1) = &results[1];
3010        assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
3011        let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
3012            .iter()
3013            .map(|vb| vb.serialized().clone())
3014            .collect::<Vec<_>>();
3015        assert_eq!(bytes1, &expected1);
3016
3017        // 9) Third fetch from peer 4 (periodic)
3018        let (_guard4, bytes4, peer4) = &results[2];
3019        assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
3020        let expected4 = missing_vbs
3021            [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
3022            .iter()
3023            .map(|vb| vb.serialized().clone())
3024            .collect::<Vec<_>>();
3025        assert_eq!(bytes4, &expected4);
3026
3027        // 10) Fourth fetch from peer 5 (fallback after peer 2 timeout)
3028        let (_guard5, bytes5, peer5) = &results[3];
3029        assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
3030        let expected5 = missing_vbs[0..known_number_blocks]
3031            .iter()
3032            .map(|vb| vb.serialized().clone())
3033            .collect::<Vec<_>>();
3034        assert_eq!(bytes5, &expected5);
3035    }
3036
3037    #[tokio::test(flavor = "current_thread")]
3038    async fn test_verify_blocks_deduplication() {
3039        let (context, _keys) = Context::new_for_test(4);
3040        let context = Arc::new(context);
3041        let block_verifier = Arc::new(NoopBlockVerifier {});
3042        let failing_verifier = Arc::new(FailingBlockVerifier);
3043        let peer1 = AuthorityIndex::new_for_test(1);
3044        let peer2 = AuthorityIndex::new_for_test(2);
3045
3046        // Create cache with capacity of 5 for eviction testing
3047        let cache = Arc::new(SyncMutex::new(LruCache::new(NonZeroUsize::new(5).unwrap())));
3048
3049        // Test 1: Per-peer metric tracking
3050        let block1 = VerifiedBlock::new_for_test(TestBlock::new(10, 0).build());
3051        let serialized1 = vec![block1.serialized().clone()];
3052
3053        // Verify from peer1 (cache miss)
3054        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3055            serialized1.clone(), block_verifier.clone(), cache.clone(), &context, peer1, "live",
3056        );
3057        assert_eq!(result.unwrap().len(), 1);
3058
3059        let peer1_hostname = &context.committee.authority(peer1).hostname;
3060        assert_eq!(
3061            context
3062                .metrics
3063                .node_metrics
3064                .synchronizer_skipped_blocks_by_peer
3065                .with_label_values(&[peer1_hostname.as_str(), "live"])
3066                .get(),
3067            0
3068        );
3069
3070        // Verify same block from peer2 with different sync method (cache hit)
3071        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3072            serialized1, block_verifier.clone(), cache.clone(), &context, peer2, "periodic",
3073        );
3074        assert_eq!(result.unwrap().len(), 0, "Should skip cached block");
3075
3076        let peer2_hostname = &context.committee.authority(peer2).hostname;
3077        assert_eq!(
3078            context
3079                .metrics
3080                .node_metrics
3081                .synchronizer_skipped_blocks_by_peer
3082                .with_label_values(&[peer2_hostname.as_str(), "periodic"])
3083                .get(),
3084            1
3085        );
3086
3087        // Test 2: Invalid blocks not cached
3088        let invalid_block = VerifiedBlock::new_for_test(TestBlock::new(20, 0).build());
3089        let invalid_serialized = vec![invalid_block.serialized().clone()];
3090
3091        assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3092            invalid_serialized.clone(), failing_verifier.clone(), cache.clone(), &context, peer1, "test",
3093        ).is_err());
3094        assert_eq!(cache.lock().len(), 1, "Invalid block should not be cached");
3095
3096        // Verify invalid block fails again (not from cache)
3097        assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3098            invalid_serialized, failing_verifier, cache.clone(), &context, peer1, "test",
3099        ).is_err());
3100
3101        // Test 3: Cache eviction
3102        let blocks: Vec<_> = (0..5)
3103            .map(|i| VerifiedBlock::new_for_test(TestBlock::new(30 + i, 0).build()))
3104            .collect();
3105
3106        // Fill cache to capacity
3107        for block in &blocks {
3108            Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3109                vec![block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3110            ).unwrap();
3111        }
3112        assert_eq!(cache.lock().len(), 5);
3113
3114        // Verify first block is evicted when adding new one
3115        let new_block = VerifiedBlock::new_for_test(TestBlock::new(99, 0).build());
3116        Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3117            vec![new_block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3118        ).unwrap();
3119
3120        // First block (block1) should be evicted, so re-verifying it should not be a
3121        // cache hit
3122        let block1_serialized = vec![block1.serialized().clone()];
3123        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3124            block1_serialized, block_verifier.clone(), cache.clone(), &context, peer1, "test",
3125        );
3126        assert_eq!(
3127            result.unwrap().len(),
3128            1,
3129            "Evicted block should be re-verified"
3130        );
3131
3132        // New block should still be in cache
3133        let new_block_serialized = vec![new_block.serialized().clone()];
3134        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3135            new_block_serialized, block_verifier, cache, &context, peer1, "test",
3136        );
3137        assert_eq!(result.unwrap().len(), 0, "New block should be cached");
3138    }
3139}