consensus_core/
universal_committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::VecDeque, sync::Arc};
6
7use consensus_config::AuthorityIndex;
8use parking_lot::RwLock;
9
10use crate::{
11    base_committer::BaseCommitter,
12    block::{GENESIS_ROUND, Round, Slot},
13    commit::{DecidedLeader, Decision},
14    context::Context,
15    dag_state::DagState,
16};
17
18#[cfg(test)]
19#[path = "tests/universal_committer_tests.rs"]
20mod universal_committer_tests;
21
22#[cfg(test)]
23#[path = "tests/pipelined_committer_tests.rs"]
24mod pipelined_committer_tests;
25
26/// A universal committer uses a collection of committers to commit a sequence
27/// of leaders. It can be configured to use a combination of different commit
28/// strategies, including multi-leaders, backup leaders, and pipelines.
29pub(crate) struct UniversalCommitter {
30    /// The per-epoch configuration of this authority.
31    context: Arc<Context>,
32    /// In memory block store representing the dag state
33    dag_state: Arc<RwLock<DagState>>,
34    /// The list of committers for multi-leader or pipelining
35    committers: Vec<BaseCommitter>,
36}
37
38impl UniversalCommitter {
39    /// Try to decide part of the dag. This function is idempotent and returns
40    /// an ordered list of decided leaders.
41    #[tracing::instrument(skip_all, fields(last_decided = %last_decided))]
42    pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec<DecidedLeader> {
43        let highest_accepted_round = self.dag_state.read().highest_accepted_round();
44
45        // Try to decide as many leaders as possible, starting with the highest round.
46        let mut leaders = VecDeque::new();
47
48        let last_round = last_decided.round + 1;
49
50        // Keep this code commented for re-use when we re-enable multiple leaders.
51        // let last_round = match self
52        //     .context
53        //     .protocol_config
54        //     .mysticeti_num_leaders_per_round()
55        // {
56        //     Some(1) => {
57        // Ensure that we don't commit any leaders from the same round as last_decided
58        // until we have full support for multi-leader per round.
59        // This can happen when we are on a leader schedule boundary and the leader
60        // elected for the round changes with the new schedule.
61        //         last_decided.round + 1
62        //     }
63        //     _ => last_decided.round,
64        // };
65
66        // try to commit a leader up to the highest_accepted_round - 2. There is no
67        // reason to try and iterate on higher rounds as in order to make a direct
68        // decision for a leader at round R we need blocks from round R+2 to figure
69        // out that enough certificates and support exist to commit a leader.
70        'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
71            for committer in self.committers.iter().rev() {
72                // Skip committers that don't have a leader for this round.
73                let Some(slot) = committer.elect_leader(round) else {
74                    tracing::debug!("No leader for round {round}, skipping");
75                    continue;
76                };
77
78                // now that we reached the last committed leader we can stop the commit rule
79                if slot == last_decided {
80                    tracing::debug!("Reached last committed {slot}, now exit");
81                    break 'outer;
82                }
83
84                tracing::debug!("Trying to decide {slot} with {committer}",);
85
86                // Try to directly decide the leader.
87                let mut status = committer.try_direct_decide(slot);
88                tracing::debug!("Outcome of direct rule: {status}");
89
90                // If we can't directly decide the leader, try to indirectly decide it.
91                if status.is_decided() {
92                    leaders.push_front((status, Decision::Direct));
93                } else {
94                    status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x));
95                    tracing::debug!("Outcome of indirect rule: {status}");
96                    leaders.push_front((status, Decision::Indirect));
97                }
98            }
99        }
100
101        // The decided sequence is the longest prefix of decided leaders.
102        let mut decided_leaders = Vec::new();
103        for (leader, decision) in leaders {
104            if leader.round() == GENESIS_ROUND {
105                continue;
106            }
107            let Some(decided_leader) = leader.into_decided_leader() else {
108                break;
109            };
110            Self::update_metrics(&self.context, &decided_leader, decision);
111            decided_leaders.push(decided_leader);
112        }
113        tracing::debug!("Decided {decided_leaders:?}");
114        decided_leaders
115    }
116
117    /// Return list of leaders for the round.
118    /// Can return empty vec if round does not have a designated leader.
119    pub(crate) fn get_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
120        self.committers
121            .iter()
122            .filter_map(|committer| committer.elect_leader(round))
123            .map(|l| l.authority)
124            .collect()
125    }
126
127    /// Update metrics.
128    pub(crate) fn update_metrics(
129        context: &Context,
130        decided_leader: &DecidedLeader,
131        decision: Decision,
132    ) {
133        let decision_str = match decision {
134            Decision::Direct => "direct",
135            Decision::Indirect => "indirect",
136            Decision::Certified => "certified",
137        };
138        let status = match decided_leader {
139            DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
140            DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
141        };
142        let leader_host = &context
143            .committee
144            .authority(decided_leader.slot().authority)
145            .hostname;
146        context
147            .metrics
148            .node_metrics
149            .committed_leaders_total
150            .with_label_values(&[leader_host, &status])
151            .inc();
152    }
153}
154
155/// A builder for a universal committer. By default, the builder creates a
156/// single base committer, that is, a single leader and no pipeline.
157pub(crate) mod universal_committer_builder {
158    use super::*;
159    use crate::{
160        base_committer::BaseCommitterOptions, commit::DEFAULT_WAVE_LENGTH,
161        leader_schedule::LeaderSchedule,
162    };
163
164    pub(crate) struct UniversalCommitterBuilder {
165        context: Arc<Context>,
166        leader_schedule: Arc<LeaderSchedule>,
167        dag_state: Arc<RwLock<DagState>>,
168        wave_length: Round,
169        number_of_leaders: usize,
170        pipeline: bool,
171    }
172
173    impl UniversalCommitterBuilder {
174        pub(crate) fn new(
175            context: Arc<Context>,
176            leader_schedule: Arc<LeaderSchedule>,
177            dag_state: Arc<RwLock<DagState>>,
178        ) -> Self {
179            Self {
180                context,
181                leader_schedule,
182                dag_state,
183                wave_length: DEFAULT_WAVE_LENGTH,
184                number_of_leaders: 1,
185                pipeline: false,
186            }
187        }
188
189        #[expect(unused)]
190        pub(crate) fn with_wave_length(mut self, wave_length: Round) -> Self {
191            self.wave_length = wave_length;
192            self
193        }
194
195        pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self {
196            self.number_of_leaders = number_of_leaders;
197            self
198        }
199
200        pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self {
201            self.pipeline = pipeline;
202            self
203        }
204
205        pub(crate) fn build(self) -> UniversalCommitter {
206            let mut committers = Vec::new();
207            let pipeline_stages = if self.pipeline { self.wave_length } else { 1 };
208            for round_offset in 0..pipeline_stages {
209                for leader_offset in 0..self.number_of_leaders {
210                    let options = BaseCommitterOptions {
211                        wave_length: self.wave_length,
212                        round_offset,
213                        leader_offset: leader_offset as Round,
214                    };
215                    let committer = BaseCommitter::new(
216                        self.context.clone(),
217                        self.leader_schedule.clone(),
218                        self.dag_state.clone(),
219                        options,
220                    );
221                    committers.push(committer);
222                }
223            }
224
225            UniversalCommitter {
226                context: self.context,
227                dag_state: self.dag_state,
228                committers,
229            }
230        }
231    }
232}