use std::{sync::Arc, time::Duration};

use iota_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info};

use crate::{
    CommitConsumer, CommittedSubDag,
    block::{BlockAPI, VerifiedBlock},
    commit::{CommitAPI, CommitIndex, load_committed_subdag_from_store},
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    linearizer::Linearizer,
    storage::Store,
};

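/// Expands committed leaders into `CommittedSubDag`s via the `Linearizer` and
/// forwards them to the consumer of consensus output. On construction, commits
/// that are already persisted in the store but not yet processed by the
/// consumer are recovered and re-sent.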
pub(crate) struct CommitObserver {
    context: Arc<Context>,
    /// Expands committed leaders into `CommittedSubDag`s.
    commit_interpreter: Linearizer,
    /// An unbounded channel to send committed sub-dags to the consumer of
    /// consensus output.
    sender: UnboundedSender<CommittedSubDag>,
    /// Persistent storage for blocks and commits.
    store: Arc<dyn Store>,
    leader_schedule: Arc<LeaderSchedule>,
}

impl CommitObserver {
    pub(crate) fn new(
        context: Arc<Context>,
        commit_consumer: CommitConsumer,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
        leader_schedule: Arc<LeaderSchedule>,
    ) -> Self {
        let mut observer = Self {
            commit_interpreter: Linearizer::new(
                context.clone(),
                dag_state.clone(),
                leader_schedule.clone(),
            ),
            context,
            sender: commit_consumer.sender,
            store,
            leader_schedule,
        };

        observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
        observer
    }

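    /// Processes newly committed leaders: expands them into sub-dags, sends
    /// each sub-dag to the consumer of consensus output, and records metrics.
    /// Returns the sub-dags that were sent, or `ConsensusError::Shutdown` if
    /// the output channel is closed.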
    pub(crate) fn handle_commit(
        &mut self,
        committed_leaders: Vec<VerifiedBlock>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["CommitObserver::handle_commit"])
            .start_timer();

        let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
        let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
        for committed_sub_dag in committed_sub_dags.into_iter() {
            // A send failure means the consumer side has gone away, so consensus
            // should shut down as well.
            if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
                tracing::error!(
                    "Failed to send committed sub-dag, probably due to shutdown: {err:?}"
                );
                return Err(ConsensusError::Shutdown);
            }
            tracing::debug!(
                "Sending to execution commit {} leader {}",
                committed_sub_dag.commit_ref,
                committed_sub_dag.leader
            );
            sent_sub_dags.push(committed_sub_dag);
        }

        self.report_metrics(&sent_sub_dags);
        tracing::trace!("Committed & sent {sent_sub_dags:#?}");
        Ok(sent_sub_dags)
    }

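    /// Re-sends commits that are persisted in the store but have an index
    /// greater than `last_processed_commit_index`, so the consumer does not
    /// miss any commits after a restart.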
    fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
        let now = Instant::now();
        // The last commit in the store must be at least as recent as the last
        // commit processed by the consumer.
        let last_commit = self
            .store
            .read_last_commit()
            .expect("Reading the last commit should not fail");

        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();

            assert!(last_commit_index >= last_processed_commit_index);
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"
                );
                return;
            }
        };

        // Do not re-send the last processed commit, so the scan starts at
        // last_processed_commit_index + 1.
        let unsent_commits = self
            .store
            .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
            .expect("Scanning commits should not fail");

        info!(
            "Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits",
            last_commit.map(|c| c.index()).unwrap_or_default(),
            unsent_commits.len()
        );

        // Re-send all unsent commits to the consumer, in commit index order.
        let mut last_sent_commit_index = last_processed_commit_index;
        let num_unsent_commits = unsent_commits.len();
        for (index, commit) in unsent_commits.into_iter().enumerate() {
            // Commit indices must be contiguous.
            assert_eq!(commit.index(), last_sent_commit_index + 1);

            // Only the last recovered commit carries the current reputation scores
            // from the leader swap table; earlier commits are re-sent with empty
            // scores.
            let reputation_scores = if index == num_unsent_commits - 1 {
                self.leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores_desc
                    .clone()
            } else {
                vec![]
            };

            info!("Sending commit {} during recovery", commit.index());
            let committed_sub_dag =
                load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
            self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
                panic!("Failed to send commit during recovery, probably due to shutdown: {e:?}")
            });

            last_sent_commit_index += 1;
        }

        info!(
            "Commit observer recovery completed, took {:?}",
            now.elapsed()
        );
    }

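    /// Records per-commit metrics: last committed leader round, last commit
    /// index, blocks per commit, block commit latency, and sub-dags per commit.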
    fn report_metrics(&self, committed: &[CommittedSubDag]) {
        let metrics = &self.context.metrics.node_metrics;
        let utc_now = self.context.clock.timestamp_utc_ms();

        for commit in committed {
            debug!(
                "Consensus commit {} with leader {} has {} blocks",
                commit.commit_ref,
                commit.leader,
                commit.blocks.len()
            );

            metrics
                .last_committed_leader_round
                .set(commit.leader.round as i64);
            metrics
                .last_commit_index
                .set(commit.commit_ref.index as i64);
            metrics
                .blocks_per_commit_count
                .observe(commit.blocks.len() as f64);

            for block in &commit.blocks {
                let latency_ms = utc_now
                    .checked_sub(block.timestamp_ms())
                    .unwrap_or_default();
                metrics
                    .block_commit_latency
                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
            }
        }

        self.context
            .metrics
            .node_metrics
            .sub_dags_per_commit_count
            .observe(committed.len() as f64);
    }
}

#[cfg(test)]
mod tests {
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        block::BlockRef, context::Context, dag_state::DagState, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

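    // Commits 10 leaders through handle_commit and verifies the returned
    // sub-dags, the sub-dags received on the output channel, and the commits
    // and blocks persisted in the store.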
    #[tokio::test]
    async fn test_handle_commit() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        let num_rounds = 10;
        // Build a fully connected DAG for `num_rounds` rounds and persist it.
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check the returned sub-dags: leaders, timestamps, block counts and
        // commit indices.
        let mut expected_stored_refs: Vec<BlockRef> = vec![];
        for (idx, subdag) in commits.iter().enumerate() {
            tracing::info!("{subdag:?}");
            assert_eq!(subdag.leader, leaders[idx].reference());
            let expected_ts = if idx == 0 {
                leaders[idx].timestamp_ms()
            } else {
                leaders[idx]
                    .timestamp_ms()
                    .max(commits[idx - 1].timestamp_ms)
            };
            assert_eq!(expected_ts, subdag.timestamp_ms);
            if idx == 0 {
                // The first sub-dag contains only the leader block.
                assert_eq!(subdag.blocks.len(), 1);
            } else {
                // Every subsequent sub-dag contains the leader block plus the
                // not-yet-committed blocks of the previous round.
                assert_eq!(subdag.blocks.len(), num_authorities);
            }
            for block in subdag.blocks.iter() {
                expected_stored_refs.push(block.reference());
                assert!(block.round() <= leaders[idx].round());
            }
            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
        }

        // Check the sub-dags sent to the consensus output channel.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == leaders.len() {
                break;
            }
        }
        assert_eq!(processed_subdag_index, leaders.len());

        verify_channel_empty(&mut receiver);

        // Check commits and blocks have been persisted to the store.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            commits.last().unwrap().commit_ref.index
        );
        let all_stored_commits = mem_store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), leaders.len());
        let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
        assert!(blocks_existence.iter().all(|exists| *exists));
    }

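    // Commits the first two leaders, then the rest, and verifies that a newly
    // constructed CommitObserver re-sends the commits above the consumer's
    // last processed index during recovery.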
    #[tokio::test]
    async fn test_recover_and_send_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        let num_rounds = 10;
        // Build a fully connected DAG for `num_rounds` rounds and persist it.
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit only the first two leaders; these are the commits the consumer
        // will process.
        let expected_last_processed_index: usize = 2;
        let mut commits = observer
            .handle_commit(
                leaders
                    .clone()
                    .into_iter()
                    .take(expected_last_processed_index)
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        // Check the consumer received the first two commits.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Handle the remaining leaders. These commits are sent to the channel
        // but, for this test, are treated as not yet processed by the consumer.
        commits.append(
            &mut observer
                .handle_commit(
                    leaders
                        .clone()
                        .into_iter()
                        .skip(expected_last_processed_index)
                        .collect::<Vec<_>>(),
                )
                .unwrap(),
        );

        // Receive the remaining commits from the channel; the consumer is
        // assumed not to have processed them.
        let expected_last_sent_index = num_rounds as usize;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("{subdag} was sent but not processed by consumer");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);

        // All commits up to the last sent index should be persisted in the store.
        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);

        // Re-create the commit observer with the consumer's last processed
        // index; recovery should re-send the commits above that index.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // Check that the re-sent commits are received again by the consumer.
        processed_subdag_index = expected_last_processed_index;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag} on resubmission");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_sent_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_sent_index);

        verify_channel_empty(&mut receiver);
    }

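    // Commits all leaders and verifies that re-creating the CommitObserver
    // with the consumer's last processed index equal to the last commit does
    // not re-send anything.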
    #[tokio::test]
    async fn test_send_no_missing_commits() {
        telemetry_subscribers::init_for_testing();
        let num_authorities = 4;
        let context = Arc::new(Context::new_for_test(num_authorities).0);
        let mem_store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            mem_store.clone(),
        )));
        let last_processed_commit_index = 0;
        let (sender, mut receiver) = unbounded_channel("consensus_output");

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let mut observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender.clone(), last_processed_commit_index),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule.clone(),
        );

        let num_rounds = 10;
        // Build a fully connected DAG for `num_rounds` rounds and persist it.
        let mut builder = DagBuilder::new(context.clone());
        builder
            .layers(1..=num_rounds)
            .build()
            .persist_layers(dag_state.clone());

        let leaders = builder
            .leader_blocks(1..=num_rounds)
            .into_iter()
            .map(Option::unwrap)
            .collect::<Vec<_>>();

        // Commit all leaders; the consumer processes every commit.
        let expected_last_processed_index: usize = 10;
        let commits = observer.handle_commit(leaders.clone()).unwrap();

        // Check the consumer received all commits.
        let mut processed_subdag_index = 0;
        while let Ok(subdag) = receiver.try_recv() {
            tracing::info!("Processed {subdag}");
            assert_eq!(subdag, commits[processed_subdag_index]);
            assert_eq!(subdag.reputation_scores_desc, vec![]);
            processed_subdag_index = subdag.commit_ref.index as usize;
            if processed_subdag_index == expected_last_processed_index {
                break;
            }
        }
        assert_eq!(processed_subdag_index, expected_last_processed_index);

        verify_channel_empty(&mut receiver);

        let last_commit = mem_store.read_last_commit().unwrap().unwrap();
        assert_eq!(
            last_commit.index(),
            expected_last_processed_index as CommitIndex
        );

        // Re-create the commit observer with the last processed index equal to
        // the last commit in the store; recovery should find nothing to re-send.
        let _observer = CommitObserver::new(
            context.clone(),
            CommitConsumer::new(sender, expected_last_processed_index as CommitIndex),
            dag_state.clone(),
            mem_store.clone(),
            leader_schedule,
        );

        // No commits should have been re-sent during recovery.
        verify_channel_empty(&mut receiver);
    }

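    /// Asserts that the consensus output channel has no pending sub-dags and
    /// has not been closed.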
    fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Empty => {}
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {
                    panic!("The consensus output channel was unexpectedly closed.")
                }
            },
        }
    }
}