1use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9 zk_login::{JWK, JwkId},
10 zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16 committee::Committee,
17 crypto::{AuthoritySignInfoTrait, VerificationObligation},
18 digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19 error::{IotaError, IotaResult},
20 message_envelope::Message,
21 messages_checkpoint::SignedCheckpointSummary,
22 signature::VerifyParams,
23 signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24 transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::{Itertools as _, izip};
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32 runtime::Handle,
33 sync::oneshot,
34 time::{Duration, timeout},
35};
36use tracing::debug;
/// How long a caller of `SignatureVerifier::verify_cert_inner` waits for its
/// batch result before it tries to flush the (possibly partial) batch itself.
///
/// Note: despite the `_MS` suffix this is a `Duration`, not a raw number of
/// milliseconds.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);

/// Batch size used by `SignatureVerifier::new`; other sizes can be requested
/// through `SignatureVerifier::new_with_batch_size`.
const MAX_BATCH_SIZE: usize = 8;

/// Sending half of the one-shot channel on which a queued certificate's
/// verification result is delivered back to the caller that queued it.
type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51 certs: Vec<CertifiedTransaction>,
52 senders: Vec<Sender>,
53 id: u64,
54}
55
56impl CertBuffer {
57 fn new(capacity: usize) -> Self {
58 Self {
59 certs: Vec::with_capacity(capacity),
60 senders: Vec::with_capacity(capacity),
61 id: 0,
62 }
63 }
64
65 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68 let this = &mut *guard;
69 let mut new = CertBuffer::new(this.capacity());
70 new.id = this.id + 1;
71 std::mem::swap(&mut new, this);
72 new
73 }
74
75 fn capacity(&self) -> usize {
76 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77 self.certs.capacity()
78 }
79
80 fn len(&self) -> usize {
81 debug_assert_eq!(self.certs.len(), self.senders.len());
82 self.certs.len()
83 }
84
85 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86 self.senders.push(tx);
87 self.certs.push(cert);
88 }
89}
90
/// Verifies certificate, checkpoint and user signatures, using digest caches
/// and batched verification of queued certificates to avoid repeated work.
pub struct SignatureVerifier {
    /// Committee whose epoch certificates are checked against and which is
    /// passed to the batch verification helpers.
    committee: Arc<Committee>,
    /// Digests of certificates that have already been verified.
    certificate_cache: VerifiedDigestCache<CertificateDigest>,
    /// Digests of sender-signed data consulted by `verify_tx`.
    signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
    /// Cache of zkLogin input digests; shared (`Arc`) because it is handed to
    /// the blocking batch-verification task and to the signature verification
    /// helpers.
    zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,

    /// Known JWKs keyed by their `JwkId`. `verify_tx` clones a snapshot of the
    /// map under the read lock and verifies against that snapshot.
    jwks: RwLock<ImHashMap<JwkId, JWK>>,

    /// zkLogin / multisig settings forwarded to `VerifyParams::new` in
    /// `verify_tx`.
    zk_login_params: ZkLoginParams,

