use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{StreamExt as _, stream::FuturesOrdered};
use iota_metrics::spawn_logged_monitored_task;
use itertools::Itertools as _;
use parking_lot::RwLock;
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use tokio::{
    runtime::Handle,
    sync::oneshot,
    task::{JoinHandle, JoinSet},
    time::{MissedTickBehavior, sleep},
};
use tracing::{debug, info, warn};

use crate::{
    CommitConsumerMonitor, CommitIndex,
    block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{
        CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
        CommitRef, TrustedCommit,
    },
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
};

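/// Handle for shutting down a running [`CommitSyncer`]: signals the scheduler task to stop
/// and waits for it to exit.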
pub(crate) struct CommitSyncerHandle {
    schedule_task: JoinHandle<()>,
    tx_shutdown: oneshot::Sender<()>,
}

impl CommitSyncerHandle {
    pub(crate) async fn stop(self) {
        let _ = self.tx_shutdown.send(());
        if let Err(e) = self.schedule_task.await {
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            }
        }
    }
}

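/// Syncs commits that the local node has fallen behind on: it observes the highest commit
/// index voted on by a quorum of peers, schedules fetches of certified commits and their
/// blocks in batches, and forwards them to the core thread in order.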
pub(crate) struct CommitSyncer<C: NetworkClient> {
    // State and dependencies shared with the fetch tasks.
    inner: Arc<Inner<C>>,

    // Inflight fetch tasks, each returning the target end index and the fetched commits.
    inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
    // Commit ranges scheduled for fetching but not yet started.
    pending_fetches: BTreeSet<CommitRange>,
    // Fetched commits that cannot be sent to the core thread yet, keyed by commit range.
    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
    // Highest commit index scheduled for fetching so far.
    highest_scheduled_index: Option<CommitIndex>,
    // Highest commit index among fetched commits.
    highest_fetched_commit_index: CommitIndex,
    // Highest commit index known to be synced, either committed locally or already sent to
    // the core thread.
    synced_commit_index: CommitIndex,
}

impl<C: NetworkClient> CommitSyncer<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
        network_client: Arc<C>,
        block_verifier: Arc<dyn BlockVerifier>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Self {
        let inner = Arc::new(Inner {
            context,
            core_thread_dispatcher,
            commit_vote_monitor,
            commit_consumer_monitor,
            network_client,
            block_verifier,
            dag_state,
        });
        let synced_commit_index = inner.dag_state.read().last_commit_index();
        CommitSyncer {
            inner,
            inflight_fetches: JoinSet::new(),
            pending_fetches: BTreeSet::new(),
            fetched_ranges: BTreeMap::new(),
            highest_scheduled_index: None,
            highest_fetched_commit_index: 0,
            synced_commit_index,
        }
    }

    pub(crate) fn start(self) -> CommitSyncerHandle {
        let (tx_shutdown, rx_shutdown) = oneshot::channel();
        let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown));
        CommitSyncerHandle {
            schedule_task,
            tx_shutdown,
        }
    }

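    /// Main loop of the syncer: on a fixed interval, schedules new commit ranges to fetch,
    /// processes completed fetches, and starts new fetches up to the parallelism limit,
    /// until a shutdown signal is received.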
    async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_secs(2));
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    self.try_schedule_once();
                }
                Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
                    if let Err(e) = result {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        }
                        warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
                        self.inflight_fetches.shutdown().await;
                        return;
                    }
                    let (target_end, commits) = result.unwrap();
                    self.handle_fetch_result(target_end, commits).await;
                }
                _ = &mut rx_shutdown => {
                    info!("CommitSyncer shutting down ...");
                    self.inflight_fetches.shutdown().await;
                    return;
                }
            }

            self.try_start_fetches();
        }
    }

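    /// Schedules new commit ranges to fetch when a quorum of peers has voted on commits
    /// ahead of the locally committed index, pausing when the commit consumer lags too far
    /// behind the scheduled fetches.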
    fn try_schedule_once(&mut self) {
        let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
        let local_commit_index = self.inner.dag_state.read().last_commit_index();
        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_quorum_index
            .set(quorum_commit_index as i64);
        metrics
            .commit_sync_local_index
            .set(local_commit_index as i64);
        let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
        let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
        self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
        let unhandled_commits_threshold = self.unhandled_commits_threshold();
        info!(
            "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
            self.synced_commit_index,
            highest_handled_index,
            highest_scheduled_index,
            quorum_commit_index,
            unhandled_commits_threshold,
        );

        // Schedule fetches in fixed-size batches, starting after the highest index that is
        // already synced or scheduled.
        let fetch_after_index = self
            .synced_commit_index
            .max(self.highest_scheduled_index.unwrap_or(0));
        for prev_end in (fetch_after_index..=quorum_commit_index)
            .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
        {
            let range_start = prev_end + 1;
            let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
            // Only schedule full batches that a quorum has already voted on.
            if quorum_commit_index < range_end {
                break;
            }
            // Pause scheduling if the consumer is too far behind handling fetched commits.
            if highest_handled_index + unhandled_commits_threshold < range_end {
                warn!(
                    "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
                    highest_handled_index, highest_scheduled_index
                );
                break;
            }
            self.pending_fetches
                .insert((range_start..=range_end).into());
            self.highest_scheduled_index = Some(range_end);
        }
    }

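    /// Processes a completed fetch: records metrics, requeues any unfetched tail of the
    /// target range, buffers out-of-order ranges, and sends contiguous certified commits to
    /// the core thread.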
    async fn handle_fetch_result(
        &mut self,
        target_end: CommitIndex,
        certified_commits: CertifiedCommits,
    ) {
        assert!(!certified_commits.commits().is_empty());

        let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
            .commits()
            .iter()
            .fold((0, 0), |(blocks, bytes), c| {
                (
                    blocks + c.blocks().len(),
                    bytes
                        + c.blocks()
                            .iter()
                            .map(|b| b.serialized().len())
                            .sum::<usize>() as u64,
                )
            });

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_fetched_commits
            .inc_by(certified_commits.commits().len() as u64);
        metrics
            .commit_sync_fetched_blocks
            .inc_by(total_blocks_fetched as u64);
        metrics
            .commit_sync_total_fetched_blocks_size
            .inc_by(total_blocks_size_bytes);

        let (commit_start, commit_end) = (
            certified_commits.commits().first().unwrap().index(),
            certified_commits.commits().last().unwrap().index(),
        );
        self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
        metrics
            .commit_sync_highest_fetched_index
            .set(self.highest_fetched_commit_index as i64);

        // If the fetch stopped short of the target, requeue the remainder of the range.
        if commit_end < target_end {
            self.pending_fetches
                .insert((commit_end + 1..=target_end).into());
        }
        // Only buffer the fetched commits if they have not already been committed locally.
        self.synced_commit_index = self
            .synced_commit_index
            .max(self.inner.dag_state.read().last_commit_index());
        if self.synced_commit_index < commit_end {
            self.fetched_ranges
                .insert((commit_start..=commit_end).into(), certified_commits);
        }
        // Send contiguous fetched ranges to the core thread, in order of commit index.
        while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
            // Stop if there is a gap between the synced index and the next fetched range.
            let (fetched_commit_range, commits) =
                if fetched_commit_range.start() <= self.synced_commit_index + 1 {
                    self.fetched_ranges.pop_first().unwrap()
                } else {
                    metrics.commit_sync_gap_on_processing.inc();
                    break;
                };
            // Skip ranges that have already been committed locally.
            if fetched_commit_range.end() <= self.synced_commit_index {
                continue;
            }

            debug!(
                "Fetched certified blocks for commit range {:?}: {}",
                fetched_commit_range,
                commits
                    .commits()
                    .iter()
                    .flat_map(|c| c.blocks())
                    .map(|b| b.reference().to_string())
                    .join(","),
            );

            match self
                .inner
                .core_thread_dispatcher
                .add_certified_commits(commits)
                .await
            {
                Ok(missing) => {
                    if !missing.is_empty() {
                        warn!(
                            "Fetched blocks have missing ancestors: {:?} for commit range {:?}",
                            missing, fetched_commit_range
                        );
                    }
                    for block_ref in missing {
                        let hostname = &self
                            .inner
                            .context
                            .committee
                            .authority(block_ref.author)
                            .hostname;
                        metrics
                            .commit_sync_fetch_missing_blocks
                            .with_label_values(&[hostname])
                            .inc();
                    }
                }
                Err(e) => {
                    info!("Failed to add blocks, shutting down: {}", e);
                    return;
                }
            };

            self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
        }

        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

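    /// Starts additional fetch tasks from the pending queue, bounded by the configured
    /// parallelism, the committee size, and how many fetched ranges are still waiting to be
    /// processed.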
    fn try_start_fetches(&mut self) {
        // Cap parallelism by the configured limit, by a fraction of the committee size, and
        // by how far ahead of processing the fetched ranges are allowed to run.
        let target_parallel_fetches = self
            .inner
            .context
            .parameters
            .commit_sync_parallel_fetches
            .min(self.inner.context.committee.size() * 2 / 3)
            .min(
                self.inner
                    .context
                    .parameters
                    .commit_sync_batches_ahead
                    .saturating_sub(self.fetched_ranges.len()),
            )
            .max(1);
        loop {
            if self.inflight_fetches.len() >= target_parallel_fetches {
                break;
            }
            let Some(commit_range) = self.pending_fetches.pop_first() else {
                break;
            };
            self.inflight_fetches
                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
        }

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

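    /// Fetches the given commit range from randomly chosen peers, retrying with an
    /// escalating timeout until one peer returns a verified result.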
    async fn fetch_loop(
        inner: Arc<Inner<C>>,
        commit_range: CommitRange,
    ) -> (CommitIndex, CertifiedCommits) {
        const TIMEOUT: Duration = Duration::from_secs(10);
        const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
        const MAX_NUM_TARGETS: usize = 24;
        let mut timeout_multiplier = 0;

        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_loop_latency
            .start_timer();
        info!("Starting to fetch commits in {commit_range:?} ...");
        loop {
            // Pick a random subset of peers, excluding the local authority.
            let mut target_authorities = inner
                .context
                .committee
                .authorities()
                .filter_map(|(i, _)| {
                    if i != inner.context.own_index {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect_vec();
            target_authorities.shuffle(&mut ThreadRng::default());
            target_authorities.truncate(MAX_NUM_TARGETS);
            // Increase the timeouts on each retry, up to a maximum multiplier.
            timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
            let request_timeout = TIMEOUT * timeout_multiplier;
            let fetch_timeout = request_timeout * 4;
            for authority in target_authorities {
                match tokio::time::timeout(
                    fetch_timeout,
                    Self::fetch_once(
                        inner.clone(),
                        authority,
                        commit_range.clone(),
                        request_timeout,
                    ),
                )
                .await
                {
                    Ok(Ok(commits)) => {
                        info!("Finished fetching commits in {commit_range:?}");
                        return (commit_range.end(), commits);
                    }
                    Ok(Err(e)) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
                        let error: &'static str = e.into();
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), error])
                            .inc();
                    }
                    Err(_) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Timed out fetching {commit_range:?} from {authority}");
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), "FetchTimeout"])
                            .inc();
                    }
                }
            }
            sleep(TIMEOUT).await;
        }
    }

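    /// Fetches certified commits in the given range and their blocks from a single peer:
    /// verifies the commit chain and its votes off the async runtime, then fetches the
    /// committed blocks in chunks and checks each against the requested block reference.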
    async fn fetch_once(
        inner: Arc<Inner<C>>,
        target_authority: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<CertifiedCommits> {
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_once_latency
            .start_timer();

        let (serialized_commits, serialized_blocks) = inner
            .network_client
            .fetch_commits(target_authority, commit_range.clone(), timeout)
            .await?;

        // Verify the fetched commits and vote blocks on a blocking thread, since
        // verification is CPU intensive.
        let (commits, vote_blocks) = Handle::current()
            .spawn_blocking({
                let inner = inner.clone();
                move || {
                    inner.verify_commits(
                        target_authority,
                        commit_range,
                        serialized_commits,
                        serialized_blocks,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // Fetch the committed blocks in chunks, staggering the requests across the timeout.
        let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
        let num_chunks = block_refs
            .len()
            .div_ceil(inner.context.parameters.max_blocks_per_fetch)
            as u32;
        let mut requests: FuturesOrdered<_> = block_refs
            .chunks(inner.context.parameters.max_blocks_per_fetch)
            .enumerate()
            .map(|(i, request_block_refs)| {
                let inner = inner.clone();
                async move {
                    sleep(timeout * i as u32 / num_chunks).await;
                    let serialized_blocks = inner
                        .network_client
                        .fetch_blocks(
                            target_authority,
                            request_block_refs.to_vec(),
                            vec![],
                            timeout,
                        )
                        .await?;
                    if request_block_refs.len() != serialized_blocks.len() {
                        return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
                            authority: target_authority,
                            requested: request_block_refs.len(),
                            received: serialized_blocks.len(),
                        });
                    }
                    let signed_blocks = serialized_blocks
                        .iter()
                        .map(|serialized| {
                            let block: SignedBlock = bcs::from_bytes(serialized)
                                .map_err(ConsensusError::MalformedBlock)?;
                            Ok(block)
                        })
                        .collect::<ConsensusResult<Vec<_>>>()?;
                    // Match each received block against the requested block ref. The refs
                    // come from the verified commits and include the block digest, which
                    // binds the block contents.
                    let mut blocks = Vec::new();
                    for ((requested_block_ref, signed_block), serialized) in request_block_refs
                        .iter()
                        .zip(signed_blocks.into_iter())
                        .zip(serialized_blocks.into_iter())
                    {
                        let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
                        let received_block_ref = BlockRef::new(
                            signed_block.round(),
                            signed_block.author(),
                            signed_block_digest,
                        );
                        if *requested_block_ref != received_block_ref {
                            return Err(ConsensusError::UnexpectedBlockForCommit {
                                peer: target_authority,
                                requested: *requested_block_ref,
                                received: received_block_ref,
                            });
                        }
                        blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
                    }
                    Ok(blocks)
                }
            })
            .collect();

        let mut fetched_blocks = BTreeMap::new();
        while let Some(result) = requests.next().await {
            for block in result? {
                fetched_blocks.insert(block.reference(), block);
            }
        }

        // Wait out any forward time drift in fetched block timestamps, so blocks are not
        // processed with timestamps from the future.
        for block in fetched_blocks.values().chain(vote_blocks.iter()) {
            let now_ms = inner.context.clock.timestamp_utc_ms();
            let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
            if forward_drift == 0 {
                continue;
            }
            let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
            inner
                .context
                .metrics
                .node_metrics
                .block_timestamp_drift_wait_ms
                .with_label_values(&[peer_hostname.as_str(), "commit_syncer"])
                .inc_by(forward_drift);
            let forward_drift = Duration::from_millis(forward_drift);
            if forward_drift >= inner.context.parameters.max_forward_time_drift {
                warn!(
                    "Local clock is behind a quorum of peers: local ts {}, certified block ts {}",
                    now_ms,
                    block.timestamp_ms()
                );
            }
            sleep(forward_drift).await;
        }

        // Assemble the certified commits with their fetched blocks.
        let mut certified_commits = Vec::new();
        for commit in &commits {
            let blocks = commit
                .blocks()
                .iter()
                .map(|block_ref| {
                    fetched_blocks
                        .remove(block_ref)
                        .expect("Block should exist")
                })
                .collect::<Vec<_>>();
            certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
        }

        Ok(CertifiedCommits::new(certified_commits, vote_blocks))
    }

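    /// Maximum number of commits that may be fetched ahead of the highest commit index
    /// handled by the consumer: batch size times the number of batches allowed ahead.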
    fn unhandled_commits_threshold(&self) -> CommitIndex {
        self.inner.context.parameters.commit_sync_batch_size
            * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
    }

    #[cfg(test)]
    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
        self.pending_fetches.clone()
    }

    #[cfg(test)]
    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
        self.fetched_ranges.clone()
    }

    #[cfg(test)]
    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
        self.highest_scheduled_index
    }

    #[cfg(test)]
    fn highest_fetched_commit_index(&self) -> CommitIndex {
        self.highest_fetched_commit_index
    }

    #[cfg(test)]
    fn synced_commit_index(&self) -> CommitIndex {
        self.synced_commit_index
    }
}

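/// State and dependencies shared between the scheduler loop and the fetch tasks.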
struct Inner<C: NetworkClient> {
    context: Arc<Context>,
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    network_client: Arc<C>,
    block_verifier: Arc<dyn BlockVerifier>,
    dag_state: Arc<RwLock<DagState>>,
}

impl<C: NetworkClient> Inner<C> {
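    /// Verifies the fetched serialized commits and vote blocks: the commits must form a
    /// contiguous chain starting at the requested range, and the last commit must be voted
    /// on by a quorum of the committee.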
    fn verify_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        serialized_commits: Vec<Bytes>,
        serialized_vote_blocks: Vec<Bytes>,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        // Parse the commits and verify that they form a chain over the requested range.
        let mut commits = Vec::new();
        for serialized in &serialized_commits {
            let commit: Commit =
                bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
            let digest = TrustedCommit::compute_digest(serialized);
            if commits.is_empty() {
                // The first commit must be at the start of the requested range.
                if commit.index() != commit_range.start() {
                    return Err(ConsensusError::UnexpectedStartCommit {
                        peer,
                        start: commit_range.start(),
                        commit: Box::new(commit),
                    });
                }
            } else {
                // Later commits must be sequential and reference the previous commit digest.
                let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
                    commits.last().unwrap();
                if commit.index() != last_commit.index() + 1
                    || &commit.previous_digest() != last_commit_digest
                {
                    return Err(ConsensusError::UnexpectedCommitSequence {
                        peer,
                        prev_commit: Box::new(last_commit.clone()),
                        curr_commit: Box::new(commit),
                    });
                }
            }
            // Do not process commits past the end of the requested range.
            if commit.index() > commit_range.end() {
                break;
            }
            commits.push((digest, commit));
        }
        let Some((end_commit_digest, end_commit)) = commits.last() else {
            return Err(ConsensusError::NoCommitReceived { peer });
        };

        // Aggregate the stake of votes on the last commit in the chain. Voting on the last
        // commit certifies the whole chain through the previous-digest links.
        let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
        let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        let mut vote_blocks = Vec::new();
        for serialized in serialized_vote_blocks {
            let block: SignedBlock =
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
            self.block_verifier.verify(&block)?;
            for vote in block.commit_votes() {
                if *vote == end_commit_ref {
                    stake_aggregator.add(block.author(), &self.context.committee);
                }
            }
            vote_blocks.push(VerifiedBlock::new_verified(block, serialized));
        }

        // Reject the fetched commits if votes on the end commit do not reach quorum.
        if !stake_aggregator.reached_threshold(&self.context.committee) {
            return Err(ConsensusError::NotEnoughCommitVotes {
                stake: stake_aggregator.stake(),
                peer,
                commit: Box::new(end_commit.clone()),
            });
        }

        let trusted_commits = commits
            .into_iter()
            .zip(serialized_commits)
            .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
            .collect();
        Ok((trusted_commits, vote_blocks))
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, Parameters};
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef, Round,
        block::{BlockRef, TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::tests::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, NetworkClient},
        storage::mem_store::MemStore,
    };

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait::async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

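    // Exercises scheduling with a commit_sync_batch_size of 5: fetches are scheduled only up
    // to the quorum commit index, pause when the consumer lags beyond the unhandled commits
    // threshold, and resume once the consumer catches up.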
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        let (context, _) = Context::new_for_test(4);
        // Use small commit sync parameters so scheduling can be exercised with few commits.
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            network_client,
            block_verifier,
            dag_state,
        );

        // Check the initial state.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe votes for commit 10 from a quorum of authorities.
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        commit_syncer.try_schedule_once();

        // Two batches of 5 commits (1..=5 and 6..=10) should be scheduled.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe votes for commit 35 from a quorum of authorities.
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        commit_syncer.try_schedule_once();

        // Scheduling pauses at commit 25: no commits have been handled yet and the unhandled
        // commits threshold is 25.
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // After the consumer catches up, scheduling continues to the quorum commit index.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // The pending fetch ranges should cover commits 1..=35 in batches of 5.
        for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
}
1007}