use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};

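/// Observes the leaders committed by consensus, linearizes them into
/// `CommittedSubDag`s and sends them to the commit consumer. On construction it
/// recovers and re-sends any commits persisted in the store that the consumer
/// has not yet processed.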
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Linearizes committed leaders into sub-dags for the commit consumer.
    commit_interpreter: Linearizer,
    /// Channel over which committed sub-dags are sent to the commit consumer.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage for blocks and commits.
    store: Arc<dyn Store>,
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
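    /// Creates the observer and immediately recovers and re-sends any commits
    /// persisted after the consumer's `last_processed_commit_index`.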
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

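    /// Linearizes the given committed leaders into `CommittedSubDag`s, sends
    /// each sub-dag to the commit consumer and returns the sub-dags that were
    /// sent. Fails with `ConsensusError::Shutdown` when the consumer channel
    /// is closed.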
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

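    /// Reads commits from the store with index greater than
    /// `last_processed_commit_index` and re-sends them to the consumer, so no
    /// commit is lost across restarts. Panics if the consumer channel is
    /// closed during recovery.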
    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"
                );
                return;
            }
        };

        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            assert_eq!(commit.index(), last_sent_commit_index + 1);

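            // Only the last recovered commit is sent with the current
            // reputation scores; earlier commits are re-sent with empty scores.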
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!(
                    "Failed to send commit during recovery, probably due to shutdown: {:?}",
                    e
                )
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

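    /// Updates node metrics for the sent sub-dags: last committed leader
    /// round, last commit index, blocks per commit, block commit latency and
    /// sub-dags per commit.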
    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

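    // Commits all leaders of a ten-round DAG in one call and verifies that the
    // resulting sub-dags are sent to the consumer and persisted in the store.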
    #[tokio::test]
    async fn test_handle_commit() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Build a DAG of ten rounds, persist it, and commit all leaders.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check the returned sub-dags have the expected leaders, timestamps,
        // blocks and commit indices.
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());
            let expected_ts = if idx == 0 {
                leaders[idx].timestamp_ms()
            } else {
                leaders[idx]
                    .timestamp_ms()
                    .max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                // The first sub-dag includes only the leader block.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Subsequent sub-dags contain `num_authorities` blocks each.
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

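        // All sub-dags should have been sent to the consumer in order, with
        // empty reputation scores.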
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

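        // The last commit, all commits and all sub-dag blocks should be
        // persisted in the store.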
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

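    // Commits only the first two leaders before a simulated restart, then
    // verifies that a new observer re-sends the commits the consumer has not
    // yet processed.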
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Build a DAG of ten rounds and persist it.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

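        // Commit only the first two leaders before simulating a restart.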
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        // Check the first two sub-dags are sent to the consumer.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

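        // Commit the remaining leaders so all commits are persisted in the
        // store, even though the consumer will later report it only processed
        // the first two.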
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        // The store should now hold commits up to the last leader.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

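        // Re-create the commit observer as if the node restarted, with the
        // consumer reporting only `expected_last_processed_index` commits as
        // processed. Recovery should re-send the remaining commits.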
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Check that the recovered commits are re-sent to the consumer.
        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

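    // Commits and processes all leaders, then re-creates the observer with the
    // consumer fully caught up and verifies that no commits are re-sent.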
    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        // Build a DAG of ten rounds and persist it.
        let num_rounds = 10;
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit all leaders and check that every sub-dag is sent to and
        // processed by the consumer.
        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        // The last commit in the store should match the last processed index.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

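        // Re-create the commit observer with the consumer fully caught up;
        // recovery should find nothing to re-send.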
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // No commits should be re-sent to the consumer.
        verify_channel_empty(&mut receiver);
    }

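    /// Asserts that the consensus output channel has no pending sub-dags and
    /// has not been closed.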
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}