consensus_core/commit_observer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};

/// Role of CommitObserver
/// - Called by core when try_commit() returns newly committed leaders.
/// - For each newly committed leader, the commit observer collects the
///   leader's subdag via the commit interpreter (linearizer).
/// - The committed subdags are sent as consensus output via an unbounded tokio
///   channel.
///
/// No backpressure mechanism is needed here, as backpressure is applied at the
/// input to consensus.
///
/// - Commit metadata, including the commit index, is persisted in the store
///   before the CommittedSubDag is sent to the consumer.
/// - When CommitObserver is initialized, a last processed commit index can be
///   provided to ensure any missing commits are re-sent.
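///
/// A rough usage sketch, modeled on the test setup further below; the exact
/// construction of `Context`, `Store` and `DagState` is up to the caller:
///
/// ```ignore
/// let (sender, receiver) = unbounded_channel("consensus_output");
/// let leader_schedule = Arc::new(LeaderSchedule::from_store(context.clone(), dag_state.clone()));
/// let mut observer = CommitObserver::new(
///     context,
///     CommitConsumer::new(sender, /* last_processed_commit_index */ 0),
///     dag_state,
///     store,
///     leader_schedule,
/// );
/// // For each batch of newly committed leaders returned by core:
/// let sent_sub_dags = observer.handle_commit(committed_leaders)?;
/// ```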
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Component to deterministically collect subdags for committed leaders.
    commit_interpreter: Linearizer,
    /// An unbounded channel to send committed sub-dags to the consumer of
    /// consensus output.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

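        // On startup, resend any commits that the consumer has not processed yet, so
        // it observes a contiguous sequence of commits despite restarts.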
        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

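        // The linearizer deterministically expands each committed leader into its
        // causal history (a CommittedSubDag), excluding already committed blocks.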
        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            // Failures in sender.send() are assumed to be permanent
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        // TODO: remove this check, to allow consensus to regenerate commits?
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

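            // The store is always at least as far along as the consumer, since commits
            // are persisted before they are sent over the output channel.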
            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"
                );
                return;
            }
        };

        // We should not send the last processed commit again, so
        // last_processed_commit_index+1
        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        // Resend all the committed subdags to the consensus output channel
        // for all the commits above the last processed index.
        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            // Commit index must be continuous.
            assert_eq!(commit.index(), last_sent_commit_index + 1);

            // On recovery, the leader schedule will be updated with the current scores,
            // and the scores will be passed along with the last commit sent to iota so
            // that the current scores are available for submission.
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!(
                    "Failed to send commit during recovery, probably due to shutdown: {:?}",
                    e
                )
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

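            // Block commit latency: wall-clock time from the block's creation timestamp
            // to now, saturating to zero if the local clock is behind the block timestamp.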
            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

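        // Number of subdags produced by this handle_commit() call.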
        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

    #[tokio::test]
    async fn test_handle_commit() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits returned by CommitObserver::handle_commit are accurate.
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());
            let expected_ts = if idx == 0 {
                leaders[idx].timestamp_ms()
            } else {
                leaders[idx]
                    .timestamp_ms()
                    .max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                // The first subdag includes the leader block plus all of its ancestors,
                // minus the genesis round blocks, which leaves only the leader itself.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subdag after will be missing the leader block from the previous
                // committed subdag
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

        // Check commits have been persisted to storage
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first batch of leaders (2) and "receive" the subdags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle the next batch of leaders. These will be sent by consensus but not
        // "processed" by the consumer of the consensus output channel, simulating a
        // failure on the consumer side where the commits were not persisted.
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct. We should persist the last commit
        // that was sent over the channel regardless of how the consumer handled
        // the commit on their end.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Re-create the commit observer starting from index 2, which represents the
        // last index processed by the consumer over the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Check that the commits sent over the consensus output channel are accurate,
        // starting after the last processed index of 2 and finishing at the last sent
        // index of 10.
        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit all of the leaders and "receive" the subdags as the consumer of
        // the consensus output channel.
        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Re-create the commit observer starting from index 10, which represents the
        // last index processed by the consumer over the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // No commits should be resubmitted, as the consensus store's last commit index
        // is equal to the last index processed by the consumer.
        verify_channel_empty(&mut receiver);
    }

    /// After receiving all expected subdags, ensure channel is empty
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}