iota_core/
mock_consensus.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::{Arc, Weak};
6
7use consensus_core::BlockRef;
8use iota_types::{
9    error::{IotaError, IotaResult},
10    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
11    transaction::VerifiedCertificate,
12};
13use prometheus::Registry;
14use tokio::{
15    sync::{mpsc, oneshot},
16    task::JoinHandle,
17};
18use tracing::debug;
19
20use crate::{
21    authority::{
22        AuthorityMetrics, AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
23    },
24    checkpoints::CheckpointServiceNoop,
25    consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus},
26    consensus_handler::SequencedConsensusTransaction,
27};
28pub struct MockConsensusClient {
29    tx_sender: mpsc::Sender<ConsensusTransaction>,
30    _consensus_handle: JoinHandle<()>,
31}
32
33pub enum ConsensusMode {
34    // ConsensusClient does absolutely nothing when receiving a transaction
35    Noop,
36    // ConsensusClient directly sequences the transaction into the store.
37    DirectSequencing,
38}
39
40impl MockConsensusClient {
41    pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
42        let (tx_sender, tx_receiver) = mpsc::channel(1000000);
43        let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
44        Self {
45            tx_sender,
46            _consensus_handle,
47        }
48    }
49
50    pub fn run(
51        validator: Weak<AuthorityState>,
52        tx_receiver: mpsc::Receiver<ConsensusTransaction>,
53        consensus_mode: ConsensusMode,
54    ) -> JoinHandle<()> {
55        tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
56    }
57
58    async fn run_impl(
59        validator: Weak<AuthorityState>,
60        mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
61        consensus_mode: ConsensusMode,
62    ) {
63        let checkpoint_service = Arc::new(CheckpointServiceNoop {});
64        let authority_metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
65        while let Some(tx) = tx_receiver.recv().await {
66            let Some(validator) = validator.upgrade() else {
67                debug!("validator shut down; exiting MockConsensusClient");
68                return;
69            };
70            let epoch_store = validator.epoch_store_for_testing();
71            match consensus_mode {
72                ConsensusMode::Noop => {}
73                ConsensusMode::DirectSequencing => {
74                    epoch_store
75                        .process_consensus_transactions_for_tests(
76                            vec![SequencedConsensusTransaction::new_test(tx.clone())],
77                            &checkpoint_service,
78                            validator.get_object_cache_reader().as_ref(),
79                            &authority_metrics,
80                            true,
81                        )
82                        .await
83                        .unwrap();
84                }
85            }
86            if let ConsensusTransactionKind::CertifiedTransaction(tx) = tx.kind {
87                if tx.contains_shared_object() {
88                    validator.enqueue_certificates_for_execution(
89                        vec![VerifiedCertificate::new_unchecked(*tx)],
90                        &epoch_store,
91                    );
92                }
93            }
94        }
95    }
96
97    fn submit_impl(
98        &self,
99        transactions: &[ConsensusTransaction],
100    ) -> IotaResult<BlockStatusReceiver> {
101        // TODO: maybe support multi-transactions and remove this check
102        assert!(transactions.len() == 1);
103        let transaction = &transactions[0];
104        self.tx_sender
105            .try_send(transaction.clone())
106            .map_err(|_| IotaError::from("MockConsensusClient channel overflowed"))?;
107        Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
108            BlockRef::MIN,
109        )))
110    }
111}
112
113impl SubmitToConsensus for MockConsensusClient {
114    fn submit_to_consensus(
115        &self,
116        transactions: &[ConsensusTransaction],
117        _epoch_store: &Arc<AuthorityPerEpochStore>,
118    ) -> IotaResult {
119        self.submit_impl(transactions).map(|_response| ())
120    }
121}
122
123#[async_trait::async_trait]
124impl ConsensusClient for MockConsensusClient {
125    async fn submit(
126        &self,
127        transactions: &[ConsensusTransaction],
128        _epoch_store: &Arc<AuthorityPerEpochStore>,
129    ) -> IotaResult<BlockStatusReceiver> {
130        self.submit_impl(transactions)
131    }
132}
133
134pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
135    let (tx, rx) = oneshot::channel();
136    tx.send(status).ok();
137    rx
138}