1use std::{collections::BTreeSet, sync::Arc};
6
7use either::Either;
8use fastcrypto::traits::{AggregateAuthenticator, ToFromBytes};
9use fastcrypto_zkp::bn254::{
10 zk_login::{JWK, JwkId},
11 zk_login_api::ZkLoginEnv,
12};
13use futures::pin_mut;
14use im::hashmap::HashMap as ImHashMap;
15use iota_metrics::monitored_scope;
16use iota_types::{
17 base_types::AuthorityName,
18 committee::Committee,
19 crypto::{AuthorityPublicKey, AuthoritySignInfoTrait, VerificationObligation},
20 digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
21 error::{IotaError, IotaResult},
22 message_envelope::Message,
23 messages_checkpoint::SignedCheckpointSummary,
24 messages_consensus::{AuthorityCapabilitiesDigest, SignedAuthorityCapabilitiesV1},
25 signature::VerifyParams,
26 signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
27 transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
28};
29use itertools::{Itertools as _, izip};
30use parking_lot::{Mutex, MutexGuard, RwLock};
31use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
32use shared_crypto::intent::Intent;
33use tap::TapFallible;
34use tokio::{
35 runtime::Handle,
36 sync::oneshot,
37 time::{Duration, timeout},
38};
39use tracing::{debug, instrument};
/// How long a task that enqueued a certificate waits for its result before it
/// tries to flush the (possibly partial) batch itself.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);

