iota_core/
consensus_validator.rs1use std::sync::Arc;
6
7use consensus_core::{TransactionVerifier, ValidationError};
8use eyre::WrapErr;
9use fastcrypto_tbls::dkg_v1;
10use iota_metrics::monitored_scope;
11use iota_types::{
12 error::IotaError,
13 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
14};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use tap::TapFallible;
17use tracing::{info, warn};
18
19use crate::{
20 authority::authority_per_epoch_store::AuthorityPerEpochStore,
21 checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
22};
23
24#[derive(Clone)]
26pub struct IotaTxValidator {
27 epoch_store: Arc<AuthorityPerEpochStore>,
28 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
29 _transaction_manager: Arc<TransactionManager>,
30 metrics: Arc<IotaTxValidatorMetrics>,
31}
32
33impl IotaTxValidator {
34 pub fn new(
35 epoch_store: Arc<AuthorityPerEpochStore>,
36 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
37 transaction_manager: Arc<TransactionManager>,
38 metrics: Arc<IotaTxValidatorMetrics>,
39 ) -> Self {
40 info!(
41 "IotaTxValidator constructed for epoch {}",
42 epoch_store.epoch()
43 );
44 Self {
45 epoch_store,
46 checkpoint_service,
47 _transaction_manager: transaction_manager,
48 metrics,
49 }
50 }
51
52 fn validate_transactions(&self, txs: Vec<ConsensusTransactionKind>) -> Result<(), IotaError> {
53 let mut cert_batch = Vec::new();
54 let mut ckpt_messages = Vec::new();
55 let mut ckpt_batch = Vec::new();
56 for tx in txs.iter() {
57 match tx {
58 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
59 cert_batch.push(certificate.as_ref());
60 }
61 ConsensusTransactionKind::CheckpointSignature(signature) => {
62 ckpt_messages.push(signature.as_ref());
63 ckpt_batch.push(&signature.summary);
64 }
65 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
66 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
67 warn!("batch verification error: DKG Message too large");
68 return Err(IotaError::InvalidDkgMessageSize);
69 }
70 }
71 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
72 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
73 warn!("batch verification error: DKG Confirmation too large");
74 return Err(IotaError::InvalidDkgMessageSize);
75 }
76 }
77
78 ConsensusTransactionKind::EndOfPublish(_)
79 | ConsensusTransactionKind::NewJWKFetched(_, _, _)
80 | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
81 }
82 }
83
84 let cert_count = cert_batch.len();
86 let ckpt_count = ckpt_batch.len();
87
88 self.epoch_store
89 .signature_verifier
90 .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
91 .tap_err(|e| warn!("batch verification error: {}", e))?;
92
93 for ckpt in ckpt_messages {
96 self.checkpoint_service
97 .notify_checkpoint_signature(&self.epoch_store, ckpt)?;
98 }
99
100 self.metrics
101 .certificate_signatures_verified
102 .inc_by(cert_count as u64);
103 self.metrics
104 .checkpoint_signatures_verified
105 .inc_by(ckpt_count as u64);
106 Ok(())
107
108 }
118}
119
120fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
121 bcs::from_bytes::<ConsensusTransaction>(tx)
122 .wrap_err("Malformed transaction (failed to deserialize)")
123}
124
125impl TransactionVerifier for IotaTxValidator {
126 fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
127 let _scope = monitored_scope("ValidateBatch");
128
129 let txs = batch
130 .iter()
131 .map(|tx| {
132 tx_from_bytes(tx)
133 .map(|tx| tx.kind)
134 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
135 })
136 .collect::<Result<Vec<_>, _>>()?;
137
138 self.validate_transactions(txs)
139 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
140 }
141}
142
143pub struct IotaTxValidatorMetrics {
144 certificate_signatures_verified: IntCounter,
145 checkpoint_signatures_verified: IntCounter,
146}
147
148impl IotaTxValidatorMetrics {
149 pub fn new(registry: &Registry) -> Arc<Self> {
150 Arc::new(Self {
151 certificate_signatures_verified: register_int_counter_with_registry!(
152 "certificate_signatures_verified",
153 "Number of certificates verified in consensus batch verifier",
154 registry
155 )
156 .unwrap(),
157 checkpoint_signatures_verified: register_int_counter_with_registry!(
158 "checkpoint_signatures_verified",
159 "Number of checkpoint verified in consensus batch verifier",
160 registry
161 )
162 .unwrap(),
163 })
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use std::sync::Arc;
170
171 use consensus_core::TransactionVerifier as _;
172 use iota_macros::sim_test;
173 use iota_types::{
174 crypto::Ed25519IotaSignature, messages_consensus::ConsensusTransaction, object::Object,
175 signature::GenericSignature,
176 };
177
178 use crate::{
179 authority::test_authority_builder::TestAuthorityBuilder,
180 checkpoints::CheckpointServiceNoop,
181 consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
182 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
183 };
184
185 #[sim_test]
186 async fn accept_valid_transaction() {
187 let mut objects = test_gas_objects();
190 let shared_object = Object::shared_for_testing();
191 objects.push(shared_object.clone());
192
193 let network_config =
194 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
195 .with_objects(objects.clone())
196 .build();
197
198 let state = TestAuthorityBuilder::new()
199 .with_network_config(&network_config, 0)
200 .build()
201 .await;
202 let name1 = state.name;
203 let certificates = test_certificates(&state, shared_object).await;
204
205 let first_transaction = certificates[0].clone();
206 let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
207 &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
208 )
209 .unwrap();
210
211 let metrics = IotaTxValidatorMetrics::new(&Default::default());
212 let validator = IotaTxValidator::new(
213 state.epoch_store_for_testing().clone(),
214 Arc::new(CheckpointServiceNoop {}),
215 state.transaction_manager().clone(),
216 metrics,
217 );
218 let res = validator.verify_batch(&[&first_transaction_bytes]);
219 assert!(res.is_ok(), "{res:?}");
220
221 let transaction_bytes: Vec<_> = certificates
222 .clone()
223 .into_iter()
224 .map(|cert| {
225 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
226 })
227 .collect();
228
229 let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
230 let res_batch = validator.verify_batch(&batch);
231 assert!(res_batch.is_ok(), "{res_batch:?}");
232
233 let bogus_transaction_bytes: Vec<_> = certificates
234 .into_iter()
235 .map(|mut cert| {
236 cert.tx_signatures_mut_for_testing()[0] = GenericSignature::Signature(
238 iota_types::crypto::Signature::Ed25519IotaSignature(
239 Ed25519IotaSignature::default(),
240 ),
241 );
242 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
243 })
244 .collect();
245
246 let batch: Vec<_> = bogus_transaction_bytes
247 .iter()
248 .map(|t| t.as_slice())
249 .collect();
250 let res_batch = validator.verify_batch(&batch);
251 assert!(res_batch.is_err());
252 }
253}