consensus_core/
synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
7    sync::Arc,
8    time::Duration,
9};
10
11use bytes::Bytes;
12use consensus_config::AuthorityIndex;
13use futures::{StreamExt as _, stream::FuturesUnordered};
14use iota_macros::fail_point_async;
15use iota_metrics::{
16    monitored_future,
17    monitored_mpsc::{Receiver, Sender, channel},
18    monitored_scope,
19};
20use itertools::Itertools as _;
21use parking_lot::{Mutex, RwLock};
22#[cfg(not(test))]
23use rand::prelude::{IteratorRandom, SeedableRng, SliceRandom, StdRng};
24use tap::TapFallible;
25use tokio::{
26    runtime::Handle,
27    sync::{mpsc::error::TrySendError, oneshot},
28    task::{JoinError, JoinSet},
29    time::{Instant, sleep, sleep_until, timeout},
30};
31use tracing::{debug, error, info, trace, warn};
32
33use crate::{
34    BlockAPI, CommitIndex, Round,
35    authority_service::COMMIT_LAG_MULTIPLIER,
36    block::{BlockRef, GENESIS_ROUND, SignedBlock, VerifiedBlock},
37    block_verifier::BlockVerifier,
38    commit_vote_monitor::CommitVoteMonitor,
39    context::Context,
40    core_thread::CoreThreadDispatcher,
41    dag_state::DagState,
42    error::{ConsensusError, ConsensusResult},
43    network::NetworkClient,
44};
45
46/// The number of concurrent fetch-blocks requests per authority.
47const FETCH_BLOCKS_CONCURRENCY: usize = 5;
48
49/// The maximum number of additional blocks (parents) that can be fetched.
50// TODO: This is a temporary value, and should be removed once the protocol
51// version is updated to support batching
52pub(crate) const MAX_ADDITIONAL_BLOCKS: usize = 10;
53
54/// The timeout for the synchronizer to fetch blocks from a given peer authority.
55const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
56
57/// The timeout for the periodic synchronizer to fetch blocks from the peers.
58const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
59
60/// The maximum number of authorities from which we will try to fetch the same
61/// block at the same time. The `BlocksGuard` locking ensures that we will not
62/// ask more than this number of authorities to fetch a given block concurrently.
63const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 3;
64
65/// The maximum number of peers from which the periodic synchronizer will
66/// request blocks
67const MAX_PERIODIC_SYNC_PEERS: usize = 4;
68
69/// The maximum number of peers that the periodic synchronizer chooses completely
70/// at random to fetch blocks from. The other peers are chosen based on their
71/// knowledge of the DAG.
72const MAX_PERIODIC_SYNC_RANDOM_PEERS: usize = 2;
73
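/// RAII guard over a set of missing block refs that have been locked for
/// fetching from a specific `peer`. Dropping the guard releases the peer's
/// locks in the shared `InflightBlocksMap`.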
74struct BlocksGuard {
75    map: Arc<InflightBlocksMap>,
76    block_refs: BTreeSet<BlockRef>,
77    peer: AuthorityIndex,
78}
79
80impl Drop for BlocksGuard {
81    fn drop(&mut self) {
82        self.map.unlock_blocks(&self.block_refs, self.peer);
83    }
84}
85
86// Keeps a mapping between the missing blocks that have been instructed to be
87// fetched and the authorities that are currently fetching them. For each block
88// ref there is a maximum number of authorities that can fetch it concurrently.
89// The ids of the authorities currently fetching a block are stored in the
90// corresponding `BTreeSet` and effectively act as "locks".
91struct InflightBlocksMap {
92    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
93}
94
95impl InflightBlocksMap {
96    fn new() -> Arc<Self> {
97        Arc::new(Self {
98            inner: Mutex::new(HashMap::new()),
99        })
100    }
101
102    /// Locks the blocks to be fetched for the given `peer`. To avoid
103    /// re-fetching the missing blocks from too many authorities at the same
104    /// time, the concurrency per block is limited by attempting to lock each
105    /// block. If a block is already being fetched by the maximum allowed number
106    /// of authorities, then its block ref will not be included in the returned
107    /// guard. The method returns a guard over all the block refs that have been
108    /// successfully locked and are allowed to be fetched, or `None` if none could be locked.
109    fn lock_blocks(
110        self: &Arc<Self>,
111        missing_block_refs: BTreeSet<BlockRef>,
112        peer: AuthorityIndex,
113    ) -> Option<BlocksGuard> {
114        let mut blocks = BTreeSet::new();
115        let mut inner = self.inner.lock();
116
117        for block_ref in missing_block_refs {
118            // Check that the number of authorities already instructed to fetch the
119            // block is below the allowed maximum and that `peer` has not already
120            // been instructed to do so.
121            let authorities = inner.entry(block_ref).or_default();
122            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
123                && authorities.get(&peer).is_none()
124            {
125                assert!(authorities.insert(peer));
126                blocks.insert(block_ref);
127            }
128        }
129
130        if blocks.is_empty() {
131            None
132        } else {
133            Some(BlocksGuard {
134                map: self.clone(),
135                block_refs: blocks,
136                peer,
137            })
138        }
139    }
140
141    /// Unlocks the provided block references for the given `peer`. The
142    /// unlocking is strict, meaning that if this method is called for a
143    /// specific block ref and peer more times than the corresponding lock
144    /// has been called, it will panic.
145    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
146        // Remove this peer's lock for each of the provided blocks.
147        let mut blocks_to_fetch = self.inner.lock();
148        for block_ref in block_refs {
149            let authorities = blocks_to_fetch
150                .get_mut(block_ref)
151                .expect("Should have found a non empty map");
152
153            assert!(authorities.remove(&peer), "Peer index should be present!");
154
155            // If this was the last lock holder then clean up the map entry.
156            if authorities.is_empty() {
157                blocks_to_fetch.remove(block_ref);
158            }
159        }
160    }
161
162    /// Drops the provided `blocks_guard`, which unlocks its blocks, and then
163    /// attempts to lock the same block refs again for the new `peer`. The swap is
164    /// best effort and there is no guarantee that the `peer` will be able to
165    /// acquire the new locks.
166    fn swap_locks(
167        self: &Arc<Self>,
168        blocks_guard: BlocksGuard,
169        peer: AuthorityIndex,
170    ) -> Option<BlocksGuard> {
171        let block_refs = blocks_guard.block_refs.clone();
172
173        // Explicitly drop the guard
174        drop(blocks_guard);
175
176        // Now create new guard
177        self.lock_blocks(block_refs, peer)
178    }
179
180    #[cfg(test)]
181    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
182        let inner = self.inner.lock();
183        inner.len()
184    }
185}
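// Illustrative (not compiled) sketch of the locking lifecycle; `block_1`,
// `block_2`, `peer_1` and `peer_2` are hypothetical values:
//
//   let map = InflightBlocksMap::new();
//   // `peer_1` locks both refs and is allowed to fetch them.
//   let guard_1 = map.lock_blocks(BTreeSet::from([block_1, block_2]), peer_1).unwrap();
//   // `peer_2` may lock the same refs too, up to MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
//   // concurrent fetchers per block.
//   let guard_2 = map.lock_blocks(BTreeSet::from([block_1, block_2]), peer_2).unwrap();
//   // Dropping a guard releases only that peer's locks.
//   drop(guard_1);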
186
187enum Command {
188    FetchBlocks {
189        missing_block_refs: BTreeSet<BlockRef>,
190        peer_index: AuthorityIndex,
191        result: oneshot::Sender<Result<(), ConsensusError>>,
192    },
193    FetchOwnLastBlock,
194    KickOffScheduler,
195}
196
197pub(crate) struct SynchronizerHandle {
198    commands_sender: Sender<Command>,
199    tasks: tokio::sync::Mutex<JoinSet<()>>,
200}
201
202impl SynchronizerHandle {
203    /// Explicitly asks the synchronizer to fetch the blocks in the provided
204    /// `missing_block_refs` set from the given peer authority.
205    pub(crate) async fn fetch_blocks(
206        &self,
207        missing_block_refs: BTreeSet<BlockRef>,
208        peer_index: AuthorityIndex,
209    ) -> ConsensusResult<()> {
210        let (sender, receiver) = oneshot::channel();
211        self.commands_sender
212            .send(Command::FetchBlocks {
213                missing_block_refs,
214                peer_index,
215                result: sender,
216            })
217            .await
218            .map_err(|_err| ConsensusError::Shutdown)?;
219        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
220    }
221
222    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
223        let mut tasks = self.tasks.lock().await;
224        tasks.abort_all();
225        while let Some(result) = tasks.join_next().await {
226            result?
227        }
228        Ok(())
229    }
230}
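// Illustrative caller-side sketch (not part of this module; `handle`,
// `missing` and `author` are hypothetical bindings). A block receiver that
// detects unknown ancestors would typically do something like:
//
//   if let Err(err) = handle.fetch_blocks(missing, author).await {
//       // Saturation of the peer task or shutdown is not fatal here; the
//       // periodic scheduler will retry any still-missing blocks.
//       debug!("live fetch request dropped: {err:?}");
//   }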
231
232/// `Synchronizer` oversees live block synchronization, crucial for node
233/// progress. Live synchronization refers to the process of retrieving missing
234/// blocks, particularly those essential for advancing a node when data from
235/// only a few rounds is absent. If a node significantly lags behind the
236/// network, `commit_syncer` handles fetching missing blocks via a more
237/// efficient approach. `Synchronizer` aims for swift catch-up by employing two
238/// mechanisms:
239///
240/// 1. Explicitly requesting missing blocks from designated authorities via the
241///    "block send" path. This includes attempting to fetch any missing
242///    ancestors necessary for processing a received block. Such requests
243///    prioritize the block author, maximizing the chance of prompt retrieval. A
244///    locking mechanism allows a missing block to be requested concurrently from
245///    up to `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK` authorities, enhancing the
246///    chances of timely retrieval. Notably, if additional missing blocks arise
247///    during block processing, requests to the same authority are deferred to the scheduler.
248///
249/// 2. Periodically requesting missing blocks via a scheduler. This primarily
250///    serves to retrieve missing blocks that were not ancestors of a received
251///    block via the "block send" path. The scheduler operates on either a fixed
252///    periodic basis or is triggered immediately after explicit fetches
253///    described in (1), ensuring continued block retrieval if gaps persist.
254///
255/// In addition to the above, the synchronizer can fetch the node's own last
256/// proposed block from the network peers as a best-effort approach to recover
257/// the node from amnesia and avoid equivocation.
258pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
259    context: Arc<Context>,
260    commands_receiver: Receiver<Command>,
261    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
262    core_dispatcher: Arc<D>,
263    commit_vote_monitor: Arc<CommitVoteMonitor>,
264    dag_state: Arc<RwLock<DagState>>,
265    fetch_blocks_scheduler_task: JoinSet<()>,
266    fetch_own_last_block_task: JoinSet<()>,
267    network_client: Arc<C>,
268    block_verifier: Arc<V>,
269    inflight_blocks_map: Arc<InflightBlocksMap>,
270    commands_sender: Sender<Command>,
271}
272
273impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
274    /// Starts the synchronizer, which is responsible for fetching blocks from
275    /// other authorities and managing block synchronization tasks.
276    pub fn start(
277        network_client: Arc<C>,
278        context: Arc<Context>,
279        core_dispatcher: Arc<D>,
280        commit_vote_monitor: Arc<CommitVoteMonitor>,
281        block_verifier: Arc<V>,
282        dag_state: Arc<RwLock<DagState>>,
283        sync_last_known_own_block: bool,
284    ) -> Arc<SynchronizerHandle> {
285        let (commands_sender, commands_receiver) =
286            channel("consensus_synchronizer_commands", 1_000);
287        let inflight_blocks_map = InflightBlocksMap::new();
288
289        // Spawn the tasks to fetch the blocks from the others
290        let mut fetch_block_senders = BTreeMap::new();
291        let mut tasks = JoinSet::new();
292        for (index, _) in context.committee.authorities() {
293            if index == context.own_index {
294                continue;
295            }
296            let (sender, receiver) =
297                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
298            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
299                index,
300                network_client.clone(),
301                block_verifier.clone(),
302                commit_vote_monitor.clone(),
303                context.clone(),
304                core_dispatcher.clone(),
305                dag_state.clone(),
306                receiver,
307                commands_sender.clone(),
308            );
309            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
310            fetch_block_senders.insert(index, sender);
311        }
312
313        let commands_sender_clone = commands_sender.clone();
314
315        if sync_last_known_own_block {
316            commands_sender
317                .try_send(Command::FetchOwnLastBlock)
318                .expect("Failed to sync our last block");
319        }
320
321        // Spawn the task to listen to the requests & periodic runs
322        tasks.spawn(monitored_future!(async move {
323            let mut s = Self {
324                context,
325                commands_receiver,
326                fetch_block_senders,
327                core_dispatcher,
328                commit_vote_monitor,
329                fetch_blocks_scheduler_task: JoinSet::new(),
330                fetch_own_last_block_task: JoinSet::new(),
331                network_client,
332                block_verifier,
333                inflight_blocks_map,
334                commands_sender: commands_sender_clone,
335                dag_state,
336            };
337            s.run().await;
338        }));
339
340        Arc::new(SynchronizerHandle {
341            commands_sender,
342            tasks: tokio::sync::Mutex::new(tasks),
343        })
344    }
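    // Illustrative wiring sketch (all bindings hypothetical): the node starts the
    // synchronizer once and keeps the returned handle for explicit fetches and
    // shutdown:
    //
    //   let handle = Synchronizer::start(
    //       network_client, context, core_dispatcher, commit_vote_monitor,
    //       block_verifier, dag_state, /* sync_last_known_own_block */ true,
    //   );
    //   ...
    //   handle.stop().await.ok();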
345
346    // The main loop to listen for the submitted commands.
347    async fn run(&mut self) {
348        // We want the synchronizer to run periodically every 200ms to fetch any missing
349        // blocks.
350        const PERIODIC_FETCH_TIMEOUT: Duration = Duration::from_millis(200);
351        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_TIMEOUT);
352
353        tokio::pin!(scheduler_timeout);
354
355        loop {
356            tokio::select! {
357                Some(command) = self.commands_receiver.recv() => {
358                    match command {
359                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
360                            if peer_index == self.context.own_index {
361                                error!("We should never attempt to fetch blocks from our own node");
362                                continue;
363                            }
364
365                            let peer_hostname = self.context.committee.authority(peer_index).hostname.clone();
366
367                            // Keep only the max allowed number of blocks to request. It is ok to reduce here as the scheduler
368                            // task will take care of syncing whatever is left over.
369                            let missing_block_refs = missing_block_refs
370                                .into_iter()
371                                .take(self.context.parameters.max_blocks_per_sync)
372                                .collect();
373
374                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
375                            let Some(blocks_guard) = blocks_guard else {
376                                result.send(Ok(())).ok();
377                                continue;
378                            };
379
380                            // We don't block if the corresponding peer task is saturated - we rather drop the request. That's ok, as the periodic
381                            // synchronization task will handle any still-missing blocks in its next run.
382                            let r = self
383                                .fetch_block_senders
384                                .get(&peer_index)
385                                .expect("Fatal error, sender should be present")
386                                .try_send(blocks_guard)
387                                .map_err(|err| {
388                                    match err {
389                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index,peer_hostname),
390                                        TrySendError::Closed(_) => ConsensusError::Shutdown
391                                    }
392                                });
393
394                            result.send(r).ok();
395                        }
396                        Command::FetchOwnLastBlock => {
397                            if self.fetch_own_last_block_task.is_empty() {
398                                self.start_fetch_own_last_block_task();
399                            }
400                        }
401                        Command::KickOffScheduler => {
402                            // Just reset the scheduler timeout timer to run immediately if the scheduler is not already running.
403                            // If it is already running then just reduce the remaining time until its next run.
404                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
405                                Instant::now()
406                            } else {
407                                Instant::now() + PERIODIC_FETCH_TIMEOUT.checked_div(2).unwrap()
408                            };
409
410                            // only reset if it is earlier than the next deadline
411                            if timeout < scheduler_timeout.deadline() {
412                                scheduler_timeout.as_mut().reset(timeout);
413                            }
414                        }
415                    }
416                },
417                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
418                    match result {
419                        Ok(()) => {},
420                        Err(e) => {
421                            if e.is_cancelled() {
422                            } else if e.is_panic() {
423                                std::panic::resume_unwind(e.into_panic());
424                            } else {
425                                panic!("fetch our last block task failed: {e}");
426                            }
427                        },
428                    };
429                },
430                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
431                    match result {
432                        Ok(()) => {},
433                        Err(e) => {
434                            if e.is_cancelled() {
435                            } else if e.is_panic() {
436                                std::panic::resume_unwind(e.into_panic());
437                            } else {
438                                panic!("fetch blocks scheduler task failed: {e}");
439                            }
440                        },
441                    };
442                },
443                () = &mut scheduler_timeout => {
444                    // we want to start a new task only if the previous one has already finished.
445                    if self.fetch_blocks_scheduler_task.is_empty() {
446                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
447                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
448                            return;
449                        };
450                    }
451
452                    scheduler_timeout
453                        .as_mut()
454                        .reset(Instant::now() + PERIODIC_FETCH_TIMEOUT);
455                }
456            }
457        }
458    }
459
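    /// Per-peer worker loop: receives `BlocksGuard`s destined for `peer_index`,
    /// keeps up to `FETCH_BLOCKS_CONCURRENCY` fetch requests in flight, retries
    /// failed requests up to `MAX_RETRIES` times and forwards successful
    /// responses to `process_fetched_blocks`. The loop exits once the command
    /// channel is closed and no requests remain in flight.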
460    async fn fetch_blocks_from_authority(
461        peer_index: AuthorityIndex,
462        network_client: Arc<C>,
463        block_verifier: Arc<V>,
464        commit_vote_monitor: Arc<CommitVoteMonitor>,
465        context: Arc<Context>,
466        core_dispatcher: Arc<D>,
467        dag_state: Arc<RwLock<DagState>>,
468        mut receiver: Receiver<BlocksGuard>,
469        commands_sender: Sender<Command>,
470    ) {
471        const MAX_RETRIES: u32 = 3;
472        let peer_hostname = &context.committee.authority(peer_index).hostname;
473
474        let mut requests = FuturesUnordered::new();
475
476        loop {
477            tokio::select! {
478                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
479                    // get the highest accepted rounds
480                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
481
482                    // Record metrics for live synchronizer requests
483                    let metrics = &context.metrics.node_metrics;
484                    metrics
485                        .synchronizer_requested_blocks_by_peer
486                        .with_label_values(&[peer_hostname.as_str(), "live"])
487                        .inc_by(blocks_guard.block_refs.len() as u64);
488                    // Count requested blocks per authority and increment metric by one per authority
489                    let mut authors = HashSet::new();
490                    for block_ref in &blocks_guard.block_refs {
491                        authors.insert(block_ref.author);
492                    }
493                    for author in authors {
494                        let host = &context.committee.authority(author).hostname;
495                        metrics
496                            .synchronizer_requested_blocks_by_authority
497                            .with_label_values(&[host.as_str(), "live"])
498                            .inc();
499                    }
500
501                    requests.push(Self::fetch_blocks_request(
502                        network_client.clone(),
503                        peer_index,
504                        blocks_guard,
505                        highest_rounds,
506                        FETCH_REQUEST_TIMEOUT,
507                        1,
508                    ))
509                },
510                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
511                    match response {
512                        Ok(blocks) => {
513                            if let Err(err) = Self::process_fetched_blocks(blocks,
514                                peer_index,
515                                blocks_guard,
516                                core_dispatcher.clone(),
517                                block_verifier.clone(),
518                                commit_vote_monitor.clone(),
519                                context.clone(),
520                                commands_sender.clone(),
521                                "live"
522                            ).await {
523                                context.metrics.update_scoring_metrics_on_block_receival(
524                                    peer_index,
525                                    peer_hostname,
526                                    err.clone(),
527                                    "process_fetched_blocks",
528                                );
529                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
530                                context.metrics.node_metrics.synchronizer_process_fetched_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
531                            }
532                        },
533                        Err(_) => {
534                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
535                            if retries <= MAX_RETRIES {
536                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
537                            } else {
538                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
539                                // Not strictly necessary, but explicitly drop the guard here to unlock the blocks.
540                                drop(blocks_guard);
541                            }
542                        }
543                    }
544                },
545                else => {
546                    info!("Fetching blocks from authority {peer_index} task will now abort.");
547                    break;
548                }
549            }
550        }
551    }
552
553    /// Processes the raw serialized blocks fetched from peer `peer_index`. If no
554    /// error is returned then the verified blocks are immediately sent to
555    /// Core for processing.
556    async fn process_fetched_blocks(
557        mut serialized_blocks: Vec<Bytes>,
558        peer_index: AuthorityIndex,
559        requested_blocks_guard: BlocksGuard,
560        core_dispatcher: Arc<D>,
561        block_verifier: Arc<V>,
562        commit_vote_monitor: Arc<CommitVoteMonitor>,
563        context: Arc<Context>,
564        commands_sender: Sender<Command>,
565        sync_method: &str,
566    ) -> ConsensusResult<()> {
567        if serialized_blocks.is_empty() {
568            return Ok(());
569        }
570
571        // Limit the number of the returned blocks processed.
572        if context.protocol_config.consensus_batched_block_sync() {
573            serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
574        } else {
575            // Ensure that the number of returned blocks does not exceed the total
576            // maximum allowed.
577            if serialized_blocks.len()
578                > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
579            {
580                return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
581            }
582        }
583
584        // Verify all the fetched blocks
585        let blocks = Handle::current()
586            .spawn_blocking({
587                let block_verifier = block_verifier.clone();
588                let context = context.clone();
589                move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
590            })
591            .await
592            .expect("Spawn blocking should not fail")?;
593
594        if !context.protocol_config.consensus_batched_block_sync() {
595            // Get all the ancestors of the requested blocks only
596            let ancestors = blocks
597                .iter()
598                .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
599                .flat_map(|b| b.ancestors().to_vec())
600                .collect::<BTreeSet<BlockRef>>();
601
602            // Now confirm that the blocks are either among the ones requested, or they
603            // are parents of the requested blocks.
604            for block in &blocks {
605                if !requested_blocks_guard
606                    .block_refs
607                    .contains(&block.reference())
608                    && !ancestors.contains(&block.reference())
609                {
610                    return Err(ConsensusError::UnexpectedFetchedBlock {
611                        index: peer_index,
612                        block_ref: block.reference(),
613                    });
614                }
615            }
616        }
617
618        // Record commit votes from the verified blocks.
619        for block in &blocks {
620            commit_vote_monitor.observe_block(block);
621        }
622
623        let metrics = &context.metrics.node_metrics;
624        let peer_hostname = &context.committee.authority(peer_index).hostname;
625        metrics
626            .synchronizer_fetched_blocks_by_peer
627            .with_label_values(&[peer_hostname.as_str(), sync_method])
628            .inc_by(blocks.len() as u64);
629        for block in &blocks {
630            let block_hostname = &context.committee.authority(block.author()).hostname;
631            metrics
632                .synchronizer_fetched_blocks_by_authority
633                .with_label_values(&[block_hostname.as_str(), sync_method])
634                .inc();
635        }
636
637        debug!(
638            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
639            blocks.len(),
640            blocks.iter().map(|b| b.reference().to_string()).join(", "),
641        );
642
643        // Now send them to Core for processing. We don't directly fetch the returned
644        // missing blocks here, to avoid this mechanism feedback-looping into fetching
645        // ever more blocks. The periodic synchronization will take care of that.
646        let missing_blocks = core_dispatcher
647            .add_blocks(blocks)
648            .await
649            .map_err(|_| ConsensusError::Shutdown)?;
650
651        // now release all the locked blocks as they have been fetched, verified &
652        // processed
653        drop(requested_blocks_guard);
654
655        // Immediately kick off the scheduled synchronizer.
656        if !missing_blocks.is_empty() {
657            // do not block here, so we avoid any possible cycles.
658            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
659            {
660                warn!("Commands channel is full")
661            }
662        }
663
664        context
665            .metrics
666            .node_metrics
667            .missing_blocks_after_fetch_total
668            .inc_by(missing_blocks.len() as u64);
669
670        Ok(())
671    }
672
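    /// Returns the round of the highest accepted (last cached) block for each
    /// authority, in committee order. The resulting vector is passed along with
    /// fetch requests sent to peers.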
673    fn get_highest_accepted_rounds(
674        dag_state: Arc<RwLock<DagState>>,
675        context: &Arc<Context>,
676    ) -> Vec<Round> {
677        let blocks = dag_state
678            .read()
679            .get_last_cached_block_per_authority(Round::MAX);
680        assert_eq!(blocks.len(), context.committee.size());
681
682        blocks
683            .into_iter()
684            .map(|(block, _)| block.round())
685            .collect::<Vec<_>>()
686    }
687
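    /// Deserializes and verifies the serialized blocks received from
    /// `peer_index`, returning the verified blocks. Blocks whose timestamps lie
    /// in the future are recorded in the drift metric and, unless median-based
    /// timestamps are enforced by the protocol config, skipped so that they can
    /// be re-fetched later.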
688    fn verify_blocks(
689        serialized_blocks: Vec<Bytes>,
690        block_verifier: Arc<V>,
691        context: &Context,
692        peer_index: AuthorityIndex,
693    ) -> ConsensusResult<Vec<VerifiedBlock>> {
694        let mut verified_blocks = Vec::new();
695
696        for serialized_block in serialized_blocks {
697            let signed_block: SignedBlock =
698                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
699
700            // TODO: dedup block verifications, here and with fetched blocks.
701            if let Err(e) = block_verifier.verify(&signed_block) {
702                // TODO: we might want to use a different metric to track the invalid "served"
703                // blocks from the invalid "proposed" ones.
704                let hostname = context.committee.authority(peer_index).hostname.clone();
705
706                context
707                    .metrics
708                    .node_metrics
709                    .invalid_blocks
710                    .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
711                    .inc();
712                warn!("Invalid block received from {}: {}", peer_index, e);
713                return Err(e);
714            }
715            let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
716
717            // Dropping a block with a timestamp too far in the future is ok because it will be refetched later.
718            // TODO: improve efficiency, maybe suspend and continue processing the block
719            // asynchronously.
720            let now = context.clock.timestamp_utc_ms();
721            let drift = verified_block.timestamp_ms().saturating_sub(now) as u64;
722            if drift > 0 {
723                let peer_hostname = &context
724                    .committee
725                    .authority(verified_block.author())
726                    .hostname;
727                context
728                    .metrics
729                    .node_metrics
730                    .block_timestamp_drift_ms
731                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
732                    .inc_by(drift);
733
734                if context
735                    .protocol_config
736                    .consensus_median_timestamp_with_checkpoint_enforcement()
737                {
738                    trace!(
739                        "Synced block {} timestamp {} is in the future (now={}). Not ignoring it, as median-based timestamps are enabled.",
740                        verified_block.reference(),
741                        verified_block.timestamp_ms(),
742                        now
743                    );
744                } else {
745                    warn!(
746                        "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
747                        verified_block.reference(),
748                        verified_block.timestamp_ms(),
749                        now
750                    );
751                    continue;
752                }
753            }
754
755            verified_blocks.push(verified_block);
756        }
757
758        Ok(verified_blocks)
759    }
760
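    /// Issues a single fetch-blocks request to `peer` for the refs held by
    /// `blocks_guard`, bounded by `request_timeout`. On failure the future
    /// sleeps until the full timeout has elapsed (pacing retries) and bumps the
    /// retry counter. The guard, retry count, peer and highest rounds are
    /// returned alongside the response so the caller's select loop can retry or
    /// release the locks.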
761    async fn fetch_blocks_request(
762        network_client: Arc<C>,
763        peer: AuthorityIndex,
764        blocks_guard: BlocksGuard,
765        highest_rounds: Vec<Round>,
766        request_timeout: Duration,
767        mut retries: u32,
768    ) -> (
769        ConsensusResult<Vec<Bytes>>,
770        BlocksGuard,
771        u32,
772        AuthorityIndex,
773        Vec<Round>,
774    ) {
775        let start = Instant::now();
776        let resp = timeout(
777            request_timeout,
778            network_client.fetch_blocks(
779                peer,
780                blocks_guard
781                    .block_refs
782                    .clone()
783                    .into_iter()
784                    .collect::<Vec<_>>(),
785                highest_rounds.clone(),
786                request_timeout,
787            ),
788        )
789        .await;
790
791        fail_point_async!("consensus-delay");
792
793        let resp = match resp {
794            Ok(Err(err)) => {
795                // Add a delay before retrying, if needed. If the request has already timed
796                // out then this is effectively a no-op.
797                sleep_until(start + request_timeout).await;
798                retries += 1;
799                Err(err)
800            } // network error
801            Err(err) => {
802                // timeout
803                sleep_until(start + request_timeout).await;
804                retries += 1;
805                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
806            }
807            Ok(result) => result,
808        };
809        (resp, blocks_guard, retries, peer, highest_rounds)
810    }
811
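    /// Spawns a task that recovers this node's last own proposed block from
    /// peers (amnesia recovery). It keeps polling peers, with exponential
    /// backoff between rounds, until a quorum (2f+1) of stake has returned
    /// acceptable results, and then reports the highest observed round to Core
    /// via `set_last_known_proposed_round`.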
812    fn start_fetch_own_last_block_task(&mut self) {
813        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
814        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
815
816        let context = self.context.clone();
817        let dag_state = self.dag_state.clone();
818        let network_client = self.network_client.clone();
819        let block_verifier = self.block_verifier.clone();
820        let core_dispatcher = self.core_dispatcher.clone();
821
822        self.fetch_own_last_block_task
823            .spawn(monitored_future!(async move {
824                let _scope = monitored_scope("FetchOwnLastBlockTask");
825
826                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
827                    let network_client_cloned = network_client.clone();
828                    let own_index = context.own_index;
829                    async move {
830                        sleep(fetch_own_block_delay).await;
831                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
832                        (r, authority_index)
833                    }
834                };
835
836                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
837                                    let mut result = Vec::new();
838                                    for serialized_block in blocks {
839                                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
840                                        block_verifier.verify(&signed_block).tap_err(|err|{
841                                            let hostname = context.committee.authority(authority_index).hostname.clone();
842                                            context
843                                                .metrics
844                                                .node_metrics
845                                                .invalid_blocks
846                                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
847                                                .inc();
848                                            warn!("Invalid block received from {}: {}", authority_index, err);
849                                        })?;
850
851                                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
852                                        if verified_block.author() != context.own_index {
853                                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
854                                        }
855                                        result.push(verified_block);
856                                    }
857                                    Ok(result)
858                };
859
860                // Get the highest round of all the results. Retry until a quorum (2f+1) of stake has responded.
861                let mut highest_round = GENESIS_ROUND;
862                // Keep track of the received responses to avoid re-fetching our own block from the same peer.
863                let mut received_response = vec![false; context.committee.size()];
864                // Assume that our node is not Byzantine
865                received_response[context.own_index] = true;
866                let mut total_stake = context.committee.stake(context.own_index);
867                let mut retries = 0;
868                let mut retry_delay_step = Duration::from_millis(500);
869                'main: loop {
870                    if context.committee.size() == 1 {
871                        highest_round = dag_state.read().get_last_proposed_block().round();
872                        info!("Only one node in the network, will not try fetching own last block from peers.");
873                        break 'main;
874                    }
875
876                    // Ask all the other peers about our last block
877                    let mut results = FuturesUnordered::new();
878
879                    for (authority_index, _authority) in context.committee.authorities() {
880                        // Skip our own index and the ones that have already responded
881                        if !received_response[authority_index] {
882                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
883                        }
884                    }
885
886                    // Gather the results, but also respect the timeout.
887                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
888                    tokio::pin!(timer);
889
890                    'inner: loop {
891                        tokio::select! {
892                            result = results.next() => {
893                                let Some((result, authority_index)) = result else {
894                                    break 'inner;
895                                };
896                                match result {
897                                    Ok(result) => {
898                                        match process_blocks(result, authority_index) {
899                                            Ok(blocks) => {
900                                                received_response[authority_index] = true;
901                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
902                                                highest_round = highest_round.max(max_round);
903
904                                                total_stake += context.committee.stake(authority_index);
905                                            },
906                                            Err(err) => {
907                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
908                                            }
909                                        }
910                                    },
911                                    Err(err) => {
912                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
913                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
914                                    }
915                                }
916                            },
917                            () = &mut timer => {
918                                info!("Timeout while trying to sync our own last block from peers");
919                                break 'inner;
920                            }
921                        }
922                    }
923
924                    // Require at least a quorum (2f+1) of stake to have replied back.
925                    if context.committee.reached_quorum(total_stake) {
926                        info!("A quorum, {} out of {} total stake, returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
927                        break 'main;
928                    } else {
929                        info!("Only {} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
930                    }
931
932                    retries += 1;
933                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
934                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
935
936                    sleep(retry_delay_step).await;
937
938                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
939                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
940                }
941
942                // Update the Core with the highest detected round
943                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
944
945                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
946                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
947                }
948            }));
949    }
950
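    /// Periodic scheduler entry point: asks Core for the currently missing
    /// blocks and, unless the node is commit lagging (in which case syncing is
    /// limited or skipped so that commit sync can catch up instead), spawns a
    /// task that fetches them from a mix of peers that know about the blocks
    /// and randomly chosen peers.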
951    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
952        let mut missing_blocks = self
953            .core_dispatcher
954            .get_missing_blocks()
955            .await
956            .map_err(|_err| ConsensusError::Shutdown)?;
957
958        // No reason to kick off the scheduler if there are no missing blocks to fetch
959        if missing_blocks.is_empty() {
960            return Ok(());
961        }
962
963        let context = self.context.clone();
964        let network_client = self.network_client.clone();
965        let block_verifier = self.block_verifier.clone();
966        let commit_vote_monitor = self.commit_vote_monitor.clone();
967        let core_dispatcher = self.core_dispatcher.clone();
968        let blocks_to_fetch = self.inflight_blocks_map.clone();
969        let commands_sender = self.commands_sender.clone();
970        let dag_state = self.dag_state.clone();
971
972        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
973        trace!(
974            "Commit lagging: {commit_lagging}, last commit index: {last_commit_index}, quorum commit index: {quorum_commit_index}"
975        );
976        if commit_lagging {
977            // If gc is enabled and we are commit lagging, then we don't want to enable the
978            // scheduler. Since the new logic for processing certified commits is in
979            // place, we are guaranteed that commits will happen for all the certified
980            // commits.
981            if dag_state.read().gc_enabled() {
982                return Ok(());
983            }
984
985            // As the node is commit lagging, try to sync only the missing blocks that
986            // are within the acceptable round threshold. The rest we don't attempt to
987            // sync yet.
988            let highest_accepted_round = dag_state.read().highest_accepted_round();
989            missing_blocks = missing_blocks
990                .into_iter()
991                .take_while(|(block_ref, _)| {
992                    block_ref.round <= highest_accepted_round + self.missing_block_round_threshold()
993                })
994                .collect::<BTreeMap<_, _>>();
995
996            // If no missing blocks are within the acceptable threshold to sync while we
997            // are commit lagging, then we disable the scheduler completely for this run.
998            if missing_blocks.is_empty() {
999                trace!(
1000                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
1001                );
1002                self.context
1003                    .metrics
1004                    .node_metrics
1005                    .fetch_blocks_scheduler_skipped
1006                    .with_label_values(&["commit_lagging"])
1007                    .inc();
1008                return Ok(());
1009            }
1010        }
1011
1012        self.fetch_blocks_scheduler_task
1013            .spawn(monitored_future!(async move {
1014                let _scope = monitored_scope("FetchMissingBlocksScheduler");
1015
1016                context
1017                    .metrics
1018                    .node_metrics
1019                    .fetch_blocks_scheduler_inflight
1020                    .inc();
1021                let total_requested = missing_blocks.len();
1022
1023                fail_point_async!("consensus-delay");
1024
1025                // Fetch blocks from peers
1026                let results = Self::fetch_blocks_from_authorities(
1027                    context.clone(),
1028                    blocks_to_fetch.clone(),
1029                    network_client,
1030                    missing_blocks,
1031                    dag_state,
1032                )
1033                .await;
1034                context
1035                    .metrics
1036                    .node_metrics
1037                    .fetch_blocks_scheduler_inflight
1038                    .dec();
1039                if results.is_empty() {
1040                    warn!("No results returned while requesting missing blocks");
1041                    return;
1042                }
1043
1044                // Now process the returned results
1045                let mut total_fetched = 0;
1046                for (blocks_guard, fetched_blocks, peer) in results {
1047                    total_fetched += fetched_blocks.len();
1048
1049                    if let Err(err) = Self::process_fetched_blocks(
1050                        fetched_blocks,
1051                        peer,
1052                        blocks_guard,
1053                        core_dispatcher.clone(),
1054                        block_verifier.clone(),
1055                        commit_vote_monitor.clone(),
1056                        context.clone(),
1057                        commands_sender.clone(),
1058                        "periodic",
1059                    )
1060                    .await
1061                    {
1062                        warn!(
1063                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
1064                        );
1065                    }
1066                }
1067
1068                debug!(
1069                    "Total blocks requested to fetch: {}, total fetched: {}",
1070                    total_requested, total_fetched
1071                );
1072            }));
1073        Ok(())
1074    }
1075
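    /// Returns whether the local commit index is lagging behind the quorum
    /// commit index, together with both indices. The node is considered lagging
    /// when `last_commit_index + commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`
    /// is still below the quorum commit index, i.e. the gap exceeds several
    /// commit-sync batches.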
1076    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
1077        let last_commit_index = self.dag_state.read().last_commit_index();
1078        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1079        let commit_threshold = last_commit_index
1080            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1081        (
1082            commit_threshold < quorum_commit_index,
1083            last_commit_index,
1084            quorum_commit_index,
1085        )
1086    }
1087
1088    /// The number of rounds above the highest accepted round to allow fetching
1089    /// missing blocks via the periodic synchronization. Any missing blocks
1090    /// of higher rounds are considered too far in the future to fetch. This
1091    /// property is used only when it's detected that the node has fallen
1092    /// behind on its commit compared to the rest of the network,
1093    /// otherwise the scheduler will attempt to fetch any missing block.
1094    fn missing_block_round_threshold(&self) -> Round {
1095        self.context.parameters.commit_sync_batch_size
1096    }
1097
1098    /// Fetches the given `missing_blocks` from up to `MAX_PERIODIC_SYNC_PEERS`
1099    /// authorities in parallel:
1100    ///
1101    /// Randomly select `MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS`
1102    /// peers from those who are known to hold some missing block, requesting up
1103    /// to `max_blocks_per_sync` block refs per peer.
1104    ///
1105    /// Randomly select `MAX_PERIODIC_SYNC_RANDOM_PEERS` additional peers from the
1106    /// committee (excluding self and those already selected).
1107    ///
1108    /// The method returns a vector with the fetched blocks from each peer that
1109    /// successfully responded and any corresponding additional ancestor blocks.
1110    /// Each element of the vector is a tuple containing the guard over the
1111    /// requested missing block refs, the returned serialized blocks and the peer authority index.
1112    async fn fetch_blocks_from_authorities(
1113        context: Arc<Context>,
1114        inflight_blocks: Arc<InflightBlocksMap>,
1115        network_client: Arc<C>,
1116        missing_blocks: BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>,
1117        dag_state: Arc<RwLock<DagState>>,
1118    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
1119        // Step 1: Map authorities to missing blocks that they are aware of
1120        let mut authority_to_blocks: HashMap<AuthorityIndex, Vec<BlockRef>> = HashMap::new();
1121        for (missing_block_ref, authorities) in &missing_blocks {
1122            for author in authorities {
1123                if author == &context.own_index {
1124                    // Skip our own index as we don't want to fetch blocks from ourselves
1125                    continue;
1126                }
1127                authority_to_blocks
1128                    .entry(*author)
1129                    .or_default()
1130                    .push(*missing_block_ref);
1131            }
1132        }
1133
1134        // Step 2: Choose at most MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1135        // peers from those who are aware of some missing blocks.
1136
1137        #[cfg(not(test))]
1138        let mut rng = StdRng::from_entropy();
1139
1140        // Randomly pick MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1141        // authorities that are aware of missing blocks.
1142        #[cfg(not(test))]
1143        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> =
1144            authority_to_blocks
1145                .iter()
1146                .choose_multiple(
1147                    &mut rng,
1148                    MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS,
1149                )
1150                .into_iter()
1151                .map(|(&peer, blocks)| {
1152                    let limited_blocks = blocks
1153                        .iter()
1154                        .copied()
1155                        .take(context.parameters.max_blocks_per_sync)
1156                        .collect();
1157                    (peer, limited_blocks, "periodic_known")
1158                })
1159                .collect();
1160        #[cfg(test)]
1161        // Deterministically pick the smallest (MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS) authority indices
1162        let mut chosen_peers_with_blocks: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = {
1163            let mut items: Vec<(AuthorityIndex, Vec<BlockRef>, &str)> = authority_to_blocks
1164                .iter()
1165                .map(|(&peer, blocks)| {
1166                    let limited_blocks = blocks
1167                        .iter()
1168                        .copied()
1169                        .take(context.parameters.max_blocks_per_sync)
1170                        .collect();
1171                    (peer, limited_blocks, "periodic_known")
1172                })
1173                .collect();
1174            // Sort by AuthorityIndex (natural order), then take the first
1175            // MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS
1176            items.sort_by_key(|(peer, _, _)| *peer);
1177            items
1178                .into_iter()
1179                .take(MAX_PERIODIC_SYNC_PEERS - MAX_PERIODIC_SYNC_RANDOM_PEERS)
1180                .collect()
1181        };
1182
1183        // Step 3: Choose at most two random peers not known to be aware of the missing
1184        // blocks
1185        let already_chosen: HashSet<AuthorityIndex> = chosen_peers_with_blocks
1186            .iter()
1187            .map(|(peer, _, _)| *peer)
1188            .collect();
1189
1190        let random_candidates: Vec<_> = context
1191            .committee
1192            .authorities()
1193            .filter_map(|(peer_index, _)| {
1194                (peer_index != context.own_index && !already_chosen.contains(&peer_index))
1195                    .then_some(peer_index)
1196            })
1197            .collect();
1198        #[cfg(test)]
1199        let random_peers: Vec<AuthorityIndex> = random_candidates
1200            .into_iter()
1201            .take(MAX_PERIODIC_SYNC_RANDOM_PEERS)
1202            .collect();
1203        #[cfg(not(test))]
1204        let random_peers: Vec<AuthorityIndex> = random_candidates
1205            .into_iter()
1206            .choose_multiple(&mut rng, MAX_PERIODIC_SYNC_RANDOM_PEERS);
1207
1208        #[cfg_attr(test, allow(unused_mut))]
1209        let mut all_missing_blocks: Vec<BlockRef> = missing_blocks.keys().cloned().collect();
1210        // Shuffle the missing blocks in case the first ones are blocked by unresponsive
1211        // peers.
1212        #[cfg(not(test))]
1213        all_missing_blocks.shuffle(&mut rng);
1214
1215        let mut block_chunks = all_missing_blocks.chunks(context.parameters.max_blocks_per_sync);
1216
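        // Hand each randomly chosen peer one chunk of the shuffled missing blocks; stop
        // once there are no chunks left.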
1217        for peer in random_peers {
1218            if let Some(chunk) = block_chunks.next() {
1219                chosen_peers_with_blocks.push((peer, chunk.to_vec(), "periodic_random"));
1220            } else {
1221                break;
1222            }
1223        }
1224
1225        let mut request_futures = FuturesUnordered::new();
1226
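        // This node's highest accepted round per authority, forwarded with every fetch
        // request below.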
1227        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1228
1229        // Record the missing blocks per authority for metrics
1230        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1231        for block in &all_missing_blocks {
1232            missing_blocks_per_authority[block.author] += 1;
1233        }
1234        for (missing, (_, authority)) in missing_blocks_per_authority
1235            .into_iter()
1236            .zip(context.committee.authorities())
1237        {
1238            context
1239                .metrics
1240                .node_metrics
1241                .synchronizer_missing_blocks_by_authority
1242                .with_label_values(&[&authority.hostname])
1243                .inc_by(missing as u64);
1244            context
1245                .metrics
1246                .node_metrics
1247                .synchronizer_current_missing_blocks_by_authority
1248                .with_label_values(&[&authority.hostname])
1249                .set(missing as i64);
1250        }
1251
1252        // Keep the peers that were not chosen yet; they serve as fallbacks for retries
1253        // later on
1254        #[cfg_attr(test, expect(unused_mut))]
1255        let mut remaining_peers: Vec<_> = context
1256            .committee
1257            .authorities()
1258            .filter_map(|(peer_index, _)| {
1259                if peer_index != context.own_index
1260                    && !chosen_peers_with_blocks
1261                        .iter()
1262                        .any(|(chosen_peer, _, _)| *chosen_peer == peer_index)
1263                {
1264                    Some(peer_index)
1265                } else {
1266                    None
1267                }
1268            })
1269            .collect();
1270
1271        #[cfg(not(test))]
1272        remaining_peers.shuffle(&mut rng);
1273        let mut remaining_peers = remaining_peers.into_iter();
1274
1275        // Send the initial requests
1276        for (peer, blocks_to_request, label) in chosen_peers_with_blocks {
1277            let peer_hostname = &context.committee.authority(peer).hostname;
1278            let block_refs = blocks_to_request.iter().cloned().collect::<BTreeSet<_>>();
1279
1280            // Lock the blocks to be fetched. If no lock can be acquired for any of the
1281            // blocks then don't bother.
1282            if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1283                info!(
1284                    "Periodic sync of {} missing blocks from peer {} {}: {}",
1285                    block_refs.len(),
1286                    peer,
1287                    peer_hostname,
1288                    block_refs
1289                        .iter()
1290                        .map(|b| b.to_string())
1291                        .collect::<Vec<_>>()
1292                        .join(", ")
1293                );
1294                // Record metrics about requested blocks
1295                let metrics = &context.metrics.node_metrics;
1296                metrics
1297                    .synchronizer_requested_blocks_by_peer
1298                    .with_label_values(&[peer_hostname.as_str(), label])
1299                    .inc_by(block_refs.len() as u64);
1300                for block_ref in &block_refs {
1301                    let block_hostname = &context.committee.authority(block_ref.author).hostname;
1302                    metrics
1303                        .synchronizer_requested_blocks_by_authority
1304                        .with_label_values(&[block_hostname.as_str(), label])
1305                        .inc();
1306                }
1307                request_futures.push(Self::fetch_blocks_request(
1308                    network_client.clone(),
1309                    peer,
1310                    blocks_guard,
1311                    highest_rounds.clone(),
1312                    FETCH_REQUEST_TIMEOUT,
1313                    1,
1314                ));
1315            }
1316        }
1317
1318        let mut results = Vec::new();
1319        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1320
1321        tokio::pin!(fetcher_timeout);
1322
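        // Drive the outstanding requests to completion: successful responses are
        // collected into `results`, failed ones are retried against the remaining peers
        // (when the block locks can be swapped), and the loop exits when no requests
        // remain or the overall timeout elapses.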
1323        loop {
1324            tokio::select! {
1325                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1326                    let peer_hostname = &context.committee.authority(peer_index).hostname;
1327                    match response {
1328                        Ok(fetched_blocks) => {
1329                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
1330                            results.push((blocks_guard, fetched_blocks, peer_index));
1331
1332                            // no more pending requests are left, just break the loop
1333                            if request_futures.is_empty() {
1334                                break;
1335                            }
1336                        },
1337                        Err(_) => {
1338                            context.metrics.node_metrics.synchronizer_fetch_failures_by_peer.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
1339                            // try again if there is any peer left
1340                            if let Some(next_peer) = remaining_peers.next() {
1341                                // Best effort: swap the locks to the next peer. If we can't, skip the retry for this run.
1342                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1343                                    info!(
1344                                        "Retrying syncing {} missing blocks from peer {}: {}",
1345                                        blocks_guard.block_refs.len(),
1346                                        peer_hostname,
1347                                        blocks_guard.block_refs
1348                                            .iter()
1349                                            .map(|b| b.to_string())
1350                                            .collect::<Vec<_>>()
1351                                            .join(", ")
1352                                    );
1353                                    let block_refs = blocks_guard.block_refs.clone();
1354                                    // Record metrics about requested blocks
1355                                    let metrics = &context.metrics.node_metrics;
1356                                    metrics
1357                                        .synchronizer_requested_blocks_by_peer
1358                                        .with_label_values(&[peer_hostname.as_str(), "periodic_retry"])
1359                                        .inc_by(block_refs.len() as u64);
1360                                    for block_ref in &block_refs {
1361                                        let block_hostname =
1362                                            &context.committee.authority(block_ref.author).hostname;
1363                                        metrics
1364                                            .synchronizer_requested_blocks_by_authority
1365                                            .with_label_values(&[block_hostname.as_str(), "periodic_retry"])
1366                                            .inc();
1367                                    }
1368                                    request_futures.push(Self::fetch_blocks_request(
1369                                        network_client.clone(),
1370                                        next_peer,
1371                                        blocks_guard,
1372                                        highest_rounds,
1373                                        FETCH_REQUEST_TIMEOUT,
1374                                        1,
1375                                    ));
1376                                } else {
1377                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1378                                }
1379                            } else {
1380                                debug!("No more peers left to fetch blocks");
1381                            }
1382                        }
1383                    }
1384                },
1385                _ = &mut fetcher_timeout => {
1386                    debug!("Timed out while fetching missing blocks");
1387                    break;
1388                }
1389            }
1390        }
1391
1392        results
1393    }
1394}
1395
1396#[cfg(test)]
1397mod tests {
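    // Tests covering in-flight block locking, fetched-block processing, the periodic
    // fetch task, commit-lag handling, and last-own-block recovery.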
1398    use std::{
1399        collections::{BTreeMap, BTreeSet},
1400        sync::Arc,
1401        time::Duration,
1402    };
1403
1404    use async_trait::async_trait;
1405    use bytes::Bytes;
1406    use consensus_config::{AuthorityIndex, Parameters};
1407    use iota_metrics::monitored_mpsc;
1408    use parking_lot::RwLock;
1409    use tokio::{sync::Mutex, time::sleep};
1410
1411    use crate::{
1412        CommitDigest, CommitIndex,
1413        authority_service::COMMIT_LAG_MULTIPLIER,
1414        block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
1415        block_verifier::NoopBlockVerifier,
1416        commit::{CertifiedCommits, CommitRange, CommitVote, TrustedCommit},
1417        commit_vote_monitor::CommitVoteMonitor,
1418        context::Context,
1419        core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1420        dag_state::DagState,
1421        error::{ConsensusError, ConsensusResult},
1422        network::{BlockStream, NetworkClient},
1423        round_prober::QuorumRound,
1424        storage::mem_store::MemStore,
1425        synchronizer::{
1426            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1427        },
1428    };
1429
1430    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1431    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1432    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1433    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1434
1435    #[derive(Default)]
1436    struct MockNetworkClient {
1437        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1438        fetch_latest_blocks_requests:
1439            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1440    }
1441
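    // Stubbed responses are keyed by the exact (block refs, peer) or (peer, authorities)
    // pair and consumed when the matching request arrives; an optional latency lets
    // tests simulate slow or timed-out peers.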
1442    impl MockNetworkClient {
1443        async fn stub_fetch_blocks(
1444            &self,
1445            blocks: Vec<VerifiedBlock>,
1446            peer: AuthorityIndex,
1447            latency: Option<Duration>,
1448        ) {
1449            let mut lock = self.fetch_blocks_requests.lock().await;
1450            let block_refs = blocks
1451                .iter()
1452                .map(|block| block.reference())
1453                .collect::<Vec<_>>();
1454            lock.insert((block_refs, peer), (blocks, latency));
1455        }
1456
1457        async fn stub_fetch_latest_blocks(
1458            &self,
1459            blocks: Vec<VerifiedBlock>,
1460            peer: AuthorityIndex,
1461            authorities: Vec<AuthorityIndex>,
1462            latency: Option<Duration>,
1463        ) {
1464            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1465            lock.entry((peer, authorities))
1466                .or_default()
1467                .push((blocks, latency));
1468        }
1469
1470        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1471            let lock = self.fetch_latest_blocks_requests.lock().await;
1472            lock.len()
1473        }
1474    }
1475
1476    #[async_trait]
1477    impl NetworkClient for MockNetworkClient {
1478        const SUPPORT_STREAMING: bool = false;
1479
1480        async fn send_block(
1481            &self,
1482            _peer: AuthorityIndex,
1483            _serialized_block: &VerifiedBlock,
1484            _timeout: Duration,
1485        ) -> ConsensusResult<()> {
1486            unimplemented!("Unimplemented")
1487        }
1488
1489        async fn subscribe_blocks(
1490            &self,
1491            _peer: AuthorityIndex,
1492            _last_received: Round,
1493            _timeout: Duration,
1494        ) -> ConsensusResult<BlockStream> {
1495            unimplemented!("Unimplemented")
1496        }
1497
1498        async fn fetch_blocks(
1499            &self,
1500            peer: AuthorityIndex,
1501            block_refs: Vec<BlockRef>,
1502            _highest_accepted_rounds: Vec<Round>,
1503            _timeout: Duration,
1504        ) -> ConsensusResult<Vec<Bytes>> {
1505            let mut lock = self.fetch_blocks_requests.lock().await;
1506            let response = lock
1507                .remove(&(block_refs, peer))
1508                .expect("Unexpected fetch blocks request made");
1509
1510            let serialised = response
1511                .0
1512                .into_iter()
1513                .map(|block| block.serialized().clone())
1514                .collect::<Vec<_>>();
1515
1516            drop(lock);
1517
1518            if let Some(latency) = response.1 {
1519                sleep(latency).await;
1520            }
1521
1522            Ok(serialised)
1523        }
1524
1525        async fn fetch_commits(
1526            &self,
1527            _peer: AuthorityIndex,
1528            _commit_range: CommitRange,
1529            _timeout: Duration,
1530        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1531            unimplemented!("Unimplemented")
1532        }
1533
1534        async fn fetch_latest_blocks(
1535            &self,
1536            peer: AuthorityIndex,
1537            authorities: Vec<AuthorityIndex>,
1538            _timeout: Duration,
1539        ) -> ConsensusResult<Vec<Bytes>> {
1540            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1541            let mut responses = lock
1542                .remove(&(peer, authorities.clone()))
1543                .expect("Unexpected fetch latest blocks request made");
1544
1545            let response = responses.remove(0);
1546            let serialised = response
1547                .0
1548                .into_iter()
1549                .map(|block| block.serialized().clone())
1550                .collect::<Vec<_>>();
1551
1552            if !responses.is_empty() {
1553                lock.insert((peer, authorities), responses);
1554            }
1555
1556            drop(lock);
1557
1558            if let Some(latency) = response.1 {
1559                sleep(latency).await;
1560            }
1561
1562            Ok(serialised)
1563        }
1564
1565        async fn get_latest_rounds(
1566            &self,
1567            _peer: AuthorityIndex,
1568            _timeout: Duration,
1569        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1570            unimplemented!("Unimplemented")
1571        }
1572    }
1573
1574    #[test]
1575    fn test_inflight_blocks_map() {
1576        // GIVEN
1577        let map = InflightBlocksMap::new();
1578        let some_block_refs = [
1579            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1580            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1581            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1582            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1583        ];
1584        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1585
1586        // Lock & unlock blocks
1587        {
1588            let mut all_guards = Vec::new();
1589
1590            // Try to acquire the block locks for authorities 1 & 2 & 3
1591            for i in 1..=3 {
1592                let authority = AuthorityIndex::new_for_test(i);
1593
1594                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1595                let guard = guard.expect("Guard should be created");
1596                assert_eq!(guard.block_refs.len(), 4);
1597
1598                all_guards.push(guard);
1599
1600                // trying to acquire any of them again will not succeed
1601                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1602                assert!(guard.is_none());
1603            }
1604
1605            // Trying to acquire the locks for authority 4 will fail, as we have maxed
1606            // out the number of peers allowed to fetch the same blocks
1607            let authority_4 = AuthorityIndex::new_for_test(4);
1608
1609            let guard = map.lock_blocks(missing_block_refs.clone(), authority_4);
1610            assert!(guard.is_none());
1611
1612            // Explicitly drop the guard of authority 1 and try for authority 4 again - it
1613            // will now succeed
1614            drop(all_guards.remove(0));
1615
1616            let guard = map.lock_blocks(missing_block_refs.clone(), authority_4);
1617            let guard = guard.expect("Guard should be successfully acquired");
1618
1619            assert_eq!(guard.block_refs, missing_block_refs);
1620
1621            // Dropping all guards should unlock all the block refs
1622            drop(guard);
1623            drop(all_guards);
1624
1625            assert_eq!(map.num_of_locked_blocks(), 0);
1626        }
1627
1628        // Swap locks
1629        {
1630            // acquire a lock for authority 1
1631            let authority_1 = AuthorityIndex::new_for_test(1);
1632            let guard = map
1633                .lock_blocks(missing_block_refs.clone(), authority_1)
1634                .unwrap();
1635
1636            // Now swap the locks for authority 2
1637            let authority_2 = AuthorityIndex::new_for_test(2);
1638            let guard = map.swap_locks(guard, authority_2);
1639
1640            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1641        }
1642    }
1643
1644    #[tokio::test]
1645    async fn test_process_fetched_blocks() {
1646        // GIVEN
1647        let (context, _) = Context::new_for_test(4);
1648        let context = Arc::new(context);
1649        let block_verifier = Arc::new(NoopBlockVerifier {});
1650        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1651        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1652        let (commands_sender, _commands_receiver) =
1653            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
1654
1655        // Create input test blocks:
1656        // - Authority 0 block at round 60.
1657        // - Authority 1 blocks from rounds 30 to 60.
1658        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
1659        expected_blocks.extend(
1660            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
1661        );
1662        assert_eq!(
1663            expected_blocks.len(),
1664            context.parameters.max_blocks_per_sync
1665        );
1666
1667        let expected_serialized_blocks = expected_blocks
1668            .iter()
1669            .map(|b| b.serialized().clone())
1670            .collect::<Vec<_>>();
1671
1672        let expected_block_refs = expected_blocks
1673            .iter()
1674            .map(|b| b.reference())
1675            .collect::<BTreeSet<_>>();
1676
1677        // GIVEN peer to fetch blocks from
1678        let peer_index = AuthorityIndex::new_for_test(2);
1679
1680        // Create blocks_guard
1681        let inflight_blocks_map = InflightBlocksMap::new();
1682        let blocks_guard = inflight_blocks_map
1683            .lock_blocks(expected_block_refs.clone(), peer_index)
1684            .expect("Failed to lock blocks");
1685
1686        assert_eq!(
1687            inflight_blocks_map.num_of_locked_blocks(),
1688            expected_block_refs.len()
1689        );
1690
1691        // WHEN processing the fetched blocks via Synchronizer::process_fetched_blocks
1692        let result = Synchronizer::<
1693            MockNetworkClient,
1694            NoopBlockVerifier,
1695            MockCoreThreadDispatcher,
1696        >::process_fetched_blocks(
1697            expected_serialized_blocks,
1698            peer_index,
1699            blocks_guard, // The guard is consumed here
1700            core_dispatcher.clone(),
1701            block_verifier,
1702            commit_vote_monitor,
1703            context.clone(),
1704            commands_sender,
1705            "test",
1706        )
1707            .await;
1708
1709        // THEN
1710        assert!(result.is_ok());
1711
1712        // Check blocks were sent to core
1713        let added_blocks = core_dispatcher.get_add_blocks().await;
1714        assert_eq!(
1715            added_blocks
1716                .iter()
1717                .map(|b| b.reference())
1718                .collect::<BTreeSet<_>>(),
1719            expected_block_refs,
1720        );
1721
1722        // Check blocks were unlocked
1723        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
1724    }
1725
1726    #[tokio::test]
1727    async fn test_successful_fetch_blocks_from_peer() {
1728        // GIVEN
1729        let (context, _) = Context::new_for_test(4);
1730        let context = Arc::new(context);
1731        let block_verifier = Arc::new(NoopBlockVerifier {});
1732        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1733        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1734        let network_client = Arc::new(MockNetworkClient::default());
1735        let store = Arc::new(MemStore::new());
1736        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1737
1738        let handle = Synchronizer::start(
1739            network_client.clone(),
1740            context,
1741            core_dispatcher.clone(),
1742            commit_vote_monitor,
1743            block_verifier,
1744            dag_state,
1745            false,
1746        );
1747
1748        // Create some test blocks
1749        let expected_blocks = (0..10)
1750            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1751            .collect::<Vec<_>>();
1752        let missing_blocks = expected_blocks
1753            .iter()
1754            .map(|block| block.reference())
1755            .collect::<BTreeSet<_>>();
1756
1757        // AND stub the fetch_blocks request from peer 1
1758        let peer = AuthorityIndex::new_for_test(1);
1759        network_client
1760            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1761            .await;
1762
1763        // WHEN request missing blocks from peer 1
1764        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1765
1766        // Wait a little bit until those have been added to core
1767        sleep(Duration::from_millis(1_000)).await;
1768
1769        // THEN ensure those ended up in Core
1770        let added_blocks = core_dispatcher.get_add_blocks().await;
1771        assert_eq!(added_blocks, expected_blocks);
1772    }
1773
1774    #[tokio::test]
1775    async fn saturate_fetch_blocks_from_peer() {
1776        // GIVEN
1777        let (context, _) = Context::new_for_test(4);
1778        let context = Arc::new(context);
1779        let block_verifier = Arc::new(NoopBlockVerifier {});
1780        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1781        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1782        let network_client = Arc::new(MockNetworkClient::default());
1783        let store = Arc::new(MemStore::new());
1784        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1785
1786        let handle = Synchronizer::start(
1787            network_client.clone(),
1788            context,
1789            core_dispatcher.clone(),
1790            commit_vote_monitor,
1791            block_verifier,
1792            dag_state,
1793            false,
1794        );
1795
1796        // Create some test blocks
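        // (2 * FETCH_BLOCKS_CONCURRENCY + 1 of them, which is presumably enough to fill
        // both the in-flight and the queued request slots of a single peer task, so the
        // final request gets rejected as saturated.)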
1797        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1798            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1799            .collect::<Vec<_>>();
1800
1801        // Now start sending requests to fetch blocks by trying to saturate peer 1 task
1802        let peer = AuthorityIndex::new_for_test(1);
1803        let mut iter = expected_blocks.iter().peekable();
1804        while let Some(block) = iter.next() {
1805            // stub the fetch_blocks request from peer 1 and give some high response latency
1806            // so requests can start blocking the peer task.
1807            network_client
1808                .stub_fetch_blocks(
1809                    vec![block.clone()],
1810                    peer,
1811                    Some(Duration::from_millis(5_000)),
1812                )
1813                .await;
1814
1815            let mut missing_blocks = BTreeSet::new();
1816            missing_blocks.insert(block.reference());
1817
1818            // WHEN requesting to fetch the blocks, the last request should fail with a
1819            // "saturated" synchronizer error
1820            if iter.peek().is_none() {
1821                match handle.fetch_blocks(missing_blocks, peer).await {
1822                    Err(ConsensusError::SynchronizerSaturated(index, _)) => {
1823                        assert_eq!(index, peer);
1824                    }
1825                    _ => panic!("A saturated synchronizer error was expected"),
1826                }
1827            } else {
1828                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1829            }
1830        }
1831    }
1832
1833    #[tokio::test(flavor = "current_thread", start_paused = true)]
1834    async fn synchronizer_periodic_task_fetch_blocks() {
1835        // GIVEN
1836        let (context, _) = Context::new_for_test(4);
1837        let context = Arc::new(context);
1838        let block_verifier = Arc::new(NoopBlockVerifier {});
1839        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1840        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1841        let network_client = Arc::new(MockNetworkClient::default());
1842        let store = Arc::new(MemStore::new());
1843        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1844
1845        // Create some test blocks
1846        let expected_blocks = (0..10)
1847            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1848            .collect::<Vec<_>>();
1849        let missing_blocks = expected_blocks
1850            .iter()
1851            .map(|block| block.reference())
1852            .collect::<BTreeSet<_>>();
1853
1854        // AND stub the missing blocks
1855        core_dispatcher
1856            .stub_missing_blocks(missing_blocks.clone())
1857            .await;
1858
1859        // AND stub the requests for authority 1 & 2
1860        // Make the first authority time out, so the second will be called. "We" are
1861        // authority = 0, so we are skipped anyway.
1862        network_client
1863            .stub_fetch_blocks(
1864                expected_blocks.clone(),
1865                AuthorityIndex::new_for_test(1),
1866                Some(FETCH_REQUEST_TIMEOUT),
1867            )
1868            .await;
1869        network_client
1870            .stub_fetch_blocks(
1871                expected_blocks.clone(),
1872                AuthorityIndex::new_for_test(2),
1873                None,
1874            )
1875            .await;
1876
1877        // WHEN starting the synchronizer and waiting for a couple of seconds
1878        let _handle = Synchronizer::start(
1879            network_client.clone(),
1880            context,
1881            core_dispatcher.clone(),
1882            commit_vote_monitor,
1883            block_verifier,
1884            dag_state,
1885            false,
1886        );
1887
1888        sleep(8 * FETCH_REQUEST_TIMEOUT).await;
1889
1890        // THEN the missing blocks should now be fetched and added to core
1891        let added_blocks = core_dispatcher.get_add_blocks().await;
1892        assert_eq!(added_blocks, expected_blocks);
1893
1894        // AND missing blocks should have been consumed by the stub
1895        assert!(
1896            core_dispatcher
1897                .get_missing_blocks()
1898                .await
1899                .unwrap()
1900                .is_empty()
1901        );
1902    }
1903
1904    #[tokio::test(flavor = "current_thread", start_paused = true)]
1905    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1906        // GIVEN
1907        let (mut context, _) = Context::new_for_test(4);
1908        context
1909            .protocol_config
1910            .set_consensus_batched_block_sync_for_testing(true);
1911        let context = Arc::new(context);
1912        let block_verifier = Arc::new(NoopBlockVerifier {});
1913        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1914        let network_client = Arc::new(MockNetworkClient::default());
1915        let store = Arc::new(MemStore::new());
1916        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1917        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1918
1919        // AND stub some missing blocks. The highest accepted round is 0. Create blocks
1920        // that are above the sync threshold.
1921        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
1922        let stub_blocks = (sync_missing_block_round_threshold * 2
1923            ..sync_missing_block_round_threshold * 2
1924                + context.parameters.max_blocks_per_sync as u32)
1925            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1926            .collect::<Vec<_>>();
1927        let missing_blocks = stub_blocks
1928            .iter()
1929            .map(|block| block.reference())
1930            .collect::<BTreeSet<_>>();
1931        core_dispatcher
1932            .stub_missing_blocks(missing_blocks.clone())
1933            .await;
1934        // AND stub the requests for authority 1 & 2
1935        // Make the first authority time out, so the second will be called. "We" are
1936        // authority = 0, so we are skipped anyway.
1937        let mut expected_blocks = stub_blocks
1938            .iter()
1939            .take(context.parameters.max_blocks_per_sync)
1940            .cloned()
1941            .collect::<Vec<_>>();
1942        network_client
1943            .stub_fetch_blocks(
1944                expected_blocks.clone(),
1945                AuthorityIndex::new_for_test(1),
1946                Some(FETCH_REQUEST_TIMEOUT),
1947            )
1948            .await;
1949        network_client
1950            .stub_fetch_blocks(
1951                expected_blocks.clone(),
1952                AuthorityIndex::new_for_test(2),
1953                None,
1954            )
1955            .await;
1956
1957        // Now create some blocks to simulate a commit lag
1958        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1959        let commit_index: CommitIndex = round - 1;
1960        let blocks = (0..4)
1961            .map(|authority| {
1962                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1963                let block = TestBlock::new(round, authority)
1964                    .set_commit_votes(commit_votes)
1965                    .build();
1966
1967                VerifiedBlock::new_for_test(block)
1968            })
1969            .collect::<Vec<_>>();
1970
1971        // Pass them through the commit vote monitor - so now there will be a big commit
1972        // lag to prevent the scheduled synchronizer from running
1973        for block in blocks {
1974            commit_vote_monitor.observe_block(&block);
1975        }
1976
1977        // Start the synchronizer and wait for a couple of seconds, during which the
1978        // synchronizer would normally have kicked in.
1979        let _handle = Synchronizer::start(
1980            network_client.clone(),
1981            context.clone(),
1982            core_dispatcher.clone(),
1983            commit_vote_monitor.clone(),
1984            block_verifier,
1985            dag_state.clone(),
1986            false,
1987        );
1988
1989        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1990
1991        // Since we should be in commit lag mode, none of the missing blocks should have
1992        // been fetched - hence nothing should be sent to core for processing.
1993        let added_blocks = core_dispatcher.get_add_blocks().await;
1994        assert_eq!(added_blocks, vec![]);
1995
1997        // AND now advance the local commit index by adding commits until it matches
1998        // the quorum commit index
1999        {
2000            let mut d = dag_state.write();
2001            for index in 1..=commit_index {
2002                let commit =
2003                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2004
2005                d.add_commit(commit);
2006            }
2007
2009            assert_eq!(
2010                d.last_commit_index(),
2011                commit_vote_monitor.quorum_commit_index()
2012            );
2013        }
2014
2015        // Now stub again the missing blocks to fetch the exact same ones.
2016        core_dispatcher
2017            .stub_missing_blocks(missing_blocks.clone())
2018            .await;
2019
2021        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2022
2023        // THEN the missing blocks should now be fetched and added to core
2024        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2026        added_blocks.sort_by_key(|block| block.reference());
2027        expected_blocks.sort_by_key(|block| block.reference());
2028
2029        assert_eq!(added_blocks, expected_blocks);
2030    }
2031
2032    #[tokio::test(flavor = "current_thread", start_paused = true)]
2033    async fn synchronizer_fetch_own_last_block() {
2034        // GIVEN
2035        let (context, _) = Context::new_for_test(4);
2036        let context = Arc::new(context.with_parameters(Parameters {
2037            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2038            ..Default::default()
2039        }));
2040        let block_verifier = Arc::new(NoopBlockVerifier {});
2041        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2042        let network_client = Arc::new(MockNetworkClient::default());
2043        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2044        let store = Arc::new(MemStore::new());
2045        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2046        let our_index = AuthorityIndex::new_for_test(0);
2047
2048        // Create some test blocks
2049        let mut expected_blocks = (8..=10)
2050            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2051            .collect::<Vec<_>>();
2052
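        // Each peer is stubbed twice: the first response is delayed beyond
        // sync_last_known_own_block_timeout to force a retry, while the second one
        // returns immediately.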
2053        // Now set different latest blocks for the peers
2054        // For peer 1 we give the block of round 10 (highest)
2055        let block_1 = expected_blocks.pop().unwrap();
2056        network_client
2057            .stub_fetch_latest_blocks(
2058                vec![block_1.clone()],
2059                AuthorityIndex::new_for_test(1),
2060                vec![our_index],
2061                Some(Duration::from_secs(10)),
2062            )
2063            .await;
2064        network_client
2065            .stub_fetch_latest_blocks(
2066                vec![block_1],
2067                AuthorityIndex::new_for_test(1),
2068                vec![our_index],
2069                None,
2070            )
2071            .await;
2072
2073        // For peer 2 we give the block of round 9
2074        let block_2 = expected_blocks.pop().unwrap();
2075        network_client
2076            .stub_fetch_latest_blocks(
2077                vec![block_2.clone()],
2078                AuthorityIndex::new_for_test(2),
2079                vec![our_index],
2080                Some(Duration::from_secs(10)),
2081            )
2082            .await;
2083        network_client
2084            .stub_fetch_latest_blocks(
2085                vec![block_2],
2086                AuthorityIndex::new_for_test(2),
2087                vec![our_index],
2088                None,
2089            )
2090            .await;
2091
2092        // For peer 3 we give a block with lowest round
2093        let block_3 = expected_blocks.pop().unwrap();
2094        network_client
2095            .stub_fetch_latest_blocks(
2096                vec![block_3.clone()],
2097                AuthorityIndex::new_for_test(3),
2098                vec![our_index],
2099                Some(Duration::from_secs(10)),
2100            )
2101            .await;
2102        network_client
2103            .stub_fetch_latest_blocks(
2104                vec![block_3],
2105                AuthorityIndex::new_for_test(3),
2106                vec![our_index],
2107                None,
2108            )
2109            .await;
2110
2111        // WHEN starting the synchronizer and waiting for a couple of seconds
2112        let handle = Synchronizer::start(
2113            network_client.clone(),
2114            context.clone(),
2115            core_dispatcher.clone(),
2116            commit_vote_monitor,
2117            block_verifier,
2118            dag_state,
2119            true,
2120        );
2121
2122        // Wait at least for the timeout time
2123        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2124
2125        // Assert that core has been called to set the min propose round
2126        assert_eq!(
2127            core_dispatcher.get_last_own_proposed_round().await,
2128            vec![10]
2129        );
2130
2131        // Ensure that all the requests have been called
2132        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
2133
2134        // And we got one retry
2135        assert_eq!(
2136            context
2137                .metrics
2138                .node_metrics
2139                .sync_last_known_own_block_retries
2140                .get(),
2141            1
2142        );
2143
2144        // Ensure that no panic occurred
2145        if let Err(err) = handle.stop().await {
2146            if err.is_panic() {
2147                std::panic::resume_unwind(err.into_panic());
2148            }
2149        }
2150    }
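
    // Minimal CoreThreadDispatcher used by the fetch tests below: it records the blocks
    // added to it and returns the contents of `missing_blocks`; everything else is a
    // no-op stub.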
2151    #[derive(Default)]
2152    struct SyncMockDispatcher {
2153        missing_blocks: Mutex<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>,
2154        added_blocks: Mutex<Vec<VerifiedBlock>>,
2155    }
2156
2157    #[async_trait::async_trait]
2158    impl CoreThreadDispatcher for SyncMockDispatcher {
2159        async fn get_missing_blocks(
2160            &self,
2161        ) -> Result<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>, CoreError> {
2162            Ok(self.missing_blocks.lock().await.clone())
2163        }
2164        async fn add_blocks(
2165            &self,
2166            blocks: Vec<VerifiedBlock>,
2167        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2168            let mut guard = self.added_blocks.lock().await;
2169            guard.extend(blocks.clone());
2170            Ok(blocks.iter().map(|b| b.reference()).collect())
2171        }
2172
2173        // Stub out the remaining CoreThreadDispatcher methods with defaults:
2174
2175        async fn check_block_refs(
2176            &self,
2177            block_refs: Vec<BlockRef>,
2178        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2179            // Echo back the requested refs by default
2180            Ok(block_refs.into_iter().collect())
2181        }
2182
2183        async fn add_certified_commits(
2184            &self,
2185            _commits: CertifiedCommits,
2186        ) -> Result<BTreeSet<BlockRef>, CoreError> {
2187            // No additional certified-commit logic in tests
2188            Ok(BTreeSet::new())
2189        }
2190
2191        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
2192            Ok(())
2193        }
2194
2195        fn set_quorum_subscribers_exists(&self, _exists: bool) -> Result<(), CoreError> {
2196            Ok(())
2197        }
2198
2199        fn set_propagation_delay_and_quorum_rounds(
2200            &self,
2201            _delay: Round,
2202            _received_quorum_rounds: Vec<QuorumRound>,
2203            _accepted_quorum_rounds: Vec<QuorumRound>,
2204        ) -> Result<(), CoreError> {
2205            Ok(())
2206        }
2207
2208        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
2209            Ok(())
2210        }
2211
2212        fn highest_received_rounds(&self) -> Vec<Round> {
2213            Vec::new()
2214        }
2215    }
2216
2217    #[tokio::test(flavor = "current_thread")]
2218    async fn known_before_random_peer_fetch() {
2219        {
2220            // 1) Set up a 10-node context and an in-memory DAG
2221            let (ctx, _) = Context::new_for_test(10);
2222            let context = Arc::new(ctx);
2223            let store = Arc::new(MemStore::new());
2224            let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2225            let inflight = InflightBlocksMap::new();
2226
2227            // 2) One missing block
2228            let missing_vb = VerifiedBlock::new_for_test(TestBlock::new(100, 3).build());
2229            let missing_ref = missing_vb.reference();
2230            let missing_blocks = BTreeMap::from([(
2231                missing_ref,
2232                BTreeSet::from([
2233                    AuthorityIndex::new_for_test(2),
2234                    AuthorityIndex::new_for_test(3),
2235                    AuthorityIndex::new_for_test(4),
2236                ]),
2237            )]);
2238
2239            // 3) Prepare mocks and stubs
2240            let network_client = Arc::new(MockNetworkClient::default());
2241            // Stub all authorities so that no request hits the mock unstubbed and panics:
2242            for i in 1..=9 {
2243                let peer = AuthorityIndex::new_for_test(i);
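                // In test mode the peer selection is deterministic: peers 1 and 4 end up
                // on the random/retry path, so make them slow and let only the
                // knowledge-based fetches (peers 2 and 3) succeed in time.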
2244                if i == 1 || i == 4 {
2245                    network_client
2246                        .stub_fetch_blocks(
2247                            vec![missing_vb.clone()],
2248                            peer,
2249                            Some(2 * FETCH_REQUEST_TIMEOUT),
2250                        )
2251                        .await;
2252                    continue;
2253                }
2254                network_client
2255                    .stub_fetch_blocks(vec![missing_vb.clone()], peer, None)
2256                    .await;
2257            }
2258
2259            // 4) Invoke knowledge-based fetch and random fallback selection
2260            //    deterministically
2261            let results = Synchronizer::<MockNetworkClient, NoopBlockVerifier, SyncMockDispatcher>
2262                ::fetch_blocks_from_authorities(
2263                    context.clone(),
2264                    inflight.clone(),
2265                    network_client.clone(),
2266                    missing_blocks,
2267                    dag_state.clone(),
2268                )
2269                .await;
2270
2271            // 5) Assert we got exactly two fetches, coming from the first two authorities
2272            //    that are aware of the missing block (authorities 2 and 3)
2273            assert_eq!(results.len(), 2);
2274
2275            // 6) The knowledge-based fetches went to peers 2 and 3
2276            let (_hot_guard, hot_bytes, hot_peer) = &results[0];
2277            assert_eq!(*hot_peer, AuthorityIndex::new_for_test(2));
2278            let (_periodic_guard, _periodic_bytes, periodic_peer) = &results[1];
2279            assert_eq!(*periodic_peer, AuthorityIndex::new_for_test(3));
2280            // 7) Verify the returned bytes correspond to that block
2281            let expected = missing_vb.serialized().clone();
2282            assert_eq!(hot_bytes, &vec![expected]);
2283        }
2284    }
2285
2286    #[tokio::test(flavor = "current_thread")]
2287    async fn known_before_periodic_peer_fetch_larger_scenario() {
2300        // 1) Set up a 10-node context, an in-memory DAG, and the inflight map
2301        let (ctx, _) = Context::new_for_test(10);
2302        let context = Arc::new(ctx);
2303        let store = Arc::new(MemStore::new());
2304        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2305        let inflight = InflightBlocksMap::new();
2306        let network_client = Arc::new(MockNetworkClient::default());
2307
2308        // 2) Create 1000 missing blocks, known by various subsets of authorities 0, 2, and 3
2309        let mut missing_blocks = BTreeMap::new();
2310        let mut missing_vbs = Vec::new();
2311        let known_number_blocks = 10;
2312        for i in 0..1000 {
2313            let vb = VerifiedBlock::new_for_test(TestBlock::new(1000 + i as Round, 0).build());
2314            let r = vb.reference();
2315            if i < known_number_blocks {
2316                // First 10 blocks are known by authorities 0, 2
2317                missing_blocks.insert(
2318                    r,
2319                    BTreeSet::from([
2320                        AuthorityIndex::new_for_test(0),
2321                        AuthorityIndex::new_for_test(2),
2322                    ]),
2323                );
2324            } else if i >= known_number_blocks && i < 2 * known_number_blocks {
2325                // Second 10 blocks are known by authorities 0, 3
2326                missing_blocks.insert(
2327                    r,
2328                    BTreeSet::from([
2329                        AuthorityIndex::new_for_test(0),
2330                        AuthorityIndex::new_for_test(3),
2331                    ]),
2332                );
2333            } else {
2334                // The rest are only known by authority 0
2335                missing_blocks.insert(r, BTreeSet::from([AuthorityIndex::new_for_test(0)]));
2336            }
2337            missing_vbs.push(vb);
2338        }
2339
2340        // 3) Stub fetches for knowledge-based peers (2 and 3)
2341        let known_peers = [2, 3].map(AuthorityIndex::new_for_test);
2342        let known_vbs_by_peer: Vec<(AuthorityIndex, Vec<VerifiedBlock>)> = known_peers
2343            .iter()
2344            .map(|&peer| {
2345                let vbs = missing_vbs
2346                    .iter()
2347                    .filter(|vb| missing_blocks.get(&vb.reference()).unwrap().contains(&peer))
2348                    .take(context.parameters.max_blocks_per_sync)
2349                    .cloned()
2350                    .collect::<Vec<_>>();
2351                (peer, vbs)
2352            })
2353            .collect();
2354
2355        for (peer, vbs) in known_vbs_by_peer {
2356            if peer == AuthorityIndex::new_for_test(2) {
2357                // Simulate timeout for peer 2, then fallback to peer 5
2358                network_client
2359                    .stub_fetch_blocks(vbs.clone(), peer, Some(2 * FETCH_REQUEST_TIMEOUT))
2360                    .await;
2361                network_client
2362                    .stub_fetch_blocks(vbs.clone(), AuthorityIndex::new_for_test(5), None)
2363                    .await;
2364            } else {
2365                network_client
2366                    .stub_fetch_blocks(vbs.clone(), peer, None)
2367                    .await;
2368            }
2369        }
2370
2371        // 4) Stub fetches from periodic path peers (1 and 4)
2372        network_client
2373            .stub_fetch_blocks(
2374                missing_vbs[0..context.parameters.max_blocks_per_sync].to_vec(),
2375                AuthorityIndex::new_for_test(1),
2376                None,
2377            )
2378            .await;
2379
2380        network_client
2381            .stub_fetch_blocks(
2382                missing_vbs[context.parameters.max_blocks_per_sync
2383                    ..2 * context.parameters.max_blocks_per_sync]
2384                    .to_vec(),
2385                AuthorityIndex::new_for_test(4),
2386                None,
2387            )
2388            .await;
2389
2390        // 5) Execute the fetch logic
2391        let results = Synchronizer::<
2392            MockNetworkClient,
2393            NoopBlockVerifier,
2394            SyncMockDispatcher,
2395        >::fetch_blocks_from_authorities(
2396            context.clone(),
2397            inflight.clone(),
2398            network_client.clone(),
2399            missing_blocks,
2400            dag_state.clone(),
2401        )
2402            .await;
2403
2404        // 6) Assert we got 4 fetches: the knowledge-based fetch from peer 3, the periodic
2405        //    fetches from peers 1 and 4, and the retry via peer 5 after peer 2 timed out
2406        assert_eq!(results.len(), 4, "Expected 4 successful fetches");
2407
2408        // 7) First fetch from peer 3 (knowledge-based)
2409        let (_guard3, bytes3, peer3) = &results[0];
2410        assert_eq!(*peer3, AuthorityIndex::new_for_test(3));
2411        let expected2 = missing_vbs[known_number_blocks..2 * known_number_blocks]
2412            .iter()
2413            .map(|vb| vb.serialized().clone())
2414            .collect::<Vec<_>>();
2415        assert_eq!(bytes3, &expected2);
2416
2417        // 8) Second fetch from peer 1 (periodic)
2418        let (_guard1, bytes1, peer1) = &results[1];
2419        assert_eq!(*peer1, AuthorityIndex::new_for_test(1));
2420        let expected1 = missing_vbs[0..context.parameters.max_blocks_per_sync]
2421            .iter()
2422            .map(|vb| vb.serialized().clone())
2423            .collect::<Vec<_>>();
2424        assert_eq!(bytes1, &expected1);
2425
2426        // 9) Third fetch from peer 4 (periodic)
2427        let (_guard4, bytes4, peer4) = &results[2];
2428        assert_eq!(*peer4, AuthorityIndex::new_for_test(4));
2429        let expected4 = missing_vbs
2430            [context.parameters.max_blocks_per_sync..2 * context.parameters.max_blocks_per_sync]
2431            .iter()
2432            .map(|vb| vb.serialized().clone())
2433            .collect::<Vec<_>>();
2434        assert_eq!(bytes4, &expected4);
2435
2436        // 10) Fourth fetch from peer 5 (fallback after peer 2 timeout)
2437        let (_guard5, bytes5, peer5) = &results[3];
2438        assert_eq!(*peer5, AuthorityIndex::new_for_test(5));
2439        let expected5 = missing_vbs[0..known_number_blocks]
2440            .iter()
2441            .map(|vb| vb.serialized().clone())
2442            .collect::<Vec<_>>();
2443        assert_eq!(bytes5, &expected5);
2444    }
2445}