Skip to main content

iota_core/
signature_verifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::BTreeSet, sync::Arc};
6
7use either::Either;
8use fastcrypto::traits::{AggregateAuthenticator, ToFromBytes};
9use futures::pin_mut;
10use iota_metrics::monitored_scope;
11use iota_sdk_types::crypto::Intent;
12use iota_types::{
13    base_types::AuthorityName,
14    committee::Committee,
15    crypto::{AuthorityPublicKey, AuthoritySignInfoTrait, VerificationObligation},
16    digests::{CertificateDigest, SenderSignedDataDigest},
17    error::{IotaError, IotaResult},
18    message_envelope::Message,
19    messages_checkpoint::SignedCheckpointSummary,
20    messages_consensus::{AuthorityCapabilitiesDigest, SignedAuthorityCapabilitiesV1},
21    signature::VerifyParams,
22    signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
23    transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
24};
25use itertools::{Itertools as _, izip};
26use parking_lot::{Mutex, MutexGuard};
27use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
28use tap::TapFallible;
29use tokio::{
30    runtime::Handle,
31    sync::oneshot,
32    time::{Duration, timeout},
33};
34use tracing::{Instrument, instrument, trace_span};
// Maximum amount of time we wait for a batch to fill up before verifying a
// partial batch. Note that despite the `_MS` suffix this is a `Duration`
// (10 milliseconds), not a bare millisecond count.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);

// Maximum size of batch to verify. Increasing this value will slightly improve
// CPU utilization (batching starts to hit steeply diminishing marginal returns
// around batch sizes of 16), at the cost of slightly increasing latency
// (BATCH_TIMEOUT_MS will be hit more frequently if system is not heavily
// loaded).
// This is the batch size `SignatureVerifier::new` passes on; callers of
// `SignatureVerifier::new_with_batch_size` supply their own value.
const MAX_BATCH_SIZE: usize = 8;

// Sending half of the one-shot channel over which a queued certificate's
// verification result is delivered back to the task that submitted it.
type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
47
48struct CertBuffer {
49    certs: Vec<CertifiedTransaction>,
50    senders: Vec<Sender>,
51    id: u64,
52}
53
54impl CertBuffer {
55    fn new(capacity: usize) -> Self {
56        Self {
57            certs: Vec::with_capacity(capacity),
58            senders: Vec::with_capacity(capacity),
59            id: 0,
60        }
61    }
62
63    // Function consumes MutexGuard, therefore releasing the lock after mem swap is
64    // done
65    fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
66        let this = &mut *guard;
67        let mut new = CertBuffer::new(this.capacity());
68        new.id = this.id + 1;
69        std::mem::swap(&mut new, this);
70        new
71    }
72
73    fn capacity(&self) -> usize {
74        debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
75        self.certs.capacity()
76    }
77
78    fn len(&self) -> usize {
79        debug_assert_eq!(self.certs.len(), self.senders.len());
80        self.certs.len()
81    }
82
83    fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
84        self.senders.push(tx);
85        self.certs.push(cert);
86    }
87}
88
/// Verifies signatures in ways that are faster than verifying each signature
/// individually.
/// - BLS signatures - caching and batch verification.
/// - User signed data - caching.
pub struct SignatureVerifier {
    /// Committee whose epoch and keys certificates and checkpoints are
    /// verified against.
    committee: Arc<Committee>,
    /// Authorities outside the committee that are accepted as signers of
    /// authority capabilities messages.
    non_committee_validators: BTreeSet<AuthorityName>,

    /// Digests of certificates whose signatures were already verified.
    certificate_cache: VerifiedDigestCache<CertificateDigest>,
    /// Digests of sender-signed data whose signatures were already verified.
    signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
    /// Digests of authority capabilities messages that were already verified.
    authority_capability_cache: VerifiedDigestCache<AuthorityCapabilitiesDigest>,

    /// Params for signature verification.
    verify_params: VerifyParams,

