1use std::{sync::Arc, time::Duration};
25
26use consensus_config::{AuthorityIndex, Committee};
27use futures::stream::{FuturesUnordered, StreamExt as _};
28use iota_common::sync::notify_once::NotifyOnce;
29use iota_metrics::monitored_scope;
30use parking_lot::RwLock;
31use tokio::{task::JoinHandle, time::MissedTickBehavior};
32
33use crate::{
34 BlockAPI as _, Round, context::Context, core_thread::CoreThreadDispatcher, dag_state::DagState,
35 network::NetworkClient,
36};
37
38pub(crate) type QuorumRound = (Round, Round);
54
55pub(crate) struct RoundProberHandle {
57 prober_task: JoinHandle<()>,
58 shutdown_notify: Arc<NotifyOnce>,
59}
60
61impl RoundProberHandle {
62 pub(crate) async fn stop(self) {
63 let _ = self.shutdown_notify.notify();
64 if let Err(e) = self.prober_task.await {
66 if e.is_panic() {
67 std::panic::resume_unwind(e.into_panic());
68 }
69 }
70 }
71}
72
73pub(crate) struct RoundProber<C: NetworkClient> {
74 context: Arc<Context>,
75 core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
76 dag_state: Arc<RwLock<DagState>>,
77 network_client: Arc<C>,
78 shutdown_notify: Arc<NotifyOnce>,
79}
80
81impl<C: NetworkClient> RoundProber<C> {
82 pub(crate) fn new(
83 context: Arc<Context>,
84 core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
85 dag_state: Arc<RwLock<DagState>>,
86 network_client: Arc<C>,
87 ) -> Self {
88 Self {
89 context,
90 core_thread_dispatcher,
91 dag_state,
92 network_client,
93 shutdown_notify: Arc::new(NotifyOnce::new()),
94 }
95 }
96
97 pub(crate) fn start(self) -> RoundProberHandle {
98 let shutdown_notify = self.shutdown_notify.clone();
99 let loop_shutdown_notify = shutdown_notify.clone();
100 let prober_task = tokio::spawn(async move {
101 let mut interval = tokio::time::interval(Duration::from_millis(
106 self.context.parameters.round_prober_interval_ms,
107 ));
108 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
109 loop {
110 tokio::select! {
111 _ = interval.tick() => {
112 self.probe().await;
113 }
114 _ = loop_shutdown_notify.wait() => {
115 break;
116 }
117 }
118 }
119 });
120 RoundProberHandle {
121 prober_task,
122 shutdown_notify,
123 }
124 }
125
126 pub(crate) async fn probe(&self) -> (Vec<QuorumRound>, Vec<QuorumRound>, Round) {
130 let _scope = monitored_scope("RoundProber");
131
132 let node_metrics = &self.context.metrics.node_metrics;
133 let request_timeout =
134 Duration::from_millis(self.context.parameters.round_prober_request_timeout_ms);
135 let own_index = self.context.own_index;
136 let mut requests = FuturesUnordered::new();
137
138 for (peer, _) in self.context.committee.authorities() {
139 if peer == own_index {
140 continue;
141 }
142 let network_client = self.network_client.clone();
143 requests.push(async move {
144 let result = tokio::time::timeout(
145 request_timeout,
146 network_client.get_latest_rounds(peer, request_timeout),
147 )
148 .await;
149 (peer, result)
150 });
151 }
152
153 let mut highest_received_rounds =
154 vec![vec![0; self.context.committee.size()]; self.context.committee.size()];
155 let mut highest_accepted_rounds =
156 vec![vec![0; self.context.committee.size()]; self.context.committee.size()];
157
158 let blocks = self
159 .dag_state
160 .read()
161 .get_last_cached_block_per_authority(Round::MAX);
162 let local_highest_accepted_rounds = blocks
163 .into_iter()
164 .map(|(block, _)| block.round())
165 .collect::<Vec<_>>();
166 let last_proposed_round = local_highest_accepted_rounds[own_index];
167
168 highest_received_rounds[own_index] = self.core_thread_dispatcher.highest_received_rounds();
171 highest_accepted_rounds[own_index] = local_highest_accepted_rounds;
172 highest_received_rounds[own_index][own_index] = last_proposed_round;
173 highest_accepted_rounds[own_index][own_index] = last_proposed_round;
174
175 loop {
176 tokio::select! {
177 result = requests.next() => {
178 let Some((peer, result)) = result else { break };
179 match result {
180 Ok(Ok((received, accepted))) => {
181 if received.len() == self.context.committee.size()
182 {
183 highest_received_rounds[peer] = received;
184 } else {
185 node_metrics.round_prober_request_errors.with_label_values(&["invalid_received_rounds"]).inc();
186 tracing::warn!("Received invalid number of received rounds from peer {}", peer);
187 }
188
189 if self
190 .context
191 .protocol_config
192 .consensus_round_prober_probe_accepted_rounds() {
193 if accepted.len() == self.context.committee.size() {
194 highest_accepted_rounds[peer] = accepted;
195 } else {
196 node_metrics.round_prober_request_errors.with_label_values(&["invalid_accepted_rounds"]).inc();
197 tracing::warn!("Received invalid number of accepted rounds from peer {}", peer);
198 }
199 }
200
201 },
202 Ok(Err(err)) => {
213 node_metrics.round_prober_request_errors.with_label_values(&["failed_fetch"]).inc();
214 tracing::debug!("Failed to get latest rounds from peer {}: {:?}", peer, err);
215 },
216 Err(_) => {
217 node_metrics.round_prober_request_errors.with_label_values(&["timeout"]).inc();
218 tracing::debug!("Timeout while getting latest rounds from peer {}", peer);
219 },
220 }
221 }
222 _ = self.shutdown_notify.wait() => break,
223 }
224 }
225
226 let received_quorum_rounds: Vec<_> = self
227 .context
228 .committee
229 .authorities()
230 .map(|(peer, _)| {
231 compute_quorum_round(&self.context.committee, peer, &highest_received_rounds)
232 })
233 .collect();
234 for ((low, high), (_, authority)) in received_quorum_rounds
235 .iter()
236 .zip(self.context.committee.authorities())
237 {
238 node_metrics
239 .round_prober_received_quorum_round_gaps
240 .with_label_values(&[&authority.hostname])
241 .set((high - low) as i64);
242 node_metrics
243 .round_prober_low_received_quorum_round
244 .with_label_values(&[&authority.hostname])
245 .set(*low as i64);
246 node_metrics
248 .round_prober_current_received_round_gaps
249 .with_label_values(&[&authority.hostname])
250 .set(last_proposed_round as i64 - *low as i64);
251 }
252
253 let accepted_quorum_rounds: Vec<_> = self
254 .context
255 .committee
256 .authorities()
257 .map(|(peer, _)| {
258 compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
259 })
260 .collect();
261 for ((low, high), (_, authority)) in accepted_quorum_rounds
262 .iter()
263 .zip(self.context.committee.authorities())
264 {
265 node_metrics
266 .round_prober_accepted_quorum_round_gaps
267 .with_label_values(&[&authority.hostname])
268 .set((high - low) as i64);
269 node_metrics
270 .round_prober_low_accepted_quorum_round
271 .with_label_values(&[&authority.hostname])
272 .set(*low as i64);
273 node_metrics
275 .round_prober_current_accepted_round_gaps
276 .with_label_values(&[&authority.hostname])
277 .set(last_proposed_round as i64 - *low as i64);
278 }
279 let propagation_delay =
290 last_proposed_round.saturating_sub(received_quorum_rounds[own_index].0);
291 node_metrics
292 .round_prober_propagation_delays
293 .observe(propagation_delay as f64);
294 node_metrics
295 .round_prober_last_propagation_delay
296 .set(propagation_delay as i64);
297 if let Err(e) = self
298 .core_thread_dispatcher
299 .set_propagation_delay_and_quorum_rounds(
300 propagation_delay,
301 received_quorum_rounds.clone(),
302 accepted_quorum_rounds.clone(),
303 )
304 {
305 tracing::warn!(
306 "Failed to set propagation delay and quorum rounds {received_quorum_rounds:?} on Core: {:?}",
307 e
308 );
309 }
310
311 (
312 received_quorum_rounds,
313 accepted_quorum_rounds,
314 propagation_delay,
315 )
316 }
317}
318
319fn compute_quorum_round(
322 committee: &Committee,
323 target_index: AuthorityIndex,
324 highest_received_rounds: &[Vec<Round>],
325) -> QuorumRound {
326 let mut rounds_with_stake = highest_received_rounds
327 .iter()
328 .zip(committee.authorities())
329 .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
330 .collect::<Vec<_>>();
331 rounds_with_stake.sort();
332
333 let mut total_stake = 0;
338 let mut low = 0;
339 for (round, stake) in rounds_with_stake.iter().rev() {
340 total_stake += stake;
341 if total_stake >= committee.quorum_threshold() {
342 low = *round;
343 break;
344 }
345 }
346
347 let mut total_stake = 0;
348 let mut high = 0;
349 for (round, stake) in rounds_with_stake.iter() {
350 total_stake += stake;
351 if total_stake >= committee.quorum_threshold() {
352 high = *round;
353 break;
354 }
355 }
356
357 (low, high)
358}
359
#[cfg(test)]
mod test {
    use std::{collections::BTreeSet, sync::Arc, time::Duration};

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use parking_lot::{Mutex, RwLock};

