consensus_core/
ancestor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use consensus_config::AuthorityIndex;
8use tracing::info;
9
10use crate::{context::Context, leader_scoring::ReputationScores, round_prober::QuorumRound};
11
/// Inclusion state of a single authority as tracked by the ancestor state
/// manager.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum AncestorState {
    /// The authority is included.
    Include,
    /// The authority is excluded; the payload is the propagation score the
    /// authority had at the moment it was moved to this state.
    Exclude(u64),
}
17
18#[derive(Clone)]
19struct AncestorInfo {
20    state: AncestorState,
21    // This will be set to the count of either the quorum round update count or
22    // the score update count for which the EXCLUDE or INCLUDE state are locked
23    // in respectively.
24    lock_expiry_count: u32,
25}
26
27impl AncestorInfo {
28    fn new() -> Self {
29        Self {
30            state: AncestorState::Include,
31            lock_expiry_count: 0,
32        }
33    }
34
35    fn is_locked(
36        &self,
37        propagation_score_update_count: u32,
38        quorum_round_update_count: u32,
39    ) -> bool {
40        match self.state {
41            AncestorState::Include => self.lock_expiry_count > propagation_score_update_count,
42            AncestorState::Exclude(_) => self.lock_expiry_count > quorum_round_update_count,
43        }
44    }
45
46    fn set_lock(&mut self, future_count: u32) {
47        self.lock_expiry_count = future_count;
48    }
49}
50
51pub(crate) struct AncestorStateManager {
52    context: Arc<Context>,
53    state_map: Vec<AncestorInfo>,
54    propagation_score_update_count: u32,
55    quorum_round_update_count: u32,
56    pub(crate) received_quorum_round_per_authority: Vec<QuorumRound>,
57    pub(crate) accepted_quorum_round_per_authority: Vec<QuorumRound>,
58    // This is the reputation scores that we use for leader election but we are
59    // using it here as a signal for high quality block propagation as well.
60    pub(crate) propagation_scores: ReputationScores,
61}
62
63impl AncestorStateManager {
64    // Number of quorum round updates for which an ancestor is locked in the EXCLUDE
65    // state Chose 10 updates as that should be ~50 seconds of waiting with the
66    // current round prober interval of 5s
67    #[cfg(not(test))]
68    const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 10;
69    #[cfg(test)]
70    const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 1;
71
72    // Number of propagation score updates for which an ancestor is locked in the
73    // INCLUDE state Chose 2 leader schedule updates (~300 commits per schedule)
74    // which should be ~30-90 seconds depending on the round rate for the
75    // authority to improve scores.
76    #[cfg(not(test))]
77    const STATE_LOCK_SCORE_UPDATES: u32 = 2;
78    #[cfg(test)]
79    const STATE_LOCK_SCORE_UPDATES: u32 = 1;
80
81    // Exclusion threshold is based on propagation (reputation) scores
82    const EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 10;
83
84    pub(crate) fn new(context: Arc<Context>) -> Self {
85        let state_map = vec![AncestorInfo::new(); context.committee.size()];
86
87        let received_quorum_round_per_authority = vec![(0, 0); context.committee.size()];
88        let accepted_quorum_round_per_authority = vec![(0, 0); context.committee.size()];
89        Self {
90            context,
91            state_map,
92            propagation_score_update_count: 0,
93            quorum_round_update_count: 0,
94            propagation_scores: ReputationScores::default(),
95            received_quorum_round_per_authority,
96            accepted_quorum_round_per_authority,
97        }
98    }
99
100    pub(crate) fn set_quorum_rounds_per_authority(
101        &mut self,
102        received_quorum_rounds: Vec<QuorumRound>,
103        accepted_quorum_rounds: Vec<QuorumRound>,
104    ) {
105        self.received_quorum_round_per_authority = received_quorum_rounds;
106        self.accepted_quorum_round_per_authority = accepted_quorum_rounds;
107        self.quorum_round_update_count += 1;
108    }
109
110    pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) {
111        self.propagation_scores = scores;
112        self.propagation_score_update_count += 1;
113    }
114
115    pub(crate) fn get_ancestor_states(&self) -> Vec<AncestorState> {
116        self.state_map.iter().map(|info| info.state).collect()
117    }
118
119    /// Updates the state of all ancestors based on the latest scores and quorum
120    /// rounds
121    pub(crate) fn update_all_ancestors_state(&mut self) {
122        let propagation_scores_by_authority = self
123            .propagation_scores
124            .scores_per_authority
125            .clone()
126            .into_iter()
127            .enumerate()
128            .map(|(idx, score)| {
129                (
130                    self.context
131                        .committee
132                        .to_authority_index(idx)
133                        .expect("Index should be valid"),
134                    score,
135                )
136            })
137            .collect::<Vec<_>>();
138
139        // If round prober has not run yet and we don't have network quorum round,
140        // it is okay because network_high_quorum_round will be zero and we will
141        // include all ancestors until we get more information.
142        let network_high_quorum_round = self.calculate_network_high_quorum_round();
143
144        // If propagation scores are not ready because the first 300 commits have not
145        // happened, this is okay as we will only start excluding ancestors after that
146        // point in time.
147        for (authority_id, score) in propagation_scores_by_authority {
148            let (_low, authority_high_quorum_round) = if self
149                .context
150                .protocol_config
151                .consensus_round_prober_probe_accepted_rounds()
152            {
153                self.accepted_quorum_round_per_authority[authority_id]
154            } else {
155                self.received_quorum_round_per_authority[authority_id]
156            };
157
158            self.update_state(
159                authority_id,
160                score,
161                authority_high_quorum_round,
162                network_high_quorum_round,
163            );
164        }
165    }
166
167    /// Updates the state of the given authority based on current scores and
168    /// quorum rounds.
169    fn update_state(
170        &mut self,
171        authority_id: AuthorityIndex,
172        propagation_score: u64,
173        authority_high_quorum_round: u32,
174        network_high_quorum_round: u32,
175    ) {
176        let block_hostname = &self.context.committee.authority(authority_id).hostname;
177        let mut ancestor_info = self.state_map[authority_id].clone();
178
179        if ancestor_info.is_locked(
180            self.propagation_score_update_count,
181            self.quorum_round_update_count,
182        ) {
183            // If still locked, we won't make any state changes.
184            return;
185        }
186
187        let low_score_threshold =
188            (self.propagation_scores.highest_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100;
189
190        match ancestor_info.state {
191            // Check conditions to switch to EXCLUDE state
192            // TODO: Consider using received round gaps for exclusion.
193            AncestorState::Include => {
194                if propagation_score <= low_score_threshold {
195                    ancestor_info.state = AncestorState::Exclude(propagation_score);
196                    ancestor_info.set_lock(
197                        self.quorum_round_update_count + Self::STATE_LOCK_QUORUM_ROUND_UPDATES,
198                    );
199                    info!(
200                        "Authority {authority_id} moved to EXCLUDE state with score {propagation_score} <= threshold of {low_score_threshold} and locked for {:?} quorum round updates",
201                        Self::STATE_LOCK_QUORUM_ROUND_UPDATES
202                    );
203                    self.context
204                        .metrics
205                        .node_metrics
206                        .ancestor_state_change_by_authority
207                        .with_label_values(&[block_hostname.as_str(), "exclude"])
208                        .inc();
209                }
210            }
211            // Check conditions to switch back to INCLUDE state
212            AncestorState::Exclude(_) => {
213                // It should not be possible for the scores to get over the threshold
214                // until the node is back in the INCLUDE state, but adding just in case.
215                if propagation_score > low_score_threshold
216                    || authority_high_quorum_round >= network_high_quorum_round
217                {
218                    ancestor_info.state = AncestorState::Include;
219                    ancestor_info.set_lock(
220                        self.propagation_score_update_count + Self::STATE_LOCK_SCORE_UPDATES,
221                    );
222                    info!(
223                        "Authority {authority_id} moved to INCLUDE state with {propagation_score} > threshold of {low_score_threshold} or {authority_high_quorum_round} >= {network_high_quorum_round} and locked for {:?} score updates.",
224                        Self::STATE_LOCK_SCORE_UPDATES
225                    );
226                    self.context
227                        .metrics
228                        .node_metrics
229                        .ancestor_state_change_by_authority
230                        .with_label_values(&[block_hostname.as_str(), "include"])
231                        .inc();
232                }
233            }
234        }
235
236        // If any updates were made to state ensure they are persisted.
237        self.state_map[authority_id] = ancestor_info;
238    }
239
240    /// Calculate the network's quorum round based on what information is
241    /// available via RoundProber.
242    /// When consensus_round_prober_probe_accepted_rounds is true, uses accepted
243    /// rounds. Otherwise falls back to received rounds.
244    fn calculate_network_high_quorum_round(&self) -> u32 {
245        if self
246            .context
247            .protocol_config
248            .consensus_round_prober_probe_accepted_rounds()
249        {
250            self.calculate_network_high_accepted_quorum_round()
251        } else {
252            self.calculate_network_high_received_quorum_round()
253        }
254    }
255
256    fn calculate_network_high_accepted_quorum_round(&self) -> u32 {
257        let committee = &self.context.committee;
258
259        let high_quorum_rounds_with_stake = self
260            .accepted_quorum_round_per_authority
261            .iter()
262            .zip(committee.authorities())
263            .map(|((_low, high), (_, authority))| (*high, authority.stake))
264            .collect::<Vec<_>>();
265
266        self.calculate_network_high_quorum_round_internal(high_quorum_rounds_with_stake)
267    }
268
269    fn calculate_network_high_received_quorum_round(&self) -> u32 {
270        let committee = &self.context.committee;
271
272        let high_quorum_rounds_with_stake = self
273            .received_quorum_round_per_authority
274            .iter()
275            .zip(committee.authorities())
276            .map(|((_low, high), (_, authority))| (*high, authority.stake))
277            .collect::<Vec<_>>();
278
279        self.calculate_network_high_quorum_round_internal(high_quorum_rounds_with_stake)
280    }
281
282    /// Calculate the network's high quorum round.
283    /// The authority high quorum round is the lowest round higher or equal to
284    /// rounds from a quorum of authorities. The network high quorum round
285    /// is using the high quorum round of each authority as reported by the
286    /// [`RoundProber`] and then finding the high quorum round of those high
287    /// quorum rounds.
288    fn calculate_network_high_quorum_round_internal(
289        &self,
290        mut high_quorum_rounds_with_stake: Vec<(u32, u64)>,
291    ) -> u32 {
292        high_quorum_rounds_with_stake.sort();
293
294        let mut total_stake = 0;
295        let mut network_high_quorum_round = 0;
296
297        for (round, stake) in high_quorum_rounds_with_stake.iter() {
298            total_stake += stake;
299            if total_stake >= self.context.committee.quorum_threshold() {
300                network_high_quorum_round = *round;
301                break;
302            }
303        }
304
305        network_high_quorum_round
306    }
307}
308
#[cfg(test)]
mod test {
    // Tests for AncestorStateManager: the network high quorum round calculation
    // and the INCLUDE/EXCLUDE state transitions, each exercised once with
    // accepted-round probing and once with received-round probing.
    use super::*;
    use crate::leader_scoring::ReputationScores;

