iota_core/
consensus_validator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use consensus_core::{TransactionVerifier, ValidationError};
8use eyre::WrapErr;
9use fastcrypto_tbls::dkg_v1;
10use iota_metrics::monitored_scope;
11use iota_types::{
12    error::IotaError,
13    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
14};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use tap::TapFallible;
17use tracing::{info, warn};
18
19use crate::{
20    authority::authority_per_epoch_store::AuthorityPerEpochStore,
21    checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
22};
23
24/// Allows verifying the validity of transactions
25#[derive(Clone)]
26pub struct IotaTxValidator {
27    epoch_store: Arc<AuthorityPerEpochStore>,
28    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
29    _transaction_manager: Arc<TransactionManager>,
30    metrics: Arc<IotaTxValidatorMetrics>,
31}
32
33impl IotaTxValidator {
34    pub fn new(
35        epoch_store: Arc<AuthorityPerEpochStore>,
36        checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
37        transaction_manager: Arc<TransactionManager>,
38        metrics: Arc<IotaTxValidatorMetrics>,
39    ) -> Self {
40        info!(
41            "IotaTxValidator constructed for epoch {}",
42            epoch_store.epoch()
43        );
44        Self {
45            epoch_store,
46            checkpoint_service,
47            _transaction_manager: transaction_manager,
48            metrics,
49        }
50    }
51
52    fn validate_transactions(&self, txs: Vec<ConsensusTransactionKind>) -> Result<(), IotaError> {
53        let mut cert_batch = Vec::new();
54        let mut ckpt_messages = Vec::new();
55        let mut ckpt_batch = Vec::new();
56        for tx in txs.iter() {
57            match tx {
58                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
59                    cert_batch.push(certificate.as_ref());
60                }
61                ConsensusTransactionKind::CheckpointSignature(signature) => {
62                    ckpt_messages.push(signature.as_ref());
63                    ckpt_batch.push(&signature.summary);
64                }
65                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
66                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
67                        warn!("batch verification error: DKG Message too large");
68                        return Err(IotaError::InvalidDkgMessageSize);
69                    }
70                }
71                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
72                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
73                        warn!("batch verification error: DKG Confirmation too large");
74                        return Err(IotaError::InvalidDkgMessageSize);
75                    }
76                }
77
78                ConsensusTransactionKind::EndOfPublish(_)
79                | ConsensusTransactionKind::NewJWKFetched(_, _, _)
80                | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
81            }
82        }
83
84        // verify the certificate signatures as a batch
85        let cert_count = cert_batch.len();
86        let ckpt_count = ckpt_batch.len();
87
88        self.epoch_store
89            .signature_verifier
90            .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
91            .tap_err(|e| warn!("batch verification error: {}", e))?;
92
93        // All checkpoint sigs have been verified, forward them to the checkpoint
94        // service
95        for ckpt in ckpt_messages {
96            self.checkpoint_service
97                .notify_checkpoint_signature(&self.epoch_store, ckpt)?;
98        }
99
100        self.metrics
101            .certificate_signatures_verified
102            .inc_by(cert_count as u64);
103        self.metrics
104            .checkpoint_signatures_verified
105            .inc_by(ckpt_count as u64);
106        Ok(())
107
108        // todo - we should un-comment line below once we have a way to revert
109        // those transactions at the end of epoch all certificates had
110        // valid signatures, schedule them for execution prior to sequencing
111        // which is unnecessary for owned object transactions.
112        // It is unnecessary to write to pending_certificates table because the
113        // certs will be written via consensus output.
114        // self.transaction_manager
115        //     .enqueue_certificates(owned_tx_certs, &self.epoch_store)
116        //     .wrap_err("Failed to schedule certificates for execution")
117    }
118}
119
120fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
121    bcs::from_bytes::<ConsensusTransaction>(tx)
122        .wrap_err("Malformed transaction (failed to deserialize)")
123}
124
125impl TransactionVerifier for IotaTxValidator {
126    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
127        let _scope = monitored_scope("ValidateBatch");
128
129        let txs = batch
130            .iter()
131            .map(|tx| {
132                tx_from_bytes(tx)
133                    .map(|tx| tx.kind)
134                    .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
135            })
136            .collect::<Result<Vec<_>, _>>()?;
137
138        self.validate_transactions(txs)
139            .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
140    }
141}
142
143pub struct IotaTxValidatorMetrics {
144    certificate_signatures_verified: IntCounter,
145    checkpoint_signatures_verified: IntCounter,
146}
147
148impl IotaTxValidatorMetrics {
149    pub fn new(registry: &Registry) -> Arc<Self> {
150        Arc::new(Self {
151            certificate_signatures_verified: register_int_counter_with_registry!(
152                "certificate_signatures_verified",
153                "Number of certificates verified in consensus batch verifier",
154                registry
155            )
156            .unwrap(),
157            checkpoint_signatures_verified: register_int_counter_with_registry!(
158                "checkpoint_signatures_verified",
159                "Number of checkpoint verified in consensus batch verifier",
160                registry
161            )
162            .unwrap(),
163        })
164    }
165}
166
167#[cfg(test)]
168mod tests {
169    use std::sync::Arc;
170
171    use consensus_core::TransactionVerifier as _;
172    use iota_macros::sim_test;
173    use iota_types::{
174        crypto::Ed25519IotaSignature, messages_consensus::ConsensusTransaction, object::Object,
175        signature::GenericSignature,
176    };
177
178    use crate::{
179        authority::test_authority_builder::TestAuthorityBuilder,
180        checkpoints::CheckpointServiceNoop,
181        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
182        consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
183    };
184
185    #[sim_test]
186    async fn accept_valid_transaction() {
187        // Initialize an authority with a (owned) gas object and a shared object; then
188        // make a test certificate.
189        let mut objects = test_gas_objects();
190        let shared_object = Object::shared_for_testing();
191        objects.push(shared_object.clone());
192
193        let network_config =
194            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
195                .with_objects(objects.clone())
196                .build();
197
198        let state = TestAuthorityBuilder::new()
199            .with_network_config(&network_config, 0)
200            .build()
201            .await;
202        let name1 = state.name;
203        let certificates = test_certificates(&state, shared_object).await;
204
205        let first_transaction = certificates[0].clone();
206        let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
207            &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
208        )
209        .unwrap();
210
211        let metrics = IotaTxValidatorMetrics::new(&Default::default());
212        let validator = IotaTxValidator::new(
213            state.epoch_store_for_testing().clone(),
214            Arc::new(CheckpointServiceNoop {}),
215            state.transaction_manager().clone(),
216            metrics,
217        );
218        let res = validator.verify_batch(&[&first_transaction_bytes]);
219        assert!(res.is_ok(), "{res:?}");
220
221        let transaction_bytes: Vec<_> = certificates
222            .clone()
223            .into_iter()
224            .map(|cert| {
225                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
226            })
227            .collect();
228
229        let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
230        let res_batch = validator.verify_batch(&batch);
231        assert!(res_batch.is_ok(), "{res_batch:?}");
232
233        let bogus_transaction_bytes: Vec<_> = certificates
234            .into_iter()
235            .map(|mut cert| {
236                // set it to an all-zero user signature
237                cert.tx_signatures_mut_for_testing()[0] = GenericSignature::Signature(
238                    iota_types::crypto::Signature::Ed25519IotaSignature(
239                        Ed25519IotaSignature::default(),
240                    ),
241                );
242                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
243            })
244            .collect();
245
246        let batch: Vec<_> = bogus_transaction_bytes
247            .iter()
248            .map(|t| t.as_slice())
249            .collect();
250        let res_batch = validator.verify_batch(&batch);
251        assert!(res_batch.is_err());
252    }
253}