iota_core/
mock_consensus.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::{Arc, Weak};
6
7use consensus_core::BlockRef;
8use iota_types::{
9    error::{IotaError, IotaResult},
10    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
11    transaction::VerifiedCertificate,
12};
13use prometheus::Registry;
14use tokio::{
15    sync::{mpsc, oneshot},
16    task::JoinHandle,
17};
18use tracing::debug;
19
20use crate::{
21    authority::{
22        AuthorityMetrics, AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
23    },
24    checkpoints::CheckpointServiceNoop,
25    consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus},
26    consensus_handler::SequencedConsensusTransaction,
27};
28pub struct MockConsensusClient {
29    tx_sender: mpsc::Sender<ConsensusTransaction>,
30    _consensus_handle: JoinHandle<()>,
31}
32
/// Selects what the mock consensus task does with each transaction it receives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsensusMode {
    /// ConsensusClient does absolutely nothing when receiving a transaction.
    Noop,
    /// ConsensusClient directly sequences the transaction into the store.
    DirectSequencing,
}
39
40impl MockConsensusClient {
41    pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
42        let (tx_sender, tx_receiver) = mpsc::channel(1000000);
43        let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
44        Self {
45            tx_sender,
46            _consensus_handle,
47        }
48    }
49
50    pub fn run(
51        validator: Weak<AuthorityState>,
52        tx_receiver: mpsc::Receiver<ConsensusTransaction>,
53        consensus_mode: ConsensusMode,
54    ) -> JoinHandle<()> {
55        tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
56    }
57
58    async fn run_impl(
59        validator: Weak<AuthorityState>,
60        mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
61        consensus_mode: ConsensusMode,
62    ) {
63        let checkpoint_service = Arc::new(CheckpointServiceNoop {});
64        let authority_metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
65        while let Some(tx) = tx_receiver.recv().await {
66            let Some(validator) = validator.upgrade() else {
67                debug!("validator shut down; exiting MockConsensusClient");
68                return;
69            };
70            let epoch_store = validator.epoch_store_for_testing();
71            match consensus_mode {
72                ConsensusMode::Noop => {}
73                ConsensusMode::DirectSequencing => {
74                    epoch_store
75                        .process_consensus_transactions_for_tests(
76                            vec![SequencedConsensusTransaction::new_test(tx.clone())],
77                            &checkpoint_service,
78                            validator.get_object_cache_reader().as_ref(),
79                            validator.get_transaction_cache_reader().as_ref(),
80                            &authority_metrics,
81                            true,
82                        )
83                        .await
84                        .unwrap();
85                }
86            }
87            if let ConsensusTransactionKind::CertifiedTransaction(tx) = tx.kind {
88                if tx.contains_shared_object() {
89                    validator.enqueue_certificates_for_execution(
90                        vec![VerifiedCertificate::new_unchecked(*tx)],
91                        &epoch_store,
92                    );
93                }
94            }
95        }
96    }
97
98    fn submit_impl(
99        &self,
100        transactions: &[ConsensusTransaction],
101    ) -> IotaResult<BlockStatusReceiver> {
102        // TODO: maybe support multi-transactions and remove this check
103        assert!(transactions.len() == 1);
104        let transaction = &transactions[0];
105        self.tx_sender
106            .try_send(transaction.clone())
107            .map_err(|_| IotaError::from("MockConsensusClient channel overflowed"))?;
108        Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
109            BlockRef::MIN,
110        )))
111    }
112}
113
114impl SubmitToConsensus for MockConsensusClient {
115    fn submit_to_consensus(
116        &self,
117        transactions: &[ConsensusTransaction],
118        _epoch_store: &Arc<AuthorityPerEpochStore>,
119    ) -> IotaResult {
120        self.submit_impl(transactions).map(|_response| ())
121    }
122}
123
124#[async_trait::async_trait]
125impl ConsensusClient for MockConsensusClient {
126    async fn submit(
127        &self,
128        transactions: &[ConsensusTransaction],
129        _epoch_store: &Arc<AuthorityPerEpochStore>,
130    ) -> IotaResult<BlockStatusReceiver> {
131        self.submit_impl(transactions)
132    }
133}
134
135pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
136    let (tx, rx) = oneshot::channel();
137    tx.send(status.into()).ok();
138    rx
139}