1use std::{
6 collections::{HashMap, HashSet, btree_map::BTreeMap},
7 ops::Bound,
8 sync::Arc,
9 time::{self, Duration},
10};
11
12use anemo::PeerId;
13use anyhow::Result;
14use fastcrypto::groups::bls12381;
15use fastcrypto_tbls::{
16 dkg,
17 nodes::PartyId,
18 tbls::ThresholdBls,
19 types::{ShareIndex, ThresholdBls12381MinSig},
20};
21use iota_config::p2p::RandomnessConfig;
22use iota_macros::fail_point_if;
23use iota_metrics::spawn_monitored_task;
24use iota_network_stack::anemo_ext::NetworkExt;
25use iota_types::{
26 base_types::AuthorityName,
27 committee::EpochId,
28 crypto::{RandomnessPartialSignature, RandomnessRound, RandomnessSignature},
29};
30use serde::{Deserialize, Serialize};
31use tokio::sync::{OnceCell, mpsc, oneshot};
32use tracing::{debug, error, info, instrument, warn};
33
34use self::{auth::AllowedPeersUpdatable, metrics::Metrics};
35
36mod auth;
37mod builder;
38mod generated {
39 include!(concat!(env!("OUT_DIR"), "/iota.Randomness.rs"));
40}
41mod metrics;
42mod server;
43#[cfg(test)]
44mod tests;
45
46pub use builder::{Builder, UnstartedRandomness};
47pub use generated::{
48 randomness_client::RandomnessClient,
49 randomness_server::{Randomness, RandomnessServer},
50};
51
/// Request payload carrying randomness signatures for a single epoch and
/// round.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SendSignaturesRequest {
    /// Epoch the signatures belong to.
    epoch: EpochId,
    /// Randomness round the signatures belong to.
    round: RandomnessRound,
    /// Serialized partial signatures, one byte vector per signature. The
    /// sender in this file leaves this empty whenever `sig` is set.
    partial_sigs: Vec<Vec<u8>>,
    /// Full signature for the round, when the sender already has one.
    sig: Option<RandomnessSignature>,
}
65
/// Cloneable handle used to post messages to the `RandomnessEventLoop`
/// mailbox.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Sending half of the event loop's mailbox channel.
    sender: mpsc::Sender<RandomnessMessage>,
}
75
76impl Handle {
77 pub fn update_epoch(
80 &self,
81 new_epoch: EpochId,
82 authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
83 dkg_output: dkg::Output<bls12381::G2Element, bls12381::G2Element>,
84 aggregation_threshold: u16,
85 recovered_last_completed_round: Option<RandomnessRound>, ) {
88 self.sender
89 .try_send(RandomnessMessage::UpdateEpoch(
90 new_epoch,
91 authority_info,
92 dkg_output,
93 aggregation_threshold,
94 recovered_last_completed_round,
95 ))
96 .expect("RandomnessEventLoop mailbox should not overflow or be closed")
97 }
98
99 pub fn send_partial_signatures(&self, epoch: EpochId, round: RandomnessRound) {
102 self.sender
103 .try_send(RandomnessMessage::SendPartialSignatures(epoch, round))
104 .expect("RandomnessEventLoop mailbox should not overflow or be closed")
105 }
106
107 pub fn complete_round(&self, epoch: EpochId, round: RandomnessRound) {
110 self.sender
111 .try_send(RandomnessMessage::CompleteRound(epoch, round))
112 .expect("RandomnessEventLoop mailbox should not overflow or be closed")
113 }
114
115 pub fn admin_get_partial_signatures(
118 &self,
119 round: RandomnessRound,
120 tx: oneshot::Sender<Vec<u8>>,
121 ) {
122 self.sender
123 .try_send(RandomnessMessage::AdminGetPartialSignatures(round, tx))
124 .expect("RandomnessEventLoop mailbox should not overflow or be closed")
125 }
126
127 pub fn admin_inject_partial_signatures(
130 &self,
131 authority_name: AuthorityName,
132 round: RandomnessRound,
133 sigs: Vec<RandomnessPartialSignature>,
134 result_channel: oneshot::Sender<Result<()>>,
135 ) {
136 self.sender
137 .try_send(RandomnessMessage::AdminInjectPartialSignatures(
138 authority_name,
139 round,
140 sigs,
141 result_channel,
142 ))
143 .expect("RandomnessEventLoop mailbox should not overflow or be closed")
144 }
145
146 pub fn admin_inject_full_signature(
149 &self,
150 round: RandomnessRound,
151 sig: RandomnessSignature,
152 result_channel: oneshot::Sender<Result<()>>,
153 ) {
154 self.sender
155 .try_send(RandomnessMessage::AdminInjectFullSignature(
156 round,
157 sig,
158 result_channel,
159 ))
160 .expect("RandomnessEventLoop mailbox should not overflow or be closed")
161 }
162
163 pub fn new_stub() -> Self {
165 let (sender, mut receiver) = mpsc::channel(1);
166 tokio::spawn(async move {
168 loop {
169 tokio::select! {
170 m = receiver.recv() => {
171 if m.is_none() {
172 break;
173 }
174 },
175 }
176 }
177 });
178 Self { sender }
179 }
180}
181
/// Messages processed by `RandomnessEventLoop::handle_message`.
#[derive(Debug)]
enum RandomnessMessage {
    /// Switch to a new epoch. Fields, in order: new epoch, authority info,
    /// DKG output, aggregation threshold, recovered highest completed round.
    UpdateEpoch(
        EpochId,
        HashMap<AuthorityName, (PeerId, PartyId)>,
        dkg::Output<bls12381::G2Element, bls12381::G2Element>,
        u16,
        Option<RandomnessRound>,
    ),
    /// Start sending partial signatures for the given epoch and round.
    SendPartialSignatures(EpochId, RandomnessRound),
    /// Mark the given round of the given epoch as complete.
    CompleteRound(EpochId, RandomnessRound),
    /// Signatures received from a peer. Fields, in order: sending peer,
    /// epoch, round, serialized partial signatures, optional full signature.
    /// When the full signature is present the partial signatures are not
    /// processed.
    ReceiveSignatures(
        PeerId,
        EpochId,
        RandomnessRound,
        Vec<Vec<u8>>,
        Option<RandomnessSignature>,
    ),
    /// Consider ignoring a peer that sent invalid signatures in the given
    /// epoch.
    MaybeIgnoreByzantinePeer(EpochId, PeerId),
    /// Admin: reply on the channel with this node's serialized partial
    /// signatures for the round.
    AdminGetPartialSignatures(RandomnessRound, oneshot::Sender<Vec<u8>>),
    /// Admin: record partial signatures as if received from the named
    /// authority; the result is sent on the channel.
    AdminInjectPartialSignatures(
        AuthorityName,
        RandomnessRound,
        Vec<RandomnessPartialSignature>,
        oneshot::Sender<Result<()>>,
    ),
    /// Admin: inject a full signature for the round; the result is sent on
    /// the channel.
    AdminInjectFullSignature(
        RandomnessRound,
        RandomnessSignature,
        oneshot::Sender<Result<()>>,
    ),
}
214
/// State owned by the randomness event loop task.
struct RandomnessEventLoop {
    /// This node's authority name; the send task skips the peer with this
    /// name.
    name: AuthorityName,
    /// Randomness configuration (round-ahead limits, retry interval, ...).
    config: RandomnessConfig,
    /// Receiving half of the mailbox the loop drains in `start`.
    mailbox: mpsc::Receiver<RandomnessMessage>,
    /// Weak sender to this loop's own mailbox, used to enqueue
    /// `MaybeIgnoreByzantinePeer` messages.
    mailbox_sender: mpsc::WeakSender<RandomnessMessage>,
    /// Network used to reach peers and to read the local peer id.
    network: anemo::Network,
    /// Receives a fresh snapshot of `allowed_peers_set` whenever that set
    /// changes.
    allowed_peers: AllowedPeersUpdatable,
    /// Peers currently allowed: rebuilt from the authority info on each epoch
    /// update, and shrunk when a peer is ignored.
    allowed_peers_set: HashSet<PeerId>,
    /// Metrics sink.
    metrics: Metrics,
    /// Receives `(epoch, round, BCS-serialized full signature)` for each
    /// processed full signature.
    randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,

