iota_core/
consensus_validator.rs1use std::sync::Arc;
6
7use consensus_core;
8use eyre::WrapErr;
9use fastcrypto_tbls::dkg_v1;
10use iota_metrics::monitored_scope;
11use iota_types::{
12 error::IotaError,
13 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
14};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use starfish_core;
17use tap::TapFallible;
18use tracing::{info, warn};
19
20use crate::{
21 authority::authority_per_epoch_store::AuthorityPerEpochStore,
22 checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
23};
24
25#[derive(Clone)]
27pub struct IotaTxValidator {
28 epoch_store: Arc<AuthorityPerEpochStore>,
29 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
30 _transaction_manager: Arc<TransactionManager>,
31 metrics: Arc<IotaTxValidatorMetrics>,
32}
33
34impl IotaTxValidator {
35 pub fn new(
36 epoch_store: Arc<AuthorityPerEpochStore>,
37 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
38 transaction_manager: Arc<TransactionManager>,
39 metrics: Arc<IotaTxValidatorMetrics>,
40 ) -> Self {
41 info!(
42 "IotaTxValidator constructed for epoch {}",
43 epoch_store.epoch()
44 );
45 Self {
46 epoch_store,
47 checkpoint_service,
48 _transaction_manager: transaction_manager,
49 metrics,
50 }
51 }
52
53 fn validate_transactions(&self, txs: Vec<ConsensusTransactionKind>) -> Result<(), IotaError> {
54 let mut cert_batch = Vec::new();
55 let mut ckpt_messages = Vec::new();
56 let mut ckpt_batch = Vec::new();
57 for tx in txs.iter() {
58 match tx {
59 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
60 cert_batch.push(certificate.as_ref());
61 }
62 ConsensusTransactionKind::CheckpointSignature(signature) => {
63 ckpt_messages.push(signature.as_ref());
64 ckpt_batch.push(&signature.summary);
65 }
66 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
67 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
68 warn!("batch verification error: DKG Message too large");
69 return Err(IotaError::InvalidDkgMessageSize);
70 }
71 }
72 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
73 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
74 warn!("batch verification error: DKG Confirmation too large");
75 return Err(IotaError::InvalidDkgMessageSize);
76 }
77 }
78
79 ConsensusTransactionKind::EndOfPublish(_)
80 | ConsensusTransactionKind::NewJWKFetched(_, _, _)
81 | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
82 }
83 }
84
85 let cert_count = cert_batch.len();
87 let ckpt_count = ckpt_batch.len();
88
89 self.epoch_store
90 .signature_verifier
91 .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
92 .tap_err(|e| warn!("batch verification error: {}", e))?;
93
94 for ckpt in ckpt_messages {
97 self.checkpoint_service
98 .notify_checkpoint_signature(&self.epoch_store, ckpt)?;
99 }
100
101 self.metrics
102 .certificate_signatures_verified
103 .inc_by(cert_count as u64);
104 self.metrics
105 .checkpoint_signatures_verified
106 .inc_by(ckpt_count as u64);
107 Ok(())
108
109 }
119}
120
121fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
122 bcs::from_bytes::<ConsensusTransaction>(tx)
123 .wrap_err("Malformed transaction (failed to deserialize)")
124}
125
126macro_rules! impl_tx_verifier_for {
127 (
128 type = $impl_ty:path,
130 trait = $trait_path:path,
132 error = $err_path:path,
134 ) => {
135 impl $trait_path for $impl_ty {
136 fn verify_batch(&self, batch: &[&[u8]]) -> core::result::Result<(), $err_path> {
137 let _scope = monitored_scope("ValidateBatch");
138
139 let txs = batch
140 .iter()
141 .map(|tx| {
142 tx_from_bytes(tx)
143 .map(|tx| tx.kind)
144 .map_err(|e| <$err_path>::InvalidTransaction(e.to_string()))
145 })
146 .collect::<core::result::Result<Vec<_>, _>>()?;
147
148 self.validate_transactions(txs)
149 .map_err(|e| <$err_path>::InvalidTransaction(e.to_string()))
150 }
151 }
152 };
153}
154impl_tx_verifier_for!(
156 type = IotaTxValidator,
157 trait = consensus_core::TransactionVerifier,
158 error = consensus_core::ValidationError,
159);
160impl_tx_verifier_for!(
161 type = IotaTxValidator,
162 trait = starfish_core::TransactionVerifier,
163 error = starfish_core::ValidationError,
164);
165
166pub struct IotaTxValidatorMetrics {
167 certificate_signatures_verified: IntCounter,
168 checkpoint_signatures_verified: IntCounter,
169}
170
171impl IotaTxValidatorMetrics {
172 pub fn new(registry: &Registry) -> Arc<Self> {
173 Arc::new(Self {
174 certificate_signatures_verified: register_int_counter_with_registry!(
175 "certificate_signatures_verified",
176 "Number of certificates verified in consensus batch verifier",
177 registry
178 )
179 .unwrap(),
180 checkpoint_signatures_verified: register_int_counter_with_registry!(
181 "checkpoint_signatures_verified",
182 "Number of checkpoint verified in consensus batch verifier",
183 registry
184 )
185 .unwrap(),
186 })
187 }
188}
189
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use consensus_core::TransactionVerifier as _;
    use iota_macros::sim_test;
    use iota_types::{
        crypto::Ed25519IotaSignature, messages_consensus::ConsensusTransaction, object::Object,
        signature::GenericSignature,
    };

    use crate::{
        authority::test_authority_builder::TestAuthorityBuilder,
        checkpoints::CheckpointServiceNoop,
        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
        consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    };

    /// Valid certificates pass verification, alone and as a batch; the same
    /// certificates with their first user signature replaced are rejected.
    #[sim_test]
    async fn accept_valid_transaction() {
        // Objects for the test network: the gas objects plus one shared object.
        let mut genesis_objects = test_gas_objects();
        let shared_object = Object::shared_for_testing();
        genesis_objects.push(shared_object.clone());

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(genesis_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;
        let authority_name = state.name;
        let certificates = test_certificates(&state, shared_object).await;

        // Case 1: a single valid certificate.
        let single_message =
            ConsensusTransaction::new_certificate_message(&authority_name, certificates[0].clone());
        let single_bytes: Vec<u8> = bcs::to_bytes(&single_message).unwrap();

        let metrics = IotaTxValidatorMetrics::new(&Default::default());
        let validator = IotaTxValidator::new(
            state.epoch_store_for_testing().clone(),
            Arc::new(CheckpointServiceNoop {}),
            state.transaction_manager().clone(),
            metrics,
        );
        let single_result = validator.verify_batch(&[&single_bytes]);
        assert!(single_result.is_ok(), "{single_result:?}");

        // Case 2: all valid certificates in one batch.
        let valid_bytes: Vec<_> = certificates
            .iter()
            .cloned()
            .map(|cert| {
                let message =
                    ConsensusTransaction::new_certificate_message(&authority_name, cert);
                bcs::to_bytes(&message).unwrap()
            })
            .collect();

        let valid_batch: Vec<_> = valid_bytes.iter().map(Vec::as_slice).collect();
        let valid_result = validator.verify_batch(&valid_batch);
        assert!(valid_result.is_ok(), "{valid_result:?}");

        // Case 3: overwrite the first user signature of every certificate with
        // a default Ed25519 signature; the batch must now be rejected.
        let tampered_bytes: Vec<_> = certificates
            .into_iter()
            .map(|mut cert| {
                let forged = GenericSignature::Signature(
                    iota_types::crypto::Signature::Ed25519IotaSignature(
                        Ed25519IotaSignature::default(),
                    ),
                );
                cert.tx_signatures_mut_for_testing()[0] = forged;

                let message =
                    ConsensusTransaction::new_certificate_message(&authority_name, cert);
                bcs::to_bytes(&message).unwrap()
            })
            .collect();

        let tampered_batch: Vec<_> = tampered_bytes.iter().map(Vec::as_slice).collect();
        let tampered_result = validator.verify_batch(&tampered_batch);
        assert!(tampered_result.is_err());
    }
}