iota_core/
consensus_validator.rs1use std::sync::Arc;
6
7use consensus_core;
8use eyre::WrapErr;
9use fastcrypto_tbls::dkg_v1;
10use iota_metrics::monitored_scope;
11use iota_types::{
12 error::IotaError,
13 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
14};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use starfish_core;
17use tap::TapFallible;
18use tracing::{info, warn};
19
20use crate::{
21 authority::authority_per_epoch_store::AuthorityPerEpochStore,
22 checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
23};
24
#[derive(Clone)]
/// Verifies batches of consensus transactions before they are accepted.
///
/// All fields are `Arc` handles, so the derived `Clone` only bumps reference
/// counts.
pub struct IotaTxValidator {
    /// Per-epoch store; its `signature_verifier` performs the batched
    /// signature checks.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Notified of each checkpoint signature message once the batch it came in
    /// has passed signature verification.
    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
    /// Stored by the constructor but not read by `validate_transactions`
    /// (hence the leading underscore).
    _transaction_manager: Arc<TransactionManager>,
    /// Counters recording how many items of each kind were verified.
    metrics: Arc<IotaTxValidatorMetrics>,
}
33
34impl IotaTxValidator {
35 pub fn new(
36 epoch_store: Arc<AuthorityPerEpochStore>,
37 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
38 transaction_manager: Arc<TransactionManager>,
39 metrics: Arc<IotaTxValidatorMetrics>,
40 ) -> Self {
41 info!(
42 "IotaTxValidator constructed for epoch {}",
43 epoch_store.epoch()
44 );
45 Self {
46 epoch_store,
47 checkpoint_service,
48 _transaction_manager: transaction_manager,
49 metrics,
50 }
51 }
52
53 fn validate_transactions(&self, txs: Vec<ConsensusTransactionKind>) -> Result<(), IotaError> {
54 let mut cert_batch = Vec::new();
55 let mut ckpt_messages = Vec::new();
56 let mut ckpt_batch = Vec::new();
57 let mut authority_cap_batch = Vec::new();
58
59 for tx in txs.iter() {
60 match tx {
61 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
62 cert_batch.push(certificate.as_ref());
63 }
64 ConsensusTransactionKind::CheckpointSignature(signature) => {
65 ckpt_messages.push(signature.as_ref());
66 ckpt_batch.push(&signature.summary);
67 }
68 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
69 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
70 warn!("batch verification error: DKG Message too large");
71 return Err(IotaError::InvalidDkgMessageSize);
72 }
73 }
74 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
75 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
76 warn!("batch verification error: DKG Confirmation too large");
77 return Err(IotaError::InvalidDkgMessageSize);
78 }
79 }
80 ConsensusTransactionKind::SignedCapabilityNotificationV1(signed_cap) => {
81 authority_cap_batch.push(signed_cap);
82 }
83
84 ConsensusTransactionKind::EndOfPublish(_)
85 | ConsensusTransactionKind::NewJWKFetched(_, _, _)
86 | ConsensusTransactionKind::CapabilityNotificationV1(_)
87 | ConsensusTransactionKind::MisbehaviorReport(_, _, _) => {}
88 }
89 }
90
91 let cert_count = cert_batch.len();
93 let ckpt_count = ckpt_batch.len();
94 let authority_cap_count = authority_cap_batch.len();
95
96 self.epoch_store
97 .signature_verifier
98 .verify_certs_and_checkpoints(cert_batch, ckpt_batch, authority_cap_batch)
99 .tap_err(|e| warn!("batch verification error: {}", e))?;
100
101 for ckpt in ckpt_messages {
104 self.checkpoint_service
105 .notify_checkpoint_signature(&self.epoch_store, ckpt)?;
106 }
107
108 self.metrics
109 .certificate_signatures_verified
110 .inc_by(cert_count as u64);
111 self.metrics
112 .checkpoint_signatures_verified
113 .inc_by(ckpt_count as u64);
114 self.metrics
115 .authority_capabilities_verified
116 .inc_by(authority_cap_count as u64);
117 Ok(())
118
119 }
129}
130
131fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
132 bcs::from_bytes::<ConsensusTransaction>(tx)
133 .wrap_err("Malformed transaction (failed to deserialize)")
134}
135
// Generates an implementation of a consensus `TransactionVerifier`-style trait
// for a validator type.
//
// Arguments (all paths, trailing comma required):
// - `type`:  the implementing type; it must provide
//            `validate_transactions(Vec<ConsensusTransactionKind>)`.
// - `trait`: the trait to implement; it must declare
//            `verify_batch(&self, &[&[u8]]) -> Result<(), error>`.
// - `error`: the trait's error enum; it must have an
//            `InvalidTransaction(String)` variant.
//
// The generated `verify_batch` deserializes every entry of the batch, stopping
// at the first malformed one, and then validates the decoded transactions as a
// whole. Both failure modes are reported as `InvalidTransaction`.
macro_rules! impl_tx_verifier_for {
    (
        type = $impl_ty:path,
        trait = $trait_path:path,
        error = $err_path:path,
    ) => {
        impl $trait_path for $impl_ty {
            fn verify_batch(&self, batch: &[&[u8]]) -> core::result::Result<(), $err_path> {
                let _scope = monitored_scope("ValidateBatch");

                let txs = batch
                    .iter()
                    .map(|tx| {
                        tx_from_bytes(tx)
                            .map(|tx| tx.kind)
                            // `{:#}` renders the full eyre cause chain. A plain
                            // `to_string()` prints only the outer context and
                            // drops the underlying deserialization error.
                            .map_err(|e| <$err_path>::InvalidTransaction(format!("{:#}", e)))
                    })
                    .collect::<core::result::Result<Vec<_>, _>>()?;

                self.validate_transactions(txs)
                    .map_err(|e| <$err_path>::InvalidTransaction(e.to_string()))
            }
        }
    };
}
// `IotaTxValidator` implements the verifier trait of both consensus crates.
// The two expansions share the same `verify_batch` body and differ only in
// the trait and error type they target.

