1use std::{collections::BTreeSet, sync::Arc};
6
7use either::Either;
8use fastcrypto::traits::{AggregateAuthenticator, ToFromBytes};
9use futures::pin_mut;
10use iota_metrics::monitored_scope;
11use iota_sdk_types::crypto::Intent;
12use iota_types::{
13 base_types::AuthorityName,
14 committee::Committee,
15 crypto::{AuthorityPublicKey, AuthoritySignInfoTrait, VerificationObligation},
16 digests::{CertificateDigest, SenderSignedDataDigest},
17 error::{IotaError, IotaResult},
18 message_envelope::Message,
19 messages_checkpoint::SignedCheckpointSummary,
20 messages_consensus::{AuthorityCapabilitiesDigest, SignedAuthorityCapabilitiesV1},
21 signature::VerifyParams,
22 signature_verification::{VerifiedDigestCache, verify_sender_signed_data_message_signatures},
23 transaction::{CertifiedTransaction, SenderSignedData, VerifiedCertificate},
24};
25use itertools::{Itertools as _, izip};
26use parking_lot::{Mutex, MutexGuard};
27use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
28use tap::TapFallible;
29use tokio::{
30 runtime::Handle,
31 sync::oneshot,
32 time::{Duration, timeout},
33};
34use tracing::{Instrument, instrument, trace_span};
35const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);
38
39const MAX_BATCH_SIZE: usize = 8;
45
46type Sender = oneshot::Sender<IotaResult<VerifiedCertificate>>;
47
48struct CertBuffer {
49 certs: Vec<CertifiedTransaction>,
50 senders: Vec<Sender>,
51 id: u64,
52}
53
54impl CertBuffer {
55 fn new(capacity: usize) -> Self {
56 Self {
57 certs: Vec::with_capacity(capacity),
58 senders: Vec::with_capacity(capacity),
59 id: 0,
60 }
61 }
62
63 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
66 let this = &mut *guard;
67 let mut new = CertBuffer::new(this.capacity());
68 new.id = this.id + 1;
69 std::mem::swap(&mut new, this);
70 new
71 }
72
73 fn capacity(&self) -> usize {
74 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
75 self.certs.capacity()
76 }
77
78 fn len(&self) -> usize {
79 debug_assert_eq!(self.certs.len(), self.senders.len());
80 self.certs.len()
81 }
82
83 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
84 self.senders.push(tx);
85 self.certs.push(cert);
86 }
87}
88
89pub struct SignatureVerifier {
94 committee: Arc<Committee>,
95 non_committee_validators: BTreeSet<AuthorityName>,
96
97 certificate_cache: VerifiedDigestCache<CertificateDigest>,
98 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
99 authority_capability_cache: VerifiedDigestCache<AuthorityCapabilitiesDigest>,
100
101 verify_params: VerifyParams,
103
104 queue: Mutex<CertBuffer>,
105 pub metrics: Arc<SignatureVerifierMetrics>,
106}
107
108impl SignatureVerifier {
109 pub fn new_with_batch_size(
110 committee: Arc<Committee>,
111 non_committee_validators: BTreeSet<AuthorityName>,
112 batch_size: usize,
113 metrics: Arc<SignatureVerifierMetrics>,
114 accept_passkey_in_multisig: bool,
115 additional_multisig_checks: bool,
116 ) -> Self {
117 Self {
118 committee,
119 non_committee_validators,
120 certificate_cache: VerifiedDigestCache::new(
121 metrics.certificate_signatures_cache_hits.clone(),
122 metrics.certificate_signatures_cache_misses.clone(),
123 metrics.certificate_signatures_cache_evictions.clone(),
124 ),
125 signed_data_cache: VerifiedDigestCache::new(
126 metrics.signed_data_cache_hits.clone(),
127 metrics.signed_data_cache_misses.clone(),
128 metrics.signed_data_cache_evictions.clone(),
129 ),
130 authority_capability_cache: VerifiedDigestCache::new(
131 metrics.authority_capabilities_cache_hits.clone(),
132 metrics.authority_capabilities_cache_misses.clone(),
133 metrics.authority_capabilities_cache_evictions.clone(),
134 ),
135 queue: Mutex::new(CertBuffer::new(batch_size)),
136 metrics,
137 verify_params: VerifyParams::new(
138 accept_passkey_in_multisig,
139 additional_multisig_checks,
140 ),
141 }
142 }
143
144 pub fn new(
145 committee: Arc<Committee>,
146 non_committee_validators: BTreeSet<AuthorityName>,
147 metrics: Arc<SignatureVerifierMetrics>,
148 accept_passkey_in_multisig: bool,
149 additional_multisig_checks: bool,
150 ) -> Self {
151 Self::new_with_batch_size(
152 committee,
153 non_committee_validators,
154 MAX_BATCH_SIZE,
155 metrics,
156 accept_passkey_in_multisig,
157 additional_multisig_checks,
158 )
159 }
160
161 #[instrument(level = "trace", skip_all)]
163 pub fn verify_certs_and_checkpoints(
164 &self,
165 certs: Vec<&CertifiedTransaction>,
166 checkpoints: Vec<&SignedCheckpointSummary>,
167 authority_capabilities: Vec<&SignedAuthorityCapabilitiesV1>,
168 ) -> IotaResult {
169 for cert in &certs {
172 self.verify_tx(cert.data())?;
173 }
174
175 for cap in &authority_capabilities {
178 self.verify_authority_capabilities(cap)?;
179 }
180
181 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
182 Ok(())
183 }
184
185 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> IotaResult<VerifiedCertificate> {
187 let cert_digest = cert.certificate_digest();
188 if self.certificate_cache.is_cached(&cert_digest) {
189 return Ok(VerifiedCertificate::new_unchecked(cert));
190 }
191 self.verify_tx(cert.data())?;
192 self.verify_cert_skip_cache(cert)
193 .await
194 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
195 }
196
197 pub async fn multi_verify_certs(
198 &self,
199 certs: Vec<CertifiedTransaction>,
200 ) -> Vec<IotaResult<VerifiedCertificate>> {
201 let mut futures = Vec::with_capacity(certs.len());
204 for cert in certs {
205 futures.push(self.verify_cert(cert));
206 }
207 futures::future::join_all(futures).await
208 }
209
210 pub async fn verify_cert_skip_cache(
212 &self,
213 cert: CertifiedTransaction,
214 ) -> IotaResult<VerifiedCertificate> {
215 if cert.auth_sig().epoch != self.committee.epoch() {
218 return Err(IotaError::WrongEpoch {
219 expected_epoch: self.committee.epoch(),
220 actual_epoch: cert.auth_sig().epoch,
221 });
222 }
223
224 self.verify_cert_inner(cert).await
225 }
226
227 async fn verify_cert_inner(
228 &self,
229 cert: CertifiedTransaction,
230 ) -> IotaResult<VerifiedCertificate> {
231 let (tx, rx) = oneshot::channel();
236 pin_mut!(rx);
237
238 let prev_id_or_buffer = {
239 let mut queue = self.queue.lock();
240 queue.push(tx, cert);
241 if queue.len() == queue.capacity() {
242 Either::Right(CertBuffer::take_and_replace(queue))
243 } else {
244 Either::Left(queue.id)
245 }
246 };
247 let prev_id = match prev_id_or_buffer {
248 Either::Left(prev_id) => prev_id,
249 Either::Right(buffer) => {
250 self.metrics.full_batches.inc();
251 self.process_queue(buffer)
252 .instrument(trace_span!("SignatureVerifier::process_queue"))
253 .await;
254 return rx.try_recv().unwrap();
256 }
257 };
258
259 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
260 return res.unwrap();
262 }
263 self.metrics.timeouts.inc();
264
265 let buffer = {
266 let queue = self.queue.lock();
267 if prev_id == queue.id {
269 debug_assert_ne!(queue.len(), queue.capacity());
270 Some(CertBuffer::take_and_replace(queue))
271 } else {
272 None
273 }
274 };
275
276 if let Some(buffer) = buffer {
277 self.metrics.partial_batches.inc();
278 self.process_queue(buffer).await;
279 return rx.try_recv().unwrap();
281 }
282
283 rx.await.unwrap()
286 }
287
288 async fn process_queue(&self, buffer: CertBuffer) {
289 let committee = self.committee.clone();
290 let metrics = self.metrics.clone();
291 Handle::current()
292 .spawn_blocking(move || Self::process_queue_sync(committee, metrics, buffer))
293 .await
294 .expect("Spawn blocking should not fail");
295 }
296
297 #[instrument(level = "trace", skip_all)]
298 fn process_queue_sync(
299 committee: Arc<Committee>,
300 metrics: Arc<SignatureVerifierMetrics>,
301 buffer: CertBuffer,
302 ) {
303 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
304
305 let results = batch_verify_certificates(&committee, &buffer.certs.iter().collect_vec());
306 izip!(
307 results.into_iter(),
308 buffer.certs.into_iter(),
309 buffer.senders.into_iter(),
310 )
311 .for_each(|(result, cert, tx)| {
312 tx.send(match result {
313 Ok(()) => {
314 metrics.total_verified_certs.inc();
315 Ok(VerifiedCertificate::new_unchecked(cert))
316 }
317 Err(e) => {
318 metrics.total_failed_certs.inc();
319 Err(e)
320 }
321 })
322 .ok();
323 });
324 }
325
326 #[instrument(level = "trace", skip_all, fields(tx_digest = ?signed_tx.digest()))]
327 pub fn verify_tx(&self, signed_tx: &SenderSignedData) -> IotaResult {
328 self.signed_data_cache.is_verified(
329 signed_tx.full_message_digest(),
330 || verify_sender_signed_data_message_signatures(signed_tx, &self.verify_params),
331 || Ok(()),
332 )
333 }
334
335 #[instrument(level = "trace", skip_all)]
336 pub fn verify_authority_capabilities(
337 &self,
338 signed_authority_capabilities: &SignedAuthorityCapabilitiesV1,
339 ) -> IotaResult {
340 let epoch = self.committee.epoch();
341 self.authority_capability_cache.is_verified(
342 signed_authority_capabilities.cache_digest(epoch),
343 || {
344 let authority_name = signed_authority_capabilities.data().authority;
346 if !self.non_committee_validators.contains(&authority_name) {
347 return Err(IotaError::IncorrectSigner {
348 error: "Signer must be part of non-committee active validators".to_string(),
349 });
350 }
351
352 let mut obligation = VerificationObligation::default();
354 let idx = obligation.add_message(
355 signed_authority_capabilities.data(),
356 epoch, Intent::iota_app(signed_authority_capabilities.scope()),
359 );
360
361 let authority_key = AuthorityPublicKey::from_bytes(authority_name.as_bytes())
363 .map_err(|_| IotaError::IncorrectSigner {
364 error: "Invalid authority public key bytes".to_string(),
365 })?;
366 obligation
367 .public_keys
368 .get_mut(idx)
369 .ok_or(IotaError::InvalidAuthenticator)?
370 .push(&authority_key);
371
372 obligation
373 .signatures
374 .get_mut(idx)
375 .ok_or(IotaError::InvalidAuthenticator)?
376 .add_signature(signed_authority_capabilities.auth_sig().clone())
377 .map_err(|_| IotaError::InvalidSignature {
378 error: "Failed to add authority signature to obligation".to_string(),
379 })?;
380
381 obligation.verify_all()
382 },
383 || Ok(()),
384 )
385 }
386
387 pub fn clear_signature_cache(&self) {
388 self.certificate_cache.clear();
389 self.authority_capability_cache.clear();
390 self.signed_data_cache.clear();
391 }
392}
393
394pub struct SignatureVerifierMetrics {
395 pub certificate_signatures_cache_hits: IntCounter,
396 pub certificate_signatures_cache_misses: IntCounter,
397 pub certificate_signatures_cache_evictions: IntCounter,
398 pub signed_data_cache_hits: IntCounter,
399 pub signed_data_cache_misses: IntCounter,
400 pub signed_data_cache_evictions: IntCounter,
401 pub authority_capabilities_cache_hits: IntCounter,
402 pub authority_capabilities_cache_misses: IntCounter,
403 pub authority_capabilities_cache_evictions: IntCounter,
404 timeouts: IntCounter,
405 full_batches: IntCounter,
406 partial_batches: IntCounter,
407 total_verified_certs: IntCounter,
408 total_failed_certs: IntCounter,
409}
410
411impl SignatureVerifierMetrics {
412 pub fn new(registry: &Registry) -> Arc<Self> {
413 Arc::new(Self {
414 certificate_signatures_cache_hits: register_int_counter_with_registry!(
415 "certificate_signatures_cache_hits",
416 "Number of certificates which were known to be verified because of signature cache.",
417 registry
418 )
419 .unwrap(),
420 certificate_signatures_cache_misses: register_int_counter_with_registry!(
421 "certificate_signatures_cache_misses",
422 "Number of certificates which missed the signature cache",
423 registry
424 )
425 .unwrap(),
426 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
427 "certificate_signatures_cache_evictions",
428 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
429 registry
430 )
431 .unwrap(),
432 signed_data_cache_hits: register_int_counter_with_registry!(
433 "signed_data_cache_hits",
434 "Number of signed data which were known to be verified because of signature cache.",
435 registry
436 )
437 .unwrap(),
438 signed_data_cache_misses: register_int_counter_with_registry!(
439 "signed_data_cache_misses",
440 "Number of signed data which missed the signature cache.",
441 registry
442 )
443 .unwrap(),
444 signed_data_cache_evictions: register_int_counter_with_registry!(
445 "signed_data_cache_evictions",
446 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
447 registry
448 )
449 .unwrap(),
450 authority_capabilities_cache_hits: register_int_counter_with_registry!(
451 "authority_capabilities_cache_hits",
452 "Number of authority capabilities which were known to be verified because of capabilities cache.",
453 registry
454 )
455 .unwrap(),
456 authority_capabilities_cache_misses: register_int_counter_with_registry!(
457 "authority_capabilities_cache_misses",
458 "Number of authority capabilities which missed the capabilities cache.",
459 registry
460 )
461 .unwrap(),
462 authority_capabilities_cache_evictions: register_int_counter_with_registry!(
463 "authority_capabilities_cache_evictions",
464 "Number of times we evict a pre-existing authority capabilities that were known to be verified.",
465 registry
466 )
467 .unwrap(),
468 timeouts: register_int_counter_with_registry!(
469 "async_batch_verifier_timeouts",
470 "Number of times batch verifier times out and verifies a partial batch",
471 registry
472 )
473 .unwrap(),
474 full_batches: register_int_counter_with_registry!(
475 "async_batch_verifier_full_batches",
476 "Number of times batch verifier verifies a full batch",
477 registry
478 )
479 .unwrap(),
480 partial_batches: register_int_counter_with_registry!(
481 "async_batch_verifier_partial_batches",
482 "Number of times batch verifier verifies a partial batch",
483 registry
484 )
485 .unwrap(),
486 total_verified_certs: register_int_counter_with_registry!(
487 "async_batch_verifier_total_verified_certs",
488 "Total number of certs batch verifier has verified",
489 registry
490 )
491 .unwrap(),
492 total_failed_certs: register_int_counter_with_registry!(
493 "async_batch_verifier_total_failed_certs",
494 "Total number of certs batch verifier has rejected",
495 registry
496 )
497 .unwrap(),
498 })
499 }
500}
501
502#[instrument(level = "trace", skip_all)]
504pub fn batch_verify_all_certificates_and_checkpoints(
505 committee: &Committee,
506 certs: &[&CertifiedTransaction],
507 checkpoints: &[&SignedCheckpointSummary],
508) -> IotaResult {
509 for ckpt in checkpoints {
512 ckpt.data().verify_epoch(committee.epoch())?;
513 }
514
515 batch_verify(committee, certs, checkpoints)
516}
517
518#[instrument(level = "trace", skip_all)]
521pub fn batch_verify_certificates(
522 committee: &Committee,
523 certs: &[&CertifiedTransaction],
524) -> Vec<IotaResult> {
525 let verify_params = VerifyParams::default();
527 match batch_verify(committee, certs, &[]) {
528 Ok(_) => vec![Ok(()); certs.len()],
529
530 Err(_) if certs.len() > 1 => certs
532 .iter()
533 .map(|c| c.verify_signatures_authenticated(committee, &verify_params))
536 .collect(),
537
538 Err(e) => vec![Err(e)],
539 }
540}
541
542fn batch_verify(
543 committee: &Committee,
544 certs: &[&CertifiedTransaction],
545 checkpoints: &[&SignedCheckpointSummary],
546) -> IotaResult {
547 let mut obligation = VerificationObligation::default();
548
549 for cert in certs {
550 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::iota_app(cert.scope()));
551 cert.auth_sig()
552 .add_to_verification_obligation(committee, &mut obligation, idx)?;
553 }
554
555 for ckpt in checkpoints {
556 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::iota_app(ckpt.scope()));
557 ckpt.auth_sig()
558 .add_to_verification_obligation(committee, &mut obligation, idx)?;
559 }
560
561 obligation.verify_all()
562}