consensus_core/
synchronizer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesUnordered};
use iota_macros::fail_point_async;
use iota_metrics::{
    monitored_future,
    monitored_mpsc::{Receiver, Sender, channel},
    monitored_scope,
};
use itertools::Itertools as _;
use parking_lot::{Mutex, RwLock};
#[cfg(not(test))]
use rand::{prelude::SliceRandom, rngs::ThreadRng};
use tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{mpsc::error::TrySendError, oneshot},
    task::{JoinError, JoinSet},
    time::{Instant, sleep, sleep_until, timeout},
};
use tracing::{debug, error, info, trace, warn};

use crate::{
    BlockAPI, CommitIndex, Round,
    authority_service::COMMIT_LAG_MULTIPLIER,
    block::{BlockRef, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
};
/// The number of concurrent fetch-blocks requests per authority.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Timeouts when fetching blocks.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Max number of blocks to fetch per request.
/// This value should be chosen so that, even with blocks at max size, requests
/// can finish on hosts with a good network connection within the timeouts
/// above.
const MAX_BLOCKS_PER_FETCH: usize = 32;

const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// The number of rounds above the highest accepted round for which the
/// periodic synchronizer is still willing to fetch missing blocks. Any missing
/// blocks at higher rounds are considered too far in the future to fetch. This
/// property is only taken into account when the node is detected to have
/// fallen behind on its commits compared to the rest of the network; otherwise
/// the scheduler will attempt to fetch any missing block.
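///
/// For example, with a highest accepted round of 1_000 and the threshold of 50
/// below, the periodic synchronizer only fetches missing blocks up to round
/// 1_050 while the node is lagging on commits.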
const SYNC_MISSING_BLOCK_ROUND_THRESHOLD: u32 = 50;

struct BlocksGuard {
    map: Arc<InflightBlocksMap>,
    block_refs: BTreeSet<BlockRef>,
    peer: AuthorityIndex,
}

impl Drop for BlocksGuard {
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}

// Keeps a mapping between the missing blocks that have been instructed to be
// fetched and the authorities that are currently fetching them. For a block
// ref there is a maximum number of authorities that can concurrently fetch it.
// The authority ids that are currently fetching a block are stored in the
// corresponding `BTreeSet` and effectively act as "locks".
struct InflightBlocksMap {
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}

impl InflightBlocksMap {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(HashMap::new()),
        })
    }
    /// Locks the blocks to be fetched for the assigned `peer`. We want to
    /// avoid re-fetching the missing blocks from too many authorities at the
    /// same time, so we limit the concurrency by attempting to take a lock per
    /// block. If a block is already being fetched by the maximum allowed
    /// number of authorities, then its block ref will not be included in the
    /// returned set. The method returns a guard over all the block refs that
    /// have been successfully locked and are allowed to be fetched, or `None`
    /// if no block could be locked.
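    ///
    /// A minimal sketch of the locking semantics (`refs`, `peer_a`, `peer_b`
    /// and `peer_c` are hypothetical values, not part of this module):
    ///
    /// ```ignore
    /// let map = InflightBlocksMap::new();
    /// // Up to MAX_AUTHORITIES_TO_FETCH_PER_BLOCK (2) peers can lock the same refs.
    /// let guard_a = map.lock_blocks(refs.clone(), peer_a).expect("first lock succeeds");
    /// let guard_b = map.lock_blocks(refs.clone(), peer_b).expect("second lock succeeds");
    /// // A third peer gets nothing back, since every ref is already at the limit.
    /// assert!(map.lock_blocks(refs.clone(), peer_c).is_none());
    /// // Dropping a guard releases its locks, so the third peer can retry.
    /// drop(guard_a);
    /// assert!(map.lock_blocks(refs, peer_c).is_some());
    /// ```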
    fn lock_blocks(
        self: &Arc<Self>,
        missing_block_refs: BTreeSet<BlockRef>,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let mut blocks = BTreeSet::new();
        let mut inner = self.inner.lock();

        for block_ref in missing_block_refs {
            // Check that the number of authorities already instructed to fetch the block
            // does not exceed the allowed maximum, and that `peer` has not already been
            // instructed to do so.
            let authorities = inner.entry(block_ref).or_default();
            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
                && authorities.get(&peer).is_none()
            {
                assert!(authorities.insert(peer));
                blocks.insert(block_ref);
            }
        }

        if blocks.is_empty() {
            None
        } else {
            Some(BlocksGuard {
                map: self.clone(),
                block_refs: blocks,
                peer,
            })
        }
    }

    /// Unlocks the provided block references for the given `peer`. The
    /// unlocking is strict: calling this method for a specific block ref and
    /// peer more times than the corresponding lock has been acquired will
    /// panic.
    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
        // Release the peer's locks on the provided blocks.
        let mut blocks_to_fetch = self.inner.lock();
        for block_ref in block_refs {
            let authorities = blocks_to_fetch
                .get_mut(block_ref)
                .expect("Should have found a non empty map");

            assert!(authorities.remove(&peer), "Peer index should be present!");

            // If this was the last lock holder, then clean up the entry.
            if authorities.is_empty() {
                blocks_to_fetch.remove(block_ref);
            }
        }
    }

    /// Drops the provided `blocks_guard`, which unlocks the referenced blocks,
    /// and then attempts to lock them again for the new `peer`. The swap is
    /// best effort and there is no guarantee that the `peer` will be able to
    /// acquire the new locks.
    fn swap_locks(
        self: &Arc<Self>,
        blocks_guard: BlocksGuard,
        peer: AuthorityIndex,
    ) -> Option<BlocksGuard> {
        let block_refs = blocks_guard.block_refs.clone();

        // Explicitly drop the guard
        drop(blocks_guard);

        // Now create a new guard
        self.lock_blocks(block_refs, peer)
    }

    #[cfg(test)]
    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
        let inner = self.inner.lock();
        inner.len()
    }
}

enum Command {
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    FetchOwnLastBlock,
    KickOffScheduler,
}

pub(crate) struct SynchronizerHandle {
    commands_sender: Sender<Command>,
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}

impl SynchronizerHandle {
    /// Explicitly asks the synchronizer to fetch the blocks in the provided
    /// `missing_block_refs` set from the peer authority.
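    ///
    /// A hedged usage sketch (the caller context and the `missing_ancestors`
    /// helper are assumptions, not part of this module):
    ///
    /// ```ignore
    /// // After receiving a block whose ancestors are not yet known locally,
    /// // ask the synchronizer to fetch the missing ones from the block's author.
    /// let missing: BTreeSet<BlockRef> = missing_ancestors(&block);
    /// if let Err(err) = synchronizer_handle.fetch_blocks(missing, block.author()).await {
    ///     warn!("Failed to request missing ancestors: {err}");
    /// }
    /// ```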
    pub(crate) async fn fetch_blocks(
        &self,
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<()> {
        let (sender, receiver) = oneshot::channel();
        self.commands_sender
            .send(Command::FetchBlocks {
                missing_block_refs,
                peer_index,
                result: sender,
            })
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;
        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
    }

    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
        let mut tasks = self.tasks.lock().await;
        tasks.abort_all();
        while let Some(result) = tasks.join_next().await {
            result?
        }
        Ok(())
    }
}

/// `Synchronizer` oversees live block synchronization, crucial for node
/// progress. Live synchronization refers to the process of retrieving missing
/// blocks, particularly those essential for advancing a node when data from
/// only a few rounds is absent. If a node significantly lags behind the
/// network, `commit_syncer` handles fetching missing blocks via a more
/// efficient approach. `Synchronizer` aims for swift catch-up employing two
/// mechanisms:
///
/// 1. Explicitly requesting missing blocks from designated authorities via the
///    "block send" path. This includes attempting to fetch any missing
///    ancestors necessary for processing a received block. Such requests
///    prioritize the block author, maximizing the chance of prompt retrieval. A
///    locking mechanism allows concurrent requests for missing blocks from up
///    to two authorities simultaneously, enhancing the chances of timely
///    retrieval. Notably, if additional missing blocks arise during block
///    processing, requests to the same authority are deferred to the scheduler.
///
/// 2. Periodically requesting missing blocks via a scheduler. This primarily
///    serves to retrieve missing blocks that were not ancestors of a received
///    block via the "block send" path. The scheduler operates on either a fixed
///    periodic basis or is triggered immediately after explicit fetches
///    described in (1), ensuring continued block retrieval if gaps persist.
///
/// In addition to the above, the synchronizer can synchronize and fetch the
/// node's own last proposed block from the network peers, as a best-effort
/// approach to recover the node from amnesia and avoid equivocation.
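///
/// A minimal wiring sketch (the concrete client, verifier and dispatcher
/// values are assumptions; in practice they are constructed by the authority
/// node):
///
/// ```ignore
/// let handle = Synchronizer::start(
///     network_client,      // Arc<impl NetworkClient>
///     context,             // Arc<Context>
///     core_dispatcher,     // Arc<impl CoreThreadDispatcher>
///     commit_vote_monitor, // Arc<CommitVoteMonitor>
///     block_verifier,      // Arc<impl BlockVerifier>
///     dag_state,           // Arc<RwLock<DagState>>
///     false,               // sync_last_known_own_block
/// );
/// // ... later, on shutdown:
/// handle.stop().await.ok();
/// ```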
pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
    context: Arc<Context>,
    commands_receiver: Receiver<Command>,
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    fetch_blocks_scheduler_task: JoinSet<()>,
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<C>,
    block_verifier: Arc<V>,
    inflight_blocks_map: Arc<InflightBlocksMap>,
    commands_sender: Sender<Command>,
}

impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
    /// Starts the synchronizer, which is responsible for fetching blocks from
    /// other authorities and managing block synchronization tasks.
    pub fn start(
        network_client: Arc<C>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        block_verifier: Arc<V>,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Arc<SynchronizerHandle> {
        let (commands_sender, commands_receiver) =
            channel("consensus_synchronizer_commands", 1_000);
        let inflight_blocks_map = InflightBlocksMap::new();

        // Spawn the tasks to fetch the blocks from the others
        let mut fetch_block_senders = BTreeMap::new();
        let mut tasks = JoinSet::new();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            let (sender, receiver) =
                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
                index,
                network_client.clone(),
                block_verifier.clone(),
                commit_vote_monitor.clone(),
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                receiver,
                commands_sender.clone(),
            );
            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
            fetch_block_senders.insert(index, sender);
        }

        let commands_sender_clone = commands_sender.clone();

        if sync_last_known_own_block {
            commands_sender
                .try_send(Command::FetchOwnLastBlock)
                .expect("Failed to sync our last block");
        }

        // Spawn the task to listen to the requests & periodic runs
        tasks.spawn(monitored_future!(async move {
            let mut s = Self {
                context,
                commands_receiver,
                fetch_block_senders,
                core_dispatcher,
                commit_vote_monitor,
                fetch_blocks_scheduler_task: JoinSet::new(),
                fetch_own_last_block_task: JoinSet::new(),
                network_client,
                block_verifier,
                inflight_blocks_map,
                commands_sender: commands_sender_clone,
                dag_state,
            };
            s.run().await;
        }));

        Arc::new(SynchronizerHandle {
            commands_sender,
            tasks: tokio::sync::Mutex::new(tasks),
        })
    }

    // The main loop to listen for the submitted commands.
    async fn run(&mut self) {
        // We want the synchronizer to run periodically every 500ms to fetch any missing
        // blocks.
        const SYNCHRONIZER_TIMEOUT: Duration = Duration::from_millis(500);
        let scheduler_timeout = sleep_until(Instant::now() + SYNCHRONIZER_TIMEOUT);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            // Keep only the max allowed blocks to request. It is ok to reduce here as the scheduler
                            // task will take care of syncing whatever is left over.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(MAX_BLOCKS_PER_FETCH)
                                .collect();

                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
                            let Some(blocks_guard) = blocks_guard else {
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // We don't block if the corresponding peer task is saturated - we drop the request instead. That's ok as the periodic
                            // synchronization task will handle any still missing blocks in its next run.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => ConsensusError::SynchronizerSaturated(peer_index),
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // just reset the scheduler timeout timer to run immediately if not already running.
                            // If the scheduler is already running then just reduce the remaining time to run.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + SYNCHRONIZER_TIMEOUT.checked_div(2).unwrap()
                            };

                            // only reset if it is earlier than the next deadline
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is expected during shutdown; nothing to do.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is expected during shutdown; nothing to do.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // we want to start a new task only if the previous one has already finished.
                    if self.fetch_blocks_scheduler_task.is_empty() {
                        if let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };
                    }

                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + SYNCHRONIZER_TIMEOUT);
                }
            }
        }
    }

    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<C>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
    ) {
        const MAX_RETRIES: u32 = 5;
        let peer_hostname = &context.committee.authority(peer_index).hostname;

        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    // get the highest accepted rounds
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                            }
                        },
                        Err(_) => {
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // Not strictly necessary, but drop the guard here to unlock the blocks.
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }

    /// Processes the raw fetched blocks received from peer `peer_index`. If no
    /// error is returned, then the verified blocks are immediately sent to
    /// Core for processing.
    async fn process_fetched_blocks(
        serialized_blocks: Vec<Bytes>,
        peer_index: AuthorityIndex,
        requested_blocks_guard: BlocksGuard,
        core_dispatcher: Arc<D>,
        block_verifier: Arc<V>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        commands_sender: Sender<Command>,
        sync_method: &str,
    ) -> ConsensusResult<()> {
        // The maximum number of blocks that can be returned in addition to the
        // requested ones - these are potentially missing ancestors.
        const MAX_ADDITIONAL_BLOCKS: usize = 10;

        // Ensure that all the returned blocks do not go over the total max allowed
        // returned blocks
        if serialized_blocks.len() > requested_blocks_guard.block_refs.len() + MAX_ADDITIONAL_BLOCKS
        {
            return Err(ConsensusError::TooManyFetchedBlocksReturned(peer_index));
        }

        // Verify all the fetched blocks
        let blocks = Handle::current()
            .spawn_blocking({
                let block_verifier = block_verifier.clone();
                let context = context.clone();
                move || Self::verify_blocks(serialized_blocks, block_verifier, &context, peer_index)
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // Get all the ancestors of the requested blocks only
        let ancestors = blocks
            .iter()
            .filter(|b| requested_blocks_guard.block_refs.contains(&b.reference()))
            .flat_map(|b| b.ancestors().to_vec())
            .collect::<BTreeSet<BlockRef>>();

        // Now confirm that the blocks are either among the ones requested, or they
        // are ancestors of the requested blocks
        for block in &blocks {
            if !requested_blocks_guard
                .block_refs
                .contains(&block.reference())
                && !ancestors.contains(&block.reference())
            {
                return Err(ConsensusError::UnexpectedFetchedBlock {
                    index: peer_index,
                    block_ref: block.reference(),
                });
            }
        }

        // Record commit votes from the verified blocks.
        for block in &blocks {
            commit_vote_monitor.observe_block(block);
        }

        let metrics = &context.metrics.node_metrics;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        metrics
            .synchronizer_fetched_blocks_by_peer
            .with_label_values(&[peer_hostname.as_str(), sync_method])
            .inc_by(blocks.len() as u64);
        for block in &blocks {
            let block_hostname = &context.committee.authority(block.author()).hostname;
            metrics
                .synchronizer_fetched_blocks_by_authority
                .with_label_values(&[block_hostname.as_str(), sync_method])
                .inc();
        }

        debug!(
            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
            blocks.len(),
            blocks.iter().map(|b| b.reference().to_string()).join(", "),
        );

        // Now send them to Core for processing. Ignore the returned missing blocks, as
        // we don't want this mechanism to keep feedback-looping on fetching
        // more blocks. The periodic synchronization will take care of that.
        let missing_blocks = core_dispatcher
            .add_blocks(blocks)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Now release all the locked blocks as they have been fetched, verified &
        // processed.
        drop(requested_blocks_guard);

        // Immediately kick off the scheduled synchronizer if anything is still missing.
        if !missing_blocks.is_empty() {
            // Do not block here, so we avoid any possible cycles.
            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
            {
                warn!("Commands channel is full")
            }
        }

        context
            .metrics
            .node_metrics
            .missing_blocks_after_fetch_total
            .inc_by(missing_blocks.len() as u64);

        Ok(())
    }

    fn get_highest_accepted_rounds(
        dag_state: Arc<RwLock<DagState>>,
        context: &Arc<Context>,
    ) -> Vec<Round> {
        let blocks = dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        assert_eq!(blocks.len(), context.committee.size());

        blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>()
    }

    fn verify_blocks(
        serialized_blocks: Vec<Bytes>,
        block_verifier: Arc<V>,
        context: &Context,
        peer_index: AuthorityIndex,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut verified_blocks = Vec::new();

        for serialized_block in serialized_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;

            // TODO: dedup block verifications, here and with fetched blocks.
            if let Err(e) = block_verifier.verify(&signed_block) {
                // TODO: we might want to use a different metric to track the invalid "served"
                // blocks from the invalid "proposed" ones.
                let hostname = context.committee.authority(peer_index).hostname.clone();

                context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
                    .inc();
                warn!("Invalid block received from {}: {}", peer_index, e);
                return Err(e);
            }
            let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

            // Dropping is ok because the block will be refetched.
            // TODO: improve efficiency, maybe suspend and continue processing the block
            // asynchronously.
            let now = context.clock.timestamp_utc_ms();
            if now < verified_block.timestamp_ms() {
                warn!(
                    "Synced block {} timestamp {} is in the future (now={}). Ignoring.",
                    verified_block.reference(),
                    verified_block.timestamp_ms(),
                    now
                );
                continue;
            }

            verified_blocks.push(verified_block);
        }

        Ok(verified_blocks)
    }

    async fn fetch_blocks_request(
        network_client: Arc<C>,
        peer: AuthorityIndex,
        blocks_guard: BlocksGuard,
        highest_rounds: Vec<Round>,
        request_timeout: Duration,
        mut retries: u32,
    ) -> (
        ConsensusResult<Vec<Bytes>>,
        BlocksGuard,
        u32,
        AuthorityIndex,
        Vec<Round>,
    ) {
        let start = Instant::now();
        let resp = timeout(
            request_timeout,
            network_client.fetch_blocks(
                peer,
                blocks_guard
                    .block_refs
                    .clone()
                    .into_iter()
                    .collect::<Vec<_>>(),
                highest_rounds.clone(),
                request_timeout,
            ),
        )
        .await;

        fail_point_async!("consensus-delay");

        let resp = match resp {
            Ok(Err(err)) => {
                // Add a delay before retrying, if needed. If the request has already timed
                // out, this is effectively a no-op.
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(err)
            } // network error
            Err(err) => {
                // timeout
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
            }
            Ok(result) => result,
        };
        (resp, blocks_guard, retries, peer, highest_rounds)
    }

    fn start_fetch_own_last_block_task(&mut self) {
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };

                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        block_verifier.verify(&signed_block).tap_err(|err| {
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Get the highest round of all the results. Retry until at least `f+1` stake
                // has replied.
                let mut highest_round;
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main: loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    let mut total_stake = 0;
                    highest_round = 0;

                    // Ask all the other peers about our last block
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if authority_index != context.own_index {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results, but also respect the timeout.
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                let max_round = blocks.into_iter().map(|b| b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Require at least f+1 stake to have replied.
                    if context.committee.reached_validity(total_stake) {
                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
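                    // With these constants the retry delay grows as 500ms, 750ms, 1125ms, ...
                    // and is capped at MAX_RETRY_DELAY_STEP (4s).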
                }

                // Update the Core with the highest detected round
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }

    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
        let mut missing_blocks = self
            .core_dispatcher
            .get_missing_blocks()
            .await
            .map_err(|_err| ConsensusError::Shutdown)?;

        // No reason to kick off the scheduler if there are no missing blocks to fetch
        if missing_blocks.is_empty() {
            return Ok(());
        }

        let context = self.context.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let commit_vote_monitor = self.commit_vote_monitor.clone();
        let core_dispatcher = self.core_dispatcher.clone();
        let blocks_to_fetch = self.inflight_blocks_map.clone();
        let commands_sender = self.commands_sender.clone();
        let dag_state = self.dag_state.clone();

        let (commit_lagging, last_commit_index, quorum_commit_index) = self.is_commit_lagging();
        if commit_lagging {
            // If GC is enabled and we are lagging on commits, then we don't want to enable
            // the scheduler. Since the new logic for processing certified commits is in
            // place, we are guaranteed that commits will happen for all the certified
            // commits.
            if dag_state.read().gc_enabled() {
                return Ok(());
            }

            // As the node is lagging on commits, try to sync only the missing blocks that
            // are within the acceptable round threshold. We don't attempt to sync the
            // rest yet.
            let highest_accepted_round = dag_state.read().highest_accepted_round();
            missing_blocks = missing_blocks
                .into_iter()
                .take_while(|b| {
                    b.round <= highest_accepted_round + SYNC_MISSING_BLOCK_ROUND_THRESHOLD
                })
                .collect::<BTreeSet<_>>();

            // If no missing blocks are within the acceptable threshold to sync while we
            // lag on commits, then the scheduler is disabled completely for this run.
            if missing_blocks.is_empty() {
                trace!(
                    "Scheduled synchronizer temporarily disabled as local commit is falling behind from quorum {last_commit_index} << {quorum_commit_index} and missing blocks are too far in the future."
                );
                self.context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_skipped
                    .with_label_values(&["commit_lagging"])
                    .inc();
                return Ok(());
            }
        }

        self.fetch_blocks_scheduler_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchMissingBlocksScheduler");

                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .inc();
                let total_requested = missing_blocks.len();

                fail_point_async!("consensus-delay");

                // Fetch blocks from peers
                let results = Self::fetch_blocks_from_authorities(
                    context.clone(),
                    blocks_to_fetch.clone(),
                    network_client,
                    missing_blocks,
                    dag_state,
                )
                .await;
                context
                    .metrics
                    .node_metrics
                    .fetch_blocks_scheduler_inflight
                    .dec();
                if results.is_empty() {
                    warn!("No results returned while requesting missing blocks");
                    return;
                }

                // Now process the returned results
                let mut total_fetched = 0;
                for (blocks_guard, fetched_blocks, peer) in results {
                    total_fetched += fetched_blocks.len();

                    if let Err(err) = Self::process_fetched_blocks(
                        fetched_blocks,
                        peer,
                        blocks_guard,
                        core_dispatcher.clone(),
                        block_verifier.clone(),
                        commit_vote_monitor.clone(),
                        context.clone(),
                        commands_sender.clone(),
                        "periodic",
                    )
                    .await
                    {
                        warn!(
                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
                        );
                    }
                }

                debug!(
                    "Total blocks requested to fetch: {}, total fetched: {}",
                    total_requested, total_fetched
                );
            }));
        Ok(())
    }

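    /// Returns whether the local node is lagging behind the quorum commit
    /// index, together with the local and quorum commit indices.
    ///
    /// A worked example under assumed parameter values (not taken from the
    /// real configuration): with `last_commit_index = 100`,
    /// `commit_sync_batch_size = 100` and `COMMIT_LAG_MULTIPLIER = 10`, the
    /// threshold is `100 + 100 * 10 = 1100`, so the node is only considered
    /// lagging once the quorum commit index exceeds 1100.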
    fn is_commit_lagging(&self) -> (bool, CommitIndex, CommitIndex) {
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        let commit_threshold = last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
        (
            commit_threshold < quorum_commit_index,
            last_commit_index,
            quorum_commit_index,
        )
    }

    /// Fetches the `missing_blocks` from available peers, attempting to split
    /// the load amongst multiple (randomly ordered) peers.
    /// The method returns a vector with the fetched blocks from each peer that
    /// successfully responded, plus any corresponding additional ancestor
    /// blocks. Each element of the vector is a tuple containing the guard over
    /// the requested missing block refs, the returned serialized blocks and
    /// the peer authority index.
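    ///
    /// With the constants below (`MAX_PEERS = 3`, `MAX_BLOCKS_PER_FETCH = 32`),
    /// at most 96 missing block refs are considered per run; they are split
    /// into chunks of up to 32 refs and each chunk is requested from a
    /// different peer.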
    async fn fetch_blocks_from_authorities(
        context: Arc<Context>,
        inflight_blocks: Arc<InflightBlocksMap>,
        network_client: Arc<C>,
        missing_blocks: BTreeSet<BlockRef>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
        const MAX_PEERS: usize = 3;

        // Attempt to fetch only up to a maximum number of blocks
        let missing_blocks = missing_blocks
            .into_iter()
            .take(MAX_PEERS * MAX_BLOCKS_PER_FETCH)
            .collect::<Vec<_>>();

        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
        for block in &missing_blocks {
            missing_blocks_per_authority[block.author] += 1;
        }
        for (missing, (_, authority)) in missing_blocks_per_authority
            .into_iter()
            .zip(context.committee.authorities())
        {
            context
                .metrics
                .node_metrics
                .synchronizer_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .inc_by(missing as u64);
            context
                .metrics
                .node_metrics
                .synchronizer_current_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .set(missing as i64);
        }

        #[cfg_attr(test, expect(unused_mut))]
        let mut peers = context
            .committee
            .authorities()
            .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
            .collect::<Vec<_>>();

        // In tests, the order is not randomized
        #[cfg(not(test))]
        peers.shuffle(&mut ThreadRng::default());

        let mut peers = peers.into_iter();
        let mut request_futures = FuturesUnordered::new();

        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);

        // Send the initial requests
        for blocks in missing_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
            let peer = peers
                .next()
                .expect("Possible misconfiguration as a peer should be found");
            let peer_hostname = &context.committee.authority(peer).hostname;
            let block_refs = blocks.iter().cloned().collect::<BTreeSet<_>>();

            // lock the blocks to be fetched. If no lock can be acquired for any of the
            // blocks then don't bother
            if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
                info!(
                    "Periodic sync of {} missing blocks from peer {} {}: {}",
                    block_refs.len(),
                    peer,
                    peer_hostname,
                    block_refs
                        .iter()
                        .map(|b| b.to_string())
                        .collect::<Vec<_>>()
                        .join(", ")
                );
                request_futures.push(Self::fetch_blocks_request(
                    network_client.clone(),
                    peer,
                    blocks_guard,
                    highest_rounds.clone(),
                    FETCH_REQUEST_TIMEOUT,
                    1,
                ));
            }
        }

        let mut results = Vec::new();
        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);

        tokio::pin!(fetcher_timeout);

        loop {
            tokio::select! {
                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
                    let peer_hostname = &context.committee.authority(peer_index).hostname;
                    match response {
                        Ok(fetched_blocks) => {
                            info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname);
                            results.push((blocks_guard, fetched_blocks, peer_index));

                            // no more pending requests are left, just break the loop
                            if request_futures.is_empty() {
                                break;
                            }
                        },
                        Err(_) => {
                            // try again if there is any peer left
                            if let Some(next_peer) = peers.next() {
                                // do best effort to lock guards. If we can't lock then don't bother at this run.
                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
                                    info!(
                                        "Retrying syncing {} missing blocks from peer {}: {}",
                                        blocks_guard.block_refs.len(),
                                        peer_hostname,
                                        blocks_guard.block_refs
                                            .iter()
                                            .map(|b| b.to_string())
                                            .collect::<Vec<_>>()
                                            .join(", ")
                                    );
                                    request_futures.push(Self::fetch_blocks_request(
                                        network_client.clone(),
                                        next_peer,
                                        blocks_guard,
                                        highest_rounds,
                                        FETCH_REQUEST_TIMEOUT,
                                        1,
                                    ));
                                } else {
                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
                                }
                            } else {
                                debug!("No more peers left to fetch blocks");
                            }
                        }
                    }
                },
                _ = &mut fetcher_timeout => {
                    debug!("Timed out while fetching missing blocks");
                    break;
                }
            }
        }

        results
    }
}
1158
1159#[cfg(test)]
1160mod tests {
1161    use std::{
1162        collections::{BTreeMap, BTreeSet},
1163        sync::Arc,
1164        time::Duration,
1165    };
1166
1167    use async_trait::async_trait;
1168    use bytes::Bytes;
1169    use consensus_config::{AuthorityIndex, Parameters};
1170    use parking_lot::RwLock;
1171    use tokio::{sync::Mutex, time::sleep};
1172
1173    use crate::{
1174        BlockAPI, CommitDigest, CommitIndex,
1175        authority_service::COMMIT_LAG_MULTIPLIER,
1176        block::{BlockDigest, BlockRef, Round, TestBlock, VerifiedBlock},
1177        block_verifier::NoopBlockVerifier,
1178        commit::{CommitRange, CommitVote, TrustedCommit},
1179        commit_vote_monitor::CommitVoteMonitor,
1180        context::Context,
1181        core_thread::{CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
1182        dag_state::DagState,
1183        error::{ConsensusError, ConsensusResult},
1184        network::{BlockStream, NetworkClient},
1185        storage::mem_store::MemStore,
1186        synchronizer::{
1187            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap,
1188            MAX_BLOCKS_PER_FETCH, SYNC_MISSING_BLOCK_ROUND_THRESHOLD, Synchronizer,
1189        },
1190    };
1191
1192    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1193    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1194    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1195    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1196
1197    #[derive(Default)]
1198    struct MockNetworkClient {
1199        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1200        fetch_latest_blocks_requests:
1201            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1202    }
1203
1204    impl MockNetworkClient {
1205        async fn stub_fetch_blocks(
1206            &self,
1207            blocks: Vec<VerifiedBlock>,
1208            peer: AuthorityIndex,
1209            latency: Option<Duration>,
1210        ) {
1211            let mut lock = self.fetch_blocks_requests.lock().await;
1212            let block_refs = blocks
1213                .iter()
1214                .map(|block| block.reference())
1215                .collect::<Vec<_>>();
1216            lock.insert((block_refs, peer), (blocks, latency));
1217        }
1218
1219        async fn stub_fetch_latest_blocks(
1220            &self,
1221            blocks: Vec<VerifiedBlock>,
1222            peer: AuthorityIndex,
1223            authorities: Vec<AuthorityIndex>,
1224            latency: Option<Duration>,
1225        ) {
1226            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1227            lock.entry((peer, authorities))
1228                .or_default()
1229                .push((blocks, latency));
1230        }
1231
1232        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1233            let lock = self.fetch_latest_blocks_requests.lock().await;
1234            lock.len()
1235        }
1236    }
1237
1238    #[async_trait]
1239    impl NetworkClient for MockNetworkClient {
1240        const SUPPORT_STREAMING: bool = false;
1241
1242        async fn send_block(
1243            &self,
1244            _peer: AuthorityIndex,
1245            _serialized_block: &VerifiedBlock,
1246            _timeout: Duration,
1247        ) -> ConsensusResult<()> {
1248            unimplemented!("Unimplemented")
1249        }
1250
1251        async fn subscribe_blocks(
1252            &self,
1253            _peer: AuthorityIndex,
1254            _last_received: Round,
1255            _timeout: Duration,
1256        ) -> ConsensusResult<BlockStream> {
1257            unimplemented!("Unimplemented")
1258        }
1259
1260        async fn fetch_blocks(
1261            &self,
1262            peer: AuthorityIndex,
1263            block_refs: Vec<BlockRef>,
1264            _highest_accepted_rounds: Vec<Round>,
1265            _timeout: Duration,
1266        ) -> ConsensusResult<Vec<Bytes>> {
1267            let mut lock = self.fetch_blocks_requests.lock().await;
1268            let response = lock
1269                .remove(&(block_refs, peer))
1270                .expect("Unexpected fetch blocks request made");
1271
1272            let serialised = response
1273                .0
1274                .into_iter()
1275                .map(|block| block.serialized().clone())
1276                .collect::<Vec<_>>();
1277
1278            drop(lock);
1279
1280            if let Some(latency) = response.1 {
1281                sleep(latency).await;
1282            }
1283
1284            Ok(serialised)
1285        }
1286
1287        async fn fetch_commits(
1288            &self,
1289            _peer: AuthorityIndex,
1290            _commit_range: CommitRange,
1291            _timeout: Duration,
1292        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1293            unimplemented!("Unimplemented")
1294        }
1295
1296        async fn fetch_latest_blocks(
1297            &self,
1298            peer: AuthorityIndex,
1299            authorities: Vec<AuthorityIndex>,
1300            _timeout: Duration,
1301        ) -> ConsensusResult<Vec<Bytes>> {
1302            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1303            let mut responses = lock
1304                .remove(&(peer, authorities.clone()))
1305                .expect("Unexpected fetch latest blocks request made");
1306
1307            let response = responses.remove(0);
1308            let serialised = response
1309                .0
1310                .into_iter()
1311                .map(|block| block.serialized().clone())
1312                .collect::<Vec<_>>();
1313
1314            if !responses.is_empty() {
1315                lock.insert((peer, authorities), responses);
1316            }
1317
1318            drop(lock);
1319
1320            if let Some(latency) = response.1 {
1321                sleep(latency).await;
1322            }
1323
1324            Ok(serialised)
1325        }
1326
1327        async fn get_latest_rounds(
1328            &self,
1329            _peer: AuthorityIndex,
1330            _timeout: Duration,
1331        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1332            unimplemented!("Unimplemented")
1333        }
1334    }
1335
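    // Exercises the lock bookkeeping of `InflightBlocksMap`: at most two peers may
    // hold the same block refs concurrently, dropping a guard releases its locks, and
    // `swap_locks` moves an existing guard to a different peer.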
1336    #[test]
1337    fn inflight_blocks_map() {
1338        // GIVEN
1339        let map = InflightBlocksMap::new();
1340        let some_block_refs = [
1341            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1342            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1343            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1344            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1345        ];
1346        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1347
1348        // Lock & unlock blocks
1349        {
1350            let mut all_guards = Vec::new();
1351
1352            // Try to acquire the block locks for authorities 1 & 2
1353            for i in 1..=2 {
1354                let authority = AuthorityIndex::new_for_test(i);
1355
1356                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1357                let guard = guard.expect("Guard should be created");
1358                assert_eq!(guard.block_refs.len(), 4);
1359
1360                all_guards.push(guard);
1361
1362                // trying to acquire any of them again will not succeed
1363                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1364                assert!(guard.is_none());
1365            }
1366
1367            // Trying to acquire the locks for authority 3 will fail, as we have maxed
1368            // out the number of allowed peers
1369            let authority_3 = AuthorityIndex::new_for_test(3);
1370
1371            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1372            assert!(guard.is_none());
1373
1374            // Explicitly drop the guard of authority 1 and try for authority 3 again - it
1375            // will now succeed
1376            drop(all_guards.remove(0));
1377
1378            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1379            let guard = guard.expect("Guard should be successfully acquired");
1380
1381            assert_eq!(guard.block_refs, missing_block_refs);
1382
1383            // Dropping all guards should unlock the block refs
1384            drop(guard);
1385            drop(all_guards);
1386
1387            assert_eq!(map.num_of_locked_blocks(), 0);
1388        }
1389
1390        // Swap locks
1391        {
1392            // acquire a lock for authority 1
1393            let authority_1 = AuthorityIndex::new_for_test(1);
1394            let guard = map
1395                .lock_blocks(missing_block_refs.clone(), authority_1)
1396                .unwrap();
1397
1398            // Now swap the locks for authority 2
1399            let authority_2 = AuthorityIndex::new_for_test(2);
1400            let guard = map.swap_locks(guard, authority_2);
1401
1402            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1403        }
1404    }
1405
1406    #[tokio::test]
1407    async fn successful_fetch_blocks_from_peer() {
1408        // GIVEN
1409        let (context, _) = Context::new_for_test(4);
1410        let context = Arc::new(context);
1411        let block_verifier = Arc::new(NoopBlockVerifier {});
1412        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1413        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1414        let network_client = Arc::new(MockNetworkClient::default());
1415        let store = Arc::new(MemStore::new());
1416        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1417
1418        let handle = Synchronizer::start(
1419            network_client.clone(),
1420            context,
1421            core_dispatcher.clone(),
1422            commit_vote_monitor,
1423            block_verifier,
1424            dag_state,
1425            false,
1426        );
1427
1428        // Create some test blocks
1429        let expected_blocks = (0..10)
1430            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1431            .collect::<Vec<_>>();
1432        let missing_blocks = expected_blocks
1433            .iter()
1434            .map(|block| block.reference())
1435            .collect::<BTreeSet<_>>();
1436
1437        // AND stub the fetch_blocks request from peer 1
1438        let peer = AuthorityIndex::new_for_test(1);
1439        network_client
1440            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1441            .await;
1442
1443        // WHEN requesting missing blocks from peer 1
1444        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1445
1446        // Wait a little bit until those have been added to core
1447        sleep(Duration::from_millis(1_000)).await;
1448
1449        // THEN ensure those ended up in Core
1450        let added_blocks = core_dispatcher.get_add_blocks().await;
1451        assert_eq!(added_blocks, expected_blocks);
1452    }
1453
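    // Saturates peer 1's fetch task with slow (5s) responses until a further request
    // is rejected with `ConsensusError::SynchronizerSaturated`.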
1454    #[tokio::test]
1455    async fn saturate_fetch_blocks_from_peer() {
1456        // GIVEN
1457        let (context, _) = Context::new_for_test(4);
1458        let context = Arc::new(context);
1459        let block_verifier = Arc::new(NoopBlockVerifier {});
1460        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1461        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1462        let network_client = Arc::new(MockNetworkClient::default());
1463        let store = Arc::new(MemStore::new());
1464        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1465
1466        let handle = Synchronizer::start(
1467            network_client.clone(),
1468            context,
1469            core_dispatcher.clone(),
1470            commit_vote_monitor,
1471            block_verifier,
1472            dag_state,
1473            false,
1474        );
1475
1476        // Create some test blocks
1477        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1478            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1479            .collect::<Vec<_>>();
1480
1481        // Now start sending requests to fetch blocks, trying to saturate peer 1's task
1482        let peer = AuthorityIndex::new_for_test(1);
1483        let mut iter = expected_blocks.iter().peekable();
1484        while let Some(block) = iter.next() {
1485            // Stub the fetch_blocks request from peer 1 with a high response latency
1486            // so requests start blocking the peer task.
1487            network_client
1488                .stub_fetch_blocks(
1489                    vec![block.clone()],
1490                    peer,
1491                    Some(Duration::from_millis(5_000)),
1492                )
1493                .await;
1494
1495            let mut missing_blocks = BTreeSet::new();
1496            missing_blocks.insert(block.reference());
1497
1498            // WHEN requesting to fetch the blocks, the last request should not succeed
1499            // and should instead fail with a "saturated" synchronizer error
1500            if iter.peek().is_none() {
1501                match handle.fetch_blocks(missing_blocks, peer).await {
1502                    Err(ConsensusError::SynchronizerSaturated(index)) => {
1503                        assert_eq!(index, peer);
1504                    }
1505                    _ => panic!("A saturated synchronizer error was expected"),
1506                }
1507            } else {
1508                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1509            }
1510        }
1511    }
1512
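    // `start_paused = true` runs the test on tokio's paused clock, so the sleeps below
    // (multiples of FETCH_REQUEST_TIMEOUT) advance virtual time instead of waiting for
    // real time to elapse.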
1513    #[tokio::test(flavor = "current_thread", start_paused = true)]
1514    async fn synchronizer_periodic_task_fetch_blocks() {
1515        // GIVEN
1516        let (context, _) = Context::new_for_test(4);
1517        let context = Arc::new(context);
1518        let block_verifier = Arc::new(NoopBlockVerifier {});
1519        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1520        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1521        let network_client = Arc::new(MockNetworkClient::default());
1522        let store = Arc::new(MemStore::new());
1523        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1524
1525        // Create some test blocks
1526        let expected_blocks = (0..10)
1527            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1528            .collect::<Vec<_>>();
1529        let missing_blocks = expected_blocks
1530            .iter()
1531            .map(|block| block.reference())
1532            .collect::<BTreeSet<_>>();
1533
1534        // AND stub the missing blocks
1535        core_dispatcher
1536            .stub_missing_blocks(missing_blocks.clone())
1537            .await;
1538
1539        // AND stub the requests for authority 1 & 2
1540        // Make the first authority time out, so the second will be called. "We" are
1541        // authority 0, so we are skipped anyway.
1542        network_client
1543            .stub_fetch_blocks(
1544                expected_blocks.clone(),
1545                AuthorityIndex::new_for_test(1),
1546                Some(FETCH_REQUEST_TIMEOUT),
1547            )
1548            .await;
1549        network_client
1550            .stub_fetch_blocks(
1551                expected_blocks.clone(),
1552                AuthorityIndex::new_for_test(2),
1553                None,
1554            )
1555            .await;
1556
1557        // WHEN we start the synchronizer and wait for a couple of seconds
1558        let _handle = Synchronizer::start(
1559            network_client.clone(),
1560            context,
1561            core_dispatcher.clone(),
1562            commit_vote_monitor,
1563            block_verifier,
1564            dag_state,
1565            false,
1566        );
1567
1568        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1569
1570        // THEN the missing blocks should now be fetched and added to core
1571        let added_blocks = core_dispatcher.get_add_blocks().await;
1572        assert_eq!(added_blocks, expected_blocks);
1573
1574        // AND missing blocks should have been consumed by the stub
1575        assert!(
1576            core_dispatcher
1577                .get_missing_blocks()
1578                .await
1579                .unwrap()
1580                .is_empty()
1581        );
1582    }
1583
1584    #[tokio::test(flavor = "current_thread", start_paused = true)]
1585    async fn synchronizer_periodic_task_when_commit_lagging_with_missing_blocks_in_acceptable_thresholds()
1586    {
1587        // GIVEN
1588        let (mut context, _) = Context::new_for_test(4);
1589
1590        // We want to run this test only when GC is disabled. Once GC gets enabled,
1591        // this logic won't execute anymore.
1592        context
1593            .protocol_config
1594            .set_consensus_gc_depth_for_testing(0);
1595
1596        let context = Arc::new(context);
1597        let block_verifier = Arc::new(NoopBlockVerifier {});
1598        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1599        let network_client = Arc::new(MockNetworkClient::default());
1600        let store = Arc::new(MemStore::new());
1601        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1602        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1603
1604        // AND stub some missing blocks. The highest accepted round is 0. Create some
1605        // blocks that are below and above the sync threshold.
1606        let expected_blocks = (0..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2)
1607            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1608            .collect::<Vec<_>>();
1609
1610        let missing_blocks = expected_blocks
1611            .iter()
1612            .map(|block| block.reference())
1613            .collect::<BTreeSet<_>>();
1614        core_dispatcher.stub_missing_blocks(missing_blocks).await;
1615
1616        // AND stub the requests for authority 1 & 2
1617        // Make the first authority time out, so the second will be called. "We" are
1618        // authority 0, so we are skipped anyway.
1619        let mut expected_blocks = expected_blocks
1620            .into_iter()
1621            .filter(|block| block.round() <= SYNC_MISSING_BLOCK_ROUND_THRESHOLD)
1622            .collect::<Vec<_>>();
1623
1624        for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
1625            network_client
1626                .stub_fetch_blocks(
1627                    chunk.to_vec(),
1628                    AuthorityIndex::new_for_test(1),
1629                    Some(FETCH_REQUEST_TIMEOUT),
1630                )
1631                .await;
1632
1633            network_client
1634                .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
1635                .await;
1636        }
1637
1638        // Now create some blocks to simulate a commit lag
1639        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1640        let commit_index: CommitIndex = round - 1;
1641        let blocks = (0..4)
1642            .map(|authority| {
1643                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1644                let block = TestBlock::new(round, authority)
1645                    .set_commit_votes(commit_votes)
1646                    .build();
1647
1648                VerifiedBlock::new_for_test(block)
1649            })
1650            .collect::<Vec<_>>();
1651
1652        // Pass them through the commit vote monitor - so now there will be a big commit
1653        // lag to prevent the scheduled synchronizer from running
1654        for block in blocks {
1655            commit_vote_monitor.observe_block(&block);
1656        }
1657
1658        // WHEN we start the synchronizer and wait for a couple of seconds, during which
1659        // the synchronizer would normally have kicked in.
1660        let _handle = Synchronizer::start(
1661            network_client.clone(),
1662            context.clone(),
1663            core_dispatcher.clone(),
1664            commit_vote_monitor.clone(),
1665            block_verifier.clone(),
1666            dag_state.clone(),
1667            false,
1668        );
1669
1670        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1671
1672        // We should be in commit lag mode, but since there are missing blocks within
1673        // the acceptable round threshold, those should be fetched - and nothing above it.
1674        let mut added_blocks = core_dispatcher.get_add_blocks().await;
1675
1676        added_blocks.sort_by_key(|block| block.reference());
1677        expected_blocks.sort_by_key(|block| block.reference());
1678
1679        assert_eq!(added_blocks, expected_blocks);
1680    }
1681
1682    #[tokio::test(flavor = "current_thread", start_paused = true)]
1683    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1684        // GIVEN
1685        let (context, _) = Context::new_for_test(4);
1686        let context = Arc::new(context);
1687        let block_verifier = Arc::new(NoopBlockVerifier {});
1688        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1689        let network_client = Arc::new(MockNetworkClient::default());
1690        let store = Arc::new(MemStore::new());
1691        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1692        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1693
1694        // AND stub some missing blocks. The highest accepted round is 0. Create blocks
1695        // that are above the sync threshold.
1696        let mut expected_blocks = (SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 2
1697            ..SYNC_MISSING_BLOCK_ROUND_THRESHOLD * 3)
1698            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1699            .collect::<Vec<_>>();
1700        let missing_blocks = expected_blocks
1701            .iter()
1702            .map(|block| block.reference())
1703            .collect::<BTreeSet<_>>();
1704        core_dispatcher
1705            .stub_missing_blocks(missing_blocks.clone())
1706            .await;
1707
1708        // AND stub the requests for authority 1 & 2
1709        // Make the first authority time out, so the second will be called. "We" are
1710        // authority 0, so we are skipped anyway.
1711        for chunk in expected_blocks.chunks(MAX_BLOCKS_PER_FETCH) {
1712            network_client
1713                .stub_fetch_blocks(
1714                    chunk.to_vec(),
1715                    AuthorityIndex::new_for_test(1),
1716                    Some(FETCH_REQUEST_TIMEOUT),
1717                )
1718                .await;
1719            network_client
1720                .stub_fetch_blocks(chunk.to_vec(), AuthorityIndex::new_for_test(2), None)
1721                .await;
1722        }
1723
1724        // Now create some blocks to simulate a commit lag
1725        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1726        let commit_index: CommitIndex = round - 1;
1727        let blocks = (0..4)
1728            .map(|authority| {
1729                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1730                let block = TestBlock::new(round, authority)
1731                    .set_commit_votes(commit_votes)
1732                    .build();
1733
1734                VerifiedBlock::new_for_test(block)
1735            })
1736            .collect::<Vec<_>>();
1737
1738        // Pass them through the commit vote monitor - so now there will be a big commit
1739        // lag to prevent the scheduled synchronizer from running
1740        for block in blocks {
1741            commit_vote_monitor.observe_block(&block);
1742        }
1743
1744        // WHEN we start the synchronizer and wait for a couple of seconds, during which
1745        // the synchronizer would normally have kicked in.
1746        let _handle = Synchronizer::start(
1747            network_client.clone(),
1748            context.clone(),
1749            core_dispatcher.clone(),
1750            commit_vote_monitor.clone(),
1751            block_verifier,
1752            dag_state.clone(),
1753            false,
1754        );
1755
1756        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1757
1758        // Since we should be in commit lag mode, none of the missing blocks should have
1759        // been fetched - hence nothing should be sent to core for processing.
1760        let added_blocks = core_dispatcher.get_add_blocks().await;
1761        assert_eq!(added_blocks, vec![]);
1762
1763        // AND now advance the local commit index by adding commits up to the quorum
1764        // commit index
1765        {
1766            let mut d = dag_state.write();
1767            for index in 1..=commit_index {
1768                let commit =
1769                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
1770
1771                d.add_commit(commit);
1772            }
1773
1774            assert_eq!(
1775                d.last_commit_index(),
1776                commit_vote_monitor.quorum_commit_index()
1777            );
1778        }
1779
1780        // Now stub the missing blocks again so the exact same ones get fetched.
1781        core_dispatcher
1782            .stub_missing_blocks(missing_blocks.clone())
1783            .await;
1784
1785        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1786
1787        // THEN the missing blocks should now be fetched and added to core
1788        let mut added_blocks = core_dispatcher.get_add_blocks().await;
1789
1790        added_blocks.sort_by_key(|block| block.reference());
1791        expected_blocks.sort_by_key(|block| block.reference());
1792
1793        assert_eq!(added_blocks, expected_blocks);
1794    }
1795
1796    #[tokio::test(flavor = "current_thread", start_paused = true)]
1797    async fn synchronizer_fetch_own_last_block() {
1798        // GIVEN
1799        let (context, _) = Context::new_for_test(4);
1800        let context = Arc::new(context.with_parameters(Parameters {
1801            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1802            ..Default::default()
1803        }));
1804        let block_verifier = Arc::new(NoopBlockVerifier {});
1805        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1806        let network_client = Arc::new(MockNetworkClient::default());
1807        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1808        let store = Arc::new(MemStore::new());
1809        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1810        let our_index = AuthorityIndex::new_for_test(0);
1811
1812        // Create some test blocks
1813        let mut expected_blocks = (9..=10)
1814            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1815            .collect::<Vec<_>>();
1816
1817        // Now set different latest blocks for the peers
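        // Each peer is stubbed twice: on the first attempt peers 2 and 3 respond with a
        // 10s latency, which exceeds the 2s `sync_last_known_own_block_timeout`, so the
        // synchronizer is expected to retry once (asserted via the retry metric below).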
1818        // For peer 1 we give the block of round 10 (highest)
1819        let block_1 = expected_blocks.pop().unwrap();
1820        network_client
1821            .stub_fetch_latest_blocks(
1822                vec![block_1.clone()],
1823                AuthorityIndex::new_for_test(1),
1824                vec![our_index],
1825                None,
1826            )
1827            .await;
1828        network_client
1829            .stub_fetch_latest_blocks(
1830                vec![block_1],
1831                AuthorityIndex::new_for_test(1),
1832                vec![our_index],
1833                None,
1834            )
1835            .await;
1836
1837        // For peer 2 we give the block of round 9
1838        let block_2 = expected_blocks.pop().unwrap();
1839        network_client
1840            .stub_fetch_latest_blocks(
1841                vec![block_2.clone()],
1842                AuthorityIndex::new_for_test(2),
1843                vec![our_index],
1844                Some(Duration::from_secs(10)),
1845            )
1846            .await;
1847        network_client
1848            .stub_fetch_latest_blocks(
1849                vec![block_2],
1850                AuthorityIndex::new_for_test(2),
1851                vec![our_index],
1852                None,
1853            )
1854            .await;
1855
1856        // For peer 3 we don't give any block, so it should return an empty vector
1857        network_client
1858            .stub_fetch_latest_blocks(
1859                vec![],
1860                AuthorityIndex::new_for_test(3),
1861                vec![our_index],
1862                Some(Duration::from_secs(10)),
1863            )
1864            .await;
1865        network_client
1866            .stub_fetch_latest_blocks(
1867                vec![],
1868                AuthorityIndex::new_for_test(3),
1869                vec![our_index],
1870                None,
1871            )
1872            .await;
1873
1874        // WHEN we start the synchronizer and wait for a couple of seconds
1875        let handle = Synchronizer::start(
1876            network_client.clone(),
1877            context.clone(),
1878            core_dispatcher.clone(),
1879            commit_vote_monitor,
1880            block_verifier,
1881            dag_state,
1882            true,
1883        );
1884
1885        // Wait for at least the timeout duration
1886        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
1887
1888        // Assert that core has been called to set the min propose round
1889        assert_eq!(
1890            core_dispatcher.get_last_own_proposed_round().await,
1891            vec![10]
1892        );
1893
1894        // Ensure that all the stubbed requests have been consumed
1895        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
1896
1897        // And we got one retry
1898        assert_eq!(
1899            context
1900                .metrics
1901                .node_metrics
1902                .sync_last_known_own_block_retries
1903                .get(),
1904            1
1905        );
1906
1907        // Ensure that no panic occurred
1908        if let Err(err) = handle.stop().await {
1909            if err.is_panic() {
1910                std::panic::resume_unwind(err.into_panic());
1911            }
1912        }
1913    }
1914}