// Verifier for `consensus_core`, reporting `consensus_core::ValidationError`.
impl_tx_verifier_for!(
    type = IotaTxValidator,
    trait = consensus_core::TransactionVerifier,
    error = consensus_core::ValidationError,
);
// Verifier for `starfish_core`, reporting `starfish_core::ValidationError`.
impl_tx_verifier_for!(
    type = IotaTxValidator,
    trait = starfish_core::TransactionVerifier,
    error = starfish_core::ValidationError,
);
175
/// Prometheus counters updated by `IotaTxValidator` after a batch has been
/// fully validated.
pub struct IotaTxValidatorMetrics {
    /// Incremented by the number of certificates in each validated batch.
    certificate_signatures_verified: IntCounter,
    /// Incremented by the number of checkpoint signatures in each validated
    /// batch.
    checkpoint_signatures_verified: IntCounter,
    /// Incremented by the number of signed capability notifications in each
    /// validated batch.
    authority_capabilities_verified: IntCounter,
}
181
182impl IotaTxValidatorMetrics {
183 pub fn new(registry: &Registry) -> Arc<Self> {
184 Arc::new(Self {
185 certificate_signatures_verified: register_int_counter_with_registry!(
186 "certificate_signatures_verified",
187 "Number of certificates verified in consensus batch verifier",
188 registry
189 )
190 .unwrap(),
191 checkpoint_signatures_verified: register_int_counter_with_registry!(
192 "checkpoint_signatures_verified",
193 "Number of checkpoint verified in consensus batch verifier",
194 registry
195 )
196 .unwrap(),
197 authority_capabilities_verified: register_int_counter_with_registry!(
198 "authority_capabilities_verified",
199 "Number of signed authority capabilities verified in consensus batch verifier",
200 registry
201 )
202 .unwrap(),
203 })
204 }
205}
206
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use consensus_core::TransactionVerifier as _;
    use iota_macros::sim_test;
    use iota_types::{
        crypto::Ed25519IotaSignature, messages_consensus::ConsensusTransaction, object::Object,
        signature::GenericSignature,
    };

    use crate::{
        authority::test_authority_builder::TestAuthorityBuilder,
        checkpoints::CheckpointServiceNoop,
        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
        consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    };

    /// Checks the validator against three inputs: a single valid certificate,
    /// a batch of valid certificates (both must be accepted), and a batch whose
    /// first user signature has been replaced by a default Ed25519 signature
    /// (must be rejected).
    #[sim_test]
    async fn accept_valid_transaction() {
        // Genesis state: the standard gas objects plus one shared object.
        let mut genesis_objects = test_gas_objects();
        let shared_object = Object::shared_for_testing();
        genesis_objects.push(shared_object.clone());

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(genesis_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;
        let authority_name = state.name;
        let certificates = test_certificates(&state, shared_object).await;

        // Case 1: a single valid certificate.
        let single_message = ConsensusTransaction::new_certificate_message(
            &authority_name,
            certificates[0].clone(),
        );
        let single_bytes: Vec<u8> = bcs::to_bytes(&single_message).unwrap();

        let validator = IotaTxValidator::new(
            state.epoch_store_for_testing().clone(),
            Arc::new(CheckpointServiceNoop {}),
            state.transaction_manager().clone(),
            IotaTxValidatorMetrics::new(&Default::default()),
        );
        let res = validator.verify_batch(&[single_bytes.as_slice()]);
        assert!(res.is_ok(), "{res:?}");

        // Case 2: every valid certificate in one batch.
        let mut valid_bytes = Vec::with_capacity(certificates.len());
        for cert in certificates.iter().cloned() {
            let message = ConsensusTransaction::new_certificate_message(&authority_name, cert);
            valid_bytes.push(bcs::to_bytes(&message).unwrap());
        }
        let valid_batch: Vec<&[u8]> = valid_bytes.iter().map(Vec::as_slice).collect();
        let res_batch = validator.verify_batch(&valid_batch);
        assert!(res_batch.is_ok(), "{res_batch:?}");

        // Case 3: the same certificates with their first user signature
        // overwritten by a default Ed25519 signature.
        let mut bogus_bytes = Vec::with_capacity(certificates.len());
        for mut cert in certificates {
            cert.tx_signatures_mut_for_testing()[0] = GenericSignature::Signature(
                iota_types::crypto::Signature::Ed25519IotaSignature(
                    Ed25519IotaSignature::default(),
                ),
            );
            let message = ConsensusTransaction::new_certificate_message(&authority_name, cert);
            bogus_bytes.push(bcs::to_bytes(&message).unwrap());
        }
        let bogus_batch: Vec<&[u8]> = bogus_bytes.iter().map(Vec::as_slice).collect();
        let res_batch = validator.verify_batch(&bogus_batch);
        assert!(res_batch.is_err());
    }
}