    /// Certificates queued for the next batch verification, together with the
    /// channels on which their results are reported.
    queue: Mutex<CertBuffer>,
    /// Counters for cache behaviour and batch verification.
    pub metrics: Arc<SignatureVerifierMetrics>,
}
116
/// Settings captured at construction time and forwarded, field by field, to
/// `VerifyParams::new` whenever `SignatureVerifier::verify_tx` verifies user
/// signatures.
#[derive(Clone)]
struct ZkLoginParams {
    /// zkLogin environment passed to `VerifyParams::new`.
    pub env: ZkLoginEnv,
    /// Presumably controls whether zkLogin signatures are accepted inside a
    /// multisig — NOTE(review): confirm against `VerifyParams`.
    pub accept_zklogin_in_multisig: bool,
    /// Presumably controls whether passkey signatures are accepted inside a
    /// multisig — NOTE(review): confirm against `VerifyParams`.
    pub accept_passkey_in_multisig: bool,
    /// Optional bound related to the zkLogin max epoch; `None` is passed through
    /// unchanged — NOTE(review): exact semantics are defined by `VerifyParams`.
    pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
    /// Flag passed through to `VerifyParams::new` — NOTE(review): the name
    /// suggests it enables extra multisig validation; confirm there.
    pub additional_multisig_checks: bool,
}
132
133impl SignatureVerifier {
134 pub fn new_with_batch_size(
135 committee: Arc<Committee>,
136 batch_size: usize,
137 metrics: Arc<SignatureVerifierMetrics>,
138 env: ZkLoginEnv,
139 accept_zklogin_in_multisig: bool,
140 accept_passkey_in_multisig: bool,
141 zklogin_max_epoch_upper_bound_delta: Option<u64>,
142 additional_multisig_checks: bool,
143 ) -> Self {
144 Self {
145 committee,
146 certificate_cache: VerifiedDigestCache::new(
147 metrics.certificate_signatures_cache_hits.clone(),
148 metrics.certificate_signatures_cache_misses.clone(),
149 metrics.certificate_signatures_cache_evictions.clone(),
150 ),
151 signed_data_cache: VerifiedDigestCache::new(
152 metrics.signed_data_cache_hits.clone(),
153 metrics.signed_data_cache_misses.clone(),
154 metrics.signed_data_cache_evictions.clone(),
155 ),
156 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
157 metrics.zklogin_inputs_cache_hits.clone(),
158 metrics.zklogin_inputs_cache_misses.clone(),
159 metrics.zklogin_inputs_cache_evictions.clone(),
160 )),
161 jwks: Default::default(),
162 queue: Mutex::new(CertBuffer::new(batch_size)),
163 metrics,
164 zk_login_params: ZkLoginParams {
165 env,
166 accept_zklogin_in_multisig,
167 accept_passkey_in_multisig,
168 zklogin_max_epoch_upper_bound_delta,
169 additional_multisig_checks,
170 },
171 }
172 }
173
174 pub fn new(
175 committee: Arc<Committee>,
176 metrics: Arc<SignatureVerifierMetrics>,
177 zklogin_env: ZkLoginEnv,
178 accept_zklogin_in_multisig: bool,
179 accept_passkey_in_multisig: bool,
180 zklogin_max_epoch_upper_bound_delta: Option<u64>,
181 additional_multisig_checks: bool,
182 ) -> Self {
183 Self::new_with_batch_size(
184 committee,
185 MAX_BATCH_SIZE,
186 metrics,
187 zklogin_env,
188 accept_zklogin_in_multisig,
189 accept_passkey_in_multisig,
190 zklogin_max_epoch_upper_bound_delta,
191 additional_multisig_checks,
192 )
193 }
194
195 pub fn verify_certs_and_checkpoints(
197 &self,
198 certs: Vec<&CertifiedTransaction>,
199 checkpoints: Vec<&SignedCheckpointSummary>,
200 ) -> IotaResult {
201 let certs: Vec<_> = certs
202 .into_iter()
203 .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
204 .collect();
205
206 for cert in &certs {
210 self.verify_tx(cert.data())?;
211 }
212 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
213 self.certificate_cache
214 .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
215 Ok(())
216 }
217
218 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
220 let cert_digest = cert.certificate_digest();
221 if self.certificate_cache.is_cached(&cert_digest) {
222 return Ok(VerifiedCertificate::new_unchecked(cert));
223 }
224 self.verify_tx(cert.data())?;
225 self.verify_cert_skip_cache(cert)
226 .await
227 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
228 }
229
230 pub async fn multi_verify_certs(
231 &self,
232 certs: Vec<CertifiedTransaction>,
233 ) -> Vec<IotaResult<VerifiedCertificate>> {
234 let mut futures = Vec::with_capacity(certs.len());
237 for cert in certs {
238 futures.push(self.verify_cert(cert));
239 }
240 futures::future::join_all(futures).await
241 }
242
243 pub async fn verify_cert_skip_cache(
245 &self,
246 cert: CertifiedTransaction,
247 ) -> IotaResult<VerifiedCertificate> {
248 if cert.auth_sig().epoch != self.committee.epoch() {
251 return Err(IotaError::WrongEpoch {
252 expected_epoch: self.committee.epoch(),
253 actual_epoch: cert.auth_sig().epoch,
254 });
255 }
256
257 self.verify_cert_inner(cert).await
258 }
259
260 async fn verify_cert_inner(
261 &self,
262 cert: CertifiedTransaction,
263 ) -> IotaResult<VerifiedCertificate> {
264 let (tx, rx) = oneshot::channel();
269 pin_mut!(rx);
270
271 let prev_id_or_buffer = {
272 let mut queue = self.queue.lock();
273 queue.push(tx, cert);
274 if queue.len() == queue.capacity() {
275 Either::Right(CertBuffer::take_and_replace(queue))
276 } else {
277 Either::Left(queue.id)
278 }
279 };
280 let prev_id = match prev_id_or_buffer {
281 Either::Left(prev_id) => prev_id,
282 Either::Right(buffer) => {
283 self.metrics.full_batches.inc();
284 self.process_queue(buffer).await;
285 return rx.try_recv().unwrap();
287 }
288 };
289
290 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
291 return res.unwrap();
293 }
294 self.metrics.timeouts.inc();
295
296 let buffer = {
297 let queue = self.queue.lock();
298 if prev_id == queue.id {
300 debug_assert_ne!(queue.len(), queue.capacity());
301 Some(CertBuffer::take_and_replace(queue))
302 } else {
303 None
304 }
305 };
306
307 if let Some(buffer) = buffer {
308 self.metrics.partial_batches.inc();
309 self.process_queue(buffer).await;
310 return rx.try_recv().unwrap();
312 }
313
314 rx.await.unwrap()
317 }
318
319 async fn process_queue(&self, buffer: CertBuffer) {
320 let committee = self.committee.clone();
321 let metrics = self.metrics.clone();
322 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
323 Handle::current()
324 .spawn_blocking(move || {
325 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
326 })
327 .await
328 .expect("Spawn blocking should not fail");
329 }
330
331 fn process_queue_sync(
332 committee: Arc<Committee>,
333 metrics: Arc<SignatureVerifierMetrics>,
334 buffer: CertBuffer,
335 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
336 ) {
337 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
338
339 let results = batch_verify_certificates(
340 &committee,
341 &buffer.certs.iter().collect_vec(),
342 zklogin_inputs_cache,
343 );
344 izip!(
345 results.into_iter(),
346 buffer.certs.into_iter(),
347 buffer.senders.into_iter(),
348 )
349 .for_each(|(result, cert, tx)| {
350 tx.send(match result {
351 Ok(()) => {
352 metrics.total_verified_certs.inc();
353 Ok(VerifiedCertificate::new_unchecked(cert))
354 }
355 Err(e) => {
356 metrics.total_failed_certs.inc();
357 Err(e)
358 }
359 })
360 .ok();
361 });
362 }
363
364 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
367 let mut jwks = self.jwks.write();
368 match jwks.entry(jwk_id.clone()) {
369 im::hashmap::Entry::Occupied(_) => {
370 debug!("JWK with kid {:?} already exists", jwk_id);
371 }
372 im::hashmap::Entry::Vacant(entry) => {
373 debug!("inserting JWK with kid: {:?}", jwk_id);
374 entry.insert(jwk.clone());
375 }
376 }
377 }
378
379 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
380 let jwks = self.jwks.read();
381 jwks.get(jwk_id) == Some(jwk)
382 }
383
384 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
385 self.jwks.read().clone()
386 }
387
388 pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
389 self.signed_data_cache.is_verified(
390 signed_tx.full_message_digest(),
391 || {
392 let jwks = self.jwks.read().clone();
393 let verify_params = VerifyParams::new(
394 jwks,
395 self.zk_login_params.env,
396 self.zk_login_params.accept_zklogin_in_multisig,
397 self.zk_login_params.accept_passkey_in_multisig,
398 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
399 self.zk_login_params.additional_multisig_checks,
400 );
401 verify_sender_signed_data_message_signatures(
402 signed_tx,
403 self.committee.epoch(),
404 &verify_params,
405 self.zklogin_inputs_cache.clone(),
406 )
407 },
408 || Ok(()),
409 )
410 }
411
412 pub fn clear_signature_cache(&self) {
413 self.certificate_cache.clear();
414 self.signed_data_cache.clear();
415 self.zklogin_inputs_cache.clear();
416 }
417}
418
/// Prometheus counters describing cache behaviour and batch verification of
/// the `SignatureVerifier`. All counters are registered in
/// `SignatureVerifierMetrics::new`.
pub struct SignatureVerifierMetrics {
    /// Certificates known to be verified thanks to the signature cache.
    pub certificate_signatures_cache_hits: IntCounter,
    /// Certificates that missed the signature cache.
    pub certificate_signatures_cache_misses: IntCounter,
    /// Evictions of pre-existing keys from the certificate signature cache.
    pub certificate_signatures_cache_evictions: IntCounter,
    /// Signed data known to be verified thanks to the signature cache.
    pub signed_data_cache_hits: IntCounter,
    /// Signed data that missed the signature cache.
    pub signed_data_cache_misses: IntCounter,
    /// Evictions of pre-existing entries from the signed-data cache.
    pub signed_data_cache_evictions: IntCounter,
    /// zkLogin signatures known to be partially verified thanks to the zkLogin
    /// inputs cache.
    pub zklogin_inputs_cache_hits: IntCounter,
    /// zkLogin signatures that missed the zkLogin inputs cache.
    pub zklogin_inputs_cache_misses: IntCounter,
    /// Evictions of pre-existing digests from the zkLogin inputs cache.
    pub zklogin_inputs_cache_evictions: IntCounter,
    /// Times a caller timed out waiting for its batch result.
    timeouts: IntCounter,
    /// Times a full batch was verified.
    full_batches: IntCounter,
    /// Times a partial batch was verified.
    partial_batches: IntCounter,
    /// Total certificates accepted by the batch verifier.
    total_verified_certs: IntCounter,
    /// Total certificates rejected by the batch verifier.
    total_failed_certs: IntCounter,
}
435
436impl SignatureVerifierMetrics {
437 pub fn new(registry: &Registry) -> Arc<Self> {
438 Arc::new(Self {
439 certificate_signatures_cache_hits: register_int_counter_with_registry!(
440 "certificate_signatures_cache_hits",
441 "Number of certificates which were known to be verified because of signature cache.",
442 registry
443 )
444 .unwrap(),
445 certificate_signatures_cache_misses: register_int_counter_with_registry!(
446 "certificate_signatures_cache_misses",
447 "Number of certificates which missed the signature cache",
448 registry
449 )
450 .unwrap(),
451 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
452 "certificate_signatures_cache_evictions",
453 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
454 registry
455 )
456 .unwrap(),
457 signed_data_cache_hits: register_int_counter_with_registry!(
458 "signed_data_cache_hits",
459 "Number of signed data which were known to be verified because of signature cache.",
460 registry
461 )
462 .unwrap(),
463 signed_data_cache_misses: register_int_counter_with_registry!(
464 "signed_data_cache_misses",
465 "Number of signed data which missed the signature cache.",
466 registry
467 )
468 .unwrap(),
469 signed_data_cache_evictions: register_int_counter_with_registry!(
470 "signed_data_cache_evictions",
471 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
472 registry
473 )
474 .unwrap(),
475 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
476 "zklogin_inputs_cache_hits",
477 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
478 registry
479 )
480 .unwrap(),
481 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
482 "zklogin_inputs_cache_misses",
483 "Number of zklogin signatures which missed the zklogin inputs cache.",
484 registry
485 )
486 .unwrap(),
487 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
488 "zklogin_inputs_cache_evictions",
489 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
490 registry
491 )
492 .unwrap(),
493 timeouts: register_int_counter_with_registry!(
494 "async_batch_verifier_timeouts",
495 "Number of times batch verifier times out and verifies a partial batch",
496 registry
497 )
498 .unwrap(),
499 full_batches: register_int_counter_with_registry!(
500 "async_batch_verifier_full_batches",
501 "Number of times batch verifier verifies a full batch",
502 registry
503 )
504 .unwrap(),
505 partial_batches: register_int_counter_with_registry!(
506 "async_batch_verifier_partial_batches",
507 "Number of times batch verifier verifies a partial batch",
508 registry
509 )
510 .unwrap(),
511 total_verified_certs: register_int_counter_with_registry!(
512 "async_batch_verifier_total_verified_certs",
513 "Total number of certs batch verifier has verified",
514 registry
515 )
516 .unwrap(),
517 total_failed_certs: register_int_counter_with_registry!(
518 "async_batch_verifier_total_failed_certs",
519 "Total number of certs batch verifier has rejected",
520 registry
521 )
522 .unwrap(),
523 })
524 }
525}
526
527pub fn batch_verify_all_certificates_and_checkpoints(
529 committee: &Committee,
530 certs: &[&CertifiedTransaction],
531 checkpoints: &[&SignedCheckpointSummary],
532) -> IotaResult {
533 for ckpt in checkpoints {
536 ckpt.data().verify_epoch(committee.epoch())?;
537 }
538
539 batch_verify(committee, certs, checkpoints)
540}
541
542pub fn batch_verify_certificates(
545 committee: &Committee,
546 certs: &[&CertifiedTransaction],
547 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
548) -> Vec<IotaResult> {
549 let verify_params = VerifyParams::default();
551 match batch_verify(committee, certs, &[]) {
552 Ok(_) => vec![Ok(()); certs.len()],
553
554 Err(_) if certs.len() > 1 => certs
556 .iter()
557 .map(|c| {
560 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
561 })
562 .collect(),
563
564 Err(e) => vec![Err(e)],
565 }
566}
567
568fn batch_verify(
569 committee: &Committee,
570 certs: &[&CertifiedTransaction],
571 checkpoints: &[&SignedCheckpointSummary],
572) -> IotaResult {
573 let mut obligation = VerificationObligation::default();
574
575 for cert in certs {
576 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
577 cert.auth_sig()
578 .add_to_verification_obligation(committee, &mut obligation, idx)?;
579 }
580
581 for ckpt in checkpoints {
582 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
583 ckpt.auth_sig()
584 .add_to_verification_obligation(committee, &mut obligation, idx)?;
585 }
586
587 obligation.verify_all()
588}