iota_network/randomness/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet, btree_map::BTreeMap},
7    ops::Bound,
8    sync::Arc,
9    time::{self, Duration},
10};
11
12use anemo::PeerId;
13use anyhow::Result;
14use fastcrypto::groups::bls12381;
15use fastcrypto_tbls::{
16    dkg,
17    nodes::PartyId,
18    tbls::ThresholdBls,
19    types::{ShareIndex, ThresholdBls12381MinSig},
20};
21use iota_config::p2p::RandomnessConfig;
22use iota_macros::fail_point_if;
23use iota_metrics::spawn_monitored_task;
24use iota_network_stack::anemo_ext::NetworkExt;
25use iota_types::{
26    base_types::AuthorityName,
27    committee::EpochId,
28    crypto::{RandomnessPartialSignature, RandomnessRound, RandomnessSignature},
29};
30use serde::{Deserialize, Serialize};
31use tokio::sync::{OnceCell, mpsc, oneshot};
32use tracing::{debug, error, info, instrument, warn};
33
34use self::{auth::AllowedPeersUpdatable, metrics::Metrics};
35
36mod auth;
37mod builder;
38mod generated {
39    include!(concat!(env!("OUT_DIR"), "/iota.Randomness.rs"));
40}
41mod metrics;
42mod server;
43#[cfg(test)]
44mod tests;
45
46pub use builder::{Builder, UnstartedRandomness};
47pub use generated::{
48    randomness_client::RandomnessClient,
49    randomness_server::{Randomness, RandomnessServer},
50};
51
/// Message body carrying randomness signatures for a single epoch and round:
/// either a list of serialized partial signatures or, when the sender already
/// has it, the full signature.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SendSignaturesRequest {
    // Epoch the signatures were produced for.
    epoch: EpochId,
    // Randomness round the signatures were produced for.
    round: RandomnessRound,
    // BCS-serialized `RandomnessPartialSignature` values. We store raw bytes here to enable
    // defenses against too-large messages.
    // The protocol requires the signatures to be ordered by share index (as provided by
    // fastcrypto).
    partial_sigs: Vec<Vec<u8>>,
    // If peer already has a full signature available for the round, it's provided here in lieu
    // of partial sigs.
    sig: Option<RandomnessSignature>,
}
65
66/// A handle to the Randomness network subsystem.
67///
68/// This handle can be cloned and shared. Once all copies of a Randomness
69/// system's Handle have been dropped, the Randomness system will be gracefully
70/// shutdown.
71#[derive(Clone, Debug)]
72pub struct Handle {
73    sender: mpsc::Sender<RandomnessMessage>,
74}
75
76impl Handle {
77    /// Transitions the Randomness system to a new epoch. Cancels all partial
78    /// signature sends for prior epochs.
79    pub fn update_epoch(
80        &self,
81        new_epoch: EpochId,
82        authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
83        dkg_output: dkg::Output<bls12381::G2Element, bls12381::G2Element>,
84        aggregation_threshold: u16,
85        recovered_last_completed_round: Option<RandomnessRound>, /* set to None if not starting
86                                                                  * up mid-epoch */
87    ) {
88        self.sender
89            .try_send(RandomnessMessage::UpdateEpoch(
90                new_epoch,
91                authority_info,
92                dkg_output,
93                aggregation_threshold,
94                recovered_last_completed_round,
95            ))
96            .expect("RandomnessEventLoop mailbox should not overflow or be closed")
97    }
98
99    /// Begins transmitting partial signatures for the given epoch and round
100    /// until completed.
101    pub fn send_partial_signatures(&self, epoch: EpochId, round: RandomnessRound) {
102        self.sender
103            .try_send(RandomnessMessage::SendPartialSignatures(epoch, round))
104            .expect("RandomnessEventLoop mailbox should not overflow or be closed")
105    }
106
107    /// Records the given round as complete, stopping any partial signature
108    /// sends.
109    pub fn complete_round(&self, epoch: EpochId, round: RandomnessRound) {
110        self.sender
111            .try_send(RandomnessMessage::CompleteRound(epoch, round))
112            .expect("RandomnessEventLoop mailbox should not overflow or be closed")
113    }
114
115    /// Admin interface handler: generates partial signatures for the given
116    /// round at the current epoch.
117    pub fn admin_get_partial_signatures(
118        &self,
119        round: RandomnessRound,
120        tx: oneshot::Sender<Vec<u8>>,
121    ) {
122        self.sender
123            .try_send(RandomnessMessage::AdminGetPartialSignatures(round, tx))
124            .expect("RandomnessEventLoop mailbox should not overflow or be closed")
125    }
126
127    /// Admin interface handler: injects partial signatures for the given round
128    /// at the current epoch, skipping validity checks.
129    pub fn admin_inject_partial_signatures(
130        &self,
131        authority_name: AuthorityName,
132        round: RandomnessRound,
133        sigs: Vec<RandomnessPartialSignature>,
134        result_channel: oneshot::Sender<Result<()>>,
135    ) {
136        self.sender
137            .try_send(RandomnessMessage::AdminInjectPartialSignatures(
138                authority_name,
139                round,
140                sigs,
141                result_channel,
142            ))
143            .expect("RandomnessEventLoop mailbox should not overflow or be closed")
144    }
145
146    /// Admin interface handler: injects full signature for the given round at
147    /// the current epoch, skipping validity checks.
148    pub fn admin_inject_full_signature(
149        &self,
150        round: RandomnessRound,
151        sig: RandomnessSignature,
152        result_channel: oneshot::Sender<Result<()>>,
153    ) {
154        self.sender
155            .try_send(RandomnessMessage::AdminInjectFullSignature(
156                round,
157                sig,
158                result_channel,
159            ))
160            .expect("RandomnessEventLoop mailbox should not overflow or be closed")
161    }
162
163    // For testing.
164    pub fn new_stub() -> Self {
165        let (sender, mut receiver) = mpsc::channel(1);
166        // Keep receiver open until all senders are closed.
167        tokio::spawn(async move {
168            loop {
169                tokio::select! {
170                    m = receiver.recv() => {
171                        if m.is_none() {
172                            break;
173                        }
174                    },
175                }
176            }
177        });
178        Self { sender }
179    }
180}
181
/// Messages processed by `RandomnessEventLoop::handle_message`, one variant
/// per operation of the event loop.
#[derive(Debug)]
enum RandomnessMessage {
    /// Switch the event loop to a new epoch (epoch id, authority info, DKG
    /// output, plus the two values annotated below).
    UpdateEpoch(
        EpochId,
        HashMap<AuthorityName, (PeerId, PartyId)>,
        dkg::Output<bls12381::G2Element, bls12381::G2Element>,
        u16,                     // aggregation_threshold
        Option<RandomnessRound>, // recovered_highest_completed_round
    ),
    /// Request that partial signatures be sent for the given epoch and round.
    SendPartialSignatures(EpochId, RandomnessRound),
    /// Mark the given epoch and round as completed.
    CompleteRound(EpochId, RandomnessRound),
    /// Signatures received from a peer for an epoch and round. When the
    /// optional full signature is present it is processed and the serialized
    /// partial signatures are not.
    ReceiveSignatures(
        PeerId,
        EpochId,
        RandomnessRound,
        Vec<Vec<u8>>,
        Option<RandomnessSignature>,
    ),
    /// Enqueued by the event loop itself after a peer supplied signatures
    /// that failed verification in the given epoch.
    MaybeIgnoreByzantinePeer(EpochId, PeerId),
    /// Admin request for partial signatures of a round; the reply goes to the
    /// provided oneshot sender.
    AdminGetPartialSignatures(RandomnessRound, oneshot::Sender<Vec<u8>>),
    /// Admin request to inject partial signatures for a round on behalf of an
    /// authority; the outcome goes to the provided oneshot sender.
    AdminInjectPartialSignatures(
        AuthorityName,
        RandomnessRound,
        Vec<RandomnessPartialSignature>,
        oneshot::Sender<Result<()>>,
    ),
    /// Admin request to inject a full signature for a round; the outcome goes
    /// to the provided oneshot sender.
    AdminInjectFullSignature(
        RandomnessRound,
        RandomnessSignature,
        oneshot::Sender<Result<()>>,
    ),
}
214
/// State owned by the randomness event loop task. All of it is mutated from
/// `handle_message`, which processes one mailbox message at a time.
struct RandomnessEventLoop {
    // NOTE(review): presumably this node's own authority name — confirm against the builder.
    name: AuthorityName,
    // Supplies limits such as `max_partial_sigs_rounds_ahead` and
    // `max_ignored_peer_weight_factor`.
    config: RandomnessConfig,
    // Receiving end of the mailbox; the loop ends once it yields `None`.
    mailbox: mpsc::Receiver<RandomnessMessage>,
    // Weak sender to our own mailbox, used to enqueue `MaybeIgnoreByzantinePeer`
    // messages. Being weak, it does not keep the mailbox open by itself.
    mailbox_sender: mpsc::WeakSender<RandomnessMessage>,
    network: anemo::Network,
    // Updated with a copy of `allowed_peers_set` on every epoch change.
    allowed_peers: AllowedPeersUpdatable,
    // Peer ids taken from the current epoch's `authority_info`.
    allowed_peers_set: HashSet<PeerId>,
    metrics: Metrics,
    // Receives (epoch, round, BCS-serialized full signature) for every valid
    // full signature that is processed.
    randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,

