consensus_core/commit_syncer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! CommitSyncer implements efficient synchronization of committed data.
//!
//! During the operation of a committee of authorities for consensus, one or
//! more authorities can fall behind the quorum in their received and accepted
//! blocks. This can happen due to network disruptions, host crashes, or other
//! reasons. Authorities that fall behind need to catch up to the quorum to be
//! able to vote on the latest leaders, so efficient synchronization is
//! necessary to minimize the impact of temporary disruptions and maintain
//! smooth operation of the network.
//!
//! CommitSyncer achieves efficient synchronization by relying on the following
//! observation: when blocks are included in commits with >= 2f+1 certifiers by
//! stake, these blocks must have passed verification on some honest validators,
//! so re-verifying them is unnecessary. In fact, the quorum-certified commits
//! themselves could be trusted and sent to IOTA directly, but for simplicity
//! this is not done. Blocks from trusted commits still go through Core and
//! the committer.
//!
//! Another way CommitSyncer improves the efficiency of synchronization is
//! parallel fetching: commits have a simple (linear) dependency graph, so it is
//! easy to fetch ranges of commits in parallel.
//!
//! Commit synchronization is an expensive operation, involving transferring
//! large amounts of data via the network, and it is not on the critical path of
//! block processing. So the heuristics for synchronization, including triggers
//! and retries, should be chosen to favor throughput and efficient resource
//! usage over faster reactions.
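//!
//! As an illustration of the batching and parallel fetching strategy (the
//! numbers here are hypothetical, not the configured defaults): if the local
//! commit index is 250, the quorum commit index is 1000 and the batch size is
//! 100, CommitSyncer schedules the ranges [251..=350], [351..=450], ... and
//! fetches several of them concurrently from different peers, subject to the
//! parallel fetch limit.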

use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesOrdered};
use iota_metrics::spawn_logged_monitored_task;
use itertools::Itertools as _;
use parking_lot::RwLock;
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use tokio::{
    runtime::Handle,
    sync::oneshot,
    task::{JoinHandle, JoinSet},
    time::{MissedTickBehavior, sleep},
};
use tracing::{debug, info, warn};

use crate::{
    CommitConsumerMonitor, CommitIndex,
    block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{
        CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
        CommitRef, TrustedCommit,
    },
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
};

// Handle to stop the CommitSyncer loop.
pub(crate) struct CommitSyncerHandle {
    schedule_task: JoinHandle<()>,
    tx_shutdown: oneshot::Sender<()>,
}

impl CommitSyncerHandle {
    pub(crate) async fn stop(self) {
        let _ = self.tx_shutdown.send(());
        // Do not abort schedule task, which waits for fetches to shut down.
        if let Err(e) = self.schedule_task.await {
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            }
        }
    }
}

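/// Synchronizes commits that the local node is missing by fetching them from
/// peers, verifying them against commit votes, and forwarding the certified
/// blocks to Core. A minimal sketch of the intended lifecycle is shown below;
/// the construction arguments are elided and the snippet is not compiled
/// (`ignore`) since it needs a full consensus setup.
///
/// ```ignore
/// // Construct with the shared consensus components (see `CommitSyncer::new`).
/// let commit_syncer = CommitSyncer::new(
///     context,
///     core_thread_dispatcher,
///     commit_vote_monitor,
///     commit_consumer_monitor,
///     network_client,
///     block_verifier,
///     dag_state,
/// );
/// // Spawns the scheduling loop and returns a handle.
/// let handle = commit_syncer.start();
/// // ... later, during shutdown:
/// handle.stop().await;
/// ```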
pub(crate) struct CommitSyncer<C: NetworkClient> {
    // States shared by scheduler and fetch tasks.

    // Shared components wrapper.
    inner: Arc<Inner<C>>,

    // States only used by the scheduler.

    // Inflight requests to fetch commits from different authorities.
    inflight_fetches: JoinSet<(AuthorityIndex, u32, CertifiedCommits)>,
    // Additional ranges of commits to fetch.
    pending_fetches: BTreeSet<CommitRange>,
    // Fetched commits and blocks by commit range.
    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
    // Highest commit index among inflight and pending fetches.
    // Used to determine the start of new ranges to be fetched.
    highest_scheduled_index: Option<CommitIndex>,
    // Highest index among fetched commits, after commits and blocks are verified.
    // Used for metrics.
    highest_fetched_commit_index: CommitIndex,
    // The commit index that is the max of highest local commit index and commit index inflight to
    // Core. Used to determine if fetched blocks can be sent to Core without gaps.
    synced_commit_index: CommitIndex,
}

impl<C: NetworkClient> CommitSyncer<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
        network_client: Arc<C>,
        block_verifier: Arc<dyn BlockVerifier>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Self {
        let inner = Arc::new(Inner {
            context,
            core_thread_dispatcher,
            commit_vote_monitor,
            commit_consumer_monitor,
            network_client,
            block_verifier,
            dag_state,
        });
        let synced_commit_index = inner.dag_state.read().last_commit_index();
        CommitSyncer {
            inner,
            inflight_fetches: JoinSet::new(),
            pending_fetches: BTreeSet::new(),
            fetched_ranges: BTreeMap::new(),
            highest_scheduled_index: None,
            highest_fetched_commit_index: 0,
            synced_commit_index,
        }
    }

    pub(crate) fn start(self) -> CommitSyncerHandle {
        let (tx_shutdown, rx_shutdown) = oneshot::channel();
        let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
        CommitSyncerHandle {
            schedule_task,
            tx_shutdown,
        }
    }

    async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_secs(2));
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                // Periodically, schedule new fetches if the node is falling behind.
                _ = interval.tick() => {
                    self.try_schedule_once();
                }
                // Handles results from fetch tasks.
                Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
                    if let Err(e) = result {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        }
                        warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
                        // If any fetch is cancelled or panicked, try to shutdown and exit the loop.
                        self.inflight_fetches.shutdown().await;
                        return;
                    }
                    let (authority, target_end, commits) = result.unwrap();
                    self.handle_fetch_result(authority, target_end, commits).await;
                }
                _ = &mut rx_shutdown => {
                    // Shutdown requested.
                    info!("CommitSyncer shutting down ...");
                    self.inflight_fetches.shutdown().await;
                    return;
                }
            }

            self.try_start_fetches();
        }
    }

    fn try_schedule_once(&mut self) {
        let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
        let local_commit_index = self.inner.dag_state.read().last_commit_index();
        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_quorum_index
            .set(quorum_commit_index as i64);
        metrics
            .commit_sync_local_index
            .set(local_commit_index as i64);
        let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
        let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
        // Update synced_commit_index periodically to make sure it is no smaller than
        // the local commit index.
        self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
        let unhandled_commits_threshold = self.unhandled_commits_threshold();
        info!(
            "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
            self.synced_commit_index,
            highest_handled_index,
            highest_scheduled_index,
            quorum_commit_index,
            unhandled_commits_threshold,
        );

        // TODO: cleanup inflight fetches that are no longer needed.
        let fetch_after_index = self
            .synced_commit_index
            .max(self.highest_scheduled_index.unwrap_or(0));
        // When the node is falling behind, schedule pending fetches which will be
        // executed later.
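        // For example (illustrative numbers, not the configured defaults): with a
        // batch size of 5, fetch_after_index = 0 and quorum_commit_index = 12, the
        // ranges [1..=5] and [6..=10] are scheduled, while the partial range
        // [11..=12] is left to block broadcast/subscription/synchronization until
        // it can fill a whole batch.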
        for prev_end in (fetch_after_index..=quorum_commit_index)
            .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
        {
            // Create range with inclusive start and end.
            let range_start = prev_end + 1;
            let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
            // A commit range is not fetched when [range_start, range_end] contains fewer
            // commits than the target batch size. This avoids the cost of processing
            // more, smaller batches. Block broadcast, subscription and synchronization
            // will help the node catch up.
            if quorum_commit_index < range_end {
                break;
            }
            // Pause scheduling new fetches when handling of commits is lagging.
            if highest_handled_index + unhandled_commits_threshold < range_end {
                warn!(
                    "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
                    highest_handled_index, highest_scheduled_index
                );
                break;
            }
            self.pending_fetches
                .insert((range_start..=range_end).into());
            // quorum_commit_index should be non-decreasing, so highest_scheduled_index
            // should not decrease either.
            self.highest_scheduled_index = Some(range_end);
        }
    }

    async fn handle_fetch_result(
        &mut self,
        authority_index: AuthorityIndex,
        target_end: CommitIndex,
        certified_commits: CertifiedCommits,
    ) {
        assert!(!certified_commits.commits().is_empty());

        let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
            .commits()
            .iter()
            .fold((0, 0), |(blocks, bytes), c| {
                (
                    blocks + c.blocks().len(),
                    bytes
                        + c.blocks()
                            .iter()
                            .map(|b| b.serialized().len())
                            .sum::<usize>() as u64,
                )
            });
        let hostname = &self
            .inner
            .context
            .committee
            .authority(authority_index)
            .hostname;
        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_fetched_commits
            .with_label_values(&[&hostname.as_str()])
            .inc_by(certified_commits.commits().len() as u64);
        metrics
            .commit_sync_fetched_blocks
            .with_label_values(&[&hostname.as_str()])
            .inc_by(total_blocks_fetched as u64);
        metrics
            .commit_sync_total_fetched_blocks_size
            .with_label_values(&[&hostname.as_str()])
            .inc_by(total_blocks_size_bytes);

        let (commit_start, commit_end) = (
            certified_commits.commits().first().unwrap().index(),
            certified_commits.commits().last().unwrap().index(),
        );
        self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
        metrics
            .commit_sync_highest_fetched_index
            .set(self.highest_fetched_commit_index as i64);

        // Allow returning partial results, and try fetching the rest separately.
        if commit_end < target_end {
            self.pending_fetches
                .insert((commit_end + 1..=target_end).into());
        }
        // Make sure synced_commit_index is up to date.
        self.synced_commit_index = self
            .synced_commit_index
            .max(self.inner.dag_state.read().last_commit_index());
        // Only add new blocks if at least some of them are not already synced.
        if self.synced_commit_index < commit_end {
            self.fetched_ranges
                .insert((commit_start..=commit_end).into(), certified_commits);
        }
        // Try to process as many fetched blocks as possible.
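        // For example (hypothetical state): if ranges [11..=20] and [31..=40] have
        // been fetched but [21..=30] is still inflight, only [11..=20] can be sent
        // to Core now; [31..=40] stays buffered until the gap is filled.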
        while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
            // Only pop fetched_ranges if there is no gap with blocks already synced.
            // Note: start, end and synced_commit_index are all inclusive.
            let (fetched_commit_range, commits) =
                if fetched_commit_range.start() <= self.synced_commit_index + 1 {
                    self.fetched_ranges.pop_first().unwrap()
                } else {
                    // Found a gap between the earliest fetched block and the latest synced
                    // block, so do not send additional blocks to Core yet.
                    metrics.commit_sync_gap_on_processing.inc();
                    break;
                };
            // Avoid sending to Core a whole batch of already synced blocks.
            if fetched_commit_range.end() <= self.synced_commit_index {
                continue;
            }

            debug!(
                "Fetched certified blocks for commit range {:?}: {}",
                fetched_commit_range,
                commits
                    .commits()
                    .iter()
                    .flat_map(|c| c.blocks())
                    .map(|b| b.reference().to_string())
                    .join(","),
            );

            // If the core thread cannot handle the incoming blocks, it is ok to block
            // here. It is also possible to have missing ancestors, because an
            // equivocating validator may produce blocks that are not included in
            // commits but are ancestors of other blocks. The Synchronizer is needed
            // to fill in the missing ancestors in this case.
            match self
                .inner
                .core_thread_dispatcher
                .add_certified_commits(commits)
                .await
            {
                Ok(missing) => {
                    if !missing.is_empty() {
                        warn!(
                            "Fetched blocks have missing ancestors: {:?} for commit range {:?}",
                            missing, fetched_commit_range
                        );
                    }
                    for block_ref in missing {
                        let hostname = &self
                            .inner
                            .context
                            .committee
                            .authority(block_ref.author)
                            .hostname;
                        metrics
                            .commit_sync_fetch_missing_blocks
                            .with_label_values(&[hostname])
                            .inc();
                    }
                }
                Err(e) => {
                    info!("Failed to add blocks, shutting down: {}", e);
                    return;
                }
            };

            // Once commits and blocks are sent to Core, ratchet up synced_commit_index.
            self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
        }

        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

    fn try_start_fetches(&mut self) {
        // Cap parallel fetches based on the configured limit and committee size, to
        // avoid overloading the network. Also, when too many fetched blocks cannot be
        // sent to Core because an earlier fetch has not finished, reduce parallelism
        // so the earlier fetch can retry on a better host and succeed.
        let target_parallel_fetches = self
            .inner
            .context
            .parameters
            .commit_sync_parallel_fetches
            .min(self.inner.context.committee.size() * 2 / 3)
            .min(
                self.inner
                    .context
                    .parameters
                    .commit_sync_batches_ahead
                    .saturating_sub(self.fetched_ranges.len()),
            )
            .max(1);
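        // For example (hypothetical parameters): with commit_sync_parallel_fetches = 8,
        // a committee of 10 (10 * 2 / 3 = 6), commit_sync_batches_ahead = 16 and 14
        // fetched ranges not yet processed (16 - 14 = 2), at most min(8, 6, 2) = 2
        // fetches run in parallel; the value never drops below 1.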
        // Start new fetches if there are pending batches and available slots.
        loop {
            if self.inflight_fetches.len() >= target_parallel_fetches {
                break;
            }
            let Some(commit_range) = self.pending_fetches.pop_first() else {
                break;
            };
            self.inflight_fetches
                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
        }

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

    // Retries fetching commits and blocks from available authorities, until a
    // request succeeds and at least a prefix of the commit range is fetched.
    // Returns the fetched commits and the blocks referenced by the commits.
    async fn fetch_loop(
        inner: Arc<Inner<C>>,
        commit_range: CommitRange,
    ) -> (AuthorityIndex, CommitIndex, CertifiedCommits) {
        // Individual request base timeout.
        const TIMEOUT: Duration = Duration::from_secs(10);
        // The max per-request timeout is the base timeout times a multiplier.
        // At the extreme, this means a 120s timeout to fetch
        // max_blocks_per_fetch blocks.
        const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
        // timeout * max number of targets should be reasonably small, so the
        // system can adjust to a slow network or large data sizes quickly.
        const MAX_NUM_TARGETS: usize = 24;
        let mut timeout_multiplier = 0;
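        // With the constants above, per-request timeouts escalate across retry
        // rounds as 10s, 20s, ..., up to 120s, and each fetch_once() attempt is
        // given an overall budget of 4x the current per-request timeout.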

        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_loop_latency
            .start_timer();
        info!("Starting to fetch commits in {commit_range:?} ...",);
        loop {
            // Attempt to fetch commits and blocks through min(committee size,
            // MAX_NUM_TARGETS) peers.
            let mut target_authorities = inner
                .context
                .committee
                .authorities()
                .filter_map(|(i, _)| {
                    if i != inner.context.own_index {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect_vec();
            target_authorities.shuffle(&mut ThreadRng::default());
            target_authorities.truncate(MAX_NUM_TARGETS);
            // Increase the timeout multiplier for each retry round, up to
            // MAX_TIMEOUT_MULTIPLIER.
            timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
            let request_timeout = TIMEOUT * timeout_multiplier;
            // Give enough overall timeout for fetching commits and blocks.
            // - Timeout for fetching commits and commit certifying blocks.
            // - Timeout for fetching blocks referenced by the commits.
            // - Time spent on pipelining requests to fetch blocks.
            // - Another headroom to allow fetch_once() to timeout gracefully if possible.
            let fetch_timeout = request_timeout * 4;
            // Try fetching from each selected target authority.
            for authority in target_authorities {
                match tokio::time::timeout(
                    fetch_timeout,
                    Self::fetch_once(
                        inner.clone(),
                        authority,
                        commit_range.clone(),
                        request_timeout,
                    ),
                )
                .await
                {
                    Ok(Ok(commits)) => {
                        info!("Finished fetching commits in {commit_range:?}",);
                        return (authority, commit_range.end(), commits);
                    }
                    Ok(Err(e)) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        inner
                            .context
                            .metrics
                            .update_scoring_metrics_on_block_receival(
                                authority,
                                hostname.as_str(),
                                e.clone(),
                                "fetch_once",
                            );
                        warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
                        let error: &'static str = e.into();
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), error])
                            .inc();
                    }
                    Err(_) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Timed out fetching {commit_range:?} from {authority}",);
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), "FetchTimeout"])
                            .inc();
                    }
                }
            }
            // Avoid busy looping by waiting a while before retrying.
            sleep(TIMEOUT).await;
        }
    }

    // Fetches commits and blocks from a single authority. At a high level, first
    // the commits are fetched and verified. After that, blocks referenced in the
    // certified commits are fetched and returned so they can be sent to Core for
    // processing.
    async fn fetch_once(
        inner: Arc<Inner<C>>,
        target_authority: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<CertifiedCommits> {
        // Maximum delay between consecutive pipelined requests, to avoid
        // overwhelming the peer while still maintaining reasonable throughput.
        const MAX_PIPELINE_DELAY: Duration = Duration::from_secs(1);

        let hostname = inner
            .context
            .committee
            .authority(target_authority)
            .hostname
            .clone();
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_once_latency
            .with_label_values(&[hostname.as_str()])
            .start_timer();

        // 1. Fetch commits in the commit range from the target authority.
        let (serialized_commits, serialized_blocks) = inner
            .network_client
            .fetch_commits(target_authority, commit_range.clone(), timeout)
            .await?;

        // 2. Verify the response contains blocks that can certify the last returned
        //    commit, and that the returned commits are chained by digest, so earlier
        //    commits are certified as well.
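        // Commit and vote verification is CPU-bound (it checks block signatures),
        // so it is offloaded to a blocking thread to avoid stalling the async
        // runtime while requests are in flight.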
        let (commits, vote_blocks) = Handle::current()
            .spawn_blocking({
                let inner = inner.clone();
                move || {
                    inner.verify_commits(
                        target_authority,
                        commit_range,
                        serialized_commits,
                        serialized_blocks,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // 3. Fetch blocks referenced by the commits, from the same authority.
        let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
        let num_chunks = block_refs
            .len()
            .div_ceil(inner.context.parameters.max_blocks_per_fetch)
            as u32;
        let mut requests: FuturesOrdered<_> = block_refs
            .chunks(inner.context.parameters.max_blocks_per_fetch)
            .enumerate()
            .map(|(i, request_block_refs)| {
                let inner = inner.clone();
                async move {
                    // 4. Send out pipelined fetch requests to avoid overloading the target
                    //    authority.
                    let individual_delay = (timeout / num_chunks).min(MAX_PIPELINE_DELAY);
                    sleep(individual_delay * i as u32).await;
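                    // For example (hypothetical values): with a 30s timeout and 10
                    // chunks, the per-chunk delay is min(3s, 1s) = 1s, so chunk i
                    // starts roughly i seconds after the first request is issued.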
                    // TODO: add some retries.
                    let serialized_blocks = inner
                        .network_client
                        .fetch_blocks(
                            target_authority,
                            request_block_refs.to_vec(),
                            vec![],
                            timeout,
                        )
                        .await?;
                    // 5. Verify the same number of blocks are returned as requested.
                    if request_block_refs.len() != serialized_blocks.len() {
                        return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
                            authority: target_authority,
                            requested: request_block_refs.len(),
                            received: serialized_blocks.len(),
                        });
                    }
                    // 6. Verify returned blocks have valid formats.
                    let signed_blocks = serialized_blocks
                        .iter()
                        .map(|serialized| {
                            let block: SignedBlock = bcs::from_bytes(serialized)
                                .map_err(ConsensusError::MalformedBlock)?;
                            Ok(block)
                        })
                        .collect::<ConsensusResult<Vec<_>>>()?;
                    // 7. Verify the returned blocks match the requested block refs.
                    // If they do match, the returned blocks can be considered verified as well.
                    let mut blocks = Vec::new();
                    for ((requested_block_ref, signed_block), serialized) in request_block_refs
                        .iter()
                        .zip(signed_blocks.into_iter())
                        .zip(serialized_blocks.into_iter())
                    {
                        let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
                        let received_block_ref = BlockRef::new(
                            signed_block.round(),
                            signed_block.author(),
                            signed_block_digest,
                        );
                        if *requested_block_ref != received_block_ref {
                            return Err(ConsensusError::UnexpectedBlockForCommit {
                                peer: target_authority,
                                requested: *requested_block_ref,
                                received: received_block_ref,
                            });
                        }
                        blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
                    }
                    Ok(blocks)
                }
            })
            .collect();

        let mut fetched_blocks = BTreeMap::new();
        while let Some(result) = requests.next().await {
            for block in result? {
                fetched_blocks.insert(block.reference(), block);
            }
        }

        // 8. Make sure fetched block (and vote block) timestamps are not ahead of the
        //    current time.
        for block in fetched_blocks.values().chain(vote_blocks.iter()) {
            let now_ms = inner.context.clock.timestamp_utc_ms();
            let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
            if forward_drift == 0 {
                continue;
            };
            let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
            inner
                .context
                .metrics
                .node_metrics
                .block_timestamp_drift_ms
                .with_label_values(&[peer_hostname.as_str(), "commit_syncer"])
                .inc_by(forward_drift);

            // We want to run the following checks only if the median-based commit
            // timestamp is not enabled.
            if !inner
                .context
                .protocol_config
                .consensus_median_timestamp_with_checkpoint_enforcement()
            {
                let forward_drift = Duration::from_millis(forward_drift);
                if forward_drift >= inner.context.parameters.max_forward_time_drift {
                    warn!(
                        "Local clock is behind a quorum of peers: local ts {}, committed block ts {}",
                        now_ms,
                        block.timestamp_ms()
                    );
                }
                sleep(forward_drift).await;
            }
        }

        // 9. Now create the certified commits by assigning the blocks to each commit
        //    and retaining the commit vote history.
        let mut certified_commits = Vec::new();
        for commit in &commits {
            let blocks = commit
                .blocks()
                .iter()
                .map(|block_ref| {
                    fetched_blocks
                        .remove(block_ref)
                        .expect("Block should exist")
                })
                .collect::<Vec<_>>();
            certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
        }

        Ok(CertifiedCommits::new(certified_commits, vote_blocks))
    }

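    /// Maximum number of commits that may be scheduled for fetching beyond the
    /// highest commit index handled by the consumer. For example (hypothetical
    /// parameters): with commit_sync_batch_size = 100 and
    /// commit_sync_batches_ahead = 10, scheduling pauses once a new range would
    /// end more than 1000 commits ahead of the last handled commit.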
    fn unhandled_commits_threshold(&self) -> CommitIndex {
        self.inner.context.parameters.commit_sync_batch_size
            * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
    }

    #[cfg(test)]
    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
        self.pending_fetches.clone()
    }

    #[cfg(test)]
    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
        self.fetched_ranges.clone()
    }

    #[cfg(test)]
    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
        self.highest_scheduled_index
    }

    #[cfg(test)]
    fn highest_fetched_commit_index(&self) -> CommitIndex {
        self.highest_fetched_commit_index
    }

    #[cfg(test)]
    fn synced_commit_index(&self) -> CommitIndex {
        self.synced_commit_index
    }
}

struct Inner<C: NetworkClient> {
    context: Arc<Context>,
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    network_client: Arc<C>,
    block_verifier: Arc<dyn BlockVerifier>,
    dag_state: Arc<RwLock<DagState>>,
}

impl<C: NetworkClient> Inner<C> {
    /// Verifies the commits and also certifies them using the provided vote
    /// blocks for the last commit. The method returns the trusted commits
    /// and the votes as verified blocks.
    fn verify_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        serialized_commits: Vec<Bytes>,
        serialized_vote_blocks: Vec<Bytes>,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        // Parse and verify commits.
        let mut commits = Vec::new();
        for serialized in &serialized_commits {
            let commit: Commit =
                bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
            let digest = TrustedCommit::compute_digest(serialized);
            if commits.is_empty() {
                // start is inclusive, so the first commit must be at the start index.
                if commit.index() != commit_range.start() {
                    return Err(ConsensusError::UnexpectedStartCommit {
                        peer,
                        start: commit_range.start(),
                        commit: Box::new(commit),
                    });
                }
            } else {
                // Verify the next commit increments the index and references the
                // previous commit's digest.
                let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
                    commits.last().unwrap();
                if commit.index() != last_commit.index() + 1
                    || &commit.previous_digest() != last_commit_digest
                {
                    return Err(ConsensusError::UnexpectedCommitSequence {
                        peer,
                        prev_commit: Box::new(last_commit.clone()),
                        curr_commit: Box::new(commit),
                    });
                }
            }
            // Do not process commits past the end index.
            if commit.index() > commit_range.end() {
                break;
            }
            commits.push((digest, commit));
        }
        let Some((end_commit_digest, end_commit)) = commits.last() else {
            return Err(ConsensusError::NoCommitReceived { peer });
        };

        // Parse and verify blocks. Then accumulate votes on the end commit.
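        // Votes are only counted for the end commit: because each commit references
        // its predecessor by digest, a quorum certifying the end commit transitively
        // certifies every earlier commit in the chain verified above.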
        let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
        let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        let mut vote_blocks = Vec::new();
        for serialized in serialized_vote_blocks {
            let block: SignedBlock =
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
            // The block signature needs to be verified.
            self.block_verifier.verify(&block)?;
            for vote in block.commit_votes() {
                if *vote == end_commit_ref {
                    stake_aggregator.add(block.author(), &self.context.committee);
                }
            }
            vote_blocks.push(VerifiedBlock::new_verified(block, serialized));
        }

        // Check if the end commit has enough votes.
        if !stake_aggregator.reached_threshold(&self.context.committee) {
            return Err(ConsensusError::NotEnoughCommitVotes {
                stake: stake_aggregator.stake(),
                peer,
                commit: Box::new(end_commit.clone()),
            });
        }

        let trusted_commits = commits
            .into_iter()
            .zip(serialized_commits)
            .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
            .collect();
        Ok((trusted_commits, vote_blocks))
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, Parameters};
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef, Round,
        block::{BlockRef, TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::tests::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, NetworkClient},
        storage::mem_store::MemStore,
    };

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait::async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        // SETUP
        let (context, _) = Context::new_for_test(4);
        // Use smaller batches and fetch limits for testing.
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            network_client,
            block_verifier,
            dag_state,
        );

        // Check initial state.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 15 blocks voting for commit 10 from authorities 0 to 2 in
        // CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled after seeing progress of other validators.
        commit_syncer.try_schedule_once();

        // Verify state.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 40 blocks voting for commit 35 from authorities 0 to 2 in
        // CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled until the unhandled commits threshold.
        commit_syncer.try_schedule_once();

        // Verify commit syncer is paused after scheduling 15 commits to index 25.
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // Indicate commit index 25 is consumed, and try to schedule again.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        // Verify commit syncer schedules fetches up to index 35.
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // Verify contiguous ranges are scheduled.
        for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
}