consensus_core/synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7    num::NonZeroUsize,
8    sync::Arc,
9    time::Duration,
10};
11
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use futures::{StreamExt as _, stream::FuturesUnordered};
15use iota_macros::fail_point_async;
16use iota_metrics::{
17    monitored_future,
18    monitored_mpsc::{Receiver, Sender, channel},
19    monitored_scope,
20};
21use itertools::Itertools as _;
22use lru::LruCache;
23use parking_lot::{Mutex, RwLock};
24#[cfg(not(test))]
25use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
26use tap::TapFallible;
27use tokio::{
28    runtime::Handle,
29    sync::{mpsc::error::TrySendError, oneshot},
30    task::{JoinError, JoinSet},
31    time::{Instant, sleep, sleep_until, timeout},
32};
33use tracing::{debug, error, info, trace, warn};
34
35use crate::{
36    BlockAPI, CommitIndex, Round,
37    authority_service::COMMIT_LAG_MULTIPLIER,
38    block::{BlockDigest, BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
39    block_verifier::BlockVerifier,
40    commit_vote_monitor::CommitVoteMonitor,
41    context::Context,
42    core_thread::CoreThreadDispatcher,
43    dag_state::DagState,
44    error::{ConsensusError, ConsensusResult},
45    network::NetworkClient,
46    scoring_metrics_store::ErrorSource,
47};
48
49/// The number of concurrent fetch blocks requests per authority
50const FETCH_BLOCKS_CONCURRENCY: usize = 5;
51
52/// The maximum number of additional blocks (parents) that can be fetched.
53// TODO: This is a temporary value, and should be removed once the protocol
54// version is updated to support batching
55pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;
56
57/// The maximum number of verified block references to cache for deduplication.
58const VERIFIED_BLOCKS_CACHE_CAP: usize = 200_000;
59
60/// The timeout for synchronizer to fetch blocks from a given peer authority.
61const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
62
63/// The timeout for periodic synchronizer to fetch blocks from the peers.
64const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
65
66/// The maximum number of authorities from which the periodic synchronizer will
67/// try to fetch a given missing block at the same time. The in-flight blocks
68/// guard ensures that we do not ask more than this number of authorities for the same block.
69const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
70
71/// The maximum number of authorities from which the live synchronizer will try
72/// to fetch a given missing block at the same time. This is lower than the
73/// periodic sync limit to prioritize periodic sync.
74const MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK: usize = 1;
75
76/// The maximum number of peers from which the periodic synchronizer will
77/// request blocks.
78const MAX_PERIODIC_SYNC_PEERS: usize = 4;
79
80/// The maximum number of peers that the periodic synchronizer chooses completely
81/// at random to fetch blocks from. The other peers are chosen based on
82/// their knowledge of the DAG.
83const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
84
85/// Represents the different methods used for synchronization
86#[derive(Clone)]
87enum SyncMethod {
88    Live,
89    Periodic,
90}
91
92struct BlocksGuard {
93    map: Arc<InflightBlocksMap>,
94    block_refs: BTreeSet<BlockRef>,
95    peer: AuthorityIndex,
96    method: SyncMethod,
97}
98
99impl Drop for BlocksGuard {
100    fn drop(&mut self) {
101        self.map.unlock_blocks(&self.block_refs, self.peer);
102    }
103}
104
105// Keeps a mapping between the missing blocks that have been instructed to be
106// fetched and the authorities that are currently fetching them. For each block
107// ref there is a maximum number of authorities that can concurrently fetch it.
108// The ids of the authorities currently fetching a block are stored in the
109// corresponding `BTreeSet` and effectively act as "locks".
110struct InflightBlocksMap {
111    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
112}
113
114impl InflightBlocksMap {
115    fn new() -> Arc<Self> {
116        Arc::new(Self {
117            inner: Mutex::new(HashMap::new()),
118        })
119    }
120
121    /// Locks the blocks to be fetched for the assigned `peer_index`. We want to
122    /// avoid re-fetching the missing blocks from too many authorities at the
123    /// same time, thus we limit the concurrency per block by attempting to lock
124    /// each block. If a block is already being fetched by the maximum allowed
125    /// number of authorities, then its block ref will not be included in the
126    /// returned set. The method returns all the block refs that have been
127    /// successfully locked and are allowed to be fetched.
128    ///
129    /// Different limits apply based on the sync method:
130    /// - Periodic sync: can lock if the number of fetching authorities is below
131    ///   `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK` (2)
132    /// - Live sync: can lock if the number of fetching authorities is below
133    ///   `MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK` (1)
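    ///
    /// A minimal sketch of the locking behaviour (illustrative only; the
    /// `AuthorityIndex::new_for_test` helper and the `block_ref` value are assumed):
    /// ```ignore
    /// let map = InflightBlocksMap::new();
    /// let refs: BTreeSet<BlockRef> = [block_ref].into_iter().collect();
    /// // With periodic sync, up to MAX_AUTHORITIES_TO_FETCH_PER_BLOCK (2) peers can lock a block.
    /// let g1 = map.lock_blocks(refs.clone(), AuthorityIndex::new_for_test(1), SyncMethod::Periodic);
    /// let g2 = map.lock_blocks(refs.clone(), AuthorityIndex::new_for_test(2), SyncMethod::Periodic);
    /// let g3 = map.lock_blocks(refs.clone(), AuthorityIndex::new_for_test(3), SyncMethod::Periodic);
    /// assert!(g1.is_some() && g2.is_some() && g3.is_none());
    /// // Dropping a guard releases its locks, so another peer can acquire them again.
    /// drop(g1);
    /// assert!(map.lock_blocks(refs, AuthorityIndex::new_for_test(3), SyncMethod::Periodic).is_some());
    /// ```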
134    fn lock_blocks(
135        self: &Arc<Self>,
136        missing_block_refs: BTreeSet<BlockRef>,
137        peer: AuthorityIndex,
138        method: SyncMethod,
139    ) -> Option<BlocksGuard> {
140        let mut blocks = BTreeSet::new();
141        let mut inner = self.inner.lock();
142
143        for block_ref in missing_block_refs {
144            let authorities = inner.entry(block_ref).or_default();
145
146            // Check if this peer is already fetching this block
147            if authorities.contains(&peer) {
148                continue;
149            }
150
151            // Count total authorities currently fetching this block
152            let total_count = authorities.len();
153
154            // Determine the limit based on the sync method
155            let max_limit = match method {
156                SyncMethod::Live => MAX_AUTHORITIES_TO_LIVE_FETCH_PER_BLOCK,
157                SyncMethod::Periodic => MAX_AUTHORITIES_TO_FETCH_PER_BLOCK,
158            };
159
160            // Check if we can acquire the lock
161            if total_count < max_limit {
162                assert!(authorities.insert(peer));
163                blocks.insert(block_ref);
164            }
165        }
166
167        if blocks.is_empty() {
168            None
169        } else {
170            Some(BlocksGuard {
171                map: self.clone(),
172                block_refs: blocks,
173                peer,
174                method,
175            })
176        }
177    }
178
179    /// Unlocks the provided block references for the given `peer`. The
180    /// unlocking is strict, meaning that if this method is called for a
181    /// specific block ref and peer more times than the corresponding lock
182    /// was acquired, it will panic.
183    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
184        // Now mark all the blocks as fetched from the map
185        let mut blocks_to_fetch = self.inner.lock();
186        for block_ref in block_refs {
187            let authorities = blocks_to_fetch
188                .get_mut(block_ref)
189                .expect("Should have found a non empty map");
190
191            assert!(authorities.remove(&peer), "Peer index should be present!");
192
193            // if the last one then just clean up
194            if authorities.is_empty() {
195                blocks_to_fetch.remove(block_ref);
196            }
197        }
198    }
199
200    /// Drops the provided `blocks_guard`, forcing the referenced blocks to be
201    /// unlocked, and then attempts to lock the same block refs again for the
202    /// given `peer`. The swap is best effort and there is no guarantee that
203    /// the `peer` will be able to acquire the new locks.
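    ///
    /// Illustrative sketch (assumes an existing `blocks_guard` and a `new_peer` index):
    /// ```ignore
    /// // Best-effort retargeting of the same block refs to a different peer.
    /// if let Some(new_guard) = inflight_blocks.swap_locks(blocks_guard, new_peer) {
    ///     // fetch the blocks covered by `new_guard` from `new_peer`
    /// }
    /// ```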
204    fn swap_locks(
205        self: &Arc<Self>,
206        blocks_guard: BlocksGuard,
207        peer: AuthorityIndex,
208    ) -> Option<BlocksGuard> {
209        let block_refs = blocks_guard.block_refs.clone();
210        let method = blocks_guard.method.clone();
211
212        // Explicitly drop the guard
213        drop(blocks_guard);
214
215        // Now create a new guard with the same sync method
216        self.lock_blocks(block_refs, peer, method)
217    }
218
219    #[cfg(test)]
220    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
221        let inner = self.inner.lock();
222        inner.len()
223    }
224}
225
226enum Command {
227    FetchBlocks {
228        missing_block_refs: BTreeSet<BlockRef>,
229        peer_index: AuthorityIndex,
230        result: oneshot::Sender<Result<(), ConsensusError>>,
231    },
232    FetchOwnLastBlock,
233    KickOffScheduler,
234}
235
236pub(crate) struct SynchronizerHandle {
237    commands_sender: Sender<Command>,
238    tasks: tokio::sync::Mutex<JoinSet<()>>,
239}
240
241impl SynchronizerHandle {
242    /// Explicitly asks the synchronizer to fetch the blocks in the provided
243    /// `missing_block_refs` set from the specified peer authority.
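    ///
    /// A minimal usage sketch (illustrative only; the `handle`, `missing` set and
    /// `peer_index` bindings are assumed to exist in the caller):
    /// ```ignore
    /// // Ask the peer that sent us a block for the ancestors we are missing; it is
    /// // the most likely to have them.
    /// if let Err(err) = handle.fetch_blocks(missing, peer_index).await {
    ///     // e.g. ConsensusError::SynchronizerSaturated or ConsensusError::Shutdown
    ///     debug!("Failed to schedule block fetch: {err}");
    /// }
    /// ```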
244    pub(crate) async fn fetch_blocks(
245        &self,
246        missing_block_refs: BTreeSet<BlockRef>,
247        peer_index: AuthorityIndex,
248    ) -> ConsensusResult<()> {
249        let (sender, receiver) = oneshot::channel();
250        self.commands_sender
251            .send(Command::FetchBlocks {
252                missing_block_refs,
253                peer_index,
254                result: sender,
255            })
256            .await
257            .map_err(|_err| ConsensusError::Shutdown)?;
258        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
259    }
260
261    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
262        let mut tasks = self.tasks.lock().await;
263        tasks.abort_all();
264        while let Some(result) = tasks.join_next().await {
265            result?
266        }
267        Ok(())
268    }
269}
270
271/// `Synchronizer` oversees live block synchronization, crucial for node
272/// progress. Live synchronization refers to the process of retrieving missing
273/// blocks, particularly those essential for advancing a node when data from
274/// only a few rounds is absent. If a node significantly lags behind the
275/// network, `commit_syncer` handles fetching missing blocks via a more
276/// efficient approach. `Synchronizer` aims for swift catch-up employing two
277/// mechanisms:
278///
279/// 1. Explicitly requesting missing blocks from designated authorities via the
280///    "block send" path. This includes attempting to fetch any missing
281///    ancestors necessary for processing a received block. Such requests
282///    prioritize the block author, maximizing the chance of prompt retrieval. A
283///    locking mechanism allows concurrent requests for missing blocks from up
284///    to two authorities simultaneously, enhancing the chances of timely
285///    retrieval. Notably, if additional missing blocks arise during block
286///    processing, requests to the same authority are deferred to the scheduler.
287///
288/// 2. Periodically requesting missing blocks via a scheduler. This primarily
289///    serves to retrieve missing blocks that were not ancestors of a received
290///    block via the "block send" path. The scheduler operates on either a fixed
291///    periodic basis or is triggered immediately after explicit fetches
292///    described in (1), ensuring continued block retrieval if gaps persist.
293///
294/// In addition to the above, the synchronizer can fetch the node's own last
295/// proposed block from the network peers, as a best-effort approach to recover
296/// the node from amnesia and avoid equivocation.
297pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
298    context: Arc<Context>,
299    commands_receiver: Receiver<Command>,
300    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
301    core_dispatcher: Arc<D>,
302    commit_vote_monitor: Arc<CommitVoteMonitor>,
303    dag_state: Arc<RwLock<DagState>>,
304    fetch_blocks_scheduler_task: JoinSet<()>,
305    fetch_own_last_block_task: JoinSet<()>,
306    network_client: Arc<C>,
307    block_verifier: Arc<V>,
308    inflight_blocks_map: Arc<InflightBlocksMap>,
309    verified_blocks_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
310    commands_sender: Sender<Command>,
311}
312
313impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
314    /// Starts the synchronizer, which is responsible for fetching blocks from
315    /// other authorities and managing block synchronization tasks.
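    ///
    /// A wiring sketch (illustrative only; the surrounding components are assumed
    /// to have been constructed elsewhere, e.g. during authority node startup):
    /// ```ignore
    /// let handle = Synchronizer::start(
    ///     network_client.clone(),
    ///     context.clone(),
    ///     core_dispatcher.clone(),
    ///     commit_vote_monitor.clone(),
    ///     block_verifier.clone(),
    ///     dag_state.clone(),
    ///     /* sync_last_known_own_block */ false,
    /// );
    /// // ... later, during shutdown:
    /// handle.stop().await.ok();
    /// ```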
316    pub fn start(
317        network_client: Arc<C>,
318        context: Arc<Context>,
319        core_dispatcher: Arc<D>,
320        commit_vote_monitor: Arc<CommitVoteMonitor>,
321        block_verifier: Arc<V>,
322        dag_state: Arc<RwLock<DagState>>,
323        sync_last_known_own_block: bool,
324    ) -> Arc<SynchronizerHandle> {
325        let (commands_sender, commands_receiver) =
326            channel("consensus_synchronizer_commands", 1_000);
327        let inflight_blocks_map = InflightBlocksMap::new();
328        let verified_blocks_cache = Arc::new(Mutex::new(LruCache::new(
329            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
330        )));
331
332        // Spawn the tasks to fetch the blocks from the others
333        let mut fetch_block_senders = BTreeMap::new();
334        let mut tasks = JoinSet::new();
335        for (index, _) in context.committee.authorities() {
336            if index == context.own_index {
337                continue;
338            }
339            let (sender, receiver) =
340                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
341            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
342                index,
343                network_client.clone(),
344                block_verifier.clone(),
345                verified_blocks_cache.clone(),
346                commit_vote_monitor.clone(),
347                context.clone(),
348                core_dispatcher.clone(),
349                dag_state.clone(),
350                receiver,
351                commands_sender.clone(),
352            );
353            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
354            fetch_block_senders.insert(index, sender);
355        }
356
357        let commands_sender_clone = commands_sender.clone();
358
359        if sync_last_known_own_block {
360            commands_sender
361                .try_send(Command::FetchOwnLastBlock)
362                .expect("Failed to sync our last block");
363        }
364
365        // Spawn the task to listen to the requests & periodic runs
366        tasks.spawn(monitored_future!(async move {
367            let mut s = Self {
368                context,
369                commands_receiver,
370                fetch_block_senders,
371                core_dispatcher,
372                commit_vote_monitor,
373                fetch_blocks_scheduler_task: JoinSet::new(),
374                fetch_own_last_block_task: JoinSet::new(),
375                network_client,
376                block_verifier,
377                inflight_blocks_map,
378                verified_blocks_cache,
379                commands_sender: commands_sender_clone,
380                dag_state,
381            };
382            s.run().await;
383        }));
384
385        Arc::new(SynchronizerHandle {
386            commands_sender,
387            tasks: tokio::sync::Mutex::new(tasks),
388        })
389    }
390
391    // The main loop to listen for the submitted commands.
392    async fn run(&mut self) {
393        // We want the synchronizer to run periodically every 200ms to fetch any missing
394        // blocks.
395        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
396        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);
397
398        tokio::pin!(scheduler_timeout);
399
400        loop {
401            tokio::select! {
402                Some(command) = self.commands_receiver.recv() => {
403                    match command {
404                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
405                            if peer_index == self.context.own_index {
406                                error!("We should never attempt to fetch blocks from our own node");
407                                continue;
408                            }
409
410                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();
411
412                            // Keep only the max allowed blocks to request. It is ok to reduce here, as the scheduler
413                            // task will take care of syncing whatever is left over.
414                            let missing_block_refs = missing_block_refs
415                                .into_iter()
416                                .take(self.context.parameters.max_blocks_per_sync)
417                                .collect();
418
419                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index, SyncMethod::Live);
420                            let Some(blocks_guard) = blocks_guard else {
421                                result.send(Ok(())).ok();
422                                continue;
423                            };
424
425                            // We don't block if the corresponding peer task is saturated - we rather drop the request. That's ok, as the periodic
426                            // synchronization task will handle any still-missing blocks in its next run.
427                            let r = self
428                                .fetch_block_senders
429                                .get(&peer_index)
430                                .expect("Fatal error, sender should be present")
431                                .try_send(blocks_guard)
432                                .map_err(|err| {
433                                    match err {
434                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
435                                        TrySendError::Closed(_) => ConsensusError::Shutdown
436                                    }
437                                });
438
439                            result.send(r).ok();
440                        }
441                        Command::FetchOwnLastBlock => {
442                            if self.fetch_own_last_block_task.is_empty() {
443                                self.start_fetch_own_last_block_task();
444                            }
445                        }
446                        Command::KickOffScheduler => {
447                            // just reset the scheduler timeout timer to run immediately if not already running.
448                            // If the scheduler is already running then just reduce the remaining time to run.
449                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
450                                Instant::now()
451                            } else {
452                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
453                            };
454
455                            // only reset if it is earlier than the next deadline
456                            if timeout < scheduler_timeout.deadline() {
457                                scheduler_timeout.as_mut().reset(timeout);
458                            }
459                        }
460                    }
461                },
462                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
463                    match result {
464                        Ok(()) => {},
465                        Err(e) => {
466                            if e.is_cancelled() {
467                            } else if e.is_panic() {
468                                std::panic::resume_unwind(e.into_panic());
469                            } else {
470                                panic!("fetch our last block task failed: {e}");
471                            }
472                        },
473                    };
474                },
475                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
476                    match result {
477                        Ok(()) => {},
478                        Err(e) => {
479                            if e.is_cancelled() {
480                            } else if e.is_panic() {
481                                std::panic::resume_unwind(e.into_panic());
482                            } else {
483                                panic!("fetch blocks scheduler task failed: {e}");
484                            }
485                        },
486                    };
487                },
488                () = &mut scheduler_timeout => {
489                    // we want to start a new task only if the previous one has already finished.
490                    if self.fetch_blocks_scheduler_task.is_empty() {
491                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
492                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
493                            return;
494                        };
495                    }
496
497                    scheduler_timeout
498                        .as_mut()
499                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
500                }
501            }
502        }
503    }
504
505    async fn fetch_blocks_from_authority(
506        peer_index: AuthorityIndex,
507        network_client: Arc<C>,
508        block_verifier: Arc<V>,
509        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
510        commit_vote_monitor: Arc<CommitVoteMonitor>,
511        context: Arc<Context>,
512        core_dispatcher: Arc<D>,
513        dag_state: Arc<RwLock<DagState>>,
514        mut receiver: Receiver<BlocksGuard>,
515        commands_sender: Sender<Command>,
516    ) {
517        const MAX_RETRIES: u32 = 3;
518        let peer_hostname = &context.committee.authority(peer_index).hostname;
519
520        let mut requests = FuturesUnordered::new();
521
522        loop {
523            tokio::select! {
524                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
525                    // get the highest accepted rounds
526                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
527
528                    // Record metrics for live synchronizer requests
529                    let metrics = &context.metrics.node_metrics;
530                    metrics
531                        .synchronizer_requested_blocks_by_peer
532                        .with_label_values(&[peer_hostname.as_str(), "live"])
533                        .inc_by(blocks_guard.block_refs.len() as u64);
534                    // Count requested blocks per authority and increment metric by one per authority
535                    let mut authors = HashSet::new();
536                    for block_ref in &blocks_guard.block_refs {
537                        authors.insert(block_ref.author);
538                    }
539                    for author in authors {
540                        let host = &context.committee.authority(author).hostname;
541                        metrics
542                            .synchronizer_requested_blocks_by_authority
543                            .with_label_values(&[host.as_str(), "live"])
544                            .inc();
545                    }
546
547                    requests.push(Self::fetch_blocks_request(
548                        network_client.clone(),
549                        peer_index,
550                        blocks_guard,
551                        highest_rounds,
552                        FETCH_REQUEST_TIMEOUT,
553                        1,
554                    ))
555                },
556                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
557                    match response {
558                        Ok(blocks) => {
559                            if let Err(err) = Self::process_fetched_blocks(blocks,
560                                peer_index,
561                                blocks_guard,
562                                core_dispatcher.clone(),
563                                block_verifier.clone(),
564                                verified_cache.clone(),
565                                commit_vote_monitor.clone(),
566                                context.clone(),
567                                commands_sender.clone(),
568                                "live"
569                            ).await {
570                                context.scoring_metrics_store.update_scoring_metrics_on_block_receival(
571                                    peer_index,
572                                    peer_hostname,
573                                    err.clone(),
574                                    ErrorSource::Synchronizer,
575                                    &context.metrics.node_metrics,
576                                );
577                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
578                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
579                            }
580                        },
581                        Err(_) => {
582                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
583                            if retries <= MAX_RETRIES {
584                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
585                            } else {
586                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
587                                // not strictly necessary, but we drop the guard here to unlock the blocks
588                                drop(blocks_guard);
589                            }
590                        }
591                    }
592                },
593                else => {
594                    info!("Fetching blocks from authority {peer_index} task will now abort.");
595                    break;
596                }
597            }
598        }
599    }
600
601    /// Processes the requested raw fetched blocks from peer `peer_index`. If no
602    /// error is returned then the verified blocks are immediately sent to
603    /// Core for processing.
604    async fn process_fetched_blocks(
605        mut serialized_blocks: Vec<Bytes>,
606        peer_index: AuthorityIndex,
607        requested_blocks_guard: BlocksGuard,
608        core_dispatcher: Arc<D>,
609        block_verifier: Arc<V>,
610        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
611        commit_vote_monitor: Arc<CommitVoteMonitor>,
612        context: Arc<Context>,
613        commands_sender: Sender<Command>,
614        sync_method: &str,
615    ) -> ConsensusResult<()> {
616        if serialized_blocks.is_empty() {
617            return Ok(());
618        }
619        let _s = context
620            .metrics
621            .node_metrics
622            .scope_processing_time
623            .with_label_values(&["Synchronizer::process_fetched_blocks"])
624            .start_timer();
625
626        // Limit the number of returned blocks to be processed.
627        if context.protocol_config.consensus_batched_block_sync() {
628            serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
629        } else {
630            // Ensure that the number of returned blocks does not exceed the total
631            // maximum allowed
632            if serialized_blocks.len()
633                > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
634            {
635                return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
636            }
637        }
638
639        // Verify all the fetched blocks
640        let blocks = Handle::current()
641            .spawn_blocking({
642                let block_verifier = block_verifier.clone();
643                let verified_cache = verified_cache.clone();
644                let context = context.clone();
645                let sync_method = sync_method.to_string();
646                move || {
647                    Self::verify_blocks(
648                        serialized_blocks,
649                        block_verifier,
650                        verified_cache,
651                        &context,
652                        peer_index,
653                        &sync_method,
654                    )
655                }
656            })
657            .await
658            .expect("Spawn blocking should not fail")?;
659
660        if !context.protocol_config.consensus_batched_block_sync() {
661            // Get all the ancestors of the requested blocks only
662            let ancestors = blocks
663                .iter()
664                .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
665                .flat_map(|b| b.ancestors().to_vec())
666                .collect::<BTreeSet<BlockRef>>();
667
668            // Now confirm that the blocks are either among the ones requested, or they
669            // are ancestors of the requested blocks
670            for block in &blocks {
671                if !requested_blocks_guard
672                    .block_refs
673                    .contains(&block.reference())
674                    && !ancestors.contains(&block.reference())
675                {
676                    return Err(ConsensusError::UnexpectedFetchedBlock {
677                        index: peer_index,
678                        block_ref: block.reference(),
679                    });
680                }
681            }
682        }
683
684        // Record commit votes from the verified blocks.
685        for block in &blocks {
686            commit_vote_monitor.observe_block(block);
687        }
688
689        let metrics = &context.metrics.node_metrics;
690        let peer_hostname = &context.committee.authority(peer_index).hostname;
691        metrics
692            .synchronizer_fetched_blocks_by_peer
693            .with_label_values(&[peer_hostname.as_str(), sync_method])
694            .inc_by(blocks.len() as u64);
695        for block in &blocks {
696            let block_hostname = &context.committee.authority(block.author()).hostname;
697            metrics
698                .synchronizer_fetched_blocks_by_authority
699                .with_label_values(&[block_hostname.as_str(), sync_method])
700                .inc();
701        }
702
703        debug!(
704            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
705            blocks.len(),
706            blocks.iter().map(|b| b.reference().to_string()).join(", "),
707        );
708
709        // Now send them to Core for processing. Ignore the returned missing blocks,
710        // as we don't want this mechanism to feed back into fetching even more
711        // blocks. The periodic synchronization will take care of that.
712        let missing_blocks = core_dispatcher
713            .add_blocks(blocks)
714            .await
715            .map_err(|_| ConsensusError::Shutdown)?;
716
717        // now release all the locked blocks as they have been fetched, verified &
718        // processed
719        drop(requested_blocks_guard);
720
721        // kick off immediately the scheduled synchronizer
722        if !missing_blocks.is_empty() {
723            // do not block here, so we avoid any possible cycles.
724            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
725            {
726                warn!("Commands channel is full")
727            }
728        }
729
730        context
731            .metrics
732            .node_metrics
733            .missing_blocks_after_fetch_total
734            .inc_by(missing_blocks.len() as u64);
735
736        Ok(())
737    }
738
739    fn get_highest_accepted_rounds(
740        dag_state: Arc<RwLock<DagState>>,
741        context: &Arc<Context>,
742    ) -> Vec<Round> {
743        let blocks = dag_state
744            .read()
745            .get_last_cached_block_per_authority(Round::MAX);
746        assert_eq!(blocks.len(), context.committee.size());
747
748        blocks
749            .into_iter()
750            .map(|(block, _)| block.round())
751            .collect::<Vec<_>>()
752    }
753
754    fn verify_blocks(
755        serialized_blocks: Vec<Bytes>,
756        block_verifier: Arc<V>,
757        verified_cache: Arc<Mutex<LruCache<BlockDigest, ()>>>,
758        context: &Context,
759        peer_index: AuthorityIndex,
760        sync_method: &str,
761    ) -> ConsensusResult<Vec<VerifiedBlock>> {
762        let mut verified_blocks = Vec::new();
763        let mut skipped_count = 0u64;
764
765        for serialized_block in serialized_blocks {
766            let block_digest = VerifiedBlock::compute_digest(&serialized_block);
767
768            // Check if this block has already been verified
769            if verified_cache.lock().get(&block_digest).is_some() {
770                skipped_count += 1;
771                continue; // Skip already verified blocks
772            }
773
774            let signed_block: SignedBlock =
775                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
776
777            if let Err(e) = block_verifier.verify(&signed_block) {
778                // TODO: we might want to use a different metric to track the invalid "served"
779                // blocks from the invalid "proposed" ones.
780                let hostname = context.committee.authority(peer_index).hostname.clone();
781
782                context
783                    .metrics
784                    .node_metrics
785                    .invalid_blocks
786                    .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
787                    .inc();
788                warn!("Invalid block received from {}: {}", peer_index, e);
789                return Err(e);
790            }
791
792            // Add block to verified cache after successful verification
793            verified_cache.lock().put(block_digest, ());
794
795            let verified_block = VerifiedBlock::new_verified_with_digest(
796                signed_block,
797                serialized_block,
798                block_digest,
799            );
800
801            // Dropping a block whose timestamp is in the future is ok, because the block will be refetched.
802            // TODO: improve efficiency, maybe suspend and continue processing the block
803            // asynchronously.
804            let now = context.clock.timestamp_utc_ms();
805            let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
806            if drift > 0 {
807                let peer_hostname = &context
808                    .committee
809                    .authority(verified_block.author())
810                    .hostname;
811                context
812                    .metrics
813                    .node_metrics
814                    .block_timestamp_drift_ms
815                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
816                    .inc_by(drift);
817
818                if context
819                    .protocol_config
820                    .consensus_median_timestamp_with_checkpoint_enforcement()
821                {
822                    trace!(
823                        "Synced block {} timestamp {} is in the future (now={}). Will not ignore as median based timestamp is enabled.",
824                        verified_block.reference(),
825                        verified_block.timestamp_ms(),
826                        now
827                    );
828                } else {
829                    warn!(
830                        "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
831                        verified_block.reference(),
832                        verified_block.timestamp_ms(),
833                        now
834                    );
835                    continue;
836                }
837            }
838
839            verified_blocks.push(verified_block);
840        }
841
842        // Record skipped blocks metric
843        if skipped_count > 0 {
844            let peer_hostname = &context.committee.authority(peer_index).hostname;
845            context
846                .metrics
847                .node_metrics
848                .synchronizer_skipped_blocks_by_peer
849                .with_label_values(&[peer_hostname.as_str(), sync_method])
850                .inc_by(skipped_count);
851        }
852
853        Ok(verified_blocks)
854    }
855
856    async fn fetch_blocks_request(
857        network_client: Arc<C>,
858        peer: AuthorityIndex,
859        blocks_guard: BlocksGuard,
860        highest_rounds: Vec<Round>,
861        request_timeout: Duration,
862        mut retries: u32,
863    ) -> (
864        ConsensusResult<Vec<Bytes>>,
865        BlocksGuard,
866        u32,
867        AuthorityIndex,
868        Vec<Round>,
869    ) {
870        let start = Instant::now();
871        let resp = timeout(
872            request_timeout,
873            network_client.fetch_blocks(
874                peer,
875                blocks_guard
876                    .block_refs
877                    .clone()
878                    .into_iter()
879                    .collect::<Vec<_>>(),
880                highest_rounds.clone(),
881                request_timeout,
882            ),
883        )
884        .await;
885
886        fail_point_async!("consensus-delay");
887
888        let resp = match resp {
889            Ok(Err(err)) => {
890                // Add a delay before retrying, if needed. If the request has timed out,
891                // then this will effectively be a no-op.
892                sleep_until(start + request_timeout).await;
893                retries += 1;
894                Err(err)
895            } // network error
896            Err(err) => {
897                // timeout
898                sleep_until(start + request_timeout).await;
899                retries += 1;
900                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
901            }
902            Ok(result) => result,
903        };
904        (resp, blocks_guard, retries, peer, highest_rounds)
905    }
906
907    fn start_fetch_own_last_block_task(&mut self) {
908        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
909        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
910
911        let context = self.context.clone();
912        let dag_state = self.dag_state.clone();
913        let network_client = self.network_client.clone();
914        let block_verifier = self.block_verifier.clone();
915        let core_dispatcher = self.core_dispatcher.clone();
916
917        self.fetch_own_last_block_task
918            .spawn(monitored_future!(async move {
919                let _scope = monitored_scope("FetchOwnLastBlockTask");
920
921                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
922                    let network_client_cloned = network_client.clone();
923                    let own_index = context.own_index;
924                    async move {
925                        sleep(fetch_own_block_delay).await;
926                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
927                        (r, authority_index)
928                    }
929                };
930
931                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
932                    let mut result = Vec::new();
933                    for serialized_block in blocks {
934                        let signed_block: SignedBlock = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
935                        block_verifier.verify(&signed_block).tap_err(|err| {
936                            let hostname = context.committee.authority(authority_index).hostname.clone();
937                            context
938                                .metrics
939                                .node_metrics
940                                .invalid_blocks
941                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
942                                .inc();
943                            warn!("Invalid block received from {}: {}", authority_index, err);
944                        })?;
945
946                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
947                        if verified_block.author() != context.own_index {
948                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference() });
949                        }
950                        result.push(verified_block);
951                    }
952                    Ok(result)
953                };
954
955                // Get the highest round of all the results. Retry until a quorum (at least 2f+1 stake) has replied.
956                let mut highest_round = GENESIS_ROUND;
957                // Keep track of the received responses to avoid fetching our own last block from the same peer again
958                let mut received_response = vec![false; context.committee.size()];
959                // Assume that our node is not Byzantine
960                received_response[context.own_index] = true;
961                let mut total_stake = context.committee.stake(context.own_index);
962                let mut retries = 0;
963                let mut retry_delay_step = Duration::from_millis(500);
964                'main: loop {
965                    if context.committee.size() == 1 {
966                        highest_round = dag_state.read().get_last_proposed_block().round();
967                        info!("Only one node in the network, will not try fetching own last block from peers.");
968                        break 'main;
969                    }
970
971                    // Ask all the other peers about our last block
972                    let mut results = FuturesUnordered::new();
973
974                    for (authority_index, _authority) in context.committee.authorities() {
975                        // Skip our own index and the ones that have already responded
976                        if !received_response[authority_index] {
977                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
978                        }
979                    }
980
981                    // Gather the results, but respect the timeout as well
982                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
983                    tokio::pin!(timer);
984
985                    'inner: loop {
986                        tokio::select! {
987                            result = results.next() => {
988                                let Some((result, authority_index)) = result else {
989                                    break 'inner;
990                                };
991                                match result {
992                                    Ok(result) => {
993                                        match process_blocks(result, authority_index) {
994                                            Ok(blocks) => {
995                                                received_response[authority_index] = true;
996                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
997                                                highest_round = highest_round.max(max_round);
998
999                                                total_stake += context.committee.stake(authority_index);
1000                                            },
1001                                            Err(err) => {
1002                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
1003                                            }
1004                                        }
1005                                    },
1006                                    Err(err) => {
1007                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
1008                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
1009                                    }
1010                                }
1011                            },
1012                            () = &mut timer => {
1013                                info!("Timeout while trying to sync our own last block from peers");
1014                                break 'inner;
1015                            }
1016                        }
1017                    }
1018
1019                    // Require at least a quorum of 2f+1 stake to have replied back.
1020                    if context.committee.reached_quorum(total_stake) {
1021                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
1022                        break 'main;
1023                    } else {
1024                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
1025                    }
1026
1027                    retries += 1;
1028                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
1029                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
1030
1031                    sleep(retry_delay_step).await;
1032
1033                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
1034                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
1035                }
1036
1037                // Update the Core with the highest detected round
1038                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
1039
1040                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
1041                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
1042                }
1043            }));
1044    }
1045
1046    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
1047        let mut missing_blocks = self
1048            .core_dispatcher
1049            .get_missing_blocks()
1050            .await
1051            .map_err(|_err| ConsensusError::Shutdown)?;
1052
1053        // No reason to kick off the scheduler if there are no missing blocks to fetch
1054        if missing_blocks.is_empty() {
1055            return Ok(());
1056        }
1057
1058        let context = self.context.clone();
1059        let network_client = self.network_client.clone();
1060        let block_verifier = self.block_verifier.clone();
1061        let verified_cache = self.verified_blocks_cache.clone();
1062        let commit_vote_monitor = self.commit_vote_monitor.clone();
1063        let core_dispatcher = self.core_dispatcher.clone();
1064        let blocks_to_fetch = self.inflight_blocks_map.clone();
1065        let commands_sender = self.commands_sender.clone();
1066        let dag_state = self.dag_state.clone();
1067
1068        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
1069        trace!(
1070            "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
1071        );
1072        if commit_lagging {
1073            // If gc is enabled and we are commit lagging, then we don't want to enable the
1074            // scheduler. Since the new logic for processing certified commits is in
1075            // place, we are guaranteed that commits will happen for all the certified
1076            // commits.
1077            if dag_state.read().gc_enabled() {
1078                return Ok(());
1079            }
1080
1081            // As the node is commit lagging, try to sync only the missing blocks that are
1082            // within the acceptable round threshold. The rest we don't attempt to
1083            // sync yet.
1084            let highest_accepted_round = dag_state.read().highest_accepted_round();
1085            missing_blocks = missing_blocks
1086                .into_iter()
1087                .take_while(|(block_ref, _)| {
1088                    block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
1089                })
1090                .collect::<BTreeMap<_, _>>();
1091
1092            // If no missing blocks are within the acceptable threshold to sync while we
1093            // are commit lagging, then we disable the scheduler completely for this run.
1094            if missing_blocks.is_empty() {
1095                trace!(
1096                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
1097                );
1098                self.context
1099                    .metrics
1100                    .node_metrics
1101                    .fetch_blocks_scheduler_skipped
1102                    .with_label_values(&["commit_lagging"])
1103                    .inc();
1104                return Ok(());
1105            }
1106        }
1107
1108        self.fetch_blocks_scheduler_task
1109            .spawn(monitored_future!(async move {
1110                let _scope = monitored_scope("FetchMissingBlocksScheduler");
1111
1112                context
1113                    .metrics
1114                    .node_metrics
1115                    .fetch_blocks_scheduler_inflight
1116                    .inc();
1117                let total_requested = missing_blocks.len();
1118
1119                fail_point_async!("consensus-delay");
1120
1121                // Fetch blocks from peers
1122                let results = Self::fetch_blocks_from_authorities(
1123                    context.clone(),
1124                    blocks_to_fetch.clone(),
1125                    network_client,
1126                    missing_blocks,
1127                    dag_state,
1128                )
1129                .await;
1130                context
1131                    .metrics
1132                    .node_metrics
1133                    .fetch_blocks_scheduler_inflight
1134                    .dec();
1135                if results.is_empty() {
1136                    warn!("No results returned while requesting missing blocks");
1137                    return;
1138                }
1139
1140                // Now process the returned results
1141                let mut total_fetched = 0;
1142                for (blocks_guard, fetched_blocks, peer) in results {
1143                    total_fetched += fetched_blocks.len();
1144
1145                    if let Err(err) = Self::process_fetched_blocks(
1146                        fetched_blocks,
1147                        peer,
1148                        blocks_guard,
1149                        core_dispatcher.clone(),
1150                        block_verifier.clone(),
1151                        verified_cache.clone(),
1152                        commit_vote_monitor.clone(),
1153                        context.clone(),
1154                        commands_sender.clone(),
1155                        "periodic",
1156                    )
1157                    .await
1158                    {
1159                        warn!(
1160                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
1161                        );
1162                    }
1163                }
1164
1165                debug!(
1166                    "Total blocks requested to fetch: {}, total fetched: {}",
1167                    total_requested, total_fetched
1168                );
1169            }));
1170        Ok(())
1171    }
1172
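
    // Worked example for the commit-lag check below (illustrative values only; the
    // actual `commit_sync_batch_size` and `COMMIT_LAG_MULTIPLIER` values are
    // configuration and protocol dependent): with last_commit_index = 100, a batch
    // size of 100 and a multiplier of 32, the node counts as commit lagging only
    // once the quorum commit index exceeds 100 + 100 * 32 = 3_300.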
1173    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1174        let last_commit_index = self.dag_state.read().last_commit_index();
1175        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1176        let commit_threshold = last_commit_index
1177            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1178        (
1179            commit_threshold < quorum_commit_index,
1180            last_commit_index,
1181            quorum_commit_index,
1182        )
1183    }
1184
1185    /// The number of rounds above the highest accepted round for which fetching
1186    /// missing blocks via the periodic synchronization is allowed. Any missing
1187    /// blocks of higher rounds are considered too far in the future to fetch.
1188    /// This property is used only when it's detected that the node has fallen
1189    /// behind on its commit compared to the rest of the network; otherwise the
1190    /// scheduler will attempt to fetch any missing block.
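    ///
    /// Illustrative numbers (the actual `commit_sync_batch_size` is configuration
    /// dependent): with a highest accepted round of 1_000 and a batch size of 100,
    /// only missing blocks up to round 1_100 are fetched while the node is commit
    /// lagging; higher rounds are left for commit sync to catch up on.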
1191    fn missing_block_round_threshold(&self) -> Round {
1192        self.context.parameters.commit_sync_batch_size
1193    }
1194
1195    /// Fetches the given `missing_blocks` from up to `MAX_PERIODIC_SYNC_PEERS`
1196    /// authorities in parallel:
1197    ///
1198    /// Randomly select `MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS`
1199    /// peers from those that are known to hold some missing block, requesting up
1200    /// to `max_blocks_per_sync` block refs per peer.
1201    ///
1202    /// Randomly select `MAX_PERIODIC_SYNC_RANDOM_PEERS` additional peers from the
1203    /// committee (excluding self and those already selected).
1204    ///
1205    /// The method returns a vector with the fetched blocks from each peer that
1206    /// successfully responded and any corresponding additional ancestor blocks.
1207    /// Each element of the vector is a tuple which contains the requested
1208    /// missing block refs, the returned blocks and the peer authority index.
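    ///
    /// Illustrative selection (committee size assumed): with a committee of 7,
    /// `MAX_PERIODIC_SYNC_PEERS` = 4 and `MAX_PERIODIC_SYNC_RANDOM_PEERS` = 2, up to
    /// 2 peers are picked among those known to be aware of some of the missing
    /// blocks, and up to 2 more are picked at random from the remaining peers,
    /// excluding ourselves.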
1209    async fn fetch_blocks_from_authorities(
1210        context: Arc<Context>,
1211        inflight_blocks: Arc<InflightBlocksMap>,
1212        network_client: Arc<C>,
1213        missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1214        dag_state: Arc<RwLock<DagState>>,
1215    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1216        // Step 1: Map authorities to missing blocks that they are aware of
1217        let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1218        for (missing_block_ref, authorities) in &missing_blocks {
1219            for author in authorities {
1220                if author == &context.own_index {
1221                    // Skip our own index as we don't want to fetch blocks from ourselves
1222                    continue;
1223                }
1224                authority_to_blocks
1225                    .entry(*author)
1226                    .or_default()
1227                    .push(*missing_block_ref);
1228            }
1229        }
1230
1231        // Step 2: Choose at most MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1232        // peers from those that are aware of some missing blocks
1233
1234        #[cfg(not(test))]
1235        let mut rng = StdRng::from_entropy();
1236
1237        // Randomly pick MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1238        // authorities that are aware of missing blocks
1239        #[cfg(not(test))]
1240        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1241            authority_to_blocks
1242                .iter()
1243                .choose_multiple(
1244                    &mut rng,
1245                    MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1246                )
1247                .into_iter()
1248                .map(|(&peer, blocks)| {
1249                    let limited_blocks = blocks
1250                        .iter()
1251                        .copied()
1252                        .take(context.parameters.max_blocks_per_sync)
1253                        .collect();
1254                    (peer, limited_blocks, "periodic_known")
1255                })
1256                .collect();
1257        #[cfg(test)]
1258        // Deterministically pick the smallest (MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS) authority indices
1259        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1260            let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1261                .iter()
1262                .map(|(&peer, blocks)| {
1263                    let limited_blocks = blocks
1264                        .iter()
1265                        .copied()
1266                        .take(context.parameters.max_blocks_per_sync)
1267                        .collect();
1268                    (peer, limited_blocks, "periodic_known")
1269                })
1270                .collect();
1271            // Sort by AuthorityIndex (natural order), then take the first
1272            // MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1273            items.sort_by_key(|(peer, _, _)| *peer);
1274            items
1275                .into_iter()
1276                .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1277                .collect()
1278        };
1279
1280        // Step 3: Choose at most MAX_PERIODIC_SYNC_RANDOM_PEERS additional random
1281        // peers that are not known to be aware of the missing blocks
1282        let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1283            .iter()
1284            .map(|(peer, _, _)| *peer)
1285            .collect();
1286
1287        let random_candidates: Vec<_> = context
1288            .committee
1289            .authorities()
1290            .filter_map(|(peer_index, _)| {
1291                (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1292                    .then_some(peer_index)
1293            })
1294            .collect();
1295        #[cfg(test)]
1296        let random_peers: Vec<AuthorityIndex> = random_candidates
1297            .into_iter()
1298            .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1299            .collect();
1300        #[cfg(not(test))]
1301        let random_peers: Vec<AuthorityIndex> = random_candidates
1302            .into_iter()
1303            .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1304
1305        #[cfg_attr(test, allow(unused_mut))]
1306        let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1307        // Shuffle the missing blocks in case the first ones are blocked by unresponsive
1308        // peers
1309        #[cfg(not(test))]
1310        all_missing_blocks.shuffle(&mut rng);
1311
1312        let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1313
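        // Hand each randomly selected peer a distinct chunk of up to
        // max_blocks_per_sync missing blocks, stopping once the chunks run out.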
1314        for peer in random_peers {
1315            if let Some(chunk) = block_chunks.next() {
1316                chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1317            } else {
1318                break;
1319            }
1320        }
1321
1322        let mut request_futures = FuturesUnordered::new();
1323
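        // Snapshot the highest accepted round per authority once and reuse it
        // for every fetch request sent below.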
1324        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1325
1326        // Record the missing blocks per authority for metrics
1327        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1328        for block in &all_missing_blocks {
1329            missing_blocks_per_authority[block.author] += 1;
1330        }
1331        for (missing, (_, authority)) in missing_blocks_per_authority
1332            .into_iter()
1333            .zip(context.committee.authorities())
1334        {
1335            context
1336                .metrics
1337                .node_metrics
1338                .synchronizer_missing_blocks_by_authority
1339                .with_label_values(&[&authority.hostname])
1340                .inc_by(missing as u64);
1341            context
1342                .metrics
1343                .node_metrics
1344                .synchronizer_current_missing_blocks_by_authority
1345                .with_label_values(&[&authority.hostname])
1346                .set(missing as i64);
1347        }
1348
1349        // Collect the peers that have not been chosen yet; they serve as fallbacks for
1350        // retrying failed requests later
1351        #[cfg_attr(test, expect(unused_mut))]
1352        let mut remaining_peers: Vec<_> = context
1353            .committee
1354            .authorities()
1355            .filter_map(|(peer_index, _)| {
1356                if peer_index != context.own_index
1357                    && !chosen_peers_with_blocks
1358                        .iter()
1359                        .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1360                {
1361                    Some(peer_index)
1362                } else {
1363                    None
1364                }
1365            })
1366            .collect();
1367
1368        #[cfg(not(test))]
1369        remaining_peers.shuffle(&mut rng);
1370        let mut remaining_peers = remaining_peers.into_iter();
1371
1372        // Send the initial requests
1373        for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1374            let peer_hostname = &context.committee.authority(peer).hostname;
1375            let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1376
1377            // Lock the blocks to be fetched. If no lock can be acquired for any of the
1378            // blocks, then skip this peer for this run.
1379            if let Some(blocks_guard) =
1380                inflight_blocks.lock_blocks(block_refs.clone(), peer, SyncMethod::Periodic)
1381            {
1382                info!(
1383                    "Periodic sync of {} missing blocks from peer {} {}: {}",
1384                    block_refs.len(),
1385                    peer,
1386                    peer_hostname,
1387                    block_refs
1388                        .iter()
1389                        .map(|b| b.to_string())
1390                        .collect::<Vec<_>>()
1391                        .join(", ")
1392                );
1393                // Record metrics about requested blocks
1394                let metrics = &context.metrics.node_metrics;
1395                metrics
1396                    .synchronizer_requested_blocks_by_peer
1397                    .with_label_values(&[peer_hostname.as_str(), label])
1398                    .inc_by(block_refs.len() as u64);
1399                for block_ref in &block_refs {
1400                    let block_hostname = &context.committee.authority(block_ref.author).hostname;
1401                    metrics
1402                        .synchronizer_requested_blocks_by_authority
1403                        .with_label_values(&[block_hostname.as_str(), label])
1404                        .inc();
1405                }
1406                request_futures.push(Self::fetch_blocks_request(
1407                    network_client.clone(),
1408                    peer,
1409                    blocks_guard,
1410                    highest_rounds.clone(),
1411                    FETCH_REQUEST_TIMEOUT,
1412                    1,
1413                ));
1414            }
1415        }
1416
1417        let mut results = Vec::new();
1418        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1419
1420        tokio::pin!(fetcher_timeout);
1421
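        // Collect responses until every in-flight request has completed or the
        // overall FETCH_FROM_PEERS_TIMEOUT fires. A failed request is retried
        // against one of the remaining peers, provided its block locks can be
        // swapped to that peer.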
1422        loop {
1423            tokio::select! {
1424                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1425                    let peer_hostname = &context.committee.authority(peer_index).hostname;
1426                    match response {
1427                        Ok(fetched_blocks) => {
1428                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1429                            results.push((blocks_guard, fetched_blocks, peer_index));
1430
1431                            // If no more pending requests are left, break the loop.
1432                            if request_futures.is_empty() {
1433                                break;
1434                            }
1435                        },
1436                        Err(_) => {
1437                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1438                            // Retry with another peer if any remain
1439                            if let Some(next_peer) = remaining_peers.next() {
1440                                // Best effort to swap the locks to the next peer; if they can't be acquired, skip the retry for this run.
1441                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1442                                    info!(
1443                                        "Retrying syncing {} missing blocks from peer {}: {}",
1444                                        blocks_guard.block_refs.len(),
1445                                        peer_hostname,
1446                                        blocks_guard.block_refs
1447                                            .iter()
1448                                            .map(|b| b.to_string())
1449                                            .collect::<Vec<_>>()
1450                                            .join(", ")
1451                                    );
1452                                    let block_refs = blocks_guard.block_refs.clone();
1453                                    // Record metrics about requested blocks
1454                                    let metrics = &context.metrics.node_metrics;
1455                                    metrics
1456                                        .synchronizer_requested_blocks_by_peer
1457                                        .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1458                                        .inc_by(block_refs.len() as u64);
1459                                    for block_ref in &block_refs {
1460                                        let block_hostname =
1461                                            &context.committee.authority(block_ref.author).hostname;
1462                                        metrics
1463                                            .synchronizer_requested_blocks_by_authority
1464                                            .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1465                                            .inc();
1466                                    }
1467                                    request_futures.push(Self::fetch_blocks_request(
1468                                        network_client.clone(),
1469                                        next_peer,
1470                                        blocks_guard,
1471                                        highest_rounds,
1472                                        FETCH_REQUEST_TIMEOUT,
1473                                        1,
1474                                    ));
1475                                } else {
1476                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1477                                }
1478                            } else {
1479                                debug!("No more peers left to fetch blocks");
1480                            }
1481                        }
1482                    }
1483                },
1484                _ = &mut fetcher_timeout => {
1485                    debug!("Timed out while fetching missing blocks");
1486                    break;
1487                }
1488            }
1489        }
1490
1491        results
1492    }
1493}
1494
1495#[cfg(test)]
1496mod tests {
1497    use std::{
1498        collections::{BTreeMap, BTreeSet},
1499        num::NonZeroUsize,
1500        sync::Arc,
1501        time::Duration,
1502    };
1503
1504    use async_trait::async_trait;
1505    use bytes::Bytes;
1506    use consensus_config::{AuthorityIndex, Parameters};
1507    use iota_metrics::monitored_mpsc;
1508    use lru::LruCache;
1509    use parking_lot::{Mutex as SyncMutex, RwLock};
1510    use tokio::{sync::Mutex, time::sleep};
1511
1512    use crate::{
1513        CommitDigest, CommitIndex,
1514        authority_service::COMMIT_LAG_MULTIPLIER,
1515        block::{BlockDigest, BlockRef, Round, SignedBlock, TestBlock, VerifiedBlock},
1516        block_verifier::{BlockVerifier, NoopBlockVerifier},
1517        commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1518        commit_vote_monitor::CommitVoteMonitor,
1519        context::Context,
1520        core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1521        dag_state::DagState,
1522        error::{ConsensusError, ConsensusResult},
1523        network::{BlockStream, NetworkClient},
1524        round_prober::QuorumRound,
1525        storage::mem_store::MemStore,
1526        synchronizer::{
1527            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, SyncMethod,
1528            Synchronizer, VERIFIED_BLOCKS_CACHE_CAP,
1529        },
1530    };
1531
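    // Request/response key-value types used by `MockNetworkClient` to stub
    // `fetch_blocks` and `fetch_latest_blocks` calls.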
1532    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1533    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1534    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1535    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1536
1537    // Mock verifier that always fails verification
1538    struct FailingBlockVerifier;
1539
1540    impl BlockVerifier for FailingBlockVerifier {
1541        fn verify(&self, _block: &SignedBlock) -> ConsensusResult<()> {
1542            Err(ConsensusError::WrongEpoch {
1543                expected: 1,
1544                actual: 0,
1545            })
1546        }
1547
1548        fn check_ancestors(
1549            &self,
1550            _block: &VerifiedBlock,
1551            _ancestors: &[Option<VerifiedBlock>],
1552            _gc_enabled: bool,
1553            _gc_round: Round,
1554        ) -> ConsensusResult<()> {
1555            Ok(())
1556        }
1557    }
1558
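    // Mock network client that serves pre-stubbed responses for `fetch_blocks`
    // and `fetch_latest_blocks`, optionally after an artificial latency.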
1559    #[derive(Default)]
1560    struct MockNetworkClient {
1561        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1562        fetch_latest_blocks_requests:
1563            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1564    }
1565
1566    impl MockNetworkClient {
1567        async fn stub_fetch_blocks(
1568            &self,
1569            blocks: Vec<VerifiedBlock>,
1570            peer: AuthorityIndex,
1571            latency: Option<Duration>,
1572        ) {
1573            let mut lock = self.fetch_blocks_requests.lock().await;
1574            let block_refs = blocks
1575                .iter()
1576                .map(|block| block.reference())
1577                .collect::<Vec<_>>();
1578            lock.insert((block_refs, peer), (blocks, latency));
1579        }
1580
1581        async fn stub_fetch_latest_blocks(
1582            &self,
1583            blocks: Vec<VerifiedBlock>,
1584            peer: AuthorityIndex,
1585            authorities: Vec<AuthorityIndex>,
1586            latency: Option<Duration>,
1587        ) {
1588            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1589            lock.entry((peer, authorities))
1590                .or_default()
1591                .push((blocks, latency));
1592        }
1593
1594        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1595            let lock = self.fetch_latest_blocks_requests.lock().await;
1596            lock.len()
1597        }
1598    }
1599
1600    #[async_trait]
1601    impl NetworkClient for MockNetworkClient {
1602        const SUPPORT_STREAMING: bool = false;
1603
1604        async fn send_block(
1605            &self,
1606            _peer: AuthorityIndex,
1607            _serialized_block: &VerifiedBlock,
1608            _timeout: Duration,
1609        ) -> ConsensusResult<()> {
1610            unimplemented!("Unimplemented")
1611        }
1612
1613        async fn subscribe_blocks(
1614            &self,
1615            _peer: AuthorityIndex,
1616            _last_received: Round,
1617            _timeout: Duration,
1618        ) -> ConsensusResult<BlockStream> {
1619            unimplemented!("Unimplemented")
1620        }
1621
1622        async fn fetch_blocks(
1623            &self,
1624            peer: AuthorityIndex,
1625            block_refs: Vec<BlockRef>,
1626            _highest_accepted_rounds: Vec<Round>,
1627            _timeout: Duration,
1628        ) -> ConsensusResult<Vec<Bytes>> {
1629            let mut lock = self.fetch_blocks_requests.lock().await;
1630            let response = lock
1631                .remove(&(block_refs, peer))
1632                .expect("Unexpected fetch blocks request made");
1633
1634            let serialised = response
1635                .0
1636                .into_iter()
1637                .map(|block| block.serialized().clone())
1638                .collect::<Vec<_>>();
1639
1640            drop(lock);
1641
1642            if let Some(latency) = response.1 {
1643                sleep(latency).await;
1644            }
1645
1646            Ok(serialised)
1647        }
1648
1649        async fn fetch_commits(
1650            &self,
1651            _peer: AuthorityIndex,
1652            _commit_range: CommitRange,
1653            _timeout: Duration,
1654        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1655            unimplemented!("Unimplemented")
1656        }
1657
1658        async fn fetch_latest_blocks(
1659            &self,
1660            peer: AuthorityIndex,
1661            authorities: Vec<AuthorityIndex>,
1662            _timeout: Duration,
1663        ) -> ConsensusResult<Vec<Bytes>> {
1664            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1665            let mut responses = lock
1666                .remove(&(peer, authorities.clone()))
1667                .expect("Unexpected fetch latest blocks request made");
1668
1669            let response = responses.remove(0);
1670            let serialised = response
1671                .0
1672                .into_iter()
1673                .map(|block| block.serialized().clone())
1674                .collect::<Vec<_>>();
1675
1676            if !responses.is_empty() {
1677                lock.insert((peer, authorities), responses);
1678            }
1679
1680            drop(lock);
1681
1682            if let Some(latency) = response.1 {
1683                sleep(latency).await;
1684            }
1685
1686            Ok(serialised)
1687        }
1688
1689        async fn get_latest_rounds(
1690            &self,
1691            _peer: AuthorityIndex,
1692            _timeout: Duration,
1693        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1694            unimplemented!("Unimplemented")
1695        }
1696    }
1697
1698    #[test]
1699    fn test_inflight_blocks_map() {
1700        // GIVEN
1701        let map = InflightBlocksMap::new();
1702        let some_block_refs = [
1703            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1704            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1705            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1706            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1707        ];
1708        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1709
1710        // Lock & unlock blocks
1711        {
1712            let mut all_guards = Vec::new();
1713
1714            // Try to acquire the block locks for authorities 1 & 2 (Periodic limit is 2)
1715            for i in 1..=2 {
1716                let authority = AuthorityIndex::new_for_test(i);
1717
1718                let guard =
1719                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1720                let guard = guard.expect("Guard should be created");
1721                assert_eq!(guard.block_refs.len(), 4);
1722
1723                all_guards.push(guard);
1724
1725                // trying to acquire any of them again will not succeed
1726                let guard =
1727                    map.lock_blocks(missing_block_refs.clone(), authority, SyncMethod::Periodic);
1728                assert!(guard.is_none());
1729            }
1730
1731            // Trying to acquire for authority 3 will fail, as we have maxed out the
1732            // number of allowed peers (the Periodic limit is 2)
1733            let authority_3 = AuthorityIndex::new_for_test(3);
1734
1735            let guard = map.lock_blocks(
1736                missing_block_refs.clone(),
1737                authority_3,
1738                SyncMethod::Periodic,
1739            );
1740            assert!(guard.is_none());
1741
1742            // Explicitly drop the guard of authority 1 and try for authority 3 again - it
1743            // will now succeed
1744            drop(all_guards.remove(0));
1745
1746            let guard = map.lock_blocks(
1747                missing_block_refs.clone(),
1748                authority_3,
1749                SyncMethod::Periodic,
1750            );
1751            let guard = guard.expect("Guard should be successfully acquired");
1752
1753            assert_eq!(guard.block_refs, missing_block_refs);
1754
1755            // Dropping all guards should unlock the block refs
1756            drop(guard);
1757            drop(all_guards);
1758
1759            assert_eq!(map.num_of_locked_blocks(), 0);
1760        }
1761
1762        // Swap locks
1763        {
1764            // acquire a lock for authority 1
1765            let authority_1 = AuthorityIndex::new_for_test(1);
1766            let guard = map
1767                .lock_blocks(
1768                    missing_block_refs.clone(),
1769                    authority_1,
1770                    SyncMethod::Periodic,
1771                )
1772                .unwrap();
1773
1774            // Now swap the locks for authority 2
1775            let authority_2 = AuthorityIndex::new_for_test(2);
1776            let guard = map.swap_locks(guard, authority_2);
1777
1778            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1779        }
1780    }
1781
1782    #[test]
1783    fn test_inflight_blocks_map_live_sync_limit() {
1784        // GIVEN
1785        let map = InflightBlocksMap::new();
1786        let some_block_refs = [
1787            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1788            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1789        ];
1790        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1791
1792        // WHEN authority 1 locks with Live sync
1793        let authority_1 = AuthorityIndex::new_for_test(1);
1794        let guard_1 = map
1795            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1796            .expect("Should successfully lock with Live sync");
1797
1798        assert_eq!(guard_1.block_refs.len(), 2);
1799
1800        // THEN authority 2 cannot lock with Live sync (limit of 1 reached)
1801        let authority_2 = AuthorityIndex::new_for_test(2);
1802        let guard_2 = map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1803
1804        assert!(
1805            guard_2.is_none(),
1806            "Should fail to lock - Live limit of 1 reached"
1807        );
1808
1809        // WHEN authority 1 releases the lock
1810        drop(guard_1);
1811
1812        // THEN authority 2 can now lock with Live sync
1813        let guard_2 = map
1814            .lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live)
1815            .expect("Should successfully lock after authority 1 released");
1816
1817        assert_eq!(guard_2.block_refs.len(), 2);
1818    }
1819
1820    #[test]
1821    fn test_inflight_blocks_map_periodic_allows_more_concurrency() {
1822        // GIVEN
1823        let map = InflightBlocksMap::new();
1824        let some_block_refs = [
1825            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1826            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1827        ];
1828        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1829
1830        // WHEN authority 1 locks with Periodic sync
1831        let authority_1 = AuthorityIndex::new_for_test(1);
1832        let guard_1 = map
1833            .lock_blocks(
1834                missing_block_refs.clone(),
1835                authority_1,
1836                SyncMethod::Periodic,
1837            )
1838            .expect("Should successfully lock with Periodic sync");
1839
1840        assert_eq!(guard_1.block_refs.len(), 2);
1841
1842        // THEN authority 2 can also lock with Periodic sync (limit is 2)
1843        let authority_2 = AuthorityIndex::new_for_test(2);
1844        let guard_2 = map
1845            .lock_blocks(
1846                missing_block_refs.clone(),
1847                authority_2,
1848                SyncMethod::Periodic,
1849            )
1850            .expect("Should successfully lock - Periodic allows 2 authorities");
1851
1852        assert_eq!(guard_2.block_refs.len(), 2);
1853
1854        // BUT authority 3 cannot lock with Periodic sync (limit of 2 reached)
1855        let authority_3 = AuthorityIndex::new_for_test(3);
1856        let guard_3 = map.lock_blocks(
1857            missing_block_refs.clone(),
1858            authority_3,
1859            SyncMethod::Periodic,
1860        );
1861
1862        assert!(
1863            guard_3.is_none(),
1864            "Should fail to lock - Periodic limit of 2 reached"
1865        );
1866
1867        // WHEN authority 1 releases the lock
1868        drop(guard_1);
1869
1870        // THEN authority 3 can now lock with Periodic sync
1871        let guard_3 = map
1872            .lock_blocks(
1873                missing_block_refs.clone(),
1874                authority_3,
1875                SyncMethod::Periodic,
1876            )
1877            .expect("Should successfully lock after authority 1 released");
1878
1879        assert_eq!(guard_3.block_refs.len(), 2);
1880    }
1881
1882    #[test]
1883    fn test_inflight_blocks_map_periodic_blocks_live_when_at_live_limit() {
1884        // GIVEN
1885        let map = InflightBlocksMap::new();
1886        let some_block_refs = [
1887            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1888            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1889        ];
1890        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1891
1892        // WHEN authority 1 locks with Periodic sync (total=1, at Live's limit)
1893        let authority_1 = AuthorityIndex::new_for_test(1);
1894        let guard_1 = map
1895            .lock_blocks(
1896                missing_block_refs.clone(),
1897                authority_1,
1898                SyncMethod::Periodic,
1899            )
1900            .expect("Should successfully lock with Periodic sync");
1901
1902        assert_eq!(guard_1.block_refs.len(), 2);
1903
1904        // THEN authority 2 cannot lock with Live sync (total already at Live limit of
1905        // 1)
1906        let authority_2 = AuthorityIndex::new_for_test(2);
1907        let guard_2_live =
1908            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1909
1910        assert!(
1911            guard_2_live.is_none(),
1912            "Should fail to lock with Live - total already at Live limit of 1"
1913        );
1914
1915        // BUT authority 2 CAN lock with Periodic sync (total would be 2, at Periodic
1916        // limit)
1917        let guard_2_periodic = map
1918            .lock_blocks(
1919                missing_block_refs.clone(),
1920                authority_2,
1921                SyncMethod::Periodic,
1922            )
1923            .expect("Should successfully lock with Periodic - under Periodic limit of 2");
1924
1925        assert_eq!(guard_2_periodic.block_refs.len(), 2);
1926    }
1927
1928    #[test]
1929    fn test_inflight_blocks_map_live_then_periodic_interaction() {
1930        // GIVEN
1931        let map = InflightBlocksMap::new();
1932        let some_block_refs = [
1933            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1934            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1935        ];
1936        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1937
1938        // WHEN authority 1 locks with Live sync (total=1, at Live limit)
1939        let authority_1 = AuthorityIndex::new_for_test(1);
1940        let guard_1 = map
1941            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
1942            .expect("Should successfully lock with Live sync");
1943
1944        assert_eq!(guard_1.block_refs.len(), 2);
1945
1946        // THEN authority 2 cannot lock with Live sync (would exceed Live limit of 1)
1947        let authority_2 = AuthorityIndex::new_for_test(2);
1948        let guard_2_live =
1949            map.lock_blocks(missing_block_refs.clone(), authority_2, SyncMethod::Live);
1950
1951        assert!(
1952            guard_2_live.is_none(),
1953            "Should fail to lock with Live - would exceed Live limit of 1"
1954        );
1955
1956        // BUT authority 2 CAN lock with Periodic sync (total=2, at Periodic limit)
1957        let guard_2 = map
1958            .lock_blocks(
1959                missing_block_refs.clone(),
1960                authority_2,
1961                SyncMethod::Periodic,
1962            )
1963            .expect("Should successfully lock with Periodic - total 2 is at Periodic limit");
1964
1965        assert_eq!(guard_2.block_refs.len(), 2);
1966
1967        // AND authority 3 cannot lock with Periodic sync (would exceed Periodic limit
1968        // of 2)
1969        let authority_3 = AuthorityIndex::new_for_test(3);
1970        let guard_3 = map.lock_blocks(
1971            missing_block_refs.clone(),
1972            authority_3,
1973            SyncMethod::Periodic,
1974        );
1975
1976        assert!(
1977            guard_3.is_none(),
1978            "Should fail to lock with Periodic - would exceed Periodic limit of 2"
1979        );
1980    }
1981
1982    #[test]
1983    fn test_inflight_blocks_map_partial_locks_mixed_methods() {
1984        // GIVEN 4 blocks
1985        let map = InflightBlocksMap::new();
1986        let block_a = BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1987        let block_b = BlockRef::new(2, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1988        let block_c = BlockRef::new(3, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1989        let block_d = BlockRef::new(4, AuthorityIndex::new_for_test(0), BlockDigest::MIN);
1990
1991        // Lock block A with authority 1 using Live (A at limit for Live)
1992        let guard_a = map
1993            .lock_blocks(
1994                [block_a].into(),
1995                AuthorityIndex::new_for_test(1),
1996                SyncMethod::Live,
1997            )
1998            .expect("Should lock block A");
1999        assert_eq!(guard_a.block_refs.len(), 1);
2000
2001        // Lock block B with authorities 1 & 2 using Periodic (B at limit for Periodic)
2002        let guard_b1 = map
2003            .lock_blocks(
2004                [block_b].into(),
2005                AuthorityIndex::new_for_test(1),
2006                SyncMethod::Periodic,
2007            )
2008            .expect("Should lock block B");
2009        let guard_b2 = map
2010            .lock_blocks(
2011                [block_b].into(),
2012                AuthorityIndex::new_for_test(2),
2013                SyncMethod::Periodic,
2014            )
2015            .expect("Should lock block B again");
2016        assert_eq!(guard_b1.block_refs.len(), 1);
2017        assert_eq!(guard_b2.block_refs.len(), 1);
2018
2019        // Lock block C with authority 1 using Periodic (C has 1 lock)
2020        let guard_c = map
2021            .lock_blocks(
2022                [block_c].into(),
2023                AuthorityIndex::new_for_test(1),
2024                SyncMethod::Periodic,
2025            )
2026            .expect("Should lock block C");
2027        assert_eq!(guard_c.block_refs.len(), 1);
2028
2029        // Block D is unlocked
2030
2031        // WHEN authority 3 requests all 4 blocks with Periodic
2032        let all_blocks = [block_a, block_b, block_c, block_d].into();
2033        let guard_3 = map
2034            .lock_blocks(
2035                all_blocks,
2036                AuthorityIndex::new_for_test(3),
2037                SyncMethod::Periodic,
2038            )
2039            .expect("Should get partial lock");
2040
2041        // THEN it should successfully lock blocks A, C and D only
2042        // - A: total=1 (at Live limit), authority 3 can still add since using Periodic
2043        //   and total < 2
2044        // - B: total=2 (at Periodic limit), cannot lock
2045        // - C: total=1, can lock (under limit)
2046        // - D: total=0, can lock
2047        assert_eq!(
2048            guard_3.block_refs.len(),
2049            3,
2050            "Should lock blocks A, C, and D"
2051        );
2052        assert!(
2053            guard_3.block_refs.contains(&block_a),
2054            "Should contain block A"
2055        );
2056        assert!(
2057            !guard_3.block_refs.contains(&block_b),
2058            "Should NOT contain block B (at limit)"
2059        );
2060        assert!(
2061            guard_3.block_refs.contains(&block_c),
2062            "Should contain block C"
2063        );
2064        assert!(
2065            guard_3.block_refs.contains(&block_d),
2066            "Should contain block D"
2067        );
2068    }
2069
2070    #[test]
2071    fn test_inflight_blocks_map_swap_locks_preserves_method() {
2072        // GIVEN
2073        let map = InflightBlocksMap::new();
2074        let some_block_refs = [
2075            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2076            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
2077        ];
2078        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
2079
2080        // WHEN authority 1 locks with Live sync
2081        let authority_1 = AuthorityIndex::new_for_test(1);
2082        let guard_1 = map
2083            .lock_blocks(missing_block_refs.clone(), authority_1, SyncMethod::Live)
2084            .expect("Should lock with Live sync");
2085
2086        assert_eq!(guard_1.block_refs.len(), 2);
2087
2088        // AND we swap to authority 2
2089        let authority_2 = AuthorityIndex::new_for_test(2);
2090        let guard_2 = map
2091            .swap_locks(guard_1, authority_2)
2092            .expect("Should swap locks");
2093
2094        // THEN the new guard should preserve the block refs
2095        assert_eq!(guard_2.block_refs, missing_block_refs);
2096
2097        // AND authority 3 cannot lock with Live sync (limit of 1 reached)
2098        let authority_3 = AuthorityIndex::new_for_test(3);
2099        let guard_3 = map.lock_blocks(missing_block_refs.clone(), authority_3, SyncMethod::Live);
2100        assert!(guard_3.is_none(), "Should fail - Live limit reached");
2101
2102        // BUT authority 3 CAN lock with Periodic sync
2103        let guard_3_periodic = map
2104            .lock_blocks(
2105                missing_block_refs.clone(),
2106                authority_3,
2107                SyncMethod::Periodic,
2108            )
2109            .expect("Should lock with Periodic");
2110        assert_eq!(guard_3_periodic.block_refs.len(), 2);
2111    }
2112
2113    #[tokio::test]
2114    async fn test_process_fetched_blocks() {
2115        // GIVEN
2116        let (context, _) = Context::new_for_test(4);
2117        let context = Arc::new(context);
2118        let block_verifier = Arc::new(NoopBlockVerifier {});
2119        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2120        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2121        let (commands_sender, _commands_receiver) =
2122            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2123
2124        // Create input test blocks:
2125        // - Authority 0 block at round 60.
2126        // - Authority 1 blocks from round 30 to 60.
2127        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2128        expected_blocks.extend(
2129            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2130        );
2131        assert_eq!(
2132            expected_blocks.len(),
2133            context.parameters.max_blocks_per_sync
2134        );
2135
2136        let expected_serialized_blocks = expected_blocks
2137            .iter()
2138            .map(|b| b.serialized().clone())
2139            .collect::<Vec<_>>();
2140
2141        let expected_block_refs = expected_blocks
2142            .iter()
2143            .map(|b| b.reference())
2144            .collect::<BTreeSet<_>>();
2145
2146        // GIVEN peer to fetch blocks from
2147        let peer_index = AuthorityIndex::new_for_test(2);
2148
2149        // Create blocks_guard
2150        let inflight_blocks_map = InflightBlocksMap::new();
2151        let blocks_guard = inflight_blocks_map
2152            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2153            .expect("Failed to lock blocks");
2154
2155        assert_eq!(
2156            inflight_blocks_map.num_of_locked_blocks(),
2157            expected_block_refs.len()
2158        );
2159
2160        // Create a Synchronizer
2161        let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
2162            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
2163        )));
2164        let result = Synchronizer::<
2165            MockNetworkClient,
2166            NoopBlockVerifier,
2167            MockCoreThreadDispatcher,
2168        >::process_fetched_blocks(
2169            expected_serialized_blocks,
2170            peer_index,
2171            blocks_guard, // The guard is consumed here
2172            core_dispatcher.clone(),
2173            block_verifier,
2174            verified_cache,
2175            commit_vote_monitor,
2176            context.clone(),
2177            commands_sender,
2178            "test",
2179        )
2180            .await;
2181
2182        // THEN
2183        assert!(result.is_ok());
2184
2185        // Check blocks were sent to core
2186        let added_blocks = core_dispatcher.get_add_blocks().await;
2187        assert_eq!(
2188            added_blocks
2189                .iter()
2190                .map(|b| b.reference())
2191                .collect::<BTreeSet<_>>(),
2192            expected_block_refs,
2193        );
2194
2195        // Check blocks were unlocked
2196        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2197    }
2198
2199    #[tokio::test]
2200    async fn test_process_fetched_blocks_duplicates() {
2201        // GIVEN
2202        let (context, _) = Context::new_for_test(4);
2203        let context = Arc::new(context);
2204        let block_verifier = Arc::new(NoopBlockVerifier {});
2205        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2206        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2207        let (commands_sender, _commands_receiver) =
2208            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2209
2210        // Create input test blocks:
2211        // - Authority 0 block at round 60.
2212        // - Authority 1 blocks from round 30 to 60.
2213        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2214        expected_blocks.extend(
2215            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2216        );
2217        assert_eq!(
2218            expected_blocks.len(),
2219            context.parameters.max_blocks_per_sync
2220        );
2221
2222        let expected_serialized_blocks = expected_blocks
2223            .iter()
2224            .map(|b| b.serialized().clone())
2225            .collect::<Vec<_>>();
2226
2227        let expected_block_refs = expected_blocks
2228            .iter()
2229            .map(|b| b.reference())
2230            .collect::<BTreeSet<_>>();
2231
2232        // GIVEN peer to fetch blocks from
2233        let peer_index = AuthorityIndex::new_for_test(2);
2234
2235        // Create blocks_guard
2236        let inflight_blocks_map = InflightBlocksMap::new();
2237        let blocks_guard = inflight_blocks_map
2238            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2239            .expect("Failed to lock blocks");
2240
2241        assert_eq!(
2242            inflight_blocks_map.num_of_locked_blocks(),
2243            expected_block_refs.len()
2244        );
2245
2246        // Create a shared LruCache that will be reused to verify duplicate prevention
2247        let verified_cache = Arc::new(SyncMutex::new(LruCache::new(
2248            NonZeroUsize::new(VERIFIED_BLOCKS_CACHE_CAP).unwrap(),
2249        )));
2250
2251        // WHEN process fetched blocks for the first time
2252        let result = Synchronizer::<
2253            MockNetworkClient,
2254            NoopBlockVerifier,
2255            MockCoreThreadDispatcher,
2256        >::process_fetched_blocks(
2257            expected_serialized_blocks.clone(),
2258            peer_index,
2259            blocks_guard,
2260            core_dispatcher.clone(),
2261            block_verifier.clone(),
2262            verified_cache.clone(),
2263            commit_vote_monitor.clone(),
2264            context.clone(),
2265            commands_sender.clone(),
2266            "test",
2267        )
2268        .await;
2269
2270        // THEN
2271        assert!(result.is_ok());
2272
2273        // Check blocks were sent to core
2274        let added_blocks = core_dispatcher.get_add_blocks().await;
2275        assert_eq!(
2276            added_blocks
2277                .iter()
2278                .map(|b| b.reference())
2279                .collect::<BTreeSet<_>>(),
2280            expected_block_refs,
2281        );
2282
2283        // Check blocks were unlocked
2284        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2285
2286        // PART 2: Verify LruCache prevents duplicate processing
2287        // Try to process the same blocks again (simulating duplicate fetch)
2288        let blocks_guard_second = inflight_blocks_map
2289            .lock_blocks(expected_block_refs.clone(), peer_index, SyncMethod::Live)
2290            .expect("Failed to lock blocks for second call");
2291
2292        let result_second = Synchronizer::<
2293            MockNetworkClient,
2294            NoopBlockVerifier,
2295            MockCoreThreadDispatcher,
2296        >::process_fetched_blocks(
2297            expected_serialized_blocks,
2298            peer_index,
2299            blocks_guard_second,
2300            core_dispatcher.clone(),
2301            block_verifier,
2302            verified_cache.clone(),
2303            commit_vote_monitor,
2304            context.clone(),
2305            commands_sender,
2306            "test",
2307        )
2308        .await;
2309
2310        assert!(result_second.is_ok());
2311
2312        // Verify NO blocks were sent to core on the second call
2313        // because they were already in the LruCache
2314        let added_blocks_second_call = core_dispatcher.get_add_blocks().await;
2315        assert!(
2316            added_blocks_second_call.is_empty(),
2317            "Expected no blocks to be added on second call due to LruCache, but got {} blocks",
2318            added_blocks_second_call.len()
2319        );
2320
2321        // Verify the cache contains all the block digests
2322        let cache_size = verified_cache.lock().len();
2323        assert_eq!(
2324            cache_size,
2325            expected_block_refs.len(),
2326            "Expected {} entries in the LruCache, but got {}",
2327            expected_block_refs.len(),
2328            cache_size
2329        );
2330    }
2331
2332    #[tokio::test]
2333    async fn test_successful_fetch_blocks_from_peer() {
2334        // GIVEN
2335        let (context, _) = Context::new_for_test(4);
2336        let context = Arc::new(context);
2337        let block_verifier = Arc::new(NoopBlockVerifier {});
2338        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2339        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2340        let network_client = Arc::new(MockNetworkClient::default());
2341        let store = Arc::new(MemStore::new());
2342        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2343
2344        let handle = Synchronizer::start(
2345            network_client.clone(),
2346            context,
2347            core_dispatcher.clone(),
2348            commit_vote_monitor,
2349            block_verifier,
2350            dag_state,
2351            false,
2352        );
2353
2354        // Create some test blocks
2355        let expected_blocks = (0..10)
2356            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2357            .collect::<Vec<_>>();
2358        let missing_blocks = expected_blocks
2359            .iter()
2360            .map(|block| block.reference())
2361            .collect::<BTreeSet<_>>();
2362
2363        // AND stub the fetch_blocks request from peer 1
2364        let peer = AuthorityIndex::new_for_test(1);
2365        network_client
2366            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
2367            .await;
2368
2369        // WHEN request missing blocks from peer 1
2370        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2371
2372        // Wait a little bit until those have been added to Core
2373        sleep(Duration::from_millis(1_000)).await;
2374
2375        // THEN ensure those ended up in Core
2376        let added_blocks = core_dispatcher.get_add_blocks().await;
2377        assert_eq!(added_blocks, expected_blocks);
2378    }
2379
2380    #[tokio::test]
2381    async fn saturate_fetch_blocks_from_peer() {
2382        // GIVEN
2383        let (context, _) = Context::new_for_test(4);
2384        let context = Arc::new(context);
2385        let block_verifier = Arc::new(NoopBlockVerifier {});
2386        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2387        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2388        let network_client = Arc::new(MockNetworkClient::default());
2389        let store = Arc::new(MemStore::new());
2390        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2391
2392        let handle = Synchronizer::start(
2393            network_client.clone(),
2394            context,
2395            core_dispatcher.clone(),
2396            commit_vote_monitor,
2397            block_verifier,
2398            dag_state,
2399            false,
2400        );
2401
2402        // Create some test blocks
2403        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
2404            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
2405            .collect::<Vec<_>>();
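        // Creating 2 * FETCH_BLOCKS_CONCURRENCY + 1 single-block requests with slow
        // responses should exceed the per-peer fetch capacity, so the last request
        // is expected to fail as saturated.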
2406
2407        // Now start sending requests to fetch blocks, trying to saturate the peer 1 task
2408        let peer = AuthorityIndex::new_for_test(1);
2409        let mut iter = expected_blocks.iter().peekable();
2410        while let Some(block) = iter.next() {
2411            // Stub the fetch_blocks request from peer 1 with a high response latency
2412            // so requests start blocking the peer task.
2413            network_client
2414                .stub_fetch_blocks(
2415                    vec![block.clone()],
2416                    peer,
2417                    Some(Duration::from_millis(5_000)),
2418                )
2419                .await;
2420
2421            let mut missing_blocks = BTreeSet::new();
2422            missing_blocks.insert(block.reference());
2423
2424            // WHEN requesting to fetch the blocks, the last request should fail with a
2425            // "saturated" synchronizer error
2426            if iter.peek().is_none() {
2427                match handle.fetch_blocks(missing_blocks, peer).await {
2428                    Err(ConsensusError::SynchronizerSaturated(index, _)) => {
2429                        assert_eq!(index, peer);
2430                    }
2431                    _ => panic!("A saturated synchronizer error was expected"),
2432                }
2433            } else {
2434                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
2435            }
2436        }
2437    }
2438
2439    #[tokio::test(flavor = "current_thread", start_paused = true)]
2440    async fn synchronizer_periodic_task_fetch_blocks() {
2441        // GIVEN
2442        let (context, _) = Context::new_for_test(4);
2443        let context = Arc::new(context);
2444        let block_verifier = Arc::new(NoopBlockVerifier {});
2445        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2446        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2447        let network_client = Arc::new(MockNetworkClient::default());
2448        let store = Arc::new(MemStore::new());
2449        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2450
2451        // Create some test blocks
2452        let expected_blocks = (0..10)
2453            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2454            .collect::<Vec<_>>();
2455        let missing_blocks = expected_blocks
2456            .iter()
2457            .map(|block| block.reference())
2458            .collect::<BTreeSet<_>>();
2459
2460        // AND stub the missing blocks
2461        core_dispatcher
2462            .stub_missing_blocks(missing_blocks.clone())
2463            .await;
2464
2465        // AND stub the requests for authority 1 & 2
2466        // Make the first authority time out, so the second will be called. "We" are
2467        // authority 0, so we are skipped anyway.
2468        network_client
2469            .stub_fetch_blocks(
2470                expected_blocks.clone(),
2471                AuthorityIndex::new_for_test(1),
2472                Some(FETCH_REQUEST_TIMEOUT),
2473            )
2474            .await;
2475        network_client
2476            .stub_fetch_blocks(
2477                expected_blocks.clone(),
2478                AuthorityIndex::new_for_test(2),
2479                None,
2480            )
2481            .await;
2482
2483        // WHEN the synchronizer is started and we wait for a few seconds
2484        let _handle = Synchronizer::start(
2485            network_client.clone(),
2486            context,
2487            core_dispatcher.clone(),
2488            commit_vote_monitor,
2489            block_verifier,
2490            dag_state,
2491            false,
2492        );
2493
2494        sleep(8 * FETCH_REQUEST_TIMEOUT).await;
2495
2496        // THEN the missing blocks should now be fetched and added to core
2497        let added_blocks = core_dispatcher.get_add_blocks().await;
2498        assert_eq!(added_blocks, expected_blocks);
2499
2500        // AND missing blocks should have been consumed by the stub
2501        assert!(
2502            core_dispatcher
2503                .get_missing_blocks()
2504                .await
2505                .unwrap()
2506                .is_empty()
2507        );
2508    }
2509
2510    #[tokio::test(flavor = "current_thread", start_paused = true)]
2511    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2512        // GIVEN
2513        let (mut context, _) = Context::new_for_test(4);
2514        context
2515            .protocol_config
2516            .set_consensus_batched_block_sync_for_testing(true);
2517        let context = Arc::new(context);
2518        let block_verifier = Arc::new(NoopBlockVerifier {});
2519        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2520        let network_client = Arc::new(MockNetworkClient::default());
2521        let store = Arc::new(MemStore::new());
2522        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2523        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2524
2525        // AND stub some missing blocks. The highest accepted round is 0. Create blocks
2526        // that are above the sync threshold.
2527        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2528        let stub_blocks = (sync_missing_block_round_threshold * 2
2529            ..sync_missing_block_round_threshold * 2
2530                + context.parameters.max_blocks_per_sync as u32)
2531            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2532            .collect::<Vec<_>>();
2533        let missing_blocks = stub_blocks
2534            .iter()
2535            .map(|block| block.reference())
2536            .collect::<BTreeSet<_>>();
2537        core_dispatcher
2538            .stub_missing_blocks(missing_blocks.clone())
2539            .await;
2540        // AND stub the requests for authority 1 & 2
2541        // Make the first authority time out, so the second will be called. "We" are
2542        // authority 0, so we are skipped anyway.
2543        let mut expected_blocks = stub_blocks
2544            .iter()
2545            .take(context.parameters.max_blocks_per_sync)
2546            .cloned()
2547            .collect::<Vec<_>>();
2548        network_client
2549            .stub_fetch_blocks(
2550                expected_blocks.clone(),
2551                AuthorityIndex::new_for_test(1),
2552                Some(FETCH_REQUEST_TIMEOUT),
2553            )
2554            .await;
2555        network_client
2556            .stub_fetch_blocks(
2557                expected_blocks.clone(),
2558                AuthorityIndex::new_for_test(2),
2559                None,
2560            )
2561            .await;
2562
2563        // Now create some blocks to simulate a commit lag
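        // The round is chosen well above the lag threshold
        // (commit_sync_batch_size * COMMIT_LAG_MULTIPLIER), so the quorum commit
        // index observed from these blocks clearly exceeds it.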
2564        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2565        let commit_index: CommitIndex = round - 1;
2566        let blocks = (0..4)
2567            .map(|authority| {
2568                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2569                let block = TestBlock::new(round, authority)
2570                    .set_commit_votes(commit_votes)
2571                    .build();
2572
2573                VerifiedBlock::new_for_test(block)
2574            })
2575            .collect::<Vec<_>>();
2576
2577        // Pass them through the commit vote monitor so that a large commit lag is
2578        // observed, which prevents the scheduled synchronizer from running.
2579        for block in blocks {
2580            commit_vote_monitor.observe_block(&block);
2581        }
2582
2583        // Start the synchronizer and wait for a few seconds during which the
2584        // periodic synchronizer would normally have kicked in.
2585        let _handle = Synchronizer::start(
2586            network_client.clone(),
2587            context.clone(),
2588            core_dispatcher.clone(),
2589            commit_vote_monitor.clone(),
2590            block_verifier,
2591            dag_state.clone(),
2592            false,
2593        );
2594
2595        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2596
2597        // Since we should be in commit-lag mode, none of the missing blocks should
2598        // have been fetched, and hence nothing should be sent to core for processing.
2599        let added_blocks = core_dispatcher.get_add_blocks().await;
2600        assert_eq!(added_blocks, vec![]);
2601
2603        // AND now advance the local commit index by adding commits up to the commit
2604        // index of the quorum
2605        {
2606            let mut d = dag_state.write();
2607            for index in 1..=commit_index {
2608                let commit =
2609                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2610
2611                d.add_commit(commit);
2612            }
2613
2615            assert_eq!(
2616                d.last_commit_index(),
2617                commit_vote_monitor.quorum_commit_index()
2618            );
2619        }
2620
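        // With the local commit index caught up to the quorum commit index, the
        // commit-lag gate should no longer hold back the periodic synchronizer.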
2621        // Now stub the missing blocks again so that the exact same ones are fetched.
2622        core_dispatcher
2623            .stub_missing_blocks(missing_blocks.clone())
2624            .await;
2625
2627        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2628
2629        // THEN the missing blocks should now be fetched and added to core
2630        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2632        added_blocks.sort_by_key(|block| block.reference());
2633        expected_blocks.sort_by_key(|block| block.reference());
2634
2635        assert_eq!(added_blocks, expected_blocks);
2636    }
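
    // A minimal sketch, for illustration only, of the commit-lag condition the test
    // above relies on: periodic fetching is expected to stay disabled while the
    // quorum commit index is ahead of the local commit index by more than
    // `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`. The values below are
    // assumptions chosen to mirror the scenario above, not production parameters,
    // and the exact gating logic lives in the synchronizer itself.
    #[test]
    fn commit_lag_threshold_arithmetic_sketch() {
        // Hypothetical batch size; the real value comes from `context.parameters`.
        let commit_sync_batch_size: u32 = 100;
        let lag_threshold = commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;

        // The scenario above starts with a local commit index of 0 and a quorum
        // commit index of roughly twice the threshold, so periodic sync is gated.
        let local_commit_index: CommitIndex = 0;
        let quorum_commit_index: CommitIndex = 2 * lag_threshold - 1;
        assert!(quorum_commit_index.saturating_sub(local_commit_index) > lag_threshold);

        // Once the local index catches up to the quorum index, the gate opens again.
        let caught_up: CommitIndex = quorum_commit_index;
        assert!(quorum_commit_index.saturating_sub(caught_up) <= lag_threshold);
    }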
2637
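    // On startup with "sync last known own block" enabled (the boolean flag passed
    // as the last argument to `Synchronizer::start` below, `true` here versus
    // `false` in the test above), the synchronizer asks its peers for our own
    // latest block, retries peers that time out, and reports the highest returned
    // round to core as the last known proposed round.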
2638    #[tokio::test(flavor = "current_thread", start_paused = true)]
2639    async fn synchronizer_fetch_own_last_block() {
2640        // GIVEN
2641        let (context, _) = Context::new_for_test(4);
2642        let context = Arc::new(context.with_parameters(Parameters {
2643            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2644            ..Default::default()
2645        }));
2646        let block_verifier = Arc::new(NoopBlockVerifier {});
2647        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2648        let network_client = Arc::new(MockNetworkClient::default());
2649        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2650        let store = Arc::new(MemStore::new());
2651        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2652        let our_index = AuthorityIndex::new_for_test(0);
2653
2654        // Create some test blocks
2655        let mut expected_blocks = (8..=10)
2656            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2657            .collect::<Vec<_>>();
2658
2659        // Now set different latest blocks for the peers
2660        // For peer 1 we give the block of round 10 (highest)
2661        let block_1 = expected_blocks.pop().unwrap();
2662        network_client
2663            .stub_fetch_latest_blocks(
2664                vec![block_1.clone()],
2665                AuthorityIndex::new_for_test(1),
2666                vec![our_index],
2667                Some(Duration::from_secs(10)),
2668            )
2669            .await;
2670        network_client
2671            .stub_fetch_latest_blocks(
2672                vec![block_1],
2673                AuthorityIndex::new_for_test(1),
2674                vec![our_index],
2675                None,
2676            )
2677            .await;
2678
2679        // For peer 2 we give the block of round 9
2680        let block_2 = expected_blocks.pop().unwrap();
2681        network_client
2682            .stub_fetch_latest_blocks(
2683                vec![block_2.clone()],
2684                AuthorityIndex::new_for_test(2),
2685                vec![our_index],
2686                Some(Duration::from_secs(10)),
2687            )
2688            .await;
2689        network_client
2690            .stub_fetch_latest_blocks(
2691                vec![block_2],
2692                AuthorityIndex::new_for_test(2),
2693                vec![our_index],
2694                None,
2695            )
2696            .await;
2697
2698        // For peer 3 we give the block of round 8 (lowest)
2699        let block_3 = expected_blocks.pop().unwrap();
2700        network_client
2701            .stub_fetch_latest_blocks(
2702                vec![block_3.clone()],
2703                AuthorityIndex::new_for_test(3),
2704                vec![our_index],
2705                Some(Duration::from_secs(10)),
2706            )
2707            .await;
2708        network_client
2709            .stub_fetch_latest_blocks(
2710                vec![block_3],
2711                AuthorityIndex::new_for_test(3),
2712                vec![our_index],
2713                None,
2714            )
2715            .await;
2716
2717        // WHEN we start the synchronizer and wait for a couple of seconds
2718        let handle = Synchronizer::start(
2719            network_client.clone(),
2720            context.clone(),
2721            core_dispatcher.clone(),
2722            commit_vote_monitor,
2723            block_verifier,
2724            dag_state,
2725            true,
2726        );
2727
2728        // Wait for at least the timeout duration
2729        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2730
2731        // Assert that core has been called to set the last known proposed round
2732        assert_eq!(
2733            core_dispatcher.get_last_own_proposed_round().await,
2734            vec![10]
2735        );
2736
2737        // Ensure that all the requests have been called
2738        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2739
2740        // And we got one retry
2741        assert_eq!(
2742            context
2743                .metrics
2744                .node_metrics
2745                .sync_last_known_own_block_retries
2746                .get(),
2747            1
2748        );
2749
2750        // Ensure that no panic occurred
2751        if let Err(err) = handle.stop().await {
2752            if err.is_panic() {
2753                std::panic::resume_unwind(err.into_panic());
2754            }
2755        }
2756    }
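
    /// Minimal `CoreThreadDispatcher` stub used by the fetch tests below: it records
    /// added blocks and returns the contents of its `missing_blocks` map, while every
    /// other dispatcher method is a no-op or returns a default value.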
2757    #[derive(Default)]
2758    struct SyncMockDispatcher {
2759        missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2760        added_blocks: Mutex<Vec<VerifiedBlock>>,
2761    }
2762
2763    #[async_trait::async_trait]
2764    impl CoreThreadDispatcher for SyncMockDispatcher {
2765        async fn get_missing_blocks(
2766            &self,
2767        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2768            Ok(self.missing_blocks.lock().await.clone())
2769        }
2770        async fn add_blocks(
2771            &self,
2772            blocks: Vec<VerifiedBlock>,
2773        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2774            let mut guard = self.added_blocks.lock().await;
2775            guard.extend(blocks.clone());
2776            Ok(blocks.iter().map(|b| b.reference()).collect())
2777        }
2778
2779        // Stub out the remaining CoreThreadDispatcher methods with defaults:
2780
2781        async fn check_block_refs(
2782            &self,
2783            block_refs: Vec<BlockRef>,
2784        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2785            // Echo back the requested refs by default
2786            Ok(block_refs.into_iter().collect())
2787        }
2788
2789        async fn add_certified_commits(
2790            &self,
2791            _commits: CertifiedCommits,
2792        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2793            // No additional certified-commit logic in tests
2794            Ok(BTreeSet::new())
2795        }
2796
2797        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2798            Ok(())
2799        }
2800
2801        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2802            Ok(())
2803        }
2804
2805        fn set_propagation_delay_and_quorum_rounds(
2806            &self,
2807            _delay: Round,
2808            _received_quorum_rounds: Vec<QuorumRound>,
2809            _accepted_quorum_rounds: Vec<QuorumRound>,
2810        ) -> Result<(), CoreError> {
2811            Ok(())
2812        }
2813
2814        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2815            Ok(())
2816        }
2817
2818        fn highest_received_rounds(&self) -> Vec<Round> {
2819            Vec::new()
2820        }
2821    }
2822
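    // The two tests below exercise `Synchronizer::fetch_blocks_from_authorities`
    // directly: peers known to hold a missing block ("knowledge-based" peers) are
    // asked first, and further requests go to other peers. In test builds the peer
    // selection is deterministic, which is what makes the exact per-peer assertions
    // below possible.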
2823    #[tokio::test(flavor = "current_thread")]
2824    async fn known_before_random_peer_fetch() {
2825        {
2826            // 1) Set up a 10-node context and an in-memory DAG
2827            let (ctx, _) = Context::new_for_test(10);
2828            let context = Arc::new(ctx);
2829            let store = Arc::new(MemStore::new());
2830            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2831            let inflight = InflightBlocksMap::new();
2832
2833            // 2) One missing block
2834            let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2835            let missing_ref = missing_vb.reference();
2836            let missing_blocks = BTreeMap::from([(
2837                missing_ref,
2838                BTreeSet::from([
2839                    AuthorityIndex::new_for_test(2),
2840                    AuthorityIndex::new_for_test(3),
2841                    AuthorityIndex::new_for_test(4),
2842                ]),
2843            )]);
2844
2845            // 3) Prepare mocks and stubs
2846            let network_client = Arc::new(MockNetworkClient::default());
2847            // Stub *all* authorities so that no unstubbed request panics:
2848            for i in 1..=9 {
2849                let peer = AuthorityIndex::new_for_test(i);
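                // Peers 1 and 4 are stubbed to respond only after twice the fetch
                // timeout, so requests to them effectively time out.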
2850                if i == 1 || i == 4 {
2851                    network_client
2852                        .stub_fetch_blocks(
2853                            vec![missing_vb.clone()],
2854                            peer,
2855                            Some(2 * FETCH_REQUEST_TIMEOUT),
2856                        )
2857                        .await;
2858                    continue;
2859                }
2860                network_client
2861                    .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2862                    .await;
2863            }
2864
2865            // 4) Invoke knowledge-based fetch and random fallback selection
2866            //    deterministically
2867            let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2868                ::fetch_blocks_from_authorities(
2869                    context.clone(),
2870                    inflight.clone(),
2871                    network_client.clone(),
2872                    missing_blocks,
2873                    dag_state.clone(),
2874                )
2875                .await;
2876
2877            // 5) Assert we got exactly two fetches, both from peers that are aware of
2878            //    the missing block (authorities 2 and 3)
2879            assert_eq!(results.len(), 2);
2880
2881            // 6) The knowledge-based fetches went to peers 2 and 3
2882            let (_first_guard, first_bytes, first_peer) = &results[0];
2883            assert_eq!(*first_peer, AuthorityIndex::new_for_test(2));
2884            let (_second_guard, _second_bytes, second_peer) = &results[1];
2885            assert_eq!(*second_peer, AuthorityIndex::new_for_test(3));
2886            // 7) Verify the returned bytes correspond to that block
2887            let expected = missing_vb.serialized().clone();
2888            assert_eq!(first_bytes, &vec![expected]);
2889        }
2890    }
2891
2892    #[tokio::test(flavor = "current_thread")]
2893    async fn known_before_periodic_peer_fetch_larger_scenario() {
2906        // 1) Set up a 10-node context, in-memory DAG, and inflight map
2907        let (ctx, _) = Context::new_for_test(10);
2908        let context = Arc::new(ctx);
2909        let store = Arc::new(MemStore::new());
2910        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2911        let inflight = InflightBlocksMap::new();
2912        let network_client = Arc::new(MockNetworkClient::default());
2913
2914        // 2) Create 1000 missing blocks, known by varying subsets of authorities 0, 2 and 3
2915        let mut missing_blocks = BTreeMap::new();
2916        let mut missing_vbs = Vec::new();
2917        let known_number_blocks = 10;
2918        for i in 0..1000 {
2919            let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2920            let r = vb.reference();
2921            if i < known_number_blocks {
2922                // First 10 blocks are known by authorities 0, 2
2923                missing_blocks.insert(
2924                    r,
2925                    BTreeSet::from([
2926                        AuthorityIndex::new_for_test(0),
2927                        AuthorityIndex::new_for_test(2),
2928                    ]),
2929                );
2930            } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2931                // Second 10 blocks are known by authorities 0, 3
2932                missing_blocks.insert(
2933                    r,
2934                    BTreeSet::from([
2935                        AuthorityIndex::new_for_test(0),
2936                        AuthorityIndex::new_for_test(3),
2937                    ]),
2938                );
2939            } else {
2940                // The rest are only known by authority 0
2941                missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2942            }
2943            missing_vbs.push(vb);
2944        }
2945
2946        // 3) Stub fetches for knowledge-based peers (2 and 3)
2947        let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2948        let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2949            .iter()
2950            .map(|&peer| {
2951                let vbs = missing_vbs
2952                    .iter()
2953                    .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2954                    .take(context.parameters.max_blocks_per_sync)
2955                    .cloned()
2956                    .collect::<Vec<_>>();
2957                (peer, vbs)
2958            })
2959            .collect();
2960
2961        for (peer, vbs) in known_vbs_by_peer {
2962            if peer == AuthorityIndex::new_for_test(2) {
2963                // Simulate a timeout for peer 2, then fall back to peer 5
2964                network_client
2965                    .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2966                    .await;
2967                network_client
2968                    .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2969                    .await;
2970            } else {
2971                network_client
2972                    .stub_fetch_blocks(vbs.clone(), peer, None)
2973                    .await;
2974            }
2975        }
2976
2977        // 4) Stub fetches from periodic path peers (1 and 4)
2978        network_client
2979            .stub_fetch_blocks(
2980                missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2981                AuthorityIndex::new_for_test(1),
2982                None,
2983            )
2984            .await;
2985
2986        network_client
2987            .stub_fetch_blocks(
2988                missing_vbs[context.parameters.max_blocks_per_sync
2989                    ..2 * context.parameters.max_blocks_per_sync]
2990                    .to_vec(),
2991                AuthorityIndex::new_for_test(4),
2992                None,
2993            )
2994            .await;
2995
2996        // 5) Execute the fetch logic
2997        let results = Synchronizer::<
2998            MockNetworkClient,
2999            NoopBlockVerifier,
3000            SyncMockDispatcher,
3001        >::fetch_blocks_from_authorities(
3002            context.clone(),
3003            inflight.clone(),
3004            network_client.clone(),
3005            missing_blocks,
3006            dag_state.clone(),
3007        )
3008        .await;
3009
3010        // 6) Assert we got 4 fetches: the knowledge-based fetch from peer 3 (peer 2
3011        //    timed out and fell back to peer 5), plus the periodic fetches from 1 and 4
3012        assert_eq!(results.len(), 4, "Expected 1 known + 2 periodic + 1 fallback fetch");
3013
3014        // 7) First fetch from peer 3 (knowledge-based)
3015        let (_guard3, bytes3, peer3) = &results[0];
3016        assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
3017        let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
3018            .iter()
3019            .map(|vb| vb.serialized().clone())
3020            .collect::<Vec<_>>();
3021        assert_eq!(bytes3, &expected2);
3022
3023        // 8) Second fetch from peer 1 (periodic)
3024        let (_guard1, bytes1, peer1) = &results[1];
3025        assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
3026        let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
3027            .iter()
3028            .map(|vb| vb.serialized().clone())
3029            .collect::<Vec<_>>();
3030        assert_eq!(bytes1, &expected1);
3031
3032        // 9) Third fetch from peer 4 (periodic)
3033        let (_guard4, bytes4, peer4) = &results[2];
3034        assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
3035        let expected4 = missing_vbs
3036            [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
3037            .iter()
3038            .map(|vb| vb.serialized().clone())
3039            .collect::<Vec<_>>();
3040        assert_eq!(bytes4, &expected4);
3041
3042        // 10) Fourth fetch from peer 5 (fallback after peer 2 timeout)
3043        let (_guard5, bytes5, peer5) = &results[3];
3044        assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
3045        let expected5 = missing_vbs[0..known_number_blocks]
3046            .iter()
3047            .map(|vb| vb.serialized().clone())
3048            .collect::<Vec<_>>();
3049        assert_eq!(bytes5, &expected5);
3050    }
3051
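    // A minimal sketch, for illustration only, of the LRU-based deduplication
    // pattern exercised by the test below: a bounded `LruCache` keyed by an
    // already-verified identifier (a plain `u64` stand-in here, not the real block
    // digest type) lets repeated verifications be skipped until the entry is
    // evicted. Capacity and key type are assumptions for the sketch only.
    #[test]
    fn lru_deduplication_sketch() {
        let mut cache: LruCache<u64, ()> = LruCache::new(NonZeroUsize::new(2).unwrap());

        // First sighting: not cached yet, so the block would be verified and recorded.
        assert!(cache.put(1, ()).is_none());
        // Second sighting of the same entry: cached, so verification can be skipped.
        assert!(cache.contains(&1));

        // The cache is bounded: inserting past capacity evicts the least recently
        // used entry, so an evicted block would simply be re-verified later.
        cache.put(2, ());
        cache.put(3, ());
        assert!(!cache.contains(&1));
    }
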
3052    #[tokio::test(flavor = "current_thread")]
3053    async fn test_verify_blocks_deduplication() {
3054        let (context, _keys) = Context::new_for_test(4);
3055        let context = Arc::new(context);
3056        let block_verifier = Arc::new(NoopBlockVerifier {});
3057        let failing_verifier = Arc::new(FailingBlockVerifier);
3058        let peer1 = AuthorityIndex::new_for_test(1);
3059        let peer2 = AuthorityIndex::new_for_test(2);
3060
3061        // Create cache with capacity of 5 for eviction testing
3062        let cache = Arc::new(SyncMutex::new(LruCache::new(NonZeroUsize::new(5).unwrap())));
3063
3064        // Test 1: Per-peer metric tracking
3065        let block1 = VerifiedBlock::new_for_test(TestBlock::new(10, 0).build());
3066        let serialized1 = vec![block1.serialized().clone()];
3067
3068        // Verify from peer1 (cache miss)
3069        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3070            serialized1.clone(), block_verifier.clone(), cache.clone(), &context, peer1, "live",
3071        );
3072        assert_eq!(result.unwrap().len(), 1);
3073
3074        let peer1_hostname = &context.committee.authority(peer1).hostname;
3075        assert_eq!(
3076            context
3077                .metrics
3078                .node_metrics
3079                .synchronizer_skipped_blocks_by_peer
3080                .with_label_values(&[peer1_hostname.as_str(), "live"])
3081                .get(),
3082            0
3083        );
3084
3085        // Verify same block from peer2 with different sync method (cache hit)
3086        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3087            serialized1, block_verifier.clone(), cache.clone(), &context, peer2, "periodic",
3088        );
3089        assert_eq!(result.unwrap().len(), 0, "Should skip cached block");
3090
3091        let peer2_hostname = &context.committee.authority(peer2).hostname;
3092        assert_eq!(
3093            context
3094                .metrics
3095                .node_metrics
3096                .synchronizer_skipped_blocks_by_peer
3097                .with_label_values(&[peer2_hostname.as_str(), "periodic"])
3098                .get(),
3099            1
3100        );
3101
3102        // Test 2: Invalid blocks not cached
3103        let invalid_block = VerifiedBlock::new_for_test(TestBlock::new(20, 0).build());
3104        let invalid_serialized = vec![invalid_block.serialized().clone()];
3105
3106        assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3107            invalid_serialized.clone(), failing_verifier.clone(), cache.clone(), &context, peer1, "test",
3108        ).is_err());
3109        assert_eq!(cache.lock().len(), 1, "Invalid block should not be cached");
3110
3111        // Verify invalid block fails again (not from cache)
3112        assert!(Synchronizer::<MockNetworkClient, FailingBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3113            invalid_serialized, failing_verifier, cache.clone(), &context, peer1, "test",
3114        ).is_err());
3115
3116        // Test 3: Cache eviction
3117        let blocks: Vec<_> = (0..5)
3118            .map(|i| VerifiedBlock::new_for_test(TestBlock::new(30 + i, 0).build()))
3119            .collect();
3120
3121        // Fill cache to capacity
3122        for block in &blocks {
3123            Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3124                vec![block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3125            ).unwrap();
3126        }
3127        assert_eq!(cache.lock().len(), 5);
3128
3129        // Adding one more block evicts the least recently used entry
3130        let new_block = VerifiedBlock::new_for_test(TestBlock::new(99, 0).build());
3131        Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3132            vec![new_block.serialized().clone()], block_verifier.clone(), cache.clone(), &context, peer1, "test",
3133        ).unwrap();
3134
3135        // block1 was evicted when the cache reached capacity, so re-verifying it
3136        // should not be a cache hit
3137        let block1_serialized = vec![block1.serialized().clone()];
3138        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3139            block1_serialized, block_verifier.clone(), cache.clone(), &context, peer1, "test",
3140        );
3141        assert_eq!(
3142            result.unwrap().len(),
3143            1,
3144            "Evicted block should be re-verified"
3145        );
3146
3147        // New block should still be in cache
3148        let new_block_serialized = vec![new_block.serialized().clone()];
3149        let result = Synchronizer::<MockNetworkClient, NoopBlockVerifier, MockCoreThreadDispatcher>::verify_blocks(
3150            new_block_serialized, block_verifier, cache, &context, peer1, "test",
3151        );
3152        assert_eq!(result.unwrap().len(), 0, "New block should be cached");
3153    }
3154}