    /// Certificates waiting to be verified together as one batch.
    queue: Mutex<CertBuffer>,
    /// Counters for cache behaviour and batch verification.
    pub metrics: Arc<SignatureVerifierMetrics>,
}
107
108impl SignatureVerifier {
109    pub fn new_with_batch_size(
110        committee: Arc<Committee>,
111        non_committee_validators: BTreeSet<AuthorityName>,
112        batch_size: usize,
113        metrics: Arc<SignatureVerifierMetrics>,
114        accept_passkey_in_multisig: bool,
115        additional_multisig_checks: bool,
116    ) -> Self {
117        Self {
118            committee,
119            non_committee_validators,
120            certificate_cache: VerifiedDigestCache::new(
121                metrics.certificate_signatures_cache_hits.clone(),
122                metrics.certificate_signatures_cache_misses.clone(),
123                metrics.certificate_signatures_cache_evictions.clone(),
124            ),
125            signed_data_cache: VerifiedDigestCache::new(
126                metrics.signed_data_cache_hits.clone(),
127                metrics.signed_data_cache_misses.clone(),
128                metrics.signed_data_cache_evictions.clone(),
129            ),
130            authority_capability_cache: VerifiedDigestCache::new(
131                metrics.authority_capabilities_cache_hits.clone(),
132                metrics.authority_capabilities_cache_misses.clone(),
133                metrics.authority_capabilities_cache_evictions.clone(),
134            ),
135            queue: Mutex::new(CertBuffer::new(batch_size)),
136            metrics,
137            verify_params: VerifyParams::new(
138                accept_passkey_in_multisig,
139                additional_multisig_checks,
140            ),
141        }
142    }
143
144    pub fn new(
145        committee: Arc<Committee>,
146        non_committee_validators: BTreeSet<AuthorityName>,
147        metrics: Arc<SignatureVerifierMetrics>,
148        accept_passkey_in_multisig: bool,
149        additional_multisig_checks: bool,
150    ) -> Self {
151        Self::new_with_batch_size(
152            committee,
153            non_committee_validators,
154            MAX_BATCH_SIZE,
155            metrics,
156            accept_passkey_in_multisig,
157            additional_multisig_checks,
158        )
159    }
160
161    /// Verifies all certs, returns Ok only if all are valid.
162    #[instrument(level = "trace", skip_all)]
163    pub fn verify_certs_and_checkpoints(
164        &self,
165        certs: Vec<&CertifiedTransaction>,
166        checkpoints: Vec<&SignedCheckpointSummary>,
167        authority_capabilities: Vec<&SignedAuthorityCapabilitiesV1>,
168    ) -> IotaResult {
169        // Verify all user sigs, since caching is handled by the underlying
170        // implementation.
171        for cert in &certs {
172            self.verify_tx(cert.data())?;
173        }
174
175        // Verify authority capabilities signatures. Caching is handled inside to avoid
176        // checking the same message multiple times.
177        for cap in &authority_capabilities {
178            self.verify_authority_capabilities(cap)?;
179        }
180
181        batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
182        Ok(())
183    }
184
185    /// Verifies one cert asynchronously, in a batch.
186    pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
187        let cert_digest = cert.certificate_digest();
188        if self.certificate_cache.is_cached(&cert_digest) {
189            return Ok(VerifiedCertificate::new_unchecked(cert));
190        }
191        self.verify_tx(cert.data())?;
192        self.verify_cert_skip_cache(cert)
193            .await
194            .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
195    }
196
197    pub async fn multi_verify_certs(
198        &self,
199        certs: Vec<CertifiedTransaction>,
200    ) -> Vec<IotaResult<VerifiedCertificate>> {
201        // TODO: We could do better by pushing the all of `certs` into the verification
202        // queue at once, but that's significantly more complex.
203        let mut futures = Vec::with_capacity(certs.len());
204        for cert in certs {
205            futures.push(self.verify_cert(cert));
206        }
207        futures::future::join_all(futures).await
208    }
209
210    /// exposed as a public method for the benchmarks
211    pub async fn verify_cert_skip_cache(
212        &self,
213        cert: CertifiedTransaction,
214    ) -> IotaResult<VerifiedCertificate> {
215        // this is the only innocent error we are likely to encounter - filter it before
216        // we poison a whole batch.
217        if cert.auth_sig().epoch != self.committee.epoch() {
218            return Err(IotaError::WrongEpoch {
219                expected_epoch: self.committee.epoch(),
220                actual_epoch: cert.auth_sig().epoch,
221            });
222        }
223
224        self.verify_cert_inner(cert).await
225    }
226
    /// Queues `cert` for batch verification and waits for its result.
    ///
    /// The certificate is pushed into the shared queue together with a one-shot
    /// channel. What happens next depends on the queue state:
    /// - if this push filled the queue, this caller takes the buffer and
    ///   verifies the whole batch itself;
    /// - otherwise it waits up to `BATCH_TIMEOUT_MS` for some other caller to
    ///   verify the batch;
    /// - if that wait times out and the queue has not been taken in the
    ///   meantime, this caller takes the partially filled buffer and verifies
    ///   it;
    /// - if the queue was taken in the meantime, it simply awaits the result.
    async fn verify_cert_inner(
        &self,
        cert: CertifiedTransaction,
    ) -> IotaResult<VerifiedCertificate> {
        // Cancellation safety: we use parking_lot locks, which cannot be held across
        // awaits. Therefore once the queue has been taken by a thread, it is
        // guaranteed to process the queue and send all results before the
        // future can be cancelled by the caller.
        let (tx, rx) = oneshot::channel();
        pin_mut!(rx);

        // Push under the lock and decide, still under the lock, whether this
        // caller owns the now-full batch (Right) or only remembers which queue
        // generation it joined (Left). The lock is released at the end of this
        // block, before any await.
        let prev_id_or_buffer = {
            let mut queue = self.queue.lock();
            queue.push(tx, cert);
            if queue.len() == queue.capacity() {
                Either::Right(CertBuffer::take_and_replace(queue))
            } else {
                Either::Left(queue.id)
            }
        };
        let prev_id = match prev_id_or_buffer {
            Either::Left(prev_id) => prev_id,
            Either::Right(buffer) => {
                self.metrics.full_batches.inc();
                self.process_queue(buffer)
                    .instrument(trace_span!("SignatureVerifier::process_queue"))
                    .await;
                // unwrap ok - process_queue will have sent the result already
                return rx.try_recv().unwrap();
            }
        };

        // Give other callers a chance to fill and verify the batch.
        if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
            // unwrap ok - tx cannot have been dropped without sending a result.
            return res.unwrap();
        }
        self.metrics.timeouts.inc();

        // Timed out: take the partial batch unless someone else already did.
        let buffer = {
            let queue = self.queue.lock();
            // check if another thread took the queue while we were re-acquiring lock.
            if prev_id == queue.id {
                debug_assert_ne!(queue.len(), queue.capacity());
                Some(CertBuffer::take_and_replace(queue))
            } else {
                None
            }
        };

        if let Some(buffer) = buffer {
            self.metrics.partial_batches.inc();
            self.process_queue(buffer).await;
            // unwrap ok - process_queue will have sent the result already
            return rx.try_recv().unwrap();
        }

        // unwrap ok - another thread took the queue while we were re-acquiring the lock
        // and is guaranteed to process the queue immediately.
        rx.await.unwrap()
    }
