Skip to main content

iota_core/
consensus_validator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use eyre::WrapErr;
8use fastcrypto_tbls::dkg_v1;
9use iota_metrics::monitored_scope;
10use iota_types::{
11    error::IotaError,
12    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
13};
14use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
15use starfish_core;
16use tap::TapFallible;
17use tracing::{info, instrument, warn};
18
19use crate::{
20    authority::authority_per_epoch_store::AuthorityPerEpochStore,
21    checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
22};
23
24/// Allows verifying the validity of transactions
25#[derive(Clone)]
26pub struct IotaTxValidator {
27    epoch_store: Arc<AuthorityPerEpochStore>,
28    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
29    _transaction_manager: Arc<TransactionManager>,
30    metrics: Arc<IotaTxValidatorMetrics>,
31}
32
33impl IotaTxValidator {
34    pub fn new(
35        epoch_store: Arc<AuthorityPerEpochStore>,
36        checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
37        transaction_manager: Arc<TransactionManager>,
38        metrics: Arc<IotaTxValidatorMetrics>,
39    ) -> Self {
40        info!(
41            "IotaTxValidator constructed for epoch {}",
42            epoch_store.epoch()
43        );
44        Self {
45            epoch_store,
46            checkpoint_service,
47            _transaction_manager: transaction_manager,
48            metrics,
49        }
50    }
51
52    #[instrument(level = "trace", skip_all)]
53    fn validate_transactions(&self, txs: Vec<ConsensusTransactionKind>) -> Result<(), IotaError> {
54        let mut cert_batch = Vec::new();
55        let mut ckpt_messages = Vec::new();
56        let mut ckpt_batch = Vec::new();
57        let mut authority_cap_batch = Vec::new();
58
59        for tx in txs.iter() {
60            match tx {
61                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
62                    cert_batch.push(certificate.as_ref());
63                }
64                ConsensusTransactionKind::CheckpointSignature(signature) => {
65                    ckpt_messages.push(signature.as_ref());
66                    ckpt_batch.push(&signature.summary);
67                }
68                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
69                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
70                        warn!("batch verification error: DKG Message too large");
71                        return Err(IotaError::InvalidDkgMessageSize);
72                    }
73                }
74                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
75                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
76                        warn!("batch verification error: DKG Confirmation too large");
77                        return Err(IotaError::InvalidDkgMessageSize);
78                    }
79                }
80                ConsensusTransactionKind::SignedCapabilityNotificationV1(signed_cap) => {
81                    authority_cap_batch.push(signed_cap);
82                }
83
84                ConsensusTransactionKind::MisbehaviorReport(_) => {
85                    if !self
86                        .epoch_store
87                        .protocol_config()
88                        .calculate_validator_scores()
89                    {
90                        return Err(IotaError::UnsupportedFeature {
91                            error: "MisbehaviorReport not supported at current protocol version"
92                                .into(),
93                        });
94                    }
95                }
96                #[allow(deprecated)]
97                ConsensusTransactionKind::NewJWKFetchedDeprecated => {
98                    return Err(IotaError::UnsupportedFeature {
99                        error: "NewJWKFetched (zkLogin) is deprecated and not supported".into(),
100                    });
101                }
102                ConsensusTransactionKind::EndOfPublish(_)
103                | ConsensusTransactionKind::CapabilityNotificationV1(_) => {}
104            }
105        }
106
107        // verify the certificate signatures as a batch
108        let cert_count = cert_batch.len();
109        let ckpt_count = ckpt_batch.len();
110        let authority_cap_count = authority_cap_batch.len();
111
112        self.epoch_store
113            .signature_verifier
114            .verify_certs_and_checkpoints(cert_batch, ckpt_batch, authority_cap_batch)
115            .tap_err(|e| warn!("batch verification error: {}", e))?;
116
117        // All checkpoint sigs have been verified, forward them to the checkpoint
118        // service
119        for ckpt in ckpt_messages {
120            self.checkpoint_service
121                .notify_checkpoint_signature(&self.epoch_store, ckpt)?;
122        }
123
124        self.metrics
125            .certificate_signatures_verified
126            .inc_by(cert_count as u64);
127        self.metrics
128            .checkpoint_signatures_verified
129            .inc_by(ckpt_count as u64);
130        self.metrics
131            .authority_capabilities_verified
132            .inc_by(authority_cap_count as u64);
133        Ok(())
134
135        // todo - we should un-comment line below once we have a way to revert
136        // those transactions at the end of epoch all certificates had
137        // valid signatures, schedule them for execution prior to sequencing
138        // which is unnecessary for owned object transactions.
139        // It is unnecessary to write to pending_certificates table because the
140        // certs will be written via consensus output.
141        // self.transaction_manager
142        //     .enqueue_certificates(owned_tx_certs, &self.epoch_store)
143        //     .wrap_err("Failed to schedule certificates for execution")
144    }
145}
146
147fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
148    bcs::from_bytes::<ConsensusTransaction>(tx)
149        .wrap_err("Malformed transaction (failed to deserialize)")
150}
151
152impl starfish_core::TransactionVerifier for IotaTxValidator {
153    #[instrument(level = "trace", skip_all)]
154    fn verify_batch(
155        &self,
156        batch: &[&[u8]],
157    ) -> core::result::Result<(), starfish_core::ValidationError> {
158        let _scope = monitored_scope("ValidateBatch");
159
160        let txs = batch
161            .iter()
162            .map(|tx| {
163                tx_from_bytes(tx)
164                    .map(|tx| tx.kind)
165                    .map_err(|e| starfish_core::ValidationError::InvalidTransaction(e.to_string()))
166            })
167            .collect::<core::result::Result<Vec<_>, _>>()?;
168
169        self.validate_transactions(txs)
170            .map_err(|e| starfish_core::ValidationError::InvalidTransaction(e.to_string()))
171    }
172}
173
174pub struct IotaTxValidatorMetrics {
175    certificate_signatures_verified: IntCounter,
176    checkpoint_signatures_verified: IntCounter,
177    authority_capabilities_verified: IntCounter,
178}
179
180impl IotaTxValidatorMetrics {
181    pub fn new(registry: &Registry) -> Arc<Self> {
182        Arc::new(Self {
183            certificate_signatures_verified: register_int_counter_with_registry!(
184                "certificate_signatures_verified",
185                "Number of certificates verified in consensus batch verifier",
186                registry
187            )
188            .unwrap(),
189            checkpoint_signatures_verified: register_int_counter_with_registry!(
190                "checkpoint_signatures_verified",
191                "Number of checkpoint verified in consensus batch verifier",
192                registry
193            )
194            .unwrap(),
195            authority_capabilities_verified: register_int_counter_with_registry!(
196                "authority_capabilities_verified",
197                "Number of signed authority capabilities verified in consensus batch verifier",
198                registry
199            )
200            .unwrap(),
201        })
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use std::sync::Arc;
208
209    use iota_macros::sim_test;
210    use iota_protocol_config::Chain;
211    use iota_types::{
212        crypto::Ed25519IotaSignature,
213        error::IotaError,
214        messages_consensus::{
215            ConsensusTransaction, ConsensusTransactionKind, MisbehaviorObservationsV1,
216            VersionedMisbehaviorReport,
217        },
218        object::Object,
219        signature::GenericSignature,
220    };
221    use starfish_core::TransactionVerifier as _;
222
223    use crate::{
224        authority::test_authority_builder::TestAuthorityBuilder,
225        checkpoints::CheckpointServiceNoop,
226        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
227        consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
228    };
229
230    #[sim_test]
231    async fn accept_valid_transaction() {
232        // Initialize an authority with a (owned) gas object and a shared object; then
233        // make a test certificate.
234        let mut objects = test_gas_objects();
235        let shared_object = Object::shared_for_testing();
236        objects.push(shared_object.clone());
237
238        let network_config =
239            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
240                .with_objects(objects.clone())
241                .build();
242
243        let state = TestAuthorityBuilder::new()
244            .with_network_config(&network_config, 0)
245            .build()
246            .await;
247        let name1 = state.name;
248        let certificates = test_certificates(&state, shared_object).await;
249
250        let first_transaction = certificates[0].clone();
251        let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
252            &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
253        )
254        .unwrap();
255
256        let metrics = IotaTxValidatorMetrics::new(&Default::default());
257        let validator = IotaTxValidator::new(
258            state.epoch_store_for_testing().clone(),
259            Arc::new(CheckpointServiceNoop {}),
260            state.transaction_manager().clone(),
261            metrics,
262        );
263        let res = validator.verify_batch(&[&first_transaction_bytes]);
264        assert!(res.is_ok(), "{res:?}");
265
266        let transaction_bytes: Vec<_> = certificates
267            .clone()
268            .into_iter()
269            .map(|cert| {
270                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
271            })
272            .collect();
273
274        let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
275        let res_batch = validator.verify_batch(&batch);
276        assert!(res_batch.is_ok(), "{res_batch:?}");
277
278        let bogus_transaction_bytes: Vec<_> = certificates
279            .into_iter()
280            .map(|mut cert| {
281                // set it to an all-zero user signature
282                cert.tx_signatures_mut_for_testing()[0] = GenericSignature::Signature(
283                    iota_types::crypto::Signature::Ed25519IotaSignature(
284                        Ed25519IotaSignature::default(),
285                    ),
286                );
287                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
288            })
289            .collect();
290
291        let batch: Vec<_> = bogus_transaction_bytes
292            .iter()
293            .map(|t| t.as_slice())
294            .collect();
295        let res_batch = validator.verify_batch(&batch);
296        assert!(res_batch.is_err());
297    }
298
299    /// Verifies that `validate_transactions` correctly gates every
300    /// `ConsensusTransactionKind` variant against the current protocol config's
301    /// feature flags.
302    ///
303    /// The exhaustive match forces a compile error when new variants are added,
304    /// so the developer must explicitly map each variant to its gating flag.
305    #[sim_test]
306    async fn validate_transactions_feature_gating() {
307        use iota_protocol_config::ProtocolConfig;
308        use iota_types::crypto::AuthorityPublicKeyBytes;
309
310        let network_config =
311            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
312
313        let state = TestAuthorityBuilder::new()
314            .with_network_config(&network_config, 0)
315            .with_chain_override(Chain::Mainnet)
316            .build()
317            .await;
318
319        let metrics = IotaTxValidatorMetrics::new(&Default::default());
320        let validator = IotaTxValidator::new(
321            state.epoch_store_for_testing().clone(),
322            Arc::new(CheckpointServiceNoop {}),
323            state.transaction_manager().clone(),
324            metrics,
325        );
326
327        let protocol_config = validator.epoch_store.protocol_config();
328        let authority = AuthorityPublicKeyBytes::default();
329
330        // Returns the feature flag value that gates a variant, or `None` if the
331        // variant is always allowed. The exhaustive match ensures this function
332        // must be updated when new variants are added to ConsensusTransactionKind.
333        #[allow(deprecated)]
334        fn is_feature_gated(
335            kind: &ConsensusTransactionKind,
336            config: &ProtocolConfig,
337        ) -> Option<bool> {
338            match kind {
339                // Always allowed (no feature flag gating).
340                ConsensusTransactionKind::CertifiedTransaction(_)
341                | ConsensusTransactionKind::CheckpointSignature(_)
342                | ConsensusTransactionKind::EndOfPublish(_)
343                | ConsensusTransactionKind::CapabilityNotificationV1(_)
344                | ConsensusTransactionKind::SignedCapabilityNotificationV1(_)
345                | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
346                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => None,
347
348                // Gated behind `calculate_validator_scores`.
349                ConsensusTransactionKind::MisbehaviorReport(_) => {
350                    Some(config.calculate_validator_scores())
351                }
352
353                // Always rejected: zkLogin JWK support was never enabled on
354                // IOTA and the variant is retained only for serialization
355                // compatibility.
356                ConsensusTransactionKind::NewJWKFetchedDeprecated => Some(false),
357            }
358        }
359
360        // Variants that can be validated without signature verification setup.
361        // CertifiedTransaction, CheckpointSignature, and
362        // SignedCapabilityNotificationV1 are excluded because they require valid
363        // cryptographic signatures and would fail before reaching the feature
364        // gate check; their gating is verified by the exhaustive match above.
365        #[allow(deprecated)]
366        let testable_variants: Vec<(&str, ConsensusTransactionKind)> = vec![
367            (
368                "EndOfPublish",
369                ConsensusTransactionKind::EndOfPublish(authority),
370            ),
371            (
372                "NewJWKFetchedDeprecated",
373                ConsensusTransactionKind::NewJWKFetchedDeprecated,
374            ),
375            (
376                "CapabilityNotificationV1",
377                ConsensusTransactionKind::CapabilityNotificationV1(
378                    iota_types::messages_consensus::AuthorityCapabilitiesV1::new(
379                        authority,
380                        Chain::Mainnet,
381                        iota_types::supported_protocol_versions::SupportedProtocolVersions::SYSTEM_DEFAULT,
382                        vec![],
383                    ),
384                ),
385            ),
386            (
387                "RandomnessDkgMessage",
388                ConsensusTransactionKind::RandomnessDkgMessage(authority, vec![]),
389            ),
390            (
391                "RandomnessDkgConfirmation",
392                ConsensusTransactionKind::RandomnessDkgConfirmation(authority, vec![]),
393            ),
394            (
395                "MisbehaviorReport",
396                ConsensusTransactionKind::MisbehaviorReport(VersionedMisbehaviorReport::new_v1(
397                    authority,
398                    0,
399                    MisbehaviorObservationsV1 {
400                        faulty_blocks_provable: vec![],
401                        faulty_blocks_unprovable: vec![],
402                        missing_proposals: vec![],
403                        equivocations: vec![],
404                    },
405                )),
406            ),
407        ];
408
409        for (name, kind) in testable_variants {
410            let gated = is_feature_gated(&kind, protocol_config);
411            let result = validator.validate_transactions(vec![kind]);
412
413            match gated {
414                Some(false) => {
415                    // Feature flag is disabled: must be rejected.
416                    assert!(
417                        matches!(&result, Err(IotaError::UnsupportedFeature { .. })),
418                        "{name}: feature flag is disabled, expected UnsupportedFeature, \
419                         got {result:?}",
420                    );
421                }
422                Some(true) | None => {
423                    // Feature flag is enabled or variant is ungated: must not
424                    // be rejected as unsupported.
425                    assert!(
426                        !matches!(&result, Err(IotaError::UnsupportedFeature { .. })),
427                        "{name}: should not be rejected as UnsupportedFeature, got {result:?}",
428                    );
429                }
430            }
431        }
432    }
433}