use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info, instrument};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};

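/// CommitObserver turns committed leaders into [`CommittedSubDag`]s via the
/// [`Linearizer`], sends them to the commit consumer for execution, and reports
/// commit metrics. On startup it recovers any commits that were persisted but not
/// yet processed by the consumer and resends them.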
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Produces the ordered sequence of sub-dags for the committed leaders.
    commit_interpreter: Linearizer,
    /// Channel delivering committed sub-dags to the commit consumer.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage, used to load commits during recovery.
    store: Arc<dyn Store>,
    /// Provides the reputation scores attached to the last recovered commit.
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

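    /// Converts the committed leaders into ordered [`CommittedSubDag`]s, sends each
    /// one to the commit consumer, and returns the sub-dags that were sent.
    /// Returns [`ConsensusError::Shutdown`] if the consumer channel is closed.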
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

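    /// Re-sends commits that are persisted in the store with an index greater than
    /// `last_processed_commit_index`, i.e. commits the consumer has not yet
    /// acknowledged as processed. Called once from `new()` during startup.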
    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"
                );
                return;
            }
        };

        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            assert_eq!(commit.index(), last_sent_commit_index + 1);

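            // Only the last recovered commit carries the current reputation scores;
            // earlier recovered commits are resent with empty scores.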
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!("Failed to send commit during recovery, probably due to shutdown: {e:?}")
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

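    /// Records per-commit metrics: last committed leader round, last commit index,
    /// blocks per commit, block commit latency, and the number of sub-dags per batch.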
    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

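    // Commits ten rounds of leaders in one call and verifies the returned sub-dags,
    // the sub-dags received on the consumer channel, and the commits persisted in the store.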
    #[tokio::test]
    async fn test_handle_commit() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

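        // Build a fully connected DAG for rounds 1..=10 and collect the leader block
        // of each round.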
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());
            let expected_ts = if idx == 0 {
                leaders[idx].timestamp_ms()
            } else {
                leaders[idx]
                    .timestamp_ms()
                    .max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

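    // Commits the first two leaders, then the remaining eight, and verifies that a
    // newly created CommitObserver resends exactly the commits the consumer has not
    // processed yet.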
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

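        // Commit the remaining leaders. These are sent by the observer but the
        // receiver does not treat them as processed, simulating a consumer that
        // falls behind.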
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

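        // Re-create the commit observer with the consumer's last processed index (2).
        // Recovery should resend commits 3..=10 over the same channel.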
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

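    // Verifies that no commits are resent when the consumer has already processed
    // everything up to the store's last commit index.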
    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

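        // Re-create the commit observer with the consumer already caught up to the
        // store's last commit index; recovery should send nothing.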
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        verify_channel_empty(&mut receiver);
    }

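    /// Asserts that the consensus output channel has no pending sub-dags and has not been closed.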
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}