1use std::{
6 collections::{HashMap, HashSet},
7 hash::Hash,
8 num::NonZeroUsize,
9 sync::Arc,
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::CommitConsumerMonitor;
15use iota_macros::{fail_point_async, fail_point_if};
16use iota_metrics::{monitored_mpsc::UnboundedReceiver, monitored_scope, spawn_monitored_task};
17use iota_types::{
18 authenticator_state::ActiveJwk,
19 base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest},
20 digests::ConsensusCommitDigest,
21 executable_transaction::{TrustedExecutableTransaction, VerifiedExecutableTransaction},
22 iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
23 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
24 transaction::{SenderSignedData, VerifiedTransaction},
25};
26use lru::LruCache;
27use serde::{Deserialize, Serialize};
28use tracing::{debug, info, instrument, trace_span, warn};
29
30use crate::{
31 authority::{
32 AuthorityMetrics, AuthorityState,
33 authority_per_epoch_store::{
34 AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndices,
35 ExecutionIndicesWithStats,
36 },
37 epoch_start_configuration::EpochStartConfigTrait,
38 },
39 checkpoints::{CheckpointService, CheckpointServiceNotify},
40 consensus_types::{AuthorityIndex, consensus_output_api::ConsensusOutputAPI},
41 execution_cache::ObjectCacheRead,
42 scoring_decision::update_low_scoring_authorities,
43 transaction_manager::TransactionManager,
44};
45
/// Bundles the shared handles needed to build a [`ConsensusHandler`].
pub struct ConsensusHandlerInitializer {
    /// Authority state; supplies the transaction manager, the object cache
    /// reader and the metrics passed to each handler.
    state: Arc<AuthorityState>,
    /// Checkpoint service handed to every handler that is created.
    checkpoint_service: Arc<CheckpointService>,
    /// Per-epoch store; also the source of the consensus committee.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Shared, swappable map of low-scoring authorities that is passed on to
    /// each handler.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
}
52
53impl ConsensusHandlerInitializer {
54 pub fn new(
55 state: Arc<AuthorityState>,
56 checkpoint_service: Arc<CheckpointService>,
57 epoch_store: Arc<AuthorityPerEpochStore>,
58 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
59 ) -> Self {
60 Self {
61 state,
62 checkpoint_service,
63 epoch_store,
64 low_scoring_authorities,
65 }
66 }
67
68 pub fn new_for_testing(
69 state: Arc<AuthorityState>,
70 checkpoint_service: Arc<CheckpointService>,
71 ) -> Self {
72 Self {
73 state: state.clone(),
74 checkpoint_service,
75 epoch_store: state.epoch_store_for_testing().clone(),
76 low_scoring_authorities: Arc::new(Default::default()),
77 }
78 }
79
80 pub fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
81 let new_epoch_start_state = self.epoch_store.epoch_start_state();
82 let consensus_committee = new_epoch_start_state.get_consensus_committee();
83
84 ConsensusHandler::new(
85 self.epoch_store.clone(),
86 self.checkpoint_service.clone(),
87 self.state.transaction_manager().clone(),
88 self.state.get_object_cache_reader().clone(),
89 self.low_scoring_authorities.clone(),
90 consensus_committee,
91 self.state.metrics.clone(),
92 )
93 }
94}
95
/// Processes consensus outputs (committed sub-DAGs) for one epoch and hands
/// the resulting transactions to the transaction scheduler.
pub struct ConsensusHandler<C> {
    /// Per-epoch store used to read the last consensus stats and to process
    /// each batch of sequenced transactions.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Execution indices and per-authority stats of the most recently handled
    /// consensus output; loaded from the epoch store at construction.
    last_consensus_stats: ExecutionIndicesWithStats,
    /// Checkpoint service passed to the epoch store when a commit is
    /// processed.
    checkpoint_service: Arc<C>,
    /// Object cache reader passed to the epoch store when a commit is
    /// processed.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// Shared map of low-scoring authorities, refreshed on every handled
    /// commit via `update_low_scoring_authorities`.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    /// Consensus committee; used to size the stats and to label per-authority
    /// metrics by hostname.
    committee: ConsensusCommittee,
    /// Authority metrics updated while handling commits.
    metrics: Arc<AuthorityMetrics>,
    /// LRU cache of keys of transactions already seen, used to skip
    /// duplicates across commits.
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Forwards the transactions produced by a commit to a background task
    /// that enqueues them on the transaction manager.
    transaction_scheduler: AsyncTransactionScheduler,
}
123
/// Capacity (number of keys) of `ConsensusHandler::processed_cache`.
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
125
126impl<C> ConsensusHandler<C> {
127 pub fn new(
128 epoch_store: Arc<AuthorityPerEpochStore>,
129 checkpoint_service: Arc<C>,
130 transaction_manager: Arc<TransactionManager>,
131 cache_reader: Arc<dyn ObjectCacheRead>,
132 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
133 committee: ConsensusCommittee,
134 metrics: Arc<AuthorityMetrics>,
135 ) -> Self {
136 let mut last_consensus_stats = epoch_store
138 .get_last_consensus_stats()
139 .expect("Should be able to read last consensus index");
140 if !last_consensus_stats.stats.is_initialized() {
142 last_consensus_stats.stats = ConsensusStats::new(committee.size());
143 }
144 let transaction_scheduler =
145 AsyncTransactionScheduler::start(transaction_manager, epoch_store.clone());
146 Self {
147 epoch_store,
148 last_consensus_stats,
149 checkpoint_service,
150 cache_reader,
151 low_scoring_authorities,
152 committee,
153 metrics,
154 processed_cache: LruCache::new(NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap()),
155 transaction_scheduler,
156 }
157 }
158
159 pub fn last_processed_subdag_index(&self) -> u64 {
161 self.last_consensus_stats.index.sub_dag_index
162 }
163}
164
impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
    /// Handles one consensus output (a committed sub-DAG).
    ///
    /// Steps, in order:
    /// 1. Skip the output if its leader round equals the last committed round.
    /// 2. Advance `last_consensus_stats.index` to the new commit.
    /// 3. Optionally prepend an authenticator state update system transaction
    ///    when new JWKs are available.
    /// 4. Refresh low-scoring authorities and per-authority metrics/stats.
    /// 5. Assign each transaction an execution index and drop duplicates.
    /// 6. Let the epoch store process the batch and commit boundary, then
    ///    schedule the resulting executable transactions.
    ///
    /// # Panics
    ///
    /// Panics if the leader round is lower than the last committed round, if
    /// the new execution index is not strictly greater than the stored one,
    /// if an originating authority index is not in the epoch committee, or if
    /// the epoch store reports an error.
    #[instrument(level = "debug", skip_all)]
    async fn handle_consensus_output(&mut self, consensus_output: impl ConsensusOutputAPI) {
        let _scope = monitored_scope("HandleConsensusOutput");

        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        let round = consensus_output.leader_round();

        // Rounds must never go backwards; an equal round means this output
        // was already handled and is ignored without touching any state.
        assert!(round >= last_committed_round);
        if last_committed_round == round {
            warn!(
                "Ignoring consensus output for round {} as it is already committed. NOTE: This is only expected if consensus is running.",
                round
            );
            return;
        }

        // (transaction, originating authority index) pairs, in processing order.
        let mut transactions = vec![];
        let leader_author = consensus_output.leader_author_index();
        let commit_sub_dag_index = consensus_output.commit_sub_dag_index();

        debug!(
            %consensus_output,
            epoch = ?self.epoch_store.epoch(),
            "Received consensus output"
        );

        // Index marking the start of this commit (transaction_index 0).
        let execution_index = ExecutionIndices {
            last_committed_round: round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };
        assert!(self.last_consensus_stats.index < execution_index);

        self.last_consensus_stats.index = execution_index;

        // NOTE: JWKs are looked up with the *previously* committed round, not
        // the round of this output.
        // NOTE(review): presumably a deliberate one-round delay — confirm
        // against `get_new_jwks`.
        let new_jwks = self
            .epoch_store
            .get_new_jwks(last_committed_round)
            .expect("Unrecoverable error in consensus handler");

        if !new_jwks.is_empty() {
            let authenticator_state_update_transaction =
                self.authenticator_state_update_transaction(round, new_jwks);
            debug!(
                "adding AuthenticatorStateUpdateV1({:?}) tx: {:?}",
                authenticator_state_update_transaction.digest(),
                authenticator_state_update_transaction,
            );

            // The system transaction goes first and is attributed to the
            // leader of this commit.
            transactions.push((
                SequencedConsensusTransactionKind::System(authenticator_state_update_transaction),
                leader_author,
            ));
        }

        update_low_scoring_authorities(
            self.low_scoring_authorities.clone(),
            self.epoch_store.committee(),
            &self.committee,
            consensus_output.reputation_score_sorted_desc(),
            &self.metrics,
            self.epoch_store
                .protocol_config()
                .consensus_bad_nodes_stake_threshold(),
        );

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        {
            let span = trace_span!("process_consensus_certs");
            let _guard = span.enter();
            for (authority_index, authority_transactions) in consensus_output.transactions() {
                // One message is counted per entry yielded by `transactions()`.
                self.last_consensus_stats
                    .stats
                    .inc_num_messages(authority_index as usize);
                for (transaction, serialized_len) in authority_transactions {
                    let kind = classify(&transaction);
                    self.metrics
                        .consensus_handler_processed
                        .with_label_values(&[kind])
                        .inc();
                    self.metrics
                        .consensus_handler_transaction_sizes
                        .with_label_values(&[kind])
                        .observe(serialized_len as f64);
                    // Only certified transactions count as user transactions.
                    if matches!(
                        &transaction.kind,
                        ConsensusTransactionKind::CertifiedTransaction(_)
                    ) {
                        self.last_consensus_stats
                            .stats
                            .inc_num_user_transactions(authority_index as usize);
                    }
                    let transaction = SequencedConsensusTransactionKind::External(transaction);
                    transactions.push((transaction, authority_index));
                }
            }
        }

        // Publish the accumulated per-authority counters, labelled by hostname.
        for (i, authority) in self.committee.authorities() {
            let hostname = &authority.hostname;
            self.metrics
                .consensus_committed_messages
                .with_label_values(&[hostname])
                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
            self.metrics
                .consensus_committed_user_transactions
                .with_label_values(&[hostname])
                .set(
                    self.last_consensus_stats
                        .stats
                        .get_num_user_transactions(i.value()) as i64,
                );
        }

        let mut all_transactions = Vec::new();
        {
            // Keys seen within this commit; `processed_cache` covers earlier
            // commits as well.
            let mut processed_set = HashSet::new();

            for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
                // Transaction indices start at 1 within a commit.
                // NOTE(review): presumably index 0 is reserved for a
                // transaction added later by the epoch store — confirm.
                let current_tx_index = ExecutionIndices {
                    last_committed_round: round,
                    sub_dag_index: commit_sub_dag_index,
                    transaction_index: (seq + 1) as u64,
                };

                // The index advances even for transactions skipped below.
                self.last_consensus_stats.index = current_tx_index;

                let certificate_author = *self
                    .epoch_store
                    .committee()
                    .authority_by_index(cert_origin)
                    .unwrap();

                let sequenced_transaction = SequencedConsensusTransaction {
                    certificate_author_index: cert_origin,
                    certificate_author,
                    consensus_index: current_tx_index,
                    transaction,
                };

                // Both lookups always run, so the key is recorded in the set
                // and in the LRU cache even when it turns out to be a duplicate.
                let key = sequenced_transaction.key();
                let in_set = !processed_set.insert(key);
                let in_cache = self
                    .processed_cache
                    .put(sequenced_transaction.key(), ())
                    .is_some();

                if in_set || in_cache {
                    self.metrics.skipped_consensus_txns_cache_hit.inc();
                    continue;
                }

                all_transactions.push(sequenced_transaction);
            }
        }

        let transactions_to_schedule = self
            .epoch_store
            .process_consensus_transactions_and_commit_boundary(
                all_transactions,
                &self.last_consensus_stats,
                &self.checkpoint_service,
                self.cache_reader.as_ref(),
                &ConsensusCommitInfo::new(&consensus_output),
                &self.metrics,
            )
            .await
            .expect("Unrecoverable error in consensus handler");

        // Fail point: may kill the current node with probability 0.01,
        // derived deterministically from the sub-DAG index and the epoch.
        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, self.epoch_store.epoch()];
            if iota_simulator::random::deterministic_probability(key, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        // Fail point named "crash", placed between processing and scheduling.
        fail_point_async!("crash");
        self.transaction_scheduler
            .schedule(transactions_to_schedule)
            .await;
    }
}
385
/// Hands batches of executable transactions to a background task over a
/// bounded channel.
struct AsyncTransactionScheduler {
    /// Sending half of the channel read by the background task.
    sender: tokio::sync::mpsc::Sender<Vec<VerifiedExecutableTransaction>>,
}
389
390impl AsyncTransactionScheduler {
391 pub fn start(
392 transaction_manager: Arc<TransactionManager>,
393 epoch_store: Arc<AuthorityPerEpochStore>,
394 ) -> Self {
395 let (sender, recv) = tokio::sync::mpsc::channel(16);
396 spawn_monitored_task!(Self::run(recv, transaction_manager, epoch_store));
397 Self { sender }
398 }
399
400 pub async fn schedule(&self, transactions: Vec<VerifiedExecutableTransaction>) {
401 self.sender.send(transactions).await.ok();
402 }
403
404 pub async fn run(
405 mut recv: tokio::sync::mpsc::Receiver<Vec<VerifiedExecutableTransaction>>,
406 transaction_manager: Arc<TransactionManager>,
407 epoch_store: Arc<AuthorityPerEpochStore>,
408 ) {
409 while let Some(transactions) = recv.recv().await {
410 let _guard = monitored_scope("ConsensusHandler::enqueue");
411 transaction_manager.enqueue(transactions, &epoch_store);
412 }
413 }
414}
415
/// Owns the task that feeds committed sub-DAGs into a [`ConsensusHandler`].
pub struct MysticetiConsensusHandler {
    /// Handle of the spawned task; `None` once it has been taken by `abort`
    /// or `drop`.
    handle: Option<tokio::task::JoinHandle<()>>,
}
423
424impl MysticetiConsensusHandler {
425 pub fn new(
426 mut consensus_handler: ConsensusHandler<CheckpointService>,
427 mut receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
428 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
429 ) -> Self {
430 let handle = spawn_monitored_task!(async move {
431 while let Some(consensus_output) = receiver.recv().await {
434 let commit_index = consensus_output.commit_ref.index;
435 consensus_handler
436 .handle_consensus_output(consensus_output)
437 .await;
438 commit_consumer_monitor.set_highest_handled_commit(commit_index);
439 }
440 });
441 Self {
442 handle: Some(handle),
443 }
444 }
445
446 pub async fn abort(&mut self) {
447 if let Some(handle) = self.handle.take() {
448 handle.abort();
449 let _ = handle.await;
450 }
451 }
452}
453
454impl Drop for MysticetiConsensusHandler {
455 fn drop(&mut self) {
456 if let Some(handle) = self.handle.take() {
457 handle.abort();
458 }
459 }
460}
461
462impl<C> ConsensusHandler<C> {
463 fn authenticator_state_update_transaction(
464 &self,
465 round: u64,
466 mut new_active_jwks: Vec<ActiveJwk>,
467 ) -> VerifiedExecutableTransaction {
468 new_active_jwks.sort();
469
470 info!("creating authenticator state update transaction");
471 assert!(self.epoch_store.authenticator_state_enabled());
472 let transaction = VerifiedTransaction::new_authenticator_state_update(
473 self.epoch(),
474 round,
475 new_active_jwks,
476 self.epoch_store
477 .epoch_start_config()
478 .authenticator_obj_initial_shared_version()
479 .expect("authenticator state obj must exist"),
480 );
481 VerifiedExecutableTransaction::new_system(transaction, self.epoch())
482 }
483
484 fn epoch(&self) -> EpochId {
485 self.epoch_store.epoch()
486 }
487}
488
489pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
490 match &transaction.kind {
491 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
492 if certificate.contains_shared_object() {
493 "shared_certificate"
494 } else {
495 "owned_certificate"
496 }
497 }
498 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
499 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
500 ConsensusTransactionKind::CapabilityNotificationV1(_) => "capability_notification_v1",
501 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
502 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
503 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
504 }
505}
506
/// A transaction together with its position in the consensus output and the
/// authority it is attributed to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Committee index of the authority the transaction is attributed to.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the authority at `certificate_author_index`.
    pub certificate_author: AuthorityName,
    /// Execution indices (round, sub-DAG index, transaction index) assigned
    /// to this transaction.
    pub consensus_index: ExecutionIndices,
    /// The transaction payload itself.
    pub transaction: SequencedConsensusTransactionKind,
}
514
/// Payload of a [`SequencedConsensusTransaction`].
#[derive(Debug, Clone)]
pub enum SequencedConsensusTransactionKind {
    /// A transaction that arrived through consensus.
    External(ConsensusTransaction),
    /// A system transaction created locally by the consensus handler.
    System(VerifiedExecutableTransaction),
}
520
521impl Serialize for SequencedConsensusTransactionKind {
522 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
523 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
524 serializable.serialize(serializer)
525 }
526}
527
528impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
529 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
530 let serializable =
531 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
532 Ok(serializable.into())
533 }
534}
535
/// Serde-facing mirror of [`SequencedConsensusTransactionKind`]; the `System`
/// variant stores a `TrustedExecutableTransaction` instead of a
/// `VerifiedExecutableTransaction`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum SerializableSequencedConsensusTransactionKind {
    /// Mirrors `SequencedConsensusTransactionKind::External`.
    External(ConsensusTransaction),
    /// Mirrors `SequencedConsensusTransactionKind::System`.
    System(TrustedExecutableTransaction),
}
544
545impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
546 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
547 match kind {
548 SequencedConsensusTransactionKind::External(ext) => {
549 SerializableSequencedConsensusTransactionKind::External(ext.clone())
550 }
551 SequencedConsensusTransactionKind::System(txn) => {
552 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
553 }
554 }
555 }
556}
557
558impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
559 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
560 match kind {
561 SerializableSequencedConsensusTransactionKind::External(ext) => {
562 SequencedConsensusTransactionKind::External(ext)
563 }
564 SerializableSequencedConsensusTransactionKind::System(txn) => {
565 SequencedConsensusTransactionKind::System(txn.into())
566 }
567 }
568 }
569}
570
/// Identity of a sequenced transaction, used to detect duplicates.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    /// Key of an external consensus transaction.
    External(ConsensusTransactionKey),
    /// Digest of a system transaction.
    System(TransactionDigest),
}
576
577impl SequencedConsensusTransactionKind {
578 pub fn key(&self) -> SequencedConsensusTransactionKey {
579 match self {
580 SequencedConsensusTransactionKind::External(ext) => {
581 SequencedConsensusTransactionKey::External(ext.key())
582 }
583 SequencedConsensusTransactionKind::System(txn) => {
584 SequencedConsensusTransactionKey::System(*txn.digest())
585 }
586 }
587 }
588
589 pub fn get_tracking_id(&self) -> u64 {
590 match self {
591 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
592 SequencedConsensusTransactionKind::System(_txn) => 0,
593 }
594 }
595
596 pub fn is_executable_transaction(&self) -> bool {
597 match self {
598 SequencedConsensusTransactionKind::External(ext) => ext.is_user_certificate(),
599 SequencedConsensusTransactionKind::System(_) => true,
600 }
601 }
602
603 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
604 match self {
605 SequencedConsensusTransactionKind::External(ext) => {
606 if let ConsensusTransactionKind::CertifiedTransaction(txn) = &ext.kind {
607 Some(*txn.digest())
608 } else {
609 None
610 }
611 }
612 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
613 }
614 }
615
616 pub fn is_end_of_publish(&self) -> bool {
617 match self {
618 SequencedConsensusTransactionKind::External(ext) => {
619 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
620 }
621 SequencedConsensusTransactionKind::System(_) => false,
622 }
623 }
624}
625
626impl SequencedConsensusTransaction {
627 pub fn sender_authority(&self) -> AuthorityName {
628 self.certificate_author
629 }
630
631 pub fn key(&self) -> SequencedConsensusTransactionKey {
632 self.transaction.key()
633 }
634
635 pub fn is_end_of_publish(&self) -> bool {
636 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
637 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
638 } else {
639 false
640 }
641 }
642
643 pub fn is_system(&self) -> bool {
644 matches!(
645 self.transaction,
646 SequencedConsensusTransactionKind::System(_)
647 )
648 }
649
650 pub fn is_user_tx_with_randomness(&self) -> bool {
651 let SequencedConsensusTransactionKind::External(ConsensusTransaction {
652 kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
653 ..
654 }) = &self.transaction
655 else {
656 return false;
657 };
658 certificate.transaction_data().uses_randomness()
659 }
660
661 pub fn as_shared_object_txn(&self) -> Option<&SenderSignedData> {
662 match &self.transaction {
663 SequencedConsensusTransactionKind::External(ConsensusTransaction {
664 kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
665 ..
666 }) if certificate.contains_shared_object() => Some(certificate.data()),
667 SequencedConsensusTransactionKind::System(txn) if txn.contains_shared_object() => {
668 Some(txn.data())
669 }
670 _ => None,
671 }
672 }
673}
674
/// Newtype wrapper around a [`SequencedConsensusTransaction`].
// NOTE(review): the name suggests the inner transaction has passed
// verification; where that happens is not visible here — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
677
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test helper: wraps `transaction` using
    /// `SequencedConsensusTransaction::new_test`.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        let sequenced = SequencedConsensusTransaction::new_test(transaction);
        Self(sequenced)
    }
}
684
685impl SequencedConsensusTransaction {
686 pub fn new_test(transaction: ConsensusTransaction) -> Self {
687 Self {
688 certificate_author_index: 0,
689 certificate_author: AuthorityName::ZERO,
690 consensus_index: Default::default(),
691 transaction: SequencedConsensusTransactionKind::External(transaction),
692 }
693 }
694}
695
/// Per-commit data used to build the consensus commit prologue transaction.
pub struct ConsensusCommitInfo {
    /// Leader round of the commit.
    pub round: u64,
    /// Commit timestamp, taken from `commit_timestamp_ms()` of the output.
    pub timestamp: u64,
    /// Digest of the consensus commit.
    pub consensus_commit_digest: ConsensusCommitDigest,

