iota_core/
signature_verifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9    zk_login::{JWK, JwkId},
10    zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16    committee::Committee,
17    crypto::{AuthoritySignInfoTrait, VerificationObligation},
18    digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19    error::{IotaError, IotaResult},
20    message_envelope::Message,
21    messages_checkpoint::SignedCheckpointSummary,
22    signature::VerifyParams,
23    signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24    transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::izip;
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32    runtime::Handle,
33    sync::oneshot,
34    time::{Duration, timeout},
35};
36use tracing::debug;
// Upper bound on how long a caller waits for its batch to fill up before the
// partial batch is verified anyway. Note: despite the `_MS` suffix this is a
// `Duration`, not a raw millisecond count.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);
40
// Largest number of certificates verified together as one batch.
//
// Bigger batches buy slightly better CPU utilization, but the gains fall off
// steeply beyond roughly 16 certificates. The price is latency: under light
// load a batch is less likely to fill, so callers more often wait for
// `BATCH_TIMEOUT_MS` before their partial batch gets verified.
const MAX_BATCH_SIZE: usize = 8;
47
48type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51    certs: Vec<CertifiedTransaction>,
52    senders: Vec<Sender>,
53    id: u64,
54}
55
56impl CertBuffer {
57    fn new(capacity: usize) -> Self {
58        Self {
59            certs: Vec::with_capacity(capacity),
60            senders: Vec::with_capacity(capacity),
61            id: 0,
62        }
63    }
64
65    // Function consumes MutexGuard, therefore releasing the lock after mem swap is
66    // done
67    fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68        let this = &mut *guard;
69        let mut new = CertBuffer::new(this.capacity());
70        new.id = this.id + 1;
71        std::mem::swap(&mut new, this);
72        new
73    }
74
75    fn capacity(&self) -> usize {
76        debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77        self.certs.capacity()
78    }
79
80    fn len(&self) -> usize {
81        debug_assert_eq!(self.certs.len(), self.senders.len());
82        self.certs.len()
83    }
84
85    fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86        self.senders.push(tx);
87        self.certs.push(cert);
88    }
89}
90
91/// Verifies signatures in ways that faster than verifying each signature
92/// individually.
93/// - BLS signatures - caching and batch verification.
94/// - User signed data - caching.
95pub struct SignatureVerifier {
96    committee: Arc<Committee>,
97    certificate_cache: VerifiedDigestCache<CertificateDigest>,
98    signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99    zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
100
101    /// Map from JwkId (iss, kid) to the fetched JWK for that key.
102    /// We use an immutable data structure because verification of ZKLogins may
103    /// be slow, so we don't want to pass a reference to the map to the
104    /// verify method, since that would lead to a lengthy critical section.
105    /// Instead, we use an immutable data structure which can be cloned very
106    /// cheaply.
107    jwks: RwLock<ImHashMap<JwkId, JWK>>,
108
109    /// Params that contains a list of supported providers for ZKLogin and the
110    /// environment (prod/test) the code runs in.
111    zk_login_params: ZkLoginParams,
112
113    queue: Mutex<CertBuffer>,
114    pub metrics: Arc<SignatureVerifierMetrics>,
115}
116
117/// Contains two parameters to pass in to verify a ZkLogin signature.
118#[derive(Clone)]
119struct ZkLoginParams {
120    /// The environment (prod/test) the code runs in. It decides which verifying
121    /// key to use in fastcrypto.
122    pub env: ZkLoginEnv,
123    // Flag to determine whether zkLogin inside multisig is accepted.
124    pub accept_zklogin_in_multisig: bool,
125    // Flag to determine whether passkey inside multisig is accepted.
126    pub accept_passkey_in_multisig: bool,
127    /// Value that sets the upper bound for max_epoch in zkLogin signature.
128    pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
129}
130
131impl SignatureVerifier {
132    pub fn new_with_batch_size(
133        committee: Arc<Committee>,
134        batch_size: usize,
135        metrics: Arc<SignatureVerifierMetrics>,
136        env: ZkLoginEnv,
137        accept_zklogin_in_multisig: bool,
138        accept_passkey_in_multisig: bool,
139        zklogin_max_epoch_upper_bound_delta: Option<u64>,
140    ) -> Self {
141        Self {
142            committee,
143            certificate_cache: VerifiedDigestCache::new(
144                metrics.certificate_signatures_cache_hits.clone(),
145                metrics.certificate_signatures_cache_misses.clone(),
146                metrics.certificate_signatures_cache_evictions.clone(),
147            ),
148            signed_data_cache: VerifiedDigestCache::new(
149                metrics.signed_data_cache_hits.clone(),
150                metrics.signed_data_cache_misses.clone(),
151                metrics.signed_data_cache_evictions.clone(),
152            ),
153            zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
154                metrics.zklogin_inputs_cache_hits.clone(),
155                metrics.zklogin_inputs_cache_misses.clone(),
156                metrics.zklogin_inputs_cache_evictions.clone(),
157            )),
158            jwks: Default::default(),
159            queue: Mutex::new(CertBuffer::new(batch_size)),
160            metrics,
161            zk_login_params: ZkLoginParams {
162                env,
163                accept_zklogin_in_multisig,
164                accept_passkey_in_multisig,
165                zklogin_max_epoch_upper_bound_delta,
166            },
167        }
168    }
169
170    pub fn new(
171        committee: Arc<Committee>,
172        metrics: Arc<SignatureVerifierMetrics>,
173        zklogin_env: ZkLoginEnv,
174        accept_zklogin_in_multisig: bool,
175        accept_passkey_in_multisig: bool,
176        zklogin_max_epoch_upper_bound_delta: Option<u64>,
177    ) -> Self {
178        Self::new_with_batch_size(
179            committee,
180            MAX_BATCH_SIZE,
181            metrics,
182            zklogin_env,
183            accept_zklogin_in_multisig,
184            accept_passkey_in_multisig,
185            zklogin_max_epoch_upper_bound_delta,
186        )
187    }
188
189    /// Verifies all certs, returns Ok only if all are valid.
190    pub fn verify_certs_and_checkpoints(
191        &self,
192        certs: Vec<CertifiedTransaction>,
193        checkpoints: Vec<SignedCheckpointSummary>,
194    ) -> IotaResult {
195        let certs: Vec<_> = certs
196            .into_iter()
197            .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
198            .collect();
199
200        // Verify only the user sigs of certificates that were not cached already, since
201        // whenever we insert a certificate into the cache, it is already
202        // verified.
203        for cert in &certs {
204            self.verify_tx(cert.data())?;
205        }
206        batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
207        self.certificate_cache
208            .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
209        Ok(())
210    }
211
212    /// Verifies one cert asynchronously, in a batch.
213    pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
214        let cert_digest = cert.certificate_digest();
215        if self.certificate_cache.is_cached(&cert_digest) {
216            return Ok(VerifiedCertificate::new_unchecked(cert));
217        }
218        self.verify_tx(cert.data())?;
219        self.verify_cert_skip_cache(cert)
220            .await
221            .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
222    }
223
224    pub async fn multi_verify_certs(
225        &self,
226        certs: Vec<CertifiedTransaction>,
227    ) -> Vec<IotaResult<VerifiedCertificate>> {
228        // TODO: We could do better by pushing the all of `certs` into the verification
229        // queue at once, but that's significantly more complex.
230        let mut futures = Vec::with_capacity(certs.len());
231        for cert in certs {
232            futures.push(self.verify_cert(cert));
233        }
234        futures::future::join_all(futures).await
235    }
236
237    /// exposed as a public method for the benchmarks
238    pub async fn verify_cert_skip_cache(
239        &self,
240        cert: CertifiedTransaction,
241    ) -> IotaResult<VerifiedCertificate> {
242        // this is the only innocent error we are likely to encounter - filter it before
243        // we poison a whole batch.
244        if cert.auth_sig().epoch != self.committee.epoch() {
245            return Err(IotaError::WrongEpoch {
246                expected_epoch: self.committee.epoch(),
247                actual_epoch: cert.auth_sig().epoch,
248            });
249        }
250
251        self.verify_cert_inner(cert).await
252    }
253
254    async fn verify_cert_inner(
255        &self,
256        cert: CertifiedTransaction,
257    ) -> IotaResult<VerifiedCertificate> {
258        // Cancellation safety: we use parking_lot locks, which cannot be held across
259        // awaits. Therefore once the queue has been taken by a thread, it is
260        // guaranteed to process the queue and send all results before the
261        // future can be cancelled by the caller.
262        let (tx, rx) = oneshot::channel();
263        pin_mut!(rx);
264
265        let prev_id_or_buffer = {
266            let mut queue = self.queue.lock();
267            queue.push(tx, cert);
268            if queue.len() == queue.capacity() {
269                Either::Right(CertBuffer::take_and_replace(queue))
270            } else {
271                Either::Left(queue.id)
272            }
273        };
274        let prev_id = match prev_id_or_buffer {
275            Either::Left(prev_id) => prev_id,
276            Either::Right(buffer) => {
277                self.metrics.full_batches.inc();
278                self.process_queue(buffer).await;
279                // unwrap ok - process_queue will have sent the result already
280                return rx.try_recv().unwrap();
281            }
282        };
283
284        if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
285            // unwrap ok - tx cannot have been dropped without sending a result.
286            return res.unwrap();
287        }
288        self.metrics.timeouts.inc();
289
290        let buffer = {
291            let queue = self.queue.lock();
292            // check if another thread took the queue while we were re-acquiring lock.
293            if prev_id == queue.id {
294                debug_assert_ne!(queue.len(), queue.capacity());
295                Some(CertBuffer::take_and_replace(queue))
296            } else {
297                None
298            }
299        };
300
301        if let Some(buffer) = buffer {
302            self.metrics.partial_batches.inc();
303            self.process_queue(buffer).await;
304            // unwrap ok - process_queue will have sent the result already
305            return rx.try_recv().unwrap();
306        }
307
308        // unwrap ok - another thread took the queue while we were re-acquiring the lock
309        // and is guaranteed to process the queue immediately.
310        rx.await.unwrap()
311    }
312
313    async fn process_queue(&self, buffer: CertBuffer) {
314        let committee = self.committee.clone();
315        let metrics = self.metrics.clone();
316        let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
317        Handle::current()
318            .spawn_blocking(move || {
319                Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
320            })
321            .await
322            .expect("Spawn blocking should not fail");
323    }
324
325    fn process_queue_sync(
326        committee: Arc<Committee>,
327        metrics: Arc<SignatureVerifierMetrics>,
328        buffer: CertBuffer,
329        zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
330    ) {
331        let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
332
333        let results = batch_verify_certificates(&committee, &buffer.certs, zklogin_inputs_cache);
334        izip!(
335            results.into_iter(),
336            buffer.certs.into_iter(),
337            buffer.senders.into_iter(),
338        )
339        .for_each(|(result, cert, tx)| {
340            tx.send(match result {
341                Ok(()) => {
342                    metrics.total_verified_certs.inc();
343                    Ok(VerifiedCertificate::new_unchecked(cert))
344                }
345                Err(e) => {
346                    metrics.total_failed_certs.inc();
347                    Err(e)
348                }
349            })
350            .ok();
351        });
352    }
353
354    /// Insert a JWK into the verifier state. Pre-existing entries for a given
355    /// JwkId will not be overwritten.
356    pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
357        let mut jwks = self.jwks.write();
358        match jwks.entry(jwk_id.clone()) {
359            im::hashmap::Entry::Occupied(_) => {
360                debug!("JWK with kid {:?} already exists", jwk_id);
361            }
362            im::hashmap::Entry::Vacant(entry) => {
363                debug!("inserting JWK with kid: {:?}", jwk_id);
364                entry.insert(jwk.clone());
365            }
366        }
367    }
368
369    pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
370        let jwks = self.jwks.read();
371        jwks.get(jwk_id) == Some(jwk)
372    }
373
374    pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
375        self.jwks.read().clone()
376    }
377
378    pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
379        self.signed_data_cache.is_verified(
380            signed_tx.full_message_digest(),
381            || {
382                let jwks = self.jwks.read().clone();
383                let verify_params = VerifyParams::new(
384                    jwks,
385                    self.zk_login_params.env,
386                    self.zk_login_params.accept_zklogin_in_multisig,
387                    self.zk_login_params.accept_passkey_in_multisig,
388                    self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
389                );
390                verify_sender_signed_data_message_signatures(
391                    signed_tx,
392                    self.committee.epoch(),
393                    &verify_params,
394                    self.zklogin_inputs_cache.clone(),
395                )
396            },
397            || Ok(()),
398        )
399    }
400
401    pub fn clear_signature_cache(&self) {
402        self.certificate_cache.clear();
403        self.signed_data_cache.clear();
404        self.zklogin_inputs_cache.clear();
405    }
406}
407
408pub struct SignatureVerifierMetrics {
409    pub certificate_signatures_cache_hits: IntCounter,
410    pub certificate_signatures_cache_misses: IntCounter,
411    pub certificate_signatures_cache_evictions: IntCounter,
412    pub signed_data_cache_hits: IntCounter,
413    pub signed_data_cache_misses: IntCounter,
414    pub signed_data_cache_evictions: IntCounter,
415    pub zklogin_inputs_cache_hits: IntCounter,
416    pub zklogin_inputs_cache_misses: IntCounter,
417    pub zklogin_inputs_cache_evictions: IntCounter,
418    timeouts: IntCounter,
419    full_batches: IntCounter,
420    partial_batches: IntCounter,
421    total_verified_certs: IntCounter,
422    total_failed_certs: IntCounter,
423}
424
425impl SignatureVerifierMetrics {
426    pub fn new(registry: &Registry) -> Arc<Self> {
427        Arc::new(Self {
428            certificate_signatures_cache_hits: register_int_counter_with_registry!(
429                "certificate_signatures_cache_hits",
430                "Number of certificates which were known to be verified because of signature cache.",
431                registry
432            )
433            .unwrap(),
434            certificate_signatures_cache_misses: register_int_counter_with_registry!(
435                "certificate_signatures_cache_misses",
436                "Number of certificates which missed the signature cache",
437                registry
438            )
439            .unwrap(),
440            certificate_signatures_cache_evictions: register_int_counter_with_registry!(
441                "certificate_signatures_cache_evictions",
442                "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
443                registry
444            )
445            .unwrap(),
446            signed_data_cache_hits: register_int_counter_with_registry!(
447                "signed_data_cache_hits",
448                "Number of signed data which were known to be verified because of signature cache.",
449                registry
450            )
451            .unwrap(),
452            signed_data_cache_misses: register_int_counter_with_registry!(
453                "signed_data_cache_misses",
454                "Number of signed data which missed the signature cache.",
455                registry
456            )
457            .unwrap(),
458            signed_data_cache_evictions: register_int_counter_with_registry!(
459                "signed_data_cache_evictions",
460                "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
461                registry
462            )
463                .unwrap(),
464                zklogin_inputs_cache_hits: register_int_counter_with_registry!(
465                    "zklogin_inputs_cache_hits",
466                    "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
467                    registry
468                )
469                .unwrap(),
470                zklogin_inputs_cache_misses: register_int_counter_with_registry!(
471                    "zklogin_inputs_cache_misses",
472                    "Number of zklogin signatures which missed the zklogin inputs cache.",
473                    registry
474                )
475                .unwrap(),
476                zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
477                    "zklogin_inputs_cache_evictions",
478                    "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
479                    registry
480                )
481                .unwrap(),
482            timeouts: register_int_counter_with_registry!(
483                "async_batch_verifier_timeouts",
484                "Number of times batch verifier times out and verifies a partial batch",
485                registry
486            )
487            .unwrap(),
488            full_batches: register_int_counter_with_registry!(
489                "async_batch_verifier_full_batches",
490                "Number of times batch verifier verifies a full batch",
491                registry
492            )
493            .unwrap(),
494            partial_batches: register_int_counter_with_registry!(
495                "async_batch_verifier_partial_batches",
496                "Number of times batch verifier verifies a partial batch",
497                registry
498            )
499            .unwrap(),
500            total_verified_certs: register_int_counter_with_registry!(
501                "async_batch_verifier_total_verified_certs",
502                "Total number of certs batch verifier has verified",
503                registry
504            )
505            .unwrap(),
506            total_failed_certs: register_int_counter_with_registry!(
507                "async_batch_verifier_total_failed_certs",
508                "Total number of certs batch verifier has rejected",
509                registry
510            )
511            .unwrap(),
512        })
513    }
514}
515
516/// Verifies all certificates - if any fail return error.
517pub fn batch_verify_all_certificates_and_checkpoints(
518    committee: &Committee,
519    certs: &[CertifiedTransaction],
520    checkpoints: &[SignedCheckpointSummary],
521) -> IotaResult {
522    // certs.data() is assumed to be verified already by the caller.
523
524    for ckpt in checkpoints {
525        ckpt.data().verify_epoch(committee.epoch())?;
526    }
527
528    batch_verify(committee, certs, checkpoints)
529}
530
531/// Verifies certificates in batch mode, but returns a separate result for each
532/// cert.
533pub fn batch_verify_certificates(
534    committee: &Committee,
535    certs: &[CertifiedTransaction],
536    zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
537) -> Vec<IotaResult> {
538    // certs.data() is assumed to be verified already by the caller.
539    let verify_params = VerifyParams::default();
540    match batch_verify(committee, certs, &[]) {
541        Ok(_) => vec![Ok(()); certs.len()],
542
543        // Verify one by one to find which certs were invalid.
544        Err(_) if certs.len() > 1 => certs
545            .iter()
546            // TODO: verify_signature currently checks the tx sig as well, which might be cached
547            // already.
548            .map(|c| {
549                c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
550            })
551            .collect(),
552
553        Err(e) => vec![Err(e)],
554    }
555}
556
557fn batch_verify(
558    committee: &Committee,
559    certs: &[CertifiedTransaction],
560    checkpoints: &[SignedCheckpointSummary],
561) -> IotaResult {
562    let mut obligation = VerificationObligation::default();
563
564    for cert in certs {
565        let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
566        cert.auth_sig()
567            .add_to_verification_obligation(committee, &mut obligation, idx)?;
568    }
569
570    for ckpt in checkpoints {
571        let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
572        ckpt.auth_sig()
573            .add_to_verification_obligation(committee, &mut obligation, idx)?;
574    }
575
576    obligation.verify_all()
577}