1use std::{
6 collections::{HashMap, HashSet},
7 hash::Hash,
8 num::NonZeroUsize,
9 sync::Arc,
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CommitConsumerMonitor, CommitIndex};
15use iota_macros::{fail_point, fail_point_if};
16use iota_metrics::{monitored_mpsc::UnboundedReceiver, monitored_scope, spawn_monitored_task};
17use iota_types::{
18 authenticator_state::ActiveJwk,
19 base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest},
20 digests::ConsensusCommitDigest,
21 executable_transaction::{TrustedExecutableTransaction, VerifiedExecutableTransaction},
22 iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
23 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
24 transaction::{SenderSignedData, VerifiedTransaction},
25};
26use lru::LruCache;
27use serde::{Deserialize, Serialize};
28use tracing::{debug, info, instrument, trace_span, warn};
29
30use crate::{
31 authority::{
32 AuthorityMetrics, AuthorityState,
33 authority_per_epoch_store::{
34 AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndices,
35 ExecutionIndicesWithStats,
36 },
37 backpressure::{BackpressureManager, BackpressureSubscriber},
38 epoch_start_configuration::EpochStartConfigTrait,
39 },
40 checkpoints::{CheckpointService, CheckpointServiceNotify},
41 consensus_types::{AuthorityIndex, consensus_output_api::ConsensusOutputAPI},
42 execution_cache::{ObjectCacheRead, TransactionCacheRead},
43 scoring_decision::update_low_scoring_authorities,
44 transaction_manager::TransactionManager,
45};
46
47pub struct ConsensusHandlerInitializer {
48 state: Arc<AuthorityState>,
49 checkpoint_service: Arc<CheckpointService>,
50 epoch_store: Arc<AuthorityPerEpochStore>,
51 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
52 backpressure_manager: Arc<BackpressureManager>,
53}
54
55impl ConsensusHandlerInitializer {
56 pub fn new(
57 state: Arc<AuthorityState>,
58 checkpoint_service: Arc<CheckpointService>,
59 epoch_store: Arc<AuthorityPerEpochStore>,
60 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
61 backpressure_manager: Arc<BackpressureManager>,
62 ) -> Self {
63 Self {
64 state,
65 checkpoint_service,
66 epoch_store,
67 low_scoring_authorities,
68 backpressure_manager,
69 }
70 }
71
72 pub fn new_for_testing(
73 state: Arc<AuthorityState>,
74 checkpoint_service: Arc<CheckpointService>,
75 ) -> Self {
76 let backpressure_manager = BackpressureManager::new_for_tests();
77 Self {
78 state: state.clone(),
79 checkpoint_service,
80 epoch_store: state.epoch_store_for_testing().clone(),
81 low_scoring_authorities: Arc::new(Default::default()),
82 backpressure_manager,
83 }
84 }
85
86 pub fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
87 let new_epoch_start_state = self.epoch_store.epoch_start_state();
88 let consensus_committee = new_epoch_start_state.get_consensus_committee();
89
90 ConsensusHandler::new(
91 self.epoch_store.clone(),
92 self.checkpoint_service.clone(),
93 self.state.transaction_manager().clone(),
94 self.state.get_object_cache_reader().clone(),
95 self.state.get_transaction_cache_reader().clone(),
96 self.low_scoring_authorities.clone(),
97 consensus_committee,
98 self.state.metrics.clone(),
99 self.backpressure_manager.subscribe(),
100 )
101 }
102}
103
104pub struct ConsensusHandler<C> {
105 epoch_store: Arc<AuthorityPerEpochStore>,
109 last_consensus_stats: ExecutionIndicesWithStats,
114 checkpoint_service: Arc<C>,
115 cache_reader: Arc<dyn ObjectCacheRead>,
118 tx_reader: Arc<dyn TransactionCacheRead>,
120 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
123 committee: ConsensusCommittee,
126 metrics: Arc<AuthorityMetrics>,
129 processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
131 transaction_scheduler: AsyncTransactionScheduler,
132
133 backpressure_subscriber: BackpressureSubscriber,
134}
135
136const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
137
138impl<C> ConsensusHandler<C> {
139 pub fn new(
140 epoch_store: Arc<AuthorityPerEpochStore>,
141 checkpoint_service: Arc<C>,
142 transaction_manager: Arc<TransactionManager>,
143 cache_reader: Arc<dyn ObjectCacheRead>,
144 tx_reader: Arc<dyn TransactionCacheRead>,
145 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
146 committee: ConsensusCommittee,
147 metrics: Arc<AuthorityMetrics>,
148 backpressure_subscriber: BackpressureSubscriber,
149 ) -> Self {
150 let mut last_consensus_stats = epoch_store
152 .get_last_consensus_stats()
153 .expect("Should be able to read last consensus index");
154 if !last_consensus_stats.stats.is_initialized() {
156 last_consensus_stats.stats = ConsensusStats::new(committee.size());
157 }
158 let transaction_scheduler =
159 AsyncTransactionScheduler::start(transaction_manager, epoch_store.clone());
160 Self {
161 epoch_store,
162 last_consensus_stats,
163 checkpoint_service,
164 cache_reader,
165 tx_reader,
166 low_scoring_authorities,
167 committee,
168 metrics,
169 processed_cache: LruCache::new(NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap()),
170 transaction_scheduler,
171 backpressure_subscriber,
172 }
173 }
174
175 pub fn last_processed_subdag_index(&self) -> u64 {
177 self.last_consensus_stats.index.sub_dag_index
178 }
179}
180
181impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
182 fn handle_prior_consensus_output(&mut self, consensus_commit: impl ConsensusOutputAPI) {
187 let round = consensus_commit.leader_round();
190 info!("Ignoring prior consensus commit for round {:?}", round);
191 }
192
193 #[instrument("handle_consensus_output", level = "trace", skip_all)]
194 async fn handle_consensus_output(&mut self, consensus_output: impl ConsensusOutputAPI) {
195 self.backpressure_subscriber.await_no_backpressure().await;
201
202 let _scope = monitored_scope("HandleConsensusOutput");
203
204 let last_committed_round = self.last_consensus_stats.index.last_committed_round;
205
206 let round = consensus_output.leader_round();
207
208 assert!(
212 round >= last_committed_round,
213 "Consensus output round {round} is less than last committed round {last_committed_round}"
214 );
215 if last_committed_round == round {
216 warn!(
221 "Ignoring consensus output for round {} as it is already committed. NOTE: This is only expected if consensus is running.",
222 round
223 );
224 return;
225 }
226
227 let mut transactions = vec![];
229 let leader_author = consensus_output.leader_author_index();
230 let commit_sub_dag_index = consensus_output.commit_sub_dag_index();
231
232 debug!(
233 %consensus_output,
234 epoch = ?self.epoch_store.epoch(),
235 "Received consensus output"
236 );
237
238 let execution_index = ExecutionIndices {
239 last_committed_round: round,
240 sub_dag_index: commit_sub_dag_index,
241 transaction_index: 0_u64,
242 };
243 assert!(self.last_consensus_stats.index < execution_index);
246
247 self.last_consensus_stats.index = execution_index;
252
253 let new_jwks = self
263 .epoch_store
264 .get_new_jwks(last_committed_round)
265 .expect("Unrecoverable error in consensus handler");
266
267 if !new_jwks.is_empty() {
268 let authenticator_state_update_transaction =
269 self.authenticator_state_update_transaction(round, new_jwks);
270 debug!(
271 "adding AuthenticatorStateUpdateV1({:?}) tx: {:?}",
272 authenticator_state_update_transaction.digest(),
273 authenticator_state_update_transaction,
274 );
275
276 transactions.push((
277 SequencedConsensusTransactionKind::System(authenticator_state_update_transaction),
278 leader_author,
279 ));
280 }
281
282 update_low_scoring_authorities(
283 self.low_scoring_authorities.clone(),
284 self.epoch_store.committee(),
285 &self.committee,
286 consensus_output.reputation_score_sorted_desc(),
287 &self.metrics,
288 self.epoch_store
289 .protocol_config()
290 .consensus_bad_nodes_stake_threshold(),
291 );
292
293 self.metrics
294 .consensus_committed_subdags
295 .with_label_values(&[&leader_author.to_string()])
296 .inc();
297
298 for (authority_index, number_of_committed_headers) in
299 consensus_output.number_of_headers_in_commit_by_authority()
300 {
301 self.last_consensus_stats
302 .stats
303 .inc_num_messages(authority_index as usize, number_of_committed_headers);
304 }
305
306 {
307 let span = trace_span!("process_consensus_certs");
308 let _guard = span.enter();
309 for (authority_index, authority_transactions) in consensus_output.transactions() {
310 for (transaction, serialized_len) in authority_transactions {
312 let kind = classify(&transaction);
313 self.metrics
314 .consensus_handler_processed
315 .with_label_values(&[kind])
316 .inc();
317 self.metrics
318 .consensus_handler_transaction_sizes
319 .with_label_values(&[kind])
320 .observe(serialized_len as f64);
321 if matches!(
322 &transaction.kind,
323 ConsensusTransactionKind::CertifiedTransaction(_)
324 ) {
325 self.last_consensus_stats
326 .stats
327 .inc_num_user_transactions(authority_index as usize);
328 }
329 let transaction = SequencedConsensusTransactionKind::External(transaction);
330 transactions.push((transaction, authority_index));
331 }
332 }
333 }
334
335 for (i, authority) in self.committee.authorities() {
336 let hostname = &authority.hostname;
337 self.metrics
338 .consensus_committed_messages
339 .with_label_values(&[hostname])
340 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
341 self.metrics
342 .consensus_committed_user_transactions
343 .with_label_values(&[hostname])
344 .set(
345 self.last_consensus_stats
346 .stats
347 .get_num_user_transactions(i.value()) as i64,
348 );
349 }
350
351 let mut all_transactions = Vec::new();
352 {
353 let mut processed_set = HashSet::new();
357
358 for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
359 let current_tx_index = ExecutionIndices {
364 last_committed_round: round,
365 sub_dag_index: commit_sub_dag_index,
366 transaction_index: (seq + 1) as u64,
367 };
368
369 self.last_consensus_stats.index = current_tx_index;
370
371 let certificate_author = *self
372 .epoch_store
373 .committee()
374 .authority_by_index(cert_origin)
375 .unwrap();
376
377 let sequenced_transaction = SequencedConsensusTransaction {
378 certificate_author_index: cert_origin,
379 certificate_author,
380 consensus_index: current_tx_index,
381 transaction,
382 };
383
384 let key = sequenced_transaction.key();
385 let in_set = !processed_set.insert(key);
386 let in_cache = self
387 .processed_cache
388 .put(sequenced_transaction.key(), ())
389 .is_some();
390
391 if in_set || in_cache {
392 self.metrics.skipped_consensus_txns_cache_hit.inc();
393 continue;
394 }
395
396 all_transactions.push(sequenced_transaction);
397 }
398 }
399
400 let transactions_to_schedule = self
401 .epoch_store
402 .process_consensus_transactions_and_commit_boundary(
403 all_transactions,
404 &self.last_consensus_stats,
405 &self.checkpoint_service,
406 self.cache_reader.as_ref(),
407 self.tx_reader.as_ref(),
408 &ConsensusCommitInfo::new(&consensus_output),
409 &self.metrics,
410 )
411 .await
412 .expect("Unrecoverable error in consensus handler");
413
414 fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
415 let key = [commit_sub_dag_index, self.epoch_store.epoch()];
416 if iota_simulator::random::deterministic_probability_once(key, 0.01) {
417 iota_simulator::task::kill_current_node(None);
418 }
419 });
420
421 fail_point!("crash"); self.transaction_scheduler
424 .schedule(transactions_to_schedule)
425 .await;
426 }
427}
428
429struct AsyncTransactionScheduler {
430 sender: tokio::sync::mpsc::Sender<Vec<VerifiedExecutableTransaction>>,
431}
432
433impl AsyncTransactionScheduler {
434 pub fn start(
435 transaction_manager: Arc<TransactionManager>,
436 epoch_store: Arc<AuthorityPerEpochStore>,
437 ) -> Self {
438 let (sender, recv) = tokio::sync::mpsc::channel(16);
439 spawn_monitored_task!(Self::run(recv, transaction_manager, epoch_store));
440 Self { sender }
441 }
442
443 pub async fn schedule(&self, transactions: Vec<VerifiedExecutableTransaction>) {
444 tracing::trace_span!("transaction_scheduler_enqueue");
445 self.sender.send(transactions).await.ok();
446 }
447
448 pub async fn run(
449 mut recv: tokio::sync::mpsc::Receiver<Vec<VerifiedExecutableTransaction>>,
450 transaction_manager: Arc<TransactionManager>,
451 epoch_store: Arc<AuthorityPerEpochStore>,
452 ) {
453 while let Some(transactions) = recv.recv().await {
454 let _guard = monitored_scope("ConsensusHandler::enqueue");
455 transaction_manager.enqueue(transactions, &epoch_store);
456 }
457 }
458}
459
460pub struct MysticetiConsensusHandler {
465 handle: Option<tokio::task::JoinHandle<()>>,
466}
467
468impl MysticetiConsensusHandler {
469 pub fn new(
470 last_processed_commit_at_startup: CommitIndex,
471 mut consensus_handler: ConsensusHandler<CheckpointService>,
472 mut receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
473 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
474 ) -> Self {
475 let handle = spawn_monitored_task!(async move {
476 while let Some(consensus_output) = receiver.recv().await {
479 let commit_index = consensus_output.commit_ref.index;
480 if commit_index <= last_processed_commit_at_startup {
481 consensus_handler.handle_prior_consensus_output(consensus_output);
482 } else {
483 consensus_handler
484 .handle_consensus_output(consensus_output)
485 .await;
486 commit_consumer_monitor.set_highest_handled_commit(commit_index);
487 }
488 }
489 });
490 Self {
491 handle: Some(handle),
492 }
493 }
494
495 pub async fn abort(&mut self) {
496 if let Some(handle) = self.handle.take() {
497 handle.abort();
498 let _ = handle.await;
499 }
500 }
501}
502
503impl Drop for MysticetiConsensusHandler {
504 fn drop(&mut self) {
505 if let Some(handle) = self.handle.take() {
506 handle.abort();
507 }
508 }
509}
510
511pub struct StarfishConsensusHandler {
515 handle: Option<tokio::task::JoinHandle<()>>,
516}
517
518impl StarfishConsensusHandler {
519 pub fn new(
520 last_processed_commit_at_startup: starfish_core::CommitIndex,
521 mut consensus_handler: ConsensusHandler<CheckpointService>,
522 mut receiver: UnboundedReceiver<starfish_core::CommittedSubDag>,
523 commit_consumer_monitor: Arc<starfish_core::CommitConsumerMonitor>,
524 ) -> Self {
525 let handle = spawn_monitored_task!(async move {
526 while let Some(consensus_output) = receiver.recv().await {
529 let commit_index = consensus_output.commit_ref.index;
530 if commit_index <= last_processed_commit_at_startup {
531 consensus_handler.handle_prior_consensus_output(consensus_output);
532 } else {
533 consensus_handler
534 .handle_consensus_output(consensus_output)
535 .await;
536 commit_consumer_monitor.set_highest_handled_commit(commit_index);
537 }
538 }
539 });
540 Self {
541 handle: Some(handle),
542 }
543 }
544
545 pub async fn abort(&mut self) {
546 if let Some(handle) = self.handle.take() {
547 handle.abort();
548 let _ = handle.await;
549 }
550 }
551}
552
553impl Drop for StarfishConsensusHandler {
554 fn drop(&mut self) {
555 if let Some(handle) = self.handle.take() {
556 handle.abort();
557 }
558 }
559}
560
561impl<C> ConsensusHandler<C> {
562 fn authenticator_state_update_transaction(
563 &self,
564 round: u64,
565 mut new_active_jwks: Vec<ActiveJwk>,
566 ) -> VerifiedExecutableTransaction {
567 new_active_jwks.sort();
568
569 info!("creating authenticator state update transaction");
570 assert!(self.epoch_store.authenticator_state_enabled());
571 let transaction = VerifiedTransaction::new_authenticator_state_update(
572 self.epoch(),
573 round,
574 new_active_jwks,
575 self.epoch_store
576 .epoch_start_config()
577 .authenticator_obj_initial_shared_version()
578 .expect("authenticator state obj must exist"),
579 );
580 VerifiedExecutableTransaction::new_system(transaction, self.epoch())
581 }
582
583 fn epoch(&self) -> EpochId {
584 self.epoch_store.epoch()
585 }
586}
587
588pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
589 match &transaction.kind {
590 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
591 if certificate.contains_shared_object() {
592 "shared_certificate"
593 } else {
594 "owned_certificate"
595 }
596 }
597 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
598 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
599 ConsensusTransactionKind::CapabilityNotificationV1(_) => "capability_notification_v1",
600 ConsensusTransactionKind::SignedCapabilityNotificationV1(_) => {
601 "signed_capability_notification_v1"
602 }
603 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
604 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
605 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
606 }
607}
608
609#[derive(Debug, Clone, Serialize, Deserialize)]
610pub struct SequencedConsensusTransaction {
611 pub certificate_author_index: AuthorityIndex,
612 pub certificate_author: AuthorityName,
613 pub consensus_index: ExecutionIndices,
614 pub transaction: SequencedConsensusTransactionKind,
615}
616
617#[derive(Debug, Clone)]
618pub enum SequencedConsensusTransactionKind {
619 External(ConsensusTransaction),
620 System(VerifiedExecutableTransaction),
621}
622
623impl Serialize for SequencedConsensusTransactionKind {
624 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
625 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
626 serializable.serialize(serializer)
627 }
628}
629
630impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
631 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
632 let serializable =
633 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
634 Ok(serializable.into())
635 }
636}
637
638#[derive(Debug, Clone, Serialize, Deserialize)]
642enum SerializableSequencedConsensusTransactionKind {
643 External(Box<ConsensusTransaction>),
644 System(Box<TrustedExecutableTransaction>),
645}
646
647impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
648 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
649 match kind {
650 SequencedConsensusTransactionKind::External(ext) => {
651 SerializableSequencedConsensusTransactionKind::External(Box::new(ext.clone()))
652 }
653 SequencedConsensusTransactionKind::System(txn) => {
654 SerializableSequencedConsensusTransactionKind::System(Box::new(
655 txn.clone().serializable(),
656 ))
657 }
658 }
659 }
660}
661
662impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
663 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
664 match kind {
665 SerializableSequencedConsensusTransactionKind::External(ext) => {
666 SequencedConsensusTransactionKind::External(*ext)
667 }
668 SerializableSequencedConsensusTransactionKind::System(txn) => {
669 SequencedConsensusTransactionKind::System((*txn).into())
670 }
671 }
672 }
673}
674
675#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
676pub enum SequencedConsensusTransactionKey {
677 External(ConsensusTransactionKey),
678 System(TransactionDigest),
679}
680
681impl SequencedConsensusTransactionKind {
682 pub fn key(&self) -> SequencedConsensusTransactionKey {
683 match self {
684 SequencedConsensusTransactionKind::External(ext) => {
685 SequencedConsensusTransactionKey::External(ext.key())
686 }
687 SequencedConsensusTransactionKind::System(txn) => {
688 SequencedConsensusTransactionKey::System(*txn.digest())
689 }
690 }
691 }
692
693 pub fn get_tracking_id(&self) -> u64 {
694 match self {
695 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
696 SequencedConsensusTransactionKind::System(_txn) => 0,
697 }
698 }
699
700 pub fn is_executable_transaction(&self) -> bool {
701 match self {
702 SequencedConsensusTransactionKind::External(ext) => ext.is_user_certificate(),
703 SequencedConsensusTransactionKind::System(_) => true,
704 }
705 }
706
707 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
708 match self {
709 SequencedConsensusTransactionKind::External(ext) => {
710 if let ConsensusTransactionKind::CertifiedTransaction(txn) = &ext.kind {
711 Some(*txn.digest())
712 } else {
713 None
714 }
715 }
716 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
717 }
718 }
719
720 pub fn is_end_of_publish(&self) -> bool {
721 match self {
722 SequencedConsensusTransactionKind::External(ext) => {
723 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
724 }
725 SequencedConsensusTransactionKind::System(_) => false,
726 }
727 }
728}
729
730impl SequencedConsensusTransaction {
731 pub fn sender_authority(&self) -> AuthorityName {
732 self.certificate_author
733 }
734
735 pub fn key(&self) -> SequencedConsensusTransactionKey {
736 self.transaction.key()
737 }
738
739 pub fn is_end_of_publish(&self) -> bool {
740 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
741 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
742 } else {
743 false
744 }
745 }
746
747 pub fn is_system(&self) -> bool {
748 matches!(
749 self.transaction,
750 SequencedConsensusTransactionKind::System(_)
751 )
752 }
753
754 pub fn is_user_tx_with_randomness(&self) -> bool {
755 let SequencedConsensusTransactionKind::External(ConsensusTransaction {
756 kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
757 ..
758 }) = &self.transaction
759 else {
760 return false;
761 };
762 certificate.transaction_data().uses_randomness()
763 }
764
765 pub fn as_shared_object_txn(&self) -> Option<&SenderSignedData> {
766 match &self.transaction {
767 SequencedConsensusTransactionKind::External(ConsensusTransaction {
768 kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
769 ..
770 }) if certificate.contains_shared_object() => Some(certificate.data()),
771 SequencedConsensusTransactionKind::System(txn) if txn.contains_shared_object() => {
772 Some(txn.data())
773 }
774 _ => None,
775 }
776 }
777}
778
779#[derive(Debug, Clone, Serialize, Deserialize)]
780pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
781
782#[cfg(test)]
783impl VerifiedSequencedConsensusTransaction {
784 pub fn new_test(transaction: ConsensusTransaction) -> Self {
785 Self(SequencedConsensusTransaction::new_test(transaction))
786 }
787}
788
789impl SequencedConsensusTransaction {
790 pub fn new_test(transaction: ConsensusTransaction) -> Self {
791 Self {
792 certificate_author_index: 0,
793 certificate_author: AuthorityName::ZERO,
794 consensus_index: Default::default(),
795 transaction: SequencedConsensusTransactionKind::External(transaction),
796 }
797 }
798}
799
800pub struct ConsensusCommitInfo {
802 pub round: u64,
803 pub timestamp: u64,
804 pub consensus_commit_digest: ConsensusCommitDigest,
805
806 skip_consensus_commit_prologue_in_test: bool,
807}
808
809impl ConsensusCommitInfo {
810 fn new(consensus_output: &impl ConsensusOutputAPI) -> Self {
811 Self {
812 round: consensus_output.leader_round(),
813 timestamp: consensus_output.commit_timestamp_ms(),
814 consensus_commit_digest: consensus_output.consensus_digest(),
815
816 skip_consensus_commit_prologue_in_test: false,
817 }
818 }
819
820 pub fn new_for_test(
821 commit_round: u64,
822 commit_timestamp: u64,
823 skip_consensus_commit_prologue_in_test: bool,
824 ) -> Self {
825 Self {
826 round: commit_round,
827 timestamp: commit_timestamp,
828 consensus_commit_digest: ConsensusCommitDigest::default(),
829 skip_consensus_commit_prologue_in_test,
830 }
831 }
832
833 pub fn skip_consensus_commit_prologue_in_test(&self) -> bool {
834 self.skip_consensus_commit_prologue_in_test
835 }
836
837 fn consensus_commit_prologue_v1_transaction(
838 &self,
839 epoch: u64,
840 cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
841 ) -> VerifiedExecutableTransaction {
842 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v1(
843 epoch,
844 self.round,
845 self.timestamp,
846 self.consensus_commit_digest,
847 cancelled_txn_version_assignment,
848 );
849 VerifiedExecutableTransaction::new_system(transaction, epoch)
850 }
851
852 pub fn create_consensus_commit_prologue_transaction(
853 &self,
854 epoch: u64,
855 cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
856 ) -> VerifiedExecutableTransaction {
857 self.consensus_commit_prologue_v1_transaction(epoch, cancelled_txn_version_assignment)
858 }
859}
860
#[cfg(test)]
mod tests {
    use consensus_core::{
        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
    };
    use futures::pin_mut;
    use iota_protocol_config::{Chain, ConsensusTransactionOrdering};
    use iota_types::{
        base_types::{AuthorityName, IotaAddress, random_object_ref},
        committee::Committee,
        messages_consensus::{
            AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        },
        object::Object,
        supported_protocol_versions::{
            SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
        },
        transaction::{
            CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
        },
    };
    use prometheus::Registry;

