consensus_core/
linearizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc};
6
7use consensus_config::{AuthorityIndex, Stake};
8use itertools::Itertools;
9use parking_lot::RwLock;
10use tracing::instrument;
11
12use crate::{
13    Round,
14    block::{BlockAPI, BlockRef, BlockTimestampMs, VerifiedBlock},
15    commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
16    context::Context,
17    dag_state::DagState,
18    leader_schedule::LeaderSchedule,
19};
20
/// The `BlockStoreAPI` trait provides an interface for the block store and has
/// been mostly introduced for allowing to inject the test store in
/// `DagBuilder`.
pub(crate) trait BlockStoreAPI {
    /// Looks up the blocks identified by `refs`. A `None` entry stands for a
    /// block the store could not provide.
    // NOTE(review): presumably one entry per requested reference, in the same
    // order as `refs` — confirm against the implementations.
    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;

    /// Returns the current garbage collection (GC) round. The linearizer only
    /// follows ancestors whose round is above this value.
    fn gc_round(&self) -> Round;

    /// Returns whether garbage collection is enabled.
    fn gc_enabled(&self) -> bool;

    /// Marks the block behind `block_ref` as committed. The linearizer asserts
    /// that the returned flag is `true` and reports `false` as an attempt to
    /// commit the same block twice.
    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;

