1use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9 zk_login::{JWK, JwkId},
10 zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16 committee::Committee,
17 crypto::{AuthoritySignInfoTrait, VerificationObligation},
18 digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19 error::{IotaError, IotaResult},
20 message_envelope::Message,
21 messages_checkpoint::SignedCheckpointSummary,
22 signature::VerifyParams,
23 signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24 transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::izip;
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32 runtime::Handle,
33 sync::oneshot,
34 time::{Duration, timeout},
35};
36use tracing::debug;
/// How long a queued certificate waits for its batch to be processed before
/// the waiting task attempts to flush the (partial) batch itself.
///
/// Despite the `_MS` suffix this is a [`Duration`], not a raw millisecond count.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);

/// Batch size used by [`SignatureVerifier::new`].
const MAX_BATCH_SIZE: usize = 8;
47
48type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51 certs: Vec<CertifiedTransaction>,
52 senders: Vec<Sender>,
53 id: u64,
54}
55
56impl CertBuffer {
57 fn new(capacity: usize) -> Self {
58 Self {
59 certs: Vec::with_capacity(capacity),
60 senders: Vec::with_capacity(capacity),
61 id: 0,
62 }
63 }
64
65 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68 let this = &mut *guard;
69 let mut new = CertBuffer::new(this.capacity());
70 new.id = this.id + 1;
71 std::mem::swap(&mut new, this);
72 new
73 }
74
75 fn capacity(&self) -> usize {
76 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77 self.certs.capacity()
78 }
79
80 fn len(&self) -> usize {
81 debug_assert_eq!(self.certs.len(), self.senders.len());
82 self.certs.len()
83 }
84
85 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86 self.senders.push(tx);
87 self.certs.push(cert);
88 }
89}
90
91pub struct SignatureVerifier {
96 committee: Arc<Committee>,
97 certificate_cache: VerifiedDigestCache<CertificateDigest>,
98 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
100
101 jwks: RwLock<ImHashMap<JwkId, JWK>>,
108
109 zk_login_params: ZkLoginParams,
112
113 queue: Mutex<CertBuffer>,
114 pub metrics: Arc<SignatureVerifierMetrics>,
115}
116
117#[derive(Clone)]
119struct ZkLoginParams {
120 pub env: ZkLoginEnv,
123 pub accept_zklogin_in_multisig: bool,
125 pub accept_passkey_in_multisig: bool,
127 pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
129}
130
131impl SignatureVerifier {
132 pub fn new_with_batch_size(
133 committee: Arc<Committee>,
134 batch_size: usize,
135 metrics: Arc<SignatureVerifierMetrics>,
136 env: ZkLoginEnv,
137 accept_zklogin_in_multisig: bool,
138 accept_passkey_in_multisig: bool,
139 zklogin_max_epoch_upper_bound_delta: Option<u64>,
140 ) -> Self {
141 Self {
142 committee,
143 certificate_cache: VerifiedDigestCache::new(
144 metrics.certificate_signatures_cache_hits.clone(),
145 metrics.certificate_signatures_cache_misses.clone(),
146 metrics.certificate_signatures_cache_evictions.clone(),
147 ),
148 signed_data_cache: VerifiedDigestCache::new(
149 metrics.signed_data_cache_hits.clone(),
150 metrics.signed_data_cache_misses.clone(),
151 metrics.signed_data_cache_evictions.clone(),
152 ),
153 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
154 metrics.zklogin_inputs_cache_hits.clone(),
155 metrics.zklogin_inputs_cache_misses.clone(),
156 metrics.zklogin_inputs_cache_evictions.clone(),
157 )),
158 jwks: Default::default(),
159 queue: Mutex::new(CertBuffer::new(batch_size)),
160 metrics,
161 zk_login_params: ZkLoginParams {
162 env,
163 accept_zklogin_in_multisig,
164 accept_passkey_in_multisig,
165 zklogin_max_epoch_upper_bound_delta,
166 },
167 }
168 }
169
170 pub fn new(
171 committee: Arc<Committee>,
172 metrics: Arc<SignatureVerifierMetrics>,
173 zklogin_env: ZkLoginEnv,
174 accept_zklogin_in_multisig: bool,
175 accept_passkey_in_multisig: bool,
176 zklogin_max_epoch_upper_bound_delta: Option<u64>,
177 ) -> Self {
178 Self::new_with_batch_size(
179 committee,
180 MAX_BATCH_SIZE,
181 metrics,
182 zklogin_env,
183 accept_zklogin_in_multisig,
184 accept_passkey_in_multisig,
185 zklogin_max_epoch_upper_bound_delta,
186 )
187 }
188
189 pub fn verify_certs_and_checkpoints(
191 &self,
192 certs: Vec<CertifiedTransaction>,
193 checkpoints: Vec<SignedCheckpointSummary>,
194 ) -> IotaResult {
195 let certs: Vec<_> = certs
196 .into_iter()
197 .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
198 .collect();
199
200 for cert in &certs {
204 self.verify_tx(cert.data())?;
205 }
206 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
207 self.certificate_cache
208 .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
209 Ok(())
210 }
211
212 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
214 let cert_digest = cert.certificate_digest();
215 if self.certificate_cache.is_cached(&cert_digest) {
216 return Ok(VerifiedCertificate::new_unchecked(cert));
217 }
218 self.verify_tx(cert.data())?;
219 self.verify_cert_skip_cache(cert)
220 .await
221 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
222 }
223
224 pub async fn multi_verify_certs(
225 &self,
226 certs: Vec<CertifiedTransaction>,
227 ) -> Vec<IotaResult<VerifiedCertificate>> {
228 let mut futures = Vec::with_capacity(certs.len());
231 for cert in certs {
232 futures.push(self.verify_cert(cert));
233 }
234 futures::future::join_all(futures).await
235 }
236
237 pub async fn verify_cert_skip_cache(
239 &self,
240 cert: CertifiedTransaction,
241 ) -> IotaResult<VerifiedCertificate> {
242 if cert.auth_sig().epoch != self.committee.epoch() {
245 return Err(IotaError::WrongEpoch {
246 expected_epoch: self.committee.epoch(),
247 actual_epoch: cert.auth_sig().epoch,
248 });
249 }
250
251 self.verify_cert_inner(cert).await
252 }
253
254 async fn verify_cert_inner(
255 &self,
256 cert: CertifiedTransaction,
257 ) -> IotaResult<VerifiedCertificate> {
258 let (tx, rx) = oneshot::channel();
263 pin_mut!(rx);
264
265 let prev_id_or_buffer = {
266 let mut queue = self.queue.lock();
267 queue.push(tx, cert);
268 if queue.len() == queue.capacity() {
269 Either::Right(CertBuffer::take_and_replace(queue))
270 } else {
271 Either::Left(queue.id)
272 }
273 };
274 let prev_id = match prev_id_or_buffer {
275 Either::Left(prev_id) => prev_id,
276 Either::Right(buffer) => {
277 self.metrics.full_batches.inc();
278 self.process_queue(buffer).await;
279 return rx.try_recv().unwrap();
281 }
282 };
283
284 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
285 return res.unwrap();
287 }
288 self.metrics.timeouts.inc();
289
290 let buffer = {
291 let queue = self.queue.lock();
292 if prev_id == queue.id {
294 debug_assert_ne!(queue.len(), queue.capacity());
295 Some(CertBuffer::take_and_replace(queue))
296 } else {
297 None
298 }
299 };
300
301 if let Some(buffer) = buffer {
302 self.metrics.partial_batches.inc();
303 self.process_queue(buffer).await;
304 return rx.try_recv().unwrap();
306 }
307
308 rx.await.unwrap()
311 }
312
313 async fn process_queue(&self, buffer: CertBuffer) {
314 let committee = self.committee.clone();
315 let metrics = self.metrics.clone();
316 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
317 Handle::current()
318 .spawn_blocking(move || {
319 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
320 })
321 .await
322 .expect("Spawn blocking should not fail");
323 }
324
325 fn process_queue_sync(
326 committee: Arc<Committee>,
327 metrics: Arc<SignatureVerifierMetrics>,
328 buffer: CertBuffer,
329 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
330 ) {
331 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
332
333 let results = batch_verify_certificates(&committee, &buffer.certs, zklogin_inputs_cache);
334 izip!(
335 results.into_iter(),
336 buffer.certs.into_iter(),
337 buffer.senders.into_iter(),
338 )
339 .for_each(|(result, cert, tx)| {
340 tx.send(match result {
341 Ok(()) => {
342 metrics.total_verified_certs.inc();
343 Ok(VerifiedCertificate::new_unchecked(cert))
344 }
345 Err(e) => {
346 metrics.total_failed_certs.inc();
347 Err(e)
348 }
349 })
350 .ok();
351 });
352 }
353
354 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
357 let mut jwks = self.jwks.write();
358 match jwks.entry(jwk_id.clone()) {
359 im::hashmap::Entry::Occupied(_) => {
360 debug!("JWK with kid {:?} already exists", jwk_id);
361 }
362 im::hashmap::Entry::Vacant(entry) => {
363 debug!("inserting JWK with kid: {:?}", jwk_id);
364 entry.insert(jwk.clone());
365 }
366 }
367 }
368
369 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
370 let jwks = self.jwks.read();
371 jwks.get(jwk_id) == Some(jwk)
372 }
373
374 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
375 self.jwks.read().clone()
376 }
377
378 pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
379 self.signed_data_cache.is_verified(
380 signed_tx.full_message_digest(),
381 || {
382 let jwks = self.jwks.read().clone();
383 let verify_params = VerifyParams::new(
384 jwks,
385 self.zk_login_params.env,
386 self.zk_login_params.accept_zklogin_in_multisig,
387 self.zk_login_params.accept_passkey_in_multisig,
388 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
389 );
390 verify_sender_signed_data_message_signatures(
391 signed_tx,
392 self.committee.epoch(),
393 &verify_params,
394 self.zklogin_inputs_cache.clone(),
395 )
396 },
397 || Ok(()),
398 )
399 }
400
401 pub fn clear_signature_cache(&self) {
402 self.certificate_cache.clear();
403 self.signed_data_cache.clear();
404 self.zklogin_inputs_cache.clear();
405 }
406}
407
408pub struct SignatureVerifierMetrics {
409 pub certificate_signatures_cache_hits: IntCounter,
410 pub certificate_signatures_cache_misses: IntCounter,
411 pub certificate_signatures_cache_evictions: IntCounter,
412 pub signed_data_cache_hits: IntCounter,
413 pub signed_data_cache_misses: IntCounter,
414 pub signed_data_cache_evictions: IntCounter,
415 pub zklogin_inputs_cache_hits: IntCounter,
416 pub zklogin_inputs_cache_misses: IntCounter,
417 pub zklogin_inputs_cache_evictions: IntCounter,
418 timeouts: IntCounter,
419 full_batches: IntCounter,
420 partial_batches: IntCounter,
421 total_verified_certs: IntCounter,
422 total_failed_certs: IntCounter,
423}
424
425impl SignatureVerifierMetrics {
426 pub fn new(registry: &Registry) -> Arc<Self> {
427 Arc::new(Self {
428 certificate_signatures_cache_hits: register_int_counter_with_registry!(
429 "certificate_signatures_cache_hits",
430 "Number of certificates which were known to be verified because of signature cache.",
431 registry
432 )
433 .unwrap(),
434 certificate_signatures_cache_misses: register_int_counter_with_registry!(
435 "certificate_signatures_cache_misses",
436 "Number of certificates which missed the signature cache",
437 registry
438 )
439 .unwrap(),
440 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
441 "certificate_signatures_cache_evictions",
442 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
443 registry
444 )
445 .unwrap(),
446 signed_data_cache_hits: register_int_counter_with_registry!(
447 "signed_data_cache_hits",
448 "Number of signed data which were known to be verified because of signature cache.",
449 registry
450 )
451 .unwrap(),
452 signed_data_cache_misses: register_int_counter_with_registry!(
453 "signed_data_cache_misses",
454 "Number of signed data which missed the signature cache.",
455 registry
456 )
457 .unwrap(),
458 signed_data_cache_evictions: register_int_counter_with_registry!(
459 "signed_data_cache_evictions",
460 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
461 registry
462 )
463 .unwrap(),
464 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
465 "zklogin_inputs_cache_hits",
466 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
467 registry
468 )
469 .unwrap(),
470 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
471 "zklogin_inputs_cache_misses",
472 "Number of zklogin signatures which missed the zklogin inputs cache.",
473 registry
474 )
475 .unwrap(),
476 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
477 "zklogin_inputs_cache_evictions",
478 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
479 registry
480 )
481 .unwrap(),
482 timeouts: register_int_counter_with_registry!(
483 "async_batch_verifier_timeouts",
484 "Number of times batch verifier times out and verifies a partial batch",
485 registry
486 )
487 .unwrap(),
488 full_batches: register_int_counter_with_registry!(
489 "async_batch_verifier_full_batches",
490 "Number of times batch verifier verifies a full batch",
491 registry
492 )
493 .unwrap(),
494 partial_batches: register_int_counter_with_registry!(
495 "async_batch_verifier_partial_batches",
496 "Number of times batch verifier verifies a partial batch",
497 registry
498 )
499 .unwrap(),
500 total_verified_certs: register_int_counter_with_registry!(
501 "async_batch_verifier_total_verified_certs",
502 "Total number of certs batch verifier has verified",
503 registry
504 )
505 .unwrap(),
506 total_failed_certs: register_int_counter_with_registry!(
507 "async_batch_verifier_total_failed_certs",
508 "Total number of certs batch verifier has rejected",
509 registry
510 )
511 .unwrap(),
512 })
513 }
514}
515
516pub fn batch_verify_all_certificates_and_checkpoints(
518 committee: &Committee,
519 certs: &[CertifiedTransaction],
520 checkpoints: &[SignedCheckpointSummary],
521) -> IotaResult {
522 for ckpt in checkpoints {
525 ckpt.data().verify_epoch(committee.epoch())?;
526 }
527
528 batch_verify(committee, certs, checkpoints)
529}
530
531pub fn batch_verify_certificates(
534 committee: &Committee,
535 certs: &[CertifiedTransaction],
536 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
537) -> Vec<IotaResult> {
538 let verify_params = VerifyParams::default();
540 match batch_verify(committee, certs, &[]) {
541 Ok(_) => vec![Ok(()); certs.len()],
542
543 Err(_) if certs.len() > 1 => certs
545 .iter()
546 .map(|c| {
549 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
550 })
551 .collect(),
552
553 Err(e) => vec![Err(e)],
554 }
555}
556
557fn batch_verify(
558 committee: &Committee,
559 certs: &[CertifiedTransaction],
560 checkpoints: &[SignedCheckpointSummary],
561) -> IotaResult {
562 let mut obligation = VerificationObligation::default();
563
564 for cert in certs {
565 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
566 cert.auth_sig()
567 .add_to_verification_obligation(committee, &mut obligation, idx)?;
568 }
569
570 for ckpt in checkpoints {
571 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
572 ckpt.auth_sig()
573 .add_to_verification_obligation(committee, &mut obligation, idx)?;
574 }
575
576 obligation.verify_all()
577}