consensus_core/
commit_consumer.rs1use std::sync::{Arc, RwLock};
6
7use iota_metrics::monitored_mpsc::UnboundedSender;
8use tokio::sync::watch;
9
10use crate::{CommitIndex, CommittedSubDag};
11
12pub struct CommitConsumer {
13 pub(crate) sender: UnboundedSender<CommittedSubDag>,
15 pub(crate) last_processed_commit_index: CommitIndex,
20 monitor: Arc<CommitConsumerMonitor>,
22}
23
24impl CommitConsumer {
25 pub fn new(
26 sender: UnboundedSender<CommittedSubDag>,
27 last_processed_commit_index: CommitIndex,
28 ) -> Self {
29 let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index));
30 Self {
31 sender,
32 last_processed_commit_index,
33 monitor,
34 }
35 }
36
37 pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
38 self.monitor.clone()
39 }
40}
41
42pub struct CommitConsumerMonitor {
43 highest_handled_commit: watch::Sender<u32>,
45
46 highest_observed_commit_at_startup: RwLock<u32>,
48}
49
50impl CommitConsumerMonitor {
51 pub(crate) fn new(last_handled_commit: CommitIndex) -> Self {
52 Self {
53 highest_handled_commit: watch::Sender::new(last_handled_commit),
54 highest_observed_commit_at_startup: RwLock::new(0),
55 }
56 }
57
58 pub fn highest_handled_commit(&self) -> CommitIndex {
59 *self.highest_handled_commit.borrow()
60 }
61
62 pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
63 self.highest_handled_commit
64 .send_replace(highest_handled_commit);
65 }
66
67 pub fn highest_observed_commit_at_startup(&self) -> CommitIndex {
68 *self.highest_observed_commit_at_startup.read().unwrap()
69 }
70
71 pub fn set_highest_observed_commit_at_startup(
72 &self,
73 highest_observed_commit_at_startup: CommitIndex,
74 ) {
75 let highest_handled_commit = self.highest_handled_commit();
76 assert!(
77 highest_observed_commit_at_startup >= highest_handled_commit,
78 "we cannot have handled a commit that we do not know about: {highest_observed_commit_at_startup} < {highest_handled_commit}",
79 );
80
81 let mut commit = self.highest_observed_commit_at_startup.write().unwrap();
82
83 assert!(
84 *commit == 0,
85 "highest_known_commit_at_startup can only be set once"
86 );
87 *commit = highest_observed_commit_at_startup;
88 }
89
90 pub(crate) async fn replay_complete(&self) {
91 let highest_observed_commit_at_startup = self.highest_observed_commit_at_startup();
92 let mut rx = self.highest_handled_commit.subscribe();
93 loop {
94 let highest_handled = *rx.borrow_and_update();
95 if highest_handled >= highest_observed_commit_at_startup {
96 return;
97 }
98 rx.changed().await.unwrap();
99 }
100 }
101}
102
#[cfg(test)]
mod test {
    use crate::CommitConsumerMonitor;

    /// The monitor reports its initial handled commit, and then whatever
    /// value was most recently set.
    #[test]
    fn test_commit_consumer_monitor() {
        const INITIAL: u32 = 10;
        const UPDATED: u32 = 100;

        let tracker = CommitConsumerMonitor::new(INITIAL);
        assert_eq!(tracker.highest_handled_commit(), INITIAL);

        tracker.set_highest_handled_commit(UPDATED);
        assert_eq!(tracker.highest_handled_commit(), UPDATED);
    }
}