287
288    async fn process_queue(&self, buffer: CertBuffer) {
289        let committee = self.committee.clone();
290        let metrics = self.metrics.clone();
291        Handle::current()
292            .spawn_blocking(move || Self::process_queue_sync(committee, metrics, buffer))
293            .await
294            .expect("Spawn blocking should not fail");
295    }
296
297    #[instrument(level = "trace", skip_all)]
298    fn process_queue_sync(
299        committee: Arc<Committee>,
300        metrics: Arc<SignatureVerifierMetrics>,
301        buffer: CertBuffer,
302    ) {
303        let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
304
305        let results = batch_verify_certificates(&committee, &buffer.certs.iter().collect_vec());
306        izip!(
307            results.into_iter(),
308            buffer.certs.into_iter(),
309            buffer.senders.into_iter(),
310        )
311        .for_each(|(result, cert, tx)| {
312            tx.send(match result {
313                Ok(()) => {
314                    metrics.total_verified_certs.inc();
315                    Ok(VerifiedCertificate::new_unchecked(cert))
316                }
317                Err(e) => {
318                    metrics.total_failed_certs.inc();
319                    Err(e)
320                }
321            })
322            .ok();
323        });
324    }
325
326    #[instrument(level = "trace", skip_all, fields(tx_digest = ?signed_tx.digest()))]
327    pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
328        self.signed_data_cache.is_verified(
329            signed_tx.full_message_digest(),
330            || verify_sender_signed_data_message_signatures(signed_tx, &self.verify_params),
331            || Ok(()),
332        )
333    }
334
335    #[instrument(level = "trace", skip_all)]
336    pub fn verify_authority_capabilities(
337        &self,
338        signed_authority_capabilities: &SignedAuthorityCapabilitiesV1,
339    ) -> IotaResult {
340        let epoch = self.committee.epoch();
341        self.authority_capability_cache.is_verified(
342            signed_authority_capabilities.cache_digest(epoch),
343            || {
344                // Check if authority exists in non-committee validators
345                let authority_name = signed_authority_capabilities.data().authority;
346                if !self.non_committee_validators.contains(&authority_name) {
347                    return Err(IotaError::IncorrectSigner {
348                        error: "Signer must be part of non-committee active validators".to_string(),
349                    });
350                }
351
352                // Create a verification obligation
353                let mut obligation = VerificationObligation::default();
354                let idx = obligation.add_message(
355                    signed_authority_capabilities.data(),
356                    epoch, /* epoch is shared between the committee and
357                            * non-committee validators */
358                    Intent::iota_app(signed_authority_capabilities.scope()),
359                );
360
361                // Add the signature and public key to the obligation
362                let authority_key = AuthorityPublicKey::from_bytes(authority_name.as_bytes())
363                    .map_err(|_| IotaError::IncorrectSigner {
364                        error: "Invalid authority public key bytes".to_string(),
365                    })?;
366                obligation
367                    .public_keys
368                    .get_mut(idx)
369                    .ok_or(IotaError::InvalidAuthenticator)?
370                    .push(&authority_key);
371
372                obligation
373                    .signatures
374                    .get_mut(idx)
375                    .ok_or(IotaError::InvalidAuthenticator)?
376                    .add_signature(signed_authority_capabilities.auth_sig().clone())
377                    .map_err(|_| IotaError::InvalidSignature {
378                        error: "Failed to add authority signature to obligation".to_string(),
379                    })?;
380
381                obligation.verify_all()
382            },
383            || Ok(()),
384        )
385    }
386
    /// Clears all three verification caches: certificates, authority
    /// capabilities and sender-signed data.
    pub fn clear_signature_cache(&self) {
        self.certificate_cache.clear();
        self.authority_capability_cache.clear();
        self.signed_data_cache.clear();
    }