    /// Returns whether the block behind `block_ref` is marked as committed.
    fn is_committed(&self, block_ref: &BlockRef) -> bool;
}
35
36impl BlockStoreAPI
37    for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
38{
39    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
40        DagState::get_blocks(self, refs)
41    }
42
43    fn gc_round(&self) -> Round {
44        DagState::gc_round(self)
45    }
46
47    fn gc_enabled(&self) -> bool {
48        DagState::gc_enabled(self)
49    }
50
51    fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
52        DagState::set_committed(self, block_ref)
53    }
54
55    fn is_committed(&self, block_ref: &BlockRef) -> bool {
56        DagState::is_committed(self, block_ref)
57    }
58}
59
/// Expands a sequence of committed leaders into a sequence of sub-dags.
#[derive(Clone)]
pub(crate) struct Linearizer {
    /// Shared node context; the linearizer reads the protocol config, the
    /// committee and the metrics from it.
    context: Arc<Context>,
    /// In memory block store representing the dag state.
    dag_state: Arc<RwLock<DagState>>,
    /// Leader schedule, consulted when handling commits to detect schedule
    /// updates and to read the reputation scores.
    leader_schedule: Arc<LeaderSchedule>,
}
68
69impl Linearizer {
70    pub(crate) fn new(
71        context: Arc<Context>,
72        dag_state: Arc<RwLock<DagState>>,
73        leader_schedule: Arc<LeaderSchedule>,
74    ) -> Self {
75        Self {
76            dag_state,
77            leader_schedule,
78            context,
79        }
80    }
81
82    /// Collect the sub-dag and the corresponding commit from a specific leader
83    /// excluding any duplicates or blocks that have already been committed
84    /// (within previous sub-dags).
85    fn collect_sub_dag_and_commit(
86        &mut self,
87        leader_block: VerifiedBlock,
88        reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
89    ) -> (CommittedSubDag, TrustedCommit) {
90        let _s = self
91            .context
92            .metrics
93            .node_metrics
94            .scope_processing_time
95            .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
96            .start_timer();
97        // Grab latest commit state from dag state
98        let mut dag_state = self.dag_state.write();
99        let last_commit_index = dag_state.last_commit_index();
100        let last_commit_digest = dag_state.last_commit_digest();
101        let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
102        let last_committed_rounds = dag_state.last_committed_rounds();
103
104        // Now linearize the sub-dag starting from the leader block
105        let to_commit = Self::linearize_sub_dag(
106            &self.context,
107            leader_block.clone(),
108            last_committed_rounds,
109            &mut dag_state,
110        );
111
112        let timestamp_ms = Self::calculate_commit_timestamp(
113            &self.context,
114            &mut dag_state,
115            &leader_block,
116            last_commit_timestamp_ms,
117        );
118
119        drop(dag_state);
120
121        // Create the Commit.
122        let commit = Commit::new(
123            last_commit_index + 1,
124            last_commit_digest,
125            timestamp_ms,
126            leader_block.reference(),
127            to_commit
128                .iter()
129                .map(|block| block.reference())
130                .collect::<Vec<_>>(),
131        );
132        let serialized = commit
133            .serialize()
134            .unwrap_or_else(|e| panic!("Failed to serialize commit: {e}"));
135        let commit = TrustedCommit::new_trusted(commit, serialized);
136
137        // Create the corresponding committed sub dag
138        let sub_dag = CommittedSubDag::new(
139            leader_block.reference(),
140            to_commit,
141            timestamp_ms,
142            commit.reference(),
143            reputation_scores_desc,
144        );
145
146        (sub_dag, commit)
147    }
148
149    /// Calculates the commit's timestamp. If the median-based timestamp
150    /// calculation is enabled, then the timestamp will be calculated as the
151    /// median of leader's parents (leader.round - 1) timestamps by stake.
152    /// Otherwise, the leader's timestamp will be used. To ensure that commit
153    /// timestamp monotonicity is respected, it is compared against the
154    /// `last_commit_timestamp_ms` and the maximum of the two is returned.
155    pub(crate) fn calculate_commit_timestamp(
156        context: &Context,
157        dag_state: &mut impl BlockStoreAPI,
158        leader_block: &VerifiedBlock,
159        last_commit_timestamp_ms: BlockTimestampMs,
160    ) -> BlockTimestampMs {
161        let timestamp_ms = if context
162            .protocol_config
163            .consensus_median_timestamp_with_checkpoint_enforcement()
164        {
165            // Select leaders' parent blocks.
166            let block_refs = leader_block
167                .ancestors()
168                .iter()
169                .filter(|block_ref| block_ref.round == leader_block.round() - 1)
170                .cloned()
171                .collect::<Vec<_>>();
172            // Get the blocks from dag state which should not fail.
173            let blocks = dag_state
174                .get_blocks(&block_refs)
175                .into_iter()
176                .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
177            median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
178                panic!(
179                    "Cannot compute median timestamp for leader block {leader_block:?} ancestors: {e}"
180                )
181            })
182        } else {
183            leader_block.timestamp_ms()
184        };
185
186        // Always make sure that commit timestamps are monotonic, so override if
187        // necessary.
188        timestamp_ms.max(last_commit_timestamp_ms)
189    }
190
191    pub(crate) fn linearize_sub_dag(
192        context: &Context,
193        leader_block: VerifiedBlock,
194        last_committed_rounds: Vec<u32>,
195        dag_state: &mut impl BlockStoreAPI,
196    ) -> Vec<VerifiedBlock> {
197        let gc_enabled = dag_state.gc_enabled();
198        // The GC round here is calculated based on the last committed round of the
199        // leader block. The algorithm will attempt to commit blocks up to this
200        // GC round. Once this commit has been processed and written to DagState, then
201        // gc round will update and on the processing of the next commit we'll
202        // have it already updated, so no need to do any gc_round recalculations here.
203        // We just use whatever is currently in DagState.
204        let gc_round: Round = dag_state.gc_round();
205        let leader_block_ref = leader_block.reference();
206        let mut buffer = vec![leader_block];
207
208        let mut to_commit = Vec::new();
209
210        // The new logic will perform the recursion without stopping at the highest
211        // round that has been committed per authority. Instead it will
212        // allow to commit blocks that are lower than the highest committed round for an
213        // authority but higher than gc_round.
214        if context.protocol_config.consensus_linearize_subdag_v2() {
215            assert!(
216                dag_state.set_committed(&leader_block_ref),
217                "Leader block with reference {leader_block_ref:?} attempted to be committed twice"
218            );
219
220            while let Some(x) = buffer.pop() {
221                to_commit.push(x.clone());
222
223                let ancestors: Vec<VerifiedBlock> = dag_state
224                    .get_blocks(
225                        &x.ancestors()
226                            .iter()
227                            .copied()
228                            .filter(|ancestor| {
229                                ancestor.round > gc_round && !dag_state.is_committed(ancestor)
230                            })
231                            .collect::<Vec<_>>(),
232                    )
233                    .into_iter()
234                    .map(|ancestor_opt| {
235                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
236                    })
237                    .collect();
238
239                for ancestor in ancestors {
240                    buffer.push(ancestor.clone());
241                    assert!(
242                        dag_state.set_committed(&ancestor.reference()),
243                        "Block with reference {:?} attempted to be committed twice",
244                        ancestor.reference()
245                    );
246                }
247            }
248        } else {
249            let mut committed = HashSet::new();
250            assert!(committed.insert(leader_block_ref));
251
252            while let Some(x) = buffer.pop() {
253                to_commit.push(x.clone());
254
255                let ancestors: Vec<VerifiedBlock> = dag_state
256                    .get_blocks(
257                        &x.ancestors()
258                            .iter()
259                            .copied()
260                            .filter(|ancestor| {
261                                // We skip the block if we already committed it or we reached a
262                                // round that we already committed.
263                                // TODO: for Fast Path we need to amend the recursion rule here and
264                                // allow us to commit blocks all the way up to the `gc_round`.
265                                // Some additional work will be needed to make sure that we keep the
266                                // uncommitted blocks up to the `gc_round` across commits.
267                                !committed.contains(ancestor)
268                                    && last_committed_rounds[ancestor.author] < ancestor.round
269                            })
270                            .filter(|ancestor| {
271                                // Keep the block if GC is not enabled or it is enabled and the
272                                // block is above the gc_round. We do this
273                                // to stop the recursion early and avoid going to deep when it's
274                                // unnecessary.
275                                !gc_enabled || ancestor.round > gc_round
276                            })
277                            .collect::<Vec<_>>(),
278                    )
279                    .into_iter()
280                    .map(|ancestor_opt| {
281                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
282                    })
283                    .collect();
284
285                for ancestor in ancestors {
286                    buffer.push(ancestor.clone());
287                    assert!(committed.insert(ancestor.reference()));
288                }
289            }
290        }
291
292        // The above code should have not yielded any blocks that are <= gc_round, but
293        // just to make sure that we'll never commit anything that should be
294        // garbage collected we attempt to prune here as well.
295        if gc_enabled {
296            assert!(
297                to_commit.iter().all(|block| block.round() > gc_round),
298                "No blocks <= {gc_round} should be committed. Leader round {leader_block_ref}, blocks {to_commit:?}."
299            );
300        }
301
302        // Sort the blocks of the sub-dag blocks
303        sort_sub_dag_blocks(&mut to_commit);
304
305        to_commit
306    }
307
308    // This function should be called whenever a new commit is observed. This will
309    // iterate over the sequence of committed leaders and produce a list of
310    // committed sub-dags.
311    #[instrument(level = "trace", skip_all)]
312    pub(crate) fn handle_commit(
313        &mut self,
314        committed_leaders: Vec<VerifiedBlock>,
315    ) -> Vec<CommittedSubDag> {
316        if committed_leaders.is_empty() {
317            return vec![];
318        }
319
320        // We check whether the leader schedule has been updated. If yes, then we'll
321        // send the scores as part of the first sub dag.
322        let schedule_updated = self
323            .leader_schedule
324            .leader_schedule_updated(&self.dag_state);
325
326        let mut committed_sub_dags = vec![];
327        for (i, leader_block) in committed_leaders.into_iter().enumerate() {
328            let reputation_scores_desc = if schedule_updated && i == 0 {
329                self.leader_schedule
330                    .leader_swap_table
331                    .read()
332                    .reputation_scores_desc
333                    .clone()
334            } else {
335                vec![]
336            };
337
338            // Collect the sub-dag generated using each of these leaders and the
339            // corresponding commit.
340            let (sub_dag, commit) =
341                self.collect_sub_dag_and_commit(leader_block, reputation_scores_desc);
342
343            self.update_blocks_pruned_metric(&sub_dag);
344
345            // Buffer commit in dag state for persistence later.
346            // This also updates the last committed rounds.
347            self.dag_state.write().add_commit(commit.clone());
348
349            committed_sub_dags.push(sub_dag);
350        }
351
352        // Committed blocks must be persisted to storage before sending them to IOTA and
353        // executing their transactions.
354        // Commit metadata can be persisted more lazily because they are recoverable.
355        // Uncommitted blocks can wait to persist too.
356        // But for simplicity, all unpersisted blocks and commits are flushed to
357        // storage.
358        self.dag_state.write().flush();
359
360        committed_sub_dags
361    }
362
363    // Try to measure the number of blocks that get pruned due to GC. This is not
364    // very accurate, but it can give us a good enough idea. We consider a block
365    // as pruned when it is an ancestor of a block that has been committed as part
366    // of the provided `sub_dag`, but it has not been committed as part of
367    // previous commits. Right now we measure this via checking that highest
368    // committed round for the authority as we don't an efficient look up
369    // functionality to check if a block has been committed or not.
370    fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
371        let (last_committed_rounds, gc_round) = {
372            let dag_state = self.dag_state.read();
373            (dag_state.last_committed_rounds(), dag_state.gc_round())
374        };
375
376        for block_ref in sub_dag
377            .blocks
378            .iter()
379            .flat_map(|block| block.ancestors())
380            .filter(
381                |ancestor_ref| {
382                    ancestor_ref.round <= gc_round
383                        && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
384                }, /* If the last committed round is the same as the pruned block's round, then
385                    * we know for sure that it has been committed and it doesn't count here
386                    * as pruned block. */
387            )
388            .unique()
389        {
390            let hostname = &self.context.committee.authority(block_ref.author).hostname;
391
392            // If the last committed round from this authority is lower than the pruned
393            // ancestor in question, then we know for sure that it has not been committed.
394            let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
395                &[hostname, "uncommitted"]
396            } else {
397                // If last committed round is higher for this authority, then we don't really
398                // know it's status, but we know that there is a higher committed block from
399                // this authority.
400                &[hostname, "higher_committed"]
401            };
402
403            self.context
404                .metrics
405                .node_metrics
406                .blocks_pruned_on_commit
407                .with_label_values(label_values)
408                .inc();
409        }
410    }
411}
412
413/// Computes the median timestamp of the blocks weighted by the stake of their
414/// authorities. This function assumes each block comes from a different
415/// authority of the same round. Error is returned if no blocks are provided or
416/// total stake is less than quorum threshold.
417pub(crate) fn median_timestamp_by_stake(
418    context: &Context,
419    blocks: impl Iterator<Item = VerifiedBlock>,
420) -> Result<BlockTimestampMs, String> {
421    let mut total_stake = 0;
422    let mut timestamps = vec![];
423    for block in blocks {
424        let stake = context.committee.authority(block.author()).stake;
425        timestamps.push((block.timestamp_ms(), stake));
426        total_stake += stake;
427    }
428
429    if timestamps.is_empty() {
430        return Err("No blocks provided".to_string());
431    }
432    if total_stake < context.committee.quorum_threshold() {
433        return Err(format!(
434            "Total stake {} < quorum threshold {}",
435            total_stake,
436            context.committee.quorum_threshold()
437        )
438        .to_string());
439    }
440
441    Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
442}
443
444fn median_timestamps_by_stake_inner(
445    mut timestamps: Vec<(BlockTimestampMs, Stake)>,
446    total_stake: Stake,
447) -> BlockTimestampMs {
448    timestamps.sort_by_key(|(ts, _)| *ts);
449
450    let mut cumulative_stake = 0;
451    for (ts, stake) in &timestamps {
452        cumulative_stake += stake;
453        if cumulative_stake > total_stake / 2 {
454            return *ts;
455        }
456    }
457
458    timestamps.last().unwrap().0
459}
460
461#[cfg(test)]
462mod tests {
463    use rstest::rstest;
464
465    use super::*;
466    use crate::{
467        CommitIndex, TestBlock,
468        commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
469        context::Context,
470        leader_schedule::{LeaderSchedule, LeaderSwapTable},
471        storage::mem_store::MemStore,
472        test_dag_builder::DagBuilder,
473        test_dag_parser::parse_dag,
474    };
475
476    #[rstest]
477    #[tokio::test]
478    async fn test_handle_commit(#[values(true, false)] consensus_median_timestamp: bool) {
479        telemetry_subscribers::init_for_testing();
480        let num_authorities = 4;
481        let (mut context, _keys) = Context::new_for_test(num_authorities);
482        context
483            .protocol_config
484            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
485                consensus_median_timestamp,
486            );
487
488        let context = Arc::new(context);
489
490        let dag_state = Arc::new(RwLock::new(DagState::new(
491            context.clone(),
492            Arc::new(MemStore::new()),
493        )));
494        let leader_schedule = Arc::new(LeaderSchedule::new(
495            context.clone(),
496            LeaderSwapTable::default(),
497        ));
498        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
499
500        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
501        let num_rounds: u32 = 10;
502        let mut dag_builder = DagBuilder::new(context.clone());
503        dag_builder
504            .layers(1..=num_rounds)
505            .build()
506            .persist_layers(dag_state.clone());
507
508        let leaders = dag_builder
509            .leader_blocks(1..=num_rounds)
510            .into_iter()
511            .map(Option::unwrap)
512            .collect::<Vec<_>>();
513
514        let commits = linearizer.handle_commit(leaders.clone());
515        for (idx, subdag) in commits.into_iter().enumerate() {
516            tracing::info!("{subdag:?}");
517            assert_eq!(subdag.leader, leaders[idx].reference());
518
519            let expected_ts = if consensus_median_timestamp {
520                let block_refs = leaders[idx]
521                    .ancestors()
522                    .iter()
523                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
524                    .cloned()
525                    .collect::<Vec<_>>();
526                let blocks = dag_state
527                    .read()
528                    .get_blocks(&block_refs)
529                    .into_iter()
530                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
531
532                median_timestamp_by_stake(&context, blocks).unwrap()
533            } else {
534                leaders[idx].timestamp_ms()
535            };
536            assert_eq!(subdag.timestamp_ms, expected_ts);
537
538            if idx == 0 {
539                // First subdag includes the leader block only
540                assert_eq!(subdag.blocks.len(), 1);
541            } else {
542                // Every subdag after will be missing the leader block from the previous
543                // committed subdag
544                assert_eq!(subdag.blocks.len(), num_authorities);
545            }
546            for block in subdag.blocks.iter() {
547                assert!(block.round() <= leaders[idx].round());
548            }
549            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
550        }
551    }
552
553    #[tokio::test]
554    async fn test_handle_commit_with_schedule_update() {
555        telemetry_subscribers::init_for_testing();
556        let num_authorities = 4;
557        let context = Arc::new(Context::new_for_test(num_authorities).0);
558        let dag_state = Arc::new(RwLock::new(DagState::new(
559            context.clone(),
560            Arc::new(MemStore::new()),
561        )));
562        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
563        let leader_schedule = Arc::new(
564            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
565                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
566        );
567        let mut linearizer =
568            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
569
570        // Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3.
571        let num_rounds: u32 = 20;
572        let mut dag_builder = DagBuilder::new(context.clone());
573        dag_builder
574            .layers(1..=num_rounds)
575            .build()
576            .persist_layers(dag_state.clone());
577
578        // Take the first 10 leaders
579        let leaders = dag_builder
580            .leader_blocks(1..=10)
581            .into_iter()
582            .map(Option::unwrap)
583            .collect::<Vec<_>>();
584
585        // Create some commits
586        let commits = linearizer.handle_commit(leaders.clone());
587
588        // Write them in DagState
589        dag_state.write().add_scoring_subdags(commits);
590        // Now update the leader schedule
591        leader_schedule.update_leader_schedule_v2(&dag_state);
592        assert!(
593            leader_schedule.leader_schedule_updated(&dag_state),
594            "Leader schedule should have been updated"
595        );
596
597        // Try to commit now the rest of the 10 leaders
598        let leaders = dag_builder
599            .leader_blocks(11..=20)
600            .into_iter()
601            .map(Option::unwrap)
602            .collect::<Vec<_>>();
603
604        // Now on the commits only the first one should contain the updated scores, the
605        // other should be empty
606        let commits = linearizer.handle_commit(leaders.clone());
607        assert_eq!(commits.len(), 10);
608        let scores = vec![
609            (AuthorityIndex::new_for_test(1), 29),
610            (AuthorityIndex::new_for_test(0), 29),
611            (AuthorityIndex::new_for_test(3), 29),
612            (AuthorityIndex::new_for_test(2), 29),
613        ];
614        assert_eq!(commits[0].reputation_scores_desc, scores);
615        for commit in commits.into_iter().skip(1) {
616            assert_eq!(commit.reputation_scores_desc, vec![]);
617        }
618    }
619
620    // TODO: Remove when DistributedVoteScoring is enabled.
621    #[tokio::test]
622    async fn test_handle_commit_with_schedule_update_with_unscored_subdags() {
623        telemetry_subscribers::init_for_testing();
624        let num_authorities = 4;
625        let context = Arc::new(Context::new_for_test(num_authorities).0);
626        let dag_state = Arc::new(RwLock::new(DagState::new(
627            context.clone(),
628            Arc::new(MemStore::new()),
629        )));
630        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
631        let leader_schedule = Arc::new(
632            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
633                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
634        );
635        let mut linearizer =
636            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
637
638        // Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3.
639        let num_rounds: u32 = 20;
640        let mut dag_builder = DagBuilder::new(context.clone());
641        dag_builder
642            .layers(1..=num_rounds)
643            .build()
644            .persist_layers(dag_state.clone());
645
646        // Take the first 10 leaders
647        let leaders = dag_builder
648            .leader_blocks(1..=10)
649            .into_iter()
650            .map(Option::unwrap)
651            .collect::<Vec<_>>();
652
653        // Create some commits
654        let commits = linearizer.handle_commit(leaders.clone());
655
656        // Write them in DagState
657        dag_state.write().add_unscored_committed_subdags(commits);
658
659        // Now update the leader schedule
660        leader_schedule.update_leader_schedule_v1(&dag_state);
661
662        assert!(
663            leader_schedule.leader_schedule_updated(&dag_state),
664            "Leader schedule should have been updated"
665        );
666
667        // Try to commit now the rest of the 10 leaders
668        let leaders = dag_builder
669            .leader_blocks(11..=20)
670            .into_iter()
671            .map(Option::unwrap)
672            .collect::<Vec<_>>();
673
674        // Now on the commits only the first one should contain the updated scores, the
675        // other should be empty
676        let commits = linearizer.handle_commit(leaders.clone());
677        assert_eq!(commits.len(), 10);
678        let scores = vec![
679            (AuthorityIndex::new_for_test(2), 9),
680            (AuthorityIndex::new_for_test(1), 8),
681            (AuthorityIndex::new_for_test(0), 8),
682            (AuthorityIndex::new_for_test(3), 8),
683        ];
684        assert_eq!(commits[0].reputation_scores_desc, scores);
685
686        for commit in commits.into_iter().skip(1) {
687            assert_eq!(commit.reputation_scores_desc, vec![]);
688        }
689    }
690
691    #[rstest]
692    #[tokio::test]
693    async fn test_handle_already_committed(
694        #[values(true, false)] consensus_median_timestamp: bool,
695    ) {
696        telemetry_subscribers::init_for_testing();
697        let num_authorities = 4;
698        let (mut context, _) = Context::new_for_test(num_authorities);
699        context
700            .protocol_config
701            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
702                consensus_median_timestamp,
703            );
704
705        let context = Arc::new(context);
706
707        let dag_state = Arc::new(RwLock::new(DagState::new(
708            context.clone(),
709            Arc::new(MemStore::new()),
710        )));
711        let leader_schedule = Arc::new(LeaderSchedule::new(
712            context.clone(),
713            LeaderSwapTable::default(),
714        ));
715        let mut linearizer =
716            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
717        let wave_length = DEFAULT_WAVE_LENGTH;
718
719        let leader_round_wave_1 = 3;
720        let leader_round_wave_2 = leader_round_wave_1 + wave_length;
721
722        // Build a Dag from round 1..=6
723        let mut dag_builder = DagBuilder::new(context.clone());
724        dag_builder.layers(1..=leader_round_wave_2).build();
725
726        // Now retrieve all the blocks up to round leader_round_wave_1 - 1
727        // And then only the leader of round leader_round_wave_1
728        // Also store those to DagState
729        let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
730        blocks.push(
731            dag_builder
732                .leader_block(leader_round_wave_1)
733                .expect("Leader block should have been found"),
734        );
735        dag_state.write().accept_blocks(blocks.clone());
736
737        let first_leader = dag_builder
738            .leader_block(leader_round_wave_1)
739            .expect("Wave 1 leader round block should exist");
740        let mut last_commit_index = 1;
741        let first_commit_data = TrustedCommit::new_for_test(
742            last_commit_index,
743            CommitDigest::MIN,
744            0,
745            first_leader.reference(),
746            blocks.iter().map(|block| block.reference()).collect(),
747        );
748        dag_state.write().add_commit(first_commit_data);
749
750        // Mark the blocks as committed in DagState. This will allow to correctly detect
751        // the committed blocks when the new linearizer logic is enabled.
752        for block in blocks.iter() {
753            dag_state.write().set_committed(&block.reference());
754        }
755
756        // Now take all the blocks from round `leader_round_wave_1` up to round
757        // `leader_round_wave_2-1`
758        let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
759        // Filter out leader block of round `leader_round_wave_1`
760        blocks.retain(|block| {
761            !(block.round() == leader_round_wave_1
762                && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
763        });
764        // Add the leader block of round `leader_round_wave_2`
765        blocks.push(
766            dag_builder
767                .leader_block(leader_round_wave_2)
768                .expect("Leader block should have been found"),
769        );
770        // Write them in dag state
771        dag_state.write().accept_blocks(blocks.clone());
772
773        let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
774
775        // Now get the latest leader which is the leader round of wave 2
776        let leader = dag_builder
777            .leader_block(leader_round_wave_2)
778            .expect("Leader block should exist");
779
780        last_commit_index += 1;
781        let expected_second_commit = TrustedCommit::new_for_test(
782            last_commit_index,
783            CommitDigest::MIN,
784            0,
785            leader.reference(),
786            blocks.clone(),
787        );
788
789        let commit = linearizer.handle_commit(vec![leader.clone()]);
790        assert_eq!(commit.len(), 1);
791
792        let subdag = &commit[0];
793        tracing::info!("{subdag:?}");
794        assert_eq!(subdag.leader, leader.reference());
795        assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
796
797        let expected_ts = if consensus_median_timestamp {
798            median_timestamp_by_stake(
799                &context,
800                subdag.blocks.iter().filter_map(|block| {
801                    if block.round() == subdag.leader.round - 1 {
802                        Some(block.clone())
803                    } else {
804                        None
805                    }
806                }),
807            )
808            .unwrap()
809        } else {
810            leader.timestamp_ms()
811        };
812        assert_eq!(subdag.timestamp_ms, expected_ts);
813
814        // Using the same sorting as used in CommittedSubDag::sort
815        blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
816        assert_eq!(
817            subdag
818                .blocks
819                .clone()
820                .into_iter()
821                .map(|b| b.reference())
822                .collect::<Vec<_>>(),
823            blocks
824        );
825        for block in subdag.blocks.iter() {
826            assert!(block.round() <= expected_second_commit.leader().round);
827        }
828    }
829
830    /// This test will run the linearizer with GC disabled (gc_depth = 0) and gc
831    /// enabled (gc_depth = 3) and make sure that for the exact same DAG the
832    /// linearizer will commit different blocks according to the rules.
833    #[rstest]
834    #[case(0, false)]
835    #[case(3, false)]
836    #[case(3, true)]
837    #[tokio::test]
838    async fn test_handle_commit_with_gc_simple(
839        #[case] gc_depth: u32,
840        #[case] consensus_median_timestamp: bool,
841    ) {
842        telemetry_subscribers::init_for_testing();
843
844        let num_authorities = 4;
845        let (mut context, _keys) = Context::new_for_test(num_authorities);
846        context.protocol_config.set_gc_depth_for_testing(gc_depth);
847        context
848            .protocol_config
849            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
850                consensus_median_timestamp,
851            );
852        if gc_depth == 0 {
853            context
854                .protocol_config
855                .set_consensus_linearize_subdag_v2_for_testing(false);
856        }
857
858        let context = Arc::new(context);
859        let dag_state = Arc::new(RwLock::new(DagState::new(
860            context.clone(),
861            Arc::new(MemStore::new()),
862        )));
863        let leader_schedule = Arc::new(LeaderSchedule::new(
864            context.clone(),
865            LeaderSwapTable::default(),
866        ));
867        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
868
869        // Authorities of index 0->2 will always creates blocks that see each other, but
870        // until round 5 they won't see the blocks of authority 3. For authority
871        // 3 we create blocks that connect to all the other authorities.
872        // On round 5 we finally make the other authorities see the blocks of authority
873        // 3. Practically we "simulate" here a long chain created by authority 3
874        // that is visible in round 5, but due to GC blocks of only round >=2 will
875        // be committed, when GC is enabled. When GC is disabled all blocks will be
876        // committed for rounds >= 1.
877        let dag_str = "DAG {
878                Round 0 : { 4 },
879                Round 1 : { * },
880                Round 2 : {
881                    A -> [-D1],
882                    B -> [-D1],
883                    C -> [-D1],
884                    D -> [*],
885                },
886                Round 3 : {
887                    A -> [-D2],
888                    B -> [-D2],
889                    C -> [-D2],
890                },
891                Round 4 : {
892                    A -> [-D3],
893                    B -> [-D3],
894                    C -> [-D3],
895                    D -> [A3, B3, C3, D2],
896                },
897                Round 5 : { * },
898            }";
899
900        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
901        dag_builder.print();
902        dag_builder.persist_all_blocks(dag_state.clone());
903
904        let leaders = dag_builder
905            .leader_blocks(1..=6)
906            .into_iter()
907            .flatten()
908            .collect::<Vec<_>>();
909
910        let commits = linearizer.handle_commit(leaders.clone());
911        for (idx, subdag) in commits.into_iter().enumerate() {
912            tracing::info!("{subdag:?}");
913            assert_eq!(subdag.leader, leaders[idx].reference());
914
915            let expected_ts = if consensus_median_timestamp {
916                let block_refs = leaders[idx]
917                    .ancestors()
918                    .iter()
919                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
920                    .cloned()
921                    .collect::<Vec<_>>();
922                let blocks = dag_state
923                    .read()
924                    .get_blocks(&block_refs)
925                    .into_iter()
926                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
927
928                median_timestamp_by_stake(&context, blocks).unwrap()
929            } else {
930                leaders[idx].timestamp_ms()
931            };
932            assert_eq!(subdag.timestamp_ms, expected_ts);
933
934            if idx == 0 {
935                // First subdag includes the leader block only
936                assert_eq!(subdag.blocks.len(), 1);
937            } else if idx == 1 {
938                assert_eq!(subdag.blocks.len(), 3);
939            } else if idx == 2 {
940                // We commit:
941                // * 1 block on round 4, the leader block
942                // * 3 blocks on round 3, as no commit happened on round 3 since the leader was
943                //   missing
944                // * 2 blocks on round 2, again as no commit happened on round 3, we commit the
945                //   "sub dag" of leader of round 3, which will be another 2 blocks
946                assert_eq!(subdag.blocks.len(), 6);
947            } else {
948                // GC is enabled, so we expect to see only blocks of round >= 2
949                if gc_depth > 0 {
950                    // Now it's going to be the first time that a leader will see the blocks of
951                    // authority 3 and will attempt to commit the long chain.
952                    // However, due to GC it will only commit blocks of round > 1. That's because it
953                    // will commit blocks up to previous leader's round (round =
954                    // 4) minus the gc_depth = 3, so that will be gc_round = 4 - 3 = 1. So we expect
955                    // to see on the sub dag committed blocks of round >= 2.
956                    assert_eq!(subdag.blocks.len(), 5);
957
958                    assert!(
959                        subdag.blocks.iter().all(|block| block.round() >= 2),
960                        "Found blocks that are of round < 2."
961                    );
962
963                    // Also ensure that gc_round has advanced with the latest committed leader
964                    assert_eq!(dag_state.read().gc_round(), subdag.leader.round - gc_depth);
965                } else {
966                    // GC is disabled, so we expect to see all blocks of round >= 1
967                    assert_eq!(subdag.blocks.len(), 6);
968                    assert!(
969                        subdag.blocks.iter().all(|block| block.round() >= 1),
970                        "Found blocks that are of round < 1."
971                    );
972
973                    // GC round should never have moved
974                    assert_eq!(dag_state.read().gc_round(), 0);
975                }
976            }
977            for block in subdag.blocks.iter() {
978                assert!(block.round() <= leaders[idx].round());
979            }
980            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
981        }
982    }
983
984    #[rstest]
985    #[case(3, false)]
986    #[case(3, true)]
987    #[tokio::test]
988    async fn test_handle_commit_below_highest_committed_round(
989        #[case] gc_depth: u32,
990        #[case] consensus_median_timestamp: bool,
991    ) {
992        telemetry_subscribers::init_for_testing();
993
994        let num_authorities = 4;
995        let (mut context, _keys) = Context::new_for_test(num_authorities);
996        context
997            .protocol_config
998            .set_consensus_gc_depth_for_testing(gc_depth);
999        context
1000            .protocol_config
1001            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
1002                consensus_median_timestamp,
1003            );
1004        context
1005            .protocol_config
1006            .set_consensus_linearize_subdag_v2_for_testing(true);
1007
1008        let context = Arc::new(context);
1009        let dag_state = Arc::new(RwLock::new(DagState::new(
1010            context.clone(),
1011            Arc::new(MemStore::new()),
1012        )));
1013        let leader_schedule = Arc::new(LeaderSchedule::new(
1014            context.clone(),
1015            LeaderSwapTable::default(),
1016        ));
1017        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
1018
1019        // Authority D will create an "orphaned" block on round 1 as it won't reference
1020        // to it on the block of round 2. Similar, no other authority will reference to
1021        // it on round 2. Then on round 3 the authorities A, B & C will link to
1022        // block D1. Once the DAG gets committed we should see the block D1 getting
1023        // committed as well. Normally ,as block D2 would have been committed
1024        // first block D1 should be omitted. With the new logic this is no longer true.
1025        let dag_str = "DAG {
1026                Round 0 : { 4 },
1027                Round 1 : { * },
1028                Round 2 : {
1029                    A -> [-D1],
1030                    B -> [-D1],
1031                    C -> [-D1],
1032                    D -> [-D1],
1033                },
1034                Round 3 : {
1035                    A -> [A2, B2, C2, D1],
1036                    B -> [A2, B2, C2, D1],
1037                    C -> [A2, B2, C2, D1],
1038                    D -> [A2, B2, C2, D2]
1039                },
1040                Round 4 : { * },
1041            }";
1042
1043        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1044        dag_builder.print();
1045        dag_builder.persist_all_blocks(dag_state.clone());
1046
1047        let leaders = dag_builder
1048            .leader_blocks(1..=4)
1049            .into_iter()
1050            .flatten()
1051            .collect::<Vec<_>>();
1052
1053        let commits = linearizer.handle_commit(leaders.clone());
1054        for (idx, subdag) in commits.into_iter().enumerate() {
1055            tracing::info!("{subdag:?}");
1056            assert_eq!(subdag.leader, leaders[idx].reference());
1057
1058            let expected_ts = if consensus_median_timestamp {
1059                let block_refs = leaders[idx]
1060                    .ancestors()
1061                    .iter()
1062                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
1063                    .cloned()
1064                    .collect::<Vec<_>>();
1065                let blocks = dag_state
1066                    .read()
1067                    .get_blocks(&block_refs)
1068                    .into_iter()
1069                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
1070
1071                median_timestamp_by_stake(&context, blocks).unwrap()
1072            } else {
1073                leaders[idx].timestamp_ms()
1074            };
1075            assert_eq!(subdag.timestamp_ms, expected_ts);
1076
1077            if idx == 0 {
1078                // First subdag includes the leader block only B1
1079                assert_eq!(subdag.blocks.len(), 1);
1080            } else if idx == 1 {
1081                // We commit:
1082                // * 1 block on round 2, the leader block C2
1083                // * 2 blocks on round 1, A1, C1
1084                assert_eq!(subdag.blocks.len(), 3);
1085            } else if idx == 2 {
1086                // We commit:
1087                // * 1 block on round 3, the leader block D3
1088                // * 3 blocks on round 2, A2, B2, D2
1089                assert_eq!(subdag.blocks.len(), 4);
1090
1091                assert!(
1092                    subdag.blocks.iter().any(|block| block.round() == 2
1093                        && block.author() == AuthorityIndex::new_for_test(3)),
1094                    "Block D2 should have been committed."
1095                );
1096            } else if idx == 3 {
1097                // We commit:
1098                // * 1 block on round 4, the leader block A4
1099                // * 3 blocks on round 3, A3, B3, C3
1100                // * 1 block of round 1, D1
1101                assert_eq!(subdag.blocks.len(), 5);
1102                assert!(
1103                    subdag.blocks.iter().any(|block| block.round() == 1
1104                        && block.author() == AuthorityIndex::new_for_test(3)),
1105                    "Block D1 should have been committed."
1106                );
1107            } else {
1108                panic!("Unexpected subdag with index {idx:?}");
1109            }
1110
1111            for block in subdag.blocks.iter() {
1112                assert!(block.round() <= leaders[idx].round());
1113            }
1114            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
1115        }
1116    }
1117
1118    #[rstest]
1119    #[case(false, 5_000, 5_000, 6_000)]
1120    #[case(true, 3_000, 3_000, 6_000)]
1121    #[tokio::test]
1122    async fn test_calculate_commit_timestamp(
1123        #[case] consensus_median_timestamp: bool,
1124        #[case] timestamp_1: u64,
1125        #[case] timestamp_2: u64,
1126        #[case] timestamp_3: u64,
1127    ) {
1128        // GIVEN
1129        telemetry_subscribers::init_for_testing();
1130
1131        let num_authorities = 4;
1132        let (mut context, _keys) = Context::new_for_test(num_authorities);
1133
1134        context
1135            .protocol_config
1136            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
1137                consensus_median_timestamp,
1138            );
1139
1140        let context = Arc::new(context);
1141        let store = Arc::new(MemStore::new());
1142        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1143        let mut dag_state = dag_state.write();
1144
1145        let ancestors = vec![
1146            VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
1147            VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
1148            VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
1149            VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
1150        ];
1151
1152        let leader_block = VerifiedBlock::new_for_test(
1153            TestBlock::new(5, 0)
1154                .set_timestamp_ms(5_000)
1155                .set_ancestors(
1156                    ancestors
1157                        .iter()
1158                        .map(|block| block.reference())
1159                        .collect::<Vec<_>>(),
1160                )
1161                .build(),
1162        );
1163
1164        for block in &ancestors {
1165            dag_state.accept_block(block.clone());
1166        }
1167
1168        let last_commit_timestamp_ms = 0;
1169
1170        // WHEN
1171        let timestamp = Linearizer::calculate_commit_timestamp(
1172            &context,
1173            &mut dag_state,
1174            &leader_block,
1175            last_commit_timestamp_ms,
1176        );
1177        assert_eq!(timestamp, timestamp_1);
1178
1179        // AND skip the block of authority 0 and round 4.
1180        let leader_block = VerifiedBlock::new_for_test(
1181            TestBlock::new(5, 0)
1182                .set_timestamp_ms(5_000)
1183                .set_ancestors(
1184                    ancestors
1185                        .iter()
1186                        .skip(1)
1187                        .map(|block| block.reference())
1188                        .collect::<Vec<_>>(),
1189                )
1190                .build(),
1191        );
1192
1193        let timestamp = Linearizer::calculate_commit_timestamp(
1194            &context,
1195            &mut dag_state,
1196            &leader_block,
1197            last_commit_timestamp_ms,
1198        );
1199        assert_eq!(timestamp, timestamp_2);
1200
1201        // AND set the `last_commit_timestamp_ms` to 6_000
1202        let last_commit_timestamp_ms = 6_000;
1203        let timestamp = Linearizer::calculate_commit_timestamp(
1204            &context,
1205            &mut dag_state,
1206            &leader_block,
1207            last_commit_timestamp_ms,
1208        );
1209        assert_eq!(timestamp, timestamp_3);
1210
1211        // AND there is only one ancestor block to commit
1212        let (mut context, _) = Context::new_for_test(1);
1213        context
1214            .protocol_config
1215            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
1216                consensus_median_timestamp,
1217            );
1218        let leader_block = VerifiedBlock::new_for_test(
1219            TestBlock::new(5, 0)
1220                .set_timestamp_ms(5_000)
1221                .set_ancestors(
1222                    ancestors
1223                        .iter()
1224                        .take(1)
1225                        .map(|block| block.reference())
1226                        .collect::<Vec<_>>(),
1227                )
1228                .build(),
1229        );
1230        let last_commit_timestamp_ms = 0;
1231        let timestamp = Linearizer::calculate_commit_timestamp(
1232            &context,
1233            &mut dag_state,
1234            &leader_block,
1235            last_commit_timestamp_ms,
1236        );
1237        if consensus_median_timestamp {
1238            assert_eq!(timestamp, 1_000);
1239        } else {
1240            assert_eq!(timestamp, leader_block.timestamp_ms());
1241        }
1242    }
1243
1244    #[test]
1245    fn test_median_timestamps_by_stake() {
1246        // One total stake.
1247        let timestamps = vec![(1_000, 1)];
1248        assert_eq!(median_timestamps_by_stake_inner(timestamps, 1), 1_000);
1249
1250        // Odd number of total stakes.
1251        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1)];
1252        assert_eq!(median_timestamps_by_stake_inner(timestamps, 3), 2_000);
1253
1254        // Even number of total stakes.
1255        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)];
1256        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
1257
1258        // Even number of total stakes, different order.
1259        let timestamps = vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)];
1260        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
1261
1262        // Unequal stakes.
1263        let timestamps = vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)];
1264        assert_eq!(median_timestamps_by_stake_inner(timestamps, 10), 3_000);
1265
1266        // Unequal stakes.
1267        let timestamps = vec![
1268            (500, 2),
1269            (4_000, 2),
1270            (2_500, 3),
1271            (1_000, 5),
1272            (3_000, 3),
1273            (2_000, 4),
1274        ];
1275        assert_eq!(median_timestamps_by_stake_inner(timestamps, 19), 2_000);
1276
1277        // One authority dominates.
1278        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)];
1279        assert_eq!(median_timestamps_by_stake_inner(timestamps, 14), 5_000);
1280    }
1281
1282    #[tokio::test]
1283    async fn test_median_timestamps_by_stake_errors() {
1284        let num_authorities = 4;
1285        let (mut context, _keys) = Context::new_for_test(num_authorities);
1286        context
1287            .protocol_config
1288            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(true);
1289
1290        let context = Arc::new(context);
1291
1292        // No blocks provided
1293        let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
1294        assert_eq!(err, "No blocks provided");
1295
1296        // Blocks provided but total stake is less than quorum threshold
1297        let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
1298        let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
1299        assert_eq!(err, "Total stake 1 < quorum threshold 3");
1300    }
1301}