    // Current epoch, as set by the last `update_epoch`.
    epoch: EpochId,
    authority_info: Arc<HashMap<AuthorityName, (PeerId, PartyId)>>,
    // Share ids expected from each peer, derived from the DKG output in
    // `update_epoch`.
    peer_share_ids: Option<HashMap<PeerId, Vec<ShareIndex>>>,
    // Compared, together with a peer's share count, against the maximum
    // ignorable weight in `maybe_ignore_byzantine_peer`.
    blocked_share_id_count: usize,
    // DKG output of the current epoch; several handlers return early while
    // this is `None`.
    dkg_output: Option<dkg::Output<bls12381::G2Element, bls12381::G2Element>>,
    // Threshold passed to `ThresholdBls12381MinSig::aggregate`.
    aggregation_threshold: u16,
    // Per epoch, the highest round passed to `send_partial_signatures`.
    highest_requested_round: BTreeMap<EpochId, RandomnessRound>,
    // Per round: the handle of the sending task, and a cell that is set to the
    // full signature once one has been processed for that round.
    send_tasks: BTreeMap<
        RandomnessRound,
        (
            tokio::task::JoinHandle<()>,
            Arc<OnceCell<RandomnessSignature>>,
        ),
    >,
    // When each (epoch, round) was requested; read to report round generation
    // latency.
    round_request_time: BTreeMap<(EpochId, RandomnessRound), time::Instant>,
    // Raw partial signatures that could not be verified yet (future epoch, or
    // no share ids known); replayed by `update_epoch`.
    future_epoch_partial_sigs: BTreeMap<(EpochId, RandomnessRound, PeerId), Vec<Vec<u8>>>,
    // Deserialized partial signatures for the current epoch that passed the
    // shape checks, keyed by (round, peer).
    received_partial_sigs: BTreeMap<(RandomnessRound, PeerId), Vec<RandomnessPartialSignature>>,
    // Full signatures processed in the current epoch, by round.
    completed_sigs: BTreeMap<RandomnessRound, RandomnessSignature>,
    // Per epoch, the highest round recorded by `complete_round` (or recovered
    // in `update_epoch`).
    highest_completed_round: BTreeMap<EpochId, RandomnessRound>,
}
246
247impl RandomnessEventLoop {
248    pub async fn start(mut self) {
249        info!("Randomness network event loop started");
250
251        loop {
252            tokio::select! {
253                maybe_message = self.mailbox.recv() => {
254                    // Once all handles to our mailbox have been dropped this
255                    // will yield `None` and we can terminate the event loop.
256                    if let Some(message) = maybe_message {
257                        self.handle_message(message);
258                    } else {
259                        break;
260                    }
261                },
262            }
263        }
264
265        info!("Randomness network event loop ended");
266    }
267
268    fn handle_message(&mut self, message: RandomnessMessage) {
269        match message {
270            RandomnessMessage::UpdateEpoch(
271                epoch,
272                authority_info,
273                dkg_output,
274                aggregation_threshold,
275                recovered_highest_completed_round,
276            ) => {
277                if let Err(e) = self.update_epoch(
278                    epoch,
279                    authority_info,
280                    dkg_output,
281                    aggregation_threshold,
282                    recovered_highest_completed_round,
283                ) {
284                    error!("BUG: failed to update epoch in RandomnessEventLoop: {e:?}");
285                }
286            }
287            RandomnessMessage::SendPartialSignatures(epoch, round) => {
288                self.send_partial_signatures(epoch, round)
289            }
290            RandomnessMessage::CompleteRound(epoch, round) => self.complete_round(epoch, round),
291            RandomnessMessage::ReceiveSignatures(peer_id, epoch, round, partial_sigs, sig) => {
292                if let Some(sig) = sig {
293                    self.receive_full_signature(peer_id, epoch, round, sig)
294                } else {
295                    self.receive_partial_signatures(peer_id, epoch, round, partial_sigs)
296                }
297            }
298            RandomnessMessage::MaybeIgnoreByzantinePeer(epoch, peer_id) => {
299                self.maybe_ignore_byzantine_peer(epoch, peer_id)
300            }
301            RandomnessMessage::AdminGetPartialSignatures(round, tx) => {
302                self.admin_get_partial_signatures(round, tx)
303            }
304            RandomnessMessage::AdminInjectPartialSignatures(
305                authority_name,
306                round,
307                sigs,
308                result_channel,
309            ) => {
310                let _ = result_channel.send(self.admin_inject_partial_signatures(
311                    authority_name,
312                    round,
313                    sigs,
314                ));
315            }
316            RandomnessMessage::AdminInjectFullSignature(round, sig, result_channel) => {
317                let _ = result_channel.send(self.admin_inject_full_signature(round, sig));
318            }
319        }
320    }
321
322    #[instrument(level = "debug", skip_all, fields(?new_epoch))]
323    fn update_epoch(
324        &mut self,
325        new_epoch: EpochId,
326        authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
327        dkg_output: dkg::Output<bls12381::G2Element, bls12381::G2Element>,
328        aggregation_threshold: u16,
329        recovered_highest_completed_round: Option<RandomnessRound>,
330    ) -> Result<()> {
331        assert!(self.dkg_output.is_none() || new_epoch > self.epoch);
332
333        debug!("updating randomness network loop to new epoch");
334
335        self.peer_share_ids = Some(authority_info.iter().try_fold(
336            HashMap::new(),
337            |mut acc, (_name, (peer_id, party_id))| -> Result<_> {
338                let ids = dkg_output
339                    .nodes
340                    .share_ids_of(*party_id)
341                    .expect("party_id should be valid");
342                acc.insert(*peer_id, ids);
343                Ok(acc)
344            },
345        )?);
346        self.allowed_peers_set = authority_info
347            .values()
348            .map(|(peer_id, _)| *peer_id)
349            .collect();
350        self.allowed_peers
351            .update(Arc::new(self.allowed_peers_set.clone()));
352        self.epoch = new_epoch;
353        self.authority_info = Arc::new(authority_info);
354        self.dkg_output = Some(dkg_output);
355        self.aggregation_threshold = aggregation_threshold;
356        if let Some(round) = recovered_highest_completed_round {
357            self.highest_completed_round
358                .entry(new_epoch)
359                .and_modify(|r| *r = std::cmp::max(*r, round))
360                .or_insert(round);
361        }
362        for (_, (task, _)) in std::mem::take(&mut self.send_tasks) {
363            task.abort();
364        }
365        self.metrics.set_epoch(new_epoch);
366
367        // Throw away info from old epochs.
368        self.highest_requested_round = self.highest_requested_round.split_off(&new_epoch);
369        self.round_request_time = self
370            .round_request_time
371            .split_off(&(new_epoch, RandomnessRound(0)));
372        self.received_partial_sigs.clear();
373        self.completed_sigs.clear();
374        self.highest_completed_round = self.highest_completed_round.split_off(&new_epoch);
375
376        // Start any pending tasks for the new epoch.
377        self.maybe_start_pending_tasks();
378
379        // Aggregate any sigs received early from the new epoch.
380        // (We can't call `maybe_aggregate_partial_signatures` directly while iterating,
381        // because it takes `&mut self`, so we store in a Vec first.)
382        for ((epoch, round, peer_id), sig_bytes) in
383            std::mem::take(&mut self.future_epoch_partial_sigs)
384        {
385            // We can fully validate these now that we have current epoch DKG output.
386            self.receive_partial_signatures(peer_id, epoch, round, sig_bytes);
387        }
388        let rounds_to_aggregate: Vec<_> =
389            self.received_partial_sigs.keys().map(|(r, _)| *r).collect();
390        for round in rounds_to_aggregate {
391            self.maybe_aggregate_partial_signatures(new_epoch, round);
392        }
393
394        Ok(())
395    }
396
397    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
398    fn send_partial_signatures(&mut self, epoch: EpochId, round: RandomnessRound) {
399        if epoch < self.epoch {
400            error!(
401                "BUG: skipping sending partial sigs, we are already up to epoch {}",
402                self.epoch
403            );
404            debug_assert!(
405                false,
406                "skipping sending partial sigs, we are already up to higher epoch"
407            );
408            return;
409        }
410        if epoch == self.epoch {
411            if let Some(highest_completed_round) = self.highest_completed_round.get(&epoch) {
412                if round <= *highest_completed_round {
413                    info!("skipping sending partial sigs, we already have completed this round");
414                    return;
415                }
416            }
417        }
418
419        self.highest_requested_round
420            .entry(epoch)
421            .and_modify(|r| *r = std::cmp::max(*r, round))
422            .or_insert(round);
423        self.round_request_time
424            .insert((epoch, round), time::Instant::now());
425        self.maybe_start_pending_tasks();
426    }
427
428    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
429    fn complete_round(&mut self, epoch: EpochId, round: RandomnessRound) {
430        debug!("completing randomness round");
431        let new_highest_round = *self
432            .highest_completed_round
433            .entry(epoch)
434            .and_modify(|r| *r = std::cmp::max(*r, round))
435            .or_insert(round);
436        if round != new_highest_round {
437            // This round completion came out of order, and we're already ahead. Nothing
438            // more to do in that case.
439            return;
440        }
441
442        self.round_request_time = self.round_request_time.split_off(&(epoch, round + 1));
443
444        if epoch == self.epoch {
445            self.remove_partial_sigs_in_range((
446                Bound::Included((RandomnessRound(0), PeerId([0; 32]))),
447                Bound::Excluded((round + 1, PeerId([0; 32]))),
448            ));
449            self.completed_sigs = self.completed_sigs.split_off(&(round + 1));
450            for (_, (task, _)) in self.send_tasks.iter().take_while(|(r, _)| **r <= round) {
451                task.abort();
452            }
453            self.send_tasks = self.send_tasks.split_off(&(round + 1));
454            self.maybe_start_pending_tasks();
455        }
456
457        self.update_rounds_pending_metric();
458    }
459
460    #[instrument(level = "debug", skip_all, fields(?peer_id, ?epoch, ?round))]
461    fn receive_partial_signatures(
462        &mut self,
463        peer_id: PeerId,
464        epoch: EpochId,
465        round: RandomnessRound,
466        sig_bytes: Vec<Vec<u8>>,
467    ) {
468        // Basic validity checks.
469        if epoch < self.epoch {
470            debug!(
471                "skipping received partial sigs, we are already up to epoch {}",
472                self.epoch
473            );
474            return;
475        }
476        if epoch > self.epoch + 1 {
477            debug!(
478                "skipping received partial sigs, we are still on epoch {}",
479                self.epoch
480            );
481            return;
482        }
483        if epoch == self.epoch && self.completed_sigs.contains_key(&round) {
484            debug!("skipping received partial sigs, we already have completed this sig");
485            return;
486        }
487        let highest_completed_round = self.highest_completed_round.get(&epoch).copied();
488        if let Some(highest_completed_round) = &highest_completed_round {
489            if *highest_completed_round >= round {
490                debug!("skipping received partial sigs, we already have completed this round");
491                return;
492            }
493        }
494
495        // If sigs are for a future epoch, we can't fully verify them without DKG
496        // output. Save them for later use.
497        if epoch != self.epoch || self.peer_share_ids.is_none() {
498            if round.0 >= self.config.max_partial_sigs_rounds_ahead() {
499                debug!("skipping received partial sigs for future epoch, round too far ahead",);
500                return;
501            }
502
503            debug!("saving partial sigs from future epoch for later use");
504            self.future_epoch_partial_sigs
505                .insert((epoch, round, peer_id), sig_bytes);
506            return;
507        }
508
509        // Verify shape of sigs matches what we expect for the peer.
510        let peer_share_ids = self.peer_share_ids.as_ref().expect("checked above");
511        let expected_share_ids = if let Some(expected_share_ids) = peer_share_ids.get(&peer_id) {
512            expected_share_ids
513        } else {
514            debug!("received partial sigs from unknown peer");
515            return;
516        };
517        if sig_bytes.len() != expected_share_ids.len() as usize {
518            warn!(
519                "received partial sigs with wrong share ids count: expected {}, got {}",
520                expected_share_ids.len(),
521                sig_bytes.len(),
522            );
523            return;
524        }
525
526        // Accept partial signatures up to `max_partial_sigs_rounds_ahead` past the
527        // round of the last completed signature, or the highest completed
528        // round, whichever is greater.
529        let last_completed_signature = self.completed_sigs.last_key_value().map(|(r, _)| *r);
530        let last_completed_round = std::cmp::max(last_completed_signature, highest_completed_round)
531            .unwrap_or(RandomnessRound(0));
532        if round.0
533            >= last_completed_round
534                .0
535                .saturating_add(self.config.max_partial_sigs_rounds_ahead())
536        {
537            debug!(
538                "skipping received partial sigs, most recent round we completed was only {last_completed_round}",
539            );
540            return;
541        }
542
543        // Deserialize the partial sigs.
544        let partial_sigs =
545            match sig_bytes
546                .iter()
547                .try_fold(Vec::new(), |mut acc, bytes| -> Result<_> {
548                    let sig: RandomnessPartialSignature = bcs::from_bytes(bytes)?;
549                    acc.push(sig);
550                    Ok(acc)
551                }) {
552                Ok(partial_sigs) => partial_sigs,
553                Err(e) => {
554                    warn!("failed to deserialize partial sigs: {e:?}");
555                    return;
556                }
557            };
558        // Verify we received the expected share IDs (to protect against a validator
559        // that sends valid signatures of other peers which will be successfully
560        // verified below).
561        let received_share_ids = partial_sigs.iter().map(|s| s.index);
562        if received_share_ids
563            .zip(expected_share_ids.iter())
564            .any(|(a, b)| a != *b)
565        {
566            let received_share_ids = partial_sigs.iter().map(|s| s.index).collect::<Vec<_>>();
567            warn!(
568                "received partial sigs with wrong share ids: expected {expected_share_ids:?}, received {received_share_ids:?}"
569            );
570            return;
571        }
572
573        // We passed all the checks, save the partial sigs.
574        debug!("recording received partial signatures");
575        self.received_partial_sigs
576            .insert((round, peer_id), partial_sigs);
577
578        self.maybe_aggregate_partial_signatures(epoch, round);
579    }
580
581    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
582    fn maybe_aggregate_partial_signatures(&mut self, epoch: EpochId, round: RandomnessRound) {
583        if let Some(highest_completed_round) = self.highest_completed_round.get(&epoch) {
584            if round <= *highest_completed_round {
585                info!("skipping aggregation for already-completed round");
586                return;
587            }
588        }
589
590        let highest_requested_round = self.highest_requested_round.get(&epoch);
591        if highest_requested_round.is_none() || round > *highest_requested_round.unwrap() {
592            // We have to wait here, because even if we have enough information from other
593            // nodes to complete the signature, local shared object versions are
594            // not set until consensus finishes processing the corresponding
595            // commit. This function will be called again
596            // after maybe_start_pending_tasks begins this round locally.
597            debug!(
598                "waiting to aggregate randomness partial signatures until local consensus catches up"
599            );
600            return;
601        }
602
603        if epoch != self.epoch {
604            debug!(
605                "waiting to aggregate randomness partial signatures until DKG completes for epoch"
606            );
607            return;
608        }
609
610        if self.completed_sigs.contains_key(&round) {
611            info!("skipping aggregation for already-completed signature");
612            return;
613        }
614
615        let vss_pk = {
616            let Some(dkg_output) = &self.dkg_output else {
617                debug!("called maybe_aggregate_partial_signatures before DKG completed");
618                return;
619            };
620            &dkg_output.vss_pk
621        };
622
623        let sig_bounds = (
624            Bound::Included((round, PeerId([0; 32]))),
625            Bound::Excluded((round + 1, PeerId([0; 32]))),
626        );
627
628        // If we have enough partial signatures, aggregate them.
629        let sig_range = self
630            .received_partial_sigs
631            .range(sig_bounds)
632            .flat_map(|(_, sigs)| sigs);
633        let mut sig = match ThresholdBls12381MinSig::aggregate(
634            self.aggregation_threshold,
635            sig_range,
636        ) {
637            Ok(sig) => sig,
638            Err(fastcrypto::error::FastCryptoError::NotEnoughInputs) => return, // wait for more
639            // input
640            Err(e) => {
641                error!("error while aggregating randomness partial signatures: {e:?}");
642                return;
643            }
644        };
645
646        // Try to verify the aggregated signature all at once. (Should work in the happy
647        // path.)
648        if ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig).is_err() {
649            // If verifiation fails, some of the inputs must be invalid. We have to go
650            // through one-by-one to find which.
651            // TODO: add test for individual sig verification.
652            self.received_partial_sigs
653                .retain(|&(r, peer_id), partial_sigs| {
654                    if round != r {
655                        return true;
656                    }
657                    if ThresholdBls12381MinSig::partial_verify_batch(
658                        vss_pk,
659                        &round.signature_message(),
660                        partial_sigs.iter(),
661                        &mut rand::thread_rng(),
662                    )
663                    .is_err()
664                    {
665                        warn!(
666                            "received invalid partial signatures from possibly-Byzantine peer {peer_id}"
667                        );
668                        if let Some(sender) = self.mailbox_sender.upgrade() {
669                            sender.try_send(RandomnessMessage::MaybeIgnoreByzantinePeer(
670                                epoch,
671                                peer_id,
672                            ))
673                            .expect("RandomnessEventLoop mailbox should not overflow or be closed");
674                        }
675                        return false;
676                    }
677                    true
678                });
679            let sig_range = self
680                .received_partial_sigs
681                .range(sig_bounds)
682                .flat_map(|(_, sigs)| sigs);
683            sig = match ThresholdBls12381MinSig::aggregate(self.aggregation_threshold, sig_range) {
684                Ok(sig) => sig,
685                Err(fastcrypto::error::FastCryptoError::NotEnoughInputs) => return, /* wait for more input */
686                Err(e) => {
687                    error!("error while aggregating randomness partial signatures: {e:?}");
688                    return;
689                }
690            };
691            if let Err(e) =
692                ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig)
693            {
694                error!(
695                    "error while verifying randomness partial signatures after removing invalid partials: {e:?}"
696                );
697                debug_assert!(
698                    false,
699                    "error while verifying randomness partial signatures after removing invalid partials"
700                );
701                return;
702            }
703        }
704
705        debug!("successfully generated randomness full signature");
706        self.process_valid_full_signature(epoch, round, sig);
707    }
708
709    #[instrument(level = "debug", skip_all, fields(?peer_id, ?epoch, ?round))]
710    fn receive_full_signature(
711        &mut self,
712        peer_id: PeerId,
713        epoch: EpochId,
714        round: RandomnessRound,
715        sig: RandomnessSignature,
716    ) {
717        let vss_pk = {
718            let Some(dkg_output) = &self.dkg_output else {
719                debug!("called receive_full_signature before DKG completed");
720                return;
721            };
722            &dkg_output.vss_pk
723        };
724
725        // Basic validity checks.
726        if epoch != self.epoch {
727            debug!("skipping received full sig, we are on epoch {}", self.epoch);
728            return;
729        }
730        if self.completed_sigs.contains_key(&round) {
731            debug!("skipping received full sigs, we already have completed this sig");
732            return;
733        }
734        let highest_completed_round = self.highest_completed_round.get(&epoch).copied();
735        if let Some(highest_completed_round) = &highest_completed_round {
736            if *highest_completed_round >= round {
737                debug!("skipping received full sig, we already have completed this round");
738                return;
739            }
740        }
741
742        let highest_requested_round = self.highest_requested_round.get(&epoch);
743        if highest_requested_round.is_none() || round > *highest_requested_round.unwrap() {
744            // Wait for local consensus to catch up if necessary.
745            debug!(
746                "skipping received full signature, local consensus is not caught up to its round"
747            );
748            return;
749        }
750
751        if let Err(e) =
752            ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig)
753        {
754            info!("received invalid full signature from peer {peer_id}: {e:?}");
755            if let Some(sender) = self.mailbox_sender.upgrade() {
756                sender
757                    .try_send(RandomnessMessage::MaybeIgnoreByzantinePeer(epoch, peer_id))
758                    .expect("RandomnessEventLoop mailbox should not overflow or be closed");
759            }
760            return;
761        }
762
763        debug!("received valid randomness full signature");
764        self.process_valid_full_signature(epoch, round, sig);
765    }
766
767    fn process_valid_full_signature(
768        &mut self,
769        epoch: EpochId,
770        round: RandomnessRound,
771        sig: RandomnessSignature,
772    ) {
773        assert_eq!(epoch, self.epoch);
774
775        if let Some((_, full_sig_cell)) = self.send_tasks.get(&round) {
776            full_sig_cell
777                .set(sig)
778                .expect("full signature should never be processed twice");
779        }
780        self.completed_sigs.insert(round, sig);
781        self.remove_partial_sigs_in_range((
782            Bound::Included((round, PeerId([0; 32]))),
783            Bound::Excluded((round + 1, PeerId([0; 32]))),
784        ));
785        self.metrics.record_completed_round(round);
786        if let Some(start_time) = self.round_request_time.get(&(epoch, round)) {
787            if let Some(metric) = self.metrics.round_generation_latency_metric() {
788                metric.observe(start_time.elapsed().as_secs_f64());
789            }
790        }
791
792        let sig_bytes = bcs::to_bytes(&sig).expect("signature serialization should not fail");
793        self.randomness_tx
794            .try_send((epoch, round, sig_bytes))
795            .expect("RandomnessRoundReceiver mailbox should not overflow or be closed");
796    }
797
798    fn maybe_ignore_byzantine_peer(&mut self, epoch: EpochId, peer_id: PeerId) {
799        if epoch != self.epoch {
800            return; // make sure we're still on the same epoch
801        }
802        let Some(dkg_output) = &self.dkg_output else {
803            return; // can't ignore a peer if we haven't finished DKG
804        };
805        if !self.allowed_peers_set.contains(&peer_id) {
806            return; // peer is already disallowed
807        }
808        let Some(peer_share_ids) = &self.peer_share_ids else {
809            return; // can't ignore a peer if we haven't finished DKG
810        };
811        let Some(peer_shares) = peer_share_ids.get(&peer_id) else {
812            warn!("can't ignore unknown byzantine peer {peer_id:?}");
813            return;
814        };
815        let max_ignored_shares = (self.config.max_ignored_peer_weight_factor()
816            * (dkg_output.nodes.total_weight() as f64)) as usize;
817        if self.blocked_share_id_count + peer_shares.len() > max_ignored_shares {
818            warn!(
819                "ignoring byzantine peer {peer_id:?} with {} shares would exceed max ignored peer weight {max_ignored_shares}",
820                peer_shares.len()
821            );
822            return;
823        }
824
825        warn!(
826            "ignoring byzantine peer {peer_id:?} with {} shares",
827            peer_shares.len()
828        );
829        self.blocked_share_id_count += peer_shares.len();
830        self.allowed_peers_set.remove(&peer_id);
831        self.allowed_peers
832            .update(Arc::new(self.allowed_peers_set.clone()));
833        self.metrics.inc_num_ignored_byzantine_peers();
834    }
835
836    fn maybe_start_pending_tasks(&mut self) {
837        let dkg_output = if let Some(dkg_output) = &self.dkg_output {
838            dkg_output
839        } else {
840            return; // wait for DKG
841        };
842        let shares = if let Some(shares) = &dkg_output.shares {
843            shares
844        } else {
845            return; // can't participate in randomness generation without shares
846        };
847        let highest_requested_round =
848            if let Some(highest_requested_round) = self.highest_requested_round.get(&self.epoch) {
849                highest_requested_round
850            } else {
851                return; // no rounds to start
852            };
853        // Begin from the next round after the most recent one we've started (or, if
854        // none are running, after the highest completed round in the epoch).
855        let start_round = std::cmp::max(
856            if let Some(highest_completed_round) = self.highest_completed_round.get(&self.epoch) {
857                highest_completed_round.checked_add(1).unwrap()
858            } else {
859                RandomnessRound(0)
860            },
861            self.send_tasks
862                .last_key_value()
863                .map(|(r, _)| r.checked_add(1).unwrap())
864                .unwrap_or(RandomnessRound(0)),
865        );
866
867        let mut rounds_to_aggregate = Vec::new();
868        for round in start_round.0..=highest_requested_round.0 {
869            let round = RandomnessRound(round);
870
871            if self.send_tasks.len() >= self.config.max_partial_sigs_concurrent_sends() {
872                break; // limit concurrent tasks
873            }
874
875            let full_sig_cell = Arc::new(OnceCell::new());
876            self.send_tasks.entry(round).or_insert_with(|| {
877                let name = self.name;
878                let network = self.network.clone();
879                let retry_interval = self.config.partial_signature_retry_interval();
880                let metrics = self.metrics.clone();
881                let authority_info = self.authority_info.clone();
882                let epoch = self.epoch;
883                let partial_sigs = ThresholdBls12381MinSig::partial_sign_batch(
884                    shares.iter(),
885                    &round.signature_message(),
886                );
887                let full_sig_cell_clone = full_sig_cell.clone();
888
889                // Record own partial sigs.
890                if !self.completed_sigs.contains_key(&round) {
891                    self.received_partial_sigs
892                        .insert((round, self.network.peer_id()), partial_sigs.clone());
893                    rounds_to_aggregate.push((epoch, round));
894                }
895
896                debug!("sending partial sigs for epoch {epoch}, round {round}");
897                (
898                    spawn_monitored_task!(RandomnessEventLoop::send_signatures_task(
899                        name,
900                        network,
901                        retry_interval,
902                        metrics,
903                        authority_info,
904                        epoch,
905                        round,
906                        partial_sigs,
907                        full_sig_cell_clone,
908                    )),
909                    full_sig_cell,
910                )
911            });
912        }
913
914        self.update_rounds_pending_metric();
915
916        // After starting a round, we have generated our own partial sigs. Check if
917        // that's enough for us to aggregate already.
918        for (epoch, round) in rounds_to_aggregate {
919            self.maybe_aggregate_partial_signatures(epoch, round);
920        }
921    }
922
923    #[expect(clippy::type_complexity)]
924    fn remove_partial_sigs_in_range(
925        &mut self,
926        range: (
927            Bound<(RandomnessRound, PeerId)>,
928            Bound<(RandomnessRound, PeerId)>,
929        ),
930    ) {
931        let keys_to_remove: Vec<_> = self
932            .received_partial_sigs
933            .range(range)
934            .map(|(key, _)| *key)
935            .collect();
936        for key in keys_to_remove {
937            // Have to remove keys one-by-one because BTreeMap does not support
938            // range-removal.
939            self.received_partial_sigs.remove(&key);
940        }
941    }
942
943    async fn send_signatures_task(
944        name: AuthorityName,
945        network: anemo::Network,
946        retry_interval: Duration,
947        metrics: Metrics,
948        authority_info: Arc<HashMap<AuthorityName, (PeerId, PartyId)>>,
949        epoch: EpochId,
950        round: RandomnessRound,
951        partial_sigs: Vec<RandomnessPartialSignature>,
952        full_sig: Arc<OnceCell<RandomnessSignature>>,
953    ) {
954        // For simtests, we may test not sending partial signatures.
955        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
956        let mut fail_point_skip_sending = false;
957        fail_point_if!("rb-send-partial-signatures", || {
958            fail_point_skip_sending = true;
959        });
960        if fail_point_skip_sending {
961            warn!("skipping sending partial sigs due to simtest fail point");
962            return;
963        }
964
965        let _metrics_guard = metrics
966            .round_observation_latency_metric()
967            .map(|metric| metric.start_timer());
968
969        let peers: HashMap<_, _> = authority_info
970            .iter()
971            .map(|(name, (peer_id, _party_id))| (name, network.waiting_peer(*peer_id)))
972            .collect();
973        let partial_sigs: Vec<_> = partial_sigs
974            .iter()
975            .map(|sig| bcs::to_bytes(sig).expect("message serialization should not fail"))
976            .collect();
977
978        loop {
979            let mut requests = Vec::new();
980            for (peer_name, peer) in &peers {
981                if name == **peer_name {
982                    continue; // don't send partial sigs to self
983                }
984                let mut client = RandomnessClient::new(peer.clone());
985                // `test_byzantine_peer_handling` built in debug mode takes
986                // longer to verify invalid signatures and thus needs larger
987                // timeouts.
988                #[cfg(test)]
989                const SEND_PARTIAL_SIGNATURES_TIMEOUT: Duration = Duration::from_secs(300);
990                // In release signature verification should take less, so
991                // smaller timeout should be enough.
992                #[cfg(not(test))]
993                const SEND_PARTIAL_SIGNATURES_TIMEOUT: Duration = Duration::from_secs(10);
994                let full_sig = full_sig.get().cloned();
995                let request = anemo::Request::new(SendSignaturesRequest {
996                    epoch,
997                    round,
998                    partial_sigs: if full_sig.is_none() {
999                        partial_sigs.clone()
1000                    } else {
1001                        Vec::new()
1002                    },
1003                    sig: full_sig,
1004                })
1005                .with_timeout(SEND_PARTIAL_SIGNATURES_TIMEOUT);
1006                requests.push(async move {
1007                    let result = client.send_signatures(request).await;
1008                    if let Err(_error) = result {
1009                        // TODO: add Display impl to anemo::rpc::Status, log it here
1010                        debug!("failed to send partial signatures to {peer_name}");
1011                    }
1012                });
1013            }
1014
1015            // Process all requests.
1016            futures::future::join_all(requests).await;
1017
1018            // Keep retrying send to all peers until task is aborted via external message.
1019            tokio::time::sleep(retry_interval).await;
1020        }
1021    }
1022
1023    fn update_rounds_pending_metric(&self) {
1024        let highest_requested_round = self
1025            .highest_requested_round
1026            .get(&self.epoch)
1027            .map(|r| r.0)
1028            .unwrap_or(0);
1029        let highest_completed_round = self
1030            .highest_completed_round
1031            .get(&self.epoch)
1032            .map(|r| r.0)
1033            .unwrap_or(0);
1034        let num_rounds_pending =
1035            highest_requested_round.saturating_sub(highest_completed_round) as i64;
1036        let prev_value = self.metrics.num_rounds_pending().unwrap_or_default();
1037        if num_rounds_pending / 100 > prev_value / 100 {
1038            warn!(
1039                // Recording multiples of 100 so tests can match on the log message.
1040                "RandomnessEventLoop randomness generation backlog: over {} rounds are pending (oldest is {:?})",
1041                (num_rounds_pending / 100) * 100,
1042                highest_completed_round + 1,
1043            );
1044        }
1045        self.metrics.set_num_rounds_pending(num_rounds_pending);
1046    }
1047
1048    fn admin_get_partial_signatures(&self, round: RandomnessRound, tx: oneshot::Sender<Vec<u8>>) {
1049        let shares = if let Some(shares) = self.dkg_output.as_ref().and_then(|d| d.shares.as_ref())
1050        {
1051            shares
1052        } else {
1053            let _ = tx.send(Vec::new()); // no error handling needed if receiver is already dropped
1054            return;
1055        };
1056
1057        let partial_sigs =
1058            ThresholdBls12381MinSig::partial_sign_batch(shares.iter(), &round.signature_message());
1059        // no error handling needed if receiver is already dropped
1060        let _ = tx.send(bcs::to_bytes(&partial_sigs).expect("serialization should not fail"));
1061    }
1062
1063    fn admin_inject_partial_signatures(
1064        &mut self,
1065        authority_name: AuthorityName,
1066        round: RandomnessRound,
1067        sigs: Vec<RandomnessPartialSignature>,
1068    ) -> Result<()> {
1069        let peer_id = self
1070            .authority_info
1071            .get(&authority_name)
1072            .map(|(peer_id, _)| *peer_id)
1073            .ok_or(anyhow::anyhow!("unknown AuthorityName {authority_name:?}"))?;
1074        self.received_partial_sigs.insert((round, peer_id), sigs);
1075        self.maybe_aggregate_partial_signatures(self.epoch, round);
1076        Ok(())
1077    }
1078
1079    fn admin_inject_full_signature(
1080        &mut self,
1081        round: RandomnessRound,
1082        sig: RandomnessSignature,
1083    ) -> Result<()> {
1084        let vss_pk = {
1085            let Some(dkg_output) = &self.dkg_output else {
1086                return Err(anyhow::anyhow!(
1087                    "called admin_inject_full_signature before DKG completed"
1088                ));
1089            };
1090            &dkg_output.vss_pk
1091        };
1092
1093        ThresholdBls12381MinSig::verify(vss_pk.c0(), &round.signature_message(), &sig)
1094            .map_err(|e| anyhow::anyhow!("invalid full signature: {e:?}"))?;
1095
1096        self.process_valid_full_signature(self.epoch, round, sig);
1097        Ok(())
1098    }
1099}