consensus_core/
commit_observer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info, instrument};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};
/// Role of CommitObserver:
/// - Called by core when try_commit() returns newly committed leaders.
/// - For each newly committed leader, the commit observer collects the
///   corresponding subdag via the commit interpreter (linearizer).
/// - The committed subdags are sent as consensus output via an unbounded tokio
///   channel.
///
/// No backpressure mechanism is needed here, as backpressure is applied on the
/// input side of consensus.
///
/// - Commit metadata, including the commit index, is persisted in the store
///   before the CommittedSubDag is sent to the consumer.
/// - When CommitObserver is initialized, a last processed commit index can be
///   provided to ensure any missing commits are re-sent.
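///
/// A minimal usage sketch (not compiled; assumes the context, dag state,
/// store and leader schedule are already set up, as in the tests below):
/// ```ignore
/// let (sender, receiver) = unbounded_channel("consensus_output");
/// let mut observer = CommitObserver::new(
///     context,
///     // 0 = no commits processed yet, so recovery re-sends everything stored.
///     CommitConsumer::new(sender, 0),
///     dag_state,
///     store,
///     leader_schedule,
/// );
/// // Core passes newly committed leaders; subdags arrive on `receiver`.
/// let sent_sub_dags = observer.handle_commit(committed_leaders)?;
/// ```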
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Component to deterministically collect subdags for committed leaders.
    commit_interpreter: Linearizer,
    /// An unbounded channel to send committed sub-dags to the consumer of
    /// consensus output.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

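    /// Handles newly committed leaders: collects a subdag for each leader via
    /// the commit interpreter, sends each subdag to the consumer, and returns
    /// the subdags that were successfully sent.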
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            // Failures in sender.send() are assumed to be permanent
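            // The subdag is cloned so it can also be returned to the caller below.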
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

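    /// Re-sends committed subdags to the consumer for every commit persisted
    /// in the store above `last_processed_commit_index`, so a restarted
    /// consumer can catch up on commits it has not yet processed. For example,
    /// with a last processed index of 2 and commits up to index 10 in the
    /// store, commits 3..=10 are re-sent.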
    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        // TODO: remove this check, to allow consensus to regenerate commits?
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer: last commit index {last_commit_index} equals the last processed index {last_processed_commit_index}"
                );
                return;
            }
        };

        // The last processed commit should not be sent again, so scanning starts
        // at last_processed_commit_index + 1.
        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        // Resend all the committed subdags to the consensus output channel
        // for all the commits above the last processed index.
        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            // Commit indices must be contiguous.
            assert_eq!(commit.index(), last_sent_commit_index + 1);

            // On recovery, the leader schedule will have been updated with the
            // current scores, which are attached to the last commit sent to
            // iota so that they are available for submission.
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!("Failed to send commit during recovery, probably due to shutdown: {e:?}")
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

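    /// Reports per-commit metrics: the last committed leader round, the last
    /// commit index, blocks per commit, block commit latency, and the number
    /// of subdags per commit.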
    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

    #[tokio::test]
    async fn test_handle_commit() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
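        // The consumer has not processed any commits yet.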
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits returned by CommitObserver::handle_commit are accurate
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());
            let expected_ts = if idx == 0 {
                leaders[idx].timestamp_ms()
            } else {
                leaders[idx]
                    .timestamp_ms()
                    .max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                // The first subdag includes the leader block plus all ancestor
                // blocks of the leader, minus the genesis round blocks; for the
                // round-1 leader that leaves only the leader block itself.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subsequent subdag is missing the leader block of the
                // previously committed subdag.
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check that the commits sent over the consensus output channel are accurate
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

        // Check commits have been persisted to storage
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }


    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first batch of leaders (2) and "receive" the subdags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        // Check that the commits sent over the consensus output channel are accurate
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle the next batch of leaders (8). These will be sent by consensus
        // but not "processed" by the consumer of the consensus output channel,
        // simulating a consumer-side failure where the commits were not persisted.
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct. We should persist the last commit
        // that was sent over the channel regardless of how the consumer handled
        // the commit on their end.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Re-create the commit observer starting from index 2, which represents
        // the last index processed by the consumer of the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Check that the commits sent over the consensus output channel are
        // accurate, starting from the last processed index of 2 and finishing
        // at the last sent index of 10.
        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit all of the leaders and "receive" the subdags as the consumer of
        // the consensus output channel.
        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits sent over the consensus output channel are accurate
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Re-create the commit observer starting from index 10, which represents
        // the last index processed by the consumer of the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // No commits should be resubmitted, as the consensus store's last commit
        // index equals the last index processed by the consumer.
        verify_channel_empty(&mut receiver);
    }

    /// After receiving all expected subdags, ensure the channel is empty.
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}