    /// Test-only flag, exposed through
    /// `skip_consensus_commit_prologue_in_test()`.
    #[cfg(any(test, feature = "test-utils"))]
    skip_consensus_commit_prologue_in_test: bool,
}
705
706impl ConsensusCommitInfo {
707 fn new(consensus_output: &impl ConsensusOutputAPI) -> Self {
708 Self {
709 round: consensus_output.leader_round(),
710 timestamp: consensus_output.commit_timestamp_ms(),
711 consensus_commit_digest: consensus_output.consensus_digest(),
712
713 #[cfg(any(test, feature = "test-utils"))]
714 skip_consensus_commit_prologue_in_test: false,
715 }
716 }
717
718 #[cfg(any(test, feature = "test-utils"))]
719 pub fn new_for_test(
720 commit_round: u64,
721 commit_timestamp: u64,
722 skip_consensus_commit_prologue_in_test: bool,
723 ) -> Self {
724 Self {
725 round: commit_round,
726 timestamp: commit_timestamp,
727 consensus_commit_digest: ConsensusCommitDigest::default(),
728 skip_consensus_commit_prologue_in_test,
729 }
730 }
731
732 #[cfg(any(test, feature = "test-utils"))]
733 pub fn skip_consensus_commit_prologue_in_test(&self) -> bool {
734 self.skip_consensus_commit_prologue_in_test
735 }
736
737 fn consensus_commit_prologue_v1_transaction(
738 &self,
739 epoch: u64,
740 cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
741 ) -> VerifiedExecutableTransaction {
742 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v1(
743 epoch,
744 self.round,
745 self.timestamp,
746 self.consensus_commit_digest,
747 cancelled_txn_version_assignment,
748 );
749 VerifiedExecutableTransaction::new_system(transaction, epoch)
750 }
751
752 pub fn create_consensus_commit_prologue_transaction(
753 &self,
754 epoch: u64,
755 cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
756 ) -> VerifiedExecutableTransaction {
757 self.consensus_commit_prologue_v1_transaction(epoch, cancelled_txn_version_assignment)
758 }
759}
760
/// Tests for the consensus handler and for post-consensus transaction
/// reordering.
#[cfg(test)]
mod tests {
    use consensus_core::{
        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
    };
    use iota_protocol_config::{Chain, ConsensusTransactionOrdering};
    use iota_types::{
        base_types::{AuthorityName, IotaAddress, random_object_ref},
        committee::Committee,
        messages_consensus::{
            AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        },
        object::Object,
        supported_protocol_versions::{
            SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
        },
        transaction::{
            CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
        },
    };
    use prometheus::Registry;