    use super::QuorumRound;
    use crate::{
        Round, TestBlock, VerifiedBlock,
        block::BlockRef,
        commit::{CertifiedCommits, CommitRange},
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::{ConsensusError, ConsensusResult},
        network::{BlockStream, NetworkClient},
        round_prober::{RoundProber, compute_quorum_round},
        storage::mem_store::MemStore,
    };

    /// Stand-in for the core thread.
    ///
    /// It serves a fixed `highest_received_rounds` vector and records the most recent
    /// arguments passed to `set_propagation_delay_and_quorum_rounds` so tests can inspect
    /// them. Every other dispatcher method is `unimplemented!()`.
    struct FakeThreadDispatcher {
        highest_received_rounds: Vec<Round>,
        propagation_delay: Mutex<Round>,
        received_quorum_rounds: Mutex<Vec<QuorumRound>>,
        accepted_quorum_rounds: Mutex<Vec<QuorumRound>>,
    }

    impl FakeThreadDispatcher {
        fn new(highest_received_rounds: Vec<Round>) -> Self {
            Self {
                highest_received_rounds,
                propagation_delay: Mutex::new(0),
                received_quorum_rounds: Mutex::new(Vec::new()),
                accepted_quorum_rounds: Mutex::new(Vec::new()),
            }
        }