    /// Epoch the loop is currently operating in.
    epoch: EpochId,
    /// Peer id and DKG party id for each authority of the current epoch.
    authority_info: Arc<HashMap<AuthorityName, (PeerId, PartyId)>>,
    /// Share indices owned by each peer, derived from the DKG output in
    /// `update_epoch`.
    peer_share_ids: Option<HashMap<PeerId, Vec<ShareIndex>>>,
    /// Running total of shares held by peers that have been ignored.
    blocked_share_id_count: usize,
    /// DKG output for the current epoch, set by `update_epoch`.
    dkg_output: Option<dkg::Output<bls12381::G2Element, bls12381::G2Element>>,
    /// Threshold passed to signature aggregation.
    aggregation_threshold: u16,
    /// Highest round requested via `send_partial_signatures`, per epoch.
    highest_requested_round: BTreeMap<EpochId, RandomnessRound>,
    /// Per round: handle of the task sending signatures, and the cell that
    /// receives the round's full signature once it is known.
    send_tasks: BTreeMap<
        RandomnessRound,
        (
            tokio::task::JoinHandle<()>,
            Arc<OnceCell<RandomnessSignature>>,
        ),
    >,
    /// When each round was requested; read for the generation latency metric.
    round_request_time: BTreeMap<(EpochId, RandomnessRound), time::Instant>,
    /// Raw partial signature bytes that could not be processed yet; replayed
    /// by `update_epoch`.
    future_epoch_partial_sigs: BTreeMap<(EpochId, RandomnessRound, PeerId), Vec<Vec<u8>>>,
    /// Decoded partial signatures for the current epoch, keyed by round and
    /// sending peer.
    received_partial_sigs: BTreeMap<(RandomnessRound, PeerId), Vec<RandomnessPartialSignature>>,
    /// Full signatures obtained for rounds of the current epoch.
    completed_sigs: BTreeMap<RandomnessRound, RandomnessSignature>,
    /// Highest round reported complete, per epoch.
    highest_completed_round: BTreeMap<EpochId, RandomnessRound>,
}
246
247impl RandomnessEventLoop {
248 pub async fn start(mut self) {
249 info!("Randomness network event loop started");
250
251 loop {
252 tokio::select! {
253 maybe_message = self.mailbox.recv() => {
254 if let Some(message) = maybe_message {
257 self.handle_message(message);
258 } else {
259 break;
260 }
261 },
262 }
263 }
264
265 info!("Randomness network event loop ended");
266 }
267
268 fn handle_message(&mut self, message: RandomnessMessage) {
269 match message {
270 RandomnessMessage::UpdateEpoch(
271 epoch,
272 authority_info,
273 dkg_output,
274 aggregation_threshold,
275 recovered_highest_completed_round,
276 ) => {
277 if let Err(e) = self.update_epoch(
278 epoch,
279 authority_info,
280 dkg_output,
281 aggregation_threshold,
282 recovered_highest_completed_round,
283 ) {
284 error!("BUG: failed to update epoch in RandomnessEventLoop: {e:?}");
285 }
286 }
287 RandomnessMessage::SendPartialSignatures(epoch, round) => {
288 self.send_partial_signatures(epoch, round)
289 }
290 RandomnessMessage::CompleteRound(epoch, round) => self.complete_round(epoch, round),
291 RandomnessMessage::ReceiveSignatures(peer_id, epoch, round, partial_sigs, sig) => {
292 if let Some(sig) = sig {
293 self.receive_full_signature(peer_id, epoch, round, sig)
294 } else {
295 self.receive_partial_signatures(peer_id, epoch, round, partial_sigs)
296 }
297 }
298 RandomnessMessage::MaybeIgnoreByzantinePeer(epoch, peer_id) => {
299 self.maybe_ignore_byzantine_peer(epoch, peer_id)
300 }
301 RandomnessMessage::AdminGetPartialSignatures(round, tx) => {
302 self.admin_get_partial_signatures(round, tx)
303 }
304 RandomnessMessage::AdminInjectPartialSignatures(
305 authority_name,
306 round,
307 sigs,
308 result_channel,
309 ) => {
310 let _ = result_channel.send(self.admin_inject_partial_signatures(
311 authority_name,
312 round,
313 sigs,
314 ));
315 }
316 RandomnessMessage::AdminInjectFullSignature(round, sig, result_channel) => {
317 let _ = result_channel.send(self.admin_inject_full_signature(round, sig));
318 }
319 }
320 }
321
322 #[instrument(level = "debug", skip_all, fields(?new_epoch))]
323 fn update_epoch(
324 &mut self,
325 new_epoch: EpochId,
326 authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
327 dkg_output: dkg::Output<bls12381::G2Element, bls12381::G2Element>,
328 aggregation_threshold: u16,
329 recovered_highest_completed_round: Option<RandomnessRound>,
330 ) -> Result<()> {
331 assert!(self.dkg_output.is_none() || new_epoch > self.epoch);
332
333 debug!("updating randomness network loop to new epoch");
334
335 self.peer_share_ids = Some(authority_info.iter().try_fold(
336 HashMap::new(),
337 |mut acc, (_name, (peer_id, party_id))| -> Result<_> {
338 let ids = dkg_output
339 .nodes
340 .share_ids_of(*party_id)
341 .expect("party_id should be valid");
342 acc.insert(*peer_id, ids);
343 Ok(acc)
344 },
345 )?);
346 self.allowed_peers_set = authority_info
347 .values()
348 .map(|(peer_id, _)| *peer_id)
349 .collect();
350 self.allowed_peers
351 .update(Arc::new(self.allowed_peers_set.clone()));
352 self.epoch = new_epoch;
353 self.authority_info = Arc::new(authority_info);
354 self.dkg_output = Some(dkg_output);
355 self.aggregation_threshold = aggregation_threshold;
356 if let Some(round) = recovered_highest_completed_round {
357 self.highest_completed_round
358 .entry(new_epoch)
359 .and_modify(|r| *r = std::cmp::max(*r, round))
360 .or_insert(round);
361 }
362 for (_, (task, _)) in std::mem::take(&mut self.send_tasks) {
363 task.abort();
364 }
365 self.metrics.set_epoch(new_epoch);
366
367 self.highest_requested_round = self.highest_requested_round.split_off(&new_epoch);
369 self.round_request_time = self
370 .round_request_time
371 .split_off(&(new_epoch, RandomnessRound(0)));
372 self.received_partial_sigs.clear();
373 self.completed_sigs.clear();
374 self.highest_completed_round = self.highest_completed_round.split_off(&new_epoch);
375
376 self.maybe_start_pending_tasks();
378
379 for ((epoch, round, peer_id), sig_bytes) in
383 std::mem::take(&mut self.future_epoch_partial_sigs)
384 {
385 self.receive_partial_signatures(peer_id, epoch, round, sig_bytes);
387 }
388 let rounds_to_aggregate: Vec<_> =
389 self.received_partial_sigs.keys().map(|(r, _)| *r).collect();
390 for round in rounds_to_aggregate {
391 self.maybe_aggregate_partial_signatures(new_epoch, round);
392 }
393
394 Ok(())
395 }
396
397 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
398 fn send_partial_signatures(&mut self, epoch: EpochId, round: RandomnessRound) {
399 if epoch < self.epoch {
400 error!(
401 "BUG: skipping sending partial sigs, we are already up to epoch {}",
402 self.epoch
403 );
404 debug_assert!(
405 false,
406 "skipping sending partial sigs, we are already up to higher epoch"
407 );
408 return;
409 }
410 if epoch == self.epoch {
411 if let Some(highest_completed_round) = self.highest_completed_round.get(&epoch) {
412 if round <= *highest_completed_round {
413 info!("skipping sending partial sigs, we already have completed this round");
414 return;
415 }
416 }
417 }
418
419 self.highest_requested_round
420 .entry(epoch)
421 .and_modify(|r| *r = std::cmp::max(*r, round))
422 .or_insert(round);
423 self.round_request_time
424 .insert((epoch, round), time::Instant::now());
425 self.maybe_start_pending_tasks();
426 }
427
428 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
429 fn complete_round(&mut self, epoch: EpochId, round: RandomnessRound) {
430 debug!("completing randomness round");
431 let new_highest_round = *self
432 .highest_completed_round
433 .entry(epoch)
434 .and_modify(|r| *r = std::cmp::max(*r, round))
435 .or_insert(round);
436 if round != new_highest_round {
437 return;
440 }
441
442 self.round_request_time = self.round_request_time.split_off(&(epoch, round + 1));
443
444 if epoch == self.epoch {
445 self.remove_partial_sigs_in_range((
446 Bound::Included((RandomnessRound(0), PeerId([0; 32]))),
447 Bound::Excluded((round + 1, PeerId([0; 32]))),
448 ));
449 self.completed_sigs = self.completed_sigs.split_off(&(round + 1));
450 for (_, (task, _)) in self.send_tasks.iter().take_while(|(r, _)| **r <= round) {
451 task.abort();
452 }
453 self.send_tasks = self.send_tasks.split_off(&(round + 1));
454 self.maybe_start_pending_tasks();
455 }
456
457 self.update_rounds_pending_metric();
458 }
459
    /// Validates and records partial signatures sent by `peer_id` for the
    /// given epoch and round, then attempts aggregation.
    ///
    /// The input is dropped (with a log line) when it is for an old epoch, an
    /// epoch more than one ahead, an already-completed round, an unknown
    /// peer, a round too far ahead, or when the signatures fail to
    /// deserialize or carry unexpected share indices. Input that cannot be
    /// checked yet (different epoch, or no share ids installed) is stashed as
    /// raw bytes in `future_epoch_partial_sigs`.
    #[instrument(level = "debug", skip_all, fields(?peer_id, ?epoch, ?round))]
    fn receive_partial_signatures(
        &mut self,
        peer_id: PeerId,
        epoch: EpochId,
        round: RandomnessRound,
        sig_bytes: Vec<Vec<u8>>,
    ) {
        if epoch < self.epoch {
            debug!(
                "skipping received partial sigs, we are already up to epoch {}",
                self.epoch
            );
            return;
        }
        // Only the current epoch and the one right after it are accepted.
        if epoch > self.epoch + 1 {
            debug!(
                "skipping received partial sigs, we are still on epoch {}",
                self.epoch
            );
            return;
        }
        if epoch == self.epoch && self.completed_sigs.contains_key(&round) {
            debug!("skipping received partial sigs, we already have completed this sig");
            return;
        }
        let highest_completed_round = self.highest_completed_round.get(&epoch).copied();
        if let Some(highest_completed_round) = &highest_completed_round {
            if *highest_completed_round >= round {
                debug!("skipping received partial sigs, we already have completed this round");
                return;
            }
        }

        // Share ids for this epoch are not available yet, so the signatures
        // cannot be checked; stash the raw bytes, bounded by round number.
        if epoch != self.epoch || self.peer_share_ids.is_none() {
            if round.0 >= self.config.max_partial_sigs_rounds_ahead() {
                debug!("skipping received partial sigs for future epoch, round too far ahead",);
                return;
            }

            debug!("saving partial sigs from future epoch for later use");
            self.future_epoch_partial_sigs
                .insert((epoch, round, peer_id), sig_bytes);
            return;
        }

        let peer_share_ids = self.peer_share_ids.as_ref().expect("checked above");
        let expected_share_ids = if let Some(expected_share_ids) = peer_share_ids.get(&peer_id) {
            expected_share_ids
        } else {
            debug!("received partial sigs from unknown peer");
            return;
        };
        // The peer must send exactly one signature per share it owns.
        if sig_bytes.len() != expected_share_ids.len() as usize {
            warn!(
                "received partial sigs with wrong share ids count: expected {}, got {}",
                expected_share_ids.len(),
                sig_bytes.len(),
            );
            return;
        }

        // Use the later of "last round with a full signature" and "highest
        // completed round" as the baseline (`None` orders below any `Some`),
        // and reject rounds too far ahead of it.
        let last_completed_signature = self.completed_sigs.last_key_value().map(|(r, _)| *r);
        let last_completed_round = std::cmp::max(last_completed_signature, highest_completed_round)
            .unwrap_or(RandomnessRound(0));
        if round.0
            >= last_completed_round
                .0
                .saturating_add(self.config.max_partial_sigs_rounds_ahead())
        {
            debug!(
                "skipping received partial sigs, most recent round we completed was only {last_completed_round}",
            );
            return;
        }

        // Decode every signature; a single failure rejects the whole batch.
        let partial_sigs =
            match sig_bytes
                .iter()
                .try_fold(Vec::new(), |mut acc, bytes| -> Result<_> {
                    let sig: RandomnessPartialSignature = bcs::from_bytes(bytes)?;
                    acc.push(sig);
                    Ok(acc)
                }) {
                Ok(partial_sigs) => partial_sigs,
                Err(e) => {
                    warn!("failed to deserialize partial sigs: {e:?}");
                    return;
                }
            };
        // Share indices must match the expected ones position by position
        // (lengths were already checked to be equal above).
        let received_share_ids = partial_sigs.iter().map(|s| s.index);
        if received_share_ids
            .zip(expected_share_ids.iter())
            .any(|(a, b)| a != *b)
        {
            let received_share_ids = partial_sigs.iter().map(|s| s.index).collect::<Vec<_>>();
            warn!(
                "received partial sigs with wrong share ids: expected {expected_share_ids:?}, received {received_share_ids:?}"
            );
            return;
        }

        debug!("recording received partial signatures");
        self.received_partial_sigs
            .insert((round, peer_id), partial_sigs);

        self.maybe_aggregate_partial_signatures(epoch, round);
    }
580
    /// Tries to aggregate the partial signatures recorded for `round` into a
    /// full signature and, on success, hands it to
    /// `process_valid_full_signature`.
    ///
    /// Returns without aggregating when the round is already completed, has
    /// not been requested locally, belongs to a different epoch, or no DKG
    /// output is installed. If the aggregate fails verification, each peer's
    /// partial signatures for the round are verified individually, invalid
    /// ones are dropped (and the peer reported via the mailbox), and
    /// aggregation is retried once.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    fn maybe_aggregate_partial_signatures(&mut self, epoch: EpochId, round: RandomnessRound) {
        if let Some(highest_completed_round) = self.highest_completed_round.get(&epoch) {
            if round <= *highest_completed_round {
                info!("skipping aggregation for already-completed round");
                return;
            }
        }

        // Only aggregate rounds at or below the highest requested round.
        let highest_requested_round = self.highest_requested_round.get(&epoch);
        if highest_requested_round.is_none() || round > *highest_requested_round.unwrap() {
            debug!(
                "waiting to aggregate randomness partial signatures until local consensus catches up"
            );
            return;
        }

        if epoch != self.epoch {
            debug!(
                "waiting to aggregate randomness partial signatures until DKG completes for epoch"
            );
            return;
        }

        if self.completed_sigs.contains_key(&round) {
            info!("skipping aggregation for already-completed signature");
            return;
        }

        let vss_pk = {
            let Some(dkg_output) = &self.dkg_output else {
                debug!("called maybe_aggregate_partial_signatures before DKG completed");
                return;
            };
            &dkg_output.vss_pk
        };

        // Key range covering every peer's entry for exactly this round.
        let sig_bounds = (
            Bound::Included((round, PeerId([0; 32]))),
            Bound::Excluded((round + 1, PeerId([0; 32]))),
        );

        let sig_range = self
            .received_partial_sigs
            .range(sig_bounds)
            .flat_map(|(_, sigs)| sigs);
        let mut sig = match ThresholdBls12381MinSig::aggregate(
            self.aggregation_threshold,
            sig_range,
        ) {
            Ok(sig) => sig,
            // Not enough partial signatures yet; a later call may succeed.
            Err(fastcrypto::error::FastCryptoError::NotEnoughInputs) => return,
            Err(e) => {
                error!("error while aggregating randomness partial signatures: {e:?}");
                return;
            }
        };

        // The aggregate is checked first; individual partial signatures are
        // only verified when that check fails.
        if ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig).is_err() {
            // Drop this round's entries whose partial signatures fail
            // verification, reporting the sending peer for each.
            self.received_partial_sigs
                .retain(|&(r, peer_id), partial_sigs| {
                    if round != r {
                        return true;
                    }
                    if ThresholdBls12381MinSig::partial_verify_batch(
                        vss_pk,
                        &round.signature_message(),
                        partial_sigs.iter(),
                        &mut rand::thread_rng(),
                    )
                    .is_err()
                    {
                        warn!(
                            "received invalid partial signatures from possibly-Byzantine peer {peer_id}"
                        );
                        if let Some(sender) = self.mailbox_sender.upgrade() {
                            sender.try_send(RandomnessMessage::MaybeIgnoreByzantinePeer(
                                epoch,
                                peer_id,
                            ))
                            .expect("RandomnessEventLoop mailbox should not overflow or be closed");
                        }
                        return false;
                    }
                    true
                });
            // Retry with the entries that survived.
            let sig_range = self
                .received_partial_sigs
                .range(sig_bounds)
                .flat_map(|(_, sigs)| sigs);
            sig = match ThresholdBls12381MinSig::aggregate(self.aggregation_threshold, sig_range) {
                Ok(sig) => sig,
                Err(fastcrypto::error::FastCryptoError::NotEnoughInputs) => return,
                Err(e) => {
                    error!("error while aggregating randomness partial signatures: {e:?}");
                    return;
                }
            };
            if let Err(e) =
                ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig)
            {
                error!(
                    "error while verifying randomness partial signatures after removing invalid partials: {e:?}"
                );
                debug_assert!(
                    false,
                    "error while verifying randomness partial signatures after removing invalid partials"
                );
                return;
            }
        }

        debug!("successfully generated randomness full signature");
        self.process_valid_full_signature(epoch, round, sig);
    }