    use super::*;
    use crate::{
        authority::{
            authority_per_epoch_store::ConsensusStatsAPI,
            test_authority_builder::TestAuthorityBuilder,
        },
        checkpoints::CheckpointServiceNoop,
        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
        post_consensus_tx_reorder::PostConsensusTxReorder,
    };

    /// Feeds one committed sub-DAG to the handler, checks the resulting
    /// consensus stats, and verifies that replaying the same sub-DAG leaves
    /// the stats unchanged.
    #[tokio::test]
    pub async fn test_consensus_handler() {
        // GIVEN: gas objects plus one shared object.
        let mut objects = test_gas_objects();
        let shared_object = Object::shared_for_testing();
        objects.push(shared_object.clone());

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            state.transaction_manager().clone(),
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
        );

        // AND: one block per test certificate, at rounds 100, 101, ...
        let transactions = test_certificates(&state, shared_object).await;
        let mut blocks = Vec::new();

        for (i, transaction) in transactions.iter().enumerate() {
            let transaction_bytes: Vec<u8> = bcs::to_bytes(
                &ConsensusTransaction::new_certificate_message(&state.name, transaction.clone()),
            )
            .unwrap();

            let block = VerifiedBlock::new_for_test(
                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
                    .set_transactions(vec![Transaction::new(transaction_bytes)])
                    .build(),
            );

            blocks.push(block);
        }