        /// Last propagation delay recorded by the prober (0 before any probe).
        fn propagation_delay(&self) -> Round {
            *self.propagation_delay.lock()
        }

        /// Last received quorum rounds recorded by the prober.
        fn received_quorum_rounds(&self) -> Vec<QuorumRound> {
            self.received_quorum_rounds.lock().clone()
        }

        /// Last accepted quorum rounds recorded by the prober.
        fn accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
            self.accepted_quorum_rounds.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeThreadDispatcher {
        async fn add_blocks(
            &self,
            _blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            unimplemented!()
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            unimplemented!()
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            unimplemented!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            unimplemented!()
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            unimplemented!()
        }

        fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
            unimplemented!()
        }

        fn set_propagation_delay_and_quorum_rounds(
            &self,
            delay: Round,
            received_quorum_rounds: Vec<QuorumRound>,
            accepted_quorum_rounds: Vec<QuorumRound>,
        ) -> Result<(), CoreError> {
            let mut received_quorum_round_per_authority = self.received_quorum_rounds.lock();
            *received_quorum_round_per_authority = received_quorum_rounds;
            let mut accepted_quorum_round_per_authority = self.accepted_quorum_rounds.lock();
            *accepted_quorum_round_per_authority = accepted_quorum_rounds;
            let mut propagation_delay = self.propagation_delay.lock();
            *propagation_delay = delay;
            Ok(())
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            unimplemented!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            self.highest_received_rounds.clone()
        }
    }

    /// Network client that answers `get_latest_rounds` from preconfigured per-peer rows.
    ///
    /// A peer whose received and accepted rows are both empty yields a
    /// `NetworkRequestTimeout` error. All other methods are `unimplemented!()`.
    struct FakeNetworkClient {
        highest_received_rounds: Vec<Vec<Round>>,
        highest_accepted_rounds: Vec<Vec<Round>>,
    }

