1use std::{collections::BTreeSet, sync::Arc};
6
7use either::Either;
8use fastcrypto::traits::{AggregateAuthenticator, ToFromBytes};
9use fastcrypto_zkp::bn254::{
10 zk_login::{JWK, JwkId},
11 zk_login_api::ZkLoginEnv,
12};
13use futures::pin_mut;
14use im::hashmap::HashMap as ImHashMap;
15use iota_metrics::monitored_scope;
16use iota_sdk_types::crypto::Intent;
17use iota_types::{
18 base_types::AuthorityName,
19 committee::Committee,
20 crypto::{AuthorityPublicKey, AuthoritySignInfoTrait, VerificationObligation},
21 digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
22 error::{IotaError, IotaResult},
23 message_envelope::Message,
24 messages_checkpoint::SignedCheckpointSummary,
25 messages_consensus::{AuthorityCapabilitiesDigest, SignedAuthorityCapabilitiesV1},
26 signature::VerifyParams,
27 signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
28 transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
29};
30use itertools::{Itertools as _, izip};
31use parking_lot::{Mutex, MutexGuard, RwLock};
32use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
33use tap::TapFallible;
34use tokio::{
35 runtime::Handle,
36 sync::oneshot,
37 time::{Duration, timeout},
38};
39use tracing::{Instrument, debug, instrument, trace_span};
/// How long a queued certificate waits for its batch to fill up before the
/// waiting caller stops waiting and tries to verify a partial batch itself.
///
/// Despite the `_MS` suffix this is a [`Duration`], not a raw millisecond
/// count.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);