        // AND: a committed sub-DAG led by the first block, with commit index 10.
        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            vec![],
        );

        // WHEN: the sub-DAG is handled.
        consensus_handler
            .handle_consensus_output(committed_sub_dag.clone())
            .await;

        // THEN: the indices point at the last transaction of commit 10 / round 100.
        let num_blocks = blocks.len();
        let num_transactions = transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(last_consensus_stats_1.hash, 0);
        // NOTE(review): these expect every block to be attributed to
        // authority 0 — presumably the default test committee has a single
        // member; confirm.
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // WHEN: the same sub-DAG is replayed, THEN: the stats do not change.
        for _ in 0..2 {
            consensus_handler
                .handle_consensus_output(committed_sub_dag.clone())
                .await;
            let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone();
            assert_eq!(last_consensus_stats_1, last_consensus_stats_2);
        }
    }

    /// Checks `PostConsensusTxReorder::reorder` with `ByGasPrice` ordering.
    #[test]
    fn test_order_by_gas_price() {
        let chain = Chain::Unknown;
        // Capability notifications stay in front in their original order;
        // user transactions follow, sorted by descending gas price.
        let mut v = vec![
            cap_txn(10, chain),
            user_txn(42),
            user_txn(100),
            cap_txn(1, chain),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            vec![
                "cap(10)".to_string(),
                "cap(1)".to_string(),
                "user(100)".to_string(),
                "user(42)".to_string(),
            ]
        );

        // Same expectation with more user transactions, including two with
        // an equal gas price.
        let mut v = vec![
            user_txn(1200),
            cap_txn(10, chain),
            user_txn(12),
            user_txn(1000),
            user_txn(42),
            user_txn(100),
            cap_txn(1, chain),
            user_txn(1000),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            vec![
                "cap(10)".to_string(),
                "cap(1)".to_string(),
                "user(1200)".to_string(),
                "user(1000)".to_string(),
                "user(1000)".to_string(),
                "user(100)".to_string(),
                "user(42)".to_string(),
                "user(12)".to_string(),
            ]
        );

        // Without any user transactions the original order is kept.
        let mut v = vec![
            cap_txn(10, chain),
            eop_txn(12),
            eop_txn(10),
            cap_txn(1, chain),
            eop_txn(11),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            vec![
                "cap(10)".to_string(),
                "eop(12)".to_string(),
                "eop(10)".to_string(),
                "cap(1)".to_string(),
                "eop(11)".to_string(),
            ]
        );
    }

    /// Renders each transaction as a short label for comparison.
    fn extract(v: Vec<VerifiedSequencedConsensusTransaction>) -> Vec<String> {
        v.into_iter().map(extract_one).collect()
    }

    /// Renders one transaction as `eop(<first authority byte>)`,
    /// `cap(<generation>)` or `user(<gas price>)`; other kinds are not
    /// expected in these tests.
    fn extract_one(t: VerifiedSequencedConsensusTransaction) -> String {
        match t.0.transaction {
            SequencedConsensusTransactionKind::External(ext) => match ext.kind {
                ConsensusTransactionKind::EndOfPublish(authority) => {
                    format!("eop({})", authority.0[0])
                }
                ConsensusTransactionKind::CapabilityNotificationV1(cap) => {
                    format!("cap({})", cap.generation)
                }
                ConsensusTransactionKind::CertifiedTransaction(txn) => {
                    format!("user({})", txn.transaction_data().gas_price())
                }
                _ => unreachable!(),
            },
            SequencedConsensusTransactionKind::System(_) => unreachable!(),
        }
    }

    /// Builds an `EndOfPublish` transaction whose authority name has `a` as
    /// its first byte.
    fn eop_txn(a: u8) -> VerifiedSequencedConsensusTransaction {
        let mut authority = AuthorityName::default();
        authority.0[0] = a;
        txn(ConsensusTransactionKind::EndOfPublish(authority))
    }

    /// Builds a `CapabilityNotificationV1` transaction with the given
    /// generation and otherwise default contents.
    fn cap_txn(generation: u64, chain: Chain) -> VerifiedSequencedConsensusTransaction {
        txn(ConsensusTransactionKind::CapabilityNotificationV1(
            AuthorityCapabilitiesV1 {
                authority: Default::default(),
                generation,
                supported_protocol_versions:
                    SupportedProtocolVersionsWithHashes::from_supported_versions(
                        SupportedProtocolVersions::SYSTEM_DEFAULT,
                        chain,
                    ),
                available_system_packages: vec![],
            },
        ))
    }

    /// Builds a certified transfer transaction with the given gas price; the
    /// fifth argument of `new_transfer` is set to `1000 * gas_price`.
    fn user_txn(gas_price: u64) -> VerifiedSequencedConsensusTransaction {
        let (committee, keypairs) = Committee::new_simple_test_committee();
        let data = SenderSignedData::new(
            TransactionData::new_transfer(
                IotaAddress::default(),
                random_object_ref(),
                IotaAddress::default(),
                random_object_ref(),
                1000 * gas_price,
                gas_price,
            ),
            vec![],
        );
        txn(ConsensusTransactionKind::CertifiedTransaction(Box::new(
            CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
        )))
    }

    /// Wraps `kind` in a test transaction with a default tracking id.
    fn txn(kind: ConsensusTransactionKind) -> VerifiedSequencedConsensusTransaction {
        VerifiedSequencedConsensusTransaction::new_test(ConsensusTransaction {
            kind,
            tracking_id: Default::default(),
        })
    }
}