consensus_core/
commit_consumer.rs1use std::sync::{Arc, RwLock};
6
7use iota_metrics::monitored_mpsc::UnboundedSender;
8use tokio::sync::watch;
9
10use crate::{CommitIndex, CommittedSubDag};
11
12pub struct CommitConsumer {
13 pub(crate) sender: UnboundedSender<CommittedSubDag>,
15 pub(crate) last_processed_commit_index: CommitIndex,
20 monitor: Arc<CommitConsumerMonitor>,
22}
23
24impl CommitConsumer {
25 pub fn new(
26 sender: UnboundedSender<CommittedSubDag>,
27 last_processed_commit_index: CommitIndex,
28 ) -> Self {
29 let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index));
30 Self {
31 sender,
32 last_processed_commit_index,
33 monitor,
34 }
35 }
36
37 pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
38 self.monitor.clone()
39 }
40}
41
42pub struct CommitConsumerMonitor {
43 highest_handled_commit: watch::Sender<u32>,
45
46 highest_observed_commit_at_startup: RwLock<u32>,
48}
49
50impl CommitConsumerMonitor {
51 pub(crate) fn new(last_handled_commit: CommitIndex) -> Self {
52 Self {
53 highest_handled_commit: watch::Sender::new(last_handled_commit),
54 highest_observed_commit_at_startup: RwLock::new(0),
55 }
56 }
57
58 pub fn highest_handled_commit(&self) -> CommitIndex {
59 *self.highest_handled_commit.borrow()
60 }
61
62 pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
63 self.highest_handled_commit
64 .send_replace(highest_handled_commit);
65 }
66
67 pub fn highest_observed_commit_at_startup(&self) -> CommitIndex {
68 *self.highest_observed_commit_at_startup.read().unwrap()
69 }
70
71 pub fn set_highest_observed_commit_at_startup(
72 &self,
73 highest_observed_commit_at_startup: CommitIndex,
74 ) {
75 let highest_handled_commit = self.highest_handled_commit();
76 assert!(
77 highest_observed_commit_at_startup >= highest_handled_commit,
78 "we cannot have handled a commit that we do not know about: {} < {}",
79 highest_observed_commit_at_startup,
80 highest_handled_commit,
81 );
82
83 let mut commit = self.highest_observed_commit_at_startup.write().unwrap();
84
85 assert!(
86 *commit == 0,
87 "highest_known_commit_at_startup can only be set once"
88 );
89 *commit = highest_observed_commit_at_startup;
90 }
91
92 pub(crate) async fn replay_complete(&self) {
93 let highest_observed_commit_at_startup = self.highest_observed_commit_at_startup();
94 let mut rx = self.highest_handled_commit.subscribe();
95 loop {
96 let highest_handled = *rx.borrow_and_update();
97 if highest_handled >= highest_observed_commit_at_startup {
98 return;
99 }
100 rx.changed().await.unwrap();
101 }
102 }
103}
104
105#[cfg(test)]
106mod test {
107 use crate::CommitConsumerMonitor;
108
109 #[test]
110 fn test_commit_consumer_monitor() {
111 let monitor = CommitConsumerMonitor::new(10);
112 assert_eq!(monitor.highest_handled_commit(), 10);
113
114 monitor.set_highest_handled_commit(100);
115 assert_eq!(monitor.highest_handled_commit(), 100);
116 }
117}