consensus_core/
synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7    sync::Arc,
8    time::Duration,
9};
10
11use bytes::Bytes;
12use consensus_config::AuthorityIndex;
13use futures::{StreamExt as _, stream::FuturesUnordered};
14use iota_macros::fail_point_async;
15use iota_metrics::{
16    monitored_future,
17    monitored_mpsc::{Receiver, Sender, channel},
18    monitored_scope,
19};
20use itertools::Itertools as _;
21use parking_lot::{Mutex, RwLock};
22#[cfg(not(test))]
23use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
24use tap::TapFallible;
25use tokio::{
26    runtime::Handle,
27    sync::{mpsc::error::TrySendError, oneshot},
28    task::{JoinError, JoinSet},
29    time::{Instant, sleep, sleep_until, timeout},
30};
31use tracing::{debug, error, info, trace, warn};
32
33use crate::{
34    BlockAPI, CommitIndex, Round,
35    authority_service::COMMIT_LAG_MULTIPLIER,
36    block::{BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
37    block_verifier::BlockVerifier,
38    commit_vote_monitor::CommitVoteMonitor,
39    context::Context,
40    core_thread::CoreThreadDispatcher,
41    dag_state::DagState,
42    error::{ConsensusError, ConsensusResult},
43    network::NetworkClient,
44};
45
46/// The number of concurrent fetch-block requests per authority.
47const FETCH_BLOCKS_CONCURRENCY: usize = 5;
48
49/// The maximum number of additional (ancestor) blocks that can be returned.
50// TODO: This is a temporary value, and should be removed once the protocol
51// version is updated to support batching.
52pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;
53
54/// The timeout for the synchronizer to fetch blocks from a given peer authority.
55const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
56
57/// The timeout for the periodic synchronizer to fetch blocks from the peers.
58const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
59
60/// The maximum number of authorities from which we will try to fetch a block at
61/// the same time. The corresponding `BlocksGuard` ensures that a block is never
62/// requested from more than this number of authorities concurrently.
63const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 3;
64
65/// The maximum number of peers from which the periodic synchronizer will
66/// request blocks.
67const MAX_PERIODIC_SYNC_PEERS: usize = 4;
68
69/// The maximum number of peers that the periodic synchronizer chooses completely
70/// at random to fetch blocks from. The other peers will be chosen based on
71/// their knowledge of the DAG.
72const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
73
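/// RAII guard over a set of block refs that are currently being requested from
/// `peer`: while the guard is alive, `peer` stays registered in the
/// `InflightBlocksMap` for those blocks, and dropping the guard releases the
/// registration again via `unlock_blocks`.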
74struct BlocksGuard {
75    map: Arc<InflightBlocksMap>,
76    block_refs: BTreeSet<BlockRef>,
77    peer: AuthorityIndex,
78}
79
80impl Drop for BlocksGuard {
81    fn drop(&mut self) {
82        self.map.unlock_blocks(&self.block_refs, self.peer);
83    }
84}
85
86// Keeps a mapping between the missing blocks that have been instructed to be
87// fetched and the authorities that are currently fetching them. For a block ref
88// there is a maximum number of authorities that can concurrently fetch it. The
89// ids of the authorities currently fetching a block are stored in the
90// corresponding `BTreeSet`, where they effectively act as "locks".
91struct InflightBlocksMap {
92    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
93}
94
95impl InflightBlocksMap {
96    fn new() -> Arc<Self> {
97        Arc::new(Self {
98            inner: Mutex::new(HashMap::new()),
99        })
100    }
101
102    /// Locks the blocks to be fetched for the given `peer`. To avoid re-fetching
103    /// the missing blocks from too many authorities at the same time, the
104    /// concurrency is limited by taking a per-block lock. If a block is already
105    /// being fetched by the maximum allowed number of authorities, then its
106    /// block ref will not be included in the returned set. The method returns a
107    /// guard over all the block refs that have been successfully locked and are
108    /// therefore allowed to be fetched, or `None` if no block could be locked.
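    ///
    /// A minimal usage sketch (assuming `refs` is a `BTreeSet<BlockRef>`; the peer
    /// index is purely illustrative):
    /// ```ignore
    /// let map = InflightBlocksMap::new();
    /// // Peer 0 locks the refs and is allowed to fetch them.
    /// let guard = map.lock_blocks(refs.clone(), AuthorityIndex::new_for_test(0)).unwrap();
    /// // The same peer cannot lock the same refs again while the guard is alive.
    /// assert!(map.lock_blocks(refs.clone(), AuthorityIndex::new_for_test(0)).is_none());
    /// // Dropping the guard releases the locks via `unlock_blocks`.
    /// drop(guard);
    /// ```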
109    fn lock_blocks(
110        self: &Arc<Self>,
111        missing_block_refs: BTreeSet<BlockRef>,
112        peer: AuthorityIndex,
113    ) -> Option<BlocksGuard> {
114        let mut blocks = BTreeSet::new();
115        let mut inner = self.inner.lock();
116
117        for block_ref in missing_block_refs {
118            // Check that the number of authorities already instructed to fetch the
119            // block is not higher than the allowed maximum, and that `peer` has not
120            // already been instructed to do so.
121            let authorities = inner.entry(block_ref).or_default();
122            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
123                && authorities.get(&peer).is_none()
124            {
125                assert!(authorities.insert(peer));
126                blocks.insert(block_ref);
127            }
128        }
129
130        if blocks.is_empty() {
131            None
132        } else {
133            Some(BlocksGuard {
134                map: self.clone(),
135                block_refs: blocks,
136                peer,
137            })
138        }
139    }
140
141    /// Unlocks the provided block references for the given `peer`. The
142    /// unlocking is strict, meaning that if this method is called for a
143    /// specific block ref and peer more times than the corresponding lock
144    /// has been called, it will panic.
145    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
146        // Now mark all the blocks as fetched from the map
147        let mut blocks_to_fetch = self.inner.lock();
148        for block_ref in block_refs {
149            let authorities = blocks_to_fetch
150                .get_mut(block_ref)
151                .expect("Should have found a non empty map");
152
153            assert!(authorities.remove(&peer), "Peer index should be present!");
154
155            // if the last one then just clean up
156            if authorities.is_empty() {
157                blocks_to_fetch.remove(block_ref);
158            }
159        }
160    }
161
162    /// Drops the provided `blocks_guard`, which unlocks its blocks, and then
163    /// attempts to lock the same block refs again for the given `peer`. The swap
164    /// is best effort and there is no guarantee that the `peer` will be able to
165    /// acquire the new locks.
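    ///
    /// A brief sketch (illustrative; `map`, `old_guard` and `new_peer` are assumed
    /// to exist in the caller's context):
    /// ```ignore
    /// // Re-assign the refs currently held by `old_guard` to `new_peer`.
    /// if let Some(new_guard) = map.swap_locks(old_guard, new_peer) {
    ///     // `new_peer` now holds locks for at least some of the refs and can be
    ///     // asked to fetch them.
    /// }
    /// ```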
166    fn swap_locks(
167        self: &Arc<Self>,
168        blocks_guard: BlocksGuard,
169        peer: AuthorityIndex,
170    ) -> Option<BlocksGuard> {
171        let block_refs = blocks_guard.block_refs.clone();
172
173        // Explicitly drop the guard
174        drop(blocks_guard);
175
176        // Now create new guard
177        self.lock_blocks(block_refs, peer)
178    }
179
180    #[cfg(test)]
181    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
182        let inner = self.inner.lock();
183        inner.len()
184    }
185}
186
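/// Commands handled by the synchronizer loop: `FetchBlocks` fetches the provided
/// missing block refs from `peer_index` on the "live" path, `FetchOwnLastBlock`
/// asks the peers for this node's last proposed block, and `KickOffScheduler`
/// triggers the periodic fetch scheduler earlier than its next deadline.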
187enum Command {
188    FetchBlocks {
189        missing_block_refs: BTreeSet<BlockRef>,
190        peer_index: AuthorityIndex,
191        result: oneshot::Sender<Result<(), ConsensusError>>,
192    },
193    FetchOwnLastBlock,
194    KickOffScheduler,
195}
196
197pub(crate) struct SynchronizerHandle {
198    commands_sender: Sender<Command>,
199    tasks: tokio::sync::Mutex<JoinSet<()>>,
200}
201
202impl SynchronizerHandle {
203    /// Explicitly asks the synchronizer to fetch the blocks in the provided
204    /// `missing_block_refs` set from the given peer authority.
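    ///
    /// A minimal usage sketch (names are illustrative; a caller is assumed to hold
    /// the handle and a set of missing ancestors for a received block):
    /// ```ignore
    /// if let Err(err) = handle.fetch_blocks(missing_ancestors, block_author).await {
    ///     // `SynchronizerSaturated` and `Shutdown` are expected errors here; the
    ///     // periodic scheduler will pick up whatever could not be requested now.
    ///     debug!("Could not request missing ancestors: {err}");
    /// }
    /// ```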
205    pub(crate) async fn fetch_blocks(
206        &self,
207        missing_block_refs: BTreeSet<BlockRef>,
208        peer_index: AuthorityIndex,
209    ) -> ConsensusResult<()> {
210        let (sender, receiver) = oneshot::channel();
211        self.commands_sender
212            .send(Command::FetchBlocks {
213                missing_block_refs,
214                peer_index,
215                result: sender,
216            })
217            .await
218            .map_err(|_err| ConsensusError::Shutdown)?;
219        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
220    }
221
222    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
223        let mut tasks = self.tasks.lock().await;
224        tasks.abort_all();
225        while let Some(result) = tasks.join_next().await {
226            result?
227        }
228        Ok(())
229    }
230}
231
232/// `Synchronizer` oversees live block synchronization, crucial for node
233/// progress. Live synchronization refers to the process of retrieving missing
234/// blocks, particularly those essential for advancing a node when data from
235/// only a few rounds is absent. If a node significantly lags behind the
236/// network, `commit_syncer` handles fetching missing blocks via a more
237/// efficient approach. `Synchronizer` aims for swift catch-up employing two
238/// mechanisms:
239///
240/// 1. Explicitly requesting missing blocks from designated authorities via the
241///    "block send" path. This includes attempting to fetch any missing
242///    ancestors necessary for processing a received block. Such requests
243///    prioritize the block author, maximizing the chance of prompt retrieval. A
244///    locking mechanism allows each missing block to be requested concurrently
245///    from a limited number of authorities, enhancing the chances of timely
246///    retrieval. Notably, if additional missing blocks arise during block
247///    processing, requests to the same authority are deferred to the scheduler.
248///
249/// 2. Periodically requesting missing blocks via a scheduler. This primarily
250///    serves to retrieve missing blocks that were not ancestors of a received
251///    block via the "block send" path. The scheduler operates on either a fixed
252///    periodic basis or is triggered immediately after explicit fetches
253///    described in (1), ensuring continued block retrieval if gaps persist.
254///
255/// In addition to the above, the synchronizer can fetch the node's own last
256/// proposed block from the network peers as a best-effort approach to recover
257/// the node from amnesia and avoid having it equivocate.
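///
/// A construction sketch (illustrative; all inputs are assumed to be built by the
/// authority node elsewhere):
/// ```ignore
/// let handle = Synchronizer::start(
///     network_client,
///     context,
///     core_dispatcher,
///     commit_vote_monitor,
///     block_verifier,
///     dag_state,
///     /* sync_last_known_own_block */ false,
/// );
/// // Callers can then request specific missing blocks from a peer:
/// handle.fetch_blocks(missing_block_refs, peer_index).await?;
/// ```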
258pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
259    context: Arc<Context>,
260    commands_receiver: Receiver<Command>,
261    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
262    core_dispatcher: Arc<D>,
263    commit_vote_monitor: Arc<CommitVoteMonitor>,
264    dag_state: Arc<RwLock<DagState>>,
265    fetch_blocks_scheduler_task: JoinSet<()>,
266    fetch_own_last_block_task: JoinSet<()>,
267    network_client: Arc<C>,
268    block_verifier: Arc<V>,
269    inflight_blocks_map: Arc<InflightBlocksMap>,
270    commands_sender: Sender<Command>,
271}
272
273impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
274    /// Starts the synchronizer, which is responsible for fetching blocks from
275    /// other authorities and managing block synchronization tasks.
276    pub fn start(
277        network_client: Arc<C>,
278        context: Arc<Context>,
279        core_dispatcher: Arc<D>,
280        commit_vote_monitor: Arc<CommitVoteMonitor>,
281        block_verifier: Arc<V>,
282        dag_state: Arc<RwLock<DagState>>,
283        sync_last_known_own_block: bool,
284    ) -> Arc<SynchronizerHandle> {
285        let (commands_sender, commands_receiver) =
286            channel("consensus_synchronizer_commands", 1_000);
287        let inflight_blocks_map = InflightBlocksMap::new();
288
289        // Spawn the tasks to fetch the blocks from the others
290        let mut fetch_block_senders = BTreeMap::new();
291        let mut tasks = JoinSet::new();
292        for (index, _) in context.committee.authorities() {
293            if index == context.own_index {
294                continue;
295            }
296            let (sender, receiver) =
297                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
298            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
299                index,
300                network_client.clone(),
301                block_verifier.clone(),
302                commit_vote_monitor.clone(),
303                context.clone(),
304                core_dispatcher.clone(),
305                dag_state.clone(),
306                receiver,
307                commands_sender.clone(),
308            );
309            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
310            fetch_block_senders.insert(index, sender);
311        }
312
313        let commands_sender_clone = commands_sender.clone();
314
315        if sync_last_known_own_block {
316            commands_sender
317                .try_send(Command::FetchOwnLastBlock)
318                .expect("Failed to sync our last block");
319        }
320
321        // Spawn the task to listen to the requests & periodic runs
322        tasks.spawn(monitored_future!(async move {
323            let mut s = Self {
324                context,
325                commands_receiver,
326                fetch_block_senders,
327                core_dispatcher,
328                commit_vote_monitor,
329                fetch_blocks_scheduler_task: JoinSet::new(),
330                fetch_own_last_block_task: JoinSet::new(),
331                network_client,
332                block_verifier,
333                inflight_blocks_map,
334                commands_sender: commands_sender_clone,
335                dag_state,
336            };
337            s.run().await;
338        }));
339
340        Arc::new(SynchronizerHandle {
341            commands_sender,
342            tasks: tokio::sync::Mutex::new(tasks),
343        })
344    }
345
346    // The main loop to listen for the submitted commands.
347    async fn run(&mut self) {
348        // We want the synchronizer to run periodically every 200ms to fetch any missing
349        // blocks.
350        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
351        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);
352
353        tokio::pin!(scheduler_timeout);
354
355        loop {
356            tokio::select! {
357                Some(command) = self.commands_receiver.recv() => {
358                    match command {
359                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
360                            if peer_index == self.context.own_index {
361                                error!("We should never attempt to fetch blocks from our own node");
362                                continue;
363                            }
364
365                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();
366
367                            // Keep only the max allowed number of blocks to request. It is ok to reduce here as the
368                            // scheduler task will take care of syncing whatever is left over.
369                            let missing_block_refs = missing_block_refs
370                                .into_iter()
371                                .take(self.context.parameters.max_blocks_per_sync)
372                                .collect();
373
374                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
375                            let Some(blocks_guard) = blocks_guard else {
376                                result.send(Ok(())).ok();
377                                continue;
378                            };
379
380                            // We don't block if the corresponding peer task is saturated - we drop the request instead. That's ok, as the periodic
381                            // synchronization task will handle any still-missing blocks in its next run.
382                            let r = self
383                                .fetch_block_senders
384                                .get(&peer_index)
385                                .expect("Fatal error, sender should be present")
386                                .try_send(blocks_guard)
387                                .map_err(|err| {
388                                    match err {
389                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
390                                        TrySendError::Closed(_) => ConsensusError::Shutdown
391                                    }
392                                });
393
394                            result.send(r).ok();
395                        }
396                        Command::FetchOwnLastBlock => {
397                            if self.fetch_own_last_block_task.is_empty() {
398                                self.start_fetch_own_last_block_task();
399                            }
400                        }
401                        Command::KickOffScheduler => {
402                            // just reset the scheduler timeout timer to run immediately if not already running.
403                            // If the scheduler is already running then just reduce the remaining time to run.
404                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
405                                Instant::now()
406                            } else {
407                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
408                            };
409
410                            // only reset if it is earlier than the next deadline
411                            if timeout < scheduler_timeout.deadline() {
412                                scheduler_timeout.as_mut().reset(timeout);
413                            }
414                        }
415                    }
416                },
417                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
418                    match result {
419                        Ok(()) => {},
420                        Err(e) => {
421                            if e.is_cancelled() {
422                            } else if e.is_panic() {
423                                std::panic::resume_unwind(e.into_panic());
424                            } else {
425                                panic!("fetch our last block task failed: {e}");
426                            }
427                        },
428                    };
429                },
430                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
431                    match result {
432                        Ok(()) => {},
433                        Err(e) => {
434                            if e.is_cancelled() {
435                            } else if e.is_panic() {
436                                std::panic::resume_unwind(e.into_panic());
437                            } else {
438                                panic!("fetch blocks scheduler task failed: {e}");
439                            }
440                        },
441                    };
442                },
443                () = &mut scheduler_timeout => {
444                    // we want to start a new task only if the previous one has already finished.
445                    if self.fetch_blocks_scheduler_task.is_empty() {
446                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
447                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
448                            return;
449                        };
450                    }
451
452                    scheduler_timeout
453                        .as_mut()
454                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
455                }
456            }
457        }
458    }
459
460    async fn fetch_blocks_from_authority(
461        peer_index: AuthorityIndex,
462        network_client: Arc<C>,
463        block_verifier: Arc<V>,
464        commit_vote_monitor: Arc<CommitVoteMonitor>,
465        context: Arc<Context>,
466        core_dispatcher: Arc<D>,
467        dag_state: Arc<RwLock<DagState>>,
468        mut receiver: Receiver<BlocksGuard>,
469        commands_sender: Sender<Command>,
470    ) {
471        const MAX_RETRIES: u32 = 3;
472        let peer_hostname = &context.committee.authority(peer_index).hostname;
473
474        let mut requests = FuturesUnordered::new();
475
476        loop {
477            tokio::select! {
478                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
479                    // get the highest accepted rounds
480                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
481
482                    // Record metrics for live synchronizer requests
483                    let metrics = &context.metrics.node_metrics;
484                    metrics
485                        .synchronizer_requested_blocks_by_peer
486                        .with_label_values(&[peer_hostname.as_str(), "live"])
487                        .inc_by(blocks_guard.block_refs.len() as u64);
488                    // Count requested blocks per authority and increment metric by one per authority
489                    let mut authors = HashSet::new();
490                    for block_ref in &blocks_guard.block_refs {
491                        authors.insert(block_ref.author);
492                    }
493                    for author in authors {
494                        let host = &context.committee.authority(author).hostname;
495                        metrics
496                            .synchronizer_requested_blocks_by_authority
497                            .with_label_values(&[host.as_str(), "live"])
498                            .inc();
499                    }
500
501                    requests.push(Self::fetch_blocks_request(
502                        network_client.clone(),
503                        peer_index,
504                        blocks_guard,
505                        highest_rounds,
506                        FETCH_REQUEST_TIMEOUT,
507                        1,
508                    ))
509                },
510                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
511                    match response {
512                        Ok(blocks) => {
513                            if let Err(err) = Self::process_fetched_blocks(blocks,
514                                peer_index,
515                                blocks_guard,
516                                core_dispatcher.clone(),
517                                block_verifier.clone(),
518                                commit_vote_monitor.clone(),
519                                context.clone(),
520                                commands_sender.clone(),
521                                "live"
522                            ).await {
523                                context.metrics.update_scoring_metrics_on_block_receival(
524                                    peer_index,
525                                    peer_hostname,
526                                    err.clone(),
527                                    "process_fetched_blocks",
528                                );
529                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
530                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
531                            }
532                        },
533                        Err(_) => {
534                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
535                            if retries <= MAX_RETRIES {
536                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
537                            } else {
538                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
539                                // Not strictly necessary, but drop the guard here to unlock the blocks.
540                                drop(blocks_guard);
541                            }
542                        }
543                    }
544                },
545                else => {
546                    info!("Fetching blocks from authority {peer_index} task will now abort.");
547                    break;
548                }
549            }
550        }
551    }
552
553    /// Processes the raw blocks fetched from peer `peer_index`. If no error is
554    /// returned, then the verified blocks are immediately sent to Core for
555    /// processing.
556    async fn process_fetched_blocks(
557        mut serialized_blocks: Vec<Bytes>,
558        peer_index: AuthorityIndex,
559        requested_blocks_guard: BlocksGuard,
560        core_dispatcher: Arc<D>,
561        block_verifier: Arc<V>,
562        commit_vote_monitor: Arc<CommitVoteMonitor>,
563        context: Arc<Context>,
564        commands_sender: Sender<Command>,
565        sync_method: &str,
566    ) -> ConsensusResult<()> {
567        if serialized_blocks.is_empty() {
568            return Ok(());
569        }
570
571        // Limit the number of returned blocks that will be processed.
572        if context.protocol_config.consensus_batched_block_sync() {
573            serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
574        } else {
575            // Ensure that the number of returned blocks does not exceed the total
576            // maximum allowed.
577            if serialized_blocks.len()
578                > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
579            {
580                return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
581            }
582        }
583
584        // Verify all the fetched blocks
585        let blocks = Handle::current()
586            .spawn_blocking({
587                let block_verifier = block_verifier.clone();
588                let context = context.clone();
589                move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
590            })
591            .await
592            .expect("Spawn blocking should not fail")?;
593
594        if !context.protocol_config.consensus_batched_block_sync() {
595            // Get all the ancestors of the requested blocks only
596            let ancestors = blocks
597                .iter()
598                .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
599                .flat_map(|b| b.ancestors().to_vec())
600                .collect::<BTreeSet<BlockRef>>();
601
602            // Now confirm that the blocks are either among the ones requested, or they
603            // are ancestors of the requested blocks.
604            for block in &blocks {
605                if !requested_blocks_guard
606                    .block_refs
607                    .contains(&block.reference())
608                    && !ancestors.contains(&block.reference())
609                {
610                    return Err(ConsensusError::UnexpectedFetchedBlock {
611                        index: peer_index,
612                        block_ref: block.reference(),
613                    });
614                }
615            }
616        }
617
618        // Record commit votes from the verified blocks.
619        for block in &blocks {
620            commit_vote_monitor.observe_block(block);
621        }
622
623        let metrics = &context.metrics.node_metrics;
624        let peer_hostname = &context.committee.authority(peer_index).hostname;
625        metrics
626            .synchronizer_fetched_blocks_by_peer
627            .with_label_values(&[peer_hostname.as_str(), sync_method])
628            .inc_by(blocks.len() as u64);
629        for block in &blocks {
630            let block_hostname = &context.committee.authority(block.author()).hostname;
631            metrics
632                .synchronizer_fetched_blocks_by_authority
633                .with_label_values(&[block_hostname.as_str(), sync_method])
634                .inc();
635        }
636
637        debug!(
638            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
639            blocks.len(),
640            blocks.iter().map(|b| b.reference().to_string()).join(", "),
641        );
642
643        // Now send them to Core for processing. Ignore the returned missing blocks, as
644        // we don't want this mechanism to create a feedback loop that keeps fetching
645        // more blocks. The periodic synchronization will take care of that.
646        let missing_blocks = core_dispatcher
647            .add_blocks(blocks)
648            .await
649            .map_err(|_| ConsensusError::Shutdown)?;
650
651        // now release all the locked blocks as they have been fetched, verified &
652        // processed
653        drop(requested_blocks_guard);
654
655        // kick off immediately the scheduled synchronizer
656        if !missing_blocks.is_empty() {
657            // do not block here, so we avoid any possible cycles.
658            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
659            {
660                warn!("Commands channel is full")
661            }
662        }
663
664        context
665            .metrics
666            .node_metrics
667            .missing_blocks_after_fetch_total
668            .inc_by(missing_blocks.len() as u64);
669
670        Ok(())
671    }
672
673    fn get_highest_accepted_rounds(
674        dag_state: Arc<RwLock<DagState>>,
675        context: &Arc<Context>,
676    ) -> Vec<Round> {
677        let blocks = dag_state
678            .read()
679            .get_last_cached_block_per_authority(Round::MAX);
680        assert_eq!(blocks.len(), context.committee.size());
681
682        blocks
683            .into_iter()
684            .map(|(block, _)| block.round())
685            .collect::<Vec<_>>()
686    }
687
688    fn verify_blocks(
689        serialized_blocks: Vec<Bytes>,
690        block_verifier: Arc<V>,
691        context: &Context,
692        peer_index: AuthorityIndex,
693    ) -> ConsensusResult<Vec<VerifiedBlock>> {
694        let mut verified_blocks = Vec::new();
695
696        for serialized_block in serialized_blocks {
697            let signed_block: SignedBlock =
698                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
699
700            // TODO: dedup block verifications, here and with fetched blocks.
701            if let Err(e) = block_verifier.verify(&signed_block) {
702                // TODO: we might want to use a different metric to track the invalid "served"
703                // blocks from the invalid "proposed" ones.
704                let hostname = context.committee.authority(peer_index).hostname.clone();
705
706                context
707                    .metrics
708                    .node_metrics
709                    .invalid_blocks
710                    .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
711                    .inc();
712                warn!("Invalid block received from {}: {}", peer_index, e);
713                return Err(e);
714            }
715            let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
716
717            // Dropping is ok because the block will be refetched.
718            // TODO: improve efficiency, maybe suspend and continue processing the block
719            // asynchronously.
720            let now = context.clock.timestamp_utc_ms();
721            if now < verified_block.timestamp_ms() {
722                warn!(
723                    "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
724                    verified_block.reference(),
725                    verified_block.timestamp_ms(),
726                    now
727                );
728                continue;
729            }
730
731            verified_blocks.push(verified_block);
732        }
733
734        Ok(verified_blocks)
735    }
736
737    async fn fetch_blocks_request(
738        network_client: Arc<C>,
739        peer: AuthorityIndex,
740        blocks_guard: BlocksGuard,
741        highest_rounds: Vec<Round>,
742        request_timeout: Duration,
743        mut retries: u32,
744    ) -> (
745        ConsensusResult<Vec<Bytes>>,
746        BlocksGuard,
747        u32,
748        AuthorityIndex,
749        Vec<Round>,
750    ) {
751        let start = Instant::now();
752        let resp = timeout(
753            request_timeout,
754            network_client.fetch_blocks(
755                peer,
756                blocks_guard
757                    .block_refs
758                    .clone()
759                    .into_iter()
760                    .collect::<Vec<_>>(),
761                highest_rounds.clone(),
762                request_timeout,
763            ),
764        )
765        .await;
766
767        fail_point_async!("consensus-delay");
768
769        let resp = match resp {
770            Ok(Err(err)) => {
771                // Add a delay before retrying - if that is needed. If the request has timed
772                // out, then this will effectively be a no-op.
773                sleep_until(start + request_timeout).await;
774                retries += 1;
775                Err(err)
776            } // network error
777            Err(err) => {
778                // timeout
779                sleep_until(start + request_timeout).await;
780                retries += 1;
781                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
782            }
783            Ok(result) => result,
784        };
785        (resp, blocks_guard, retries, peer, highest_rounds)
786    }
787
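    /// Spawns a best-effort task that asks the peers for this node's own last
    /// proposed block (to recover from amnesia). It keeps retrying, with a growing
    /// delay, until a quorum (2f+1) of stake has returned acceptable results, and
    /// then reports the highest observed round to Core via
    /// `set_last_known_proposed_round`.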
788    fn start_fetch_own_last_block_task(&mut self) {
789        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
790        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
791
792        let context = self.context.clone();
793        let dag_state = self.dag_state.clone();
794        let network_client = self.network_client.clone();
795        let block_verifier = self.block_verifier.clone();
796        let core_dispatcher = self.core_dispatcher.clone();
797
798        self.fetch_own_last_block_task
799            .spawn(monitored_future!(async move {
800                let _scope = monitored_scope("FetchOwnLastBlockTask");
801
802                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
803                    let network_client_cloned = network_client.clone();
804                    let own_index = context.own_index;
805                    async move {
806                        sleep(fetch_own_block_delay).await;
807                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
808                        (r, authority_index)
809                    }
810                };
811
812                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
813                                    let mut result = Vec::new();
814                                    for serialized_block in blocks {
815                                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
816                                        block_verifier.verify(&signed_block).tap_err(|err|{
817                                            let hostname = context.committee.authority(authority_index).hostname.clone();
818                                            context
819                                                .metrics
820                                                .node_metrics
821                                                .invalid_blocks
822                                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
823                                                .inc();
824                                            warn!("Invalid block received from {}: {}", authority_index, err);
825                                        })?;
826
827                                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
828                                        if verified_block.author() != context.own_index {
829                                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
830                                        }
831                                        result.push(verified_block);
832                                    }
833                                    Ok(result)
834                };
835
836                // Get the highest round of all the results. Retry until a quorum (2f+1) of stake has responded.
837                let mut highest_round = GENESIS_ROUND;
838                // Keep track of the received responses to avoid fetching our own block from the same peer again
839                let mut received_response = vec![false; context.committee.size()];
840                // Assume that our node is not Byzantine
841                received_response[context.own_index] = true;
842                let mut total_stake = context.committee.stake(context.own_index);
843                let mut retries = 0;
844                let mut retry_delay_step = Duration::from_millis(500);
845                'main: loop {
846                    if context.committee.size() == 1 {
847                        highest_round = dag_state.read().get_last_proposed_block().round();
848                        info!("Only one node in the network, will not try fetching own last block from peers.");
849                        break 'main;
850                    }
851
852                    // Ask all the other peers about our last block
853                    let mut results = FuturesUnordered::new();
854
855                    for (authority_index, _authority) in context.committee.authorities() {
856                        // Skip our own index and the ones that have already responded
857                        if !received_response[authority_index] {
858                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
859                        }
860                    }
861
862                    // Gather the results, but also stop once the timeout expires
863                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
864                    tokio::pin!(timer);
865
866                    'inner: loop {
867                        tokio::select! {
868                            result = results.next() => {
869                                let Some((result, authority_index)) = result else {
870                                    break 'inner;
871                                };
872                                match result {
873                                    Ok(result) => {
874                                        match process_blocks(result, authority_index) {
875                                            Ok(blocks) => {
876                                                received_response[authority_index] = true;
877                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
878                                                highest_round = highest_round.max(max_round);
879
880                                                total_stake += context.committee.stake(authority_index);
881                                            },
882                                            Err(err) => {
883                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
884                                            }
885                                        }
886                                    },
887                                    Err(err) => {
888                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
889                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
890                                    }
891                                }
892                            },
893                            () = &mut timer => {
894                                info!("Timeout while trying to sync our own last block from peers");
895                                break 'inner;
896                            }
897                        }
898                    }
899
900                    // Require at least a quorum (2f+1) of stake to have replied back.
901                    if context.committee.reached_quorum(total_stake) {
902                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
903                        break 'main;
904                    } else {
905                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
906                    }
907
908                    retries += 1;
909                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
910                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
911
912                    sleep(retry_delay_step).await;
913
914                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
915                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
916                }
917
918                // Update the Core with the highest detected round
919                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
920
921                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
922                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
923                }
924            }));
925    }
926
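    /// Runs one periodic synchronization round: asks Core for the currently missing
    /// blocks and, if there are any, spawns a task that fetches them from a mix of
    /// "knowledgeable" and random peers. When the node is lagging behind on commits,
    /// the run is skipped entirely if GC is enabled, and otherwise restricted to
    /// missing blocks close to the highest accepted round.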
927    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
928        let mut missing_blocks = self
929            .core_dispatcher
930            .get_missing_blocks()
931            .await
932            .map_err(|_err| ConsensusError::Shutdown)?;
933
934        // No reason to kick off the scheduler if there are no missing blocks to fetch
935        if missing_blocks.is_empty() {
936            return Ok(());
937        }
938
939        let context = self.context.clone();
940        let network_client = self.network_client.clone();
941        let block_verifier = self.block_verifier.clone();
942        let commit_vote_monitor = self.commit_vote_monitor.clone();
943        let core_dispatcher = self.core_dispatcher.clone();
944        let blocks_to_fetch = self.inflight_blocks_map.clone();
945        let commands_sender = self.commands_sender.clone();
946        let dag_state = self.dag_state.clone();
947
948        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
949        trace!(
950            "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
951        );
952        if commit_lagging {
953            // If GC is enabled and we are lagging behind on commits, then we don't want to
954            // enable the scheduler. Since the new logic for processing certified commits
955            // is in place, we are guaranteed that commits will happen for all the
956            // certified commits.
957            if dag_state.read().gc_enabled() {
958                return Ok(());
959            }
960
961            // As the node is lagging behind on commits, try to sync only the missing blocks
962            // that are within the acceptable round threshold. The rest are not attempted
963            // yet.
964            let highest_accepted_round = dag_state.read().highest_accepted_round();
965            missing_blocks = missing_blocks
966                .into_iter()
967                .take_while(|(block_ref, _)| {
968                    block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
969                })
970                .collect::<BTreeMap<_, _>>();
971
972            // If no missing blocks are within the acceptable threshold to sync while we
973            // lag behind on commits, then we disable the scheduler completely for this run.
974            if missing_blocks.is_empty() {
975                trace!(
976                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
977                );
978                self.context
979                    .metrics
980                    .node_metrics
981                    .fetch_blocks_scheduler_skipped
982                    .with_label_values(&["commit_lagging"])
983                    .inc();
984                return Ok(());
985            }
986        }
987
988        self.fetch_blocks_scheduler_task
989            .spawn(monitored_future!(async move {
990                let _scope = monitored_scope("FetchMissingBlocksScheduler");
991
992                context
993                    .metrics
994                    .node_metrics
995                    .fetch_blocks_scheduler_inflight
996                    .inc();
997                let total_requested = missing_blocks.len();
998
999                fail_point_async!("consensus-delay");
1000
1001                // Fetch blocks from peers
1002                let results = Self::fetch_blocks_from_authorities(
1003                    context.clone(),
1004                    blocks_to_fetch.clone(),
1005                    network_client,
1006                    missing_blocks,
1007                    dag_state,
1008                )
1009                .await;
1010                context
1011                    .metrics
1012                    .node_metrics
1013                    .fetch_blocks_scheduler_inflight
1014                    .dec();
1015                if results.is_empty() {
1016                    warn!("No results returned while requesting missing blocks");
1017                    return;
1018                }
1019
1020                // Now process the returned results
1021                let mut total_fetched = 0;
1022                for (blocks_guard, fetched_blocks, peer) in results {
1023                    total_fetched += fetched_blocks.len();
1024
1025                    if let Err(err) = Self::process_fetched_blocks(
1026                        fetched_blocks,
1027                        peer,
1028                        blocks_guard,
1029                        core_dispatcher.clone(),
1030                        block_verifier.clone(),
1031                        commit_vote_monitor.clone(),
1032                        context.clone(),
1033                        commands_sender.clone(),
1034                        "periodic",
1035                    )
1036                    .await
1037                    {
1038                        warn!(
1039                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
1040                        );
1041                    }
1042                }
1043
1044                debug!(
1045                    "Total blocks requested to fetch: {}, total fetched: {}",
1046                    total_requested, total_fetched
1047                );
1048            }));
1049        Ok(())
1050    }
1051
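    /// Returns whether this node is lagging behind on commits, together with the
    /// local last commit index and the quorum commit index observed from peers. The
    /// node is considered lagging when `last_commit_index + commit_sync_batch_size *
    /// COMMIT_LAG_MULTIPLIER < quorum_commit_index`. For example (with purely
    /// hypothetical numbers), with `last_commit_index = 100`,
    /// `commit_sync_batch_size = 50` and a multiplier of 4, the node is considered
    /// lagging once the quorum commit index exceeds 300.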
1052    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1053        let last_commit_index = self.dag_state.read().last_commit_index();
1054        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1055        let commit_threshold = last_commit_index
1056            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1057        (
1058            commit_threshold < quorum_commit_index,
1059            last_commit_index,
1060            quorum_commit_index,
1061        )
1062    }
1063
1064    /// The number of rounds above the highest accepted round to allow fetching
1065    /// missing blocks via the periodic synchronization. Any missing blocks
1066    /// of higher rounds are considered too far in the future to fetch. This
1067    /// property is used only when it's detected that the node has fallen
1068    /// behind on its commit compared to the rest of the network,
1069    /// otherwise the scheduler will attempt to fetch any missing block.
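    ///
    /// For example (hypothetical values): with a highest accepted round of 1_000
    /// and `commit_sync_batch_size = 100`, only missing blocks with rounds up to
    /// 1_100 are fetched by the scheduler while the node lags behind on commits.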
1070    fn missing_block_round_threshold(&self) -> Round {
1071        self.context.parameters.commit_sync_batch_size
1072    }
1073
1074    /// Fetches the given `missing_blocks` from up to `MAX_PERIODIC_SYNC_PEERS`
1075    /// authorities in parallel:
1076    ///
1077    /// Randomly select `MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS`
1078    /// peers from those who are known to hold some missing block, requesting up
1079    /// to `max_blocks_per_sync` block refs per peer.
1080    ///
1081    /// Randomly select `MAX_PERIODIC_SYNC_RANDOM_PEERS` additional peers from the
1082    /// committee (excluding self and those already selected).
1083    ///
1084    /// The method returns a vector with the fetched blocks from each peer that
1085    /// successfully responded and any corresponding additional ancestor blocks.
1086    /// Each element of the vector is a tuple which contains the guard over the
1087    /// requested block refs, the returned blocks and the peer authority index.
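    ///
    /// Concretely, with `MAX_PERIODIC_SYNC_PEERS = 4` and
    /// `MAX_PERIODIC_SYNC_RANDOM_PEERS = 2`, a single run contacts up to 2 peers
    /// known to hold some of the missing blocks plus up to 2 peers picked at random
    /// from the rest of the committee.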
1088    async fn fetch_blocks_from_authorities(
1089        context: Arc<Context>,
1090        inflight_blocks: Arc<InflightBlocksMap>,
1091        network_client: Arc<C>,
1092        missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1093        dag_state: Arc<RwLock<DagState>>,
1094    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1095        // Step 1: Map authorities to missing blocks that they are aware of
1096        let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1097        for (missing_block_ref, authorities) in &missing_blocks {
1098            for author in authorities {
1099                if author == &context.own_index {
1100                    // Skip our own index as we don't want to fetch blocks from ourselves
1101                    continue;
1102                }
1103                authority_to_blocks
1104                    .entry(*author)
1105                    .or_default()
1106                    .push(*missing_block_ref);
1107            }
1108        }
1109
1110        // Step 2: Choose at most MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1111        // peers from those who are aware of some missing blocks
1112
1113        #[cfg(not(test))]
1114        let mut rng = StdRng::from_entropy();
1115
1116        // Randomly pick MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1117        // authorities that are aware of missing blocks
1118        #[cfg(not(test))]
1119        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1120            authority_to_blocks
1121                .iter()
1122                .choose_multiple(
1123                    &mut rng,
1124                    MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1125                )
1126                .into_iter()
1127                .map(|(&peer, blocks)| {
1128                    let limited_blocks = blocks
1129                        .iter()
1130                        .copied()
1131                        .take(context.parameters.max_blocks_per_sync)
1132                        .collect();
1133                    (peer, limited_blocks, "periodic_known")
1134                })
1135                .collect();
1136        #[cfg(test)]
1137        // Deterministically pick the smallest (MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS) authority indices
1138        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1139            let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1140                .iter()
1141                .map(|(&peer, blocks)| {
1142                    let limited_blocks = blocks
1143                        .iter()
1144                        .copied()
1145                        .take(context.parameters.max_blocks_per_sync)
1146                        .collect();
1147                    (peer, limited_blocks, "periodic_known")
1148                })
1149                .collect();
1150            // Sort by AuthorityIndex (natural order), then take the first
1151            // MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1152            items.sort_by_key(|(peer, _, _)| *peer);
1153            items
1154                .into_iter()
1155                .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1156                .collect()
1157        };
1158
1159        // Step 3: Choose at most MAX_PERIODIC_SYNC_RANDOM_PEERS random peers not known
1160        // to be aware of the missing blocks
1161        let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1162            .iter()
1163            .map(|(peer, _, _)| *peer)
1164            .collect();
1165
1166        let random_candidates: Vec<_> = context
1167            .committee
1168            .authorities()
1169            .filter_map(|(peer_index, _)| {
1170                (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1171                    .then_some(peer_index)
1172            })
1173            .collect();
1174        #[cfg(test)]
1175        let random_peers: Vec<AuthorityIndex> = random_candidates
1176            .into_iter()
1177            .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1178            .collect();
1179        #[cfg(not(test))]
1180        let random_peers: Vec<AuthorityIndex> = random_candidates
1181            .into_iter()
1182            .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1183
1184        #[cfg_attr(test, allow(unused_mut))]
1185        let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1186        // Shuffle the missing blocks in case the first ones are blocked by unresponsive
1187        // peers.
1188        #[cfg(not(test))]
1189        all_missing_blocks.shuffle(&mut rng);
1190
1191        let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1192
1193        for peer in random_peers {
1194            if let Some(chunk) = block_chunks.next() {
1195                chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1196            } else {
1197                break;
1198            }
1199        }
1200
1201        let mut request_futures = FuturesUnordered::new();
1202
1203        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1204
1205        // Record the missing blocks per authority for metrics
1206        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1207        for block in &all_missing_blocks {
1208            missing_blocks_per_authority[block.author] += 1;
1209        }
1210        for (missing, (_, authority)) in missing_blocks_per_authority
1211            .into_iter()
1212            .zip(context.committee.authorities())
1213        {
1214            context
1215                .metrics
1216                .node_metrics
1217                .synchronizer_missing_blocks_by_authority
1218                .with_label_values(&[&authority.hostname])
1219                .inc_by(missing as u64);
1220            context
1221                .metrics
1222                .node_metrics
1223                .synchronizer_current_missing_blocks_by_authority
1224                .with_label_values(&[&authority.hostname])
1225                .set(missing as i64);
1226        }
1227
1228        // Keep track of the peers that were not chosen; they serve as fallback
1229        // candidates if one of the initial requests fails later
1230        #[cfg_attr(test, expect(unused_mut))]
1231        let mut remaining_peers: Vec<_> = context
1232            .committee
1233            .authorities()
1234            .filter_map(|(peer_index, _)| {
1235                if peer_index != context.own_index
1236                    && !chosen_peers_with_blocks
1237                        .iter()
1238                        .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1239                {
1240                    Some(peer_index)
1241                } else {
1242                    None
1243                }
1244            })
1245            .collect();
1246
1247        #[cfg(not(test))]
1248        remaining_peers.shuffle(&mut rng);
1249        let mut remaining_peers = remaining_peers.into_iter();
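        // These remaining peers are drained one at a time in the retry branch of the
        // select loop below whenever a request to a chosen peer fails.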
1250
1251        // Send the initial requests
1252        for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1253            let peer_hostname = &context.committee.authority(peer).hostname;
1254            let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1255
1256            // Lock the blocks to be fetched. If none of the blocks can be locked, skip
1257            // this peer for this run.
1258            if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1259                info!(
1260                    "Periodic sync of {} missing blocks from peer {} {}: {}",
1261                    block_refs.len(),
1262                    peer,
1263                    peer_hostname,
1264                    block_refs
1265                        .iter()
1266                        .map(|b| b.to_string())
1267                        .collect::<Vec<_>>()
1268                        .join(", ")
1269                );
1270                // Record metrics about requested blocks
1271                let metrics = &context.metrics.node_metrics;
1272                metrics
1273                    .synchronizer_requested_blocks_by_peer
1274                    .with_label_values(&[peer_hostname.as_str(), label])
1275                    .inc_by(block_refs.len() as u64);
1276                for block_ref in &block_refs {
1277                    let block_hostname = &context.committee.authority(block_ref.author).hostname;
1278                    metrics
1279                        .synchronizer_requested_blocks_by_authority
1280                        .with_label_values(&[block_hostname.as_str(), label])
1281                        .inc();
1282                }
1283                request_futures.push(Self::fetch_blocks_request(
1284                    network_client.clone(),
1285                    peer,
1286                    blocks_guard,
1287                    highest_rounds.clone(),
1288                    FETCH_REQUEST_TIMEOUT,
1289                    1,
1290                ));
1291            }
1292        }
1293
1294        let mut results = Vec::new();
1295        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1296
1297        tokio::pin!(fetcher_timeout);
1298
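        // The loop below exits once the last pending request has succeeded or the
        // overall FETCH_FROM_PEERS_TIMEOUT has elapsed, whichever happens first.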
1299        loop {
1300            tokio::select! {
1301                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1302                    let peer_hostname = &context.committee.authority(peer_index).hostname;
1303                    match response {
1304                        Ok(fetched_blocks) => {
1305                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1306                            results.push((blocks_guard, fetched_blocks, peer_index));
1307
1308                            // no more pending requests are left, just break the loop
1309                            if request_futures.is_empty() {
1310                                break;
1311                            }
1312                        },
1313                        Err(_) => {
1314                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1315                            // try again if there is any peer left
1316                            if let Some(next_peer) = remaining_peers.next() {
1317                                // Best effort to swap the locks to the next peer. If we can't lock, skip for this run.
1318                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1319                                    info!(
1320                                        "Retrying syncing {} missing blocks from peer {}: {}",
1321                                        blocks_guard.block_refs.len(),
1322                                        peer_hostname,
1323                                        blocks_guard.block_refs
1324                                            .iter()
1325                                            .map(|b| b.to_string())
1326                                            .collect::<Vec<_>>()
1327                                            .join(", ")
1328                                    );
1329                                    let block_refs = blocks_guard.block_refs.clone();
1330                                    // Record metrics about requested blocks
1331                                    let metrics = &context.metrics.node_metrics;
1332                                    metrics
1333                                        .synchronizer_requested_blocks_by_peer
1334                                        .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1335                                        .inc_by(block_refs.len() as u64);
1336                                    for block_ref in &block_refs {
1337                                        let block_hostname =
1338                                            &context.committee.authority(block_ref.author).hostname;
1339                                        metrics
1340                                            .synchronizer_requested_blocks_by_authority
1341                                            .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1342                                            .inc();
1343                                    }
1344                                    request_futures.push(Self::fetch_blocks_request(
1345                                        network_client.clone(),
1346                                        next_peer,
1347                                        blocks_guard,
1348                                        highest_rounds,
1349                                        FETCH_REQUEST_TIMEOUT,
1350                                        1,
1351                                    ));
1352                                } else {
1353                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1354                                }
1355                            } else {
1356                                debug!("No more peers left to fetch blocks");
1357                            }
1358                        }
1359                    }
1360                },
1361                _ = &mut fetcher_timeout => {
1362                    debug!("Timed out while fetching missing blocks");
1363                    break;
1364                }
1365            }
1366        }
1367
1368        results
1369    }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374    use std::{
1375        collections::{BTreeMap, BTreeSet},
1376        sync::Arc,
1377        time::Duration,
1378    };
1379
1380    use async_trait::async_trait;
1381    use bytes::Bytes;
1382    use consensus_config::{AuthorityIndex, Parameters};
1383    use iota_metrics::monitored_mpsc;
1384    use parking_lot::RwLock;
1385    use tokio::{sync::Mutex, time::sleep};
1386
1387    use crate::{
1388        CommitDigest, CommitIndex,
1389        authority_service::COMMIT_LAG_MULTIPLIER,
1390        block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
1391        block_verifier::NoopBlockVerifier,
1392        commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1393        commit_vote_monitor::CommitVoteMonitor,
1394        context::Context,
1395        core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1396        dag_state::DagState,
1397        error::{ConsensusError, ConsensusResult},
1398        network::{BlockStream, NetworkClient},
1399        round_prober::QuorumRound,
1400        storage::mem_store::MemStore,
1401        synchronizer::{
1402            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1403        },
1404    };
1405
1406    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1407    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1408    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1409    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1410
1411    #[derive(Default)]
1412    struct MockNetworkClient {
1413        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1414        fetch_latest_blocks_requests:
1415            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1416    }
1417
1418    impl MockNetworkClient {
1419        async fn stub_fetch_blocks(
1420            &self,
1421            blocks: Vec<VerifiedBlock>,
1422            peer: AuthorityIndex,
1423            latency: Option<Duration>,
1424        ) {
1425            let mut lock = self.fetch_blocks_requests.lock().await;
1426            let block_refs = blocks
1427                .iter()
1428                .map(|block| block.reference())
1429                .collect::<Vec<_>>();
1430            lock.insert((block_refs, peer), (blocks, latency));
1431        }
1432
1433        async fn stub_fetch_latest_blocks(
1434            &self,
1435            blocks: Vec<VerifiedBlock>,
1436            peer: AuthorityIndex,
1437            authorities: Vec<AuthorityIndex>,
1438            latency: Option<Duration>,
1439        ) {
1440            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1441            lock.entry((peer, authorities))
1442                .or_default()
1443                .push((blocks, latency));
1444        }
1445
1446        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1447            let lock = self.fetch_latest_blocks_requests.lock().await;
1448            lock.len()
1449        }
1450    }
1451
1452    #[async_trait]
1453    impl NetworkClient for MockNetworkClient {
1454        const SUPPORT_STREAMING: bool = false;
1455
1456        async fn send_block(
1457            &self,
1458            _peer: AuthorityIndex,
1459            _serialized_block: &VerifiedBlock,
1460            _timeout: Duration,
1461        ) -> ConsensusResult<()> {
1462            unimplemented!("Unimplemented")
1463        }
1464
1465        async fn subscribe_blocks(
1466            &self,
1467            _peer: AuthorityIndex,
1468            _last_received: Round,
1469            _timeout: Duration,
1470        ) -> ConsensusResult<BlockStream> {
1471            unimplemented!("Unimplemented")
1472        }
1473
1474        async fn fetch_blocks(
1475            &self,
1476            peer: AuthorityIndex,
1477            block_refs: Vec<BlockRef>,
1478            _highest_accepted_rounds: Vec<Round>,
1479            _timeout: Duration,
1480        ) -> ConsensusResult<Vec<Bytes>> {
1481            let mut lock = self.fetch_blocks_requests.lock().await;
1482            let response = lock
1483                .remove(&(block_refs, peer))
1484                .expect("Unexpected fetch blocks request made");
1485
1486            let serialised = response
1487                .0
1488                .into_iter()
1489                .map(|block| block.serialized().clone())
1490                .collect::<Vec<_>>();
1491
1492            drop(lock);
1493
1494            if let Some(latency) = response.1 {
1495                sleep(latency).await;
1496            }
1497
1498            Ok(serialised)
1499        }
1500
1501        async fn fetch_commits(
1502            &self,
1503            _peer: AuthorityIndex,
1504            _commit_range: CommitRange,
1505            _timeout: Duration,
1506        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1507            unimplemented!("Unimplemented")
1508        }
1509
1510        async fn fetch_latest_blocks(
1511            &self,
1512            peer: AuthorityIndex,
1513            authorities: Vec<AuthorityIndex>,
1514            _timeout: Duration,
1515        ) -> ConsensusResult<Vec<Bytes>> {
1516            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1517            let mut responses = lock
1518                .remove(&(peer, authorities.clone()))
1519                .expect("Unexpected fetch latest blocks request made");
1520
1521            let response = responses.remove(0);
1522            let serialised = response
1523                .0
1524                .into_iter()
1525                .map(|block| block.serialized().clone())
1526                .collect::<Vec<_>>();
1527
1528            if !responses.is_empty() {
1529                lock.insert((peer, authorities), responses);
1530            }
1531
1532            drop(lock);
1533
1534            if let Some(latency) = response.1 {
1535                sleep(latency).await;
1536            }
1537
1538            Ok(serialised)
1539        }
1540
1541        async fn get_latest_rounds(
1542            &self,
1543            _peer: AuthorityIndex,
1544            _timeout: Duration,
1545        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1546            unimplemented!("Unimplemented")
1547        }
1548    }
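    // Note on the mock above: each stubbed `fetch_blocks` response is consumed on
    // first use and an unmatched request panics, so a test must stub every request it
    // expects to trigger (including retries after a simulated timeout).
    // `fetch_latest_blocks` responses are served in FIFO order per (peer, authorities)
    // key.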
1549
1550    #[test]
1551    fn test_inflight_blocks_map() {
1552        // GIVEN
1553        let map = InflightBlocksMap::new();
1554        let some_block_refs = [
1555            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1556            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1557            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1558            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1559        ];
1560        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1561
1562        // Lock & unlock blocks
1563        {
1564            let mut all_guards = Vec::new();
1565
1566            // Try to acquire the block locks for authorities 1 & 2 & 3
1567            for i in 1..=3 {
1568                let authority = AuthorityIndex::new_for_test(i);
1569
1570                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1571                let guard = guard.expect("Guard should be created");
1572                assert_eq!(guard.block_refs.len(), 4);
1573
1574                all_guards.push(guard);
1575
1576                // trying to acquire any of them again will not succeed
1577                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1578                assert!(guard.is_none());
1579            }
1580
1581            // Trying to acquire for authority 4 will fail - as we have maxed out the
1582            // number of allowed peers
1583            let authority_4 = AuthorityIndex::new_for_test(4);
1584
1585            let guard = map.lock_blocks(missing_block_refs.clone(), authority_4);
1586            assert!(guard.is_none());
1587
1588            // Explicitly drop the guard of authority 1 and try for authority 4 again - it
1589            // will now succeed
1590            drop(all_guards.remove(0));
1591
1592            let guard = map.lock_blocks(missing_block_refs.clone(), authority_4);
1593            let guard = guard.expect("Guard should be successfully acquired");
1594
1595            assert_eq!(guard.block_refs, missing_block_refs);
1596
1597            // Dropping all guards should unlock all the block refs
1598            drop(guard);
1599            drop(all_guards);
1600
1601            assert_eq!(map.num_of_locked_blocks(), 0);
1602        }
1603
1604        // Swap locks
1605        {
1606            // acquire a lock for authority 1
1607            let authority_1 = AuthorityIndex::new_for_test(1);
1608            let guard = map
1609                .lock_blocks(missing_block_refs.clone(), authority_1)
1610                .unwrap();
1611
1612            // Now swap the locks for authority 2
1613            let authority_2 = AuthorityIndex::new_for_test(2);
1614            let guard = map.swap_locks(guard, authority_2);
1615
1616            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1617        }
1618    }
1619
1620    #[tokio::test]
1621    async fn test_process_fetched_blocks() {
1622        // GIVEN
1623        let (context, _) = Context::new_for_test(4);
1624        let context = Arc::new(context);
1625        let block_verifier = Arc::new(NoopBlockVerifier {});
1626        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1627        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1628        let (commands_sender, _commands_receiver) =
1629            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
1630
1631        // Create input test blocks:
1632        // - Authority 0 block at round 60.
1633        // - Authority 1 blocks from round 30 to 60.
1634        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
1635        expected_blocks.extend(
1636            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
1637        );
1638        assert_eq!(
1639            expected_blocks.len(),
1640            context.parameters.max_blocks_per_sync
1641        );
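        // i.e. 1 + 31 = 32 blocks, which the assertion above pins to exactly one
        // `max_blocks_per_sync` page.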
1642
1643        let expected_serialized_blocks = expected_blocks
1644            .iter()
1645            .map(|b| b.serialized().clone())
1646            .collect::<Vec<_>>();
1647
1648        let expected_block_refs = expected_blocks
1649            .iter()
1650            .map(|b| b.reference())
1651            .collect::<BTreeSet<_>>();
1652
1653        // GIVEN peer to fetch blocks from
1654        let peer_index = AuthorityIndex::new_for_test(2);
1655
1656        // Create blocks_guard
1657        let inflight_blocks_map = InflightBlocksMap::new();
1658        let blocks_guard = inflight_blocks_map
1659            .lock_blocks(expected_block_refs.clone(), peer_index)
1660            .expect("Failed to lock blocks");
1661
1662        assert_eq!(
1663            inflight_blocks_map.num_of_locked_blocks(),
1664            expected_block_refs.len()
1665        );
1666
1667        // Create a Synchronizer
1668        let result = Synchronizer::<
1669            MockNetworkClient,
1670            NoopBlockVerifier,
1671            MockCoreThreadDispatcher,
1672        >::process_fetched_blocks(
1673            expected_serialized_blocks,
1674            peer_index,
1675            blocks_guard, // The guard is consumed here
1676            core_dispatcher.clone(),
1677            block_verifier,
1678            commit_vote_monitor,
1679            context.clone(),
1680            commands_sender,
1681            "test",
1682        )
1683            .await;
1684
1685        // THEN
1686        assert!(result.is_ok());
1687
1688        // Check blocks were sent to core
1689        let added_blocks = core_dispatcher.get_add_blocks().await;
1690        assert_eq!(
1691            added_blocks
1692                .iter()
1693                .map(|b| b.reference())
1694                .collect::<BTreeSet<_>>(),
1695            expected_block_refs,
1696        );
1697
1698        // Check blocks were unlocked
1699        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
1700    }
1701
1702    #[tokio::test]
1703    async fn test_successful_fetch_blocks_from_peer() {
1704        // GIVEN
1705        let (context, _) = Context::new_for_test(4);
1706        let context = Arc::new(context);
1707        let block_verifier = Arc::new(NoopBlockVerifier {});
1708        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1709        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1710        let network_client = Arc::new(MockNetworkClient::default());
1711        let store = Arc::new(MemStore::new());
1712        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1713
1714        let handle = Synchronizer::start(
1715            network_client.clone(),
1716            context,
1717            core_dispatcher.clone(),
1718            commit_vote_monitor,
1719            block_verifier,
1720            dag_state,
1721            false,
1722        );
1723
1724        // Create some test blocks
1725        let expected_blocks = (0..10)
1726            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1727            .collect::<Vec<_>>();
1728        let missing_blocks = expected_blocks
1729            .iter()
1730            .map(|block| block.reference())
1731            .collect::<BTreeSet<_>>();
1732
1733        // AND stub the fetch_blocks request from peer 1
1734        let peer = AuthorityIndex::new_for_test(1);
1735        network_client
1736            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1737            .await;
1738
1739        // WHEN request missing blocks from peer 1
1740        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1741
1742        // Wait a little bit until those have been added to Core
1743        sleep(Duration::from_millis(1_000)).await;
1744
1745        // THEN ensure those ended up in Core
1746        let added_blocks = core_dispatcher.get_add_blocks().await;
1747        assert_eq!(added_blocks, expected_blocks);
1748    }
1749
1750    #[tokio::test]
1751    async fn saturate_fetch_blocks_from_peer() {
1752        // GIVEN
1753        let (context, _) = Context::new_for_test(4);
1754        let context = Arc::new(context);
1755        let block_verifier = Arc::new(NoopBlockVerifier {});
1756        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1757        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1758        let network_client = Arc::new(MockNetworkClient::default());
1759        let store = Arc::new(MemStore::new());
1760        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1761
1762        let handle = Synchronizer::start(
1763            network_client.clone(),
1764            context,
1765            core_dispatcher.clone(),
1766            commit_vote_monitor,
1767            block_verifier,
1768            dag_state,
1769            false,
1770        );
1771
1772        // Create some test blocks
1773        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1774            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1775            .collect::<Vec<_>>();
1776
1777        // Now start sending requests to fetch blocks, trying to saturate the peer 1 task
1778        let peer = AuthorityIndex::new_for_test(1);
1779        let mut iter = expected_blocks.iter().peekable();
1780        while let Some(block) = iter.next() {
1781            // stub the fetch_blocks request from peer 1 and give some high response latency
1782            // so requests can start blocking the peer task.
1783            network_client
1784                .stub_fetch_blocks(
1785                    vec![block.clone()],
1786                    peer,
1787                    Some(Duration::from_millis(5_000)),
1788                )
1789                .await;
1790
1791            let mut missing_blocks = BTreeSet::new();
1792            missing_blocks.insert(block.reference());
1793
1794            // WHEN requesting to fetch the blocks, the last request should fail with a
1795            // "saturated synchronizer" error
1796            if iter.peek().is_none() {
1797                match handle.fetch_blocks(missing_blocks, peer).await {
1798                    Err(ConsensusError::SynchronizerSaturated(index, _)) => {
1799                        assert_eq!(index, peer);
1800                    }
1801                    _ => panic!("A saturated synchronizer error was expected"),
1802                }
1803            } else {
1804                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1805            }
1806        }
1807    }
1808
1809    #[tokio::test(flavor = "current_thread", start_paused = true)]
1810    async fn synchronizer_periodic_task_fetch_blocks() {
1811        // GIVEN
1812        let (context, _) = Context::new_for_test(4);
1813        let context = Arc::new(context);
1814        let block_verifier = Arc::new(NoopBlockVerifier {});
1815        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1816        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1817        let network_client = Arc::new(MockNetworkClient::default());
1818        let store = Arc::new(MemStore::new());
1819        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1820
1821        // Create some test blocks
1822        let expected_blocks = (0..10)
1823            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1824            .collect::<Vec<_>>();
1825        let missing_blocks = expected_blocks
1826            .iter()
1827            .map(|block| block.reference())
1828            .collect::<BTreeSet<_>>();
1829
1830        // AND stub the missing blocks
1831        core_dispatcher
1832            .stub_missing_blocks(missing_blocks.clone())
1833            .await;
1834
1835        // AND stub the requests for authorities 1 & 2.
1836        // Make the first authority time out so the second one gets called. "We" are
1837        // authority 0, so we are skipped anyway.
1838        network_client
1839            .stub_fetch_blocks(
1840                expected_blocks.clone(),
1841                AuthorityIndex::new_for_test(1),
1842                Some(FETCH_REQUEST_TIMEOUT),
1843            )
1844            .await;
1845        network_client
1846            .stub_fetch_blocks(
1847                expected_blocks.clone(),
1848                AuthorityIndex::new_for_test(2),
1849                None,
1850            )
1851            .await;
1852
1853        // WHEN the synchronizer is started and we wait for a couple of seconds
1854        let _handle = Synchronizer::start(
1855            network_client.clone(),
1856            context,
1857            core_dispatcher.clone(),
1858            commit_vote_monitor,
1859            block_verifier,
1860            dag_state,
1861            false,
1862        );
1863
1864        sleep(8 * FETCH_REQUEST_TIMEOUT).await;
1865
1866        // THEN the missing blocks should now be fetched and added to core
1867        let added_blocks = core_dispatcher.get_add_blocks().await;
1868        assert_eq!(added_blocks, expected_blocks);
1869
1870        // AND missing blocks should have been consumed by the stub
1871        assert!(
1872            core_dispatcher
1873                .get_missing_blocks()
1874                .await
1875                .unwrap()
1876                .is_empty()
1877        );
1878    }
1879
1880    #[tokio::test(flavor = "current_thread", start_paused = true)]
1881    async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds()
1882     {
1883        // GIVEN
1884        let (mut context, _) = Context::new_for_test(4);
1885
1886        // We want to run this test only when gc is disabled. Once gc gets enabled this
1887        // logic won't execute any more.
1888        context
1889            .protocol_config
1890            .set_consensus_gc_depth_for_testing(0);
1891        // Enable batching for synchronizer
1892        context
1893            .protocol_config
1894            .set_consensus_batched_block_sync_for_testing(true);
1895
1896        let context = Arc::new(context);
1897
1898        let block_verifier = Arc::new(NoopBlockVerifier {});
1899        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1900        let network_client = Arc::new(MockNetworkClient::default());
1901        let store = Arc::new(MemStore::new());
1902        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1903        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1904
1905        // AND stub some missing blocks. The highest accepted round is 0.
1906        // Create some blocks that are below and above the sync threshold.
1907        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
1908        let expected_blocks = (1..=sync_missing_block_round_threshold * 2)
1909            .flat_map(|round| {
1910                vec![
1911                    VerifiedBlock::new_for_test(TestBlock::new(round, 1).build()),
1912                    VerifiedBlock::new_for_test(TestBlock::new(round, 2).build()),
1913                    VerifiedBlock::new_for_test(TestBlock::new(round, 3).build()),
1914                ]
1915                .into_iter()
1916            })
1917            .collect::<Vec<_>>();
1918
1919        let missing_blocks = expected_blocks
1920            .iter()
1921            .map(|block| block.reference())
1922            .collect::<BTreeSet<_>>();
1923        core_dispatcher.stub_missing_blocks(missing_blocks).await;
1924
1925        // Stub the requests for authority 1 & 2 & 3
1926        let stub_blocks = expected_blocks
1927            .iter()
1928            .map(|block| (block.reference(), block.clone()))
1929            .collect::<BTreeMap<_, _>>();
1930
1931        // Authorities 1 and 2 will be asked for their own blocks
1932        let stub_block_author_1 = stub_blocks
1933            .iter()
1934            .filter(|(block, _)| block.author == AuthorityIndex::new_for_test(1))
1935            .take(context.parameters.max_blocks_per_sync)
1936            .map(|(_, block)| block.clone())
1937            .collect::<Vec<_>>();
1938
1939        let stub_block_author_2 = stub_blocks
1940            .iter()
1941            .filter(|(block, _)| block.author == AuthorityIndex::new_for_test(2))
1942            .take(context.parameters.max_blocks_per_sync)
1943            .map(|(_, block)| block.clone())
1944            .collect::<Vec<_>>();
1945
1946        // Authority 3 will be asked for the first blocks in the missing set
1947        let stub_block_author_3 = stub_blocks
1948            .iter()
1949            .take(context.parameters.max_blocks_per_sync)
1950            .map(|(_, block)| block.clone())
1951            .collect::<Vec<_>>();
1952
1953        let mut expected_blocks: Vec<_> = Vec::new();
1954        expected_blocks.extend(stub_block_author_1.clone());
1955        expected_blocks.extend(stub_block_author_2.clone());
1956        expected_blocks.extend(stub_block_author_3.clone());
1957
1958        network_client
1959            .stub_fetch_blocks(stub_block_author_1, AuthorityIndex::new_for_test(1), None)
1960            .await;
1961        network_client
1962            .stub_fetch_blocks(stub_block_author_2, AuthorityIndex::new_for_test(2), None)
1963            .await;
1964        network_client
1965            .stub_fetch_blocks(stub_block_author_3, AuthorityIndex::new_for_test(3), None)
1966            .await;
1967
1968        // Now create some blocks to simulate a commit lag
1969        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1970        let commit_index: CommitIndex = round - 1;
1971        let blocks = (0..4)
1972            .map(|authority| {
1973                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1974                let block = TestBlock::new(round, authority)
1975                    .set_commit_votes(commit_votes)
1976                    .build();
1977
1978                VerifiedBlock::new_for_test(block)
1979            })
1980            .collect::<Vec<_>>();
1981
1982        // Pass them through the commit vote monitor - so now there will be a big commit
1983        // lag to prevent the scheduled synchronizer from running
1984        for block in blocks {
1985            commit_vote_monitor.observe_block(&block);
1986        }
1987
1988        // WHEN the synchronizer is started and we wait for a couple of seconds, during
1989        // which it would normally have kicked in.
1990        let _handle = Synchronizer::start(
1991            network_client.clone(),
1992            context.clone(),
1993            core_dispatcher.clone(),
1994            commit_vote_monitor.clone(),
1995            block_verifier.clone(),
1996            dag_state.clone(),
1997            false,
1998        );
1999
2000        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2001
2002        // We should be in commit lag mode, but since there are missing blocks within
2003        // the acceptable round threshold, those should be fetched - and nothing above it.
2004        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2005
2006        added_blocks.sort_by_key(|block| block.reference());
2007        expected_blocks.sort_by_key(|block| block.reference());
2008
2009        assert_eq!(added_blocks, expected_blocks);
2010    }
2011
2012    #[tokio::test(flavor = "current_thread", start_paused = true)]
2013    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
2014        // GIVEN
2015        let (mut context, _) = Context::new_for_test(4);
2016        context
2017            .protocol_config
2018            .set_consensus_batched_block_sync_for_testing(true);
2019        let context = Arc::new(context);
2020        let block_verifier = Arc::new(NoopBlockVerifier {});
2021        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2022        let network_client = Arc::new(MockNetworkClient::default());
2023        let store = Arc::new(MemStore::new());
2024        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2025        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2026
2027        // AND stub some missing blocks. The highest accepted round is 0. Create blocks
2028        // that are above the sync threshold.
2029        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2030        let stub_blocks = (sync_missing_block_round_threshold * 2
2031            ..sync_missing_block_round_threshold * 2
2032                + context.parameters.max_blocks_per_sync as u32)
2033            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2034            .collect::<Vec<_>>();
2035        let missing_blocks = stub_blocks
2036            .iter()
2037            .map(|block| block.reference())
2038            .collect::<BTreeSet<_>>();
2039        core_dispatcher
2040            .stub_missing_blocks(missing_blocks.clone())
2041            .await;
2042        // AND stub the requests for authorities 1 & 2.
2043        // Make the first authority time out so the second one gets called. "We" are
2044        // authority 0, so we are skipped anyway.
2045        let mut expected_blocks = stub_blocks
2046            .iter()
2047            .take(context.parameters.max_blocks_per_sync)
2048            .cloned()
2049            .collect::<Vec<_>>();
2050        network_client
2051            .stub_fetch_blocks(
2052                expected_blocks.clone(),
2053                AuthorityIndex::new_for_test(1),
2054                Some(FETCH_REQUEST_TIMEOUT),
2055            )
2056            .await;
2057        network_client
2058            .stub_fetch_blocks(
2059                expected_blocks.clone(),
2060                AuthorityIndex::new_for_test(2),
2061                None,
2062            )
2063            .await;
2064
2065        // Now create some blocks to simulate a commit lag
2066        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2067        let commit_index: CommitIndex = round - 1;
2068        let blocks = (0..4)
2069            .map(|authority| {
2070                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2071                let block = TestBlock::new(round, authority)
2072                    .set_commit_votes(commit_votes)
2073                    .build();
2074
2075                VerifiedBlock::new_for_test(block)
2076            })
2077            .collect::<Vec<_>>();
2078
2079        // Pass them through the commit vote monitor - so now there will be a big commit
2080        // lag to prevent the scheduled synchronizer from running
2081        for block in blocks {
2082            commit_vote_monitor.observe_block(&block);
2083        }
2084
2085        // Start the synchronizer and wait for a couple of seconds where normally
2086        // the synchronizer should have kicked in.
2087        let _handle = Synchronizer::start(
2088            network_client.clone(),
2089            context.clone(),
2090            core_dispatcher.clone(),
2091            commit_vote_monitor.clone(),
2092            block_verifier,
2093            dag_state.clone(),
2094            false,
2095        );
2096
2097        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
2098
2099        // Since we should be in commit lag mode, none of the missing blocks should have
2100        // been fetched - hence nothing should be sent to core for processing.
2101        let added_blocks = core_dispatcher.get_add_blocks().await;
2102        assert_eq!(added_blocks, vec![]);
2103
2105        // AND now advance the local commit index by adding commits until it matches
2106        // the quorum commit index
2107        {
2108            let mut d = dag_state.write();
2109            for index in 1..=commit_index {
2110                let commit =
2111                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2112
2113                d.add_commit(commit);
2114            }
2115
2117            assert_eq!(
2118                d.last_commit_index(),
2119                commit_vote_monitor.quorum_commit_index()
2120            );
2121        }
2122
2123        // Now stub again the missing blocks to fetch the exact same ones.
2124        core_dispatcher
2125            .stub_missing_blocks(missing_blocks.clone())
2126            .await;
2127
2129        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2130
2131        // THEN the missing blocks should now be fetched and added to core
2132        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2134        added_blocks.sort_by_key(|block| block.reference());
2135        expected_blocks.sort_by_key(|block| block.reference());
2136
2137        assert_eq!(added_blocks, expected_blocks);
2138    }
2139
2140    #[tokio::test(flavor = "current_thread", start_paused = true)]
2141    async fn synchronizer_fetch_own_last_block() {
2142        // GIVEN
2143        let (context, _) = Context::new_for_test(4);
2144        let context = Arc::new(context.with_parameters(Parameters {
2145            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2146            ..Default::default()
2147        }));
2148        let block_verifier = Arc::new(NoopBlockVerifier {});
2149        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2150        let network_client = Arc::new(MockNetworkClient::default());
2151        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2152        let store = Arc::new(MemStore::new());
2153        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2154        let our_index = AuthorityIndex::new_for_test(0);
2155
2156        // Create some test blocks
2157        let mut expected_blocks = (8..=10)
2158            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2159            .collect::<Vec<_>>();
2160
2161        // Now set different latest blocks for the peers
2162        // For peer 1 we give the block of round 10 (highest)
2163        let block_1 = expected_blocks.pop().unwrap();
2164        network_client
2165            .stub_fetch_latest_blocks(
2166                vec![block_1.clone()],
2167                AuthorityIndex::new_for_test(1),
2168                vec![our_index],
2169                Some(Duration::from_secs(10)),
2170            )
2171            .await;
2172        network_client
2173            .stub_fetch_latest_blocks(
2174                vec![block_1],
2175                AuthorityIndex::new_for_test(1),
2176                vec![our_index],
2177                None,
2178            )
2179            .await;
2180
2181        // For peer 2 we give the block of round 9
2182        let block_2 = expected_blocks.pop().unwrap();
2183        network_client
2184            .stub_fetch_latest_blocks(
2185                vec![block_2.clone()],
2186                AuthorityIndex::new_for_test(2),
2187                vec![our_index],
2188                Some(Duration::from_secs(10)),
2189            )
2190            .await;
2191        network_client
2192            .stub_fetch_latest_blocks(
2193                vec![block_2],
2194                AuthorityIndex::new_for_test(2),
2195                vec![our_index],
2196                None,
2197            )
2198            .await;
2199
2200        // For peer 3 we give the block with the lowest round (round 8)
2201        let block_3 = expected_blocks.pop().unwrap();
2202        network_client
2203            .stub_fetch_latest_blocks(
2204                vec![block_3.clone()],
2205                AuthorityIndex::new_for_test(3),
2206                vec![our_index],
2207                Some(Duration::from_secs(10)),
2208            )
2209            .await;
2210        network_client
2211            .stub_fetch_latest_blocks(
2212                vec![block_3],
2213                AuthorityIndex::new_for_test(3),
2214                vec![our_index],
2215                None,
2216            )
2217            .await;
2218
2219        // WHEN the synchronizer is started and we wait for a couple of seconds
2220        let handle = Synchronizer::start(
2221            network_client.clone(),
2222            context.clone(),
2223            core_dispatcher.clone(),
2224            commit_vote_monitor,
2225            block_verifier,
2226            dag_state,
2227            true,
2228        );
2229
2230        // Wait at least for the timeout time
2231        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2232
2233        // Assert that core has been called to set the min propose round
2234        assert_eq!(
2235            core_dispatcher.get_last_own_proposed_round().await,
2236            vec![10]
2237        );
2238
2239        // Ensure that all the requests have been called
2240        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2241
2242        // And we got one retry
2243        assert_eq!(
2244            context
2245                .metrics
2246                .node_metrics
2247                .sync_last_known_own_block_retries
2248                .get(),
2249            1
2250        );
2251
2252        // Ensure that no panic occurred
2253        if let Err(err) = handle.stop().await {
2254            if err.is_panic() {
2255                std::panic::resume_unwind(err.into_panic());
2256            }
2257        }
2258    }

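    // A lightweight `CoreThreadDispatcher` used by the tests below that call
    // `fetch_blocks_from_authorities` directly: it records added blocks and stubs out
    // the rest of the dispatcher interface.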
2259    #[derive(Default)]
2260    struct SyncMockDispatcher {
2261        missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2262        added_blocks: Mutex<Vec<VerifiedBlock>>,
2263    }
2264
2265    #[async_trait::async_trait]
2266    impl CoreThreadDispatcher for SyncMockDispatcher {
2267        async fn get_missing_blocks(
2268            &self,
2269        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2270            Ok(self.missing_blocks.lock().await.clone())
2271        }
2272        async fn add_blocks(
2273            &self,
2274            blocks: Vec<VerifiedBlock>,
2275        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2276            let mut guard = self.added_blocks.lock().await;
2277            guard.extend(blocks.clone());
2278            Ok(blocks.iter().map(|b| b.reference()).collect())
2279        }
2280
2281        // Stub out the remaining CoreThreadDispatcher methods with defaults:
2282
2283        async fn check_block_refs(
2284            &self,
2285            block_refs: Vec<BlockRef>,
2286        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2287            // Echo back the requested refs by default
2288            Ok(block_refs.into_iter().collect())
2289        }
2290
2291        async fn add_certified_commits(
2292            &self,
2293            _commits: CertifiedCommits,
2294        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2295            // No additional certified-commit logic in tests
2296            Ok(BTreeSet::new())
2297        }
2298
2299        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2300            Ok(())
2301        }
2302
2303        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2304            Ok(())
2305        }
2306
2307        fn set_propagation_delay_and_quorum_rounds(
2308            &self,
2309            _delay: Round,
2310            _received_quorum_rounds: Vec<QuorumRound>,
2311            _accepted_quorum_rounds: Vec<QuorumRound>,
2312        ) -> Result<(), CoreError> {
2313            Ok(())
2314        }
2315
2316        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2317            Ok(())
2318        }
2319
2320        fn highest_received_rounds(&self) -> Vec<Round> {
2321            Vec::new()
2322        }
2323    }
2324
2325    #[tokio::test(flavor = "current_thread")]
2326    async fn known_before_random_peer_fetch() {
2327        {
2328            // 1) Set up a 10-node context and an in-memory DAG
2329            let (ctx, _) = Context::new_for_test(10);
2330            let context = Arc::new(ctx);
2331            let store = Arc::new(MemStore::new());
2332            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2333            let inflight = InflightBlocksMap::new();
2334
2335            // 2) One missing block
2336            let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2337            let missing_ref = missing_vb.reference();
2338            let missing_blocks = BTreeMap::from([(
2339                missing_ref,
2340                BTreeSet::from([
2341                    AuthorityIndex::new_for_test(2),
2342                    AuthorityIndex::new_for_test(3),
2343                    AuthorityIndex::new_for_test(4),
2344                ]),
2345            )]);
2346
2347            // 3) Prepare mocks and stubs
2348            let network_client = Arc::new(MockNetworkClient::default());
2349            // Stub *all* authorities so none panic:
2350            for i in 1..=9 {
2351                let peer = AuthorityIndex::new_for_test(i);
2352                if i == 1 || i == 4 {
2353                    network_client
2354                        .stub_fetch_blocks(
2355                            vec![missing_vb.clone()],
2356                            peer,
2357                            Some(2 * FETCH_REQUEST_TIMEOUT),
2358                        )
2359                        .await;
2360                    continue;
2361                }
2362                network_client
2363                    .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2364                    .await;
2365            }
2366
2367            // 4) Invoke knowledge-based fetch and random fallback selection
2368            //    deterministically
2369            let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2370                ::fetch_blocks_from_authorities(
2371                    context.clone(),
2372                    inflight.clone(),
2373                    network_client.clone(),
2374                    missing_blocks,
2375                    dag_state.clone(),
2376                )
2377                .await;
2378
2379            // 5) Assert we got exactly two fetches - from the first two peers known to
2380            //    hold the missing block (authorities 2 and 3)
2381            assert_eq!(results.len(), 2);
2382
2383            // 6) The knowledge-based fetches went to peers 2 and 3
2384            let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2385            assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2386            let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
2387            assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
2388            // 7) Verify the returned bytes correspond to that block
2389            let expected = missing_vb.serialized().clone();
2390            assert_eq!(hot_bytes, &vec![expected]);
2391        }
2392    }
2393
2394    #[tokio::test(flavor = "current_thread")]
2395    async fn known_before_periodic_peer_fetch_larger_scenario() {
2396        use std::{
2397            collections::{BTreeMap, BTreeSet},
2398            sync::Arc,
2399        };
2400
2401        use parking_lot::RwLock;
2402
2403        use crate::{
2404            block::{Round, TestBlock, VerifiedBlock},
2405            context::Context,
2406        };
2407
2408        // 1) Set up a 10-node context, in-memory DAG, and inflight map
2409        let (ctx, _) = Context::new_for_test(10);
2410        let context = Arc::new(ctx);
2411        let store = Arc::new(MemStore::new());
2412        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2413        let inflight = InflightBlocksMap::new();
2414        let network_client = Arc::new(MockNetworkClient::default());
2415
2416        // 2) Create 1000 missing blocks known by authorities 0, 2, and 3
2417        let mut missing_blocks = BTreeMap::new();
2418        let mut missing_vbs = Vec::new();
2419        let known_number_blocks = 10;
2420        for i in 0..1000 {
2421            let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2422            let r = vb.reference();
2423            if i < known_number_blocks {
2424                // First 10 blocks are known by authorities 0, 2
2425                missing_blocks.insert(
2426                    r,
2427                    BTreeSet::from([
2428                        AuthorityIndex::new_for_test(0),
2429                        AuthorityIndex::new_for_test(2),
2430                    ]),
2431                );
2432            } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2433                // Second 10 blocks are known by authorities 0, 3
2434                missing_blocks.insert(
2435                    r,
2436                    BTreeSet::from([
2437                        AuthorityIndex::new_for_test(0),
2438                        AuthorityIndex::new_for_test(3),
2439                    ]),
2440                );
2441            } else {
2442                // The rest are only known by authority 0
2443                missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2444            }
2445            missing_vbs.push(vb);
2446        }
2447
2448        // 3) Stub fetches for knowledge-based peers (2 and 3)
2449        let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2450        let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2451            .iter()
2452            .map(|&peer| {
2453                let vbs = missing_vbs
2454                    .iter()
2455                    .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2456                    .take(context.parameters.max_blocks_per_sync)
2457                    .cloned()
2458                    .collect::<Vec<_>>();
2459                (peer, vbs)
2460            })
2461            .collect();
2462
2463        for (peer, vbs) in known_vbs_by_peer {
2464            if peer == AuthorityIndex::new_for_test(2) {
2465                // Simulate a timeout for peer 2, then fall back to peer 5
2466                network_client
2467                    .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2468                    .await;
2469                network_client
2470                    .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2471                    .await;
2472            } else {
2473                network_client
2474                    .stub_fetch_blocks(vbs.clone(), peer, None)
2475                    .await;
2476            }
2477        }
2478
2479        // 4) Stub fetches from periodic path peers (1 and 4)
2480        network_client
2481            .stub_fetch_blocks(
2482                missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2483                AuthorityIndex::new_for_test(1),
2484                None,
2485            )
2486            .await;
2487
2488        network_client
2489            .stub_fetch_blocks(
2490                missing_vbs[context.parameters.max_blocks_per_sync
2491                    ..2 * context.parameters.max_blocks_per_sync]
2492                    .to_vec(),
2493                AuthorityIndex::new_for_test(4),
2494                None,
2495            )
2496            .await;
2497
2498        // 5) Execute the fetch logic
2499        let results = Synchronizer::<
2500            MockNetworkClient,
2501            NoopBlockVerifier,
2502            SyncMockDispatcher,
2503        >::fetch_blocks_from_authorities(
2504            context.clone(),
2505            inflight.clone(),
2506            network_client.clone(),
2507            missing_blocks,
2508            dag_state.clone(),
2509        )
2510            .await;
2511
2512        // 6) Assert we got 4 successful fetches: peer 3 (knowledge-based), peers 1 and
2513        //    4 (periodic), and peer 5 (fallback after peer 2 timed out)
2514        assert_eq!(results.len(), 4, "Expected 2 known + 2 random fetches");
2515
2516        // 7) First fetch from peer 3 (knowledge-based)
2517        let (_guard3, bytes3, peer3) = &results[0];
2518        assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
2519        let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
2520            .iter()
2521            .map(|vb| vb.serialized().clone())
2522            .collect::<Vec<_>>();
2523        assert_eq!(bytes3, &expected2);
2524
2525        // 8) Second fetch from peer 1 (periodic)
2526        let (_guard1, bytes1, peer1) = &results[1];
2527        assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
2528        let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
2529            .iter()
2530            .map(|vb| vb.serialized().clone())
2531            .collect::<Vec<_>>();
2532        assert_eq!(bytes1, &expected1);
2533
2534        // 9) Third fetch from peer 4 (periodic)
2535        let (_guard4, bytes4, peer4) = &results[2];
2536        assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
2537        let expected4 = missing_vbs
2538            [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
2539            .iter()
2540            .map(|vb| vb.serialized().clone())
2541            .collect::<Vec<_>>();
2542        assert_eq!(bytes4, &expected4);
2543
2544        // 10) Fourth fetch from peer 5 (fallback after peer 2 timeout)
2545        let (_guard5, bytes5, peer5) = &results[3];
2546        assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
2547        let expected5 = missing_vbs[0..known_number_blocks]
2548            .iter()
2549            .map(|vb| vb.serialized().clone())
2550            .collect::<Vec<_>>();
2551        assert_eq!(bytes5, &expected5);
2552    }
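
    // A minimal illustrative sketch (not exercising the Synchronizer itself) of the
    // deterministic rule used by the periodic sync path when picking "known" peers:
    // sort the candidates by AuthorityIndex and keep only the first
    // MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS of them.
    #[test]
    fn known_peer_selection_prefers_smallest_indices() {
        let mut candidates = vec![
            AuthorityIndex::new_for_test(7),
            AuthorityIndex::new_for_test(2),
            AuthorityIndex::new_for_test(5),
        ];
        candidates.sort();
        let chosen: Vec<_> = candidates
            .into_iter()
            .take(super::MAX_PERIODIC_SYNC_PEERS - super::MAX_PERIODIC_SYNC_RANDOM_PEERS)
            .collect();
        // With the current constants (4 - 2 = 2 slots), the two smallest indices win.
        assert_eq!(
            chosen,
            vec![
                AuthorityIndex::new_for_test(2),
                AuthorityIndex::new_for_test(5),
            ]
        );
    }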
2553}