1use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9 zk_login::{JWK, JwkId},
10 zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16 committee::Committee,
17 crypto::{AuthoritySignInfoTrait, VerificationObligation},
18 digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19 error::{IotaError, IotaResult},
20 message_envelope::Message,
21 messages_checkpoint::SignedCheckpointSummary,
22 signature::VerifyParams,
23 signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24 transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::{Itertools as _, izip};
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32 runtime::Handle,
33 sync::oneshot,
34 time::{Duration, timeout},
35};
36use tracing::debug;
37const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);
40
41const MAX_BATCH_SIZE: usize = 8;
47
48type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51 certs: Vec<CertifiedTransaction>,
52 senders: Vec<Sender>,
53 id: u64,
54}
55
56impl CertBuffer {
57 fn new(capacity: usize) -> Self {
58 Self {
59 certs: Vec::with_capacity(capacity),
60 senders: Vec::with_capacity(capacity),
61 id: 0,
62 }
63 }
64
65 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68 let this = &mut *guard;
69 let mut new = CertBuffer::new(this.capacity());
70 new.id = this.id + 1;
71 std::mem::swap(&mut new, this);
72 new
73 }
74
75 fn capacity(&self) -> usize {
76 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77 self.certs.capacity()
78 }
79
80 fn len(&self) -> usize {
81 debug_assert_eq!(self.certs.len(), self.senders.len());
82 self.certs.len()
83 }
84
85 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86 self.senders.push(tx);
87 self.certs.push(cert);
88 }
89}
90
91pub struct SignatureVerifier {
96 committee: Arc<Committee>,
97 certificate_cache: VerifiedDigestCache<CertificateDigest>,
98 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
100
101 jwks: RwLock<ImHashMap<JwkId, JWK>>,
108
109 zk_login_params: ZkLoginParams,
112
113 queue: Mutex<CertBuffer>,
114 pub metrics: Arc<SignatureVerifierMetrics>,
115}
116
117#[derive(Clone)]
119struct ZkLoginParams {
120 pub env: ZkLoginEnv,
123 pub accept_zklogin_in_multisig: bool,
125 pub accept_passkey_in_multisig: bool,
127 pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
129}
130
131impl SignatureVerifier {
132 pub fn new_with_batch_size(
133 committee: Arc<Committee>,
134 batch_size: usize,
135 metrics: Arc<SignatureVerifierMetrics>,
136 env: ZkLoginEnv,
137 accept_zklogin_in_multisig: bool,
138 accept_passkey_in_multisig: bool,
139 zklogin_max_epoch_upper_bound_delta: Option<u64>,
140 ) -> Self {
141 Self {
142 committee,
143 certificate_cache: VerifiedDigestCache::new(
144 metrics.certificate_signatures_cache_hits.clone(),
145 metrics.certificate_signatures_cache_misses.clone(),
146 metrics.certificate_signatures_cache_evictions.clone(),
147 ),
148 signed_data_cache: VerifiedDigestCache::new(
149 metrics.signed_data_cache_hits.clone(),
150 metrics.signed_data_cache_misses.clone(),
151 metrics.signed_data_cache_evictions.clone(),
152 ),
153 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
154 metrics.zklogin_inputs_cache_hits.clone(),
155 metrics.zklogin_inputs_cache_misses.clone(),
156 metrics.zklogin_inputs_cache_evictions.clone(),
157 )),
158 jwks: Default::default(),
159 queue: Mutex::new(CertBuffer::new(batch_size)),
160 metrics,
161 zk_login_params: ZkLoginParams {
162 env,
163 accept_zklogin_in_multisig,
164 accept_passkey_in_multisig,
165 zklogin_max_epoch_upper_bound_delta,
166 },
167 }
168 }
169
170 pub fn new(
171 committee: Arc<Committee>,
172 metrics: Arc<SignatureVerifierMetrics>,
173 zklogin_env: ZkLoginEnv,
174 accept_zklogin_in_multisig: bool,
175 accept_passkey_in_multisig: bool,
176 zklogin_max_epoch_upper_bound_delta: Option<u64>,
177 ) -> Self {
178 Self::new_with_batch_size(
179 committee,
180 MAX_BATCH_SIZE,
181 metrics,
182 zklogin_env,
183 accept_zklogin_in_multisig,
184 accept_passkey_in_multisig,
185 zklogin_max_epoch_upper_bound_delta,
186 )
187 }
188
189 pub fn verify_certs_and_checkpoints(
191 &self,
192 certs: Vec<&CertifiedTransaction>,
193 checkpoints: Vec<&SignedCheckpointSummary>,
194 ) -> IotaResult {
195 let certs: Vec<_> = certs
196 .into_iter()
197 .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
198 .collect();
199
200 for cert in &certs {
204 self.verify_tx(cert.data())?;
205 }
206 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
207 self.certificate_cache
208 .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
209 Ok(())
210 }
211
212 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
214 let cert_digest = cert.certificate_digest();
215 if self.certificate_cache.is_cached(&cert_digest) {
216 return Ok(VerifiedCertificate::new_unchecked(cert));
217 }
218 self.verify_tx(cert.data())?;
219 self.verify_cert_skip_cache(cert)
220 .await
221 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
222 }
223
224 pub async fn multi_verify_certs(
225 &self,
226 certs: Vec<CertifiedTransaction>,
227 ) -> Vec<IotaResult<VerifiedCertificate>> {
228 let mut futures = Vec::with_capacity(certs.len());
231 for cert in certs {
232 futures.push(self.verify_cert(cert));
233 }
234 futures::future::join_all(futures).await
235 }
236
237 pub async fn verify_cert_skip_cache(
239 &self,
240 cert: CertifiedTransaction,
241 ) -> IotaResult<VerifiedCertificate> {
242 if cert.auth_sig().epoch != self.committee.epoch() {
245 return Err(IotaError::WrongEpoch {
246 expected_epoch: self.committee.epoch(),
247 actual_epoch: cert.auth_sig().epoch,
248 });
249 }
250
251 self.verify_cert_inner(cert).await
252 }
253
254 async fn verify_cert_inner(
255 &self,
256 cert: CertifiedTransaction,
257 ) -> IotaResult<VerifiedCertificate> {
258 let (tx, rx) = oneshot::channel();
263 pin_mut!(rx);
264
265 let prev_id_or_buffer = {
266 let mut queue = self.queue.lock();
267 queue.push(tx, cert);
268 if queue.len() == queue.capacity() {
269 Either::Right(CertBuffer::take_and_replace(queue))
270 } else {
271 Either::Left(queue.id)
272 }
273 };
274 let prev_id = match prev_id_or_buffer {
275 Either::Left(prev_id) => prev_id,
276 Either::Right(buffer) => {
277 self.metrics.full_batches.inc();
278 self.process_queue(buffer).await;
279 return rx.try_recv().unwrap();
281 }
282 };
283
284 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
285 return res.unwrap();
287 }
288 self.metrics.timeouts.inc();
289
290 let buffer = {
291 let queue = self.queue.lock();
292 if prev_id == queue.id {
294 debug_assert_ne!(queue.len(), queue.capacity());
295 Some(CertBuffer::take_and_replace(queue))
296 } else {
297 None
298 }
299 };
300
301 if let Some(buffer) = buffer {
302 self.metrics.partial_batches.inc();
303 self.process_queue(buffer).await;
304 return rx.try_recv().unwrap();
306 }
307
308 rx.await.unwrap()
311 }
312
313 async fn process_queue(&self, buffer: CertBuffer) {
314 let committee = self.committee.clone();
315 let metrics = self.metrics.clone();
316 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
317 Handle::current()
318 .spawn_blocking(move || {
319 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
320 })
321 .await
322 .expect("Spawn blocking should not fail");
323 }
324
325 fn process_queue_sync(
326 committee: Arc<Committee>,
327 metrics: Arc<SignatureVerifierMetrics>,
328 buffer: CertBuffer,
329 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
330 ) {
331 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
332
333 let results = batch_verify_certificates(
334 &committee,
335 &buffer.certs.iter().collect_vec(),
336 zklogin_inputs_cache,
337 );
338 izip!(
339 results.into_iter(),
340 buffer.certs.into_iter(),
341 buffer.senders.into_iter(),
342 )
343 .for_each(|(result, cert, tx)| {
344 tx.send(match result {
345 Ok(()) => {
346 metrics.total_verified_certs.inc();
347 Ok(VerifiedCertificate::new_unchecked(cert))
348 }
349 Err(e) => {
350 metrics.total_failed_certs.inc();
351 Err(e)
352 }
353 })
354 .ok();
355 });
356 }
357
358 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
361 let mut jwks = self.jwks.write();
362 match jwks.entry(jwk_id.clone()) {
363 im::hashmap::Entry::Occupied(_) => {
364 debug!("JWK with kid {:?} already exists", jwk_id);
365 }
366 im::hashmap::Entry::Vacant(entry) => {
367 debug!("inserting JWK with kid: {:?}", jwk_id);
368 entry.insert(jwk.clone());
369 }
370 }
371 }
372
373 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
374 let jwks = self.jwks.read();
375 jwks.get(jwk_id) == Some(jwk)
376 }
377
378 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
379 self.jwks.read().clone()
380 }
381
382 pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
383 self.signed_data_cache.is_verified(
384 signed_tx.full_message_digest(),
385 || {
386 let jwks = self.jwks.read().clone();
387 let verify_params = VerifyParams::new(
388 jwks,
389 self.zk_login_params.env,
390 self.zk_login_params.accept_zklogin_in_multisig,
391 self.zk_login_params.accept_passkey_in_multisig,
392 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
393 );
394 verify_sender_signed_data_message_signatures(
395 signed_tx,
396 self.committee.epoch(),
397 &verify_params,
398 self.zklogin_inputs_cache.clone(),
399 )
400 },
401 || Ok(()),
402 )
403 }
404
405 pub fn clear_signature_cache(&self) {
406 self.certificate_cache.clear();
407 self.signed_data_cache.clear();
408 self.zklogin_inputs_cache.clear();
409 }
410}
411
412pub struct SignatureVerifierMetrics {
413 pub certificate_signatures_cache_hits: IntCounter,
414 pub certificate_signatures_cache_misses: IntCounter,
415 pub certificate_signatures_cache_evictions: IntCounter,
416 pub signed_data_cache_hits: IntCounter,
417 pub signed_data_cache_misses: IntCounter,
418 pub signed_data_cache_evictions: IntCounter,
419 pub zklogin_inputs_cache_hits: IntCounter,
420 pub zklogin_inputs_cache_misses: IntCounter,
421 pub zklogin_inputs_cache_evictions: IntCounter,
422 timeouts: IntCounter,
423 full_batches: IntCounter,
424 partial_batches: IntCounter,
425 total_verified_certs: IntCounter,
426 total_failed_certs: IntCounter,
427}
428
429impl SignatureVerifierMetrics {
430 pub fn new(registry: &Registry) -> Arc<Self> {
431 Arc::new(Self {
432 certificate_signatures_cache_hits: register_int_counter_with_registry!(
433 "certificate_signatures_cache_hits",
434 "Number of certificates which were known to be verified because of signature cache.",
435 registry
436 )
437 .unwrap(),
438 certificate_signatures_cache_misses: register_int_counter_with_registry!(
439 "certificate_signatures_cache_misses",
440 "Number of certificates which missed the signature cache",
441 registry
442 )
443 .unwrap(),
444 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
445 "certificate_signatures_cache_evictions",
446 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
447 registry
448 )
449 .unwrap(),
450 signed_data_cache_hits: register_int_counter_with_registry!(
451 "signed_data_cache_hits",
452 "Number of signed data which were known to be verified because of signature cache.",
453 registry
454 )
455 .unwrap(),
456 signed_data_cache_misses: register_int_counter_with_registry!(
457 "signed_data_cache_misses",
458 "Number of signed data which missed the signature cache.",
459 registry
460 )
461 .unwrap(),
462 signed_data_cache_evictions: register_int_counter_with_registry!(
463 "signed_data_cache_evictions",
464 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
465 registry
466 )
467 .unwrap(),
468 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
469 "zklogin_inputs_cache_hits",
470 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
471 registry
472 )
473 .unwrap(),
474 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
475 "zklogin_inputs_cache_misses",
476 "Number of zklogin signatures which missed the zklogin inputs cache.",
477 registry
478 )
479 .unwrap(),
480 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
481 "zklogin_inputs_cache_evictions",
482 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
483 registry
484 )
485 .unwrap(),
486 timeouts: register_int_counter_with_registry!(
487 "async_batch_verifier_timeouts",
488 "Number of times batch verifier times out and verifies a partial batch",
489 registry
490 )
491 .unwrap(),
492 full_batches: register_int_counter_with_registry!(
493 "async_batch_verifier_full_batches",
494 "Number of times batch verifier verifies a full batch",
495 registry
496 )
497 .unwrap(),
498 partial_batches: register_int_counter_with_registry!(
499 "async_batch_verifier_partial_batches",
500 "Number of times batch verifier verifies a partial batch",
501 registry
502 )
503 .unwrap(),
504 total_verified_certs: register_int_counter_with_registry!(
505 "async_batch_verifier_total_verified_certs",
506 "Total number of certs batch verifier has verified",
507 registry
508 )
509 .unwrap(),
510 total_failed_certs: register_int_counter_with_registry!(
511 "async_batch_verifier_total_failed_certs",
512 "Total number of certs batch verifier has rejected",
513 registry
514 )
515 .unwrap(),
516 })
517 }
518}
519
520pub fn batch_verify_all_certificates_and_checkpoints(
522 committee: &Committee,
523 certs: &[&CertifiedTransaction],
524 checkpoints: &[&SignedCheckpointSummary],
525) -> IotaResult {
526 for ckpt in checkpoints {
529 ckpt.data().verify_epoch(committee.epoch())?;
530 }
531
532 batch_verify(committee, certs, checkpoints)
533}
534
535pub fn batch_verify_certificates(
538 committee: &Committee,
539 certs: &[&CertifiedTransaction],
540 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
541) -> Vec<IotaResult> {
542 let verify_params = VerifyParams::default();
544 match batch_verify(committee, certs, &[]) {
545 Ok(_) => vec![Ok(()); certs.len()],
546
547 Err(_) if certs.len() > 1 => certs
549 .iter()
550 .map(|c| {
553 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
554 })
555 .collect(),
556
557 Err(e) => vec![Err(e)],
558 }
559}
560
561fn batch_verify(
562 committee: &Committee,
563 certs: &[&CertifiedTransaction],
564 checkpoints: &[&SignedCheckpointSummary],
565) -> IotaResult {
566 let mut obligation = VerificationObligation::default();
567
568 for cert in certs {
569 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
570 cert.auth_sig()
571 .add_to_verification_obligation(committee, &mut obligation, idx)?;
572 }
573
574 for ckpt in checkpoints {
575 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
576 ckpt.auth_sig()
577 .add_to_verification_obligation(committee, &mut obligation, idx)?;
578 }
579
580 obligation.verify_all()
581}