iota_core/
signature_verifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9    zk_login::{JWK, JwkId},
10    zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16    committee::Committee,
17    crypto::{AuthoritySignInfoTrait, VerificationObligation},
18    digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19    error::{IotaError, IotaResult},
20    message_envelope::Message,
21    messages_checkpoint::SignedCheckpointSummary,
22    signature::VerifyParams,
23    signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24    transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::izip;
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32    runtime::Handle,
33    sync::oneshot,
34    time::{Duration, timeout},
35};
36use tracing::debug;
/// Maximum amount of time a queued certificate waits for its batch to fill up
/// before a partial batch is verified instead.
///
/// Note: despite the `_MS` suffix this is a [`Duration`], not a raw
/// millisecond count.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);

/// Maximum number of certificates verified together in one batch.
///
/// Increasing this value slightly improves CPU utilization (batching starts to
/// hit steeply diminishing marginal returns around batch sizes of 16), at the
/// cost of slightly higher latency: [`BATCH_TIMEOUT_MS`] is hit more often when
/// the system is not heavily loaded.
const MAX_BATCH_SIZE: usize = 8;
47
48type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51    certs: Vec<CertifiedTransaction>,
52    senders: Vec<Sender>,
53    id: u64,
54}
55
56impl CertBuffer {
57    fn new(capacity: usize) -> Self {
58        Self {
59            certs: Vec::with_capacity(capacity),
60            senders: Vec::with_capacity(capacity),
61            id: 0,
62        }
63    }
64
65    // Function consumes MutexGuard, therefore releasing the lock after mem swap is
66    // done
67    fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68        let this = &mut *guard;
69        let mut new = CertBuffer::new(this.capacity());
70        new.id = this.id + 1;
71        std::mem::swap(&mut new, this);
72        new
73    }
74
75    fn capacity(&self) -> usize {
76        debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77        self.certs.capacity()
78    }
79
80    fn len(&self) -> usize {
81        debug_assert_eq!(self.certs.len(), self.senders.len());
82        self.certs.len()
83    }
84
85    fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86        self.senders.push(tx);
87        self.certs.push(cert);
88    }
89}
90
91/// Verifies signatures in ways that faster than verifying each signature
92/// individually.
93/// - BLS signatures - caching and batch verification.
94/// - User signed data - caching.
95pub struct SignatureVerifier {
96    committee: Arc<Committee>,
97    certificate_cache: VerifiedDigestCache<CertificateDigest>,
98    signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99    zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
100
101    /// Map from JwkId (iss, kid) to the fetched JWK for that key.
102    /// We use an immutable data structure because verification of ZKLogins may
103    /// be slow, so we don't want to pass a reference to the map to the
104    /// verify method, since that would lead to a lengthy critical section.
105    /// Instead, we use an immutable data structure which can be cloned very
106    /// cheaply.
107    jwks: RwLock<ImHashMap<JwkId, JWK>>,
108
109    /// Params that contains a list of supported providers for ZKLogin and the
110    /// environment (prod/test) the code runs in.
111    zk_login_params: ZkLoginParams,
112
113    queue: Mutex<CertBuffer>,
114    pub metrics: Arc<SignatureVerifierMetrics>,
115}
116
117/// Contains two parameters to pass in to verify a ZkLogin signature.
118#[derive(Clone)]
119struct ZkLoginParams {
120    /// The environment (prod/test) the code runs in. It decides which verifying
121    /// key to use in fastcrypto.
122    pub env: ZkLoginEnv,
123    // Flag to determine whether zkLogin inside multisig is accepted.
124    pub accept_zklogin_in_multisig: bool,
125    /// Value that sets the upper bound for max_epoch in zkLogin signature.
126    pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
127}
128
129impl SignatureVerifier {
130    pub fn new_with_batch_size(
131        committee: Arc<Committee>,
132        batch_size: usize,
133        metrics: Arc<SignatureVerifierMetrics>,
134        env: ZkLoginEnv,
135        accept_zklogin_in_multisig: bool,
136        zklogin_max_epoch_upper_bound_delta: Option<u64>,
137    ) -> Self {
138        Self {
139            committee,
140            certificate_cache: VerifiedDigestCache::new(
141                metrics.certificate_signatures_cache_hits.clone(),
142                metrics.certificate_signatures_cache_misses.clone(),
143                metrics.certificate_signatures_cache_evictions.clone(),
144            ),
145            signed_data_cache: VerifiedDigestCache::new(
146                metrics.signed_data_cache_hits.clone(),
147                metrics.signed_data_cache_misses.clone(),
148                metrics.signed_data_cache_evictions.clone(),
149            ),
150            zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
151                metrics.zklogin_inputs_cache_hits.clone(),
152                metrics.zklogin_inputs_cache_misses.clone(),
153                metrics.zklogin_inputs_cache_evictions.clone(),
154            )),
155            jwks: Default::default(),
156            queue: Mutex::new(CertBuffer::new(batch_size)),
157            metrics,
158            zk_login_params: ZkLoginParams {
159                env,
160                accept_zklogin_in_multisig,
161                zklogin_max_epoch_upper_bound_delta,
162            },
163        }
164    }
165
166    pub fn new(
167        committee: Arc<Committee>,
168        metrics: Arc<SignatureVerifierMetrics>,
169        zklogin_env: ZkLoginEnv,
170        accept_zklogin_in_multisig: bool,
171        zklogin_max_epoch_upper_bound_delta: Option<u64>,
172    ) -> Self {
173        Self::new_with_batch_size(
174            committee,
175            MAX_BATCH_SIZE,
176            metrics,
177            zklogin_env,
178            accept_zklogin_in_multisig,
179            zklogin_max_epoch_upper_bound_delta,
180        )
181    }
182
183    /// Verifies all certs, returns Ok only if all are valid.
184    pub fn verify_certs_and_checkpoints(
185        &self,
186        certs: Vec<CertifiedTransaction>,
187        checkpoints: Vec<SignedCheckpointSummary>,
188    ) -> IotaResult {
189        let certs: Vec<_> = certs
190            .into_iter()
191            .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
192            .collect();
193
194        // Verify only the user sigs of certificates that were not cached already, since
195        // whenever we insert a certificate into the cache, it is already
196        // verified.
197        for cert in &certs {
198            self.verify_tx(cert.data())?;
199        }
200        batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
201        self.certificate_cache
202            .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
203        Ok(())
204    }
205
206    /// Verifies one cert asynchronously, in a batch.
207    pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
208        let cert_digest = cert.certificate_digest();
209        if self.certificate_cache.is_cached(&cert_digest) {
210            return Ok(VerifiedCertificate::new_unchecked(cert));
211        }
212        self.verify_tx(cert.data())?;
213        self.verify_cert_skip_cache(cert)
214            .await
215            .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
216    }
217
218    pub async fn multi_verify_certs(
219        &self,
220        certs: Vec<CertifiedTransaction>,
221    ) -> Vec<IotaResult<VerifiedCertificate>> {
222        // TODO: We could do better by pushing the all of `certs` into the verification
223        // queue at once, but that's significantly more complex.
224        let mut futures = Vec::with_capacity(certs.len());
225        for cert in certs {
226            futures.push(self.verify_cert(cert));
227        }
228        futures::future::join_all(futures).await
229    }
230
231    /// exposed as a public method for the benchmarks
232    pub async fn verify_cert_skip_cache(
233        &self,
234        cert: CertifiedTransaction,
235    ) -> IotaResult<VerifiedCertificate> {
236        // this is the only innocent error we are likely to encounter - filter it before
237        // we poison a whole batch.
238        if cert.auth_sig().epoch != self.committee.epoch() {
239            return Err(IotaError::WrongEpoch {
240                expected_epoch: self.committee.epoch(),
241                actual_epoch: cert.auth_sig().epoch,
242            });
243        }
244
245        self.verify_cert_inner(cert).await
246    }
247
248    async fn verify_cert_inner(
249        &self,
250        cert: CertifiedTransaction,
251    ) -> IotaResult<VerifiedCertificate> {
252        // Cancellation safety: we use parking_lot locks, which cannot be held across
253        // awaits. Therefore once the queue has been taken by a thread, it is
254        // guaranteed to process the queue and send all results before the
255        // future can be cancelled by the caller.
256        let (tx, rx) = oneshot::channel();
257        pin_mut!(rx);
258
259        let prev_id_or_buffer = {
260            let mut queue = self.queue.lock();
261            queue.push(tx, cert);
262            if queue.len() == queue.capacity() {
263                Either::Right(CertBuffer::take_and_replace(queue))
264            } else {
265                Either::Left(queue.id)
266            }
267        };
268        let prev_id = match prev_id_or_buffer {
269            Either::Left(prev_id) => prev_id,
270            Either::Right(buffer) => {
271                self.metrics.full_batches.inc();
272                self.process_queue(buffer).await;
273                // unwrap ok - process_queue will have sent the result already
274                return rx.try_recv().unwrap();
275            }
276        };
277
278        if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
279            // unwrap ok - tx cannot have been dropped without sending a result.
280            return res.unwrap();
281        }
282        self.metrics.timeouts.inc();
283
284        let buffer = {
285            let queue = self.queue.lock();
286            // check if another thread took the queue while we were re-acquiring lock.
287            if prev_id == queue.id {
288                debug_assert_ne!(queue.len(), queue.capacity());
289                Some(CertBuffer::take_and_replace(queue))
290            } else {
291                None
292            }
293        };
294
295        if let Some(buffer) = buffer {
296            self.metrics.partial_batches.inc();
297            self.process_queue(buffer).await;
298            // unwrap ok - process_queue will have sent the result already
299            return rx.try_recv().unwrap();
300        }
301
302        // unwrap ok - another thread took the queue while we were re-acquiring the lock
303        // and is guaranteed to process the queue immediately.
304        rx.await.unwrap()
305    }
306
307    async fn process_queue(&self, buffer: CertBuffer) {
308        let committee = self.committee.clone();
309        let metrics = self.metrics.clone();
310        let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
311        Handle::current()
312            .spawn_blocking(move || {
313                Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
314            })
315            .await
316            .expect("Spawn blocking should not fail");
317    }
318
319    fn process_queue_sync(
320        committee: Arc<Committee>,
321        metrics: Arc<SignatureVerifierMetrics>,
322        buffer: CertBuffer,
323        zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
324    ) {
325        let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
326
327        let results = batch_verify_certificates(&committee, &buffer.certs, zklogin_inputs_cache);
328        izip!(
329            results.into_iter(),
330            buffer.certs.into_iter(),
331            buffer.senders.into_iter(),
332        )
333        .for_each(|(result, cert, tx)| {
334            tx.send(match result {
335                Ok(()) => {
336                    metrics.total_verified_certs.inc();
337                    Ok(VerifiedCertificate::new_unchecked(cert))
338                }
339                Err(e) => {
340                    metrics.total_failed_certs.inc();
341                    Err(e)
342                }
343            })
344            .ok();
345        });
346    }
347
348    /// Insert a JWK into the verifier state. Pre-existing entries for a given
349    /// JwkId will not be overwritten.
350    pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
351        let mut jwks = self.jwks.write();
352        match jwks.entry(jwk_id.clone()) {
353            im::hashmap::Entry::Occupied(_) => {
354                debug!("JWK with kid {:?} already exists", jwk_id);
355            }
356            im::hashmap::Entry::Vacant(entry) => {
357                debug!("inserting JWK with kid: {:?}", jwk_id);
358                entry.insert(jwk.clone());
359            }
360        }
361    }
362
363    pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
364        let jwks = self.jwks.read();
365        jwks.get(jwk_id) == Some(jwk)
366    }
367
368    pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
369        self.jwks.read().clone()
370    }
371
372    pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
373        self.signed_data_cache.is_verified(
374            signed_tx.full_message_digest(),
375            || {
376                let jwks = self.jwks.read().clone();
377                let verify_params = VerifyParams::new(
378                    jwks,
379                    self.zk_login_params.env,
380                    self.zk_login_params.accept_zklogin_in_multisig,
381                    self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
382                );
383                verify_sender_signed_data_message_signatures(
384                    signed_tx,
385                    self.committee.epoch(),
386                    &verify_params,
387                    self.zklogin_inputs_cache.clone(),
388                )
389            },
390            || Ok(()),
391        )
392    }
393
394    pub fn clear_signature_cache(&self) {
395        self.certificate_cache.clear();
396        self.signed_data_cache.clear();
397        self.zklogin_inputs_cache.clear();
398    }
399}
400
401pub struct SignatureVerifierMetrics {
402    pub certificate_signatures_cache_hits: IntCounter,
403    pub certificate_signatures_cache_misses: IntCounter,
404    pub certificate_signatures_cache_evictions: IntCounter,
405    pub signed_data_cache_hits: IntCounter,
406    pub signed_data_cache_misses: IntCounter,
407    pub signed_data_cache_evictions: IntCounter,
408    pub zklogin_inputs_cache_hits: IntCounter,
409    pub zklogin_inputs_cache_misses: IntCounter,
410    pub zklogin_inputs_cache_evictions: IntCounter,
411    timeouts: IntCounter,
412    full_batches: IntCounter,
413    partial_batches: IntCounter,
414    total_verified_certs: IntCounter,
415    total_failed_certs: IntCounter,
416}
417
418impl SignatureVerifierMetrics {
419    pub fn new(registry: &Registry) -> Arc<Self> {
420        Arc::new(Self {
421            certificate_signatures_cache_hits: register_int_counter_with_registry!(
422                "certificate_signatures_cache_hits",
423                "Number of certificates which were known to be verified because of signature cache.",
424                registry
425            )
426            .unwrap(),
427            certificate_signatures_cache_misses: register_int_counter_with_registry!(
428                "certificate_signatures_cache_misses",
429                "Number of certificates which missed the signature cache",
430                registry
431            )
432            .unwrap(),
433            certificate_signatures_cache_evictions: register_int_counter_with_registry!(
434                "certificate_signatures_cache_evictions",
435                "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
436                registry
437            )
438            .unwrap(),
439            signed_data_cache_hits: register_int_counter_with_registry!(
440                "signed_data_cache_hits",
441                "Number of signed data which were known to be verified because of signature cache.",
442                registry
443            )
444            .unwrap(),
445            signed_data_cache_misses: register_int_counter_with_registry!(
446                "signed_data_cache_misses",
447                "Number of signed data which missed the signature cache.",
448                registry
449            )
450            .unwrap(),
451            signed_data_cache_evictions: register_int_counter_with_registry!(
452                "signed_data_cache_evictions",
453                "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
454                registry
455            )
456                .unwrap(),
457                zklogin_inputs_cache_hits: register_int_counter_with_registry!(
458                    "zklogin_inputs_cache_hits",
459                    "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
460                    registry
461                )
462                .unwrap(),
463                zklogin_inputs_cache_misses: register_int_counter_with_registry!(
464                    "zklogin_inputs_cache_misses",
465                    "Number of zklogin signatures which missed the zklogin inputs cache.",
466                    registry
467                )
468                .unwrap(),
469                zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
470                    "zklogin_inputs_cache_evictions",
471                    "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
472                    registry
473                )
474                .unwrap(),
475            timeouts: register_int_counter_with_registry!(
476                "async_batch_verifier_timeouts",
477                "Number of times batch verifier times out and verifies a partial batch",
478                registry
479            )
480            .unwrap(),
481            full_batches: register_int_counter_with_registry!(
482                "async_batch_verifier_full_batches",
483                "Number of times batch verifier verifies a full batch",
484                registry
485            )
486            .unwrap(),
487            partial_batches: register_int_counter_with_registry!(
488                "async_batch_verifier_partial_batches",
489                "Number of times batch verifier verifies a partial batch",
490                registry
491            )
492            .unwrap(),
493            total_verified_certs: register_int_counter_with_registry!(
494                "async_batch_verifier_total_verified_certs",
495                "Total number of certs batch verifier has verified",
496                registry
497            )
498            .unwrap(),
499            total_failed_certs: register_int_counter_with_registry!(
500                "async_batch_verifier_total_failed_certs",
501                "Total number of certs batch verifier has rejected",
502                registry
503            )
504            .unwrap(),
505        })
506    }
507}
508
509/// Verifies all certificates - if any fail return error.
510pub fn batch_verify_all_certificates_and_checkpoints(
511    committee: &Committee,
512    certs: &[CertifiedTransaction],
513    checkpoints: &[SignedCheckpointSummary],
514) -> IotaResult {
515    // certs.data() is assumed to be verified already by the caller.
516
517    for ckpt in checkpoints {
518        ckpt.data().verify_epoch(committee.epoch())?;
519    }
520
521    batch_verify(committee, certs, checkpoints)
522}
523
524/// Verifies certificates in batch mode, but returns a separate result for each
525/// cert.
526pub fn batch_verify_certificates(
527    committee: &Committee,
528    certs: &[CertifiedTransaction],
529    zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
530) -> Vec<IotaResult> {
531    // certs.data() is assumed to be verified already by the caller.
532    let verify_params = VerifyParams::default();
533    match batch_verify(committee, certs, &[]) {
534        Ok(_) => vec![Ok(()); certs.len()],
535
536        // Verify one by one to find which certs were invalid.
537        Err(_) if certs.len() > 1 => certs
538            .iter()
539            // TODO: verify_signature currently checks the tx sig as well, which might be cached
540            // already.
541            .map(|c| {
542                c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
543            })
544            .collect(),
545
546        Err(e) => vec![Err(e)],
547    }
548}
549
550fn batch_verify(
551    committee: &Committee,
552    certs: &[CertifiedTransaction],
553    checkpoints: &[SignedCheckpointSummary],
554) -> IotaResult {
555    let mut obligation = VerificationObligation::default();
556
557    for cert in certs {
558        let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
559        cert.auth_sig()
560            .add_to_verification_obligation(committee, &mut obligation, idx)?;
561    }
562
563    for ckpt in checkpoints {
564        let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
565        ckpt.auth_sig()
566            .add_to_verification_obligation(committee, &mut obligation, idx)?;
567    }
568
569    obligation.verify_all()
570}