consensus_core/
commit_consumer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::{Arc, RwLock};
6
7use iota_metrics::monitored_mpsc::UnboundedSender;
8use tokio::sync::watch;
9
10use crate::{CommitIndex, CommittedSubDag};
11
12pub struct CommitConsumer {
13    // A channel to send the committed sub dags through
14    pub(crate) sender: UnboundedSender<CommittedSubDag>,
15    // Index of the last commit that the consumer has processed. This is useful for
16    // crash/recovery so mysticeti can replay the commits from the next index.
17    // First commit in the replayed sequence will have index last_processed_commit_index + 1.
18    // Set 0 to replay from the start (as generated commit sequence starts at index = 1).
19    pub(crate) last_processed_commit_index: CommitIndex,
20    // Allows the commit consumer to report its progress.
21    monitor: Arc<CommitConsumerMonitor>,
22}
23
24impl CommitConsumer {
25    pub fn new(
26        sender: UnboundedSender<CommittedSubDag>,
27        last_processed_commit_index: CommitIndex,
28    ) -> Self {
29        let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index));
30        Self {
31            sender,
32            last_processed_commit_index,
33            monitor,
34        }
35    }
36
37    pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
38        self.monitor.clone()
39    }
40}
41
42pub struct CommitConsumerMonitor {
43    // highest commit that has been handled by IOTA
44    highest_handled_commit: watch::Sender<u32>,
45
46    // the highest commit found in local storage at startup
47    highest_observed_commit_at_startup: RwLock<u32>,
48}
49
50impl CommitConsumerMonitor {
51    pub(crate) fn new(last_handled_commit: CommitIndex) -> Self {
52        Self {
53            highest_handled_commit: watch::Sender::new(last_handled_commit),
54            highest_observed_commit_at_startup: RwLock::new(0),
55        }
56    }
57
58    pub fn highest_handled_commit(&self) -> CommitIndex {
59        *self.highest_handled_commit.borrow()
60    }
61
62    pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
63        self.highest_handled_commit
64            .send_replace(highest_handled_commit);
65    }
66
67    pub fn highest_observed_commit_at_startup(&self) -> CommitIndex {
68        *self.highest_observed_commit_at_startup.read().unwrap()
69    }
70
71    pub fn set_highest_observed_commit_at_startup(
72        &self,
73        highest_observed_commit_at_startup: CommitIndex,
74    ) {
75        let highest_handled_commit = self.highest_handled_commit();
76        assert!(
77            highest_observed_commit_at_startup >= highest_handled_commit,
78            "we cannot have handled a commit that we do not know about: {} < {}",
79            highest_observed_commit_at_startup,
80            highest_handled_commit,
81        );
82
83        let mut commit = self.highest_observed_commit_at_startup.write().unwrap();
84
85        assert!(
86            *commit == 0,
87            "highest_known_commit_at_startup can only be set once"
88        );
89        *commit = highest_observed_commit_at_startup;
90    }
91
92    pub(crate) async fn replay_complete(&self) {
93        let highest_observed_commit_at_startup = self.highest_observed_commit_at_startup();
94        let mut rx = self.highest_handled_commit.subscribe();
95        loop {
96            let highest_handled = *rx.borrow_and_update();
97            if highest_handled >= highest_observed_commit_at_startup {
98                return;
99            }
100            rx.changed().await.unwrap();
101        }
102    }
103}
104
#[cfg(test)]
mod test {
    use super::CommitConsumerMonitor;

    /// The monitor reports its initial handled commit and, after an update,
    /// the newly set one.
    #[test]
    fn test_commit_consumer_monitor() {
        let initial_commit = 10;
        let updated_commit = 100;

        let monitor = CommitConsumerMonitor::new(initial_commit);
        assert_eq!(monitor.highest_handled_commit(), initial_commit);

        monitor.set_highest_handled_commit(updated_commit);
        assert_eq!(monitor.highest_handled_commit(), updated_commit);
    }
}