/// Batch size used by `SignatureVerifier::new`, i.e. the number of
/// certificates collected before a full batch is verified.
const MAX_BATCH_SIZE: usize = 8;
50
51type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
52
53struct CertBuffer {
54 certs: Vec<CertifiedTransaction>,
55 senders: Vec<Sender>,
56 id: u64,
57}
58
59impl CertBuffer {
60 fn new(capacity: usize) -> Self {
61 Self {
62 certs: Vec::with_capacity(capacity),
63 senders: Vec::with_capacity(capacity),
64 id: 0,
65 }
66 }
67
68 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
71 let this = &mut *guard;
72 let mut new = CertBuffer::new(this.capacity());
73 new.id = this.id + 1;
74 std::mem::swap(&mut new, this);
75 new
76 }
77
78 fn capacity(&self) -> usize {
79 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
80 self.certs.capacity()
81 }
82
83 fn len(&self) -> usize {
84 debug_assert_eq!(self.certs.len(), self.senders.len());
85 self.certs.len()
86 }
87
88 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
89 self.senders.push(tx);
90 self.certs.push(cert);
91 }
92}
93
94pub struct SignatureVerifier {
99 committee: Arc<Committee>,
100 non_committee_validators: BTreeSet<AuthorityName>,
101
102 certificate_cache: VerifiedDigestCache<CertificateDigest>,
103 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
104 authority_capability_cache: VerifiedDigestCache<AuthorityCapabilitiesDigest>,
105 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
106
107 jwks: RwLock<ImHashMap<JwkId, JWK>>,
114
115 zk_login_params: ZkLoginParams,
118
119 queue: Mutex<CertBuffer>,
120 pub metrics: Arc<SignatureVerifierMetrics>,
121}
122
123#[derive(Clone)]
125struct ZkLoginParams {
126 pub env: ZkLoginEnv,
129 pub accept_zklogin_in_multisig: bool,
131 pub accept_passkey_in_multisig: bool,
133 pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
135 pub additional_multisig_checks: bool,
137}
138
139impl SignatureVerifier {
140 pub fn new_with_batch_size(
141 committee: Arc<Committee>,
142 non_committee_validators: BTreeSet<AuthorityName>,
143 batch_size: usize,
144 metrics: Arc<SignatureVerifierMetrics>,
145 env: ZkLoginEnv,
146 accept_zklogin_in_multisig: bool,
147 accept_passkey_in_multisig: bool,
148 zklogin_max_epoch_upper_bound_delta: Option<u64>,
149 additional_multisig_checks: bool,
150 ) -> Self {
151 Self {
152 committee,
153 non_committee_validators,
154 certificate_cache: VerifiedDigestCache::new(
155 metrics.certificate_signatures_cache_hits.clone(),
156 metrics.certificate_signatures_cache_misses.clone(),
157 metrics.certificate_signatures_cache_evictions.clone(),
158 ),
159 signed_data_cache: VerifiedDigestCache::new(
160 metrics.signed_data_cache_hits.clone(),
161 metrics.signed_data_cache_misses.clone(),
162 metrics.signed_data_cache_evictions.clone(),
163 ),
164 authority_capability_cache: VerifiedDigestCache::new(
165 metrics.authority_capabilities_cache_hits.clone(),
166 metrics.authority_capabilities_cache_misses.clone(),
167 metrics.authority_capabilities_cache_evictions.clone(),
168 ),
169 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
170 metrics.zklogin_inputs_cache_hits.clone(),
171 metrics.zklogin_inputs_cache_misses.clone(),
172 metrics.zklogin_inputs_cache_evictions.clone(),
173 )),
174 jwks: Default::default(),
175 queue: Mutex::new(CertBuffer::new(batch_size)),
176 metrics,
177 zk_login_params: ZkLoginParams {
178 env,
179 accept_zklogin_in_multisig,
180 accept_passkey_in_multisig,
181 zklogin_max_epoch_upper_bound_delta,
182 additional_multisig_checks,
183 },
184 }
185 }
186
187 pub fn new(
188 committee: Arc<Committee>,
189 non_committee_validators: BTreeSet<AuthorityName>,
190 metrics: Arc<SignatureVerifierMetrics>,
191 zklogin_env: ZkLoginEnv,
192 accept_zklogin_in_multisig: bool,
193 accept_passkey_in_multisig: bool,
194 zklogin_max_epoch_upper_bound_delta: Option<u64>,
195 additional_multisig_checks: bool,
196 ) -> Self {
197 Self::new_with_batch_size(
198 committee,
199 non_committee_validators,
200 MAX_BATCH_SIZE,
201 metrics,
202 zklogin_env,
203 accept_zklogin_in_multisig,
204 accept_passkey_in_multisig,
205 zklogin_max_epoch_upper_bound_delta,
206 additional_multisig_checks,
207 )
208 }
209
210 #[instrument(level = "trace", skip_all)]
212 pub fn verify_certs_and_checkpoints(
213 &self,
214 certs: Vec<&CertifiedTransaction>,
215 checkpoints: Vec<&SignedCheckpointSummary>,
216 authority_capabilities: Vec<&SignedAuthorityCapabilitiesV1>,
217 ) -> IotaResult {
218 for cert in &certs {
221 self.verify_tx(cert.data())?;
222 }
223
224 for cap in &authority_capabilities {
227 self.verify_authority_capabilities(cap)?;
228 }
229
230 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
231 Ok(())
232 }
233
234 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
236 let cert_digest = cert.certificate_digest();
237 if self.certificate_cache.is_cached(&cert_digest) {
238 return Ok(VerifiedCertificate::new_unchecked(cert));
239 }
240 self.verify_tx(cert.data())?;
241 self.verify_cert_skip_cache(cert)
242 .await
243 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
244 }
245
246 pub async fn multi_verify_certs(
247 &self,
248 certs: Vec<CertifiedTransaction>,
249 ) -> Vec<IotaResult<VerifiedCertificate>> {
250 let mut futures = Vec::with_capacity(certs.len());
253 for cert in certs {
254 futures.push(self.verify_cert(cert));
255 }
256 futures::future::join_all(futures).await
257 }
258
259 pub async fn verify_cert_skip_cache(
261 &self,
262 cert: CertifiedTransaction,
263 ) -> IotaResult<VerifiedCertificate> {
264 if cert.auth_sig().epoch != self.committee.epoch() {
267 return Err(IotaError::WrongEpoch {
268 expected_epoch: self.committee.epoch(),
269 actual_epoch: cert.auth_sig().epoch,
270 });
271 }
272
273 self.verify_cert_inner(cert).await
274 }
275
276 async fn verify_cert_inner(
277 &self,
278 cert: CertifiedTransaction,
279 ) -> IotaResult<VerifiedCertificate> {
280 let (tx, rx) = oneshot::channel();
285 pin_mut!(rx);
286
287 let prev_id_or_buffer = {
288 let mut queue = self.queue.lock();
289 queue.push(tx, cert);
290 if queue.len() == queue.capacity() {
291 Either::Right(CertBuffer::take_and_replace(queue))
292 } else {
293 Either::Left(queue.id)
294 }
295 };
296 let prev_id = match prev_id_or_buffer {
297 Either::Left(prev_id) => prev_id,
298 Either::Right(buffer) => {
299 self.metrics.full_batches.inc();
300 self.process_queue(buffer)
301 .instrument(trace_span!("SignatureVerifier::process_queue"))
302 .await;
303 return rx.try_recv().unwrap();
305 }
306 };
307
308 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
309 return res.unwrap();
311 }
312 self.metrics.timeouts.inc();
313
314 let buffer = {
315 let queue = self.queue.lock();
316 if prev_id == queue.id {
318 debug_assert_ne!(queue.len(), queue.capacity());
319 Some(CertBuffer::take_and_replace(queue))
320 } else {
321 None
322 }
323 };
324
325 if let Some(buffer) = buffer {
326 self.metrics.partial_batches.inc();
327 self.process_queue(buffer).await;
328 return rx.try_recv().unwrap();
330 }
331
332 rx.await.unwrap()
335 }
336
337 async fn process_queue(&self, buffer: CertBuffer) {
338 let committee = self.committee.clone();
339 let metrics = self.metrics.clone();
340 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
341 Handle::current()
342 .spawn_blocking(move || {
343 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
344 })
345 .await
346 .expect("Spawn blocking should not fail");
347 }
348
349 #[instrument(level = "trace", skip_all)]
350 fn process_queue_sync(
351 committee: Arc<Committee>,
352 metrics: Arc<SignatureVerifierMetrics>,
353 buffer: CertBuffer,
354 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
355 ) {
356 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
357
358 let results = batch_verify_certificates(
359 &committee,
360 &buffer.certs.iter().collect_vec(),
361 zklogin_inputs_cache,
362 );
363 izip!(
364 results.into_iter(),
365 buffer.certs.into_iter(),
366 buffer.senders.into_iter(),
367 )
368 .for_each(|(result, cert, tx)| {
369 tx.send(match result {
370 Ok(()) => {
371 metrics.total_verified_certs.inc();
372 Ok(VerifiedCertificate::new_unchecked(cert))
373 }
374 Err(e) => {
375 metrics.total_failed_certs.inc();
376 Err(e)
377 }
378 })
379 .ok();
380 });
381 }
382
383 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
386 let mut jwks = self.jwks.write();
387 match jwks.entry(jwk_id.clone()) {
388 im::hashmap::Entry::Occupied(_) => {
389 debug!("JWK with kid {:?} already exists", jwk_id);
390 }
391 im::hashmap::Entry::Vacant(entry) => {
392 debug!("inserting JWK with kid: {:?}", jwk_id);
393 entry.insert(jwk.clone());
394 }
395 }
396 }
397
398 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
399 let jwks = self.jwks.read();
400 jwks.get(jwk_id) == Some(jwk)
401 }
402
403 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
404 self.jwks.read().clone()
405 }
406
407 #[instrument(level = "trace", skip_all, fields(tx_digest = ?signed_tx.digest()))]
408 pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
409 self.signed_data_cache.is_verified(
410 signed_tx.full_message_digest(),
411 || {
412 let jwks = self.jwks.read().clone();
413 let verify_params = VerifyParams::new(
414 jwks,
415 self.zk_login_params.env,
416 self.zk_login_params.accept_zklogin_in_multisig,
417 self.zk_login_params.accept_passkey_in_multisig,
418 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
419 self.zk_login_params.additional_multisig_checks,
420 );
421 verify_sender_signed_data_message_signatures(
422 signed_tx,
423 self.committee.epoch(),
424 &verify_params,
425 self.zklogin_inputs_cache.clone(),
426 )
427 },
428 || Ok(()),
429 )
430 }
431
432 #[instrument(level = "trace", skip_all)]
433 pub fn verify_authority_capabilities(
434 &self,
435 signed_authority_capabilities: &SignedAuthorityCapabilitiesV1,
436 ) -> IotaResult {
437 let epoch = self.committee.epoch();
438 self.authority_capability_cache.is_verified(
439 signed_authority_capabilities.cache_digest(epoch),
440 || {
441 let authority_name = signed_authority_capabilities.data().authority;
443 if !self.non_committee_validators.contains(&authority_name) {
444 return Err(IotaError::IncorrectSigner {
445 error: "Signer must be part of non-committee active validators".to_string(),
446 });
447 }
448
449 let mut obligation = VerificationObligation::default();
451 let idx = obligation.add_message(
452 signed_authority_capabilities.data(),
453 epoch, Intent::iota_app(signed_authority_capabilities.scope()),
456 );
457
458 let authority_key = AuthorityPublicKey::from_bytes(authority_name.as_bytes())
460 .map_err(|_| IotaError::IncorrectSigner {
461 error: "Invalid authority public key bytes".to_string(),
462 })?;
463 obligation
464 .public_keys
465 .get_mut(idx)
466 .ok_or(IotaError::InvalidAuthenticator)?
467 .push(&authority_key);
468
469 obligation
470 .signatures
471 .get_mut(idx)
472 .ok_or(IotaError::InvalidAuthenticator)?
473 .add_signature(signed_authority_capabilities.auth_sig().clone())
474 .map_err(|_| IotaError::InvalidSignature {
475 error: "Failed to add authority signature to obligation".to_string(),
476 })?;
477
478 obligation.verify_all()
479 },
480 || Ok(()),
481 )
482 }
483
484 pub fn clear_signature_cache(&self) {
485 self.certificate_cache.clear();
486 self.authority_capability_cache.clear();
487 self.signed_data_cache.clear();
488 self.zklogin_inputs_cache.clear();
489 }
490}
491
492pub struct SignatureVerifierMetrics {
493 pub certificate_signatures_cache_hits: IntCounter,
494 pub certificate_signatures_cache_misses: IntCounter,
495 pub certificate_signatures_cache_evictions: IntCounter,
496 pub signed_data_cache_hits: IntCounter,
497 pub signed_data_cache_misses: IntCounter,
498 pub signed_data_cache_evictions: IntCounter,
499 pub zklogin_inputs_cache_hits: IntCounter,
500 pub zklogin_inputs_cache_misses: IntCounter,
501 pub zklogin_inputs_cache_evictions: IntCounter,
502 pub authority_capabilities_cache_hits: IntCounter,
503 pub authority_capabilities_cache_misses: IntCounter,
504 pub authority_capabilities_cache_evictions: IntCounter,
505 timeouts: IntCounter,
506 full_batches: IntCounter,
507 partial_batches: IntCounter,
508 total_verified_certs: IntCounter,
509 total_failed_certs: IntCounter,
510}
511
512impl SignatureVerifierMetrics {
513 pub fn new(registry: &Registry) -> Arc<Self> {
514 Arc::new(Self {
515 certificate_signatures_cache_hits: register_int_counter_with_registry!(
516 "certificate_signatures_cache_hits",
517 "Number of certificates which were known to be verified because of signature cache.",
518 registry
519 )
520 .unwrap(),
521 certificate_signatures_cache_misses: register_int_counter_with_registry!(
522 "certificate_signatures_cache_misses",
523 "Number of certificates which missed the signature cache",
524 registry
525 )
526 .unwrap(),
527 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
528 "certificate_signatures_cache_evictions",
529 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
530 registry
531 )
532 .unwrap(),
533 signed_data_cache_hits: register_int_counter_with_registry!(
534 "signed_data_cache_hits",
535 "Number of signed data which were known to be verified because of signature cache.",
536 registry
537 )
538 .unwrap(),
539 signed_data_cache_misses: register_int_counter_with_registry!(
540 "signed_data_cache_misses",
541 "Number of signed data which missed the signature cache.",
542 registry
543 )
544 .unwrap(),
545 signed_data_cache_evictions: register_int_counter_with_registry!(
546 "signed_data_cache_evictions",
547 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
548 registry
549 )
550 .unwrap(),
551 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
552 "zklogin_inputs_cache_hits",
553 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
554 registry
555 )
556 .unwrap(),
557 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
558 "zklogin_inputs_cache_misses",
559 "Number of zklogin signatures which missed the zklogin inputs cache.",
560 registry
561 )
562 .unwrap(),
563 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
564 "zklogin_inputs_cache_evictions",
565 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
566 registry
567 )
568 .unwrap(),
569 authority_capabilities_cache_hits: register_int_counter_with_registry!(
570 "authority_capabilities_cache_hits",
571 "Number of authority capabilities which were known to be verified because of capabilities cache.",
572 registry
573 )
574 .unwrap(),
575 authority_capabilities_cache_misses: register_int_counter_with_registry!(
576 "authority_capabilities_cache_misses",
577 "Number of authority capabilities which missed the capabilities cache.",
578 registry
579 )
580 .unwrap(),
581 authority_capabilities_cache_evictions: register_int_counter_with_registry!(
582 "authority_capabilities_cache_evictions",
583 "Number of times we evict a pre-existing authority capabilities that were known to be verified.",
584 registry
585 )
586 .unwrap(),
587 timeouts: register_int_counter_with_registry!(
588 "async_batch_verifier_timeouts",
589 "Number of times batch verifier times out and verifies a partial batch",
590 registry
591 )
592 .unwrap(),
593 full_batches: register_int_counter_with_registry!(
594 "async_batch_verifier_full_batches",
595 "Number of times batch verifier verifies a full batch",
596 registry
597 )
598 .unwrap(),
599 partial_batches: register_int_counter_with_registry!(
600 "async_batch_verifier_partial_batches",
601 "Number of times batch verifier verifies a partial batch",
602 registry
603 )
604 .unwrap(),
605 total_verified_certs: register_int_counter_with_registry!(
606 "async_batch_verifier_total_verified_certs",
607 "Total number of certs batch verifier has verified",
608 registry
609 )
610 .unwrap(),
611 total_failed_certs: register_int_counter_with_registry!(
612 "async_batch_verifier_total_failed_certs",
613 "Total number of certs batch verifier has rejected",
614 registry
615 )
616 .unwrap(),
617 })
618 }
619}
620
621#[instrument(level = "trace", skip_all)]
623pub fn batch_verify_all_certificates_and_checkpoints(
624 committee: &Committee,
625 certs: &[&CertifiedTransaction],
626 checkpoints: &[&SignedCheckpointSummary],
627) -> IotaResult {
628 for ckpt in checkpoints {
631 ckpt.data().verify_epoch(committee.epoch())?;
632 }
633
634 batch_verify(committee, certs, checkpoints)
635}
636
637#[instrument(level = "trace", skip_all)]
640pub fn batch_verify_certificates(
641 committee: &Committee,
642 certs: &[&CertifiedTransaction],
643 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
644) -> Vec<IotaResult> {
645 let verify_params = VerifyParams::default();
647 match batch_verify(committee, certs, &[]) {
648 Ok(_) => vec![Ok(()); certs.len()],
649
650 Err(_) if certs.len() > 1 => certs
652 .iter()
653 .map(|c| {
656 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
657 })
658 .collect(),
659
660 Err(e) => vec![Err(e)],
661 }
662}
663
664fn batch_verify(
665 committee: &Committee,
666 certs: &[&CertifiedTransaction],
667 checkpoints: &[&SignedCheckpointSummary],
668) -> IotaResult {
669 let mut obligation = VerificationObligation::default();
670
671 for cert in certs {
672 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
673 cert.auth_sig()
674 .add_to_verification_obligation(committee, &mut obligation, idx)?;
675 }
676
677 for ckpt in checkpoints {
678 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
679 ckpt.auth_sig()
680 .add_to_verification_obligation(committee, &mut obligation, idx)?;
681 }
682
683 obligation.verify_all()
684}