iota_core/
consensus_validator.rs1use std::sync::Arc;
6
7use consensus_core;
8use eyre::WrapErr;
9use fastcrypto_tbls::dkg_v1;
10use iota_metrics::monitored_scope;
11use iota_types::{
12    error::IotaError,
13    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
14};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use starfish_core;
17use tap::TapFallible;
18use tracing::{info, warn};
19
20use crate::{
21    authority::authority_per_epoch_store::AuthorityPerEpochStore,
22    checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
23};
24
25#[derive(Clone)]
27pub struct IotaTxValidator {
28    epoch_store: Arc<AuthorityPerEpochStore>,
29    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
30    _transaction_manager: Arc<TransactionManager>,
31    metrics: Arc<IotaTxValidatorMetrics>,
32}
33
34impl IotaTxValidator {
35    pub fn new(
36        epoch_store: Arc<AuthorityPerEpochStore>,
37        checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
38        transaction_manager: Arc<TransactionManager>,
39        metrics: Arc<IotaTxValidatorMetrics>,
40    ) -> Self {
41        info!(
42            "IotaTxValidator constructed for epoch {}",
43            epoch_store.epoch()
44        );
45        Self {
46            epoch_store,
47            checkpoint_service,
48            _transaction_manager: transaction_manager,
49            metrics,
50        }
51    }
52
53    fn validate_transactions(&self, txs: Vec<ConsensusTransactionKind>) -> Result<(), IotaError> {
54        let mut cert_batch = Vec::new();
55        let mut ckpt_messages = Vec::new();
56        let mut ckpt_batch = Vec::new();
57        let mut authority_cap_batch = Vec::new();
58
59        for tx in txs.iter() {
60            match tx {
61                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
62                    cert_batch.push(certificate.as_ref());
63                }
64                ConsensusTransactionKind::CheckpointSignature(signature) => {
65                    ckpt_messages.push(signature.as_ref());
66                    ckpt_batch.push(&signature.summary);
67                }
68                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
69                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
70                        warn!("batch verification error: DKG Message too large");
71                        return Err(IotaError::InvalidDkgMessageSize);
72                    }
73                }
74                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
75                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
76                        warn!("batch verification error: DKG Confirmation too large");
77                        return Err(IotaError::InvalidDkgMessageSize);
78                    }
79                }
80                ConsensusTransactionKind::SignedCapabilityNotificationV1(signed_cap) => {
81                    authority_cap_batch.push(signed_cap);
82                }
83
84                ConsensusTransactionKind::EndOfPublish(_)
85                | ConsensusTransactionKind::NewJWKFetched(_, _, _)
86                | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
87            }
88        }
89
90        let cert_count = cert_batch.len();
92        let ckpt_count = ckpt_batch.len();
93        let authority_cap_count = authority_cap_batch.len();
94
95        self.epoch_store
96            .signature_verifier
97            .verify_certs_and_checkpoints(cert_batch, ckpt_batch, authority_cap_batch)
98            .tap_err(|e| warn!("batch verification error: {}", e))?;
99
100        for ckpt in ckpt_messages {
103            self.checkpoint_service
104                .notify_checkpoint_signature(&self.epoch_store, ckpt)?;
105        }
106
107        self.metrics
108            .certificate_signatures_verified
109            .inc_by(cert_count as u64);
110        self.metrics
111            .checkpoint_signatures_verified
112            .inc_by(ckpt_count as u64);
113        self.metrics
114            .authority_capabilities_verified
115            .inc_by(authority_cap_count as u64);
116        Ok(())
117
118        }
128}
129
130fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
131    bcs::from_bytes::<ConsensusTransaction>(tx)
132        .wrap_err("Malformed transaction (failed to deserialize)")
133}
134
135macro_rules! impl_tx_verifier_for {
136    (
137        type = $impl_ty:path,
139        trait = $trait_path:path,
141        error = $err_path:path,
143    ) => {
144        impl $trait_path for $impl_ty {
145            fn verify_batch(&self, batch: &[&[u8]]) -> core::result::Result<(), $err_path> {
146                let _scope = monitored_scope("ValidateBatch");
147
148                let txs = batch
149                    .iter()
150                    .map(|tx| {
151                        tx_from_bytes(tx)
152                            .map(|tx| tx.kind)
153                            .map_err(|e| <$err_path>::InvalidTransaction(e.to_string()))
154                    })
155                    .collect::<core::result::Result<Vec<_>, _>>()?;
156
157                self.validate_transactions(txs)
158                    .map_err(|e| <$err_path>::InvalidTransaction(e.to_string()))
159            }
160        }
161    };
162}
163impl_tx_verifier_for!(
165    type = IotaTxValidator,
166    trait = consensus_core::TransactionVerifier,
167    error = consensus_core::ValidationError,
168);
169impl_tx_verifier_for!(
170    type = IotaTxValidator,
171    trait = starfish_core::TransactionVerifier,
172    error = starfish_core::ValidationError,
173);
174
175pub struct IotaTxValidatorMetrics {
176    certificate_signatures_verified: IntCounter,
177    checkpoint_signatures_verified: IntCounter,
178    authority_capabilities_verified: IntCounter,
179}
180
181impl IotaTxValidatorMetrics {
182    pub fn new(registry: &Registry) -> Arc<Self> {
183        Arc::new(Self {
184            certificate_signatures_verified: register_int_counter_with_registry!(
185                "certificate_signatures_verified",
186                "Number of certificates verified in consensus batch verifier",
187                registry
188            )
189            .unwrap(),
190            checkpoint_signatures_verified: register_int_counter_with_registry!(
191                "checkpoint_signatures_verified",
192                "Number of checkpoint verified in consensus batch verifier",
193                registry
194            )
195            .unwrap(),
196            authority_capabilities_verified: register_int_counter_with_registry!(
197                "authority_capabilities_verified",
198                "Number of signed authority capabilities verified in consensus batch verifier",
199                registry
200            )
201            .unwrap(),
202        })
203    }
204}
205
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use consensus_core::TransactionVerifier as _;
    use iota_macros::sim_test;
    use iota_types::{
        crypto::Ed25519IotaSignature, messages_consensus::ConsensusTransaction, object::Object,
        signature::GenericSignature,
    };

    use crate::{
        authority::test_authority_builder::TestAuthorityBuilder,
        checkpoints::CheckpointServiceNoop,
        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
        consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    };

    /// A single valid certificate and a batch of valid certificates must be
    /// accepted; a batch whose certificates carry a replaced (default) user
    /// signature must be rejected.
    #[sim_test]
    async fn accept_valid_transaction() {
        // Objects known to the test network: gas objects plus one shared object.
        let mut objects = test_gas_objects();
        let shared_object = Object::shared_for_testing();
        objects.push(shared_object.clone());

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;
        let name1 = state.name;
        let certificates = test_certificates(&state, shared_object).await;

        // Serialize the first certificate on its own.
        let first_certificate = certificates[0].clone();
        let single_tx_bytes: Vec<u8> = bcs::to_bytes(
            &ConsensusTransaction::new_certificate_message(&name1, first_certificate),
        )
        .unwrap();

        let validator = IotaTxValidator::new(
            state.epoch_store_for_testing().clone(),
            Arc::new(CheckpointServiceNoop {}),
            state.transaction_manager().clone(),
            IotaTxValidatorMetrics::new(&Default::default()),
        );

        // A batch holding one valid certificate is accepted.
        let res = validator.verify_batch(&[&single_tx_bytes]);
        assert!(res.is_ok(), "{res:?}");

        // A batch holding all valid certificates is accepted as well.
        let valid_tx_bytes: Vec<_> = certificates
            .iter()
            .cloned()
            .map(|cert| {
                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
            })
            .collect();
        let valid_batch: Vec<_> = valid_tx_bytes.iter().map(Vec::as_slice).collect();
        let res_batch = validator.verify_batch(&valid_batch);
        assert!(res_batch.is_ok(), "{res_batch:?}");

        // Replace the first user signature of every certificate with a default
        // Ed25519 signature; such a batch must fail verification.
        let bogus_tx_bytes: Vec<_> = certificates
            .into_iter()
            .map(|mut cert| {
                let forged = GenericSignature::Signature(
                    iota_types::crypto::Signature::Ed25519IotaSignature(
                        Ed25519IotaSignature::default(),
                    ),
                );
                cert.tx_signatures_mut_for_testing()[0] = forged;
                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
            })
            .collect();
        let bogus_batch: Vec<_> = bogus_tx_bytes.iter().map(Vec::as_slice).collect();
        let res_batch = validator.verify_batch(&bogus_batch);
        assert!(res_batch.is_err());
    }
}