1use std::{
33 collections::{BTreeMap, BTreeSet},
34 sync::Arc,
35 time::Duration,
36};
37
38use bytes::Bytes;
39use consensus_config::AuthorityIndex;
40use futures::{StreamExt as _, stream::FuturesOrdered};
41use iota_metrics::spawn_logged_monitored_task;
42use itertools::Itertools as _;
43use parking_lot::RwLock;
44use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
45use tokio::{
46 runtime::Handle,
47 sync::oneshot,
48 task::{JoinHandle, JoinSet},
49 time::{MissedTickBehavior, sleep},
50};
51use tracing::{debug, info, warn};
52
53use crate::{
54 CommitConsumerMonitor, CommitIndex,
55 block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
56 block_verifier::BlockVerifier,
57 commit::{
58 CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
59 CommitRef, TrustedCommit,
60 },
61 commit_vote_monitor::CommitVoteMonitor,
62 context::Context,
63 core_thread::CoreThreadDispatcher,
64 dag_state::DagState,
65 error::{ConsensusError, ConsensusResult},
66 network::NetworkClient,
67 stake_aggregator::{QuorumThreshold, StakeAggregator},
68};
69
/// Handle returned by `CommitSyncer::start()`, used to stop the background schedule loop.
pub(crate) struct CommitSyncerHandle {
    /// Join handle of the spawned task that runs the schedule loop.
    schedule_task: JoinHandle<()>,
    /// Sender half of the shutdown channel; the schedule loop exits when it fires.
    tx_shutdown: oneshot::Sender<()>,
}
75
76impl CommitSyncerHandle {
77 pub(crate) async fn stop(self) {
78 let _ = self.tx_shutdown.send(());
79 if let Err(e) = self.schedule_task.await {
81 if e.is_panic() {
82 std::panic::resume_unwind(e.into_panic());
83 }
84 }
85 }
86}
87
88pub(crate) struct CommitSyncer<C: NetworkClient> {
89 inner: Arc<Inner<C>>,
93
94 inflight_fetches: JoinSet<(AuthorityIndex, u32, CertifiedCommits)>,
98 pending_fetches: BTreeSet<CommitRange>,
100 fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
102 highest_scheduled_index: Option<CommitIndex>,
105 highest_fetched_commit_index: CommitIndex,
108 synced_commit_index: CommitIndex,
111}
112
113impl<C: NetworkClient> CommitSyncer<C> {
114 pub(crate) fn new(
115 context: Arc<Context>,
116 core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
117 commit_vote_monitor: Arc<CommitVoteMonitor>,
118 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
119 network_client: Arc<C>,
120 block_verifier: Arc<dyn BlockVerifier>,
121 dag_state: Arc<RwLock<DagState>>,
122 ) -> Self {
123 let inner = Arc::new(Inner {
124 context,
125 core_thread_dispatcher,
126 commit_vote_monitor,
127 commit_consumer_monitor,
128 network_client,
129 block_verifier,
130 dag_state,
131 });
132 let synced_commit_index = inner.dag_state.read().last_commit_index();
133 CommitSyncer {
134 inner,
135 inflight_fetches: JoinSet::new(),
136 pending_fetches: BTreeSet::new(),
137 fetched_ranges: BTreeMap::new(),
138 highest_scheduled_index: None,
139 highest_fetched_commit_index: 0,
140 synced_commit_index,
141 }
142 }
143
144 pub(crate) fn start(self) -> CommitSyncerHandle {
145 let (tx_shutdown, rx_shutdown) = oneshot::channel();
146 let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
147 CommitSyncerHandle {
148 schedule_task,
149 tx_shutdown,
150 }
151 }
152
153 async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
154 let mut interval = tokio::time::interval(Duration::from_secs(2));
155 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
156
157 loop {
158 tokio::select! {
159 _ = interval.tick() => {
161 self.try_schedule_once();
162 }
163 Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
165 if let Err(e) = result {
166 if e.is_panic() {
167 std::panic::resume_unwind(e.into_panic());
168 }
169 warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
170 self.inflight_fetches.shutdown().await;
172 return;
173 }
174 let (authority, target_end, commits) = result.unwrap();
175 self.handle_fetch_result(authority, target_end, commits).await;
176 }
177 _ = &mut rx_shutdown => {
178 info!("CommitSyncer shutting down ...");
180 self.inflight_fetches.shutdown().await;
181 return;
182 }
183 }
184
185 self.try_start_fetches();
186 }
187 }
188
189 fn try_schedule_once(&mut self) {
190 let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
191 let local_commit_index = self.inner.dag_state.read().last_commit_index();
192 let metrics = &self.inner.context.metrics.node_metrics;
193 metrics
194 .commit_sync_quorum_index
195 .set(quorum_commit_index as i64);
196 metrics
197 .commit_sync_local_index
198 .set(local_commit_index as i64);
199 let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
200 let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
201 self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
204 let unhandled_commits_threshold = self.unhandled_commits_threshold();
205 info!(
206 "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
207 self.synced_commit_index,
208 highest_handled_index,
209 highest_scheduled_index,
210 quorum_commit_index,
211 unhandled_commits_threshold,
212 );
213
214 let fetch_after_index = self
216 .synced_commit_index
217 .max(self.highest_scheduled_index.unwrap_or(0));
218 for prev_end in (fetch_after_index..=quorum_commit_index)
221 .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
222 {
223 let range_start = prev_end + 1;
225 let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
226 if quorum_commit_index < range_end {
231 break;
232 }
233 if highest_handled_index + unhandled_commits_threshold < range_end {
235 warn!(
236 "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
237 highest_handled_index, highest_scheduled_index
238 );
239 break;
240 }
241 self.pending_fetches
242 .insert((range_start..=range_end).into());
243 self.highest_scheduled_index = Some(range_end);
246 }
247 }
248
249 async fn handle_fetch_result(
250 &mut self,
251 authority_index: AuthorityIndex,
252 target_end: CommitIndex,
253 certified_commits: CertifiedCommits,
254 ) {
255 assert!(!certified_commits.commits().is_empty());
256
257 let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
258 .commits()
259 .iter()
260 .fold((0, 0), |(blocks, bytes), c| {
261 (
262 blocks + c.blocks().len(),
263 bytes
264 + c.blocks()
265 .iter()
266 .map(|b| b.serialized().len())
267 .sum::<usize>() as u64,
268 )
269 });
270 let hostname = &self
271 .inner
272 .context
273 .committee
274 .authority(authority_index)
275 .hostname;
276 let metrics = &self.inner.context.metrics.node_metrics;
277 metrics
278 .commit_sync_fetched_commits
279 .with_label_values(&[&hostname.as_str()])
280 .inc_by(certified_commits.commits().len() as u64);
281 metrics
282 .commit_sync_fetched_blocks
283 .with_label_values(&[&hostname.as_str()])
284 .inc_by(total_blocks_fetched as u64);
285 metrics
286 .commit_sync_total_fetched_blocks_size
287 .with_label_values(&[&hostname.as_str()])
288 .inc_by(total_blocks_size_bytes);
289
290 let (commit_start, commit_end) = (
291 certified_commits.commits().first().unwrap().index(),
292 certified_commits.commits().last().unwrap().index(),
293 );
294 self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
295 metrics
296 .commit_sync_highest_fetched_index
297 .set(self.highest_fetched_commit_index as i64);
298
299 if commit_end < target_end {
301 self.pending_fetches
302 .insert((commit_end + 1..=target_end).into());
303 }
304 self.synced_commit_index = self
306 .synced_commit_index
307 .max(self.inner.dag_state.read().last_commit_index());
308 if self.synced_commit_index < commit_end {
310 self.fetched_ranges
311 .insert((commit_start..=commit_end).into(), certified_commits);
312 }
313 while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
315 let (fetched_commit_range, commits) =
318 if fetched_commit_range.start() <= self.synced_commit_index + 1 {
319 self.fetched_ranges.pop_first().unwrap()
320 } else {
321 metrics.commit_sync_gap_on_processing.inc();
324 break;
325 };
326 if fetched_commit_range.end() <= self.synced_commit_index {
328 continue;
329 }
330
331 debug!(
332 "Fetched certified blocks for commit range {:?}: {}",
333 fetched_commit_range,
334 commits
335 .commits()
336 .iter()
337 .flat_map(|c| c.blocks())
338 .map(|b| b.reference().to_string())
339 .join(","),
340 );
341
342 match self
348 .inner
349 .core_thread_dispatcher
350 .add_certified_commits(commits)
351 .await
352 {
353 Ok(missing) => {
354 if !missing.is_empty() {
355 warn!(
356 "Fetched blocks have missing ancestors: {:?} for commit range {:?}",
357 missing, fetched_commit_range
358 );
359 }
360 for block_ref in missing {
361 let hostname = &self
362 .inner
363 .context
364 .committee
365 .authority(block_ref.author)
366 .hostname;
367 metrics
368 .commit_sync_fetch_missing_blocks
369 .with_label_values(&[hostname])
370 .inc();
371 }
372 }
373 Err(e) => {
374 info!("Failed to add blocks, shutting down: {}", e);
375 return;
376 }
377 };
378
379 self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
381 }
382
383 metrics
384 .commit_sync_inflight_fetches
385 .set(self.inflight_fetches.len() as i64);
386 metrics
387 .commit_sync_pending_fetches
388 .set(self.pending_fetches.len() as i64);
389 metrics
390 .commit_sync_highest_synced_index
391 .set(self.synced_commit_index as i64);
392 }
393
394 fn try_start_fetches(&mut self) {
395 let target_parallel_fetches = self
401 .inner
402 .context
403 .parameters
404 .commit_sync_parallel_fetches
405 .min(self.inner.context.committee.size() * 2 / 3)
406 .min(
407 self.inner
408 .context
409 .parameters
410 .commit_sync_batches_ahead
411 .saturating_sub(self.fetched_ranges.len()),
412 )
413 .max(1);
414 loop {
416 if self.inflight_fetches.len() >= target_parallel_fetches {
417 break;
418 }
419 let Some(commit_range) = self.pending_fetches.pop_first() else {
420 break;
421 };
422 self.inflight_fetches
423 .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
424 }
425
426 let metrics = &self.inner.context.metrics.node_metrics;
427 metrics
428 .commit_sync_inflight_fetches
429 .set(self.inflight_fetches.len() as i64);
430 metrics
431 .commit_sync_pending_fetches
432 .set(self.pending_fetches.len() as i64);
433 metrics
434 .commit_sync_highest_synced_index
435 .set(self.synced_commit_index as i64);
436 }
437
438 async fn fetch_loop(
442 inner: Arc<Inner<C>>,
443 commit_range: CommitRange,
444 ) -> (AuthorityIndex, CommitIndex, CertifiedCommits) {
445 const TIMEOUT: Duration = Duration::from_secs(10);
447 const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
451 const MAX_NUM_TARGETS: usize = 24;
454 let mut timeout_multiplier = 0;
455
456 let _timer = inner
457 .context
458 .metrics
459 .node_metrics
460 .commit_sync_fetch_loop_latency
461 .start_timer();
462 info!("Starting to fetch commits in {commit_range:?} ...",);
463 loop {
464 let mut target_authorities = inner
467 .context
468 .committee
469 .authorities()
470 .filter_map(|(i, _)| {
471 if i != inner.context.own_index {
472 Some(i)
473 } else {
474 None
475 }
476 })
477 .collect_vec();
478 target_authorities.shuffle(&mut ThreadRng::default());
479 target_authorities.truncate(MAX_NUM_TARGETS);
480 timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
482 let request_timeout = TIMEOUT * timeout_multiplier;
483 let fetch_timeout = request_timeout * 4;
489 for authority in target_authorities {
491 match tokio::time::timeout(
492 fetch_timeout,
493 Self::fetch_once(
494 inner.clone(),
495 authority,
496 commit_range.clone(),
497 request_timeout,
498 ),
499 )
500 .await
501 {
502 Ok(Ok(commits)) => {
503 info!("Finished fetching commits in {commit_range:?}",);
504 return (authority, commit_range.end(), commits);
505 }
506 Ok(Err(e)) => {
507 let hostname = inner
508 .context
509 .committee
510 .authority(authority)
511 .hostname
512 .clone();
513 inner
514 .context
515 .metrics
516 .update_scoring_metrics_on_block_receival(
517 authority,
518 hostname.as_str(),
519 e.clone(),
520 "fetch_once",
521 );
522 warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
523 let error: &'static str = e.into();
524 inner
525 .context
526 .metrics
527 .node_metrics
528 .commit_sync_fetch_once_errors
529 .with_label_values(&[hostname.as_str(), error])
530 .inc();
531 }
532 Err(_) => {
533 let hostname = inner
534 .context
535 .committee
536 .authority(authority)
537 .hostname
538 .clone();
539 warn!("Timed out fetching {commit_range:?} from {authority}",);
540 inner
541 .context
542 .metrics
543 .node_metrics
544 .commit_sync_fetch_once_errors
545 .with_label_values(&[hostname.as_str(), "FetchTimeout"])
546 .inc();
547 }
548 }
549 }
550 sleep(TIMEOUT).await;
552 }
553 }
554
555 async fn fetch_once(
559 inner: Arc<Inner<C>>,
560 target_authority: AuthorityIndex,
561 commit_range: CommitRange,
562 timeout: Duration,
563 ) -> ConsensusResult<CertifiedCommits> {
564 const MAX_PIPELINE_DELAY: Duration = Duration::from_secs(1);
567
568 let hostname = inner
569 .context
570 .committee
571 .authority(target_authority)
572 .hostname
573 .clone();
574 let _timer = inner
575 .context
576 .metrics
577 .node_metrics
578 .commit_sync_fetch_once_latency
579 .with_label_values(&[hostname.as_str()])
580 .start_timer();
581
582 let (serialized_commits, serialized_blocks) = inner
584 .network_client
585 .fetch_commits(target_authority, commit_range.clone(), timeout)
586 .await?;
587
588 let (commits, vote_blocks) = Handle::current()
593 .spawn_blocking({
594 let inner = inner.clone();
595 move || {
596 inner.verify_commits(
597 target_authority,
598 commit_range,
599 serialized_commits,
600 serialized_blocks,
601 )
602 }
603 })
604 .await
605 .expect("Spawn blocking should not fail")?;
606
607 let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
609 let num_chunks = block_refs
610 .len()
611 .div_ceil(inner.context.parameters.max_blocks_per_fetch)
612 as u32;
613 let mut requests: FuturesOrdered<_> = block_refs
614 .chunks(inner.context.parameters.max_blocks_per_fetch)
615 .enumerate()
616 .map(|(i, request_block_refs)| {
617 let inner = inner.clone();
618 async move {
619 let individual_delay = (timeout / num_chunks).min(MAX_PIPELINE_DELAY);
622 sleep(individual_delay * i as u32).await;
623 let serialized_blocks = inner
625 .network_client
626 .fetch_blocks(
627 target_authority,
628 request_block_refs.to_vec(),
629 vec![],
630 timeout,
631 )
632 .await?;
633 if request_block_refs.len() != serialized_blocks.len() {
635 return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
636 authority: target_authority,
637 requested: request_block_refs.len(),
638 received: serialized_blocks.len(),
639 });
640 }
641 let signed_blocks = serialized_blocks
643 .iter()
644 .map(|serialized| {
645 let block: SignedBlock = bcs::from_bytes(serialized)
646 .map_err(ConsensusError::MalformedBlock)?;
647 Ok(block)
648 })
649 .collect::<ConsensusResult<Vec<_>>>()?;
650 let mut blocks = Vec::new();
653 for ((requested_block_ref, signed_block), serialized) in request_block_refs
654 .iter()
655 .zip(signed_blocks.into_iter())
656 .zip(serialized_blocks.into_iter())
657 {
658 let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
659 let received_block_ref = BlockRef::new(
660 signed_block.round(),
661 signed_block.author(),
662 signed_block_digest,
663 );
664 if *requested_block_ref != received_block_ref {
665 return Err(ConsensusError::UnexpectedBlockForCommit {
666 peer: target_authority,
667 requested: *requested_block_ref,
668 received: received_block_ref,
669 });
670 }
671 blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
672 }
673 Ok(blocks)
674 }
675 })
676 .collect();
677
678 let mut fetched_blocks = BTreeMap::new();
679 while let Some(result) = requests.next().await {
680 for block in result? {
681 fetched_blocks.insert(block.reference(), block);
682 }
683 }
684
685 for block in fetched_blocks.values().chain(vote_blocks.iter()) {
688 let now_ms = inner.context.clock.timestamp_utc_ms();
689 let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
690 if forward_drift == 0 {
691 continue;
692 };
693 let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
694 inner
695 .context
696 .metrics
697 .node_metrics
698 .block_timestamp_drift_ms
699 .with_label_values(&[peer_hostname.as_str(), "commit_syncer"])
700 .inc_by(forward_drift);
701
702 if !inner
705 .context
706 .protocol_config
707 .consensus_median_timestamp_with_checkpoint_enforcement()
708 {
709 let forward_drift = Duration::from_millis(forward_drift);
710 if forward_drift >= inner.context.parameters.max_forward_time_drift {
711 warn!(
712 "Local clock is behind a quorum of peers: local ts {}, committed block ts {}",
713 now_ms,
714 block.timestamp_ms()
715 );
716 }
717 sleep(forward_drift).await;
718 }
719 }
720
721 let mut certified_commits = Vec::new();
724 for commit in &commits {
725 let blocks = commit
726 .blocks()
727 .iter()
728 .map(|block_ref| {
729 fetched_blocks
730 .remove(block_ref)
731 .expect("Block should exist")
732 })
733 .collect::<Vec<_>>();
734 certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
735 }
736
737 Ok(CertifiedCommits::new(certified_commits, vote_blocks))
738 }
739
740 fn unhandled_commits_threshold(&self) -> CommitIndex {
741 self.inner.context.parameters.commit_sync_batch_size
742 * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
743 }
744
/// Test-only accessor: returns a copy of the ranges waiting to be fetched.
#[cfg(test)]
fn pending_fetches(&self) -> BTreeSet<CommitRange> {
    self.pending_fetches.clone()
}
749
/// Test-only accessor: returns a copy of the fetched ranges not yet handed to core.
#[cfg(test)]
fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
    self.fetched_ranges.clone()
}
754
/// Test-only accessor: end index of the last scheduled range, if any.
#[cfg(test)]
fn highest_scheduled_index(&self) -> Option<CommitIndex> {
    self.highest_scheduled_index
}
759
/// Test-only accessor: highest commit index seen in any fetch result.
#[cfg(test)]
fn highest_fetched_commit_index(&self) -> CommitIndex {
    self.highest_fetched_commit_index
}
764
/// Test-only accessor: current value of the synced commit index.
#[cfg(test)]
fn synced_commit_index(&self) -> CommitIndex {
    self.synced_commit_index
}
769}
770
/// Components shared between the schedule loop and the spawned fetch tasks.
struct Inner<C: NetworkClient> {
    /// Node context: committee, parameters, metrics, clock and protocol config.
    context: Arc<Context>,
    /// Receives certified commits once they can be applied without a gap.
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    /// Source of the quorum commit index used when scheduling fetches.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Source of the highest handled commit, used to pause scheduling.
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    /// Client used to fetch commits and blocks from peers.
    network_client: Arc<C>,
    /// Verifies the vote blocks returned alongside fetched commits.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Local DAG state; read for the last local commit index.
    dag_state: Arc<RwLock<DagState>>,
}
780
781impl<C: NetworkClient> Inner<C> {
782 fn verify_commits(
786 &self,
787 peer: AuthorityIndex,
788 commit_range: CommitRange,
789 serialized_commits: Vec<Bytes>,
790 serialized_vote_blocks: Vec<Bytes>,
791 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
792 let mut commits = Vec::new();
794 for serialized in &serialized_commits {
795 let commit: Commit =
796 bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
797 let digest = TrustedCommit::compute_digest(serialized);
798 if commits.is_empty() {
799 if commit.index() != commit_range.start() {
801 return Err(ConsensusError::UnexpectedStartCommit {
802 peer,
803 start: commit_range.start(),
804 commit: Box::new(commit),
805 });
806 }
807 } else {
808 let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
810 commits.last().unwrap();
811 if commit.index() != last_commit.index() + 1
812 || &commit.previous_digest() != last_commit_digest
813 {
814 return Err(ConsensusError::UnexpectedCommitSequence {
815 peer,
816 prev_commit: Box::new(last_commit.clone()),
817 curr_commit: Box::new(commit),
818 });
819 }
820 }
821 if commit.index() > commit_range.end() {
823 break;
824 }
825 commits.push((digest, commit));
826 }
827 let Some((end_commit_digest, end_commit)) = commits.last() else {
828 return Err(ConsensusError::NoCommitReceived { peer });
829 };
830
831 let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
833 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
834 let mut vote_blocks = Vec::new();
835 for serialized in serialized_vote_blocks {
836 let block: SignedBlock =
837 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
838 self.block_verifier.verify(&block)?;
840 for vote in block.commit_votes() {
841 if *vote == end_commit_ref {
842 stake_aggregator.add(block.author(), &self.context.committee);
843 }
844 }
845 vote_blocks.push(VerifiedBlock::new_verified(block, serialized));
846 }
847
848 if !stake_aggregator.reached_threshold(&self.context.committee) {
850 return Err(ConsensusError::NotEnoughCommitVotes {
851 stake: stake_aggregator.stake(),
852 peer,
853 commit: Box::new(end_commit.clone()),
854 });
855 }
856
857 let trusted_commits = commits
858 .into_iter()
859 .zip(serialized_commits)
860 .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
861 .collect();
862 Ok((trusted_commits, vote_blocks))
863 }
864}
865
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, Parameters};
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef, Round,
        block::{BlockRef, TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::tests::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, NetworkClient},
        storage::mem_store::MemStore,
    };

    /// Network client stub: every method is unimplemented, because the test
    /// below only exercises scheduling and never starts a fetch.
    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait::async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// Checks that `try_schedule_once()` schedules full batches up to the
    /// quorum commit index, and pauses once scheduled commits run too far
    /// ahead of the highest handled commit.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        // SETUP
        let (context, _) = Context::new_for_test(4);
        // Use small batch parameters so that the thresholds are easy to reach.
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            network_client,
            block_verifier,
            dag_state,
        );

        // Initial state: nothing is scheduled, fetched or synced.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Three blocks (presumably round 15, authors 0..=2) each vote for commit index 10.
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Scheduling now picks up the observed votes.
        commit_syncer.try_schedule_once();

        // Two batches of size 5 are pending, ending at commit index 10.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Three more blocks (presumably round 40, authors 0..=2) each vote for commit index 35.
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Scheduling again: the highest handled commit is still the initial 0.
        commit_syncer.try_schedule_once();

        // Scheduling stops at index 25, the unhandled commits threshold (5 * 5).
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // Advance the highest handled commit so that scheduling can resume.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        // The remaining batches, up to commit index 35, are now scheduled.
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // The pending ranges are the consecutive batches [1, 5], [6, 10], ..., [31, 35].
        for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
}