    // With probe_accepted_rounds = false, the network high quorum round must be
    // computed from the received quorum rounds. The received and accepted inputs
    // are chosen so that the two modes yield different answers (300 vs 229).
    #[tokio::test]
    async fn test_calculate_network_high_received_quorum_round() {
        telemetry_subscribers::init_for_testing();

        let (mut context, _key_pairs) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_round_prober_probe_accepted_rounds(false);
        let context = Arc::new(context);

        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
        ancestor_state_manager.set_propagation_scores(scores);

        // Quorum rounds are not set yet, so we should calculate a network
        // quorum round of 0 to start.
        let network_high_quorum_round =
            ancestor_state_manager.calculate_network_high_quorum_round();
        assert_eq!(network_high_quorum_round, 0);

        let received_quorum_rounds = vec![(100, 229), (225, 229), (229, 300), (229, 300)];
        let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];
        ancestor_state_manager.set_quorum_rounds_per_authority(
            received_quorum_rounds.clone(),
            accepted_quorum_rounds.clone(),
        );

        // When probe_accepted_rounds is false, should use received rounds.
        // Sorted received high rounds are 229, 229, 300, 300; assuming equal
        // stakes from Context::new_for_test (quorum = 3 of 4), the quorum is
        // reached at 300.
        let network_high_quorum_round =
            ancestor_state_manager.calculate_network_high_quorum_round();
        assert_eq!(network_high_quorum_round, 300);
    }

    // With probe_accepted_rounds = true, the network high quorum round must be
    // computed from the accepted quorum rounds.
    #[tokio::test]
    async fn test_calculate_network_high_accepted_quorum_round() {
        telemetry_subscribers::init_for_testing();

        let (mut context, _key_pairs) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_round_prober_probe_accepted_rounds(true);
        let context = Arc::new(context);

        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        let mut ancestor_state_manager = AncestorStateManager::new(context.clone());
        ancestor_state_manager.set_propagation_scores(scores);

        // Quorum rounds are not set yet, so we should calculate a network
        // quorum round of 0 to start.
        let network_high_quorum_round =
            ancestor_state_manager.calculate_network_high_quorum_round();
        assert_eq!(network_high_quorum_round, 0);

        let received_quorum_rounds = vec![(100, 229), (225, 300), (229, 300), (229, 300)];
        let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];
        ancestor_state_manager.set_quorum_rounds_per_authority(
            received_quorum_rounds.clone(),
            accepted_quorum_rounds.clone(),
        );

        // When probe_accepted_rounds is true, should use accepted rounds.
        // Sorted accepted high rounds are 229, 229, 229, 300, so the quorum is
        // reached at 229 (the received rounds would have given 300).
        let network_high_quorum_round =
            ancestor_state_manager.calculate_network_high_quorum_round();
        assert_eq!(network_high_quorum_round, 229);
    }

    // Test all state transitions with probe_accepted_rounds = true:
    // - Default all INCLUDE -> EXCLUDE
    // - EXCLUDE -> INCLUDE (blocked due to lock)
    // - EXCLUDE -> INCLUDE (passes because the lock expired)
    // - INCLUDE -> EXCLUDE (blocked due to lock)
    // - INCLUDE -> EXCLUDE (passes because the lock expired)
    // The steps below depend on each other and must run in this exact order.
    #[tokio::test]
    async fn test_update_all_ancestor_state_using_accepted_rounds() {
        telemetry_subscribers::init_for_testing();
        let (mut context, _key_pairs) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_round_prober_probe_accepted_rounds(true);
        let context = Arc::new(context);

        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        let mut ancestor_state_manager = AncestorStateManager::new(context);
        ancestor_state_manager.set_propagation_scores(scores);

        let received_quorum_rounds = vec![(300, 400), (300, 400), (300, 400), (300, 400)];
        let accepted_quorum_rounds = vec![(225, 229), (225, 229), (229, 300), (229, 300)];
        ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        ancestor_state_manager.update_all_ancestors_state();

        // Score threshold for exclude is (4 * 10) / 100 = 0
        // No ancestors should be excluded with this threshold
        let state_map = ancestor_state_manager.get_ancestor_states();
        for state in state_map.iter() {
            assert_eq!(*state, AncestorState::Include);
        }

        let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]);
        ancestor_state_manager.set_propagation_scores(scores);
        ancestor_state_manager.update_all_ancestors_state();

        // Score threshold for exclude is (100 * 10) / 100 = 10
        // 2 authorities should be excluded with this threshold
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if (0..=1).contains(&authority) {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        ancestor_state_manager.update_all_ancestors_state();

        // 2 authorities should still be excluded with these scores and no new
        // quorum round updates have been set to expire the locks.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if (0..=1).contains(&authority) {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        // Updating the quorum rounds will expire the lock as we only need 1
        // quorum round update for tests.
        let received_quorum_rounds = vec![(400, 500), (400, 500), (400, 500), (400, 500)];
        let accepted_quorum_rounds = vec![(229, 300), (225, 229), (229, 300), (229, 300)];
        ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        ancestor_state_manager.update_all_ancestors_state();

        // Authority 0 should now be included again because high quorum round is
        // at the network high quorum round of 300. Authority 1's quorum round is
        // too low and will remain excluded.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if authority == 1 {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        let received_quorum_rounds = vec![(500, 600), (500, 600), (500, 600), (500, 600)];
        let accepted_quorum_rounds = vec![(229, 300), (229, 300), (229, 300), (229, 300)];
        ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        ancestor_state_manager.update_all_ancestors_state();

        // Ancestor 1 can transition to the INCLUDE state. Ancestor 0 is still locked
        // in the INCLUDE state until a score update is performed which is why
        // even though the scores are still low it has not moved to the EXCLUDE
        // state.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for state in state_map.iter() {
            assert_eq!(*state, AncestorState::Include);
        }

        // Updating the scores will expire the lock as we only need 1 update for tests.
        let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]);
        ancestor_state_manager.set_propagation_scores(scores);
        ancestor_state_manager.update_all_ancestors_state();

        // Ancestor 1 can transition to EXCLUDE state now that the lock expired
        // and its scores are below the threshold.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if authority == 1 {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }
    }

    // Test all state transitions with probe_accepted_rounds = false:
    // - Default all INCLUDE -> EXCLUDE
    // - EXCLUDE -> INCLUDE (blocked due to lock)
    // - EXCLUDE -> INCLUDE (passes because the lock expired)
    // - INCLUDE -> EXCLUDE (blocked due to lock)
    // - INCLUDE -> EXCLUDE (passes because the lock expired)
    // Same scenario as the accepted-rounds test, but the round data that
    // matters lives in the received quorum rounds; the accepted rounds are
    // deliberately low and must be ignored.
    #[tokio::test]
    async fn test_update_all_ancestor_state_using_received_rounds() {
        telemetry_subscribers::init_for_testing();
        let (mut context, _key_pairs) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_round_prober_probe_accepted_rounds(false);
        let context = Arc::new(context);

        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        let mut ancestor_state_manager = AncestorStateManager::new(context);
        ancestor_state_manager.set_propagation_scores(scores);

        let received_quorum_rounds = vec![(225, 229), (225, 300), (229, 300), (229, 300)];
        let accepted_quorum_rounds = vec![(100, 150), (100, 150), (100, 150), (100, 150)];
        ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        ancestor_state_manager.update_all_ancestors_state();

        // Score threshold for exclude is (4 * 10) / 100 = 0
        // No ancestors should be excluded with this threshold
        let state_map = ancestor_state_manager.get_ancestor_states();
        for state in state_map.iter() {
            assert_eq!(*state, AncestorState::Include);
        }

        let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]);
        ancestor_state_manager.set_propagation_scores(scores);
        ancestor_state_manager.update_all_ancestors_state();

        // Score threshold for exclude is (100 * 10) / 100 = 10
        // 2 authorities should be excluded with this threshold
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if (0..=1).contains(&authority) {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        ancestor_state_manager.update_all_ancestors_state();

        // 2 authorities should still be excluded with these scores and no new
        // quorum round updates have been set to expire the locks.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if (0..=1).contains(&authority) {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        // Updating the quorum rounds will expire the lock as we only need 1
        // quorum round update for tests.
        let received_quorum_rounds = vec![(229, 300), (225, 229), (229, 300), (229, 300)];
        let accepted_quorum_rounds = vec![(100, 150), (100, 150), (100, 150), (100, 150)];
        ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        ancestor_state_manager.update_all_ancestors_state();

        // Authority 0 should now be included again because high quorum round is
        // at the network high quorum round of 300. Authority 1's quorum round is
        // too low and will remain excluded.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if authority == 1 {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        let received_quorum_rounds = vec![(229, 300), (229, 300), (229, 300), (229, 300)];
        let accepted_quorum_rounds = vec![(100, 150), (100, 150), (100, 150), (100, 150)];
        ancestor_state_manager
            .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
        ancestor_state_manager.update_all_ancestors_state();

        // Ancestor 1 can transition to the INCLUDE state. Ancestor 0 is still locked
        // in the INCLUDE state until a score update is performed which is why
        // even though the scores are still low it has not moved to the EXCLUDE
        // state.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for state in state_map.iter() {
            assert_eq!(*state, AncestorState::Include);
        }

        // Updating the scores will expire the lock as we only need 1 update for tests.
        let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]);
        ancestor_state_manager.set_propagation_scores(scores);
        ancestor_state_manager.update_all_ancestors_state();

        // Ancestor 1 can transition to EXCLUDE state now that the lock expired
        // and its scores are below the threshold.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if authority == 1 {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }
    }
}