iota_core/epoch/
randomness.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    sync::{Arc, Weak},
8    time::Instant,
9};
10
11use anemo::PeerId;
12use fastcrypto::{
13    encoding::{Encoding, Hex},
14    error::{FastCryptoError, FastCryptoResult},
15    groups::bls12381,
16    serde_helpers::ToFromByteArray,
17    traits::{KeyPair, ToFromBytes},
18};
19use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
20use futures::{StreamExt, stream::FuturesUnordered};
21use iota_macros::fail_point_if;
22use iota_network::randomness;
23use iota_types::{
24    base_types::{AuthorityName, CommitRound},
25    committee::{Committee, EpochId, StakeUnit},
26    crypto::{AuthorityKeyPair, RandomnessRound},
27    error::{IotaError, IotaResult},
28    iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
29    messages_consensus::{ConsensusTransaction, VersionedDkgConfirmation, VersionedDkgMessage},
30};
31use parking_lot::Mutex;
32use rand::{
33    SeedableRng,
34    rngs::{OsRng, StdRng},
35};
36use serde::{Deserialize, Serialize};
37use tokio::{sync::OnceCell, task::JoinHandle};
38use tracing::{debug, error, info, warn};
39use typed_store::Map;
40
41use crate::{
42    authority::{
43        authority_per_epoch_store::{
44            AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
45        },
46        epoch_start_configuration::EpochStartConfigTrait,
47    },
48    consensus_adapter::SubmitToConsensus,
49};
50
/// A commit timestamp in milliseconds (described by the original note as a
/// UNIX epoch timestamp).
pub type CommitTimestampMs = u64;

// Group types plugged into the generic `dkg_v1` types used throughout this
// file (`Party<PkG, EncG>`, `ProcessedMessage<PkG, EncG>`, ...). Both aliases
// resolve to the same BLS12-381 G2 element type.
// NOTE(review): presumably `PkG` is the group of the DKG public key and `EncG`
// the group of the ECIES encryption keys — confirm against fastcrypto_tbls.
type PkG = bls12381::G2Element;
type EncG = bls12381::G2Element;

/// Key used in this file when reading tables that hold a single value (e.g.
/// `dkg_output`, `dkg_used_messages`, `randomness_next_round`).
pub const SINGLETON_KEY: u64 = 0;
58
59// Wrappers for DKG messages (to simplify upgrades).
60
/// Versioned wrapper around a DKG message that has already been processed by
/// the local party. Only the `dkg_v1` representation exists at the moment.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedProcessedMessage {
    /// Result of `dkg_v1::Party::process_message` (see `process`).
    V1(dkg_v1::ProcessedMessage<PkG, EncG>),
}
65
66impl VersionedProcessedMessage {
67    pub fn sender(&self) -> PartyId {
68        match self {
69            VersionedProcessedMessage::V1(msg) => msg.message.sender,
70        }
71    }
72
73    pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
74        match self {
75            VersionedProcessedMessage::V1(msg) => msg,
76        }
77    }
78
79    pub fn process(
80        party: Arc<dkg_v1::Party<PkG, EncG>>,
81        message: VersionedDkgMessage,
82    ) -> FastCryptoResult<VersionedProcessedMessage> {
83        // All inputs are verified in add_message, so we can assume they are of the
84        // correct version.
85        let processed = party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
86        Ok(VersionedProcessedMessage::V1(processed))
87    }
88
89    pub fn merge(
90        party: Arc<dkg_v1::Party<PkG, EncG>>,
91        messages: Vec<Self>,
92    ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
93        // All inputs were created by this validator, so we can assume they are of the
94        // correct version.
95        let (conf, msgs) = party.merge(
96            &messages
97                .into_iter()
98                .map(|vm| vm.unwrap_v1())
99                .collect::<Vec<_>>(),
100        )?;
101        Ok((
102            VersionedDkgConfirmation::V1(conf),
103            VersionedUsedProcessedMessages::V1(msgs),
104        ))
105    }
106}
107
/// Versioned wrapper around the processed messages that were used when
/// merging, i.e. the second value returned by `dkg_v1::Party::merge`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedUsedProcessedMessages {
    /// The `dkg_v1` representation; currently the only one.
    V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
}
112
113impl VersionedUsedProcessedMessages {
114    fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
115        &self,
116        party: Arc<dkg_v1::Party<PkG, EncG>>,
117        confirmations: Iter,
118    ) -> FastCryptoResult<Output<PkG, EncG>> {
119        // All inputs are verified in add_confirmation, so we can assume they are of the
120        // correct version.
121        let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
122        let VersionedUsedProcessedMessages::V1(msg) = self;
123        party.complete(
124            msg,
125            &confirmations
126                .map(|vm| vm.unwrap_v1())
127                .cloned()
128                .collect::<Vec<_>>(),
129            rng,
130        )
131    }
132}
133
// State machine for randomness DKG and generation.
//
// DKG protocol:
// 1. This validator sends out a `Message` to all other validators.
// 2. Once sufficient valid `Message`s are received from other validators via
//    consensus and processed, this validator sends out a `Confirmation` to all
//    other validators.
// 3. Once sufficient `Confirmation`s are received from other validators via
//    consensus and processed, they are combined to form a public VSS key and
//    local private key shares.
// 4. Randomness generation begins.
//
// Randomness generation:
// 1. For each new round, AuthorityPerEpochStore eventually calls
//    `generate_randomness`.
// 2. This kicks off a process in RandomnessEventLoop to send partial signatures
//    for the new round to all other validators.
// 3. Once enough partial signatures for the round are collected, a
//    RandomnessStateUpdate transaction is generated and injected into the
//    TransactionManager.
// 4. Once the RandomnessStateUpdate transaction is seen in a certified
//    checkpoint, `notify_randomness_in_checkpoint` is called to complete the
//    round and stop sending partial signatures for it.
pub struct RandomnessManager {
    // Weak handle to the per-epoch store; upgraded on demand via
    // `epoch_store()`, which fails with `EpochEnded` once the store is gone.
    epoch_store: Weak<AuthorityPerEpochStore>,
    // Epoch this manager was created for (taken from the committee).
    epoch: EpochId,
    // Used to submit DKG Message / Confirmation transactions to consensus.
    consensus_adapter: Box<dyn SubmitToConsensus>,
    // Handle to the randomness network component (`update_epoch`,
    // `send_partial_signatures`).
    network_handle: randomness::Handle,
    // Maps each authority to its network peer id and its DKG party id.
    authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,

