// iota_core/signature_verifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9    zk_login::{JWK, JwkId},
10    zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16    committee::Committee,
17    crypto::{AuthoritySignInfoTrait, VerificationObligation},
18    digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19    error::{IotaError, IotaResult},
20    message_envelope::Message,
21    messages_checkpoint::SignedCheckpointSummary,
22    signature::VerifyParams,
23    signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24    transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::{Itertools as _, izip};
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32    runtime::Handle,
33    sync::oneshot,
34    time::{Duration, timeout},
35};
36use tracing::debug;
/// Upper bound on how long a caller waits for its batch to fill up before the
/// partially filled batch is verified anyway.
///
/// Despite the `_MS` suffix, this is a [`Duration`] rather than a raw
/// millisecond count.
const BATCH_TIMEOUT_MS: Duration = Duration::from_micros(10_000);

/// Number of certificates that make up a full verification batch.
///
/// Larger batches use the CPU slightly more efficiently, although the gains
/// flatten out sharply at around 16 certificates. The price is latency: when
/// the system is lightly loaded, batches fill up less often, so callers fall
/// back to waiting out [`BATCH_TIMEOUT_MS`] more frequently.
const MAX_BATCH_SIZE: usize = 8;
47
48type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51    certs: Vec<CertifiedTransaction>,
52    senders: Vec<Sender>,
53    id: u64,
54}
55
56impl CertBuffer {
57    fn new(capacity: usize) -> Self {
58        Self {
59            certs: Vec::with_capacity(capacity),
60            senders: Vec::with_capacity(capacity),
61            id: 0,
62        }
63    }
64
65    // Function consumes MutexGuard, therefore releasing the lock after mem swap is
66    // done
67    fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68        let this = &mut *guard;
69        let mut new = CertBuffer::new(this.capacity());
70        new.id = this.id + 1;
71        std::mem::swap(&mut new, this);
72        new
73    }
74
75    fn capacity(&self) -> usize {
76        debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77        self.certs.capacity()
78    }
79
80    fn len(&self) -> usize {
81        debug_assert_eq!(self.certs.len(), self.senders.len());
82        self.certs.len()
83    }
84
85    fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86        self.senders.push(tx);
87        self.certs.push(cert);
88    }
89}
90
91/// Verifies signatures in ways that faster than verifying each signature
92/// individually.
93/// - BLS signatures - caching and batch verification.
94/// - User signed data - caching.
95pub struct SignatureVerifier {
96    committee: Arc<Committee>,
97    certificate_cache: VerifiedDigestCache<CertificateDigest>,
98    signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99    zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
100
101    /// Map from JwkId (iss, kid) to the fetched JWK for that key.
102    /// We use an immutable data structure because verification of ZKLogins may
103    /// be slow, so we don't want to pass a reference to the map to the
104    /// verify method, since that would lead to a lengthy critical section.
105    /// Instead, we use an immutable data structure which can be cloned very
106    /// cheaply.
107    jwks: RwLock<ImHashMap<JwkId, JWK>>,
108
109    /// Params that contains a list of supported providers for ZKLogin and the
110    /// environment (prod/test) the code runs in.
111    zk_login_params: ZkLoginParams,
112
113    queue: Mutex<CertBuffer>,
114    pub metrics: Arc<SignatureVerifierMetrics>,
115}
116
117/// Contains two parameters to pass in to verify a ZkLogin signature.
118#[derive(Clone)]
119struct ZkLoginParams {
120    /// The environment (prod/test) the code runs in. It decides which verifying
121    /// key to use in fastcrypto.
122    pub env: ZkLoginEnv,
123    // Flag to determine whether zkLogin inside multisig is accepted.
124    pub accept_zklogin_in_multisig: bool,
125    // Flag to determine whether passkey inside multisig is accepted.
126    pub accept_passkey_in_multisig: bool,
127    /// Value that sets the upper bound for max_epoch in zkLogin signature.
128    pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
129    /// Flag to determine whether additional multisig checks are performed.
130    pub additional_multisig_checks: bool,
131}
132
133impl SignatureVerifier {
134    pub fn new_with_batch_size(
135        committee: Arc<Committee>,
136        batch_size: usize,
137        metrics: Arc<SignatureVerifierMetrics>,
138        env: ZkLoginEnv,
139        accept_zklogin_in_multisig: bool,
140        accept_passkey_in_multisig: bool,
141        zklogin_max_epoch_upper_bound_delta: Option<u64>,
142        additional_multisig_checks: bool,
143    ) -> Self {
144        Self {
145            committee,
146            certificate_cache: VerifiedDigestCache::new(
147                metrics.certificate_signatures_cache_hits.clone(),
148                metrics.certificate_signatures_cache_misses.clone(),
149                metrics.certificate_signatures_cache_evictions.clone(),
150            ),
151            signed_data_cache: VerifiedDigestCache::new(
152                metrics.signed_data_cache_hits.clone(),
153                metrics.signed_data_cache_misses.clone(),
154                metrics.signed_data_cache_evictions.clone(),
155            ),
156            zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
157                metrics.zklogin_inputs_cache_hits.clone(),
158                metrics.zklogin_inputs_cache_misses.clone(),
159                metrics.zklogin_inputs_cache_evictions.clone(),
160            )),
161            jwks: Default::default(),
162            queue: Mutex::new(CertBuffer::new(batch_size)),
163            metrics,
164            zk_login_params: ZkLoginParams {
165                env,
166                accept_zklogin_in_multisig,
167                accept_passkey_in_multisig,
168                zklogin_max_epoch_upper_bound_delta,
169                additional_multisig_checks,
170            },
171        }
172    }
173
174    pub fn new(
175        committee: Arc<Committee>,
176        metrics: Arc<SignatureVerifierMetrics>,
177        zklogin_env: ZkLoginEnv,
178        accept_zklogin_in_multisig: bool,
179        accept_passkey_in_multisig: bool,
180        zklogin_max_epoch_upper_bound_delta: Option<u64>,
181        additional_multisig_checks: bool,
182    ) -> Self {
183        Self::new_with_batch_size(
184            committee,
185            MAX_BATCH_SIZE,
186            metrics,
187            zklogin_env,
188            accept_zklogin_in_multisig,
189            accept_passkey_in_multisig,
190            zklogin_max_epoch_upper_bound_delta,
191            additional_multisig_checks,
192        )
193    }
194
195    /// Verifies all certs, returns Ok only if all are valid.
196    pub fn verify_certs_and_checkpoints(
197        &self,
198        certs: Vec<&CertifiedTransaction>,
199        checkpoints: Vec<&SignedCheckpointSummary>,
200    ) -> IotaResult {
201        let certs: Vec<_> = certs
202            .into_iter()
203            .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
204            .collect();
205
206        // Verify only the user sigs of certificates that were not cached already, since
207        // whenever we insert a certificate into the cache, it is already
208        // verified.
209        for cert in &certs {
210            self.verify_tx(cert.data())?;
211        }
212        batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
213        self.certificate_cache
214            .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
215        Ok(())
216    }
217
218    /// Verifies one cert asynchronously, in a batch.
219    pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
220        let cert_digest = cert.certificate_digest();
221        if self.certificate_cache.is_cached(&cert_digest) {
222            return Ok(VerifiedCertificate::new_unchecked(cert));
223        }
224        self.verify_tx(cert.data())?;
225        self.verify_cert_skip_cache(cert)
226            .await
227            .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
228    }
229
230    pub async fn multi_verify_certs(
231        &self,
232        certs: Vec<CertifiedTransaction>,
233    ) -> Vec<IotaResult<VerifiedCertificate>> {
234        // TODO: We could do better by pushing the all of `certs` into the verification
235        // queue at once, but that's significantly more complex.
236        let mut futures = Vec::with_capacity(certs.len());
237        for cert in certs {
238            futures.push(self.verify_cert(cert));
239        }
240        futures::future::join_all(futures).await
241    }
242
243    /// exposed as a public method for the benchmarks
244    pub async fn verify_cert_skip_cache(
245        &self,
246        cert: CertifiedTransaction,
247    ) -> IotaResult<VerifiedCertificate> {
248        // this is the only innocent error we are likely to encounter - filter it before
249        // we poison a whole batch.
250        if cert.auth_sig().epoch != self.committee.epoch() {
251            return Err(IotaError::WrongEpoch {
252                expected_epoch: self.committee.epoch(),
253                actual_epoch: cert.auth_sig().epoch,
254            });
255        }
256
257        self.verify_cert_inner(cert).await
258    }
259
260    async fn verify_cert_inner(
261        &self,
262        cert: CertifiedTransaction,
263    ) -> IotaResult<VerifiedCertificate> {
264        // Cancellation safety: we use parking_lot locks, which cannot be held across
265        // awaits. Therefore once the queue has been taken by a thread, it is
266        // guaranteed to process the queue and send all results before the
267        // future can be cancelled by the caller.
268        let (tx, rx) = oneshot::channel();
269        pin_mut!(rx);
270
271        let prev_id_or_buffer = {
272            let mut queue = self.queue.lock();
273            queue.push(tx, cert);
274            if queue.len() == queue.capacity() {
275                Either::Right(CertBuffer::take_and_replace(queue))
276            } else {
277                Either::Left(queue.id)
278            }
279        };
280        let prev_id = match prev_id_or_buffer {
281            Either::Left(prev_id) => prev_id,
282            Either::Right(buffer) => {
283                self.metrics.full_batches.inc();
284                self.process_queue(buffer).await;
285                // unwrap ok - process_queue will have sent the result already
286                return rx.try_recv().unwrap();
287            }
288        };
289
290        if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
291            // unwrap ok - tx cannot have been dropped without sending a result.
292            return res.unwrap();
293        }
294        self.metrics.timeouts.inc();
295
296        let buffer = {
297            let queue = self.queue.lock();
298            // check if another thread took the queue while we were re-acquiring lock.
299            if prev_id == queue.id {
300                debug_assert_ne!(queue.len(), queue.capacity());
301                Some(CertBuffer::take_and_replace(queue))
302            } else {
303                None
304            }
305        };
306
307        if let Some(buffer) = buffer {
308            self.metrics.partial_batches.inc();
309            self.process_queue(buffer).await;
310            // unwrap ok - process_queue will have sent the result already
311            return rx.try_recv().unwrap();
312        }
313
314        // unwrap ok - another thread took the queue while we were re-acquiring the lock
315        // and is guaranteed to process the queue immediately.
316        rx.await.unwrap()
317    }
318
319    async fn process_queue(&self, buffer: CertBuffer) {
320        let committee = self.committee.clone();
321        let metrics = self.metrics.clone();
322        let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
323        Handle::current()
324            .spawn_blocking(move || {
325                Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
326            })
327            .await
328            .expect("Spawn blocking should not fail");
329    }
330
331    fn process_queue_sync(
332        committee: Arc<Committee>,
333        metrics: Arc<SignatureVerifierMetrics>,
334        buffer: CertBuffer,
335        zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
336    ) {
337        let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
338
339        let results = batch_verify_certificates(
340            &committee,
341            &buffer.certs.iter().collect_vec(),
342            zklogin_inputs_cache,
343        );
344        izip!(
345            results.into_iter(),
346            buffer.certs.into_iter(),
347            buffer.senders.into_iter(),
348        )
349        .for_each(|(result, cert, tx)| {
350            tx.send(match result {
351                Ok(()) => {
352                    metrics.total_verified_certs.inc();
353                    Ok(VerifiedCertificate::new_unchecked(cert))
354                }
355                Err(e) => {
356                    metrics.total_failed_certs.inc();
357                    Err(e)
358                }
359            })
360            .ok();
361        });
362    }
363
364    /// Insert a JWK into the verifier state. Pre-existing entries for a given
365    /// JwkId will not be overwritten.
366    pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
367        let mut jwks = self.jwks.write();
368        match jwks.entry(jwk_id.clone()) {
369            im::hashmap::Entry::Occupied(_) => {
370                debug!("JWK with kid {:?} already exists", jwk_id);
371            }
372            im::hashmap::Entry::Vacant(entry) => {
373                debug!("inserting JWK with kid: {:?}", jwk_id);
374                entry.insert(jwk.clone());
375            }
376        }
377    }
378
379    pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
380        let jwks = self.jwks.read();
381        jwks.get(jwk_id) == Some(jwk)
382    }
383
384    pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
385        self.jwks.read().clone()
386    }
387
388    pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
389        self.signed_data_cache.is_verified(
390            signed_tx.full_message_digest(),
391            || {
392                let jwks = self.jwks.read().clone();
393                let verify_params = VerifyParams::new(
394                    jwks,
395                    self.zk_login_params.env,
396                    self.zk_login_params.accept_zklogin_in_multisig,
397                    self.zk_login_params.accept_passkey_in_multisig,
398                    self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
399                    self.zk_login_params.additional_multisig_checks,
400                );
401                verify_sender_signed_data_message_signatures(
402                    signed_tx,
403                    self.committee.epoch(),
404                    &verify_params,
405                    self.zklogin_inputs_cache.clone(),
406                )
407            },
408            || Ok(()),
409        )
410    }
411
412    pub fn clear_signature_cache(&self) {
413        self.certificate_cache.clear();
414        self.signed_data_cache.clear();
415        self.zklogin_inputs_cache.clear();
416    }
417}
418
419pub struct SignatureVerifierMetrics {
420    pub certificate_signatures_cache_hits: IntCounter,
421    pub certificate_signatures_cache_misses: IntCounter,
422    pub certificate_signatures_cache_evictions: IntCounter,
423    pub signed_data_cache_hits: IntCounter,
424    pub signed_data_cache_misses: IntCounter,
425    pub signed_data_cache_evictions: IntCounter,
426    pub zklogin_inputs_cache_hits: IntCounter,
427    pub zklogin_inputs_cache_misses: IntCounter,
428    pub zklogin_inputs_cache_evictions: IntCounter,
429    timeouts: IntCounter,
430    full_batches: IntCounter,
431    partial_batches: IntCounter,
432    total_verified_certs: IntCounter,
433    total_failed_certs: IntCounter,
434}
435
436impl SignatureVerifierMetrics {
437    pub fn new(registry: &Registry) -> Arc<Self> {
438        Arc::new(Self {
439            certificate_signatures_cache_hits: register_int_counter_with_registry!(
440                "certificate_signatures_cache_hits",
441                "Number of certificates which were known to be verified because of signature cache.",
442                registry
443            )
444            .unwrap(),
445            certificate_signatures_cache_misses: register_int_counter_with_registry!(
446                "certificate_signatures_cache_misses",
447                "Number of certificates which missed the signature cache",
448                registry
449            )
450            .unwrap(),
451            certificate_signatures_cache_evictions: register_int_counter_with_registry!(
452                "certificate_signatures_cache_evictions",
453                "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
454                registry
455            )
456            .unwrap(),
457            signed_data_cache_hits: register_int_counter_with_registry!(
458                "signed_data_cache_hits",
459                "Number of signed data which were known to be verified because of signature cache.",
460                registry
461            )
462            .unwrap(),
463            signed_data_cache_misses: register_int_counter_with_registry!(
464                "signed_data_cache_misses",
465                "Number of signed data which missed the signature cache.",
466                registry
467            )
468            .unwrap(),
469            signed_data_cache_evictions: register_int_counter_with_registry!(
470                "signed_data_cache_evictions",
471                "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
472                registry
473            )
474                .unwrap(),
475                zklogin_inputs_cache_hits: register_int_counter_with_registry!(
476                    "zklogin_inputs_cache_hits",
477                    "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
478                    registry
479                )
480                .unwrap(),
481                zklogin_inputs_cache_misses: register_int_counter_with_registry!(
482                    "zklogin_inputs_cache_misses",
483                    "Number of zklogin signatures which missed the zklogin inputs cache.",
484                    registry
485                )
486                .unwrap(),
487                zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
488                    "zklogin_inputs_cache_evictions",
489                    "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
490                    registry
491                )
492                .unwrap(),
493            timeouts: register_int_counter_with_registry!(
494                "async_batch_verifier_timeouts",
495                "Number of times batch verifier times out and verifies a partial batch",
496                registry
497            )
498            .unwrap(),
499            full_batches: register_int_counter_with_registry!(
500                "async_batch_verifier_full_batches",
501                "Number of times batch verifier verifies a full batch",
502                registry
503            )
504            .unwrap(),
505            partial_batches: register_int_counter_with_registry!(
506                "async_batch_verifier_partial_batches",
507                "Number of times batch verifier verifies a partial batch",
508                registry
509            )
510            .unwrap(),
511            total_verified_certs: register_int_counter_with_registry!(
512                "async_batch_verifier_total_verified_certs",
513                "Total number of certs batch verifier has verified",
514                registry
515            )
516            .unwrap(),
517            total_failed_certs: register_int_counter_with_registry!(
518                "async_batch_verifier_total_failed_certs",
519                "Total number of certs batch verifier has rejected",
520                registry
521            )
522            .unwrap(),
523        })
524    }
525}
526
527/// Verifies all certificates - if any fail return error.
528pub fn batch_verify_all_certificates_and_checkpoints(
529    committee: &Committee,
530    certs: &[&CertifiedTransaction],
531    checkpoints: &[&SignedCheckpointSummary],
532) -> IotaResult {
533    // certs.data() is assumed to be verified already by the caller.
534
535    for ckpt in checkpoints {
536        ckpt.data().verify_epoch(committee.epoch())?;
537    }
538
539    batch_verify(committee, certs, checkpoints)
540}
541
542/// Verifies certificates in batch mode, but returns a separate result for each
543/// cert.
544pub fn batch_verify_certificates(
545    committee: &Committee,
546    certs: &[&CertifiedTransaction],
547    zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
548) -> Vec<IotaResult> {
549    // certs.data() is assumed to be verified already by the caller.
550    let verify_params = VerifyParams::default();
551    match batch_verify(committee, certs, &[]) {
552        Ok(_) => vec![Ok(()); certs.len()],
553
554        // Verify one by one to find which certs were invalid.
555        Err(_) if certs.len() > 1 => certs
556            .iter()
557            // TODO: verify_signature currently checks the tx sig as well, which might be cached
558            // already.
559            .map(|c| {
560                c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
561            })
562            .collect(),
563
564        Err(e) => vec![Err(e)],
565    }
566}
567
568fn batch_verify(
569    committee: &Committee,
570    certs: &[&CertifiedTransaction],
571    checkpoints: &[&SignedCheckpointSummary],
572) -> IotaResult {
573    let mut obligation = VerificationObligation::default();
574
575    for cert in certs {
576        let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
577        cert.auth_sig()
578            .add_to_verification_obligation(committee, &mut obligation, idx)?;
579    }
580
581    for ckpt in checkpoints {
582        let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
583        ckpt.auth_sig()
584            .add_to_verification_obligation(committee, &mut obligation, idx)?;
585    }
586
587    obligation.verify_all()
588}