1use std::sync::Arc;
6
7use either::Either;
8use fastcrypto_zkp::bn254::{
9 zk_login::{JWK, JwkId},
10 zk_login_api::ZkLoginEnv,
11};
12use futures::pin_mut;
13use im::hashmap::HashMap as ImHashMap;
14use iota_metrics::monitored_scope;
15use iota_types::{
16 committee::Committee,
17 crypto::{AuthoritySignInfoTrait, VerificationObligation},
18 digests::{CertificateDigest, SenderSignedDataDigest, ZKLoginInputsDigest},
19 error::{IotaError, IotaResult},
20 message_envelope::Message,
21 messages_checkpoint::SignedCheckpointSummary,
22 signature::VerifyParams,
23 signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
24 transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
25};
26use itertools::izip;
27use parking_lot::{Mutex, MutexGuard, RwLock};
28use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
29use shared_crypto::intent::Intent;
30use tap::TapFallible;
31use tokio::{
32 runtime::Handle,
33 sync::oneshot,
34 time::{Duration, timeout},
35};
36use tracing::debug;
37const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);
40
41const MAX_BATCH_SIZE: usize = 8;
47
48type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
49
50struct CertBuffer {
51 certs: Vec<CertifiedTransaction>,
52 senders: Vec<Sender>,
53 id: u64,
54}
55
56impl CertBuffer {
57 fn new(capacity: usize) -> Self {
58 Self {
59 certs: Vec::with_capacity(capacity),
60 senders: Vec::with_capacity(capacity),
61 id: 0,
62 }
63 }
64
65 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
68 let this = &mut *guard;
69 let mut new = CertBuffer::new(this.capacity());
70 new.id = this.id + 1;
71 std::mem::swap(&mut new, this);
72 new
73 }
74
75 fn capacity(&self) -> usize {
76 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
77 self.certs.capacity()
78 }
79
80 fn len(&self) -> usize {
81 debug_assert_eq!(self.certs.len(), self.senders.len());
82 self.certs.len()
83 }
84
85 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
86 self.senders.push(tx);
87 self.certs.push(cert);
88 }
89}
90
91pub struct SignatureVerifier {
96 committee: Arc<Committee>,
97 certificate_cache: VerifiedDigestCache<CertificateDigest>,
98 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
100
101 jwks: RwLock<ImHashMap<JwkId, JWK>>,
108
109 zk_login_params: ZkLoginParams,
112
113 queue: Mutex<CertBuffer>,
114 pub metrics: Arc<SignatureVerifierMetrics>,
115}
116
117#[derive(Clone)]
119struct ZkLoginParams {
120 pub env: ZkLoginEnv,
123 pub accept_zklogin_in_multisig: bool,
125 pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
127}
128
129impl SignatureVerifier {
130 pub fn new_with_batch_size(
131 committee: Arc<Committee>,
132 batch_size: usize,
133 metrics: Arc<SignatureVerifierMetrics>,
134 env: ZkLoginEnv,
135 accept_zklogin_in_multisig: bool,
136 zklogin_max_epoch_upper_bound_delta: Option<u64>,
137 ) -> Self {
138 Self {
139 committee,
140 certificate_cache: VerifiedDigestCache::new(
141 metrics.certificate_signatures_cache_hits.clone(),
142 metrics.certificate_signatures_cache_misses.clone(),
143 metrics.certificate_signatures_cache_evictions.clone(),
144 ),
145 signed_data_cache: VerifiedDigestCache::new(
146 metrics.signed_data_cache_hits.clone(),
147 metrics.signed_data_cache_misses.clone(),
148 metrics.signed_data_cache_evictions.clone(),
149 ),
150 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
151 metrics.zklogin_inputs_cache_hits.clone(),
152 metrics.zklogin_inputs_cache_misses.clone(),
153 metrics.zklogin_inputs_cache_evictions.clone(),
154 )),
155 jwks: Default::default(),
156 queue: Mutex::new(CertBuffer::new(batch_size)),
157 metrics,
158 zk_login_params: ZkLoginParams {
159 env,
160 accept_zklogin_in_multisig,
161 zklogin_max_epoch_upper_bound_delta,
162 },
163 }
164 }
165
166 pub fn new(
167 committee: Arc<Committee>,
168 metrics: Arc<SignatureVerifierMetrics>,
169 zklogin_env: ZkLoginEnv,
170 accept_zklogin_in_multisig: bool,
171 zklogin_max_epoch_upper_bound_delta: Option<u64>,
172 ) -> Self {
173 Self::new_with_batch_size(
174 committee,
175 MAX_BATCH_SIZE,
176 metrics,
177 zklogin_env,
178 accept_zklogin_in_multisig,
179 zklogin_max_epoch_upper_bound_delta,
180 )
181 }
182
183 pub fn verify_certs_and_checkpoints(
185 &self,
186 certs: Vec<CertifiedTransaction>,
187 checkpoints: Vec<SignedCheckpointSummary>,
188 ) -> IotaResult {
189 let certs: Vec<_> = certs
190 .into_iter()
191 .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
192 .collect();
193
194 for cert in &certs {
198 self.verify_tx(cert.data())?;
199 }
200 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
201 self.certificate_cache
202 .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
203 Ok(())
204 }
205
206 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
208 let cert_digest = cert.certificate_digest();
209 if self.certificate_cache.is_cached(&cert_digest) {
210 return Ok(VerifiedCertificate::new_unchecked(cert));
211 }
212 self.verify_tx(cert.data())?;
213 self.verify_cert_skip_cache(cert)
214 .await
215 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
216 }
217
218 pub async fn multi_verify_certs(
219 &self,
220 certs: Vec<CertifiedTransaction>,
221 ) -> Vec<IotaResult<VerifiedCertificate>> {
222 let mut futures = Vec::with_capacity(certs.len());
225 for cert in certs {
226 futures.push(self.verify_cert(cert));
227 }
228 futures::future::join_all(futures).await
229 }
230
231 pub async fn verify_cert_skip_cache(
233 &self,
234 cert: CertifiedTransaction,
235 ) -> IotaResult<VerifiedCertificate> {
236 if cert.auth_sig().epoch != self.committee.epoch() {
239 return Err(IotaError::WrongEpoch {
240 expected_epoch: self.committee.epoch(),
241 actual_epoch: cert.auth_sig().epoch,
242 });
243 }
244
245 self.verify_cert_inner(cert).await
246 }
247
248 async fn verify_cert_inner(
249 &self,
250 cert: CertifiedTransaction,
251 ) -> IotaResult<VerifiedCertificate> {
252 let (tx, rx) = oneshot::channel();
257 pin_mut!(rx);
258
259 let prev_id_or_buffer = {
260 let mut queue = self.queue.lock();
261 queue.push(tx, cert);
262 if queue.len() == queue.capacity() {
263 Either::Right(CertBuffer::take_and_replace(queue))
264 } else {
265 Either::Left(queue.id)
266 }
267 };
268 let prev_id = match prev_id_or_buffer {
269 Either::Left(prev_id) => prev_id,
270 Either::Right(buffer) => {
271 self.metrics.full_batches.inc();
272 self.process_queue(buffer).await;
273 return rx.try_recv().unwrap();
275 }
276 };
277
278 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
279 return res.unwrap();
281 }
282 self.metrics.timeouts.inc();
283
284 let buffer = {
285 let queue = self.queue.lock();
286 if prev_id == queue.id {
288 debug_assert_ne!(queue.len(), queue.capacity());
289 Some(CertBuffer::take_and_replace(queue))
290 } else {
291 None
292 }
293 };
294
295 if let Some(buffer) = buffer {
296 self.metrics.partial_batches.inc();
297 self.process_queue(buffer).await;
298 return rx.try_recv().unwrap();
300 }
301
302 rx.await.unwrap()
305 }
306
307 async fn process_queue(&self, buffer: CertBuffer) {
308 let committee = self.committee.clone();
309 let metrics = self.metrics.clone();
310 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
311 Handle::current()
312 .spawn_blocking(move || {
313 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
314 })
315 .await
316 .expect("Spawn blocking should not fail");
317 }
318
319 fn process_queue_sync(
320 committee: Arc<Committee>,
321 metrics: Arc<SignatureVerifierMetrics>,
322 buffer: CertBuffer,
323 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
324 ) {
325 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
326
327 let results = batch_verify_certificates(&committee, &buffer.certs, zklogin_inputs_cache);
328 izip!(
329 results.into_iter(),
330 buffer.certs.into_iter(),
331 buffer.senders.into_iter(),
332 )
333 .for_each(|(result, cert, tx)| {
334 tx.send(match result {
335 Ok(()) => {
336 metrics.total_verified_certs.inc();
337 Ok(VerifiedCertificate::new_unchecked(cert))
338 }
339 Err(e) => {
340 metrics.total_failed_certs.inc();
341 Err(e)
342 }
343 })
344 .ok();
345 });
346 }
347
348 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
351 let mut jwks = self.jwks.write();
352 match jwks.entry(jwk_id.clone()) {
353 im::hashmap::Entry::Occupied(_) => {
354 debug!("JWK with kid {:?} already exists", jwk_id);
355 }
356 im::hashmap::Entry::Vacant(entry) => {
357 debug!("inserting JWK with kid: {:?}", jwk_id);
358 entry.insert(jwk.clone());
359 }
360 }
361 }
362
363 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
364 let jwks = self.jwks.read();
365 jwks.get(jwk_id) == Some(jwk)
366 }
367
368 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
369 self.jwks.read().clone()
370 }
371
372 pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
373 self.signed_data_cache.is_verified(
374 signed_tx.full_message_digest(),
375 || {
376 let jwks = self.jwks.read().clone();
377 let verify_params = VerifyParams::new(
378 jwks,
379 self.zk_login_params.env,
380 self.zk_login_params.accept_zklogin_in_multisig,
381 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
382 );
383 verify_sender_signed_data_message_signatures(
384 signed_tx,
385 self.committee.epoch(),
386 &verify_params,
387 self.zklogin_inputs_cache.clone(),
388 )
389 },
390 || Ok(()),
391 )
392 }
393
394 pub fn clear_signature_cache(&self) {
395 self.certificate_cache.clear();
396 self.signed_data_cache.clear();
397 self.zklogin_inputs_cache.clear();
398 }
399}
400
401pub struct SignatureVerifierMetrics {
402 pub certificate_signatures_cache_hits: IntCounter,
403 pub certificate_signatures_cache_misses: IntCounter,
404 pub certificate_signatures_cache_evictions: IntCounter,
405 pub signed_data_cache_hits: IntCounter,
406 pub signed_data_cache_misses: IntCounter,
407 pub signed_data_cache_evictions: IntCounter,
408 pub zklogin_inputs_cache_hits: IntCounter,
409 pub zklogin_inputs_cache_misses: IntCounter,
410 pub zklogin_inputs_cache_evictions: IntCounter,
411 timeouts: IntCounter,
412 full_batches: IntCounter,
413 partial_batches: IntCounter,
414 total_verified_certs: IntCounter,
415 total_failed_certs: IntCounter,
416}
417
418impl SignatureVerifierMetrics {
419 pub fn new(registry: &Registry) -> Arc<Self> {
420 Arc::new(Self {
421 certificate_signatures_cache_hits: register_int_counter_with_registry!(
422 "certificate_signatures_cache_hits",
423 "Number of certificates which were known to be verified because of signature cache.",
424 registry
425 )
426 .unwrap(),
427 certificate_signatures_cache_misses: register_int_counter_with_registry!(
428 "certificate_signatures_cache_misses",
429 "Number of certificates which missed the signature cache",
430 registry
431 )
432 .unwrap(),
433 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
434 "certificate_signatures_cache_evictions",
435 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
436 registry
437 )
438 .unwrap(),
439 signed_data_cache_hits: register_int_counter_with_registry!(
440 "signed_data_cache_hits",
441 "Number of signed data which were known to be verified because of signature cache.",
442 registry
443 )
444 .unwrap(),
445 signed_data_cache_misses: register_int_counter_with_registry!(
446 "signed_data_cache_misses",
447 "Number of signed data which missed the signature cache.",
448 registry
449 )
450 .unwrap(),
451 signed_data_cache_evictions: register_int_counter_with_registry!(
452 "signed_data_cache_evictions",
453 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
454 registry
455 )
456 .unwrap(),
457 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
458 "zklogin_inputs_cache_hits",
459 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
460 registry
461 )
462 .unwrap(),
463 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
464 "zklogin_inputs_cache_misses",
465 "Number of zklogin signatures which missed the zklogin inputs cache.",
466 registry
467 )
468 .unwrap(),
469 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
470 "zklogin_inputs_cache_evictions",
471 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
472 registry
473 )
474 .unwrap(),
475 timeouts: register_int_counter_with_registry!(
476 "async_batch_verifier_timeouts",
477 "Number of times batch verifier times out and verifies a partial batch",
478 registry
479 )
480 .unwrap(),
481 full_batches: register_int_counter_with_registry!(
482 "async_batch_verifier_full_batches",
483 "Number of times batch verifier verifies a full batch",
484 registry
485 )
486 .unwrap(),
487 partial_batches: register_int_counter_with_registry!(
488 "async_batch_verifier_partial_batches",
489 "Number of times batch verifier verifies a partial batch",
490 registry
491 )
492 .unwrap(),
493 total_verified_certs: register_int_counter_with_registry!(
494 "async_batch_verifier_total_verified_certs",
495 "Total number of certs batch verifier has verified",
496 registry
497 )
498 .unwrap(),
499 total_failed_certs: register_int_counter_with_registry!(
500 "async_batch_verifier_total_failed_certs",
501 "Total number of certs batch verifier has rejected",
502 registry
503 )
504 .unwrap(),
505 })
506 }
507}
508
509pub fn batch_verify_all_certificates_and_checkpoints(
511 committee: &Committee,
512 certs: &[CertifiedTransaction],
513 checkpoints: &[SignedCheckpointSummary],
514) -> IotaResult {
515 for ckpt in checkpoints {
518 ckpt.data().verify_epoch(committee.epoch())?;
519 }
520
521 batch_verify(committee, certs, checkpoints)
522}
523
524pub fn batch_verify_certificates(
527 committee: &Committee,
528 certs: &[CertifiedTransaction],
529 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
530) -> Vec<IotaResult> {
531 let verify_params = VerifyParams::default();
533 match batch_verify(committee, certs, &[]) {
534 Ok(_) => vec![Ok(()); certs.len()],
535
536 Err(_) if certs.len() > 1 => certs
538 .iter()
539 .map(|c| {
542 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
543 })
544 .collect(),
545
546 Err(e) => vec![Err(e)],
547 }
548}
549
550fn batch_verify(
551 committee: &Committee,
552 certs: &[CertifiedTransaction],
553 checkpoints: &[SignedCheckpointSummary],
554) -> IotaResult {
555 let mut obligation = VerificationObligation::default();
556
557 for cert in certs {
558 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
559 cert.auth_sig()
560 .add_to_verification_obligation(committee, &mut obligation, idx)?;
561 }
562
563 for ckpt in checkpoints {
564 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
565 ckpt.auth_sig()
566 .add_to_verification_obligation(committee, &mut obligation, idx)?;
567 }
568
569 obligation.verify_all()
570}