iota_core/
mock_consensus.rs1use std::sync::{Arc, Weak};
6
7use consensus_core::BlockRef;
8use iota_types::{
9 error::{IotaError, IotaResult},
10 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
11 transaction::VerifiedCertificate,
12};
13use prometheus::Registry;
14use tokio::{
15 sync::{mpsc, oneshot},
16 task::JoinHandle,
17};
18use tracing::debug;
19
20use crate::{
21 authority::{
22 AuthorityMetrics, AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
23 },
24 checkpoints::CheckpointServiceNoop,
25 consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus},
26 consensus_handler::SequencedConsensusTransaction,
27};
28pub struct MockConsensusClient {
29 tx_sender: mpsc::Sender<ConsensusTransaction>,
30 _consensus_handle: JoinHandle<()>,
31}
32
/// Selects what the mock consensus task does with each transaction it receives.
///
/// Independently of the mode, user transactions that contain a shared object
/// are enqueued for execution by the background task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsensusMode {
    /// The received transaction is not sequenced into the epoch store.
    Noop,
    /// The received transaction is passed straight to the epoch store via
    /// `process_consensus_transactions_for_tests`.
    DirectSequencing,
}
39
40impl MockConsensusClient {
41 pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
42 let (tx_sender, tx_receiver) = mpsc::channel(1000000);
43 let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
44 Self {
45 tx_sender,
46 _consensus_handle,
47 }
48 }
49
50 pub fn run(
51 validator: Weak<AuthorityState>,
52 tx_receiver: mpsc::Receiver<ConsensusTransaction>,
53 consensus_mode: ConsensusMode,
54 ) -> JoinHandle<()> {
55 tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
56 }
57
58 async fn run_impl(
59 validator: Weak<AuthorityState>,
60 mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
61 consensus_mode: ConsensusMode,
62 ) {
63 let checkpoint_service = Arc::new(CheckpointServiceNoop {});
64 let authority_metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
65 while let Some(tx) = tx_receiver.recv().await {
66 let Some(validator) = validator.upgrade() else {
67 debug!("validator shut down; exiting MockConsensusClient");
68 return;
69 };
70 let epoch_store = validator.epoch_store_for_testing();
71 match consensus_mode {
72 ConsensusMode::Noop => {}
73 ConsensusMode::DirectSequencing => {
74 epoch_store
75 .process_consensus_transactions_for_tests(
76 vec![SequencedConsensusTransaction::new_test(tx.clone())],
77 &checkpoint_service,
78 validator.get_object_cache_reader().as_ref(),
79 &authority_metrics,
80 true,
81 )
82 .await
83 .unwrap();
84 }
85 }
86 if let ConsensusTransactionKind::UserTransaction(tx) = tx.kind {
87 if tx.contains_shared_object() {
88 validator.enqueue_certificates_for_execution(
89 vec![VerifiedCertificate::new_unchecked(*tx)],
90 &epoch_store,
91 );
92 }
93 }
94 }
95 }
96}
97
98#[async_trait::async_trait]
99impl SubmitToConsensus for MockConsensusClient {
100 async fn submit_to_consensus(
101 &self,
102 transactions: &[ConsensusTransaction],
103 epoch_store: &Arc<AuthorityPerEpochStore>,
104 ) -> IotaResult {
105 self.submit(transactions, epoch_store)
106 .await
107 .map(|_response| ())
108 }
109}
110
111#[async_trait::async_trait]
112impl ConsensusClient for MockConsensusClient {
113 async fn submit(
114 &self,
115 transactions: &[ConsensusTransaction],
116 _epoch_store: &Arc<AuthorityPerEpochStore>,
117 ) -> IotaResult<BlockStatusReceiver> {
118 assert!(transactions.len() == 1);
120 let transaction = &transactions[0];
121 self.tx_sender
122 .send(transaction.clone())
123 .await
124 .map_err(|e| IotaError::Unknown(e.to_string()))?;
125 Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
126 BlockRef::MIN,
127 )))
128 }
129}
130
131pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
132 let (tx, rx) = oneshot::channel();
133 tx.send(status).ok();
134 rx
135}