    use super::*;
    use crate::{
        authority::{
            authority_per_epoch_store::ConsensusStatsAPI,
            test_authority_builder::TestAuthorityBuilder,
        },
        checkpoints::CheckpointServiceNoop,
        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
        post_consensus_tx_reorder::PostConsensusTxReorder,
    };

    /// Feeds one commit through the handler: it must stall while backpressure is on, finish
    /// once it is lifted, update the stats, and ignore replays of the same commit.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_consensus_handler() {
        // Gas objects plus one shared object for the test certificates.
        let mut objects = test_gas_objects();
        let shared_object = Object::shared_for_testing();
        objects.push(shared_object.clone());

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let backpressure_manager = BackpressureManager::new_for_tests();

        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            state.transaction_manager().clone(),
            state.get_object_cache_reader().clone(),
            state.get_transaction_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            backpressure_manager.subscribe(),
        );

        // One block per certificate: rounds start at 100, authors cycle through the committee.
        let transactions = test_certificates(&state, shared_object).await;
        let blocks: Vec<VerifiedBlock> = transactions
            .iter()
            .enumerate()
            .map(|(i, certificate)| {
                let message =
                    ConsensusTransaction::new_certificate_message(&state.name, certificate.clone());
                let transaction_bytes: Vec<u8> = bcs::to_bytes(&message).unwrap();
                let author = (i % consensus_committee.size()) as u32;

                VerifiedBlock::new_for_test(
                    TestBlock::new(100 + i as u32, author)
                        .set_transactions(vec![Transaction::new(transaction_bytes)])
                        .build(),
                )
            })
            .collect();

        // The first block acts as the leader of the commit.
        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            vec![],
        );

        backpressure_manager.set_backpressure(true);
        backpressure_manager.update_highest_certified_checkpoint(1);

        {
            let waiter = consensus_handler.handle_consensus_output(committed_sub_dag.clone());
            pin_mut!(waiter);

            // With backpressure on, the handler must not finish within the timeout.
            let stalled = tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter).await;
            stalled.unwrap_err();

            backpressure_manager.set_backpressure(false);

            // Once backpressure is lifted the same future runs to completion.
            let finished = tokio::time::timeout(std::time::Duration::from_secs(100), waiter).await;
            finished.unwrap();
        }

        let num_blocks = blocks.len();
        let num_transactions = transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(last_consensus_stats_1.hash, 0);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // Replaying the same commit must leave the stats untouched.
        for _ in 0..2 {
            consensus_handler
                .handle_consensus_output(committed_sub_dag.clone())
                .await;
            let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone();
            assert_eq!(last_consensus_stats_1, last_consensus_stats_2);
        }
    }

    /// Checks the ordering produced by `PostConsensusTxReorder` for `ByGasPrice`.
    #[test]
    fn test_order_by_gas_price() {
        let chain = Chain::Unknown;

        let mut v = vec![
            cap_txn(10, chain),
            user_txn(42),
            user_txn(100),
            cap_txn(1, chain),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(extract(v), ["cap(10)", "cap(1)", "user(100)", "user(42)"]);

        let mut v = vec![
            user_txn(1200),
            cap_txn(10, chain),
            user_txn(12),
            user_txn(1000),
            user_txn(42),
            user_txn(100),
            cap_txn(1, chain),
            user_txn(1000),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            [
                "cap(10)",
                "cap(1)",
                "user(1200)",
                "user(1000)",
                "user(1000)",
                "user(100)",
                "user(42)",
                "user(12)",
            ]
        );

        // Without user transactions the expected output equals the input order.
        let mut v = vec![
            cap_txn(10, chain),
            eop_txn(12),
            eop_txn(10),
            cap_txn(1, chain),
            eop_txn(11),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            ["cap(10)", "eop(12)", "eop(10)", "cap(1)", "eop(11)"]
        );
    }

    /// Renders every transaction with [`extract_one`], preserving order.
    fn extract(v: Vec<VerifiedSequencedConsensusTransaction>) -> Vec<String> {
        let mut labels = Vec::with_capacity(v.len());
        for transaction in v {
            labels.push(extract_one(transaction));
        }
        labels
    }

    /// Renders a transaction as `eop(<first authority byte>)`, `cap(<generation>)` or
    /// `user(<gas price>)`; any other kind is unreachable in these tests.
    fn extract_one(t: VerifiedSequencedConsensusTransaction) -> String {
        let SequencedConsensusTransactionKind::External(ext) = t.0.transaction else {
            unreachable!()
        };
        match ext.kind {
            ConsensusTransactionKind::EndOfPublish(authority) => {
                format!("eop({})", authority.0[0])
            }
            ConsensusTransactionKind::CapabilityNotificationV1(cap) => {
                format!("cap({})", cap.generation)
            }
            ConsensusTransactionKind::CertifiedTransaction(txn) => {
                format!("user({})", txn.transaction_data().gas_price())
            }
            _ => unreachable!(),
        }
    }

    /// Builds an `EndOfPublish` message whose authority name starts with byte `a`.
    fn eop_txn(a: u8) -> VerifiedSequencedConsensusTransaction {
        let mut authority = AuthorityName::default();
        authority.0[0] = a;
        let kind = ConsensusTransactionKind::EndOfPublish(authority);
        txn(kind)
    }

    /// Builds a `CapabilityNotificationV1` message with the given `generation`.
    fn cap_txn(generation: u64, chain: Chain) -> VerifiedSequencedConsensusTransaction {
        let supported_protocol_versions =
            SupportedProtocolVersionsWithHashes::from_supported_versions(
                SupportedProtocolVersions::SYSTEM_DEFAULT,
                chain,
            );
        let capabilities = AuthorityCapabilitiesV1 {
            authority: Default::default(),
            generation,
            supported_protocol_versions,
            available_system_packages: vec![],
        };
        txn(ConsensusTransactionKind::CapabilityNotificationV1(
            capabilities,
        ))
    }

    /// Builds a certified transfer with the given `gas_price` and a budget of
    /// `1000 * gas_price`.
    fn user_txn(gas_price: u64) -> VerifiedSequencedConsensusTransaction {
        let (committee, keypairs) = Committee::new_simple_test_committee();

        let recipient = IotaAddress::default();
        let object_ref = random_object_ref();
        let sender = IotaAddress::default();
        let gas_payment = random_object_ref();
        let transfer = TransactionData::new_transfer(
            recipient,
            object_ref,
            sender,
            gas_payment,
            1000 * gas_price,
            gas_price,
        );
        let data = SenderSignedData::new(transfer, vec![]);

        let certificate =
            CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee);
        txn(ConsensusTransactionKind::CertifiedTransaction(Box::new(
            certificate,
        )))
    }

    /// Wraps `kind` in a consensus transaction with a default tracking id.
    fn txn(kind: ConsensusTransactionKind) -> VerifiedSequencedConsensusTransaction {
        let transaction = ConsensusTransaction {
            kind,
            tracking_id: Default::default(),
        };
        VerifiedSequencedConsensusTransaction::new_test(transaction)
    }
}