iota_core/
consensus_validator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use consensus_core::{TransactionVerifier, ValidationError};
8use eyre::WrapErr;
9use fastcrypto_tbls::dkg;
10use iota_metrics::monitored_scope;
11use iota_types::{
12    error::IotaError,
13    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
14};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use tap::TapFallible;
17use tracing::{info, warn};
18
19use crate::{
20    authority::authority_per_epoch_store::AuthorityPerEpochStore,
21    checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
22};
23
24/// Allows verifying the validity of transactions
25#[derive(Clone)]
26pub struct IotaTxValidator {
27    epoch_store: Arc<AuthorityPerEpochStore>,
28    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
29    _transaction_manager: Arc<TransactionManager>,
30    metrics: Arc<IotaTxValidatorMetrics>,
31}
32
33impl IotaTxValidator {
34    pub fn new(
35        epoch_store: Arc<AuthorityPerEpochStore>,
36        checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
37        transaction_manager: Arc<TransactionManager>,
38        metrics: Arc<IotaTxValidatorMetrics>,
39    ) -> Self {
40        info!(
41            "IotaTxValidator constructed for epoch {}",
42            epoch_store.epoch()
43        );
44        Self {
45            epoch_store,
46            checkpoint_service,
47            _transaction_manager: transaction_manager,
48            metrics,
49        }
50    }
51
52    fn validate_transactions(
53        &self,
54        txs: Vec<ConsensusTransactionKind>,
55    ) -> Result<(), eyre::Report> {
56        let mut cert_batch = Vec::new();
57        let mut ckpt_messages = Vec::new();
58        let mut ckpt_batch = Vec::new();
59        for tx in txs.into_iter() {
60            match tx {
61                ConsensusTransactionKind::UserTransaction(certificate) => {
62                    cert_batch.push(*certificate);
63
64                    // if !certificate.contains_shared_object() {
65                    //     // new_unchecked safety: we do not use the certs in
66                    // this list until all     // have had
67                    // their signatures verified.
68                    //     owned_tx_certs.
69                    // push(VerifiedCertificate::new_unchecked(*certificate));
70                    // }
71                }
72                ConsensusTransactionKind::CheckpointSignature(signature) => {
73                    ckpt_messages.push(signature.clone());
74                    ckpt_batch.push(signature.summary);
75                }
76                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
77                    if bytes.len() > dkg::DKG_MESSAGES_MAX_SIZE {
78                        warn!("batch verification error: DKG Message too large");
79                        return Err(IotaError::InvalidDkgMessageSize.into());
80                    }
81                }
82                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
83                    if bytes.len() > dkg::DKG_MESSAGES_MAX_SIZE {
84                        warn!("batch verification error: DKG Confirmation too large");
85                        return Err(IotaError::InvalidDkgMessageSize.into());
86                    }
87                }
88
89                ConsensusTransactionKind::EndOfPublish(_)
90                | ConsensusTransactionKind::NewJWKFetched(_, _, _)
91                | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
92            }
93        }
94
95        // verify the certificate signatures as a batch
96        let cert_count = cert_batch.len();
97        let ckpt_count = ckpt_batch.len();
98
99        self.epoch_store
100            .signature_verifier
101            .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
102            .tap_err(|e| warn!("batch verification error: {}", e))
103            .wrap_err("Malformed batch (failed to verify)")?;
104
105        // All checkpoint sigs have been verified, forward them to the checkpoint
106        // service
107        for ckpt in ckpt_messages {
108            self.checkpoint_service
109                .notify_checkpoint_signature(&self.epoch_store, &ckpt)?;
110        }
111
112        self.metrics
113            .certificate_signatures_verified
114            .inc_by(cert_count as u64);
115        self.metrics
116            .checkpoint_signatures_verified
117            .inc_by(ckpt_count as u64);
118        Ok(())
119
120        // todo - we should un-comment line below once we have a way to revert
121        // those transactions at the end of epoch all certificates had
122        // valid signatures, schedule them for execution prior to sequencing
123        // which is unnecessary for owned object transactions.
124        // It is unnecessary to write to pending_certificates table because the
125        // certs will be written via consensus output.
126        // self.transaction_manager
127        //     .enqueue_certificates(owned_tx_certs, &self.epoch_store)
128        //     .wrap_err("Failed to schedule certificates for execution")
129    }
130}
131
132fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
133    bcs::from_bytes::<ConsensusTransaction>(tx)
134        .wrap_err("Malformed transaction (failed to deserialize)")
135}
136
137impl TransactionVerifier for IotaTxValidator {
138    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
139        let _scope = monitored_scope("ValidateBatch");
140
141        let txs = batch
142            .iter()
143            .map(|tx| {
144                tx_from_bytes(tx)
145                    .map(|tx| tx.kind)
146                    .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
147            })
148            .collect::<Result<Vec<_>, _>>()?;
149
150        self.validate_transactions(txs)
151            .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
152    }
153}
154
155pub struct IotaTxValidatorMetrics {
156    certificate_signatures_verified: IntCounter,
157    checkpoint_signatures_verified: IntCounter,
158}
159
160impl IotaTxValidatorMetrics {
161    pub fn new(registry: &Registry) -> Arc<Self> {
162        Arc::new(Self {
163            certificate_signatures_verified: register_int_counter_with_registry!(
164                "certificate_signatures_verified",
165                "Number of certificates verified in consensus batch verifier",
166                registry
167            )
168            .unwrap(),
169            checkpoint_signatures_verified: register_int_counter_with_registry!(
170                "checkpoint_signatures_verified",
171                "Number of checkpoint verified in consensus batch verifier",
172                registry
173            )
174            .unwrap(),
175        })
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use std::sync::Arc;
182
183    use consensus_core::TransactionVerifier as _;
184    use iota_macros::sim_test;
185    use iota_types::{
186        crypto::Ed25519IotaSignature, messages_consensus::ConsensusTransaction, object::Object,
187        signature::GenericSignature,
188    };
189
190    use crate::{
191        authority::test_authority_builder::TestAuthorityBuilder,
192        checkpoints::CheckpointServiceNoop,
193        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
194        consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
195    };
196
197    #[sim_test]
198    async fn accept_valid_transaction() {
199        // Initialize an authority with a (owned) gas object and a shared object; then
200        // make a test certificate.
201        let mut objects = test_gas_objects();
202        let shared_object = Object::shared_for_testing();
203        objects.push(shared_object.clone());
204
205        let network_config =
206            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
207                .with_objects(objects.clone())
208                .build();
209
210        let state = TestAuthorityBuilder::new()
211            .with_network_config(&network_config, 0)
212            .build()
213            .await;
214        let name1 = state.name;
215        let certificates = test_certificates(&state, shared_object).await;
216
217        let first_transaction = certificates[0].clone();
218        let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
219            &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
220        )
221        .unwrap();
222
223        let metrics = IotaTxValidatorMetrics::new(&Default::default());
224        let validator = IotaTxValidator::new(
225            state.epoch_store_for_testing().clone(),
226            Arc::new(CheckpointServiceNoop {}),
227            state.transaction_manager().clone(),
228            metrics,
229        );
230        let res = validator.verify_batch(&[&first_transaction_bytes]);
231        assert!(res.is_ok(), "{res:?}");
232
233        let transaction_bytes: Vec<_> = certificates
234            .clone()
235            .into_iter()
236            .map(|cert| {
237                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
238            })
239            .collect();
240
241        let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
242        let res_batch = validator.verify_batch(&batch);
243        assert!(res_batch.is_ok(), "{res_batch:?}");
244
245        let bogus_transaction_bytes: Vec<_> = certificates
246            .into_iter()
247            .map(|mut cert| {
248                // set it to an all-zero user signature
249                cert.tx_signatures_mut_for_testing()[0] = GenericSignature::Signature(
250                    iota_types::crypto::Signature::Ed25519IotaSignature(
251                        Ed25519IotaSignature::default(),
252                    ),
253                );
254                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
255            })
256            .collect();
257
258        let batch: Vec<_> = bogus_transaction_bytes
259            .iter()
260            .map(|t| t.as_slice())
261            .collect();
262        let res_batch = validator.verify_batch(&batch);
263        assert!(res_batch.is_err());
264    }
265}