consensus_core/
linearizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc};
6
7use consensus_config::AuthorityIndex;
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12    Round,
13    block::{BlockAPI, BlockRef, VerifiedBlock},
14    commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
15    context::Context,
16    dag_state::DagState,
17    leader_schedule::LeaderSchedule,
18};
19
/// The `BlockStoreAPI` trait provides an interface for the block store and has
/// been mostly introduced for allowing to inject the test store in
/// `DagBuilder`.
pub(crate) trait BlockStoreAPI {
    /// Looks up the blocks for the given references. An entry is `None` when
    /// the store has no block for it (presumably entries are positionally
    /// aligned with `refs` - confirm against the implementation).
    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;

    /// Returns the current garbage collection round. The linearizer only
    /// commits blocks whose round is strictly greater than this value.
    fn gc_round(&self) -> Round;

    /// Returns whether garbage collection is enabled.
    fn gc_enabled(&self) -> bool;

    /// Marks the referenced block as committed. The linearizer asserts on the
    /// returned flag and treats `false` as "the block was already committed"
    /// (presumably `true` is returned only on the first marking - confirm
    /// against the implementation).
    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;

    /// Returns whether the referenced block has already been marked as
    /// committed.
    fn is_committed(&self, block_ref: &BlockRef) -> bool;
}
34
35impl BlockStoreAPI
36    for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
37{
38    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
39        DagState::get_blocks(self, refs)
40    }
41
42    fn gc_round(&self) -> Round {
43        DagState::gc_round(self)
44    }
45
46    fn gc_enabled(&self) -> bool {
47        DagState::gc_enabled(self)
48    }
49
50    fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
51        DagState::set_committed(self, block_ref)
52    }
53
54    fn is_committed(&self, block_ref: &BlockRef) -> bool {
55        DagState::is_committed(self, block_ref)
56    }
57}
58
/// Expand a committed sequence of leader into a sequence of sub-dags.
#[derive(Clone)]
pub(crate) struct Linearizer {
    /// Shared node context. The linearizer reads the metrics, the protocol
    /// config and the committee from it.
    context: Arc<Context>,
    /// In memory block store representing the dag state
    dag_state: Arc<RwLock<DagState>>,
    /// Leader schedule, consulted when handling commits to detect schedule
    /// updates and to read the reputation scores.
    leader_schedule: Arc<LeaderSchedule>,
}
67
68impl Linearizer {
69    pub(crate) fn new(
70        context: Arc<Context>,
71        dag_state: Arc<RwLock<DagState>>,
72        leader_schedule: Arc<LeaderSchedule>,
73    ) -> Self {
74        Self {
75            dag_state,
76            leader_schedule,
77            context,
78        }
79    }
80
81    /// Collect the sub-dag and the corresponding commit from a specific leader
82    /// excluding any duplicates or blocks that have already been committed
83    /// (within previous sub-dags).
84    fn collect_sub_dag_and_commit(
85        &mut self,
86        leader_block: VerifiedBlock,
87        reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
88    ) -> (CommittedSubDag, TrustedCommit) {
89        let _s = self
90            .context
91            .metrics
92            .node_metrics
93            .scope_processing_time
94            .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
95            .start_timer();
96        // Grab latest commit state from dag state
97        let mut dag_state = self.dag_state.write();
98        let last_commit_index = dag_state.last_commit_index();
99        let last_commit_digest = dag_state.last_commit_digest();
100        let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
101        let last_committed_rounds = dag_state.last_committed_rounds();
102        let timestamp_ms = leader_block.timestamp_ms().max(last_commit_timestamp_ms);
103
104        // Now linearize the sub-dag starting from the leader block
105        let to_commit = Self::linearize_sub_dag(
106            &self.context,
107            leader_block.clone(),
108            last_committed_rounds,
109            &mut dag_state,
110        );
111
112        drop(dag_state);
113
114        // Create the Commit.
115        let commit = Commit::new(
116            last_commit_index + 1,
117            last_commit_digest,
118            timestamp_ms,
119            leader_block.reference(),
120            to_commit
121                .iter()
122                .map(|block| block.reference())
123                .collect::<Vec<_>>(),
124        );
125        let serialized = commit
126            .serialize()
127            .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
128        let commit = TrustedCommit::new_trusted(commit, serialized);
129
130        // Create the corresponding committed sub dag
131        let sub_dag = CommittedSubDag::new(
132            leader_block.reference(),
133            to_commit,
134            timestamp_ms,
135            commit.reference(),
136            reputation_scores_desc,
137        );
138
139        (sub_dag, commit)
140    }
141
142    pub(crate) fn linearize_sub_dag(
143        context: &Context,
144        leader_block: VerifiedBlock,
145        last_committed_rounds: Vec<u32>,
146        dag_state: &mut impl BlockStoreAPI,
147    ) -> Vec<VerifiedBlock> {
148        let gc_enabled = dag_state.gc_enabled();
149        // The GC round here is calculated based on the last committed round of the
150        // leader block. The algorithm will attempt to commit blocks up to this
151        // GC round. Once this commit has been processed and written to DagState, then
152        // gc round will update and on the processing of the next commit we'll
153        // have it already updated, so no need to do any gc_round recalculations here.
154        // We just use whatever is currently in DagState.
155        let gc_round: Round = dag_state.gc_round();
156        let leader_block_ref = leader_block.reference();
157        let mut buffer = vec![leader_block];
158
159        let mut to_commit = Vec::new();
160
161        // The new logic will perform the recursion without stopping at the highest
162        // round round that has been committed per authority. Instead it will
163        // allow to commit blocks that are lower than the highest committed round for an
164        // authority but higher than gc_round.
165        if context.protocol_config.consensus_linearize_subdag_v2() {
166            assert!(
167                dag_state.set_committed(&leader_block_ref),
168                "Leader block with reference {:?} attempted to be committed twice",
169                leader_block_ref
170            );
171
172            while let Some(x) = buffer.pop() {
173                to_commit.push(x.clone());
174
175                let ancestors: Vec<VerifiedBlock> = dag_state
176                    .get_blocks(
177                        &x.ancestors()
178                            .iter()
179                            .copied()
180                            .filter(|ancestor| {
181                                ancestor.round > gc_round && !dag_state.is_committed(ancestor)
182                            })
183                            .collect::<Vec<_>>(),
184                    )
185                    .into_iter()
186                    .map(|ancestor_opt| {
187                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
188                    })
189                    .collect();
190
191                for ancestor in ancestors {
192                    buffer.push(ancestor.clone());
193                    assert!(
194                        dag_state.set_committed(&ancestor.reference()),
195                        "Block with reference {:?} attempted to be committed twice",
196                        ancestor.reference()
197                    );
198                }
199            }
200        } else {
201            let mut committed = HashSet::new();
202            assert!(committed.insert(leader_block_ref));
203
204            while let Some(x) = buffer.pop() {
205                to_commit.push(x.clone());
206
207                let ancestors: Vec<VerifiedBlock> = dag_state
208                    .get_blocks(
209                        &x.ancestors()
210                            .iter()
211                            .copied()
212                            .filter(|ancestor| {
213                                // We skip the block if we already committed it or we reached a
214                                // round that we already committed.
215                                // TODO: for Fast Path we need to amend the recursion rule here and
216                                // allow us to commit blocks all the way up to the `gc_round`.
217                                // Some additional work will be needed to make sure that we keep the
218                                // uncommitted blocks up to the `gc_round` across commits.
219                                !committed.contains(ancestor)
220                                    && last_committed_rounds[ancestor.author] < ancestor.round
221                            })
222                            .filter(|ancestor| {
223                                // Keep the block if GC is not enabled or it is enabled and the
224                                // block is above the gc_round. We do this
225                                // to stop the recursion early and avoid going to deep when it's
226                                // unnecessary.
227                                !gc_enabled || ancestor.round > gc_round
228                            })
229                            .collect::<Vec<_>>(),
230                    )
231                    .into_iter()
232                    .map(|ancestor_opt| {
233                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
234                    })
235                    .collect();
236
237                for ancestor in ancestors {
238                    buffer.push(ancestor.clone());
239                    assert!(committed.insert(ancestor.reference()));
240                }
241            }
242        }
243
244        // The above code should have not yielded any blocks that are <= gc_round, but
245        // just to make sure that we'll never commit anything that should be
246        // garbage collected we attempt to prune here as well.
247        if gc_enabled {
248            assert!(
249                to_commit.iter().all(|block| block.round() > gc_round),
250                "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
251                leader_block_ref
252            );
253        }
254
255        // Sort the blocks of the sub-dag blocks
256        sort_sub_dag_blocks(&mut to_commit);
257
258        to_commit
259    }
260
261    // This function should be called whenever a new commit is observed. This will
262    // iterate over the sequence of committed leaders and produce a list of
263    // committed sub-dags.
264    pub(crate) fn handle_commit(
265        &mut self,
266        committed_leaders: Vec<VerifiedBlock>,
267    ) -> Vec<CommittedSubDag> {
268        if committed_leaders.is_empty() {
269            return vec![];
270        }
271
272        // We check whether the leader schedule has been updated. If yes, then we'll
273        // send the scores as part of the first sub dag.
274        let schedule_updated = self
275            .leader_schedule
276            .leader_schedule_updated(&self.dag_state);
277
278        let mut committed_sub_dags = vec![];
279        for (i, leader_block) in committed_leaders.into_iter().enumerate() {
280            let reputation_scores_desc = if schedule_updated && i == 0 {
281                self.leader_schedule
282                    .leader_swap_table
283                    .read()
284                    .reputation_scores_desc
285                    .clone()
286            } else {
287                vec![]
288            };
289
290            // Collect the sub-dag generated using each of these leaders and the
291            // corresponding commit.
292            let (sub_dag, commit) =
293                self.collect_sub_dag_and_commit(leader_block, reputation_scores_desc);
294
295            self.update_blocks_pruned_metric(&sub_dag);
296
297            // Buffer commit in dag state for persistence later.
298            // This also updates the last committed rounds.
299            self.dag_state.write().add_commit(commit.clone());
300
301            committed_sub_dags.push(sub_dag);
302        }
303
304        // Committed blocks must be persisted to storage before sending them to IOTA and
305        // executing their transactions.
306        // Commit metadata can be persisted more lazily because they are recoverable.
307        // Uncommitted blocks can wait to persist too.
308        // But for simplicity, all unpersisted blocks and commits are flushed to
309        // storage.
310        self.dag_state.write().flush();
311
312        committed_sub_dags
313    }
314
315    // Try to measure the number of blocks that get pruned due to GC. This is not
316    // very accurate, but it can give us a good enough idea. We consider a block
317    // as pruned when it is an ancestor of a block that has been committed as part
318    // of the provided `sub_dag`, but it has not been committed as part of
319    // previous commits. Right now we measure this via checking that highest
320    // committed round for the authority as we don't an efficient look up
321    // functionality to check if a block has been committed or not.
322    fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
323        let (last_committed_rounds, gc_round) = {
324            let dag_state = self.dag_state.read();
325            (dag_state.last_committed_rounds(), dag_state.gc_round())
326        };
327
328        for block_ref in sub_dag
329            .blocks
330            .iter()
331            .flat_map(|block| block.ancestors())
332            .filter(
333                |ancestor_ref| {
334                    ancestor_ref.round <= gc_round
335                        && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
336                }, /* If the last committed round is the same as the pruned block's round, then
337                    * we know for sure that it has been committed and it doesn't count here
338                    * as pruned block. */
339            )
340            .unique()
341        {
342            let hostname = &self.context.committee.authority(block_ref.author).hostname;
343
344            // If the last committed round from this authority is lower than the pruned
345            // ancestor in question, then we know for sure that it has not been committed.
346            let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
347                &[hostname, "uncommitted"]
348            } else {
349                // If last committed round is higher for this authority, then we don't really
350                // know it's status, but we know that there is a higher committed block from
351                // this authority.
352                &[hostname, "higher_committed"]
353            };
354
355            self.context
356                .metrics
357                .node_metrics
358                .blocks_pruned_on_commit
359                .with_label_values(label_values)
360                .inc();
361        }
362    }
363}
364
365#[cfg(test)]
366mod tests {
367    use rstest::rstest;
368
369    use super::*;
370    use crate::{
371        CommitIndex,
372        commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
373        context::Context,
374        leader_schedule::{LeaderSchedule, LeaderSwapTable},
375        storage::mem_store::MemStore,
376        test_dag_builder::DagBuilder,
377        test_dag_parser::parse_dag,
378    };
379
380    #[tokio::test]
381    async fn test_handle_commit() {
382        telemetry_subscribers::init_for_testing();
383        let num_authorities = 4;
384        let context = Arc::new(Context::new_for_test(num_authorities).0);
385        let dag_state = Arc::new(RwLock::new(DagState::new(
386            context.clone(),
387            Arc::new(MemStore::new()),
388        )));
389        let leader_schedule = Arc::new(LeaderSchedule::new(
390            context.clone(),
391            LeaderSwapTable::default(),
392        ));
393        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
394
395        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
396        let num_rounds: u32 = 10;
397        let mut dag_builder = DagBuilder::new(context.clone());
398        dag_builder
399            .layers(1..=num_rounds)
400            .build()
401            .persist_layers(dag_state.clone());
402
403        let leaders = dag_builder
404            .leader_blocks(1..=num_rounds)
405            .into_iter()
406            .map(Option::unwrap)
407            .collect::<Vec<_>>();
408
409        let commits = linearizer.handle_commit(leaders.clone());
410        for (idx, subdag) in commits.into_iter().enumerate() {
411            tracing::info!("{subdag:?}");
412            assert_eq!(subdag.leader, leaders[idx].reference());
413            assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
414            if idx == 0 {
415                // First subdag includes the leader block only
416                assert_eq!(subdag.blocks.len(), 1);
417            } else {
418                // Every subdag after will be missing the leader block from the previous
419                // committed subdag
420                assert_eq!(subdag.blocks.len(), num_authorities);
421            }
422            for block in subdag.blocks.iter() {
423                assert!(block.round() <= leaders[idx].round());
424            }
425            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
426        }
427    }
428
429    #[tokio::test]
430    async fn test_handle_commit_with_schedule_update() {
431        telemetry_subscribers::init_for_testing();
432        let num_authorities = 4;
433        let context = Arc::new(Context::new_for_test(num_authorities).0);
434        let dag_state = Arc::new(RwLock::new(DagState::new(
435            context.clone(),
436            Arc::new(MemStore::new()),
437        )));
438        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
439        let leader_schedule = Arc::new(
440            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
441                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
442        );
443        let mut linearizer =
444            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
445
446        // Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3.
447        let num_rounds: u32 = 20;
448        let mut dag_builder = DagBuilder::new(context.clone());
449        dag_builder
450            .layers(1..=num_rounds)
451            .build()
452            .persist_layers(dag_state.clone());
453
454        // Take the first 10 leaders
455        let leaders = dag_builder
456            .leader_blocks(1..=10)
457            .into_iter()
458            .map(Option::unwrap)
459            .collect::<Vec<_>>();
460
461        // Create some commits
462        let commits = linearizer.handle_commit(leaders.clone());
463
464        // Write them in DagState
465        dag_state.write().add_scoring_subdags(commits);
466        // Now update the leader schedule
467        leader_schedule.update_leader_schedule_v2(&dag_state);
468        assert!(
469            leader_schedule.leader_schedule_updated(&dag_state),
470            "Leader schedule should have been updated"
471        );
472
473        // Try to commit now the rest of the 10 leaders
474        let leaders = dag_builder
475            .leader_blocks(11..=20)
476            .into_iter()
477            .map(Option::unwrap)
478            .collect::<Vec<_>>();
479
480        // Now on the commits only the first one should contain the updated scores, the
481        // other should be empty
482        let commits = linearizer.handle_commit(leaders.clone());
483        assert_eq!(commits.len(), 10);
484        let scores = vec![
485            (AuthorityIndex::new_for_test(1), 29),
486            (AuthorityIndex::new_for_test(0), 29),
487            (AuthorityIndex::new_for_test(3), 29),
488            (AuthorityIndex::new_for_test(2), 29),
489        ];
490        assert_eq!(commits[0].reputation_scores_desc, scores);
491        for commit in commits.into_iter().skip(1) {
492            assert_eq!(commit.reputation_scores_desc, vec![]);
493        }
494    }
495
496    // TODO: Remove when DistributedVoteScoring is enabled.
497    #[tokio::test]
498    async fn test_handle_commit_with_schedule_update_with_unscored_subdags() {
499        telemetry_subscribers::init_for_testing();
500        let num_authorities = 4;
501        let context = Arc::new(Context::new_for_test(num_authorities).0);
502        let dag_state = Arc::new(RwLock::new(DagState::new(
503            context.clone(),
504            Arc::new(MemStore::new()),
505        )));
506        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
507        let leader_schedule = Arc::new(
508            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
509                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
510        );
511        let mut linearizer =
512            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
513
514        // Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3.
515        let num_rounds: u32 = 20;
516        let mut dag_builder = DagBuilder::new(context.clone());
517        dag_builder
518            .layers(1..=num_rounds)
519            .build()
520            .persist_layers(dag_state.clone());
521
522        // Take the first 10 leaders
523        let leaders = dag_builder
524            .leader_blocks(1..=10)
525            .into_iter()
526            .map(Option::unwrap)
527            .collect::<Vec<_>>();
528
529        // Create some commits
530        let commits = linearizer.handle_commit(leaders.clone());
531
532        // Write them in DagState
533        dag_state.write().add_unscored_committed_subdags(commits);
534
535        // Now update the leader schedule
536        leader_schedule.update_leader_schedule_v1(&dag_state);
537
538        assert!(
539            leader_schedule.leader_schedule_updated(&dag_state),
540            "Leader schedule should have been updated"
541        );
542
543        // Try to commit now the rest of the 10 leaders
544        let leaders = dag_builder
545            .leader_blocks(11..=20)
546            .into_iter()
547            .map(Option::unwrap)
548            .collect::<Vec<_>>();
549
550        // Now on the commits only the first one should contain the updated scores, the
551        // other should be empty
552        let commits = linearizer.handle_commit(leaders.clone());
553        assert_eq!(commits.len(), 10);
554        let scores = vec![
555            (AuthorityIndex::new_for_test(2), 9),
556            (AuthorityIndex::new_for_test(1), 8),
557            (AuthorityIndex::new_for_test(0), 8),
558            (AuthorityIndex::new_for_test(3), 8),
559        ];
560        assert_eq!(commits[0].reputation_scores_desc, scores);
561
562        for commit in commits.into_iter().skip(1) {
563            assert_eq!(commit.reputation_scores_desc, vec![]);
564        }
565    }
566
567    #[tokio::test]
568    async fn test_handle_already_committed() {
569        telemetry_subscribers::init_for_testing();
570        let num_authorities = 4;
571        let (mut context, _) = Context::new_for_test(num_authorities);
572        context
573            .protocol_config
574            .set_consensus_gc_depth_for_testing(0);
575        context
576            .protocol_config
577            .set_consensus_linearize_subdag_v2_for_testing(false);
578
579        let context = Arc::new(context);
580
581        let dag_state = Arc::new(RwLock::new(DagState::new(
582            context.clone(),
583            Arc::new(MemStore::new()),
584        )));
585        let leader_schedule = Arc::new(LeaderSchedule::new(
586            context.clone(),
587            LeaderSwapTable::default(),
588        ));
589        let mut linearizer =
590            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
591        let wave_length = DEFAULT_WAVE_LENGTH;
592
593        let leader_round_wave_1 = 3;
594        let leader_round_wave_2 = leader_round_wave_1 + wave_length;
595
596        // Build a Dag from round 1..=6
597        let mut dag_builder = DagBuilder::new(context.clone());
598        dag_builder.layers(1..=leader_round_wave_2).build();
599
600        // Now retrieve all the blocks up to round leader_round_wave_1 - 1
601        // And then only the leader of round leader_round_wave_1
602        // Also store those to DagState
603        let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
604        blocks.push(
605            dag_builder
606                .leader_block(leader_round_wave_1)
607                .expect("Leader block should have been found"),
608        );
609        dag_state.write().accept_blocks(blocks.clone());
610
611        let first_leader = dag_builder
612            .leader_block(leader_round_wave_1)
613            .expect("Wave 1 leader round block should exist");
614        let mut last_commit_index = 1;
615        let first_commit_data = TrustedCommit::new_for_test(
616            last_commit_index,
617            CommitDigest::MIN,
618            0,
619            first_leader.reference(),
620            blocks.into_iter().map(|block| block.reference()).collect(),
621        );
622        dag_state.write().add_commit(first_commit_data);
623
624        // Now take all the blocks from round `leader_round_wave_1` up to round
625        // `leader_round_wave_2-1`
626        let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
627        // Filter out leader block of round `leader_round_wave_1`
628        blocks.retain(|block| {
629            !(block.round() == leader_round_wave_1
630                && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
631        });
632        // Add the leader block of round `leader_round_wave_2`
633        blocks.push(
634            dag_builder
635                .leader_block(leader_round_wave_2)
636                .expect("Leader block should have been found"),
637        );
638        // Write them in dag state
639        dag_state.write().accept_blocks(blocks.clone());
640
641        let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
642
643        // Now get the latest leader which is the leader round of wave 2
644        let leader = dag_builder
645            .leader_block(leader_round_wave_2)
646            .expect("Leader block should exist");
647
648        last_commit_index += 1;
649        let expected_second_commit = TrustedCommit::new_for_test(
650            last_commit_index,
651            CommitDigest::MIN,
652            0,
653            leader.reference(),
654            blocks.clone(),
655        );
656
657        let commit = linearizer.handle_commit(vec![leader.clone()]);
658        assert_eq!(commit.len(), 1);
659
660        let subdag = &commit[0];
661        tracing::info!("{subdag:?}");
662        assert_eq!(subdag.leader, leader.reference());
663        assert_eq!(subdag.timestamp_ms, leader.timestamp_ms());
664        assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
665
666        // Using the same sorting as used in CommittedSubDag::sort
667        blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
668        assert_eq!(
669            subdag
670                .blocks
671                .clone()
672                .into_iter()
673                .map(|b| b.reference())
674                .collect::<Vec<_>>(),
675            blocks
676        );
677        for block in subdag.blocks.iter() {
678            assert!(block.round() <= expected_second_commit.leader().round);
679        }
680    }
681
682    /// This test will run the linearizer with GC disabled (gc_depth = 0) and gc
683    /// enabled (gc_depth = 3) and make sure that for the exact same DAG the
684    /// linearizer will commit different blocks according to the rules.
685    #[rstest]
686    #[tokio::test]
687    async fn test_handle_commit_with_gc_simple(#[values(0, 3)] gc_depth: u32) {
688        telemetry_subscribers::init_for_testing();
689
690        let num_authorities = 4;
691        let (mut context, _keys) = Context::new_for_test(num_authorities);
692        context.protocol_config.set_gc_depth_for_testing(gc_depth);
693
694        if gc_depth == 0 {
695            context
696                .protocol_config
697                .set_consensus_linearize_subdag_v2_for_testing(false);
698        }
699
700        let context = Arc::new(context);
701        let dag_state = Arc::new(RwLock::new(DagState::new(
702            context.clone(),
703            Arc::new(MemStore::new()),
704        )));
705        let leader_schedule = Arc::new(LeaderSchedule::new(
706            context.clone(),
707            LeaderSwapTable::default(),
708        ));
709        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
710
711        // Authorities of index 0->2 will always creates blocks that see each other, but
712        // until round 5 they won't see the blocks of authority 3. For authority
713        // 3 we create blocks that connect to all the other authorities.
714        // On round 5 we finally make the other authorities see the blocks of authority
715        // 3. Practically we "simulate" here a long chain created by authority 3
716        // that is visible in round 5, but due to GC blocks of only round >=2 will
717        // be committed, when GC is enabled. When GC is disabled all blocks will be
718        // committed for rounds >= 1.
719        let dag_str = "DAG {
720                Round 0 : { 4 },
721                Round 1 : { * },
722                Round 2 : {
723                    A -> [-D1],
724                    B -> [-D1],
725                    C -> [-D1],
726                    D -> [*],
727                },
728                Round 3 : {
729                    A -> [-D2],
730                    B -> [-D2],
731                    C -> [-D2],
732                },
733                Round 4 : {
734                    A -> [-D3],
735                    B -> [-D3],
736                    C -> [-D3],
737                    D -> [A3, B3, C3, D2],
738                },
739                Round 5 : { * },
740            }";
741
742        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
743        dag_builder.print();
744        dag_builder.persist_all_blocks(dag_state.clone());
745
746        let leaders = dag_builder
747            .leader_blocks(1..=6)
748            .into_iter()
749            .flatten()
750            .collect::<Vec<_>>();
751
752        let commits = linearizer.handle_commit(leaders.clone());
753        for (idx, subdag) in commits.into_iter().enumerate() {
754            tracing::info!("{subdag:?}");
755            assert_eq!(subdag.leader, leaders[idx].reference());
756            assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
757            if idx == 0 {
758                // First subdag includes the leader block only
759                assert_eq!(subdag.blocks.len(), 1);
760            } else if idx == 1 {
761                assert_eq!(subdag.blocks.len(), 3);
762            } else if idx == 2 {
763                // We commit:
764                // * 1 block on round 4, the leader block
765                // * 3 blocks on round 3, as no commit happened on round 3 since the leader was
766                //   missing
767                // * 2 blocks on round 2, again as no commit happened on round 3, we commit the
768                //   "sub dag" of leader of round 3, which will be another 2 blocks
769                assert_eq!(subdag.blocks.len(), 6);
770            } else {
771                // GC is enabled, so we expect to see only blocks of round >= 2
772                if gc_depth > 0 {
773                    // Now it's going to be the first time that a leader will see the blocks of
774                    // authority 3 and will attempt to commit the long chain.
775                    // However, due to GC it will only commit blocks of round > 1. That's because it
776                    // will commit blocks up to previous leader's round (round =
777                    // 4) minus the gc_depth = 3, so that will be gc_round = 4 - 3 = 1. So we expect
778                    // to see on the sub dag committed blocks of round >= 2.
779                    assert_eq!(subdag.blocks.len(), 5);
780
781                    assert!(
782                        subdag.blocks.iter().all(|block| block.round() >= 2),
783                        "Found blocks that are of round < 2."
784                    );
785
786                    // Also ensure that gc_round has advanced with the latest committed leader
787                    assert_eq!(dag_state.read().gc_round(), subdag.leader.round - gc_depth);
788                } else {
789                    // GC is disabled, so we expect to see all blocks of round >= 1
790                    assert_eq!(subdag.blocks.len(), 6);
791                    assert!(
792                        subdag.blocks.iter().all(|block| block.round() >= 1),
793                        "Found blocks that are of round < 1."
794                    );
795
796                    // GC round should never have moved
797                    assert_eq!(dag_state.read().gc_round(), 0);
798                }
799            }
800            for block in subdag.blocks.iter() {
801                assert!(block.round() <= leaders[idx].round());
802            }
803            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
804        }
805    }
806
807    #[rstest]
808    #[tokio::test]
809    async fn test_handle_commit_below_highest_committed_round(#[values(3)] gc_depth: u32) {
810        telemetry_subscribers::init_for_testing();
811
812        let num_authorities = 4;
813        let (mut context, _keys) = Context::new_for_test(num_authorities);
814        context
815            .protocol_config
816            .set_consensus_gc_depth_for_testing(gc_depth);
817        context
818            .protocol_config
819            .set_consensus_linearize_subdag_v2_for_testing(true);
820
821        let context = Arc::new(context);
822        let dag_state = Arc::new(RwLock::new(DagState::new(
823            context.clone(),
824            Arc::new(MemStore::new()),
825        )));
826        let leader_schedule = Arc::new(LeaderSchedule::new(
827            context.clone(),
828            LeaderSwapTable::default(),
829        ));
830        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
831
832        // Authority D will create an "orphaned" block on round 1 as it won't reference
833        // to it on the block of round 2. Similar, no other authority will reference to
834        // it on round 2. Then on round 3 the authorities A, B & C will link to
835        // block D1. Once the DAG gets committed we should see the block D1 getting
836        // committed as well. Normally ,as block D2 would have been committed
837        // first block D1 should be omitted. With the new logic this is no longer true.
838        let dag_str = "DAG {
839                Round 0 : { 4 },
840                Round 1 : { * },
841                Round 2 : {
842                    A -> [-D1],
843                    B -> [-D1],
844                    C -> [-D1],
845                    D -> [-D1],
846                },
847                Round 3 : {
848                    A -> [A2, B2, C2, D1],
849                    B -> [A2, B2, C2, D1],
850                    C -> [A2, B2, C2, D1],
851                    D -> [A2, B2, C2, D2]
852                },
853                Round 4 : { * },
854            }";
855
856        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
857        dag_builder.print();
858        dag_builder.persist_all_blocks(dag_state.clone());
859
860        let leaders = dag_builder
861            .leader_blocks(1..=4)
862            .into_iter()
863            .flatten()
864            .collect::<Vec<_>>();
865
866        let commits = linearizer.handle_commit(leaders.clone());
867        for (idx, subdag) in commits.into_iter().enumerate() {
868            tracing::info!("{subdag:?}");
869            assert_eq!(subdag.leader, leaders[idx].reference());
870            assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
871            if idx == 0 {
872                // First subdag includes the leader block only B1
873                assert_eq!(subdag.blocks.len(), 1);
874            } else if idx == 1 {
875                // We commit:
876                // * 1 block on round 2, the leader block C2
877                // * 2 blocks on round 1, A1, C1
878                assert_eq!(subdag.blocks.len(), 3);
879            } else if idx == 2 {
880                // We commit:
881                // * 1 block on round 3, the leader block D3
882                // * 3 blocks on round 2, A2, B2, D2
883                assert_eq!(subdag.blocks.len(), 4);
884
885                assert!(
886                    subdag.blocks.iter().any(|block| block.round() == 2
887                        && block.author() == AuthorityIndex::new_for_test(3)),
888                    "Block D2 should have been committed."
889                );
890            } else if idx == 3 {
891                // We commit:
892                // * 1 block on round 4, the leader block A4
893                // * 3 blocks on round 3, A3, B3, C3
894                // * 1 block of round 1, D1
895                assert_eq!(subdag.blocks.len(), 5);
896                assert!(
897                    subdag.blocks.iter().any(|block| block.round() == 1
898                        && block.author() == AuthorityIndex::new_for_test(3)),
899                    "Block D1 should have been committed."
900                );
901            } else {
902                panic!("Unexpected subdag with index {:?}", idx);
903            }
904
905            for block in subdag.blocks.iter() {
906                assert!(block.round() <= leaders[idx].round());
907            }
908            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
909        }
910    }
911}