iota_core/
mock_consensus.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::{Arc, Weak};
6
7use consensus_core::BlockRef;
8use iota_types::{
9    error::{IotaError, IotaResult},
10    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
11    transaction::VerifiedCertificate,
12};
13use prometheus::Registry;
14use tokio::{
15    sync::{mpsc, oneshot},
16    task::JoinHandle,
17};
18use tracing::debug;
19
20use crate::{
21    authority::{
22        AuthorityMetrics, AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
23    },
24    checkpoints::CheckpointServiceNoop,
25    consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus},
26    consensus_handler::SequencedConsensusTransaction,
27};
28pub struct MockConsensusClient {
29    tx_sender: mpsc::Sender<ConsensusTransaction>,
30    _consensus_handle: JoinHandle<()>,
31}
32
/// Selects how the mock consensus background task treats each transaction it
/// receives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsensusMode {
    /// The received transaction is not sequenced into the store.
    Noop,
    /// The received transaction is directly sequenced into the store.
    DirectSequencing,
}
39
40impl MockConsensusClient {
41    pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
42        let (tx_sender, tx_receiver) = mpsc::channel(1000000);
43        let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
44        Self {
45            tx_sender,
46            _consensus_handle,
47        }
48    }
49
50    pub fn run(
51        validator: Weak<AuthorityState>,
52        tx_receiver: mpsc::Receiver<ConsensusTransaction>,
53        consensus_mode: ConsensusMode,
54    ) -> JoinHandle<()> {
55        tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
56    }
57
58    async fn run_impl(
59        validator: Weak<AuthorityState>,
60        mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
61        consensus_mode: ConsensusMode,
62    ) {
63        let checkpoint_service = Arc::new(CheckpointServiceNoop {});
64        let authority_metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
65        while let Some(tx) = tx_receiver.recv().await {
66            let Some(validator) = validator.upgrade() else {
67                debug!("validator shut down; exiting MockConsensusClient");
68                return;
69            };
70            let epoch_store = validator.epoch_store_for_testing();
71            match consensus_mode {
72                ConsensusMode::Noop => {}
73                ConsensusMode::DirectSequencing => {
74                    epoch_store
75                        .process_consensus_transactions_for_tests(
76                            vec![SequencedConsensusTransaction::new_test(tx.clone())],
77                            &checkpoint_service,
78                            validator.get_object_cache_reader().as_ref(),
79                            &authority_metrics,
80                            true,
81                        )
82                        .await
83                        .unwrap();
84                }
85            }
86            if let ConsensusTransactionKind::UserTransaction(tx) = tx.kind {
87                if tx.contains_shared_object() {
88                    validator.enqueue_certificates_for_execution(
89                        vec![VerifiedCertificate::new_unchecked(*tx)],
90                        &epoch_store,
91                    );
92                }
93            }
94        }
95    }
96}
97
98#[async_trait::async_trait]
99impl SubmitToConsensus for MockConsensusClient {
100    async fn submit_to_consensus(
101        &self,
102        transactions: &[ConsensusTransaction],
103        epoch_store: &Arc<AuthorityPerEpochStore>,
104    ) -> IotaResult {
105        self.submit(transactions, epoch_store)
106            .await
107            .map(|_response| ())
108    }
109}
110
111#[async_trait::async_trait]
112impl ConsensusClient for MockConsensusClient {
113    async fn submit(
114        &self,
115        transactions: &[ConsensusTransaction],
116        _epoch_store: &Arc<AuthorityPerEpochStore>,
117    ) -> IotaResult<BlockStatusReceiver> {
118        // TODO: maybe support multi-transactions and remove this check
119        assert!(transactions.len() == 1);
120        let transaction = &transactions[0];
121        self.tx_sender
122            .send(transaction.clone())
123            .await
124            .map_err(|e| IotaError::Unknown(e.to_string()))?;
125        Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
126            BlockRef::MIN,
127        )))
128    }
129}
130
131pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
132    let (tx, rx) = oneshot::channel();
133    tx.send(status).ok();
134    rx
135}