use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info, instrument};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};

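/// Observes committed leaders, expands each one into a [`CommittedSubDag`] via the
/// [`Linearizer`], and streams the resulting sub-dags to the commit consumer over an
/// unbounded channel.
///
/// On construction it compares the last commit persisted in the store against the
/// consumer's last processed commit index, and re-sends any commits the consumer has
/// not yet processed.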
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Component to deterministically collect sub-dags for committed leaders.
    commit_interpreter: Linearizer,
    /// An unbounded channel to send committed sub-dags to the consumer of them.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,
    /// Provides the reputation scores attached to the last recovered commit.
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
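    /// Creates the observer and immediately recovers and re-sends any commits with an
    /// index above `commit_consumer.last_processed_commit_index` that the consumer has
    /// not yet processed.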
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

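    /// Expands `committed_leaders` into committed sub-dags, sends each sub-dag to the
    /// commit consumer, and returns the sub-dags that were sent. Fails with
    /// [`ConsensusError::Shutdown`] if the consumer side of the channel has been
    /// dropped.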
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                // The consumer side of the channel has been dropped, which typically
                // only happens on shutdown.
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

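    /// Re-sends commits that are persisted in the store with an index greater than
    /// `last_processed_commit_index`, i.e. commits the consumer has not processed yet.
    /// Reputation scores from the current leader swap table are attached to the last
    /// recovered commit only.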
    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        // Check the last commit in the store to decide whether anything needs to be
        // recovered at all.
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"
                );
                return;
            }
        };

        // Resend only commits that the consumer has not processed yet, i.e. strictly
        // after `last_processed_commit_index`.
        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        // Resend all the committed sub-dags to the consensus output channel for all
        // commits above the last processed index.
        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            // Commit indices must be contiguous.
            assert_eq!(commit.index(), last_sent_commit_index + 1);

            // Attach the current reputation scores to the last commit only, so they
            // are available to the consumer after recovery.
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!("Failed to send commit during recovery, probably due to shutdown: {e:?}")
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

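    /// Records metrics for the sub-dags that were just sent: last committed leader
    /// round, last commit index, blocks per commit, per-block commit latency and
    /// sub-dags per commit.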
    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;
    use rstest::rstest;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState,
        linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

    #[rstest]
    #[tokio::test]
    async fn test_handle_commit(#[values(true, false)] consensus_median_timestamp: bool) {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;

        let (mut context, _keys) = Context::new_for_test(num_authorities);
        context
            .protocol_config
            .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(
                consensus_median_timestamp,
            );

        let context = Arc::new(context);

        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        let num_rounds = 10;
        // Populate fully connected test blocks for rounds 1..=num_rounds.
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the sub-dags returned by handle_commit are accurate.
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());

            let expected_ts = if consensus_median_timestamp {
                let block_refs = leaders[idx]
                    .ancestors()
                    .iter()
                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let blocks = dag_state
                    .read()
                    .get_blocks(&block_refs)
                    .into_iter()
                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
                median_timestamp_by_stake(&context, blocks).unwrap()
            } else {
                leaders[idx].timestamp_ms()
            };

            let expected_ts = if idx == 0 {
                expected_ts
            } else {
                expected_ts.max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                // The first sub-dag contains only the first leader block, since its
                // ancestors are genesis blocks that are not committed.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every other sub-dag contains the leader plus the uncommitted blocks
                // of the previous round, i.e. num_authorities blocks in total.
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check that the sub-dags sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

        // Check that the commits and blocks have been persisted to storage.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        let num_rounds = 10;
        // Populate fully connected test blocks for rounds 1..=num_rounds.
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit the first two leaders and "receive" the resulting sub-dags as the
        // consumer of the consensus output channel.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        // Check that the sub-dags sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check that the last stored commit is correct.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Commit the remaining leaders. These sub-dags are sent by consensus but not
        // "processed" by the consumer of the consensus output channel, simulating a
        // consumer that did not persist them.
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        // Check that the last stored commit is correct. The last commit sent over the
        // channel is persisted regardless of how the consumer handled it.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Re-create the commit observer starting from index 2, which represents the
        // last index processed by the consumer of the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Check that commits are re-sent from the last processed index (2) through the
        // last sent index (10).
        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        let num_rounds = 10;
        // Populate fully connected test blocks for rounds 1..=num_rounds.
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit all leaders and "receive" the resulting sub-dags as the consumer of
        // the consensus output channel.
        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check that the sub-dags sent over the consensus output channel are accurate.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // Check that the last stored commit is correct.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Re-create the commit observer starting from index 10, which represents the
        // last index processed by the consumer of the consensus output channel.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // No commits should be re-sent, since the store's last commit index equals the
        // last index processed by the consumer.
        verify_channel_empty(&mut receiver);
    }

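    /// Asserts that the consensus output channel has no buffered sub-dags and has not
    /// been closed.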
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}