iota_core/
consensus_validator.rs1use std::sync::Arc;
6
7use consensus_core::{TransactionVerifier, ValidationError};
8use eyre::WrapErr;
9use fastcrypto_tbls::dkg;
10use iota_metrics::monitored_scope;
11use iota_types::{
12 error::IotaError,
13 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
14};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use tap::TapFallible;
17use tracing::{info, warn};
18
19use crate::{
20 authority::authority_per_epoch_store::AuthorityPerEpochStore,
21 checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
22};
23
24#[derive(Clone)]
26pub struct IotaTxValidator {
27 epoch_store: Arc<AuthorityPerEpochStore>,
28 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
29 _transaction_manager: Arc<TransactionManager>,
30 metrics: Arc<IotaTxValidatorMetrics>,
31}
32
33impl IotaTxValidator {
34 pub fn new(
35 epoch_store: Arc<AuthorityPerEpochStore>,
36 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
37 transaction_manager: Arc<TransactionManager>,
38 metrics: Arc<IotaTxValidatorMetrics>,
39 ) -> Self {
40 info!(
41 "IotaTxValidator constructed for epoch {}",
42 epoch_store.epoch()
43 );
44 Self {
45 epoch_store,
46 checkpoint_service,
47 _transaction_manager: transaction_manager,
48 metrics,
49 }
50 }
51
52 fn validate_transactions(
53 &self,
54 txs: Vec<ConsensusTransactionKind>,
55 ) -> Result<(), eyre::Report> {
56 let mut cert_batch = Vec::new();
57 let mut ckpt_messages = Vec::new();
58 let mut ckpt_batch = Vec::new();
59 for tx in txs.into_iter() {
60 match tx {
61 ConsensusTransactionKind::UserTransaction(certificate) => {
62 cert_batch.push(*certificate);
63
64 }
72 ConsensusTransactionKind::CheckpointSignature(signature) => {
73 ckpt_messages.push(signature.clone());
74 ckpt_batch.push(signature.summary);
75 }
76 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
77 if bytes.len() > dkg::DKG_MESSAGES_MAX_SIZE {
78 warn!("batch verification error: DKG Message too large");
79 return Err(IotaError::InvalidDkgMessageSize.into());
80 }
81 }
82 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
83 if bytes.len() > dkg::DKG_MESSAGES_MAX_SIZE {
84 warn!("batch verification error: DKG Confirmation too large");
85 return Err(IotaError::InvalidDkgMessageSize.into());
86 }
87 }
88
89 ConsensusTransactionKind::EndOfPublish(_)
90 | ConsensusTransactionKind::NewJWKFetched(_, _, _)
91 | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
92 }
93 }
94
95 let cert_count = cert_batch.len();
97 let ckpt_count = ckpt_batch.len();
98
99 self.epoch_store
100 .signature_verifier
101 .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
102 .tap_err(|e| warn!("batch verification error: {}", e))
103 .wrap_err("Malformed batch (failed to verify)")?;
104
105 for ckpt in ckpt_messages {
108 self.checkpoint_service
109 .notify_checkpoint_signature(&self.epoch_store, &ckpt)?;
110 }
111
112 self.metrics
113 .certificate_signatures_verified
114 .inc_by(cert_count as u64);
115 self.metrics
116 .checkpoint_signatures_verified
117 .inc_by(ckpt_count as u64);
118 Ok(())
119
120 }
130}
131
132fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
133 bcs::from_bytes::<ConsensusTransaction>(tx)
134 .wrap_err("Malformed transaction (failed to deserialize)")
135}
136
137impl TransactionVerifier for IotaTxValidator {
138 fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
139 let _scope = monitored_scope("ValidateBatch");
140
141 let txs = batch
142 .iter()
143 .map(|tx| {
144 tx_from_bytes(tx)
145 .map(|tx| tx.kind)
146 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
147 })
148 .collect::<Result<Vec<_>, _>>()?;
149
150 self.validate_transactions(txs)
151 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
152 }
153}
154
155pub struct IotaTxValidatorMetrics {
156 certificate_signatures_verified: IntCounter,
157 checkpoint_signatures_verified: IntCounter,
158}
159
160impl IotaTxValidatorMetrics {
161 pub fn new(registry: &Registry) -> Arc<Self> {
162 Arc::new(Self {
163 certificate_signatures_verified: register_int_counter_with_registry!(
164 "certificate_signatures_verified",
165 "Number of certificates verified in consensus batch verifier",
166 registry
167 )
168 .unwrap(),
169 checkpoint_signatures_verified: register_int_counter_with_registry!(
170 "checkpoint_signatures_verified",
171 "Number of checkpoint verified in consensus batch verifier",
172 registry
173 )
174 .unwrap(),
175 })
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 use std::sync::Arc;
182
183 use consensus_core::TransactionVerifier as _;
184 use iota_macros::sim_test;
185 use iota_types::{
186 crypto::Ed25519IotaSignature, messages_consensus::ConsensusTransaction, object::Object,
187 signature::GenericSignature,
188 };
189
190 use crate::{
191 authority::test_authority_builder::TestAuthorityBuilder,
192 checkpoints::CheckpointServiceNoop,
193 consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
194 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
195 };
196
197 #[sim_test]
198 async fn accept_valid_transaction() {
199 let mut objects = test_gas_objects();
202 let shared_object = Object::shared_for_testing();
203 objects.push(shared_object.clone());
204
205 let network_config =
206 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
207 .with_objects(objects.clone())
208 .build();
209
210 let state = TestAuthorityBuilder::new()
211 .with_network_config(&network_config, 0)
212 .build()
213 .await;
214 let name1 = state.name;
215 let certificates = test_certificates(&state, shared_object).await;
216
217 let first_transaction = certificates[0].clone();
218 let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
219 &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
220 )
221 .unwrap();
222
223 let metrics = IotaTxValidatorMetrics::new(&Default::default());
224 let validator = IotaTxValidator::new(
225 state.epoch_store_for_testing().clone(),
226 Arc::new(CheckpointServiceNoop {}),
227 state.transaction_manager().clone(),
228 metrics,
229 );
230 let res = validator.verify_batch(&[&first_transaction_bytes]);
231 assert!(res.is_ok(), "{res:?}");
232
233 let transaction_bytes: Vec<_> = certificates
234 .clone()
235 .into_iter()
236 .map(|cert| {
237 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
238 })
239 .collect();
240
241 let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
242 let res_batch = validator.verify_batch(&batch);
243 assert!(res_batch.is_ok(), "{res_batch:?}");
244
245 let bogus_transaction_bytes: Vec<_> = certificates
246 .into_iter()
247 .map(|mut cert| {
248 cert.tx_signatures_mut_for_testing()[0] = GenericSignature::Signature(
250 iota_types::crypto::Signature::Ed25519IotaSignature(
251 Ed25519IotaSignature::default(),
252 ),
253 );
254 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
255 })
256 .collect();
257
258 let batch: Vec<_> = bogus_transaction_bytes
259 .iter()
260 .map(|t| t.as_slice())
261 .collect();
262 let res_batch = validator.verify_batch(&batch);
263 assert!(res_batch.is_err());
264 }
265}