consensus_core/
commit_observer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};

/// Role of CommitObserver:
/// - Called by core when try_commit() returns newly committed leaders.
/// - For each newly committed leader, the commit observer collects the
///   corresponding subdag via the commit interpreter (linearizer).
/// - The committed subdags are sent as consensus output via an unbounded tokio
///   channel.
///
/// No backpressure mechanism is needed here, as backpressure is applied at the
/// input to consensus.
///
/// - Commit metadata, including the commit index, is persisted in the store
///   before the CommittedSubDag is sent to the consumer.
/// - When the CommitObserver is initialized, a last processed commit index can
///   be provided to ensure any missing commits are re-sent.
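///
/// A minimal, illustrative sketch of the expected flow (the `context`,
/// `commit_consumer`, `dag_state`, `store` and `leader_schedule` values are
/// assumed to be constructed elsewhere, as in the tests below):
///
/// ```ignore
/// let mut observer = CommitObserver::new(
///     context, commit_consumer, dag_state, store, leader_schedule,
/// );
/// // Core calls this with newly committed leader blocks; the returned subdags
/// // have also been sent to the consumer end of the output channel.
/// let sub_dags = observer.handle_commit(committed_leaders)?;
/// ```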
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Component to deterministically collect subdags for committed leaders.
    commit_interpreter: Linearizer,
    /// An unbounded channel to send committed sub-dags to the consumer of
    /// consensus output.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
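    /// Creates the observer and immediately replays commits persisted in the
    /// store above `last_processed_commit_index` to the consumer (see
    /// `recover_and_send_commits`).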
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

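    /// Called with newly committed leader blocks: collects a subdag for each
    /// leader via the linearizer, sends each subdag to the consensus output
    /// channel, and returns the subdags that were sent.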
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            // Failures in sender.send() are assumed to be permanent
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

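    /// Re-sends commits persisted in the store with index greater than
    /// `last_processed_commit_index`, so the consumer receives every commit it
    /// has not yet processed.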
    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        // TODO: remove this check, to allow consensus to regenerate commits?
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"
                );
                return;
            }
        };

        // We should not send the last processed commit again, so start from
        // last_processed_commit_index + 1.
        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        // Resend all the committed subdags to the consensus output channel
        // for all the commits above the last processed index.
        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            // Commit indices must be contiguous.
            assert_eq!(commit.index(), last_sent_commit_index + 1);

            // On recovery, the leader schedule will be updated with the current scores,
            // and the scores will be passed along with the last commit sent to
            // iota so that the current scores are available for submission.
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!("Failed to send commit during recovery, probably due to shutdown: {e:?}")
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

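    /// Records metrics for the given committed subdags: last committed leader
    /// round, last commit index, blocks per commit, and per-block commit
    /// latency.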
    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

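    // Commits ten rounds of leaders and verifies the returned subdags, the
    // output channel contents, and persistence in the store.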
    #[tokio::test]
    async fn test_handle_commit() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits returned by CommitObserver::handle_commit are accurate.
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());
            let expected_ts = if idx == 0 {
                leaders[idx].timestamp_ms()
            } else {
                leaders[idx]
                    .timestamp_ms()
                    .max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                // The first subdag includes the leader block plus all ancestor blocks
                // of the leader, minus the genesis round blocks; for the round 1
                // leader that is just the leader block itself.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subdag after the first will be missing the leader block from
                // the previous committed subdag.
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

        // Check that the commits have been persisted to storage.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

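    // Commits two leaders that the consumer "processes", commits the rest
    // without processing them, then re-creates the observer to verify that the
    // unprocessed commits are re-sent during recovery.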
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit the first batch of leaders (2) and "receive" the subdags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check that the last stored commit is correct.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle the next batch of leaders (8); these will be sent by consensus but
        // not "processed" by the consensus output channel, simulating a consumer-side
        // failure where the commits were not persisted.
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        // Check that the last stored commit is correct. We should persist the last
        // commit that was sent over the channel regardless of how the consumer
        // handled the commit on their end.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Re-create the commit observer starting from index 2, which represents the
        // last index processed by the consumer over the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Check that the commits sent over the consensus output channel are accurate,
        // starting from the last processed index of 2 and finishing at the last sent
        // index of 10.
        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

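    // Commits and processes all leaders, then re-creates the observer to
    // verify that no commits are re-sent when none are missing.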
    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit all of the leaders and "receive" the subdags as the consumer of
        // the consensus output channel.
        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check that the last stored commit is correct.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Re-create the commit observer starting from index 10, which represents the
        // last index processed by the consumer over the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // No commits should be resubmitted, as the consensus store's last commit
        // index is equal to the last index processed by the consumer.
        verify_channel_empty(&mut receiver);
    }

    /// After receiving all expected subdags, ensure channel is empty
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}