iota_core/
mock_consensus.rs1use std::sync::{Arc, Weak};
6
7use consensus_core::BlockRef;
8use iota_types::{
9 error::{IotaError, IotaResult},
10 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
11 transaction::VerifiedCertificate,
12};
13use prometheus::Registry;
14use tokio::{
15 sync::{mpsc, oneshot},
16 task::JoinHandle,
17};
18use tracing::debug;
19
20use crate::{
21 authority::{
22 AuthorityMetrics, AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
23 },
24 checkpoints::CheckpointServiceNoop,
25 consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus},
26 consensus_handler::SequencedConsensusTransaction,
27};
28pub struct MockConsensusClient {
29 tx_sender: mpsc::Sender<ConsensusTransaction>,
30 _consensus_handle: JoinHandle<()>,
31}
32
/// Selects what the mock consensus background task does with each transaction
/// it receives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsensusMode {
    /// No sequencing step is performed for received transactions.
    Noop,
    /// Each received transaction is handed straight to the validator's epoch
    /// store for processing, bypassing any real consensus.
    DirectSequencing,
}
39
40impl MockConsensusClient {
41 pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
42 let (tx_sender, tx_receiver) = mpsc::channel(1000000);
43 let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
44 Self {
45 tx_sender,
46 _consensus_handle,
47 }
48 }
49
50 pub fn run(
51 validator: Weak<AuthorityState>,
52 tx_receiver: mpsc::Receiver<ConsensusTransaction>,
53 consensus_mode: ConsensusMode,
54 ) -> JoinHandle<()> {
55 tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
56 }
57
58 async fn run_impl(
59 validator: Weak<AuthorityState>,
60 mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
61 consensus_mode: ConsensusMode,
62 ) {
63 let checkpoint_service = Arc::new(CheckpointServiceNoop {});
64 let authority_metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
65 while let Some(tx) = tx_receiver.recv().await {
66 let Some(validator) = validator.upgrade() else {
67 debug!("validator shut down; exiting MockConsensusClient");
68 return;
69 };
70 let epoch_store = validator.epoch_store_for_testing();
71 match consensus_mode {
72 ConsensusMode::Noop => {}
73 ConsensusMode::DirectSequencing => {
74 epoch_store
75 .process_consensus_transactions_for_tests(
76 vec![SequencedConsensusTransaction::new_test(tx.clone())],
77 &checkpoint_service,
78 validator.get_object_cache_reader().as_ref(),
79 &authority_metrics,
80 true,
81 )
82 .await
83 .unwrap();
84 }
85 }
86 if let ConsensusTransactionKind::CertifiedTransaction(tx) = tx.kind {
87 if tx.contains_shared_object() {
88 validator.enqueue_certificates_for_execution(
89 vec![VerifiedCertificate::new_unchecked(*tx)],
90 &epoch_store,
91 );
92 }
93 }
94 }
95 }
96
97 fn submit_impl(
98 &self,
99 transactions: &[ConsensusTransaction],
100 ) -> IotaResult<BlockStatusReceiver> {
101 assert!(transactions.len() == 1);
103 let transaction = &transactions[0];
104 self.tx_sender
105 .try_send(transaction.clone())
106 .map_err(|_| IotaError::from("MockConsensusClient channel overflowed"))?;
107 Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
108 BlockRef::MIN,
109 )))
110 }
111}
112
113impl SubmitToConsensus for MockConsensusClient {
114 fn submit_to_consensus(
115 &self,
116 transactions: &[ConsensusTransaction],
117 _epoch_store: &Arc<AuthorityPerEpochStore>,
118 ) -> IotaResult {
119 self.submit_impl(transactions).map(|_response| ())
120 }
121}
122
123#[async_trait::async_trait]
124impl ConsensusClient for MockConsensusClient {
125 async fn submit(
126 &self,
127 transactions: &[ConsensusTransaction],
128 _epoch_store: &Arc<AuthorityPerEpochStore>,
129 ) -> IotaResult<BlockStatusReceiver> {
130 self.submit_impl(transactions)
131 }
132}
133
134pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
135 let (tx, rx) = oneshot::channel();
136 tx.send(status).ok();
137 rx
138}