    impl FakeNetworkClient {
        fn new(
            highest_received_rounds: Vec<Vec<Round>>,
            highest_accepted_rounds: Vec<Vec<Round>>,
        ) -> Self {
            Self {
                highest_received_rounds,
                highest_accepted_rounds,
            }
        }
    }

    // A single `#[async_trait]` expansion is all the impl needs.
    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            let received_rounds = self.highest_received_rounds[peer].clone();
            let accepted_rounds = self.highest_accepted_rounds[peer].clone();
            if received_rounds.is_empty() && accepted_rounds.is_empty() {
                Err(ConsensusError::NetworkRequestTimeout("test".to_string()))
            } else {
                Ok((received_rounds, accepted_rounds))
            }
        }
    }

    #[tokio::test]
    async fn test_round_prober() {
        const NUM_AUTHORITIES: usize = 7;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);
        let core_thread_dispatcher = Arc::new(FakeThreadDispatcher::new(vec![
            110, 120, 130, 140, 150, 160, 170,
        ]));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        // Row `i` is what peer `i` reports.
        // - Row 0 is this node; the expected propagation delay below relies on own index 0,
        //   and the prober never queries its own index.
        // - Peer 3 has two empty rows, so the fake client returns an error for it.
        // - Peer 6 reports only 6 entries, which the prober rejects as an invalid length.
        let network_client = Arc::new(FakeNetworkClient::new(
            // Highest received rounds, per peer.
            vec![
                vec![],
                vec![109, 121, 131, 0, 151, 161, 171],
                vec![101, 0, 103, 104, 105, 166, 107],
                vec![],
                vec![100, 102, 133, 0, 155, 106, 177],
                vec![105, 115, 103, 0, 125, 126, 127],
                vec![10, 20, 30, 40, 50, 60],
            ],
            // Highest accepted rounds, per peer.
            vec![
                vec![],
                vec![0, 121, 131, 0, 151, 161, 171],
                vec![1, 0, 103, 104, 105, 166, 107],
                vec![],
                vec![0, 102, 133, 0, 155, 106, 177],
                vec![1, 115, 103, 0, 125, 126, 127],
                vec![1, 20, 30, 40, 50, 60],
            ],
        ));
        let prober = RoundProber::new(
            context.clone(),
            core_thread_dispatcher.clone(),
            dag_state.clone(),
            network_client.clone(),
        );

        // Authority `i` has an accepted block at round 110 + 10 * i. Authority 0's block
        // (round 110) is this node's last proposed round.
        let blocks = (0..NUM_AUTHORITIES)
            .map(|authority| {
                let round = 110 + (authority as u32 * 10);
                VerifiedBlock::new_for_test(TestBlock::new(round, authority as u32).build())
            })
            .collect::<Vec<_>>();

        dag_state.write().accept_blocks(blocks);

        let (received_quorum_rounds, accepted_quorum_rounds, propagation_delay) =
            prober.probe().await;

        // Expected values are consistent with equal stake and a quorum of 5 of 7 authorities.
        assert_eq!(
            received_quorum_rounds,
            vec![
                (100, 105),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );

        assert_eq!(
            core_thread_dispatcher.received_quorum_rounds(),
            vec![
                (100, 105),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );
        // Last proposed round 110 minus own low received quorum round 100.
        assert_eq!(propagation_delay, 10);
        assert_eq!(core_thread_dispatcher.propagation_delay(), 10);

        assert_eq!(
            accepted_quorum_rounds,
            vec![
                (0, 1),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );

        assert_eq!(
            core_thread_dispatcher.accepted_quorum_rounds(),
            vec![
                (0, 1),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );
    }

    #[tokio::test]
    async fn test_compute_quorum_round() {
        let (context, _) = Context::new_for_test(4);

        // Rows are reporting authorities, columns are the authorities being reported on.
        // Expected values are consistent with equal stake and a quorum of 3 of 4.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(0),
            &highest_received_rounds,
        );
        assert_eq!(round, (3, 5));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(1),
            &highest_received_rounds,
        );
        assert_eq!(round, (2, 4));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(2),
            &highest_received_rounds,
        );
        assert_eq!(round, (5, 7));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(3),
            &highest_received_rounds,
        );
        assert_eq!(round, (4, 6));
    }
}