consensus_core/
commit_syncer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! CommitSyncer implements efficient synchronization of committed data.
//!
//! During the operation of a committee of authorities for consensus, one or
//! more authorities can fall behind the quorum in their received and accepted
//! blocks. This can happen due to network disruptions, host crashes, or other
//! reasons. Authorities that fall behind need to catch up to the quorum to be
//! able to vote on the latest leaders, so efficient synchronization is
//! necessary to minimize the impact of temporary disruptions and maintain
//! smooth operation of the network.
//!
//! CommitSyncer achieves efficient synchronization by relying on the following:
//! when blocks are included in commits with >= 2f+1 certifiers by stake, these
//! blocks must have passed verification on some honest validators, so
//! re-verifying them is unnecessary. In fact, the quorum certified commits
//! themselves can be trusted to be sent to IOTA directly, but for simplicity
//! this is not done. Blocks from trusted commits still go through Core and
//! the committer.
//!
//! Another way CommitSyncer improves the efficiency of synchronization is
//! parallel fetching: commits have a simple (linear) dependency graph, so it is
//! easy to fetch ranges of commits in parallel.
//!
//! Commit synchronization is an expensive operation, involving transferring
//! large amounts of data via the network, and it is not on the critical path of
//! block processing. So the heuristics for synchronization, including triggers
//! and retries, should be chosen to favor throughput and efficient resource
//! usage over faster reactions.
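//!
//! A minimal lifecycle sketch (not a doctest, since the types are crate
//! internal and the shared components are assumed to be constructed
//! elsewhere):
//!
//! ```ignore
//! let commit_syncer = CommitSyncer::new(
//!     context,
//!     core_thread_dispatcher,
//!     commit_vote_monitor,
//!     commit_consumer_monitor,
//!     network_client,
//!     block_verifier,
//!     dag_state,
//! );
//! // start() spawns the scheduling loop and returns a handle,
//! let handle = commit_syncer.start();
//! // which is later used to shut the loop down gracefully.
//! handle.stop().await;
//! ```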

use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesOrdered};
use iota_metrics::spawn_logged_monitored_task;
use itertools::Itertools as _;
use parking_lot::RwLock;
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use tokio::{
    runtime::Handle,
    sync::oneshot,
    task::{JoinHandle, JoinSet},
    time::{MissedTickBehavior, sleep},
};
use tracing::{debug, info, warn};

use crate::{
    CommitConsumerMonitor, CommitIndex,
    block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{
        CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
        CommitRef, TrustedCommit,
    },
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
};

// Handle to stop the CommitSyncer loop.
pub(crate) struct CommitSyncerHandle {
    schedule_task: JoinHandle<()>,
    tx_shutdown: oneshot::Sender<()>,
}

impl CommitSyncerHandle {
    pub(crate) async fn stop(self) {
        let _ = self.tx_shutdown.send(());
        // Do not abort schedule task, which waits for fetches to shut down.
        if let Err(e) = self.schedule_task.await {
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            }
        }
    }
}

pub(crate) struct CommitSyncer<C: NetworkClient> {
    // States shared by scheduler and fetch tasks.

    // Shared components wrapper.
    inner: Arc<Inner<C>>,

    // States only used by the scheduler.

    // Inflight requests to fetch commits from different authorities.
    inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
    // Additional ranges of commits to fetch.
    pending_fetches: BTreeSet<CommitRange>,
    // Fetched commits and blocks by commit range.
    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
    // Highest commit index among inflight and pending fetches.
    // Used to determine the start of new ranges to be fetched.
    highest_scheduled_index: Option<CommitIndex>,
    // Highest index among fetched commits, after commits and blocks are verified.
    // Used for metrics.
    highest_fetched_commit_index: CommitIndex,
    // The commit index that is the max of highest local commit index and commit index inflight to
    // Core. Used to determine if fetched blocks can be sent to Core without gaps.
    synced_commit_index: CommitIndex,
}

impl<C: NetworkClient> CommitSyncer<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
        network_client: Arc<C>,
        block_verifier: Arc<dyn BlockVerifier>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Self {
        let inner = Arc::new(Inner {
            context,
            core_thread_dispatcher,
            commit_vote_monitor,
            commit_consumer_monitor,
            network_client,
            block_verifier,
            dag_state,
        });
        let synced_commit_index = inner.dag_state.read().last_commit_index();
        CommitSyncer {
            inner,
            inflight_fetches: JoinSet::new(),
            pending_fetches: BTreeSet::new(),
            fetched_ranges: BTreeMap::new(),
            highest_scheduled_index: None,
            highest_fetched_commit_index: 0,
            synced_commit_index,
        }
    }

    pub(crate) fn start(self) -> CommitSyncerHandle {
        let (tx_shutdown, rx_shutdown) = oneshot::channel();
        let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
        CommitSyncerHandle {
            schedule_task,
            tx_shutdown,
        }
    }

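    // The main scheduling loop: on a fixed interval it checks whether new commit
    // ranges should be scheduled, reacts to completed fetch tasks, and handles
    // shutdown requests. After each event it tries to start additional fetches,
    // bounded by the parallelism limits in try_start_fetches().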
    async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_secs(2));
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                // Periodically, schedule new fetches if the node is falling behind.
                _ = interval.tick() => {
                    self.try_schedule_once();
                }
                // Handles results from fetch tasks.
                Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
                    if let Err(e) = result {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        }
                        warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
                        // If any fetch is cancelled or panicked, try to shut down and exit the loop.
                        self.inflight_fetches.shutdown().await;
                        return;
                    }
                    let (target_end, commits) = result.unwrap();
                    self.handle_fetch_result(target_end, commits).await;
                }
                _ = &mut rx_shutdown => {
                    // Shutdown requested.
                    info!("CommitSyncer shutting down ...");
                    self.inflight_fetches.shutdown().await;
                    return;
                }
            }

            self.try_start_fetches();
        }
    }

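    // Schedules new commit ranges to fetch, from what is already synced or
    // scheduled up to the quorum commit index observed from peers. For example,
    // with commit_sync_batch_size = 5, everything synced up to index 0 and a
    // quorum commit index of 12, the ranges [1..=5] and [6..=10] are scheduled,
    // while [11..=12] is left to regular block synchronization because it is
    // smaller than a full batch.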
    fn try_schedule_once(&mut self) {
        let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
        let local_commit_index = self.inner.dag_state.read().last_commit_index();
        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_quorum_index
            .set(quorum_commit_index as i64);
        metrics
            .commit_sync_local_index
            .set(local_commit_index as i64);
        let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
        let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
        // Update synced_commit_index periodically to make sure it is no smaller than
        // the local commit index.
        self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
        let unhandled_commits_threshold = self.unhandled_commits_threshold();
        info!(
            "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
            self.synced_commit_index,
            highest_handled_index,
            highest_scheduled_index,
            quorum_commit_index,
            unhandled_commits_threshold,
        );

        // TODO: cleanup inflight fetches that are no longer needed.
        let fetch_after_index = self
            .synced_commit_index
            .max(self.highest_scheduled_index.unwrap_or(0));
        // When the node is falling behind, schedule pending fetches which will be
        // executed later.
        for prev_end in (fetch_after_index..=quorum_commit_index)
            .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
        {
            // Create range with inclusive start and end.
            let range_start = prev_end + 1;
            let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
            // The commit range is not fetched when [range_start, range_end] contains
            // fewer commits than the target batch size. This avoids the cost of
            // processing more, smaller batches. Block broadcast, subscription and
            // synchronization will help the node catch up.
            if quorum_commit_index < range_end {
                break;
            }
            // Pause scheduling new fetches when handling of commits is lagging.
            if highest_handled_index + unhandled_commits_threshold < range_end {
                warn!(
                    "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
                    highest_handled_index, highest_scheduled_index
                );
                break;
            }
            self.pending_fetches
                .insert((range_start..=range_end).into());
            // quorum_commit_index should be non-decreasing, so highest_scheduled_index
            // should not decrease either.
            self.highest_scheduled_index = Some(range_end);
        }
    }

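    // Processes the result of a completed fetch: records metrics, re-queues any
    // unfetched suffix of the target range, buffers the fetched commits in
    // fetched_ranges, and forwards ranges to Core in order, as long as there is
    // no gap with synced_commit_index.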
    async fn handle_fetch_result(
        &mut self,
        target_end: CommitIndex,
        certified_commits: CertifiedCommits,
    ) {
        assert!(!certified_commits.commits().is_empty());

        let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
            .commits()
            .iter()
            .fold((0, 0), |(blocks, bytes), c| {
                (
                    blocks + c.blocks().len(),
                    bytes
                        + c.blocks()
                            .iter()
                            .map(|b| b.serialized().len())
                            .sum::<usize>() as u64,
                )
            });

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_fetched_commits
            .inc_by(certified_commits.commits().len() as u64);
        metrics
            .commit_sync_fetched_blocks
            .inc_by(total_blocks_fetched as u64);
        metrics
            .commit_sync_total_fetched_blocks_size
            .inc_by(total_blocks_size_bytes);

        let (commit_start, commit_end) = (
            certified_commits.commits().first().unwrap().index(),
            certified_commits.commits().last().unwrap().index(),
        );
        self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
        metrics
            .commit_sync_highest_fetched_index
            .set(self.highest_fetched_commit_index as i64);

        // Allow returning partial results, and try fetching the rest separately.
        if commit_end < target_end {
            self.pending_fetches
                .insert((commit_end + 1..=target_end).into());
        }
        // Make sure synced_commit_index is up to date.
        self.synced_commit_index = self
            .synced_commit_index
            .max(self.inner.dag_state.read().last_commit_index());
        // Only add new blocks if at least some of them are not already synced.
        if self.synced_commit_index < commit_end {
            self.fetched_ranges
                .insert((commit_start..=commit_end).into(), certified_commits);
        }
        // Try to process as many fetched blocks as possible.
        while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
            // Only pop fetched_ranges if there is no gap with blocks already synced.
            // Note: start, end and synced_commit_index are all inclusive.
            let (fetched_commit_range, commits) =
                if fetched_commit_range.start() <= self.synced_commit_index + 1 {
                    self.fetched_ranges.pop_first().unwrap()
                } else {
                    // Found a gap between the earliest fetched block and the latest synced
                    // block, so do not send additional blocks to Core.
                    metrics.commit_sync_gap_on_processing.inc();
                    break;
                };
            // Avoid sending to Core a whole batch of already synced blocks.
            if fetched_commit_range.end() <= self.synced_commit_index {
                continue;
            }

            debug!(
                "Fetched certified blocks for commit range {:?}: {}",
                fetched_commit_range,
                commits
                    .commits()
                    .iter()
                    .flat_map(|c| c.blocks())
                    .map(|b| b.reference().to_string())
                    .join(","),
            );

            // If the core thread cannot handle the incoming blocks, it is ok to block here.
            // Also, it is possible to have missing ancestors because an equivocating
            // validator may produce blocks that are not included in commits but
            // are ancestors of other blocks. The Synchronizer is needed to fill in
            // the missing ancestors in this case.
            match self
                .inner
                .core_thread_dispatcher
                .add_certified_commits(commits)
                .await
            {
                Ok(missing) => {
                    if !missing.is_empty() {
                        warn!(
                            "Fetched blocks have missing ancestors: {:?} for commit range {:?}",
                            missing, fetched_commit_range
                        );
                    }
                    for block_ref in missing {
                        let hostname = &self
                            .inner
                            .context
                            .committee
                            .authority(block_ref.author)
                            .hostname;
                        metrics
                            .commit_sync_fetch_missing_blocks
                            .with_label_values(&[hostname])
                            .inc();
                    }
                }
                Err(e) => {
                    info!("Failed to add blocks, shutting down: {}", e);
                    return;
                }
            };

            // Once commits and blocks are sent to Core, ratchet up synced_commit_index.
            self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
        }

        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

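    // Starts as many pending fetches as the current parallelism target allows.
    // As a hypothetical example, with commit_sync_parallel_fetches = 8, a
    // committee of 4 (4 * 2 / 3 = 2), commit_sync_batches_ahead = 5 and 3
    // fetched ranges still waiting to be sent to Core, the target is
    // min(8, 2, 5 - 3).max(1) = 2 parallel fetches.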
    fn try_start_fetches(&mut self) {
        // Cap parallel fetches based on the configured limit and committee size, to
        // avoid overloading the network. Also, when too many fetched blocks cannot
        // be sent to Core because an earlier fetch has not finished, reduce
        // parallelism so the earlier fetch can be retried on a better host and
        // succeed.
        let target_parallel_fetches = self
            .inner
            .context
            .parameters
            .commit_sync_parallel_fetches
            .min(self.inner.context.committee.size() * 2 / 3)
            .min(
                self.inner
                    .context
                    .parameters
                    .commit_sync_batches_ahead
                    .saturating_sub(self.fetched_ranges.len()),
            )
            .max(1);
        // Start new fetches if there are pending batches and available slots.
        loop {
            if self.inflight_fetches.len() >= target_parallel_fetches {
                break;
            }
            let Some(commit_range) = self.pending_fetches.pop_first() else {
                break;
            };
            self.inflight_fetches
                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
        }

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

    // Retries fetching commits and blocks from available authorities, until a
    // request succeeds and at least a prefix of the commit range is fetched.
    // Returns the fetched commits and blocks referenced by the commits.
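    // Each retry round escalates the per-request timeout: with TIMEOUT = 10s and
    // MAX_TIMEOUT_MULTIPLIER = 12, the timeout grows 10s, 20s, ... up to 120s,
    // and every fetch_once() attempt gets 4x that as its overall budget.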
    async fn fetch_loop(
        inner: Arc<Inner<C>>,
        commit_range: CommitRange,
    ) -> (CommitIndex, CertifiedCommits) {
        // Individual request base timeout.
        const TIMEOUT: Duration = Duration::from_secs(10);
        // The max per-request timeout is the base timeout times a multiplier.
        // At the extreme, this means a 120s timeout to fetch
        // max_blocks_per_fetch blocks.
        const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
        // timeout * max number of targets should be reasonably small, so the
        // system can adjust to a slow network or large data sizes quickly.
        const MAX_NUM_TARGETS: usize = 24;
        let mut timeout_multiplier = 0;

        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_loop_latency
            .start_timer();
        info!("Starting to fetch commits in {commit_range:?} ...",);
        loop {
            // Attempt to fetch commits and blocks from up to min(committee size - 1,
            // MAX_NUM_TARGETS) randomly selected peers.
            let mut target_authorities = inner
                .context
                .committee
                .authorities()
                .filter_map(|(i, _)| {
                    if i != inner.context.own_index {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect_vec();
            target_authorities.shuffle(&mut ThreadRng::default());
            target_authorities.truncate(MAX_NUM_TARGETS);
            // Increase the timeout multiplier each loop, up to MAX_TIMEOUT_MULTIPLIER.
            timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
            let request_timeout = TIMEOUT * timeout_multiplier;
            // Give enough overall timeout for fetching commits and blocks.
            // - Timeout for fetching commits and commit certifying blocks.
            // - Timeout for fetching blocks referenced by the commits.
            // - Time spent on pipelining requests to fetch blocks.
            // - Additional headroom to allow fetch_once() to time out gracefully if possible.
            let fetch_timeout = request_timeout * 4;
            // Try fetching from the selected target authorities.
            for authority in target_authorities {
                match tokio::time::timeout(
                    fetch_timeout,
                    Self::fetch_once(
                        inner.clone(),
                        authority,
                        commit_range.clone(),
                        request_timeout,
                    ),
                )
                .await
                {
                    Ok(Ok(commits)) => {
                        info!("Finished fetching commits in {commit_range:?}",);
                        return (commit_range.end(), commits);
                    }
                    Ok(Err(e)) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
                        let error: &'static str = e.into();
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), error])
                            .inc();
                    }
                    Err(_) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Timed out fetching {commit_range:?} from {authority}",);
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), "FetchTimeout"])
                            .inc();
                    }
                }
            }
            // Avoid busy looping by waiting for a while before retrying.
            sleep(TIMEOUT).await;
        }
    }

    // Fetches commits and blocks from a single authority. At a high level, first
    // the commits are fetched and verified. After that, blocks referenced in
    // the certified commits are fetched and sent to Core for processing.
    async fn fetch_once(
        inner: Arc<Inner<C>>,
        target_authority: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<CertifiedCommits> {
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_once_latency
            .start_timer();

        // 1. Fetch commits in the commit range from the target authority.
        let (serialized_commits, serialized_blocks) = inner
            .network_client
            .fetch_commits(target_authority, commit_range.clone(), timeout)
            .await?;

        // 2. Verify the response contains blocks that can certify the last returned
        //    commit, and that the returned commits are chained by digest, so earlier
        //    commits are certified as well.
        let (commits, vote_blocks) = Handle::current()
            .spawn_blocking({
                let inner = inner.clone();
                move || {
                    inner.verify_commits(
                        target_authority,
                        commit_range,
                        serialized_commits,
                        serialized_blocks,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // 3. Fetch blocks referenced by the commits, from the same authority.
        let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
        let num_chunks = block_refs
            .len()
            .div_ceil(inner.context.parameters.max_blocks_per_fetch)
            as u32;
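        // Blocks are requested in chunks of max_blocks_per_fetch, with chunk i
        // delayed by timeout * i / num_chunks so the requests are spread over
        // roughly one timeout period. E.g. 12 block refs with
        // max_blocks_per_fetch = 5 produce 3 chunks, started at offsets 0,
        // timeout / 3 and 2 * timeout / 3.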
        let mut requests: FuturesOrdered<_> = block_refs
            .chunks(inner.context.parameters.max_blocks_per_fetch)
            .enumerate()
            .map(|(i, request_block_refs)| {
                let inner = inner.clone();
                async move {
                    // 4. Send out pipelined fetch requests to avoid overloading the target
                    //    authority.
                    sleep(timeout * i as u32 / num_chunks).await;
                    // TODO: add some retries.
                    let serialized_blocks = inner
                        .network_client
                        .fetch_blocks(
                            target_authority,
                            request_block_refs.to_vec(),
                            vec![],
                            timeout,
                        )
                        .await?;
                    // 5. Verify the same number of blocks are returned as requested.
                    if request_block_refs.len() != serialized_blocks.len() {
                        return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
                            authority: target_authority,
                            requested: request_block_refs.len(),
                            received: serialized_blocks.len(),
                        });
                    }
                    // 6. Verify returned blocks have valid formats.
                    let signed_blocks = serialized_blocks
                        .iter()
                        .map(|serialized| {
                            let block: SignedBlock = bcs::from_bytes(serialized)
                                .map_err(ConsensusError::MalformedBlock)?;
                            Ok(block)
                        })
                        .collect::<ConsensusResult<Vec<_>>>()?;
                    // 7. Verify the returned blocks match the requested block refs.
                    // If they match, the returned blocks can be considered verified as well.
                    let mut blocks = Vec::new();
                    for ((requested_block_ref, signed_block), serialized) in request_block_refs
                        .iter()
                        .zip(signed_blocks.into_iter())
                        .zip(serialized_blocks.into_iter())
                    {
                        let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
                        let received_block_ref = BlockRef::new(
                            signed_block.round(),
                            signed_block.author(),
                            signed_block_digest,
                        );
                        if *requested_block_ref != received_block_ref {
                            return Err(ConsensusError::UnexpectedBlockForCommit {
                                peer: target_authority,
                                requested: *requested_block_ref,
                                received: received_block_ref,
                            });
                        }
                        blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
                    }
                    Ok(blocks)
                }
            })
            .collect();

        let mut fetched_blocks = BTreeMap::new();
        while let Some(result) = requests.next().await {
            for block in result? {
                fetched_blocks.insert(block.reference(), block);
            }
        }

        // 8. Make sure fetched block (and vote) timestamps are not ahead of the
        //    current time.
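        // If a block timestamp is ahead of the local clock, wait out the
        // difference before accepting the blocks, and warn when the drift
        // exceeds max_forward_time_drift.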
        for block in fetched_blocks.values().chain(vote_blocks.iter()) {
            let now_ms = inner.context.clock.timestamp_utc_ms();
            let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
            if forward_drift == 0 {
                continue;
            };
            let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
            inner
                .context
                .metrics
                .node_metrics
                .block_timestamp_drift_wait_ms
                .with_label_values(&[peer_hostname.as_str(), "commit_syncer"])
                .inc_by(forward_drift);
            let forward_drift = Duration::from_millis(forward_drift);
            if forward_drift >= inner.context.parameters.max_forward_time_drift {
                warn!(
                    "Local clock is behind a quorum of peers: local ts {}, certified block ts {}",
                    now_ms,
                    block.timestamp_ms()
                );
            }
            sleep(forward_drift).await;
        }

        // 9. Now create the certified commits by assigning the blocks to each commit
        //    and retaining the commit vote history.
        let mut certified_commits = Vec::new();
        for commit in &commits {
            let blocks = commit
                .blocks()
                .iter()
                .map(|block_ref| {
                    fetched_blocks
                        .remove(block_ref)
                        .expect("Block should exist")
                })
                .collect::<Vec<_>>();
            certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
        }

        Ok(CertifiedCommits::new(certified_commits, vote_blocks))
    }

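    // The maximum number of commits that may be scheduled beyond the highest
    // handled commit before scheduling pauses, e.g. a batch size of 5 with 5
    // batches ahead allows 25 unhandled commits, as exercised in the test below.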
    fn unhandled_commits_threshold(&self) -> CommitIndex {
        self.inner.context.parameters.commit_sync_batch_size
            * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
    }

    #[cfg(test)]
    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
        self.pending_fetches.clone()
    }

    #[cfg(test)]
    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
        self.fetched_ranges.clone()
    }

    #[cfg(test)]
    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
        self.highest_scheduled_index
    }

    #[cfg(test)]
    fn highest_fetched_commit_index(&self) -> CommitIndex {
        self.highest_fetched_commit_index
    }

    #[cfg(test)]
    fn synced_commit_index(&self) -> CommitIndex {
        self.synced_commit_index
    }
}

struct Inner<C: NetworkClient> {
    context: Arc<Context>,
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    network_client: Arc<C>,
    block_verifier: Arc<dyn BlockVerifier>,
    dag_state: Arc<RwLock<DagState>>,
}

impl<C: NetworkClient> Inner<C> {
    /// Verifies the commits and also certifies them using the provided vote
    /// blocks for the last commit. The method returns the trusted commits
    /// and the votes as verified blocks.
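    ///
    /// Because each commit embeds the digest of its predecessor, a quorum of
    /// votes on the last commit in the range transitively certifies all
    /// earlier commits in the chain.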
    fn verify_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        serialized_commits: Vec<Bytes>,
        serialized_vote_blocks: Vec<Bytes>,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        // Parse and verify commits.
        let mut commits = Vec::new();
        for serialized in &serialized_commits {
            let commit: Commit =
                bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
            let digest = TrustedCommit::compute_digest(serialized);
            if commits.is_empty() {
                // start is inclusive, so first commit must be at the start index.
                if commit.index() != commit_range.start() {
                    return Err(ConsensusError::UnexpectedStartCommit {
                        peer,
                        start: commit_range.start(),
                        commit: Box::new(commit),
                    });
                }
            } else {
                // Verify next commit increments index and references the previous digest.
                let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
                    commits.last().unwrap();
                if commit.index() != last_commit.index() + 1
                    || &commit.previous_digest() != last_commit_digest
                {
                    return Err(ConsensusError::UnexpectedCommitSequence {
                        peer,
                        prev_commit: Box::new(last_commit.clone()),
                        curr_commit: Box::new(commit),
                    });
                }
            }
            // Do not process more commits past the end index.
            if commit.index() > commit_range.end() {
                break;
            }
            commits.push((digest, commit));
        }
        let Some((end_commit_digest, end_commit)) = commits.last() else {
            return Err(ConsensusError::NoCommitReceived { peer });
        };

        // Parse and verify blocks. Then accumulate votes on the end commit.
        let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
        let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        let mut vote_blocks = Vec::new();
        for serialized in serialized_vote_blocks {
            let block: SignedBlock =
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
            // The block signature needs to be verified.
            self.block_verifier.verify(&block)?;
            for vote in block.commit_votes() {
                if *vote == end_commit_ref {
                    stake_aggregator.add(block.author(), &self.context.committee);
                }
            }
            vote_blocks.push(VerifiedBlock::new_verified(block, serialized));
        }

        // Check if the end commit has enough votes.
        if !stake_aggregator.reached_threshold(&self.context.committee) {
            return Err(ConsensusError::NotEnoughCommitVotes {
                stake: stake_aggregator.stake(),
                peer,
                commit: Box::new(end_commit.clone()),
            });
        }

        let trusted_commits = commits
            .into_iter()
            .zip(serialized_commits)
            .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
            .collect();
        Ok((trusted_commits, vote_blocks))
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, Parameters};
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef, Round,
        block::{BlockRef, TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::tests::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, NetworkClient},
        storage::mem_store::MemStore,
    };

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait::async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        // SETUP
        let (context, _) = Context::new_for_test(4);
        // Use smaller batches and fetch limits for testing.
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            network_client,
            block_verifier,
            dag_state,
        );

        // Check initial state.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 15 blocks voting for commit 10 from authorities 0 to 2 in
        // CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled after seeing progress of other validators.
        commit_syncer.try_schedule_once();

        // Verify state.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 40 blocks voting for commit 35 from authorities 0 to 2 in
        // CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled until the unhandled commits threshold.
        commit_syncer.try_schedule_once();

        // Verify commit syncer is paused after scheduling 15 commits to index 25.
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // Indicate commit index 25 is consumed, and try to schedule again.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        // Verify commit syncer schedules fetches up to index 35.
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // Verify contiguous ranges are scheduled.
        for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
1007}