    // State for DKG.
    // Set on the first call to `start_dkg`; used only for timing metrics.
    dkg_start_time: OnceCell<Instant>,
    party: Arc<dkg_v1::Party<PkG, EncG>>,
    // Background processing tasks for received messages, keyed by sender.
    // Drained in `advance_dkg`.
    enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
    // Successfully processed messages, keyed by sender.
    processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    // Set once the processed messages have been merged and a Confirmation has
    // been produced.
    used_messages: OnceCell<VersionedUsedProcessedMessages>,
    // Confirmations received from validators, keyed by sender.
    confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    // Unset while DKG is pending; `Some(output)` on success; `None` if DKG
    // failed (see `dkg_status`).
    dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,

    // State for randomness generation.
    // Next round number to hand out from `reserve_next_randomness`.
    next_randomness_round: RandomnessRound,
    // Shared with every `RandomnessReporter` created by `reporter()`.
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
}
177
178impl RandomnessManager {
    /// Builds a `RandomnessManager` for the epoch of the given epoch store,
    /// restoring any DKG / randomness state persisted in the epoch tables.
    ///
    /// Returns None in case of invalid input or other failure to initialize
    /// DKG (epoch store or tables already gone, `Nodes` or `Party` creation
    /// failure).
    ///
    /// # Panics
    /// Panics on store read failures and on the various "should fit" /
    /// "should succeed" conversions below.
    pub async fn try_new(
        epoch_store_weak: Weak<AuthorityPerEpochStore>,
        consensus_adapter: Box<dyn SubmitToConsensus>,
        network_handle: randomness::Handle,
        authority_key_pair: &AuthorityKeyPair,
    ) -> Option<Self> {
        let epoch_store = match epoch_store_weak.upgrade() {
            Some(epoch_store) => epoch_store,
            None => {
                error!(
                    "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
                );
                return None;
            }
        };
        let tables = match epoch_store.tables() {
            Ok(tables) => tables,
            Err(_) => {
                error!(
                    "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
                );
                return None;
            }
        };
        let protocol_config = epoch_store.protocol_config();

        let name: AuthorityName = authority_key_pair.public().into();
        let committee = epoch_store.committee();
        // One (party id, name, encryption public key, stake) tuple per
        // committee member.
        let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
        if tracing::enabled!(tracing::Level::DEBUG) {
            // Log first few entries in DKG info for debugging.
            for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
                let pk_bytes = pk.as_element().to_byte_array();
                debug!(
                    "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
                );
            }
        }
        let authority_ids: HashMap<_, _> =
            info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
        let authority_peer_ids = epoch_store
            .epoch_start_config()
            .epoch_start_state()
            .get_authority_names_to_peer_ids();
        // Join party ids with network peer ids, keyed by authority name.
        let authority_info = authority_ids
            .into_iter()
            .map(|(name, id)| {
                let peer_id = *authority_peer_ids
                    .get(&name)
                    .expect("authority name should be in peer_ids");
                (name, (peer_id, id))
            })
            .collect();
        // Each committee member becomes a DKG node weighted by its stake.
        let nodes = info
            .iter()
            .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
                id: *id,
                pk: pk.clone(),
                weight: (*stake).try_into().expect("stake should fit in u16"),
            })
            .collect();
        // `new_reduced` returns a (possibly reduced) node set together with
        // the threshold `t` to use with it.
        let (nodes, t) = match nodes::Nodes::new_reduced(
            nodes,
            committee
                .validity_threshold()
                .try_into()
                .expect("validity threshold should fit in u16"),
            protocol_config.random_beacon_reduction_allowed_delta(),
            protocol_config
                .random_beacon_reduction_lower_bound()
                .try_into()
                .expect("should fit u16"),
        ) {
            Ok((nodes, t)) => (nodes, t),
            Err(err) => {
                error!("random beacon: error while initializing Nodes: {err:?}");
                return None;
            }
        };
        let total_weight = nodes.total_weight();
        let num_nodes = nodes.num_nodes();
        // Random-oracle prefix built from the chain identifier and the epoch.
        let prefix_str = format!(
            "dkg {} {}",
            Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
            committee.epoch()
        );
        // Reinterpret the authority's BLS private key bytes as a scalar, which
        // then serves as the ECIES private key of the DKG party.
        let randomness_private_key = bls12381::Scalar::from_byte_array(
            authority_key_pair
                .copy()
                .private()
                .as_bytes()
                .try_into()
                .expect("key length should match"),
        )
        .expect("should work to convert BLS key to Scalar");
        let party = match dkg_v1::Party::<PkG, EncG>::new(
            fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
                randomness_private_key,
            ),
            nodes,
            t,
            fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
            &mut rand::thread_rng(),
        ) {
            Ok(party) => party,
            Err(err) => {
                error!("random beacon: error while initializing Party: {err:?}");
                return None;
            }
        };
        info!(
            "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
        );

        // Load existing data from store.
        let highest_completed_round = tables
            .randomness_highest_completed_round
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail");
        let mut rm = RandomnessManager {
            epoch_store: epoch_store_weak,
            epoch: committee.epoch(),
            consensus_adapter,
            network_handle: network_handle.clone(),
            authority_info,
            dkg_start_time: OnceCell::new(),
            party: Arc::new(party),
            enqueued_messages: BTreeMap::new(),
            processed_messages: BTreeMap::new(),
            used_messages: OnceCell::new(),
            confirmations: BTreeMap::new(),
            dkg_output: OnceCell::new(),
            next_randomness_round: RandomnessRound(0),
            highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
        };
        let dkg_output = tables
            .dkg_output
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail");
        if let Some(dkg_output) = dkg_output {
            // DKG already finished earlier in this epoch: restore the output
            // and hand it to the network component right away.
            info!(
                "random beacon: loaded existing DKG output for epoch {}",
                committee.epoch()
            );
            epoch_store
                .metrics
                .epoch_random_beacon_dkg_num_shares
                .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
            rm.dkg_output
                .set(Some(dkg_output.clone()))
                .expect("setting new OnceCell should succeed");
            network_handle.update_epoch(
                committee.epoch(),
                rm.authority_info.clone(),
                dkg_output,
                rm.party.t(),
                highest_completed_round,
            );
        } else {
            info!(
                "random beacon: no existing DKG output found for epoch {}",
                committee.epoch()
            );

            // Load intermediate data.
            assert!(
                epoch_store.protocol_config().dkg_version() > 0,
                "BUG: DKG version 0 is deprecated"
            );
            rm.processed_messages.extend(
                tables
                    .dkg_processed_messages
                    .safe_iter()
                    .map(|result| result.expect("typed_store should not fail")),
            );
            if let Some(used_messages) = tables
                .dkg_used_messages
                .get(&SINGLETON_KEY)
                .expect("typed_store should not fail")
            {
                rm.used_messages
                    .set(used_messages.clone())
                    .expect("setting new OnceCell should succeed");
            }
            rm.confirmations.extend(
                tables
                    .dkg_confirmations
                    .safe_iter()
                    .map(|result| result.expect("typed_store should not fail")),
            );
        }

        // Resume randomness generation from where we left off.
        // This must be loaded regardless of whether DKG has finished yet, since the
        // RandomnessEventLoop and commit-handling logic in AuthorityPerEpochStore both
        // depend on this state.
        rm.next_randomness_round = tables
            .randomness_next_round
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail")
            .unwrap_or(RandomnessRound(0));
        info!(
            "random beacon: starting from next_randomness_round={}",
            rm.next_randomness_round.0
        );
        // Rounds in [first_incomplete_round, next_randomness_round) were
        // reserved but not yet completed, so partial signatures are re-sent
        // for each of them.
        let first_incomplete_round = highest_completed_round
            .map(|r| r + 1)
            .unwrap_or(RandomnessRound(0));
        if first_incomplete_round < rm.next_randomness_round {
            info!(
                "random beacon: resuming generation for randomness rounds from {} to {}",
                first_incomplete_round,
                rm.next_randomness_round - 1,
            );
            for r in first_incomplete_round.0..rm.next_randomness_round.0 {
                network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
            }
        }

        Some(rm)
    }
