iota_core/
signature_verifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9    zk_login::{JWK, JwkId},
10    zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16    committee::Committee,
17    crypto::{AuthoritySignInfoTrait, VerificationObligation},
18    digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19    error::{IotaError, IotaResult},
20    message_envelope::Message,
21    messages_checkpoint::SignedCheckpointSummary,
22    signature::VerifyParams,
23    signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24    transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::{Itertools as _, izip};
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32    runtime::Handle,
33    sync::oneshot,
34    time::{Duration, timeout},
35};
36use tracing::debug;
/// Longest time a caller waits for its batch to fill up before the partial
/// batch is verified anyway.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);

/// Number of certificates that makes a full batch.
///
/// Raising it slightly improves CPU utilization (batching hits steeply
/// diminishing returns around 16) but slightly increases latency, because
/// `BATCH_TIMEOUT_MS` is reached more often when the system is lightly loaded.
const MAX_BATCH_SIZE: usize = 8;
47
48type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51    certs: Vec<CertifiedTransaction>,
52    senders: Vec<Sender>,
53    id: u64,
54}
55
56impl CertBuffer {
57    fn new(capacity: usize) -> Self {
58        Self {
59            certs: Vec::with_capacity(capacity),
60            senders: Vec::with_capacity(capacity),
61            id: 0,
62        }
63    }
64
65    // Function consumes MutexGuard, therefore releasing the lock after mem swap is
66    // done
67    fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68        let this = &mut *guard;
69        let mut new = CertBuffer::new(this.capacity());
70        new.id = this.id + 1;
71        std::mem::swap(&mut new, this);
72        new
73    }
74
75    fn capacity(&self) -> usize {
76        debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77        self.certs.capacity()
78    }
79
80    fn len(&self) -> usize {
81        debug_assert_eq!(self.certs.len(), self.senders.len());
82        self.certs.len()
83    }
84
85    fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86        self.senders.push(tx);
87        self.certs.push(cert);
88    }
89}
90
91/// Verifies signatures in ways that faster than verifying each signature
92/// individually.
93/// - BLS signatures - caching and batch verification.
94/// - User signed data - caching.
95pub struct SignatureVerifier {
96    committee: Arc<Committee>,
97    certificate_cache: VerifiedDigestCache<CertificateDigest>,
98    signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99    zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
100
101    /// Map from JwkId (iss, kid) to the fetched JWK for that key.
102    /// We use an immutable data structure because verification of ZKLogins may
103    /// be slow, so we don't want to pass a reference to the map to the
104    /// verify method, since that would lead to a lengthy critical section.
105    /// Instead, we use an immutable data structure which can be cloned very
106    /// cheaply.
107    jwks: RwLock<ImHashMap<JwkId, JWK>>,
108
109    /// Params that contains a list of supported providers for ZKLogin and the
110    /// environment (prod/test) the code runs in.
111    zk_login_params: ZkLoginParams,
112
113    queue: Mutex<CertBuffer>,
114    pub metrics: Arc<SignatureVerifierMetrics>,
115}
116
117/// Contains two parameters to pass in to verify a ZkLogin signature.
118#[derive(Clone)]
119struct ZkLoginParams {
120    /// The environment (prod/test) the code runs in. It decides which verifying
121    /// key to use in fastcrypto.
122    pub env: ZkLoginEnv,
123    // Flag to determine whether zkLogin inside multisig is accepted.
124    pub accept_zklogin_in_multisig: bool,
125    // Flag to determine whether passkey inside multisig is accepted.
126    pub accept_passkey_in_multisig: bool,
127    /// Value that sets the upper bound for max_epoch in zkLogin signature.
128    pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
129}
130
131impl SignatureVerifier {
132    pub fn new_with_batch_size(
133        committee: Arc<Committee>,
134        batch_size: usize,
135        metrics: Arc<SignatureVerifierMetrics>,
136        env: ZkLoginEnv,
137        accept_zklogin_in_multisig: bool,
138        accept_passkey_in_multisig: bool,
139        zklogin_max_epoch_upper_bound_delta: Option<u64>,
140    ) -> Self {
141        Self {
142            committee,
143            certificate_cache: VerifiedDigestCache::new(
144                metrics.certificate_signatures_cache_hits.clone(),
145                metrics.certificate_signatures_cache_misses.clone(),
146                metrics.certificate_signatures_cache_evictions.clone(),
147            ),
148            signed_data_cache: VerifiedDigestCache::new(
149                metrics.signed_data_cache_hits.clone(),
150                metrics.signed_data_cache_misses.clone(),
151                metrics.signed_data_cache_evictions.clone(),
152            ),
153            zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
154                metrics.zklogin_inputs_cache_hits.clone(),
155                metrics.zklogin_inputs_cache_misses.clone(),
156                metrics.zklogin_inputs_cache_evictions.clone(),
157            )),
158            jwks: Default::default(),
159            queue: Mutex::new(CertBuffer::new(batch_size)),
160            metrics,
161            zk_login_params: ZkLoginParams {
162                env,
163                accept_zklogin_in_multisig,
164                accept_passkey_in_multisig,
165                zklogin_max_epoch_upper_bound_delta,
166            },
167        }
168    }
169
170    pub fn new(
171        committee: Arc<Committee>,
172        metrics: Arc<SignatureVerifierMetrics>,
173        zklogin_env: ZkLoginEnv,
174        accept_zklogin_in_multisig: bool,
175        accept_passkey_in_multisig: bool,
176        zklogin_max_epoch_upper_bound_delta: Option<u64>,
177    ) -> Self {
178        Self::new_with_batch_size(
179            committee,
180            MAX_BATCH_SIZE,
181            metrics,
182            zklogin_env,
183            accept_zklogin_in_multisig,
184            accept_passkey_in_multisig,
185            zklogin_max_epoch_upper_bound_delta,
186        )
187    }
188
189    /// Verifies all certs, returns Ok only if all are valid.
190    pub fn verify_certs_and_checkpoints(
191        &self,
192        certs: Vec<&CertifiedTransaction>,
193        checkpoints: Vec<&SignedCheckpointSummary>,
194    ) -> IotaResult {
195        let certs: Vec<_> = certs
196            .into_iter()
197            .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
198            .collect();
199
200        // Verify only the user sigs of certificates that were not cached already, since
201        // whenever we insert a certificate into the cache, it is already
202        // verified.
203        for cert in &certs {
204            self.verify_tx(cert.data())?;
205        }
206        batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
207        self.certificate_cache
208            .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
209        Ok(())
210    }
211
212    /// Verifies one cert asynchronously, in a batch.
213    pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
214        let cert_digest = cert.certificate_digest();
215        if self.certificate_cache.is_cached(&cert_digest) {
216            return Ok(VerifiedCertificate::new_unchecked(cert));
217        }
218        self.verify_tx(cert.data())?;
219        self.verify_cert_skip_cache(cert)
220            .await
221            .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
222    }
223
224    pub async fn multi_verify_certs(
225        &self,
226        certs: Vec<CertifiedTransaction>,
227    ) -> Vec<IotaResult<VerifiedCertificate>> {
228        // TODO: We could do better by pushing the all of `certs` into the verification
229        // queue at once, but that's significantly more complex.
230        let mut futures = Vec::with_capacity(certs.len());
231        for cert in certs {
232            futures.push(self.verify_cert(cert));
233        }
234        futures::future::join_all(futures).await
235    }
236
237    /// exposed as a public method for the benchmarks
238    pub async fn verify_cert_skip_cache(
239        &self,
240        cert: CertifiedTransaction,
241    ) -> IotaResult<VerifiedCertificate> {
242        // this is the only innocent error we are likely to encounter - filter it before
243        // we poison a whole batch.
244        if cert.auth_sig().epoch != self.committee.epoch() {
245            return Err(IotaError::WrongEpoch {
246                expected_epoch: self.committee.epoch(),
247                actual_epoch: cert.auth_sig().epoch,
248            });
249        }
250
251        self.verify_cert_inner(cert).await
252    }
253
254    async fn verify_cert_inner(
255        &self,
256        cert: CertifiedTransaction,
257    ) -> IotaResult<VerifiedCertificate> {
258        // Cancellation safety: we use parking_lot locks, which cannot be held across
259        // awaits. Therefore once the queue has been taken by a thread, it is
260        // guaranteed to process the queue and send all results before the
261        // future can be cancelled by the caller.
262        let (tx, rx) = oneshot::channel();
263        pin_mut!(rx);
264
265        let prev_id_or_buffer = {
266            let mut queue = self.queue.lock();
267            queue.push(tx, cert);
268            if queue.len() == queue.capacity() {
269                Either::Right(CertBuffer::take_and_replace(queue))
270            } else {
271                Either::Left(queue.id)
272            }
273        };
274        let prev_id = match prev_id_or_buffer {
275            Either::Left(prev_id) => prev_id,
276            Either::Right(buffer) => {
277                self.metrics.full_batches.inc();
278                self.process_queue(buffer).await;
279                // unwrap ok - process_queue will have sent the result already
280                return rx.try_recv().unwrap();
281            }
282        };
283
284        if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
285            // unwrap ok - tx cannot have been dropped without sending a result.
286            return res.unwrap();
287        }
288        self.metrics.timeouts.inc();
289
290        let buffer = {
291            let queue = self.queue.lock();
292            // check if another thread took the queue while we were re-acquiring lock.
293            if prev_id == queue.id {
294                debug_assert_ne!(queue.len(), queue.capacity());
295                Some(CertBuffer::take_and_replace(queue))
296            } else {
297                None
298            }
299        };
300
301        if let Some(buffer) = buffer {
302            self.metrics.partial_batches.inc();
303            self.process_queue(buffer).await;
304            // unwrap ok - process_queue will have sent the result already
305            return rx.try_recv().unwrap();
306        }
307
308        // unwrap ok - another thread took the queue while we were re-acquiring the lock
309        // and is guaranteed to process the queue immediately.
310        rx.await.unwrap()
311    }
312
313    async fn process_queue(&self, buffer: CertBuffer) {
314        let committee = self.committee.clone();
315        let metrics = self.metrics.clone();
316        let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
317        Handle::current()
318            .spawn_blocking(move || {
319                Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
320            })
321            .await
322            .expect("Spawn blocking should not fail");
323    }
324
325    fn process_queue_sync(
326        committee: Arc<Committee>,
327        metrics: Arc<SignatureVerifierMetrics>,
328        buffer: CertBuffer,
329        zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
330    ) {
331        let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
332
333        let results = batch_verify_certificates(
334            &committee,
335            &buffer.certs.iter().collect_vec(),
336            zklogin_inputs_cache,
337        );
338        izip!(
339            results.into_iter(),
340            buffer.certs.into_iter(),
341            buffer.senders.into_iter(),
342        )
343        .for_each(|(result, cert, tx)| {
344            tx.send(match result {
345                Ok(()) => {
346                    metrics.total_verified_certs.inc();
347                    Ok(VerifiedCertificate::new_unchecked(cert))
348                }
349                Err(e) => {
350                    metrics.total_failed_certs.inc();
351                    Err(e)
352                }
353            })
354            .ok();
355        });
356    }
357
358    /// Insert a JWK into the verifier state. Pre-existing entries for a given
359    /// JwkId will not be overwritten.
360    pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
361        let mut jwks = self.jwks.write();
362        match jwks.entry(jwk_id.clone()) {
363            im::hashmap::Entry::Occupied(_) => {
364                debug!("JWK with kid {:?} already exists", jwk_id);
365            }
366            im::hashmap::Entry::Vacant(entry) => {
367                debug!("inserting JWK with kid: {:?}", jwk_id);
368                entry.insert(jwk.clone());
369            }
370        }
371    }
372
373    pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
374        let jwks = self.jwks.read();
375        jwks.get(jwk_id) == Some(jwk)
376    }
377
378    pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
379        self.jwks.read().clone()
380    }
381
382    pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
383        self.signed_data_cache.is_verified(
384            signed_tx.full_message_digest(),
385            || {
386                let jwks = self.jwks.read().clone();
387                let verify_params = VerifyParams::new(
388                    jwks,
389                    self.zk_login_params.env,
390                    self.zk_login_params.accept_zklogin_in_multisig,
391                    self.zk_login_params.accept_passkey_in_multisig,
392                    self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
393                );
394                verify_sender_signed_data_message_signatures(
395                    signed_tx,
396                    self.committee.epoch(),
397                    &verify_params,
398                    self.zklogin_inputs_cache.clone(),
399                )
400            },
401            || Ok(()),
402        )
403    }
404
405    pub fn clear_signature_cache(&self) {
406        self.certificate_cache.clear();
407        self.signed_data_cache.clear();
408        self.zklogin_inputs_cache.clear();
409    }
410}
411
412pub struct SignatureVerifierMetrics {
413    pub certificate_signatures_cache_hits: IntCounter,
414    pub certificate_signatures_cache_misses: IntCounter,
415    pub certificate_signatures_cache_evictions: IntCounter,
416    pub signed_data_cache_hits: IntCounter,
417    pub signed_data_cache_misses: IntCounter,
418    pub signed_data_cache_evictions: IntCounter,
419    pub zklogin_inputs_cache_hits: IntCounter,
420    pub zklogin_inputs_cache_misses: IntCounter,
421    pub zklogin_inputs_cache_evictions: IntCounter,
422    timeouts: IntCounter,
423    full_batches: IntCounter,
424    partial_batches: IntCounter,
425    total_verified_certs: IntCounter,
426    total_failed_certs: IntCounter,
427}
428
429impl SignatureVerifierMetrics {
430    pub fn new(registry: &Registry) -> Arc<Self> {
431        Arc::new(Self {
432            certificate_signatures_cache_hits: register_int_counter_with_registry!(
433                "certificate_signatures_cache_hits",
434                "Number of certificates which were known to be verified because of signature cache.",
435                registry
436            )
437            .unwrap(),
438            certificate_signatures_cache_misses: register_int_counter_with_registry!(
439                "certificate_signatures_cache_misses",
440                "Number of certificates which missed the signature cache",
441                registry
442            )
443            .unwrap(),
444            certificate_signatures_cache_evictions: register_int_counter_with_registry!(
445                "certificate_signatures_cache_evictions",
446                "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
447                registry
448            )
449            .unwrap(),
450            signed_data_cache_hits: register_int_counter_with_registry!(
451                "signed_data_cache_hits",
452                "Number of signed data which were known to be verified because of signature cache.",
453                registry
454            )
455            .unwrap(),
456            signed_data_cache_misses: register_int_counter_with_registry!(
457                "signed_data_cache_misses",
458                "Number of signed data which missed the signature cache.",
459                registry
460            )
461            .unwrap(),
462            signed_data_cache_evictions: register_int_counter_with_registry!(
463                "signed_data_cache_evictions",
464                "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
465                registry
466            )
467                .unwrap(),
468                zklogin_inputs_cache_hits: register_int_counter_with_registry!(
469                    "zklogin_inputs_cache_hits",
470                    "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
471                    registry
472                )
473                .unwrap(),
474                zklogin_inputs_cache_misses: register_int_counter_with_registry!(
475                    "zklogin_inputs_cache_misses",
476                    "Number of zklogin signatures which missed the zklogin inputs cache.",
477                    registry
478                )
479                .unwrap(),
480                zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
481                    "zklogin_inputs_cache_evictions",
482                    "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
483                    registry
484                )
485                .unwrap(),
486            timeouts: register_int_counter_with_registry!(
487                "async_batch_verifier_timeouts",
488                "Number of times batch verifier times out and verifies a partial batch",
489                registry
490            )
491            .unwrap(),
492            full_batches: register_int_counter_with_registry!(
493                "async_batch_verifier_full_batches",
494                "Number of times batch verifier verifies a full batch",
495                registry
496            )
497            .unwrap(),
498            partial_batches: register_int_counter_with_registry!(
499                "async_batch_verifier_partial_batches",
500                "Number of times batch verifier verifies a partial batch",
501                registry
502            )
503            .unwrap(),
504            total_verified_certs: register_int_counter_with_registry!(
505                "async_batch_verifier_total_verified_certs",
506                "Total number of certs batch verifier has verified",
507                registry
508            )
509            .unwrap(),
510            total_failed_certs: register_int_counter_with_registry!(
511                "async_batch_verifier_total_failed_certs",
512                "Total number of certs batch verifier has rejected",
513                registry
514            )
515            .unwrap(),
516        })
517    }
518}
519
520/// Verifies all certificates - if any fail return error.
521pub fn batch_verify_all_certificates_and_checkpoints(
522    committee: &Committee,
523    certs: &[&CertifiedTransaction],
524    checkpoints: &[&SignedCheckpointSummary],
525) -> IotaResult {
526    // certs.data() is assumed to be verified already by the caller.
527
528    for ckpt in checkpoints {
529        ckpt.data().verify_epoch(committee.epoch())?;
530    }
531
532    batch_verify(committee, certs, checkpoints)
533}
534
535/// Verifies certificates in batch mode, but returns a separate result for each
536/// cert.
537pub fn batch_verify_certificates(
538    committee: &Committee,
539    certs: &[&CertifiedTransaction],
540    zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
541) -> Vec<IotaResult> {
542    // certs.data() is assumed to be verified already by the caller.
543    let verify_params = VerifyParams::default();
544    match batch_verify(committee, certs, &[]) {
545        Ok(_) => vec![Ok(()); certs.len()],
546
547        // Verify one by one to find which certs were invalid.
548        Err(_) if certs.len() > 1 => certs
549            .iter()
550            // TODO: verify_signature currently checks the tx sig as well, which might be cached
551            // already.
552            .map(|c| {
553                c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
554            })
555            .collect(),
556
557        Err(e) => vec![Err(e)],
558    }
559}
560
561fn batch_verify(
562    committee: &Committee,
563    certs: &[&CertifiedTransaction],
564    checkpoints: &[&SignedCheckpointSummary],
565) -> IotaResult {
566    let mut obligation = VerificationObligation::default();
567
568    for cert in certs {
569        let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
570        cert.auth_sig()
571            .add_to_verification_obligation(committee, &mut obligation, idx)?;
572    }
573
574    for ckpt in checkpoints {
575        let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
576        ckpt.auth_sig()
577            .add_to_verification_obligation(committee, &mut obligation, idx)?;
578    }
579
580    obligation.verify_all()
581}