392}
393
/// Prometheus counters describing cache behaviour and batch verification of
/// the signature verifier.
pub struct SignatureVerifierMetrics {
    /// Certificates known to be verified because of the signature cache.
    pub certificate_signatures_cache_hits: IntCounter,
    /// Certificates which missed the signature cache.
    pub certificate_signatures_cache_misses: IntCounter,
    /// Evictions of pre-existing keys from the certificate signature cache.
    pub certificate_signatures_cache_evictions: IntCounter,
    /// Signed data known to be verified because of the signature cache.
    pub signed_data_cache_hits: IntCounter,
    /// Signed data which missed the signature cache.
    pub signed_data_cache_misses: IntCounter,
    /// Evictions of pre-existing signed data from the signature cache.
    pub signed_data_cache_evictions: IntCounter,
    /// Authority capabilities known to be verified because of the
    /// capabilities cache.
    pub authority_capabilities_cache_hits: IntCounter,
    /// Authority capabilities which missed the capabilities cache.
    pub authority_capabilities_cache_misses: IntCounter,
    /// Evictions of pre-existing authority capabilities from the capabilities
    /// cache.
    pub authority_capabilities_cache_evictions: IntCounter,
    /// Times the batch verifier timed out waiting for a batch to fill.
    timeouts: IntCounter,
    /// Times the batch verifier verified a full batch.
    full_batches: IntCounter,
    /// Times the batch verifier verified a partial batch.
    partial_batches: IntCounter,
    /// Total number of certs the batch verifier has verified.
    total_verified_certs: IntCounter,
    /// Total number of certs the batch verifier has rejected.
    total_failed_certs: IntCounter,
}
410
411impl SignatureVerifierMetrics {
412    pub fn new(registry: &Registry) -> Arc<Self> {
413        Arc::new(Self {
414            certificate_signatures_cache_hits: register_int_counter_with_registry!(
415                "certificate_signatures_cache_hits",
416                "Number of certificates which were known to be verified because of signature cache.",
417                registry
418            )
419            .unwrap(),
420            certificate_signatures_cache_misses: register_int_counter_with_registry!(
421                "certificate_signatures_cache_misses",
422                "Number of certificates which missed the signature cache",
423                registry
424            )
425            .unwrap(),
426            certificate_signatures_cache_evictions: register_int_counter_with_registry!(
427                "certificate_signatures_cache_evictions",
428                "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
429                registry
430            )
431            .unwrap(),
432            signed_data_cache_hits: register_int_counter_with_registry!(
433                "signed_data_cache_hits",
434                "Number of signed data which were known to be verified because of signature cache.",
435                registry
436            )
437            .unwrap(),
438            signed_data_cache_misses: register_int_counter_with_registry!(
439                "signed_data_cache_misses",
440                "Number of signed data which missed the signature cache.",
441                registry
442            )
443            .unwrap(),
444            signed_data_cache_evictions: register_int_counter_with_registry!(
445                "signed_data_cache_evictions",
446                "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
447                registry
448            )
449            .unwrap(),
450            authority_capabilities_cache_hits: register_int_counter_with_registry!(
451                "authority_capabilities_cache_hits",
452                "Number of authority capabilities which were known to be verified because of capabilities cache.",
453                registry
454            )
455            .unwrap(),
456            authority_capabilities_cache_misses: register_int_counter_with_registry!(
457                "authority_capabilities_cache_misses",
458                "Number of authority capabilities which missed the capabilities cache.",
459                registry
460            )
461            .unwrap(),
462            authority_capabilities_cache_evictions: register_int_counter_with_registry!(
463                "authority_capabilities_cache_evictions",
464                "Number of times we evict a pre-existing authority capabilities that were known to be verified.",
465                registry
466            )
467            .unwrap(),
468            timeouts: register_int_counter_with_registry!(
469                "async_batch_verifier_timeouts",
470                "Number of times batch verifier times out and verifies a partial batch",
471                registry
472            )
473            .unwrap(),
474            full_batches: register_int_counter_with_registry!(
475                "async_batch_verifier_full_batches",
476                "Number of times batch verifier verifies a full batch",
477                registry
478            )
479            .unwrap(),
480            partial_batches: register_int_counter_with_registry!(
481                "async_batch_verifier_partial_batches",
482                "Number of times batch verifier verifies a partial batch",
483                registry
484            )
485            .unwrap(),
486            total_verified_certs: register_int_counter_with_registry!(
487                "async_batch_verifier_total_verified_certs",
488                "Total number of certs batch verifier has verified",
489                registry
490            )
491            .unwrap(),
492            total_failed_certs: register_int_counter_with_registry!(
493                "async_batch_verifier_total_failed_certs",
494                "Total number of certs batch verifier has rejected",
495                registry
496            )
497            .unwrap(),
498        })
499    }
500}
501
502/// Verifies all certificates - if any fail return error.
503#[instrument(level = "trace", skip_all)]
504pub fn batch_verify_all_certificates_and_checkpoints(
505    committee: &Committee,
506    certs: &[&CertifiedTransaction],
507    checkpoints: &[&SignedCheckpointSummary],
508) -> IotaResult {
509    // certs.data() is assumed to be verified already by the caller.
510
511    for ckpt in checkpoints {
512        ckpt.data().verify_epoch(committee.epoch())?;
513    }
514
515    batch_verify(committee, certs, checkpoints)
516}
517
518/// Verifies certificates in batch mode, but returns a separate result for each
519/// cert.
520#[instrument(level = "trace", skip_all)]
521pub fn batch_verify_certificates(
522    committee: &Committee,
523    certs: &[&CertifiedTransaction],
524) -> Vec<IotaResult> {
525    // certs.data() is assumed to be verified already by the caller.
526    let verify_params = VerifyParams::default();
527    match batch_verify(committee, certs, &[]) {
528        Ok(_) => vec![Ok(()); certs.len()],
529
530        // Verify one by one to find which certs were invalid.
531        Err(_) if certs.len() > 1 => certs
532            .iter()
533            // TODO: verify_signature currently checks the tx sig as well, which might be cached
534            // already.
535            .map(|c| c.verify_signatures_authenticated(committee, &verify_params))
536            .collect(),
537
538        Err(e) => vec![Err(e)],
539    }
540}
541
542fn batch_verify(
543    committee: &Committee,
544    certs: &[&CertifiedTransaction],
545    checkpoints: &[&SignedCheckpointSummary],
546) -> IotaResult {
547    let mut obligation = VerificationObligation::default();
548
549    for cert in certs {
550        let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
551        cert.auth_sig()
552            .add_to_verification_obligation(committee, &mut obligation, idx)?;
553    }
554
555    for ckpt in checkpoints {
556        let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
557        ckpt.auth_sig()
558            .add_to_verification_obligation(committee, &mut obligation, idx)?;
559    }
560
561    obligation.verify_all()
562}