1use std::sync::Arc;
6
7use eyre::WrapErr;
8use fastcrypto_tbls::dkg_v1;
9use iota_metrics::monitored_scope;
10use iota_types::{
11 error::IotaError,
12 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
13};
14use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
15use starfish_core;
16use tap::TapFallible;
17use tracing::{info, instrument, warn};
18
19use crate::{
20 authority::authority_per_epoch_store::AuthorityPerEpochStore,
21 checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
22};
23
24#[derive(Clone)]
26pub struct IotaTxValidator {
27 epoch_store: Arc<AuthorityPerEpochStore>,
28 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
29 _transaction_manager: Arc<TransactionManager>,
30 metrics: Arc<IotaTxValidatorMetrics>,
31}
32
33impl IotaTxValidator {
34 pub fn new(
35 epoch_store: Arc<AuthorityPerEpochStore>,
36 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
37 transaction_manager: Arc<TransactionManager>,
38 metrics: Arc<IotaTxValidatorMetrics>,
39 ) -> Self {
40 info!(
41 "IotaTxValidator constructed for epoch {}",
42 epoch_store.epoch()
43 );
44 Self {
45 epoch_store,
46 checkpoint_service,
47 _transaction_manager: transaction_manager,
48 metrics,
49 }
50 }
51
52 #[instrument(level = "trace", skip_all)]
53 fn validate_transactions(&self, txs: Vec<ConsensusTransactionKind>) -> Result<(), IotaError> {
54 let mut cert_batch = Vec::new();
55 let mut ckpt_messages = Vec::new();
56 let mut ckpt_batch = Vec::new();
57 let mut authority_cap_batch = Vec::new();
58
59 for tx in txs.iter() {
60 match tx {
61 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
62 cert_batch.push(certificate.as_ref());
63 }
64 ConsensusTransactionKind::CheckpointSignature(signature) => {
65 ckpt_messages.push(signature.as_ref());
66 ckpt_batch.push(&signature.summary);
67 }
68 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
69 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
70 warn!("batch verification error: DKG Message too large");
71 return Err(IotaError::InvalidDkgMessageSize);
72 }
73 }
74 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
75 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
76 warn!("batch verification error: DKG Confirmation too large");
77 return Err(IotaError::InvalidDkgMessageSize);
78 }
79 }
80 ConsensusTransactionKind::SignedCapabilityNotificationV1(signed_cap) => {
81 authority_cap_batch.push(signed_cap);
82 }
83
84 ConsensusTransactionKind::MisbehaviorReport(_) => {
85 if !self
86 .epoch_store
87 .protocol_config()
88 .calculate_validator_scores()
89 {
90 return Err(IotaError::UnsupportedFeature {
91 error: "MisbehaviorReport not supported at current protocol version"
92 .into(),
93 });
94 }
95 }
96 #[allow(deprecated)]
97 ConsensusTransactionKind::NewJWKFetchedDeprecated => {
98 return Err(IotaError::UnsupportedFeature {
99 error: "NewJWKFetched (zkLogin) is deprecated and not supported".into(),
100 });
101 }
102 ConsensusTransactionKind::EndOfPublish(_)
103 | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
104 }
105 }
106
107 let cert_count = cert_batch.len();
109 let ckpt_count = ckpt_batch.len();
110 let authority_cap_count = authority_cap_batch.len();
111
112 self.epoch_store
113 .signature_verifier
114 .verify_certs_and_checkpoints(cert_batch, ckpt_batch, authority_cap_batch)
115 .tap_err(|e| warn!("batch verification error: {}", e))?;
116
117 for ckpt in ckpt_messages {
120 self.checkpoint_service
121 .notify_checkpoint_signature(&self.epoch_store, ckpt)?;
122 }
123
124 self.metrics
125 .certificate_signatures_verified
126 .inc_by(cert_count as u64);
127 self.metrics
128 .checkpoint_signatures_verified
129 .inc_by(ckpt_count as u64);
130 self.metrics
131 .authority_capabilities_verified
132 .inc_by(authority_cap_count as u64);
133 Ok(())
134
135 }
145}
146
147fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
148 bcs::from_bytes::<ConsensusTransaction>(tx)
149 .wrap_err("Malformed transaction (failed to deserialize)")
150}
151
152impl starfish_core::TransactionVerifier for IotaTxValidator {
153 #[instrument(level = "trace", skip_all)]
154 fn verify_batch(
155 &self,
156 batch: &[&[u8]],
157 ) -> core::result::Result<(), starfish_core::ValidationError> {
158 let _scope = monitored_scope("ValidateBatch");
159
160 let txs = batch
161 .iter()
162 .map(|tx| {
163 tx_from_bytes(tx)
164 .map(|tx| tx.kind)
165 .map_err(|e| starfish_core::ValidationError::InvalidTransaction(e.to_string()))
166 })
167 .collect::<core::result::Result<Vec<_>, _>>()?;
168
169 self.validate_transactions(txs)
170 .map_err(|e| starfish_core::ValidationError::InvalidTransaction(e.to_string()))
171 }
172}
173
174pub struct IotaTxValidatorMetrics {
175 certificate_signatures_verified: IntCounter,
176 checkpoint_signatures_verified: IntCounter,
177 authority_capabilities_verified: IntCounter,
178}
179
180impl IotaTxValidatorMetrics {
181 pub fn new(registry: &Registry) -> Arc<Self> {
182 Arc::new(Self {
183 certificate_signatures_verified: register_int_counter_with_registry!(
184 "certificate_signatures_verified",
185 "Number of certificates verified in consensus batch verifier",
186 registry
187 )
188 .unwrap(),
189 checkpoint_signatures_verified: register_int_counter_with_registry!(
190 "checkpoint_signatures_verified",
191 "Number of checkpoint verified in consensus batch verifier",
192 registry
193 )
194 .unwrap(),
195 authority_capabilities_verified: register_int_counter_with_registry!(
196 "authority_capabilities_verified",
197 "Number of signed authority capabilities verified in consensus batch verifier",
198 registry
199 )
200 .unwrap(),
201 })
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use std::sync::Arc;
208
209 use iota_macros::sim_test;
210 use iota_protocol_config::Chain;
211 use iota_types::{
212 crypto::Ed25519IotaSignature,
213 error::IotaError,
214 messages_consensus::{
215 ConsensusTransaction, ConsensusTransactionKind, MisbehaviorObservationsV1,
216 VersionedMisbehaviorReport,
217 },
218 object::Object,
219 signature::GenericSignature,
220 };
221 use starfish_core::TransactionVerifier as _;
222
223 use crate::{
224 authority::test_authority_builder::TestAuthorityBuilder,
225 checkpoints::CheckpointServiceNoop,
226 consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
227 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
228 };
229
230 #[sim_test]
231 async fn accept_valid_transaction() {
232 let mut objects = test_gas_objects();
235 let shared_object = Object::shared_for_testing();
236 objects.push(shared_object.clone());
237
238 let network_config =
239 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
240 .with_objects(objects.clone())
241 .build();
242
243 let state = TestAuthorityBuilder::new()
244 .with_network_config(&network_config, 0)
245 .build()
246 .await;
247 let name1 = state.name;
248 let certificates = test_certificates(&state, shared_object).await;
249
250 let first_transaction = certificates[0].clone();
251 let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
252 &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
253 )
254 .unwrap();
255
256 let metrics = IotaTxValidatorMetrics::new(&Default::default());
257 let validator = IotaTxValidator::new(
258 state.epoch_store_for_testing().clone(),
259 Arc::new(CheckpointServiceNoop {}),
260 state.transaction_manager().clone(),
261 metrics,
262 );
263 let res = validator.verify_batch(&[&first_transaction_bytes]);
264 assert!(res.is_ok(), "{res:?}");
265
266 let transaction_bytes: Vec<_> = certificates
267 .clone()
268 .into_iter()
269 .map(|cert| {
270 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
271 })
272 .collect();
273
274 let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
275 let res_batch = validator.verify_batch(&batch);
276 assert!(res_batch.is_ok(), "{res_batch:?}");
277
278 let bogus_transaction_bytes: Vec<_> = certificates
279 .into_iter()
280 .map(|mut cert| {
281 cert.tx_signatures_mut_for_testing()[0] = GenericSignature::Signature(
283 iota_types::crypto::Signature::Ed25519IotaSignature(
284 Ed25519IotaSignature::default(),
285 ),
286 );
287 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
288 })
289 .collect();
290
291 let batch: Vec<_> = bogus_transaction_bytes
292 .iter()
293 .map(|t| t.as_slice())
294 .collect();
295 let res_batch = validator.verify_batch(&batch);
296 assert!(res_batch.is_err());
297 }
298
299 #[sim_test]
306 async fn validate_transactions_feature_gating() {
307 use iota_protocol_config::ProtocolConfig;
308 use iota_types::crypto::AuthorityPublicKeyBytes;
309
310 let network_config =
311 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
312
313 let state = TestAuthorityBuilder::new()
314 .with_network_config(&network_config, 0)
315 .with_chain_override(Chain::Mainnet)
316 .build()
317 .await;
318
319 let metrics = IotaTxValidatorMetrics::new(&Default::default());
320 let validator = IotaTxValidator::new(
321 state.epoch_store_for_testing().clone(),
322 Arc::new(CheckpointServiceNoop {}),
323 state.transaction_manager().clone(),
324 metrics,
325 );
326
327 let protocol_config = validator.epoch_store.protocol_config();
328 let authority = AuthorityPublicKeyBytes::default();
329
330 #[allow(deprecated)]
334 fn is_feature_gated(
335 kind: &ConsensusTransactionKind,
336 config: &ProtocolConfig,
337 ) -> Option<bool> {
338 match kind {
339 ConsensusTransactionKind::CertifiedTransaction(_)
341 | ConsensusTransactionKind::CheckpointSignature(_)
342 | ConsensusTransactionKind::EndOfPublish(_)
343 | ConsensusTransactionKind::CapabilityNotificationV1(_)
344 | ConsensusTransactionKind::SignedCapabilityNotificationV1(_)
345 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
346 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => None,
347
348 ConsensusTransactionKind::MisbehaviorReport(_) => {
350 Some(config.calculate_validator_scores())
351 }
352
353 ConsensusTransactionKind::NewJWKFetchedDeprecated => Some(false),
357 }
358 }
359
360 #[allow(deprecated)]
366 let testable_variants: Vec<(&str, ConsensusTransactionKind)> = vec![
367 (
368 "EndOfPublish",
369 ConsensusTransactionKind::EndOfPublish(authority),
370 ),
371 (
372 "NewJWKFetchedDeprecated",
373 ConsensusTransactionKind::NewJWKFetchedDeprecated,
374 ),
375 (
376 "CapabilityNotificationV1",
377 ConsensusTransactionKind::CapabilityNotificationV1(
378 iota_types::messages_consensus::AuthorityCapabilitiesV1::new(
379 authority,
380 Chain::Mainnet,
381 iota_types::supported_protocol_versions::SupportedProtocolVersions::SYSTEM_DEFAULT,
382 vec![],
383 ),
384 ),
385 ),
386 (
387 "RandomnessDkgMessage",
388 ConsensusTransactionKind::RandomnessDkgMessage(authority, vec![]),
389 ),
390 (
391 "RandomnessDkgConfirmation",
392 ConsensusTransactionKind::RandomnessDkgConfirmation(authority, vec![]),
393 ),
394 (
395 "MisbehaviorReport",
396 ConsensusTransactionKind::MisbehaviorReport(VersionedMisbehaviorReport::new_v1(
397 authority,
398 0,
399 MisbehaviorObservationsV1 {
400 faulty_blocks_provable: vec![],
401 faulty_blocks_unprovable: vec![],
402 missing_proposals: vec![],
403 equivocations: vec![],
404 },
405 )),
406 ),
407 ];
408
409 for (name, kind) in testable_variants {
410 let gated = is_feature_gated(&kind, protocol_config);
411 let result = validator.validate_transactions(vec![kind]);
412
413 match gated {
414 Some(false) => {
415 assert!(
417 matches!(&result, Err(IotaError::UnsupportedFeature { .. })),
418 "{name}: feature flag is disabled, expected UnsupportedFeature, \
419 got {result:?}",
420 );
421 }
422 Some(true) | None => {
423 assert!(
426 !matches!(&result, Err(IotaError::UnsupportedFeature { .. })),
427 "{name}: should not be rejected as UnsupportedFeature, got {result:?}",
428 );
429 }
430 }
431 }
432 }
433}