/// Queue capacity requested by [`SignatureVerifier::new`]; a batch is verified
/// as soon as the queue reaches its capacity.
const MAX_BATCH_SIZE: usize = 8;
50
51type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
52
53struct CertBuffer {
54 certs: Vec<CertifiedTransaction>,
55 senders: Vec<Sender>,
56 id: u64,
57}
58
59impl CertBuffer {
60 fn new(capacity: usize) -> Self {
61 Self {
62 certs: Vec::with_capacity(capacity),
63 senders: Vec::with_capacity(capacity),
64 id: 0,
65 }
66 }
67
68 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
71 let this = &mut *guard;
72 let mut new = CertBuffer::new(this.capacity());
73 new.id = this.id + 1;
74 std::mem::swap(&mut new, this);
75 new
76 }
77
78 fn capacity(&self) -> usize {
79 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
80 self.certs.capacity()
81 }
82
83 fn len(&self) -> usize {
84 debug_assert_eq!(self.certs.len(), self.senders.len());
85 self.certs.len()
86 }
87
88 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
89 self.senders.push(tx);
90 self.certs.push(cert);
91 }
92}
93
94pub struct SignatureVerifier {
99 committee: Arc<Committee>,
100 non_committee_validators: BTreeSet<AuthorityName>,
101
102 certificate_cache: VerifiedDigestCache<CertificateDigest>,
103 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
104 authority_capability_cache: VerifiedDigestCache<AuthorityCapabilitiesDigest>,
105 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
106
107 jwks: RwLock<ImHashMap<JwkId, JWK>>,
114
115 zk_login_params: ZkLoginParams,
118
119 queue: Mutex<CertBuffer>,
120 pub metrics: Arc<SignatureVerifierMetrics>,
121}
122
123#[derive(Clone)]
125struct ZkLoginParams {
126 pub env: ZkLoginEnv,
129 pub accept_zklogin_in_multisig: bool,
131 pub accept_passkey_in_multisig: bool,
133 pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
135 pub additional_multisig_checks: bool,
137}
138
139impl SignatureVerifier {
140 pub fn new_with_batch_size(
141 committee: Arc<Committee>,
142 non_committee_validators: BTreeSet<AuthorityName>,
143 batch_size: usize,
144 metrics: Arc<SignatureVerifierMetrics>,
145 env: ZkLoginEnv,
146 accept_zklogin_in_multisig: bool,
147 accept_passkey_in_multisig: bool,
148 zklogin_max_epoch_upper_bound_delta: Option<u64>,
149 additional_multisig_checks: bool,
150 ) -> Self {
151 Self {
152 committee,
153 non_committee_validators,
154 certificate_cache: VerifiedDigestCache::new(
155 metrics.certificate_signatures_cache_hits.clone(),
156 metrics.certificate_signatures_cache_misses.clone(),
157 metrics.certificate_signatures_cache_evictions.clone(),
158 ),
159 signed_data_cache: VerifiedDigestCache::new(
160 metrics.signed_data_cache_hits.clone(),
161 metrics.signed_data_cache_misses.clone(),
162 metrics.signed_data_cache_evictions.clone(),
163 ),
164 authority_capability_cache: VerifiedDigestCache::new(
165 metrics.authority_capabilities_cache_hits.clone(),
166 metrics.authority_capabilities_cache_misses.clone(),
167 metrics.authority_capabilities_cache_evictions.clone(),
168 ),
169 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
170 metrics.zklogin_inputs_cache_hits.clone(),
171 metrics.zklogin_inputs_cache_misses.clone(),
172 metrics.zklogin_inputs_cache_evictions.clone(),
173 )),
174 jwks: Default::default(),
175 queue: Mutex::new(CertBuffer::new(batch_size)),
176 metrics,
177 zk_login_params: ZkLoginParams {
178 env,
179 accept_zklogin_in_multisig,
180 accept_passkey_in_multisig,
181 zklogin_max_epoch_upper_bound_delta,
182 additional_multisig_checks,
183 },
184 }
185 }
186
187 pub fn new(
188 committee: Arc<Committee>,
189 non_committee_validators: BTreeSet<AuthorityName>,
190 metrics: Arc<SignatureVerifierMetrics>,
191 zklogin_env: ZkLoginEnv,
192 accept_zklogin_in_multisig: bool,
193 accept_passkey_in_multisig: bool,
194 zklogin_max_epoch_upper_bound_delta: Option<u64>,
195 additional_multisig_checks: bool,
196 ) -> Self {
197 Self::new_with_batch_size(
198 committee,
199 non_committee_validators,
200 MAX_BATCH_SIZE,
201 metrics,
202 zklogin_env,
203 accept_zklogin_in_multisig,
204 accept_passkey_in_multisig,
205 zklogin_max_epoch_upper_bound_delta,
206 additional_multisig_checks,
207 )
208 }
209
210 pub fn verify_certs_and_checkpoints(
212 &self,
213 certs: Vec<&CertifiedTransaction>,
214 checkpoints: Vec<&SignedCheckpointSummary>,
215 authority_capabilities: Vec<&SignedAuthorityCapabilitiesV1>,
216 ) -> IotaResult {
217 for cert in &certs {
220 self.verify_tx(cert.data())?;
221 }
222
223 for cap in &authority_capabilities {
226 self.verify_authority_capabilities(cap)?;
227 }
228
229 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
230 Ok(())
231 }
232
233 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
235 let cert_digest = cert.certificate_digest();
236 if self.certificate_cache.is_cached(&cert_digest) {
237 return Ok(VerifiedCertificate::new_unchecked(cert));
238 }
239 self.verify_tx(cert.data())?;
240 self.verify_cert_skip_cache(cert)
241 .await
242 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
243 }
244
245 pub async fn multi_verify_certs(
246 &self,
247 certs: Vec<CertifiedTransaction>,
248 ) -> Vec<IotaResult<VerifiedCertificate>> {
249 let mut futures = Vec::with_capacity(certs.len());
252 for cert in certs {
253 futures.push(self.verify_cert(cert));
254 }
255 futures::future::join_all(futures).await
256 }
257
258 pub async fn verify_cert_skip_cache(
260 &self,
261 cert: CertifiedTransaction,
262 ) -> IotaResult<VerifiedCertificate> {
263 if cert.auth_sig().epoch != self.committee.epoch() {
266 return Err(IotaError::WrongEpoch {
267 expected_epoch: self.committee.epoch(),
268 actual_epoch: cert.auth_sig().epoch,
269 });
270 }
271
272 self.verify_cert_inner(cert).await
273 }
274
275 async fn verify_cert_inner(
276 &self,
277 cert: CertifiedTransaction,
278 ) -> IotaResult<VerifiedCertificate> {
279 let (tx, rx) = oneshot::channel();
284 pin_mut!(rx);
285
286 let prev_id_or_buffer = {
287 let mut queue = self.queue.lock();
288 queue.push(tx, cert);
289 if queue.len() == queue.capacity() {
290 Either::Right(CertBuffer::take_and_replace(queue))
291 } else {
292 Either::Left(queue.id)
293 }
294 };
295 let prev_id = match prev_id_or_buffer {
296 Either::Left(prev_id) => prev_id,
297 Either::Right(buffer) => {
298 self.metrics.full_batches.inc();
299 self.process_queue(buffer).await;
300 return rx.try_recv().unwrap();
302 }
303 };
304
305 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
306 return res.unwrap();
308 }
309 self.metrics.timeouts.inc();
310
311 let buffer = {
312 let queue = self.queue.lock();
313 if prev_id == queue.id {
315 debug_assert_ne!(queue.len(), queue.capacity());
316 Some(CertBuffer::take_and_replace(queue))
317 } else {
318 None
319 }
320 };
321
322 if let Some(buffer) = buffer {
323 self.metrics.partial_batches.inc();
324 self.process_queue(buffer).await;
325 return rx.try_recv().unwrap();
327 }
328
329 rx.await.unwrap()
332 }
333
334 async fn process_queue(&self, buffer: CertBuffer) {
335 let committee = self.committee.clone();
336 let metrics = self.metrics.clone();
337 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
338 Handle::current()
339 .spawn_blocking(move || {
340 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
341 })
342 .await
343 .expect("Spawn blocking should not fail");
344 }
345
346 fn process_queue_sync(
347 committee: Arc<Committee>,
348 metrics: Arc<SignatureVerifierMetrics>,
349 buffer: CertBuffer,
350 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
351 ) {
352 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
353
354 let results = batch_verify_certificates(
355 &committee,
356 &buffer.certs.iter().collect_vec(),
357 zklogin_inputs_cache,
358 );
359 izip!(
360 results.into_iter(),
361 buffer.certs.into_iter(),
362 buffer.senders.into_iter(),
363 )
364 .for_each(|(result, cert, tx)| {
365 tx.send(match result {
366 Ok(()) => {
367 metrics.total_verified_certs.inc();
368 Ok(VerifiedCertificate::new_unchecked(cert))
369 }
370 Err(e) => {
371 metrics.total_failed_certs.inc();
372 Err(e)
373 }
374 })
375 .ok();
376 });
377 }
378
379 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
382 let mut jwks = self.jwks.write();
383 match jwks.entry(jwk_id.clone()) {
384 im::hashmap::Entry::Occupied(_) => {
385 debug!("JWK with kid {:?} already exists", jwk_id);
386 }
387 im::hashmap::Entry::Vacant(entry) => {
388 debug!("inserting JWK with kid: {:?}", jwk_id);
389 entry.insert(jwk.clone());
390 }
391 }
392 }
393
394 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
395 let jwks = self.jwks.read();
396 jwks.get(jwk_id) == Some(jwk)
397 }
398
399 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
400 self.jwks.read().clone()
401 }
402
403 #[instrument(level = "trace", skip_all, fields(tx_digest = ?signed_tx.digest()))]
404 pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
405 self.signed_data_cache.is_verified(
406 signed_tx.full_message_digest(),
407 || {
408 let jwks = self.jwks.read().clone();
409 let verify_params = VerifyParams::new(
410 jwks,
411 self.zk_login_params.env,
412 self.zk_login_params.accept_zklogin_in_multisig,
413 self.zk_login_params.accept_passkey_in_multisig,
414 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
415 self.zk_login_params.additional_multisig_checks,
416 );
417 verify_sender_signed_data_message_signatures(
418 signed_tx,
419 self.committee.epoch(),
420 &verify_params,
421 self.zklogin_inputs_cache.clone(),
422 )
423 },
424 || Ok(()),
425 )
426 }
427
428 pub fn verify_authority_capabilities(
429 &self,
430 signed_authority_capabilities: &SignedAuthorityCapabilitiesV1,
431 ) -> IotaResult {
432 let epoch = self.committee.epoch();
433 self.authority_capability_cache.is_verified(
434 signed_authority_capabilities.cache_digest(epoch),
435 || {
436 let authority_name = signed_authority_capabilities.data().authority;
438 if !self.non_committee_validators.contains(&authority_name) {
439 return Err(IotaError::IncorrectSigner {
440 error: "Signer must be part of non-committee active validators".to_string(),
441 });
442 }
443
444 let mut obligation = VerificationObligation::default();
446 let idx = obligation.add_message(
447 signed_authority_capabilities.data(),
448 epoch, Intent::iota_app(signed_authority_capabilities.scope()),
451 );
452
453 let authority_key = AuthorityPublicKey::from_bytes(authority_name.as_bytes())
455 .map_err(|_| IotaError::IncorrectSigner {
456 error: "Invalid authority public key bytes".to_string(),
457 })?;
458 obligation
459 .public_keys
460 .get_mut(idx)
461 .ok_or(IotaError::InvalidAuthenticator)?
462 .push(&authority_key);
463
464 obligation
465 .signatures
466 .get_mut(idx)
467 .ok_or(IotaError::InvalidAuthenticator)?
468 .add_signature(signed_authority_capabilities.auth_sig().clone())
469 .map_err(|_| IotaError::InvalidSignature {
470 error: "Failed to add authority signature to obligation".to_string(),
471 })?;
472
473 obligation.verify_all()
474 },
475 || Ok(()),
476 )
477 }
478
479 pub fn clear_signature_cache(&self) {
480 self.certificate_cache.clear();
481 self.authority_capability_cache.clear();
482 self.signed_data_cache.clear();
483 self.zklogin_inputs_cache.clear();
484 }
485}
486
487pub struct SignatureVerifierMetrics {
488 pub certificate_signatures_cache_hits: IntCounter,
489 pub certificate_signatures_cache_misses: IntCounter,
490 pub certificate_signatures_cache_evictions: IntCounter,
491 pub signed_data_cache_hits: IntCounter,
492 pub signed_data_cache_misses: IntCounter,
493 pub signed_data_cache_evictions: IntCounter,
494 pub zklogin_inputs_cache_hits: IntCounter,
495 pub zklogin_inputs_cache_misses: IntCounter,
496 pub zklogin_inputs_cache_evictions: IntCounter,
497 pub authority_capabilities_cache_hits: IntCounter,
498 pub authority_capabilities_cache_misses: IntCounter,
499 pub authority_capabilities_cache_evictions: IntCounter,
500 timeouts: IntCounter,
501 full_batches: IntCounter,
502 partial_batches: IntCounter,
503 total_verified_certs: IntCounter,
504 total_failed_certs: IntCounter,
505}
506
507impl SignatureVerifierMetrics {
508 pub fn new(registry: &Registry) -> Arc<Self> {
509 Arc::new(Self {
510 certificate_signatures_cache_hits: register_int_counter_with_registry!(
511 "certificate_signatures_cache_hits",
512 "Number of certificates which were known to be verified because of signature cache.",
513 registry
514 )
515 .unwrap(),
516 certificate_signatures_cache_misses: register_int_counter_with_registry!(
517 "certificate_signatures_cache_misses",
518 "Number of certificates which missed the signature cache",
519 registry
520 )
521 .unwrap(),
522 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
523 "certificate_signatures_cache_evictions",
524 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
525 registry
526 )
527 .unwrap(),
528 signed_data_cache_hits: register_int_counter_with_registry!(
529 "signed_data_cache_hits",
530 "Number of signed data which were known to be verified because of signature cache.",
531 registry
532 )
533 .unwrap(),
534 signed_data_cache_misses: register_int_counter_with_registry!(
535 "signed_data_cache_misses",
536 "Number of signed data which missed the signature cache.",
537 registry
538 )
539 .unwrap(),
540 signed_data_cache_evictions: register_int_counter_with_registry!(
541 "signed_data_cache_evictions",
542 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
543 registry
544 )
545 .unwrap(),
546 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
547 "zklogin_inputs_cache_hits",
548 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
549 registry
550 )
551 .unwrap(),
552 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
553 "zklogin_inputs_cache_misses",
554 "Number of zklogin signatures which missed the zklogin inputs cache.",
555 registry
556 )
557 .unwrap(),
558 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
559 "zklogin_inputs_cache_evictions",
560 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
561 registry
562 )
563 .unwrap(),
564 authority_capabilities_cache_hits: register_int_counter_with_registry!(
565 "authority_capabilities_cache_hits",
566 "Number of authority capabilities which were known to be verified because of capabilities cache.",
567 registry
568 )
569 .unwrap(),
570 authority_capabilities_cache_misses: register_int_counter_with_registry!(
571 "authority_capabilities_cache_misses",
572 "Number of authority capabilities which missed the capabilities cache.",
573 registry
574 )
575 .unwrap(),
576 authority_capabilities_cache_evictions: register_int_counter_with_registry!(
577 "authority_capabilities_cache_evictions",
578 "Number of times we evict a pre-existing authority capabilities that were known to be verified.",
579 registry
580 )
581 .unwrap(),
582 timeouts: register_int_counter_with_registry!(
583 "async_batch_verifier_timeouts",
584 "Number of times batch verifier times out and verifies a partial batch",
585 registry
586 )
587 .unwrap(),
588 full_batches: register_int_counter_with_registry!(
589 "async_batch_verifier_full_batches",
590 "Number of times batch verifier verifies a full batch",
591 registry
592 )
593 .unwrap(),
594 partial_batches: register_int_counter_with_registry!(
595 "async_batch_verifier_partial_batches",
596 "Number of times batch verifier verifies a partial batch",
597 registry
598 )
599 .unwrap(),
600 total_verified_certs: register_int_counter_with_registry!(
601 "async_batch_verifier_total_verified_certs",
602 "Total number of certs batch verifier has verified",
603 registry
604 )
605 .unwrap(),
606 total_failed_certs: register_int_counter_with_registry!(
607 "async_batch_verifier_total_failed_certs",
608 "Total number of certs batch verifier has rejected",
609 registry
610 )
611 .unwrap(),
612 })
613 }
614}
615
616pub fn batch_verify_all_certificates_and_checkpoints(
618 committee: &Committee,
619 certs: &[&CertifiedTransaction],
620 checkpoints: &[&SignedCheckpointSummary],
621) -> IotaResult {
622 for ckpt in checkpoints {
625 ckpt.data().verify_epoch(committee.epoch())?;
626 }
627
628 batch_verify(committee, certs, checkpoints)
629}
630
631pub fn batch_verify_certificates(
634 committee: &Committee,
635 certs: &[&CertifiedTransaction],
636 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
637) -> Vec<IotaResult> {
638 let verify_params = VerifyParams::default();
640 match batch_verify(committee, certs, &[]) {
641 Ok(_) => vec![Ok(()); certs.len()],
642
643 Err(_) if certs.len() > 1 => certs
645 .iter()
646 .map(|c| {
649 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
650 })
651 .collect(),
652
653 Err(e) => vec![Err(e)],
654 }
655}
656
657fn batch_verify(
658 committee: &Committee,
659 certs: &[&CertifiedTransaction],
660 checkpoints: &[&SignedCheckpointSummary],
661) -> IotaResult {
662 let mut obligation = VerificationObligation::default();
663
664 for cert in certs {
665 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
666 cert.auth_sig()
667 .add_to_verification_obligation(committee, &mut obligation, idx)?;
668 }
669
670 for ckpt in checkpoints {
671 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
672 ckpt.auth_sig()
673 .add_to_verification_obligation(committee, &mut obligation, idx)?;
674 }
675
676 obligation.verify_all()
677}