consensus_core/
commit_consumer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::{Arc, RwLock};
6
7use iota_metrics::monitored_mpsc::UnboundedSender;
8use tokio::sync::watch;
9
10use crate::{CommitIndex, CommittedSubDag};
11
12pub struct CommitConsumer {
13    // A channel to send the committed sub dags through
14    pub(crate) sender: UnboundedSender<CommittedSubDag>,
15    // Index of the last commit that the consumer has processed. This is useful for
16    // crash/recovery so mysticeti can replay the commits from the next index.
17    // First commit in the replayed sequence will have index last_processed_commit_index + 1.
18    // Set 0 to replay from the start (as generated commit sequence starts at index = 1).
19    pub(crate) last_processed_commit_index: CommitIndex,
20    // Allows the commit consumer to report its progress.
21    monitor: Arc<CommitConsumerMonitor>,
22}
23
24impl CommitConsumer {
25    pub fn new(
26        sender: UnboundedSender<CommittedSubDag>,
27        last_processed_commit_index: CommitIndex,
28    ) -> Self {
29        let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index));
30        Self {
31            sender,
32            last_processed_commit_index,
33            monitor,
34        }
35    }
36
37    pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
38        self.monitor.clone()
39    }
40}
41
42pub struct CommitConsumerMonitor {
43    // highest commit that has been handled by IOTA
44    highest_handled_commit: watch::Sender<u32>,
45
46    // the highest commit found in local storage at startup
47    highest_observed_commit_at_startup: RwLock<u32>,
48}
49
50impl CommitConsumerMonitor {
51    pub(crate) fn new(last_handled_commit: CommitIndex) -> Self {
52        Self {
53            highest_handled_commit: watch::Sender::new(last_handled_commit),
54            highest_observed_commit_at_startup: RwLock::new(0),
55        }
56    }
57
58    pub fn highest_handled_commit(&self) -> CommitIndex {
59        *self.highest_handled_commit.borrow()
60    }
61
62    pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
63        self.highest_handled_commit
64            .send_replace(highest_handled_commit);
65    }
66
67    pub fn highest_observed_commit_at_startup(&self) -> CommitIndex {
68        *self.highest_observed_commit_at_startup.read().unwrap()
69    }
70
71    pub fn set_highest_observed_commit_at_startup(
72        &self,
73        highest_observed_commit_at_startup: CommitIndex,
74    ) {
75        let highest_handled_commit = self.highest_handled_commit();
76        assert!(
77            highest_observed_commit_at_startup >= highest_handled_commit,
78            "we cannot have handled a commit that we do not know about: {highest_observed_commit_at_startup} < {highest_handled_commit}",
79        );
80
81        let mut commit = self.highest_observed_commit_at_startup.write().unwrap();
82
83        assert!(
84            *commit == 0,
85            "highest_known_commit_at_startup can only be set once"
86        );
87        *commit = highest_observed_commit_at_startup;
88    }
89
90    pub(crate) async fn replay_complete(&self) {
91        let highest_observed_commit_at_startup = self.highest_observed_commit_at_startup();
92        let mut rx = self.highest_handled_commit.subscribe();
93        loop {
94            let highest_handled = *rx.borrow_and_update();
95            if highest_handled >= highest_observed_commit_at_startup {
96                return;
97            }
98            rx.changed().await.unwrap();
99        }
100    }
101}
102
#[cfg(test)]
mod test {
    use crate::CommitConsumerMonitor;

    /// The handled-commit value starts at the constructor argument and
    /// reflects the latest value that was set.
    #[test]
    fn test_commit_consumer_monitor() {
        let monitor = CommitConsumerMonitor::new(10);
        assert_eq!(monitor.highest_handled_commit(), 10);

        monitor.set_highest_handled_commit(100);
        assert_eq!(monitor.highest_handled_commit(), 100);
    }

    /// The startup value is `0` until set and then returns the stored value.
    #[test]
    fn test_highest_observed_commit_at_startup() {
        let monitor = CommitConsumerMonitor::new(10);
        assert_eq!(monitor.highest_observed_commit_at_startup(), 0);

        monitor.set_highest_observed_commit_at_startup(15);
        assert_eq!(monitor.highest_observed_commit_at_startup(), 15);
        // Setting the startup value must not disturb the handled value.
        assert_eq!(monitor.highest_handled_commit(), 10);
    }

    /// A startup value below the already handled commit is rejected.
    #[test]
    #[should_panic(expected = "we cannot have handled a commit that we do not know about")]
    fn test_highest_observed_commit_at_startup_below_handled() {
        let monitor = CommitConsumerMonitor::new(10);
        monitor.set_highest_observed_commit_at_startup(5);
    }

    /// A second non-zero startup value is rejected.
    #[test]
    #[should_panic(expected = "can only be set once")]
    fn test_highest_observed_commit_at_startup_set_twice() {
        let monitor = CommitConsumerMonitor::new(0);
        monitor.set_highest_observed_commit_at_startup(5);
        monitor.set_highest_observed_commit_at_startup(6);
    }
}