consensus_core/
commit_observer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info, instrument};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};

/// Role of CommitObserver:
/// - Called by Core when try_commit() returns newly committed leaders.
/// - For each newly committed leader, the commit observer collects the
///   corresponding subdag via the commit interpreter (linearizer).
/// - The committed subdags are sent as consensus output via an unbounded tokio
///   channel.
///
/// No backpressure mechanism is needed here, as backpressure is applied at the
/// input to consensus.
///
/// - Commit metadata, including the commit index, is persisted in the store
///   before the CommittedSubDag is sent to the consumer.
/// - When CommitObserver is initialized, a last processed commit index can be
///   provided to ensure any missing commits are re-sent.
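///
/// A minimal construction sketch (marked `ignore`, so it is not compiled as a
/// doc-test; it assumes a `Context`, `DagState`, `Store`, `LeaderSchedule` and
/// a batch of `committed_leaders` have already been set up, as in the tests
/// below):
///
/// ```ignore
/// // Channel delivering committed subdags to the consensus output consumer.
/// let (sender, mut receiver) = unbounded_channel("consensus_output");
/// let mut observer = CommitObserver::new(
///     context.clone(),
///     // 0 = the consumer has not processed any commits yet.
///     CommitConsumer::new(sender, 0),
///     dag_state.clone(),
///     store.clone(),
///     leader_schedule.clone(),
/// );
/// // Newly committed leaders from Core are turned into subdags, persisted and
/// // forwarded to `receiver`.
/// let sent_sub_dags = observer.handle_commit(committed_leaders)?;
/// ```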
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Component to deterministically collect subdags for committed leaders.
    commit_interpreter: Linearizer,
    /// An unbounded channel to send committed sub-dags to the consumer of
    /// consensus output.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

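    /// Collects a committed subdag for each newly committed leader via the
    /// linearizer, sends the subdags to the consensus output channel and
    /// returns the subdags that were sent.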
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            // Failures in sender.send() are assumed to be permanent
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

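    /// Re-sends the committed subdags for all commits stored above
    /// `last_processed_commit_index`, so the consumer does not miss any
    /// commits across restarts.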
    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        // TODO: remove this check, to allow consensus to regenerate commits?
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"
                );
                return;
            }
        };

        // The last processed commit should not be sent again, so scanning starts at
        // last_processed_commit_index + 1.
        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        // Resend all the committed subdags to the consensus output channel
        // for all the commits above the last processed index.
        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            // Commit indices must be contiguous.
            assert_eq!(commit.index(), last_sent_commit_index + 1);

            // On recovery, the leader schedule will be updated with the current scores,
            // and the scores are passed along with the last commit sent to iota so
            // that the current scores are available for submission.
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!("Failed to send commit during recovery, probably due to shutdown: {e:?}")
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

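    /// Records commit-related node metrics (last committed leader round, last
    /// commit index, blocks per commit and per-block commit latency) for the
    /// subdags that were sent to the consumer.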
    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;
    use rstest::rstest;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState,
        linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

    #[rstest]
    #[tokio::test]
    async fn test_handle_commit(#[values(true, false)] consensus_median_timestamp: bool) {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;

        let (mut context, _keys) = Context::new_for_test(num_authorities);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );

        let context = Arc::new(context);

        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits returned by CommitObserver::handle_commit are accurate.
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            let expected_ts = if consensus_median_timestamp {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
                median_timestamp_by_stake(&context, blocks).unwrap()
            } else {
                leaders[idx].timestamp_ms()
            };

            let expected_ts = if idx == 0 {
                expected_ts
            } else {
                expected_ts.max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                // The first subdag includes the leader block plus all ancestor blocks
                // of the leader, minus the genesis round blocks.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subsequent subdag will be missing the leader block from the
                // previously committed subdag.
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

        // Check commits have been persisted to storage
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit first batch of leaders (2) and "receive" the subdags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle the next batch of leaders (8). These will be sent by consensus but
        // not "processed" via the consensus output channel, simulating a failure on
        // the consumer side where the commits were not persisted.
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct. We should persist the last commit
        // that was sent over the channel regardless of how the consumer handled
        // the commit on their end.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Re-create commit observer starting from index 2 which represents the
        // last processed index from the consumer over consensus output channel
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Check that the commits sent over the consensus output channel are accurate,
        // starting from the last processed index of 2 and finishing at the last sent
        // index of 10.
        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit all of the leaders and "receive" the subdags as the consumer of
        // the consensus output channel.
        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the commits sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check last stored commit is correct
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Re-create the commit observer starting from index 10, which represents the
        // last processed index from the consumer over the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // No commits should be resubmitted, as the consensus store's last commit
        // index is equal to the last index processed by the consumer.
        verify_channel_empty(&mut receiver);
    }

    /// After receiving all expected subdags, ensure channel is empty
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}