consensus_core/
linearizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc};
6
7use consensus_config::AuthorityIndex;
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12    Round,
13    block::{BlockAPI, BlockRef, VerifiedBlock},
14    commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
15    context::Context,
16    dag_state::DagState,
17    leader_schedule::LeaderSchedule,
18};
19
/// The `BlockStoreAPI` trait provides an interface for the block store. It has
/// mostly been introduced to allow injecting the test store in `DagBuilder`.
pub(crate) trait BlockStoreAPI {
    /// Looks up the blocks identified by `refs`.
    // NOTE(review): presumably one entry is returned per requested reference,
    // in the same order, with `None` for blocks that are unknown to the store
    // — confirm against the implementors.
    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;

    /// Returns the garbage collection round of the store. The linearizer uses
    /// it as an exclusive lower bound for the rounds it will commit.
    fn gc_round(&self) -> Round;

    /// Returns whether garbage collection is enabled.
    fn gc_enabled(&self) -> bool;

    /// Marks the block identified by `block_ref` as committed.
    // NOTE(review): the linearizer asserts on the returned flag and reports
    // `false` as "attempted to be committed twice", so `true` presumably means
    // the block had not been marked before — confirm against the implementors.
    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;

    /// Returns whether the block identified by `block_ref` has been marked as
    /// committed.
    fn is_committed(&self, block_ref: &BlockRef) -> bool;
}
34
35impl BlockStoreAPI
36    for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
37{
38    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
39        DagState::get_blocks(self, refs)
40    }
41
42    fn gc_round(&self) -> Round {
43        DagState::gc_round(self)
44    }
45
46    fn gc_enabled(&self) -> bool {
47        DagState::gc_enabled(self)
48    }
49
50    fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
51        DagState::set_committed(self, block_ref)
52    }
53
54    fn is_committed(&self, block_ref: &BlockRef) -> bool {
55        DagState::is_committed(self, block_ref)
56    }
57}
58
/// Expands a committed sequence of leaders into a sequence of sub-dags.
#[derive(Clone)]
pub(crate) struct Linearizer {
    /// Node context; used in this file for metrics, the protocol config and
    /// the committee.
    context: Arc<Context>,
    /// In memory block store representing the dag state
    dag_state: Arc<RwLock<DagState>>,
    /// Leader schedule; consulted to detect schedule updates and to read the
    /// reputation scores that get attached to the first sub-dag after an
    /// update.
    leader_schedule: Arc<LeaderSchedule>,
}
67
68impl Linearizer {
69    pub(crate) fn new(
70        context: Arc<Context>,
71        dag_state: Arc<RwLock<DagState>>,
72        leader_schedule: Arc<LeaderSchedule>,
73    ) -> Self {
74        Self {
75            dag_state,
76            leader_schedule,
77            context,
78        }
79    }
80
81    /// Collect the sub-dag and the corresponding commit from a specific leader
82    /// excluding any duplicates or blocks that have already been committed
83    /// (within previous sub-dags).
84    fn collect_sub_dag_and_commit(
85        &mut self,
86        leader_block: VerifiedBlock,
87        reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
88    ) -> (CommittedSubDag, TrustedCommit) {
89        let _s = self
90            .context
91            .metrics
92            .node_metrics
93            .scope_processing_time
94            .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
95            .start_timer();
96        // Grab latest commit state from dag state
97        let mut dag_state = self.dag_state.write();
98        let last_commit_index = dag_state.last_commit_index();
99        let last_commit_digest = dag_state.last_commit_digest();
100        let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
101        let last_committed_rounds = dag_state.last_committed_rounds();
102        let timestamp_ms = leader_block.timestamp_ms().max(last_commit_timestamp_ms);
103
104        // Now linearize the sub-dag starting from the leader block
105        let to_commit = Self::linearize_sub_dag(
106            &self.context,
107            leader_block.clone(),
108            last_committed_rounds,
109            &mut dag_state,
110        );
111
112        drop(dag_state);
113
114        // Create the Commit.
115        let commit = Commit::new(
116            last_commit_index + 1,
117            last_commit_digest,
118            timestamp_ms,
119            leader_block.reference(),
120            to_commit
121                .iter()
122                .map(|block| block.reference())
123                .collect::<Vec<_>>(),
124        );
125        let serialized = commit
126            .serialize()
127            .unwrap_or_else(|e| panic!("Failed to serialize commit: {e}"));
128        let commit = TrustedCommit::new_trusted(commit, serialized);
129
130        // Create the corresponding committed sub dag
131        let sub_dag = CommittedSubDag::new(
132            leader_block.reference(),
133            to_commit,
134            timestamp_ms,
135            commit.reference(),
136            reputation_scores_desc,
137        );
138
139        (sub_dag, commit)
140    }
141
142    pub(crate) fn linearize_sub_dag(
143        context: &Context,
144        leader_block: VerifiedBlock,
145        last_committed_rounds: Vec<u32>,
146        dag_state: &mut impl BlockStoreAPI,
147    ) -> Vec<VerifiedBlock> {
148        let gc_enabled = dag_state.gc_enabled();
149        // The GC round here is calculated based on the last committed round of the
150        // leader block. The algorithm will attempt to commit blocks up to this
151        // GC round. Once this commit has been processed and written to DagState, then
152        // gc round will update and on the processing of the next commit we'll
153        // have it already updated, so no need to do any gc_round recalculations here.
154        // We just use whatever is currently in DagState.
155        let gc_round: Round = dag_state.gc_round();
156        let leader_block_ref = leader_block.reference();
157        let mut buffer = vec![leader_block];
158
159        let mut to_commit = Vec::new();
160
161        // The new logic will perform the recursion without stopping at the highest
162        // round round that has been committed per authority. Instead it will
163        // allow to commit blocks that are lower than the highest committed round for an
164        // authority but higher than gc_round.
165        if context.protocol_config.consensus_linearize_subdag_v2() {
166            assert!(
167                dag_state.set_committed(&leader_block_ref),
168                "Leader block with reference {leader_block_ref:?} attempted to be committed twice"
169            );
170
171            while let Some(x) = buffer.pop() {
172                to_commit.push(x.clone());
173
174                let ancestors: Vec<VerifiedBlock> = dag_state
175                    .get_blocks(
176                        &x.ancestors()
177                            .iter()
178                            .copied()
179                            .filter(|ancestor| {
180                                ancestor.round > gc_round && !dag_state.is_committed(ancestor)
181                            })
182                            .collect::<Vec<_>>(),
183                    )
184                    .into_iter()
185                    .map(|ancestor_opt| {
186                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
187                    })
188                    .collect();
189
190                for ancestor in ancestors {
191                    buffer.push(ancestor.clone());
192                    assert!(
193                        dag_state.set_committed(&ancestor.reference()),
194                        "Block with reference {:?} attempted to be committed twice",
195                        ancestor.reference()
196                    );
197                }
198            }
199        } else {
200            let mut committed = HashSet::new();
201            assert!(committed.insert(leader_block_ref));
202
203            while let Some(x) = buffer.pop() {
204                to_commit.push(x.clone());
205
206                let ancestors: Vec<VerifiedBlock> = dag_state
207                    .get_blocks(
208                        &x.ancestors()
209                            .iter()
210                            .copied()
211                            .filter(|ancestor| {
212                                // We skip the block if we already committed it or we reached a
213                                // round that we already committed.
214                                // TODO: for Fast Path we need to amend the recursion rule here and
215                                // allow us to commit blocks all the way up to the `gc_round`.
216                                // Some additional work will be needed to make sure that we keep the
217                                // uncommitted blocks up to the `gc_round` across commits.
218                                !committed.contains(ancestor)
219                                    && last_committed_rounds[ancestor.author] < ancestor.round
220                            })
221                            .filter(|ancestor| {
222                                // Keep the block if GC is not enabled or it is enabled and the
223                                // block is above the gc_round. We do this
224                                // to stop the recursion early and avoid going to deep when it's
225                                // unnecessary.
226                                !gc_enabled || ancestor.round > gc_round
227                            })
228                            .collect::<Vec<_>>(),
229                    )
230                    .into_iter()
231                    .map(|ancestor_opt| {
232                        ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
233                    })
234                    .collect();
235
236                for ancestor in ancestors {
237                    buffer.push(ancestor.clone());
238                    assert!(committed.insert(ancestor.reference()));
239                }
240            }
241        }
242
243        // The above code should have not yielded any blocks that are <= gc_round, but
244        // just to make sure that we'll never commit anything that should be
245        // garbage collected we attempt to prune here as well.
246        if gc_enabled {
247            assert!(
248                to_commit.iter().all(|block| block.round() > gc_round),
249                "No blocks <= {gc_round} should be committed. Leader round {leader_block_ref}, blocks {to_commit:?}."
250            );
251        }
252
253        // Sort the blocks of the sub-dag blocks
254        sort_sub_dag_blocks(&mut to_commit);
255
256        to_commit
257    }
258
259    // This function should be called whenever a new commit is observed. This will
260    // iterate over the sequence of committed leaders and produce a list of
261    // committed sub-dags.
262    pub(crate) fn handle_commit(
263        &mut self,
264        committed_leaders: Vec<VerifiedBlock>,
265    ) -> Vec<CommittedSubDag> {
266        if committed_leaders.is_empty() {
267            return vec![];
268        }
269
270        // We check whether the leader schedule has been updated. If yes, then we'll
271        // send the scores as part of the first sub dag.
272        let schedule_updated = self
273            .leader_schedule
274            .leader_schedule_updated(&self.dag_state);
275
276        let mut committed_sub_dags = vec![];
277        for (i, leader_block) in committed_leaders.into_iter().enumerate() {
278            let reputation_scores_desc = if schedule_updated && i == 0 {
279                self.leader_schedule
280                    .leader_swap_table
281                    .read()
282                    .reputation_scores_desc
283                    .clone()
284            } else {
285                vec![]
286            };
287
288            // Collect the sub-dag generated using each of these leaders and the
289            // corresponding commit.
290            let (sub_dag, commit) =
291                self.collect_sub_dag_and_commit(leader_block, reputation_scores_desc);
292
293            self.update_blocks_pruned_metric(&sub_dag);
294
295            // Buffer commit in dag state for persistence later.
296            // This also updates the last committed rounds.
297            self.dag_state.write().add_commit(commit.clone());
298
299            committed_sub_dags.push(sub_dag);
300        }
301
302        // Committed blocks must be persisted to storage before sending them to IOTA and
303        // executing their transactions.
304        // Commit metadata can be persisted more lazily because they are recoverable.
305        // Uncommitted blocks can wait to persist too.
306        // But for simplicity, all unpersisted blocks and commits are flushed to
307        // storage.
308        self.dag_state.write().flush();
309
310        committed_sub_dags
311    }
312
313    // Try to measure the number of blocks that get pruned due to GC. This is not
314    // very accurate, but it can give us a good enough idea. We consider a block
315    // as pruned when it is an ancestor of a block that has been committed as part
316    // of the provided `sub_dag`, but it has not been committed as part of
317    // previous commits. Right now we measure this via checking that highest
318    // committed round for the authority as we don't an efficient look up
319    // functionality to check if a block has been committed or not.
320    fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
321        let (last_committed_rounds, gc_round) = {
322            let dag_state = self.dag_state.read();
323            (dag_state.last_committed_rounds(), dag_state.gc_round())
324        };
325
326        for block_ref in sub_dag
327            .blocks
328            .iter()
329            .flat_map(|block| block.ancestors())
330            .filter(
331                |ancestor_ref| {
332                    ancestor_ref.round <= gc_round
333                        && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
334                }, /* If the last committed round is the same as the pruned block's round, then
335                    * we know for sure that it has been committed and it doesn't count here
336                    * as pruned block. */
337            )
338            .unique()
339        {
340            let hostname = &self.context.committee.authority(block_ref.author).hostname;
341
342            // If the last committed round from this authority is lower than the pruned
343            // ancestor in question, then we know for sure that it has not been committed.
344            let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
345                &[hostname, "uncommitted"]
346            } else {
347                // If last committed round is higher for this authority, then we don't really
348                // know it's status, but we know that there is a higher committed block from
349                // this authority.
350                &[hostname, "higher_committed"]
351            };
352
353            self.context
354                .metrics
355                .node_metrics
356                .blocks_pruned_on_commit
357                .with_label_values(label_values)
358                .inc();
359        }
360    }
361}
362
363#[cfg(test)]
364mod tests {
365    use rstest::rstest;
366
367    use super::*;
368    use crate::{
369        CommitIndex,
370        commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
371        context::Context,
372        leader_schedule::{LeaderSchedule, LeaderSwapTable},
373        storage::mem_store::MemStore,
374        test_dag_builder::DagBuilder,
375        test_dag_parser::parse_dag,
376    };
377
378    #[tokio::test]
379    async fn test_handle_commit() {
380        telemetry_subscribers::init_for_testing();
381        let num_authorities = 4;
382        let context = Arc::new(Context::new_for_test(num_authorities).0);
383        let dag_state = Arc::new(RwLock::new(DagState::new(
384            context.clone(),
385            Arc::new(MemStore::new()),
386        )));
387        let leader_schedule = Arc::new(LeaderSchedule::new(
388            context.clone(),
389            LeaderSwapTable::default(),
390        ));
391        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
392
393        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
394        let num_rounds: u32 = 10;
395        let mut dag_builder = DagBuilder::new(context.clone());
396        dag_builder
397            .layers(1..=num_rounds)
398            .build()
399            .persist_layers(dag_state.clone());
400
401        let leaders = dag_builder
402            .leader_blocks(1..=num_rounds)
403            .into_iter()
404            .map(Option::unwrap)
405            .collect::<Vec<_>>();
406
407        let commits = linearizer.handle_commit(leaders.clone());
408        for (idx, subdag) in commits.into_iter().enumerate() {
409            tracing::info!("{subdag:?}");
410            assert_eq!(subdag.leader, leaders[idx].reference());
411            assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
412            if idx == 0 {
413                // First subdag includes the leader block only
414                assert_eq!(subdag.blocks.len(), 1);
415            } else {
416                // Every subdag after will be missing the leader block from the previous
417                // committed subdag
418                assert_eq!(subdag.blocks.len(), num_authorities);
419            }
420            for block in subdag.blocks.iter() {
421                assert!(block.round() <= leaders[idx].round());
422            }
423            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
424        }
425    }
426
427    #[tokio::test]
428    async fn test_handle_commit_with_schedule_update() {
429        telemetry_subscribers::init_for_testing();
430        let num_authorities = 4;
431        let context = Arc::new(Context::new_for_test(num_authorities).0);
432        let dag_state = Arc::new(RwLock::new(DagState::new(
433            context.clone(),
434            Arc::new(MemStore::new()),
435        )));
436        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
437        let leader_schedule = Arc::new(
438            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
439                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
440        );
441        let mut linearizer =
442            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
443
444        // Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3.
445        let num_rounds: u32 = 20;
446        let mut dag_builder = DagBuilder::new(context.clone());
447        dag_builder
448            .layers(1..=num_rounds)
449            .build()
450            .persist_layers(dag_state.clone());
451
452        // Take the first 10 leaders
453        let leaders = dag_builder
454            .leader_blocks(1..=10)
455            .into_iter()
456            .map(Option::unwrap)
457            .collect::<Vec<_>>();
458
459        // Create some commits
460        let commits = linearizer.handle_commit(leaders.clone());
461
462        // Write them in DagState
463        dag_state.write().add_scoring_subdags(commits);
464        // Now update the leader schedule
465        leader_schedule.update_leader_schedule_v2(&dag_state);
466        assert!(
467            leader_schedule.leader_schedule_updated(&dag_state),
468            "Leader schedule should have been updated"
469        );
470
471        // Try to commit now the rest of the 10 leaders
472        let leaders = dag_builder
473            .leader_blocks(11..=20)
474            .into_iter()
475            .map(Option::unwrap)
476            .collect::<Vec<_>>();
477
478        // Now on the commits only the first one should contain the updated scores, the
479        // other should be empty
480        let commits = linearizer.handle_commit(leaders.clone());
481        assert_eq!(commits.len(), 10);
482        let scores = vec![
483            (AuthorityIndex::new_for_test(1), 29),
484            (AuthorityIndex::new_for_test(0), 29),
485            (AuthorityIndex::new_for_test(3), 29),
486            (AuthorityIndex::new_for_test(2), 29),
487        ];
488        assert_eq!(commits[0].reputation_scores_desc, scores);
489        for commit in commits.into_iter().skip(1) {
490            assert_eq!(commit.reputation_scores_desc, vec![]);
491        }
492    }
493
494    // TODO: Remove when DistributedVoteScoring is enabled.
495    #[tokio::test]
496    async fn test_handle_commit_with_schedule_update_with_unscored_subdags() {
497        telemetry_subscribers::init_for_testing();
498        let num_authorities = 4;
499        let context = Arc::new(Context::new_for_test(num_authorities).0);
500        let dag_state = Arc::new(RwLock::new(DagState::new(
501            context.clone(),
502            Arc::new(MemStore::new()),
503        )));
504        const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
505        let leader_schedule = Arc::new(
506            LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
507                .with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
508        );
509        let mut linearizer =
510            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
511
512        // Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3.
513        let num_rounds: u32 = 20;
514        let mut dag_builder = DagBuilder::new(context.clone());
515        dag_builder
516            .layers(1..=num_rounds)
517            .build()
518            .persist_layers(dag_state.clone());
519
520        // Take the first 10 leaders
521        let leaders = dag_builder
522            .leader_blocks(1..=10)
523            .into_iter()
524            .map(Option::unwrap)
525            .collect::<Vec<_>>();
526
527        // Create some commits
528        let commits = linearizer.handle_commit(leaders.clone());
529
530        // Write them in DagState
531        dag_state.write().add_unscored_committed_subdags(commits);
532
533        // Now update the leader schedule
534        leader_schedule.update_leader_schedule_v1(&dag_state);
535
536        assert!(
537            leader_schedule.leader_schedule_updated(&dag_state),
538            "Leader schedule should have been updated"
539        );
540
541        // Try to commit now the rest of the 10 leaders
542        let leaders = dag_builder
543            .leader_blocks(11..=20)
544            .into_iter()
545            .map(Option::unwrap)
546            .collect::<Vec<_>>();
547
548        // Now on the commits only the first one should contain the updated scores, the
549        // other should be empty
550        let commits = linearizer.handle_commit(leaders.clone());
551        assert_eq!(commits.len(), 10);
552        let scores = vec![
553            (AuthorityIndex::new_for_test(2), 9),
554            (AuthorityIndex::new_for_test(1), 8),
555            (AuthorityIndex::new_for_test(0), 8),
556            (AuthorityIndex::new_for_test(3), 8),
557        ];
558        assert_eq!(commits[0].reputation_scores_desc, scores);
559
560        for commit in commits.into_iter().skip(1) {
561            assert_eq!(commit.reputation_scores_desc, vec![]);
562        }
563    }
564
    /// Checks that, with GC disabled and the legacy (non-v2) linearization
    /// rule, blocks recorded in a previous commit are not committed again as
    /// part of the sub-dag of a later leader.
    #[tokio::test]
    async fn test_handle_already_committed() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let (mut context, _) = Context::new_for_test(num_authorities);
        // gc_depth = 0 means GC is disabled.
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(0);
        // Exercise the legacy branch of `linearize_sub_dag`.
        context
            .protocol_config
            .set_consensus_linearize_subdag_v2_for_testing(false);

        let context = Arc::new(context);

        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let mut linearizer =
            Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
        let wave_length = DEFAULT_WAVE_LENGTH;

        let leader_round_wave_1 = 3;
        let leader_round_wave_2 = leader_round_wave_1 + wave_length;

        // Build a Dag from round 1 up to and including `leader_round_wave_2`.
        // Note that the builder does not persist anything here; blocks are
        // handed to DagState explicitly below.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=leader_round_wave_2).build();

        // Now retrieve all the blocks up to round leader_round_wave_1 - 1
        // And then only the leader of round leader_round_wave_1
        // Also store those to DagState
        let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
        blocks.push(
            dag_builder
                .leader_block(leader_round_wave_1)
                .expect("Leader block should have been found"),
        );
        dag_state.write().accept_blocks(blocks.clone());

        // Record a first commit, led by the wave 1 leader, that references all
        // the blocks accepted so far. This bypasses the linearizer on purpose.
        let first_leader = dag_builder
            .leader_block(leader_round_wave_1)
            .expect("Wave 1 leader round block should exist");
        let mut last_commit_index = 1;
        let first_commit_data = TrustedCommit::new_for_test(
            last_commit_index,
            CommitDigest::MIN,
            0,
            first_leader.reference(),
            blocks.into_iter().map(|block| block.reference()).collect(),
        );
        dag_state.write().add_commit(first_commit_data);

        // Now take all the blocks from round `leader_round_wave_1` up to round
        // `leader_round_wave_2-1`
        let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
        // Filter out leader block of round `leader_round_wave_1`
        blocks.retain(|block| {
            !(block.round() == leader_round_wave_1
                && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
        });
        // Add the leader block of round `leader_round_wave_2`
        blocks.push(
            dag_builder
                .leader_block(leader_round_wave_2)
                .expect("Leader block should have been found"),
        );
        // Write them in dag state
        dag_state.write().accept_blocks(blocks.clone());

        // These references are exactly the blocks the second sub-dag is
        // expected to contain.
        let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();

        // Now get the latest leader which is the leader round of wave 2
        let leader = dag_builder
            .leader_block(leader_round_wave_2)
            .expect("Leader block should exist");

        // Only the index and the leader of this expected commit are compared
        // below; its digest and timestamp are placeholders.
        last_commit_index += 1;
        let expected_second_commit = TrustedCommit::new_for_test(
            last_commit_index,
            CommitDigest::MIN,
            0,
            leader.reference(),
            blocks.clone(),
        );

        let commit = linearizer.handle_commit(vec![leader.clone()]);
        assert_eq!(commit.len(), 1);

        let subdag = &commit[0];
        tracing::info!("{subdag:?}");
        assert_eq!(subdag.leader, leader.reference());
        assert_eq!(subdag.timestamp_ms, leader.timestamp_ms());
        assert_eq!(subdag.commit_ref.index, expected_second_commit.index());

        // Using the same sorting as used in CommittedSubDag::sort
        blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
        assert_eq!(
            subdag
                .blocks
                .clone()
                .into_iter()
                .map(|b| b.reference())
                .collect::<Vec<_>>(),
            blocks
        );
        // No committed block may be from a round above the leader's round.
        for block in subdag.blocks.iter() {
            assert!(block.round() <= expected_second_commit.leader().round);
        }
    }
679
680    /// This test will run the linearizer with GC disabled (gc_depth = 0) and gc
681    /// enabled (gc_depth = 3) and make sure that for the exact same DAG the
682    /// linearizer will commit different blocks according to the rules.
683    #[rstest]
684    #[tokio::test]
685    async fn test_handle_commit_with_gc_simple(#[values(0, 3)] gc_depth: u32) {
686        telemetry_subscribers::init_for_testing();
687
688        let num_authorities = 4;
689        let (mut context, _keys) = Context::new_for_test(num_authorities);
690        context.protocol_config.set_gc_depth_for_testing(gc_depth);
691
692        if gc_depth == 0 {
693            context
694                .protocol_config
695                .set_consensus_linearize_subdag_v2_for_testing(false);
696        }
697
698        let context = Arc::new(context);
699        let dag_state = Arc::new(RwLock::new(DagState::new(
700            context.clone(),
701            Arc::new(MemStore::new()),
702        )));
703        let leader_schedule = Arc::new(LeaderSchedule::new(
704            context.clone(),
705            LeaderSwapTable::default(),
706        ));
707        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
708
709        // Authorities of index 0->2 will always creates blocks that see each other, but
710        // until round 5 they won't see the blocks of authority 3. For authority
711        // 3 we create blocks that connect to all the other authorities.
712        // On round 5 we finally make the other authorities see the blocks of authority
713        // 3. Practically we "simulate" here a long chain created by authority 3
714        // that is visible in round 5, but due to GC blocks of only round >=2 will
715        // be committed, when GC is enabled. When GC is disabled all blocks will be
716        // committed for rounds >= 1.
717        let dag_str = "DAG {
718                Round 0 : { 4 },
719                Round 1 : { * },
720                Round 2 : {
721                    A -> [-D1],
722                    B -> [-D1],
723                    C -> [-D1],
724                    D -> [*],
725                },
726                Round 3 : {
727                    A -> [-D2],
728                    B -> [-D2],
729                    C -> [-D2],
730                },
731                Round 4 : {
732                    A -> [-D3],
733                    B -> [-D3],
734                    C -> [-D3],
735                    D -> [A3, B3, C3, D2],
736                },
737                Round 5 : { * },
738            }";
739
740        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
741        dag_builder.print();
742        dag_builder.persist_all_blocks(dag_state.clone());
743
744        let leaders = dag_builder
745            .leader_blocks(1..=6)
746            .into_iter()
747            .flatten()
748            .collect::<Vec<_>>();
749
750        let commits = linearizer.handle_commit(leaders.clone());
751        for (idx, subdag) in commits.into_iter().enumerate() {
752            tracing::info!("{subdag:?}");
753            assert_eq!(subdag.leader, leaders[idx].reference());
754            assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
755            if idx == 0 {
756                // First subdag includes the leader block only
757                assert_eq!(subdag.blocks.len(), 1);
758            } else if idx == 1 {
759                assert_eq!(subdag.blocks.len(), 3);
760            } else if idx == 2 {
761                // We commit:
762                // * 1 block on round 4, the leader block
763                // * 3 blocks on round 3, as no commit happened on round 3 since the leader was
764                //   missing
765                // * 2 blocks on round 2, again as no commit happened on round 3, we commit the
766                //   "sub dag" of leader of round 3, which will be another 2 blocks
767                assert_eq!(subdag.blocks.len(), 6);
768            } else {
769                // GC is enabled, so we expect to see only blocks of round >= 2
770                if gc_depth > 0 {
771                    // Now it's going to be the first time that a leader will see the blocks of
772                    // authority 3 and will attempt to commit the long chain.
773                    // However, due to GC it will only commit blocks of round > 1. That's because it
774                    // will commit blocks up to previous leader's round (round =
775                    // 4) minus the gc_depth = 3, so that will be gc_round = 4 - 3 = 1. So we expect
776                    // to see on the sub dag committed blocks of round >= 2.
777                    assert_eq!(subdag.blocks.len(), 5);
778
779                    assert!(
780                        subdag.blocks.iter().all(|block| block.round() >= 2),
781                        "Found blocks that are of round < 2."
782                    );
783
784                    // Also ensure that gc_round has advanced with the latest committed leader
785                    assert_eq!(dag_state.read().gc_round(), subdag.leader.round - gc_depth);
786                } else {
787                    // GC is disabled, so we expect to see all blocks of round >= 1
788                    assert_eq!(subdag.blocks.len(), 6);
789                    assert!(
790                        subdag.blocks.iter().all(|block| block.round() >= 1),
791                        "Found blocks that are of round < 1."
792                    );
793
794                    // GC round should never have moved
795                    assert_eq!(dag_state.read().gc_round(), 0);
796                }
797            }
798            for block in subdag.blocks.iter() {
799                assert!(block.round() <= leaders[idx].round());
800            }
801            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
802        }
803    }
804
805    #[rstest]
806    #[tokio::test]
807    async fn test_handle_commit_below_highest_committed_round(#[values(3)] gc_depth: u32) {
808        telemetry_subscribers::init_for_testing();
809
810        let num_authorities = 4;
811        let (mut context, _keys) = Context::new_for_test(num_authorities);
812        context
813            .protocol_config
814            .set_consensus_gc_depth_for_testing(gc_depth);
815        context
816            .protocol_config
817            .set_consensus_linearize_subdag_v2_for_testing(true);
818
819        let context = Arc::new(context);
820        let dag_state = Arc::new(RwLock::new(DagState::new(
821            context.clone(),
822            Arc::new(MemStore::new()),
823        )));
824        let leader_schedule = Arc::new(LeaderSchedule::new(
825            context.clone(),
826            LeaderSwapTable::default(),
827        ));
828        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone(), leader_schedule);
829
830        // Authority D will create an "orphaned" block on round 1 as it won't reference
831        // to it on the block of round 2. Similar, no other authority will reference to
832        // it on round 2. Then on round 3 the authorities A, B & C will link to
833        // block D1. Once the DAG gets committed we should see the block D1 getting
834        // committed as well. Normally ,as block D2 would have been committed
835        // first block D1 should be omitted. With the new logic this is no longer true.
836        let dag_str = "DAG {
837                Round 0 : { 4 },
838                Round 1 : { * },
839                Round 2 : {
840                    A -> [-D1],
841                    B -> [-D1],
842                    C -> [-D1],
843                    D -> [-D1],
844                },
845                Round 3 : {
846                    A -> [A2, B2, C2, D1],
847                    B -> [A2, B2, C2, D1],
848                    C -> [A2, B2, C2, D1],
849                    D -> [A2, B2, C2, D2]
850                },
851                Round 4 : { * },
852            }";
853
854        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
855        dag_builder.print();
856        dag_builder.persist_all_blocks(dag_state.clone());
857
858        let leaders = dag_builder
859            .leader_blocks(1..=4)
860            .into_iter()
861            .flatten()
862            .collect::<Vec<_>>();
863
864        let commits = linearizer.handle_commit(leaders.clone());
865        for (idx, subdag) in commits.into_iter().enumerate() {
866            tracing::info!("{subdag:?}");
867            assert_eq!(subdag.leader, leaders[idx].reference());
868            assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
869            if idx == 0 {
870                // First subdag includes the leader block only B1
871                assert_eq!(subdag.blocks.len(), 1);
872            } else if idx == 1 {
873                // We commit:
874                // * 1 block on round 2, the leader block C2
875                // * 2 blocks on round 1, A1, C1
876                assert_eq!(subdag.blocks.len(), 3);
877            } else if idx == 2 {
878                // We commit:
879                // * 1 block on round 3, the leader block D3
880                // * 3 blocks on round 2, A2, B2, D2
881                assert_eq!(subdag.blocks.len(), 4);
882
883                assert!(
884                    subdag.blocks.iter().any(|block| block.round() == 2
885                        && block.author() == AuthorityIndex::new_for_test(3)),
886                    "Block D2 should have been committed."
887                );
888            } else if idx == 3 {
889                // We commit:
890                // * 1 block on round 4, the leader block A4
891                // * 3 blocks on round 3, A3, B3, C3
892                // * 1 block of round 1, D1
893                assert_eq!(subdag.blocks.len(), 5);
894                assert!(
895                    subdag.blocks.iter().any(|block| block.round() == 1
896                        && block.author() == AuthorityIndex::new_for_test(3)),
897                    "Block D1 should have been committed."
898                );
899            } else {
900                panic!("Unexpected subdag with index {idx:?}");
901            }
902
903            for block in subdag.blocks.iter() {
904                assert!(block.round() <= leaders[idx].round());
905            }
906            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
907        }
908    }
909}