708
709 #[instrument(level = "debug", skip_all, fields(?peer_id, ?epoch, ?round))]
710 fn receive_full_signature(
711 &mut self,
712 peer_id: PeerId,
713 epoch: EpochId,
714 round: RandomnessRound,
715 sig: RandomnessSignature,
716 ) {
717 let vss_pk = {
718 let Some(dkg_output) = &self.dkg_output else {
719 debug!("called receive_full_signature before DKG completed");
720 return;
721 };
722 &dkg_output.vss_pk
723 };
724
725 if epoch != self.epoch {
727 debug!("skipping received full sig, we are on epoch {}", self.epoch);
728 return;
729 }
730 if self.completed_sigs.contains_key(&round) {
731 debug!("skipping received full sigs, we already have completed this sig");
732 return;
733 }
734 let highest_completed_round = self.highest_completed_round.get(&epoch).copied();
735 if let Some(highest_completed_round) = &highest_completed_round {
736 if *highest_completed_round >= round {
737 debug!("skipping received full sig, we already have completed this round");
738 return;
739 }
740 }
741
742 let highest_requested_round = self.highest_requested_round.get(&epoch);
743 if highest_requested_round.is_none() || round > *highest_requested_round.unwrap() {
744 debug!(
746 "skipping received full signature, local consensus is not caught up to its round"
747 );
748 return;
749 }
750
751 if let Err(e) =
752 ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig)
753 {
754 info!("received invalid full signature from peer {peer_id}: {e:?}");
755 if let Some(sender) = self.mailbox_sender.upgrade() {
756 sender
757 .try_send(RandomnessMessage::MaybeIgnoreByzantinePeer(epoch, peer_id))
758 .expect("RandomnessEventLoop mailbox should not overflow or be closed");
759 }
760 return;
761 }
762
763 debug!("received valid randomness full signature");
764 self.process_valid_full_signature(epoch, round, sig);
765 }
766
767 fn process_valid_full_signature(
768 &mut self,
769 epoch: EpochId,
770 round: RandomnessRound,
771 sig: RandomnessSignature,
772 ) {
773 assert_eq!(epoch, self.epoch);
774
775 if let Some((_, full_sig_cell)) = self.send_tasks.get(&round) {
776 full_sig_cell
777 .set(sig)
778 .expect("full signature should never be processed twice");
779 }
780 self.completed_sigs.insert(round, sig);
781 self.remove_partial_sigs_in_range((
782 Bound::Included((round, PeerId([0; 32]))),
783 Bound::Excluded((round + 1, PeerId([0; 32]))),
784 ));
785 self.metrics.record_completed_round(round);
786 if let Some(start_time) = self.round_request_time.get(&(epoch, round)) {
787 if let Some(metric) = self.metrics.round_generation_latency_metric() {
788 metric.observe(start_time.elapsed().as_secs_f64());
789 }
790 }
791
792 let sig_bytes = bcs::to_bytes(&sig).expect("signature serialization should not fail");
793 self.randomness_tx
794 .try_send((epoch, round, sig_bytes))
795 .expect("RandomnessRoundReceiver mailbox should not overflow or be closed");
796 }
797
798 fn maybe_ignore_byzantine_peer(&mut self, epoch: EpochId, peer_id: PeerId) {
799 if epoch != self.epoch {
800 return; }
802 let Some(dkg_output) = &self.dkg_output else {
803 return; };
805 if !self.allowed_peers_set.contains(&peer_id) {
806 return; }
808 let Some(peer_share_ids) = &self.peer_share_ids else {
809 return; };
811 let Some(peer_shares) = peer_share_ids.get(&peer_id) else {
812 warn!("can't ignore unknown byzantine peer {peer_id:?}");
813 return;
814 };
815 let max_ignored_shares = (self.config.max_ignored_peer_weight_factor()
816 * (dkg_output.nodes.total_weight() as f64)) as usize;
817 if self.blocked_share_id_count + peer_shares.len() > max_ignored_shares {
818 warn!(
819 "ignoring byzantine peer {peer_id:?} with {} shares would exceed max ignored peer weight {max_ignored_shares}",
820 peer_shares.len()
821 );
822 return;
823 }
824
825 warn!(
826 "ignoring byzantine peer {peer_id:?} with {} shares",
827 peer_shares.len()
828 );
829 self.blocked_share_id_count += peer_shares.len();
830 self.allowed_peers_set.remove(&peer_id);
831 self.allowed_peers
832 .update(Arc::new(self.allowed_peers_set.clone()));
833 self.metrics.inc_num_ignored_byzantine_peers();
834 }
835
    /// Spawns send tasks for requested rounds of the current epoch that do
    /// not have one yet, up to the configured concurrency limit.
    ///
    /// Does nothing when no DKG output is installed, the DKG output has no
    /// shares, or no round has been requested for the current epoch. For each
    /// new task, this node's own partial signatures are recorded as received
    /// (unless the round already has a full signature) and aggregation is
    /// attempted afterwards.
    fn maybe_start_pending_tasks(&mut self) {
        let dkg_output = if let Some(dkg_output) = &self.dkg_output {
            dkg_output
        } else {
            return;
        };
        let shares = if let Some(shares) = &dkg_output.shares {
            shares
        } else {
            return;
        };
        let highest_requested_round =
            if let Some(highest_requested_round) = self.highest_requested_round.get(&self.epoch) {
                highest_requested_round
            } else {
                return;
            };
        // Start after both the highest completed round and the last round
        // that already has a send task.
        let start_round = std::cmp::max(
            if let Some(highest_completed_round) = self.highest_completed_round.get(&self.epoch) {
                highest_completed_round.checked_add(1).unwrap()
            } else {
                RandomnessRound(0)
            },
            self.send_tasks
                .last_key_value()
                .map(|(r, _)| r.checked_add(1).unwrap())
                .unwrap_or(RandomnessRound(0)),
        );

        let mut rounds_to_aggregate = Vec::new();
        for round in start_round.0..=highest_requested_round.0 {
            let round = RandomnessRound(round);

            // Respect the cap on concurrently running send tasks.
            if self.send_tasks.len() >= self.config.max_partial_sigs_concurrent_sends() {
                break;
            }

            // Shared with the task so a full signature can be published to it
            // later.
            let full_sig_cell = Arc::new(OnceCell::new());
            self.send_tasks.entry(round).or_insert_with(|| {
                let name = self.name;
                let network = self.network.clone();
                let retry_interval = self.config.partial_signature_retry_interval();
                let metrics = self.metrics.clone();
                let authority_info = self.authority_info.clone();
                let epoch = self.epoch;
                let partial_sigs = ThresholdBls12381MinSig::partial_sign_batch(
                    shares.iter(),
                    &round.signature_message(),
                );
                let full_sig_cell_clone = full_sig_cell.clone();

                // Count our own partial signatures towards aggregation.
                if !self.completed_sigs.contains_key(&round) {
                    self.received_partial_sigs
                        .insert((round, self.network.peer_id()), partial_sigs.clone());
                    rounds_to_aggregate.push((epoch, round));
                }

                debug!("sending partial sigs for epoch {epoch}, round {round}");
                (
                    spawn_monitored_task!(RandomnessEventLoop::send_signatures_task(
                        name,
                        network,
                        retry_interval,
                        metrics,
                        authority_info,
                        epoch,
                        round,
                        partial_sigs,
                        full_sig_cell_clone,
                    )),
                    full_sig_cell,
                )
            });
        }

        self.update_rounds_pending_metric();

        // Aggregation needs `&mut self`, so it runs after the loop that
        // borrows the DKG shares.
        for (epoch, round) in rounds_to_aggregate {
            self.maybe_aggregate_partial_signatures(epoch, round);
        }
    }