401
402    /// Sends the initial dkg::Message to begin the randomness DKG protocol.
403    pub async fn start_dkg(&mut self) -> IotaResult {
404        if self.used_messages.initialized() || self.dkg_output.initialized() {
405            // DKG already started (or completed or failed).
406            return Ok(());
407        }
408
409        let _ = self.dkg_start_time.set(Instant::now());
410
411        let epoch_store = self.epoch_store()?;
412        let dkg_version = epoch_store.protocol_config().dkg_version();
413        info!("random beacon: starting DKG, version {dkg_version}");
414
415        let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
416            Ok(msg) => msg,
417            Err(FastCryptoError::IgnoredMessage) => {
418                info!(
419                    "random beacon: no DKG Message for party id={} (zero weight)",
420                    self.party.id
421                );
422                return Ok(());
423            }
424            Err(e) => {
425                error!("random beacon: error while creating a DKG Message: {e:?}");
426                return Ok(());
427            }
428        };
429
430        info!("random beacon: created {msg:?} with dkg version {dkg_version}");
431        let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
432
433        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
434        let mut fail_point_skip_sending = false;
435        fail_point_if!("rb-dkg", || {
436            // maybe skip sending in simtests
437            fail_point_skip_sending = true;
438        });
439        if !fail_point_skip_sending {
440            self.consensus_adapter
441                .submit_to_consensus(&[transaction], &epoch_store)?;
442        }
443
444        epoch_store
445            .metrics
446            .epoch_random_beacon_dkg_message_time_ms
447            .set(
448                self.dkg_start_time
449                    .get()
450                    .unwrap() // already set above
451                    .elapsed()
452                    .as_millis() as i64,
453            );
454        Ok(())
455    }
456
457    /// Processes all received messages and advances the randomness DKG state
458    /// machine when possible, sending out a dkg::Confirmation and
459    /// generating final output.
460    pub(crate) async fn advance_dkg(
461        &mut self,
462        consensus_output: &mut ConsensusCommitOutput,
463        round: CommitRound,
464    ) -> IotaResult {
465        let epoch_store = self.epoch_store()?;
466
467        // Once we have enough Messages, send a Confirmation.
468        if !self.dkg_output.initialized() && !self.used_messages.initialized() {
469            // Process all enqueued messages.
470            let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
471                .into_values()
472                .collect();
473            while let Some(res) = handles.next().await {
474                if let Ok(Some(processed)) = res {
475                    self.processed_messages
476                        .insert(processed.sender(), processed.clone());
477                    consensus_output.insert_dkg_processed_message(processed);
478                }
479            }
480
481            // Attempt to generate the Confirmation.
482            match VersionedProcessedMessage::merge(
483                self.party.clone(),
484                self.processed_messages
485                    .values()
486                    .cloned()
487                    .collect::<Vec<_>>(),
488            ) {
489                Ok((conf, used_msgs)) => {
490                    info!(
491                        "random beacon: sending DKG Confirmation with {} complaints",
492                        conf.num_of_complaints()
493                    );
494                    if self.used_messages.set(used_msgs.clone()).is_err() {
495                        error!("BUG: used_messages should only ever be set once");
496                    }
497                    consensus_output.insert_dkg_used_messages(used_msgs);
498
499                    let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
500                        epoch_store.name,
501                        &conf,
502                    );
503
504                    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
505                    let mut fail_point_skip_sending = false;
506                    fail_point_if!("rb-dkg", || {
507                        // maybe skip sending in simtests
508                        fail_point_skip_sending = true;
509                    });
510                    if !fail_point_skip_sending {
511                        self.consensus_adapter
512                            .submit_to_consensus(&[transaction], &epoch_store)?;
513                    }
514
515                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
516                    if let Some(elapsed) = elapsed {
517                        epoch_store
518                            .metrics
519                            .epoch_random_beacon_dkg_confirmation_time_ms
520                            .set(elapsed as i64);
521                    }
522                }
523                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
524                Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
525            }
526        }
527
528        // Once we have enough Confirmations, process them and update shares.
529        if !self.dkg_output.initialized() && self.used_messages.initialized() {
530            match self
531                .used_messages
532                .get()
533                .expect("checked above that `used_messages` is initialized")
534                .complete_dkg(self.party.clone(), self.confirmations.values())
535            {
536                Ok(output) => {
537                    let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
538                    let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
539                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
540                    info!(
541                        "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
542                    );
543                    epoch_store
544                        .metrics
545                        .epoch_random_beacon_dkg_num_shares
546                        .set(num_shares as i64);
547                    epoch_store
548                        .metrics
549                        .epoch_random_beacon_dkg_epoch_start_completion_time_ms
550                        .set(epoch_elapsed as i64);
551                    epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
552                    if let Some(elapsed) = elapsed {
553                        epoch_store
554                            .metrics
555                            .epoch_random_beacon_dkg_completion_time_ms
556                            .set(elapsed as i64);
557                    }
558                    self.dkg_output
559                        .set(Some(output.clone()))
560                        .expect("checked above that `dkg_output` is uninitialized");
561                    self.network_handle.update_epoch(
562                        epoch_store.committee().epoch(),
563                        self.authority_info.clone(),
564                        output.clone(),
565                        self.party.t(),
566                        None,
567                    );
568                    consensus_output.set_dkg_output(output);
569                }
570                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
571                Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
572            }
573        }
574
575        // If we ran out of time, mark DKG as failed.
576        if !self.dkg_output.initialized()
577            && round
578                > epoch_store
579                    .protocol_config()
580                    .random_beacon_dkg_timeout_round()
581                    .into()
582        {
583            error!(
584                "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
585            );
586            epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
587            self.dkg_output
588                .set(None)
589                .expect("checked above that `dkg_output` is uninitialized");
590        }
591
592        Ok(())
593    }
594
595    /// Adds a received VersionedDkgMessage to the randomness DKG state machine.
596    pub fn add_message(
597        &mut self,
598        authority: &AuthorityName,
599        msg: VersionedDkgMessage,
600    ) -> IotaResult {
601        // message was received from other validators, so we need to ensure it uses a
602        // supported version before we call other functions that assume the
603        // version is correct
604        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
605        if !msg.is_valid_version(dkg_version) {
606            warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
607            return Ok(());
608        }
609
610        if self.used_messages.initialized() || self.dkg_output.initialized() {
611            // We've already sent a `Confirmation`, so we can't add any more messages.
612            return Ok(());
613        }
614        let Some((_, party_id)) = self.authority_info.get(authority) else {
615            error!("random beacon: received DKG Message from unknown authority: {authority:?}");
616            return Ok(());
617        };
618        if *party_id != msg.sender() {
619            warn!(
620                "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
621            );
622            return Ok(());
623        }
624        if self.enqueued_messages.contains_key(&msg.sender())
625            || self.processed_messages.contains_key(&msg.sender())
626        {
627            info!("ignoring duplicate DKG Message from authority {authority:?}");
628            return Ok(());
629        }
630
631        let party = self.party.clone();
632        // TODO: Could save some CPU by not processing messages if we already have
633        // enough to merge.
634        self.enqueued_messages.insert(
635            msg.sender(),
636            tokio::task::spawn_blocking(move || {
637                match VersionedProcessedMessage::process(party, msg) {
638                    Ok(processed) => Some(processed),
639                    Err(err) => {
640                        debug!("random beacon: error while processing DKG Message: {err:?}");
641                        None
642                    }
643                }
644            }),
645        );
646        Ok(())
647    }
648
649    /// Adds a received dkg::Confirmation to the randomness DKG state machine.
650    pub(crate) fn add_confirmation(
651        &mut self,
652        output: &mut ConsensusCommitOutput,
653        authority: &AuthorityName,
654        conf: VersionedDkgConfirmation,
655    ) -> IotaResult {
656        // confirmation was received from other validators, so we need to ensure it uses
657        // a supported version before we call other functions that assume the
658        // version is correct
659        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
660        if !conf.is_valid_version(dkg_version) {
661            warn!(
662                "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
663            );
664            return Ok(());
665        }
666
667        if self.dkg_output.initialized() {
668            // Once we have completed DKG, no more `Confirmation`s are needed.
669            return Ok(());
670        }
671        let Some((_, party_id)) = self.authority_info.get(authority) else {
672            error!(
673                "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
674            );
675            return Ok(());
676        };
677        if *party_id != conf.sender() {
678            warn!(
679                "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
680            );
681            return Ok(());
682        }
683        self.confirmations.insert(conf.sender(), conf.clone());
684        output.insert_dkg_confirmation(conf);
685        Ok(())
686    }
687
688    /// Reserves the next available round number for randomness generation if
689    /// enough time has elapsed, or returns None if not yet ready (based on
690    /// ProtocolConfig setting). Once the given batch is written,
691    /// `generate_randomness` must be called to start the process. On restart,
692    /// any reserved rounds for which the batch was written will automatically
693    /// be resumed.
694    pub(crate) fn reserve_next_randomness(
695        &mut self,
696        commit_timestamp: CommitTimestampMs,
697        output: &mut ConsensusCommitOutput,
698    ) -> IotaResult<Option<RandomnessRound>> {
699        let epoch_store = self.epoch_store()?;
700
701        let last_round_timestamp = epoch_store
702            .get_randomness_last_round_timestamp()
703            .expect("read should not fail");
704
705        if let Some(last_round_timestamp) = last_round_timestamp {
706            if commit_timestamp - last_round_timestamp
707                < epoch_store
708                    .protocol_config()
709                    .random_beacon_min_round_interval_ms()
710            {
711                return Ok(None);
712            }
713        }
714
715        let randomness_round = self.next_randomness_round;
716        self.next_randomness_round = self
717            .next_randomness_round
718            .checked_add(1)
719            .ok_or_else(|| IotaError::Unknown("RandomnessRound overflow".to_string()))?;
720
721        output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
722
723        Ok(Some(randomness_round))
724    }
725
726    /// Starts the process of generating the given RandomnessRound.
727    pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
728        self.network_handle
729            .send_partial_signatures(epoch, randomness_round);
730    }
731
732    pub fn dkg_status(&self) -> DkgStatus {
733        match self.dkg_output.get() {
734            Some(Some(_)) => DkgStatus::Successful,
735            Some(None) => DkgStatus::Failed,
736            None => DkgStatus::Pending,
737        }
738    }
739
740    /// Generates a new RandomnessReporter for reporting observed rounds to this
741    /// RandomnessManager.
742    pub fn reporter(&self) -> RandomnessReporter {
743        RandomnessReporter {
744            epoch_store: self.epoch_store.clone(),
745            epoch: self.epoch,
746            network_handle: self.network_handle.clone(),
747            highest_completed_round: self.highest_completed_round.clone(),
748        }
749    }
750
751    fn epoch_store(&self) -> IotaResult<Arc<AuthorityPerEpochStore>> {
752        self.epoch_store
753            .upgrade()
754            .ok_or(IotaError::EpochEnded(self.epoch))
755    }
756
757    fn randomness_dkg_info_from_committee(
758        committee: &Committee,
759    ) -> Vec<(
760        u16,
761        AuthorityName,
762        fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
763        StakeUnit,
764    )> {
765        committee
766            .members()
767            .map(|(name, stake)| {
768                let index: u16 = committee
769                    .authority_index(name)
770                    .expect("lookup of known committee member should succeed")
771                    .try_into()
772                    .expect("authority index should fit in u16");
773                let pk = bls12381::G2Element::from_byte_array(
774                    committee
775                        .public_key(name)
776                        .expect("lookup of known committee member should succeed")
777                        .as_bytes()
778                        .try_into()
779                        .expect("key length should match"),
780                )
781                .expect("should work to convert BLS key to G2Element");
782                (
783                    index,
784                    *name,
785                    fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
786                    *stake,
787                )
788            })
789            .collect()
790    }
791}
792
/// Used by other components to notify the randomness system of observed
/// randomness.
///
/// Instances are created by `RandomnessManager::reporter`, which fills every
/// field with a clone of the manager's corresponding field.
#[derive(Clone)]
pub struct RandomnessReporter {
    /// Weak handle to the per-epoch store; it is upgraded on every
    /// notification, and a failed upgrade is reported as
    /// `IotaError::EpochEnded`.
    epoch_store: Weak<AuthorityPerEpochStore>,
    /// Epoch this reporter was created for; carried in the `EpochEnded` error.
    epoch: EpochId,
    /// Randomness network handle that is told when a round completes.
    network_handle: randomness::Handle,
    /// Highest round reported as completed so far (`None` until the first
    /// report). The `Arc` is cloned from the manager, so both see the same
    /// value.
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
}
802
803impl RandomnessReporter {
804    /// Notifies the associated randomness manager that randomness for the given
805    /// round has been durably committed in a checkpoint. This completes the
806    /// process of generating randomness for the round.
807    pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> IotaResult {
808        let epoch_store = self
809            .epoch_store
810            .upgrade()
811            .ok_or(IotaError::EpochEnded(self.epoch))?;
812        let mut highest_completed_round = self.highest_completed_round.lock();
813        if Some(round) > *highest_completed_round {
814            *highest_completed_round = Some(round);
815            epoch_store
816                .tables()?
817                .randomness_highest_completed_round
818                .insert(&SINGLETON_KEY, &round)?;
819            self.network_handle
820                .complete_round(epoch_store.committee().epoch(), round);
821        }
822        Ok(())
823    }
824}
825
/// Outcome of the DKG protocol as reported by `RandomnessManager::dkg_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DkgStatus {
    /// No DKG result has been recorded yet.
    Pending,
    /// A DKG result was recorded, but it contains no output.
    Failed,
    /// A DKG result with an output was recorded.
    Successful,
}
832
833#[cfg(test)]
834mod tests {
835    use std::num::NonZeroUsize;
836
837    use consensus_core::{BlockRef, BlockStatus};
838    use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
839    use iota_types::messages_consensus::ConsensusTransactionKind;
840    use tokio::sync::mpsc;
841
842    use crate::{
843        authority::{
844            authority_per_epoch_store::{ExecutionIndices, ExecutionIndicesWithStats},
845            test_authority_builder::TestAuthorityBuilder,
846        },
847        checkpoints::CheckpointStore,
848        consensus_adapter::{
849            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
850            MockConsensusClient,
851        },
852        epoch::randomness::*,
853        mock_consensus::with_block_status,
854    };
855
856    #[tokio::test]
857    async fn test_dkg_v1() {
858        test_dkg(1).await;
859    }
860
861    async fn test_dkg(version: u64) {
862        telemetry_subscribers::init_for_testing();
863
864        let network_config =
865            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
866                .committee_size(NonZeroUsize::new(4).unwrap())
867                .with_reference_gas_price(500)
868                .build();
869
870        let mut protocol_config =
871            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
872        protocol_config.set_random_beacon_dkg_version_for_testing(version);
873
874        let mut epoch_stores = Vec::new();
875        let mut randomness_managers = Vec::new();
876        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
877
878        for validator in network_config.validator_configs.iter() {
879            // Send consensus messages to channel.
880            let mut mock_consensus_client = MockConsensusClient::new();
881            let tx_consensus = tx_consensus.clone();
882            mock_consensus_client
883                .expect_submit()
884                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
885                    tx_consensus.try_send(transactions.to_vec()).unwrap();
886                    true
887                })
888                .returning(|_, _| Ok(with_block_status(BlockStatus::Sequenced(BlockRef::MIN))));
889
890            let state = TestAuthorityBuilder::new()
891                .with_protocol_config(protocol_config.clone())
892                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
893                .build()
894                .await;
895            let consensus_adapter = Arc::new(ConsensusAdapter::new(
896                Arc::new(mock_consensus_client),
897                CheckpointStore::new_for_tests(),
898                state.name,
899                Arc::new(ConnectionMonitorStatusForTests {}),
900                100_000,
901                100_000,
902                None,
903                None,
904                ConsensusAdapterMetrics::new_test(),
905            ));
906            let epoch_store = state.epoch_store_for_testing();
907            let randomness_manager = RandomnessManager::try_new(
908                Arc::downgrade(&epoch_store),
909                Box::new(consensus_adapter.clone()),
910                iota_network::randomness::Handle::new_stub(),
911                validator.authority_key_pair(),
912            )
913            .await
914            .unwrap();
915
916            epoch_stores.push(epoch_store);
917            randomness_managers.push(randomness_manager);
918        }
919
920        // Generate and distribute Messages.
921        let mut dkg_messages = Vec::new();
922        for randomness_manager in randomness_managers.iter_mut() {
923            randomness_manager.start_dkg().await.unwrap();
924
925            let mut dkg_message = rx_consensus.recv().await.unwrap();
926            assert!(dkg_message.len() == 1);
927            match dkg_message.remove(0).kind {
928                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
929                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
930                        .expect("DKG message deserialization should not fail");
931                    dkg_messages.push(msg);
932                }
933                _ => panic!("wrong type of message sent"),
934            }
935        }
936        for i in 0..randomness_managers.len() {
937            let mut output = ConsensusCommitOutput::new();
938            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
939                index: ExecutionIndices {
940                    last_committed_round: 0,
941                    ..Default::default()
942                },
943                ..Default::default()
944            });
945            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
946                randomness_managers[i]
947                    .add_message(&epoch_stores[j].name, dkg_message)
948                    .unwrap();
949            }
950            randomness_managers[i]
951                .advance_dkg(&mut output, 0)
952                .await
953                .unwrap();
954            let mut batch = epoch_stores[i].db_batch_for_test();
955            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
956            batch.write().unwrap();
957        }
958
959        // Generate and distribute Confirmations.
960        let mut dkg_confirmations = Vec::new();
961        for _ in 0..randomness_managers.len() {
962            let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
963            assert!(dkg_confirmation.len() == 1);
964            match dkg_confirmation.remove(0).kind {
965                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
966                    let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
967                        .expect("DKG message deserialization should not fail");
968                    dkg_confirmations.push(msg);
969                }
970                _ => panic!("wrong type of message sent"),
971            }
972        }
973        for i in 0..randomness_managers.len() {
974            let mut output = ConsensusCommitOutput::new();
975            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
976                index: ExecutionIndices {
977                    last_committed_round: 1,
978                    ..Default::default()
979                },
980                ..Default::default()
981            });
982            for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
983                randomness_managers[i]
984                    .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
985                    .unwrap();
986            }
987            randomness_managers[i]
988                .advance_dkg(&mut output, 0)
989                .await
990                .unwrap();
991            let mut batch = epoch_stores[i].db_batch_for_test();
992            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
993            batch.write().unwrap();
994        }
995
996        // Verify DKG completed.
997        for randomness_manager in &randomness_managers {
998            assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
999        }
1000    }
1001
1002    #[tokio::test]
1003    async fn test_dkg_expiration_v1() {
1004        test_dkg_expiration(1).await;
1005    }
1006
1007    async fn test_dkg_expiration(version: u64) {
1008        telemetry_subscribers::init_for_testing();
1009
1010        let network_config =
1011            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1012                .committee_size(NonZeroUsize::new(4).unwrap())
1013                .with_reference_gas_price(500)
1014                .build();
1015
1016        let mut epoch_stores = Vec::new();
1017        let mut randomness_managers = Vec::new();
1018        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
1019
1020        let mut protocol_config =
1021            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1022        protocol_config.set_random_beacon_dkg_version_for_testing(version);
1023
1024        for validator in network_config.validator_configs.iter() {
1025            // Send consensus messages to channel.
1026            let mut mock_consensus_client = MockConsensusClient::new();
1027            let tx_consensus = tx_consensus.clone();
1028            mock_consensus_client
1029                .expect_submit()
1030                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1031                    tx_consensus.try_send(transactions.to_vec()).unwrap();
1032                    true
1033                })
1034                .returning(|_, _| {
1035                    Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
1036                        BlockRef::MIN,
1037                    )))
1038                });
1039
1040            let state = TestAuthorityBuilder::new()
1041                .with_protocol_config(protocol_config.clone())
1042                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
1043                .build()
1044                .await;
1045            let consensus_adapter = Arc::new(ConsensusAdapter::new(
1046                Arc::new(mock_consensus_client),
1047                CheckpointStore::new_for_tests(),
1048                state.name,
1049                Arc::new(ConnectionMonitorStatusForTests {}),
1050                100_000,
1051                100_000,
1052                None,
1053                None,
1054                ConsensusAdapterMetrics::new_test(),
1055            ));
1056            let epoch_store = state.epoch_store_for_testing();
1057            let randomness_manager = RandomnessManager::try_new(
1058                Arc::downgrade(&epoch_store),
1059                Box::new(consensus_adapter.clone()),
1060                iota_network::randomness::Handle::new_stub(),
1061                validator.authority_key_pair(),
1062            )
1063            .await
1064            .unwrap();
1065
1066            epoch_stores.push(epoch_store);
1067            randomness_managers.push(randomness_manager);
1068        }
1069
1070        // Generate and distribute Messages.
1071        let mut dkg_messages = Vec::new();
1072        for randomness_manager in randomness_managers.iter_mut() {
1073            randomness_manager.start_dkg().await.unwrap();
1074
1075            let mut dkg_message = rx_consensus.recv().await.unwrap();
1076            assert!(dkg_message.len() == 1);
1077            match dkg_message.remove(0).kind {
1078                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1079                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1080                        .expect("DKG message deserialization should not fail");
1081                    dkg_messages.push(msg);
1082                }
1083                _ => panic!("wrong type of message sent"),
1084            }
1085        }
1086        for i in 0..randomness_managers.len() {
1087            let mut output = ConsensusCommitOutput::new();
1088            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
1089                index: ExecutionIndices {
1090                    last_committed_round: 0,
1091                    ..Default::default()
1092                },
1093                ..Default::default()
1094            });
1095            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
1096                randomness_managers[i]
1097                    .add_message(&epoch_stores[j].name, dkg_message)
1098                    .unwrap();
1099            }
1100            randomness_managers[i]
1101                .advance_dkg(&mut output, u64::MAX)
1102                .await
1103                .unwrap();
1104            let mut batch = epoch_stores[i].db_batch_for_test();
1105            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1106            batch.write().unwrap();
1107        }
1108
1109        // Verify DKG failed.
1110        for randomness_manager in &randomness_managers {
1111            assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
1112        }
1113    }
1114}