922
923 #[expect(clippy::type_complexity)]
924 fn remove_partial_sigs_in_range(
925 &mut self,
926 range: (
927 Bound<(RandomnessRound, PeerId)>,
928 Bound<(RandomnessRound, PeerId)>,
929 ),
930 ) {
931 let keys_to_remove: Vec<_> = self
932 .received_partial_sigs
933 .range(range)
934 .map(|(key, _)| *key)
935 .collect();
936 for key in keys_to_remove {
937 self.received_partial_sigs.remove(&key);
940 }
941 }
942
    /// Repeatedly sends this node's signatures for `round` to every other
    /// authority.
    ///
    /// Each pass sends a `SendSignaturesRequest` to all peers except the one
    /// named `name`, waits for all sends to finish, then sleeps for
    /// `retry_interval`. While `full_sig` is unset the request carries the
    /// partial signatures; once it is set the request carries the full
    /// signature and an empty partial list. The loop has no exit of its own;
    /// the event loop stops the task by aborting its join handle.
    async fn send_signatures_task(
        name: AuthorityName,
        network: anemo::Network,
        retry_interval: Duration,
        metrics: Metrics,
        authority_info: Arc<HashMap<AuthorityName, (PeerId, PartyId)>>,
        epoch: EpochId,
        round: RandomnessRound,
        partial_sigs: Vec<RandomnessPartialSignature>,
        full_sig: Arc<OnceCell<RandomnessSignature>>,
    ) {
        // `mut` is only needed when the fail point below is compiled in.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let mut fail_point_skip_sending = false;
        fail_point_if!("rb-send-partial-signatures", || {
            fail_point_skip_sending = true;
        });
        if fail_point_skip_sending {
            warn!("skipping sending partial sigs due to simtest fail point");
            return;
        }

        // Timer guard kept alive for the lifetime of the task.
        let _metrics_guard = metrics
            .round_observation_latency_metric()
            .map(|metric| metric.start_timer());

        let peers: HashMap<_, _> = authority_info
            .iter()
            .map(|(name, (peer_id, _party_id))| (name, network.waiting_peer(*peer_id)))
            .collect();
        // Serialize once; the bytes are cloned into every request.
        let partial_sigs: Vec<_> = partial_sigs
            .iter()
            .map(|sig| bcs::to_bytes(sig).expect("message serialization should not fail"))
            .collect();

        loop {
            let mut requests = Vec::new();
            for (peer_name, peer) in &peers {
                // Do not send to ourselves.
                if name == **peer_name {
                    continue;
                }
                let mut client = RandomnessClient::new(peer.clone());
                // Request timeout: 300 seconds in test builds, 10 otherwise.
                #[cfg(test)]
                const SEND_PARTIAL_SIGNATURES_TIMEOUT: Duration = Duration::from_secs(300);
                #[cfg(not(test))]
                const SEND_PARTIAL_SIGNATURES_TIMEOUT: Duration = Duration::from_secs(10);
                let full_sig = full_sig.get().cloned();
                let request = anemo::Request::new(SendSignaturesRequest {
                    epoch,
                    round,
                    partial_sigs: if full_sig.is_none() {
                        partial_sigs.clone()
                    } else {
                        Vec::new()
                    },
                    sig: full_sig,
                })
                .with_timeout(SEND_PARTIAL_SIGNATURES_TIMEOUT);
                requests.push(async move {
                    let result = client.send_signatures(request).await;
                    if let Err(_error) = result {
                        debug!("failed to send partial signatures to {peer_name}");
                    }
                });
            }

            // Run all sends of this pass concurrently.
            futures::future::join_all(requests).await;

            tokio::time::sleep(retry_interval).await;
        }
    }
1022
1023 fn update_rounds_pending_metric(&self) {
1024 let highest_requested_round = self
1025 .highest_requested_round
1026 .get(&self.epoch)
1027 .map(|r| r.0)
1028 .unwrap_or(0);
1029 let highest_completed_round = self
1030 .highest_completed_round
1031 .get(&self.epoch)
1032 .map(|r| r.0)
1033 .unwrap_or(0);
1034 let num_rounds_pending =
1035 highest_requested_round.saturating_sub(highest_completed_round) as i64;
1036 let prev_value = self.metrics.num_rounds_pending().unwrap_or_default();
1037 if num_rounds_pending / 100 > prev_value / 100 {
1038 warn!(
1039 "RandomnessEventLoop randomness generation backlog: over {} rounds are pending (oldest is {:?})",
1041 (num_rounds_pending / 100) * 100,
1042 highest_completed_round + 1,
1043 );
1044 }
1045 self.metrics.set_num_rounds_pending(num_rounds_pending);
1046 }
1047
1048 fn admin_get_partial_signatures(&self, round: RandomnessRound, tx: oneshot::Sender<Vec<u8>>) {
1049 let shares = if let Some(shares) = self.dkg_output.as_ref().and_then(|d| d.shares.as_ref())
1050 {
1051 shares
1052 } else {
1053 let _ = tx.send(Vec::new()); return;
1055 };
1056
1057 let partial_sigs =
1058 ThresholdBls12381MinSig::partial_sign_batch(shares.iter(), &round.signature_message());
1059 let _ = tx.send(bcs::to_bytes(&partial_sigs).expect("serialization should not fail"));
1061 }
1062
1063 fn admin_inject_partial_signatures(
1064 &mut self,
1065 authority_name: AuthorityName,
1066 round: RandomnessRound,
1067 sigs: Vec<RandomnessPartialSignature>,
1068 ) -> Result<()> {
1069 let peer_id = self
1070 .authority_info
1071 .get(&authority_name)
1072 .map(|(peer_id, _)| *peer_id)
1073 .ok_or(anyhow::anyhow!("unknown AuthorityName {authority_name:?}"))?;
1074 self.received_partial_sigs.insert((round, peer_id), sigs);
1075 self.maybe_aggregate_partial_signatures(self.epoch, round);
1076 Ok(())
1077 }
1078
1079 fn admin_inject_full_signature(
1080 &mut self,
1081 round: RandomnessRound,
1082 sig: RandomnessSignature,
1083 ) -> Result<()> {
1084 let vss_pk = {
1085 let Some(dkg_output) = &self.dkg_output else {
1086 return Err(anyhow::anyhow!(
1087 "called admin_inject_full_signature before DKG completed"
1088 ));
1089 };
1090 &dkg_output.vss_pk
1091 };
1092
1093 ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig)
1094 .map_err(|e| anyhow::anyhow!("invalid full signature: {e:?}"))?;
1095
1096 self.process_valid_full_signature(self.epoch, round, sig);
1097 Ok(())
1098 }
1099}