iota_core/epoch/
randomness.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    sync::{Arc, Weak},
8    time::Instant,
9};
10
11use anemo::PeerId;
12use fastcrypto::{
13    encoding::{Encoding, Hex},
14    error::{FastCryptoError, FastCryptoResult},
15    groups::bls12381,
16    serde_helpers::ToFromByteArray,
17    traits::{KeyPair, ToFromBytes},
18};
19use fastcrypto_tbls::{dkg, dkg::Output, dkg_v1, nodes, nodes::PartyId};
20use futures::{StreamExt, stream::FuturesUnordered};
21use iota_macros::fail_point_if;
22use iota_network::randomness;
23use iota_types::{
24    base_types::{AuthorityName, CommitRound},
25    committee::{Committee, EpochId, StakeUnit},
26    crypto::{AuthorityKeyPair, RandomnessRound},
27    error::{IotaError, IotaResult},
28    iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
29    messages_consensus::{ConsensusTransaction, VersionedDkgConfirmation, VersionedDkgMessage},
30};
31use parking_lot::Mutex;
32use rand::{
33    SeedableRng,
34    rngs::{OsRng, StdRng},
35};
36use serde::{Deserialize, Serialize};
37use tokio::{sync::OnceCell, task::JoinHandle};
38use tracing::{debug, error, info, warn};
39use typed_store::Map;
40
41use crate::{
42    authority::{
43        authority_per_epoch_store::{AuthorityPerEpochStore, ConsensusCommitOutput},
44        epoch_start_configuration::EpochStartConfigTrait,
45    },
46    consensus_adapter::SubmitToConsensus,
47};
48
/// A commit timestamp, expressed in milliseconds since the UNIX epoch.
pub type CommitTimestampMs = u64;

// Group used as the first type parameter of `dkg::Party` / `dkg::Output`
// (presumably the group of the DKG public key — confirm against fastcrypto_tbls).
type PkG = bls12381::G2Element;
// Group used for the nodes' ECIES encryption keys (`nodes::Node::<EncG>`) and as
// the second type parameter of `dkg::Party`.
type EncG = bls12381::G2Element;

/// Key used to read the single entry of the one-value tables accessed in this
/// file (e.g. `dkg_output`, `randomness_next_round`).
pub const SINGLETON_KEY: u64 = 0;
56
57// Wrappers for DKG messages (to simplify upgrades).
58
/// Version-tagged wrapper around a DKG message that has already been
/// processed locally. Currently only the V1 DKG format exists.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedProcessedMessage {
    /// A processed message of the V1 DKG protocol.
    V1(dkg_v1::ProcessedMessage<PkG, EncG>),
}
63
64impl VersionedProcessedMessage {
65    pub fn sender(&self) -> PartyId {
66        match self {
67            VersionedProcessedMessage::V1(msg) => msg.message.sender,
68        }
69    }
70
71    pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
72        match self {
73            VersionedProcessedMessage::V1(msg) => msg,
74        }
75    }
76
77    pub fn process(
78        party: Arc<dkg::Party<PkG, EncG>>,
79        message: VersionedDkgMessage,
80    ) -> FastCryptoResult<VersionedProcessedMessage> {
81        // All inputs are verified in add_message, so we can assume they are of the
82        // correct version.
83        let processed = party.process_message_v1(message.unwrap_v1(), &mut rand::thread_rng())?;
84        Ok(VersionedProcessedMessage::V1(processed))
85    }
86
87    pub fn merge(
88        party: Arc<dkg::Party<PkG, EncG>>,
89        messages: Vec<Self>,
90    ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
91        // All inputs were created by this validator, so we can assume they are of the
92        // correct version.
93        let (conf, msgs) = party.merge_v1(
94            &messages
95                .into_iter()
96                .map(|vm| vm.unwrap_v1())
97                .collect::<Vec<_>>(),
98        )?;
99        Ok((
100            VersionedDkgConfirmation::V1(conf),
101            VersionedUsedProcessedMessages::V1(msgs),
102        ))
103    }
104}
105
/// Version-tagged wrapper around the set of processed messages that were
/// used when merging into a DKG confirmation. Currently only V1 exists.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedUsedProcessedMessages {
    /// Used processed messages of the V1 DKG protocol.
    V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
}
110
111impl VersionedUsedProcessedMessages {
112    fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
113        &self,
114        party: Arc<dkg::Party<PkG, EncG>>,
115        confirmations: Iter,
116    ) -> FastCryptoResult<Output<PkG, EncG>> {
117        // All inputs are verified in add_confirmation, so we can assume they are of the
118        // correct version.
119        let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
120        let VersionedUsedProcessedMessages::V1(msg) = self;
121        party.complete_v1(
122            msg,
123            &confirmations
124                .map(|vm| vm.unwrap_v1())
125                .cloned()
126                .collect::<Vec<_>>(),
127            rng,
128        )
129    }
130}
131
// State machine for randomness DKG and generation.
//
// DKG protocol:
// 1. This validator sends out a `Message` to all other validators.
// 2. Once sufficient valid `Message`s are received from other validators via
//    consensus and processed, this validator sends out a `Confirmation` to all
//    other validators.
// 3. Once sufficient `Confirmation`s are received from other validators via
//    consensus and processed, they are combined to form a public VSS key and
//    local private key shares.
// 4. Randomness generation begins.
//
// Randomness generation:
// 1. For each new round, AuthorityPerEpochStore eventually calls
//    `generate_randomness`.
// 2. This kicks off a process in RandomnessEventLoop to send partial signatures
//    for the new round to all other validators.
// 3. Once enough partial signatures for the round are collected, a
//    RandomnessStateUpdate transaction is generated and injected into the
//    TransactionManager.
// 4. Once the RandomnessStateUpdate transaction is seen in a certified
//    checkpoint, `notify_randomness_in_checkpoint` is called to complete the
//    round and stop sending partial signatures for it.
pub struct RandomnessManager {
    // Weak handle to the epoch store; upgraded on demand by `epoch_store()`.
    epoch_store: Weak<AuthorityPerEpochStore>,
    // Epoch this manager belongs to; reported in `IotaError::EpochEnded` when
    // the epoch store is gone.
    epoch: EpochId,
    // Used to submit DKG Messages and Confirmations to consensus.
    consensus_adapter: Box<dyn SubmitToConsensus>,
    // Handle to the randomness network component (`update_epoch`,
    // `send_partial_signatures`).
    network_handle: randomness::Handle,
    // Network peer id and DKG party id of every committee member.
    authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,

    // State for DKG.
    // Set when `start_dkg` runs; only used to report timing metrics.
    dkg_start_time: OnceCell<Instant>,
    party: Arc<dkg::Party<PkG, EncG>>,
    // Background processing tasks for received Messages, keyed by sender.
    enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
    // Successfully processed Messages, keyed by sender.
    processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    // Set once the processed Messages have been merged into a Confirmation.
    used_messages: OnceCell<VersionedUsedProcessedMessages>,
    // Received Confirmations, keyed by sender.
    confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    // Unset while DKG is pending; `Some(output)` on success, `None` on failure
    // (see `dkg_status`).
    dkg_output: OnceCell<Option<dkg::Output<PkG, EncG>>>,

    // State for randomness generation.
    // Next round number to hand out from `reserve_next_randomness`.
    next_randomness_round: RandomnessRound,
    // Shared with every `RandomnessReporter` created by `reporter()`.
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
}
175
176impl RandomnessManager {
    /// Builds the `RandomnessManager` for the current epoch, restoring any
    /// DKG and randomness-round state already persisted in the epoch store's
    /// tables.
    ///
    /// If a DKG output is already stored, the network handle is updated with
    /// it right away; otherwise the intermediate DKG state (processed
    /// messages, used messages, confirmations) is reloaded. In both cases,
    /// partial-signature sending is restarted for every round that was
    /// reserved but not yet recorded as completed.
    ///
    /// Returns None in case of invalid input or other failure to initialize
    /// DKG: the epoch store or its tables are already gone, or constructing
    /// the DKG `Nodes` or `Party` fails.
    ///
    /// # Panics
    /// Panics if a table read fails, if a committee member has no peer id, if
    /// a stake, threshold, or index does not fit in `u16`, or if the BLS keys
    /// cannot be converted to the DKG key types.
    pub async fn try_new(
        epoch_store_weak: Weak<AuthorityPerEpochStore>,
        consensus_adapter: Box<dyn SubmitToConsensus>,
        network_handle: randomness::Handle,
        authority_key_pair: &AuthorityKeyPair,
    ) -> Option<Self> {
        let epoch_store = match epoch_store_weak.upgrade() {
            Some(epoch_store) => epoch_store,
            None => {
                error!(
                    "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
                );
                return None;
            }
        };
        let tables = match epoch_store.tables() {
            Ok(tables) => tables,
            Err(_) => {
                error!(
                    "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
                );
                return None;
            }
        };
        let protocol_config = epoch_store.protocol_config();

        let name: AuthorityName = authority_key_pair.public().into();
        let committee = epoch_store.committee();
        // One (party id, name, encryption public key, stake) entry per
        // committee member.
        let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
        if tracing::enabled!(tracing::Level::DEBUG) {
            // Log first few entries in DKG info for debugging.
            for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
                let pk_bytes = pk.as_element().to_byte_array();
                debug!(
                    "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
                );
            }
        }
        // Join the DKG party ids with the network peer ids, keyed by
        // authority name.
        let authority_ids: HashMap<_, _> =
            info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
        let authority_peer_ids = epoch_store
            .epoch_start_config()
            .epoch_start_state()
            .get_authority_names_to_peer_ids();
        let authority_info = authority_ids
            .into_iter()
            .map(|(name, id)| {
                let peer_id = *authority_peer_ids
                    .get(&name)
                    .expect("authority name should be in peer_ids");
                (name, (peer_id, id))
            })
            .collect();
        // Each committee member becomes a DKG node weighted by its stake.
        let nodes = info
            .iter()
            .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
                id: *id,
                pk: pk.clone(),
                weight: (*stake).try_into().expect("stake should fit in u16"),
            })
            .collect();
        // `new_reduced` returns the node set together with the threshold `t`
        // to use with it.
        let (nodes, t) = match nodes::Nodes::new_reduced(
            nodes,
            committee
                .validity_threshold()
                .try_into()
                .expect("validity threshold should fit in u16"),
            protocol_config.random_beacon_reduction_allowed_delta(),
            protocol_config
                .random_beacon_reduction_lower_bound()
                .try_into()
                .expect("should fit u16"),
        ) {
            Ok((nodes, t)) => (nodes, t),
            Err(err) => {
                error!("random beacon: error while initializing Nodes: {err:?}");
                return None;
            }
        };
        let total_weight = nodes.total_weight();
        let num_nodes = nodes.num_nodes();
        // The random oracle prefix includes the chain identifier and the epoch.
        let prefix_str = format!(
            "dkg {} {}",
            Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
            committee.epoch()
        );
        // The authority's BLS private key bytes are reinterpreted as the
        // scalar used for the DKG ECIES private key.
        let randomness_private_key = bls12381::Scalar::from_byte_array(
            authority_key_pair
                .copy()
                .private()
                .as_bytes()
                .try_into()
                .expect("key length should match"),
        )
        .expect("should work to convert BLS key to Scalar");
        let party = match dkg::Party::<PkG, EncG>::new(
            fastcrypto_tbls::ecies::PrivateKey::<bls12381::G2Element>::from(randomness_private_key),
            nodes,
            t,
            fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
            &mut rand::thread_rng(),
        ) {
            Ok(party) => party,
            Err(err) => {
                error!("random beacon: error while initializing Party: {err:?}");
                return None;
            }
        };
        info!(
            "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
        );

        // Load existing data from store.
        let highest_completed_round = tables
            .randomness_highest_completed_round
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail");
        let mut rm = RandomnessManager {
            epoch_store: epoch_store_weak,
            epoch: committee.epoch(),
            consensus_adapter,
            network_handle: network_handle.clone(),
            authority_info,
            dkg_start_time: OnceCell::new(),
            party: Arc::new(party),
            enqueued_messages: BTreeMap::new(),
            processed_messages: BTreeMap::new(),
            used_messages: OnceCell::new(),
            confirmations: BTreeMap::new(),
            dkg_output: OnceCell::new(),
            next_randomness_round: RandomnessRound(0),
            highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
        };
        let dkg_output = tables
            .dkg_output
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail");
        if let Some(dkg_output) = dkg_output {
            // DKG finished before the restart: restore the output and hand it
            // to the network component immediately.
            info!(
                "random beacon: loaded existing DKG output for epoch {}",
                committee.epoch()
            );
            epoch_store
                .metrics
                .epoch_random_beacon_dkg_num_shares
                .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
            rm.dkg_output
                .set(Some(dkg_output.clone()))
                .expect("setting new OnceCell should succeed");
            network_handle.update_epoch(
                committee.epoch(),
                rm.authority_info.clone(),
                dkg_output,
                rm.party.t(),
                highest_completed_round,
            );
        } else {
            info!(
                "random beacon: no existing DKG output found for epoch {}",
                committee.epoch()
            );

            // Load intermediate data.
            assert!(
                epoch_store.protocol_config().dkg_version() > 0,
                "BUG: DKG version 0 is deprecated"
            );
            rm.processed_messages.extend(
                tables
                    .dkg_processed_messages
                    .safe_iter()
                    .map(|result| result.expect("typed_store should not fail")),
            );
            if let Some(used_messages) = tables
                .dkg_used_messages
                .get(&SINGLETON_KEY)
                .expect("typed_store should not fail")
            {
                rm.used_messages
                    .set(used_messages.clone())
                    .expect("setting new OnceCell should succeed");
            }
            rm.confirmations.extend(
                tables
                    .dkg_confirmations
                    .safe_iter()
                    .map(|result| result.expect("typed_store should not fail")),
            );
        }

        // Resume randomness generation from where we left off.
        // This must be loaded regardless of whether DKG has finished yet, since the
        // RandomnessEventLoop and commit-handling logic in AuthorityPerEpochStore both
        // depend on this state.
        rm.next_randomness_round = tables
            .randomness_next_round
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail")
            .unwrap_or(RandomnessRound(0));
        info!(
            "random beacon: starting from next_randomness_round={}",
            rm.next_randomness_round.0
        );
        // Rounds in `first_incomplete_round..next_randomness_round` were
        // reserved but are not yet recorded as completed.
        let first_incomplete_round = highest_completed_round
            .map(|r| r + 1)
            .unwrap_or(RandomnessRound(0));
        if first_incomplete_round < rm.next_randomness_round {
            info!(
                "random beacon: resuming generation for randomness rounds from {} to {}",
                first_incomplete_round,
                rm.next_randomness_round - 1,
            );
            for r in first_incomplete_round.0..rm.next_randomness_round.0 {
                network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
            }
        }

        Some(rm)
    }
397
398    /// Sends the initial dkg::Message to begin the randomness DKG protocol.
399    pub async fn start_dkg(&mut self) -> IotaResult {
400        if self.used_messages.initialized() || self.dkg_output.initialized() {
401            // DKG already started (or completed or failed).
402            return Ok(());
403        }
404
405        let _ = self.dkg_start_time.set(Instant::now());
406
407        let epoch_store = self.epoch_store()?;
408        let dkg_version = epoch_store.protocol_config().dkg_version();
409        info!("random beacon: starting DKG, version {dkg_version}");
410
411        let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
412            Ok(msg) => msg,
413            Err(FastCryptoError::IgnoredMessage) => {
414                info!(
415                    "random beacon: no DKG Message for party id={} (zero weight)",
416                    self.party.id
417                );
418                return Ok(());
419            }
420            Err(e) => {
421                error!("random beacon: error while creating a DKG Message: {e:?}");
422                return Ok(());
423            }
424        };
425
426        info!("random beacon: created {msg:?} with dkg version {dkg_version}");
427        let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
428
429        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
430        let mut fail_point_skip_sending = false;
431        fail_point_if!("rb-dkg", || {
432            // maybe skip sending in simtests
433            fail_point_skip_sending = true;
434        });
435        if !fail_point_skip_sending {
436            self.consensus_adapter
437                .submit_to_consensus(&[transaction], &epoch_store)
438                .await?;
439        }
440
441        epoch_store
442            .metrics
443            .epoch_random_beacon_dkg_message_time_ms
444            .set(
445                self.dkg_start_time
446                    .get()
447                    .unwrap() // already set above
448                    .elapsed()
449                    .as_millis() as i64,
450            );
451        Ok(())
452    }
453
454    /// Processes all received messages and advances the randomness DKG state
455    /// machine when possible, sending out a dkg::Confirmation and
456    /// generating final output.
457    pub(crate) async fn advance_dkg(
458        &mut self,
459        consensus_output: &mut ConsensusCommitOutput,
460        round: CommitRound,
461    ) -> IotaResult {
462        let epoch_store = self.epoch_store()?;
463
464        // Once we have enough Messages, send a Confirmation.
465        if !self.dkg_output.initialized() && !self.used_messages.initialized() {
466            // Process all enqueued messages.
467            let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
468                .into_values()
469                .collect();
470            while let Some(res) = handles.next().await {
471                if let Ok(Some(processed)) = res {
472                    self.processed_messages
473                        .insert(processed.sender(), processed.clone());
474                    consensus_output.insert_dkg_processed_message(processed);
475                }
476            }
477
478            // Attempt to generate the Confirmation.
479            match VersionedProcessedMessage::merge(
480                self.party.clone(),
481                self.processed_messages
482                    .values()
483                    .cloned()
484                    .collect::<Vec<_>>(),
485            ) {
486                Ok((conf, used_msgs)) => {
487                    info!(
488                        "random beacon: sending DKG Confirmation with {} complaints",
489                        conf.num_of_complaints()
490                    );
491                    if self.used_messages.set(used_msgs.clone()).is_err() {
492                        error!("BUG: used_messages should only ever be set once");
493                    }
494                    consensus_output.insert_dkg_used_messages(used_msgs);
495
496                    let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
497                        epoch_store.name,
498                        &conf,
499                    );
500
501                    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
502                    let mut fail_point_skip_sending = false;
503                    fail_point_if!("rb-dkg", || {
504                        // maybe skip sending in simtests
505                        fail_point_skip_sending = true;
506                    });
507                    if !fail_point_skip_sending {
508                        self.consensus_adapter
509                            .submit_to_consensus(&[transaction], &epoch_store)
510                            .await?;
511                    }
512
513                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
514                    if let Some(elapsed) = elapsed {
515                        epoch_store
516                            .metrics
517                            .epoch_random_beacon_dkg_confirmation_time_ms
518                            .set(elapsed as i64);
519                    }
520                }
521                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
522                Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
523            }
524        }
525
526        // Once we have enough Confirmations, process them and update shares.
527        if !self.dkg_output.initialized() && self.used_messages.initialized() {
528            match self
529                .used_messages
530                .get()
531                .expect("checked above that `used_messages` is initialized")
532                .complete_dkg(self.party.clone(), self.confirmations.values())
533            {
534                Ok(output) => {
535                    let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
536                    let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
537                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
538                    info!(
539                        "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
540                    );
541                    epoch_store
542                        .metrics
543                        .epoch_random_beacon_dkg_num_shares
544                        .set(num_shares as i64);
545                    epoch_store
546                        .metrics
547                        .epoch_random_beacon_dkg_epoch_start_completion_time_ms
548                        .set(epoch_elapsed as i64);
549                    epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
550                    if let Some(elapsed) = elapsed {
551                        epoch_store
552                            .metrics
553                            .epoch_random_beacon_dkg_completion_time_ms
554                            .set(elapsed as i64);
555                    }
556                    self.dkg_output
557                        .set(Some(output.clone()))
558                        .expect("checked above that `dkg_output` is uninitialized");
559                    self.network_handle.update_epoch(
560                        epoch_store.committee().epoch(),
561                        self.authority_info.clone(),
562                        output.clone(),
563                        self.party.t(),
564                        None,
565                    );
566                    consensus_output.set_dkg_output(output);
567                }
568                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
569                Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
570            }
571        }
572
573        // If we ran out of time, mark DKG as failed.
574        if !self.dkg_output.initialized()
575            && round
576                > epoch_store
577                    .protocol_config()
578                    .random_beacon_dkg_timeout_round()
579                    .into()
580        {
581            error!(
582                "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
583            );
584            epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
585            self.dkg_output
586                .set(None)
587                .expect("checked above that `dkg_output` is uninitialized");
588        }
589
590        Ok(())
591    }
592
593    /// Adds a received VersionedDkgMessage to the randomness DKG state machine.
594    pub fn add_message(
595        &mut self,
596        authority: &AuthorityName,
597        msg: VersionedDkgMessage,
598    ) -> IotaResult {
599        // message was received from other validators, so we need to ensure it uses a
600        // supported version before we call other functions that assume the
601        // version is correct
602        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
603        if !msg.is_valid_version(dkg_version) {
604            warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
605            return Ok(());
606        }
607
608        if self.used_messages.initialized() || self.dkg_output.initialized() {
609            // We've already sent a `Confirmation`, so we can't add any more messages.
610            return Ok(());
611        }
612        let Some((_, party_id)) = self.authority_info.get(authority) else {
613            error!("random beacon: received DKG Message from unknown authority: {authority:?}");
614            return Ok(());
615        };
616        if *party_id != msg.sender() {
617            warn!(
618                "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
619            );
620            return Ok(());
621        }
622        if self.enqueued_messages.contains_key(&msg.sender())
623            || self.processed_messages.contains_key(&msg.sender())
624        {
625            info!("ignoring duplicate DKG Message from authority {authority:?}");
626            return Ok(());
627        }
628
629        let party = self.party.clone();
630        // TODO: Could save some CPU by not processing messages if we already have
631        // enough to merge.
632        self.enqueued_messages.insert(
633            msg.sender(),
634            tokio::task::spawn_blocking(move || {
635                match VersionedProcessedMessage::process(party, msg) {
636                    Ok(processed) => Some(processed),
637                    Err(err) => {
638                        debug!("random beacon: error while processing DKG Message: {err:?}");
639                        None
640                    }
641                }
642            }),
643        );
644        Ok(())
645    }
646
647    /// Adds a received dkg::Confirmation to the randomness DKG state machine.
648    pub(crate) fn add_confirmation(
649        &mut self,
650        output: &mut ConsensusCommitOutput,
651        authority: &AuthorityName,
652        conf: VersionedDkgConfirmation,
653    ) -> IotaResult {
654        // confirmation was received from other validators, so we need to ensure it uses
655        // a supported version before we call other functions that assume the
656        // version is correct
657        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
658        if !conf.is_valid_version(dkg_version) {
659            warn!(
660                "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
661            );
662            return Ok(());
663        }
664
665        if self.dkg_output.initialized() {
666            // Once we have completed DKG, no more `Confirmation`s are needed.
667            return Ok(());
668        }
669        let Some((_, party_id)) = self.authority_info.get(authority) else {
670            error!(
671                "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
672            );
673            return Ok(());
674        };
675        if *party_id != conf.sender() {
676            warn!(
677                "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
678            );
679            return Ok(());
680        }
681        self.confirmations.insert(conf.sender(), conf.clone());
682        output.insert_dkg_confirmation(conf);
683        Ok(())
684    }
685
686    /// Reserves the next available round number for randomness generation if
687    /// enough time has elapsed, or returns None if not yet ready (based on
688    /// ProtocolConfig setting). Once the given batch is written,
689    /// `generate_randomness` must be called to start the process. On restart,
690    /// any reserved rounds for which the batch was written will automatically
691    /// be resumed.
692    pub(crate) fn reserve_next_randomness(
693        &mut self,
694        commit_timestamp: CommitTimestampMs,
695        output: &mut ConsensusCommitOutput,
696    ) -> IotaResult<Option<RandomnessRound>> {
697        let epoch_store = self.epoch_store()?;
698        let tables = epoch_store.tables()?;
699
700        let last_round_timestamp = tables
701            .randomness_last_round_timestamp
702            .get(&SINGLETON_KEY)
703            .expect("typed_store should not fail");
704        if let Some(last_round_timestamp) = last_round_timestamp {
705            if commit_timestamp - last_round_timestamp
706                < epoch_store
707                    .protocol_config()
708                    .random_beacon_min_round_interval_ms()
709            {
710                return Ok(None);
711            }
712        }
713
714        let randomness_round = self.next_randomness_round;
715        self.next_randomness_round = self
716            .next_randomness_round
717            .checked_add(1)
718            .expect("RandomnessRound should not overflow");
719
720        output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
721
722        Ok(Some(randomness_round))
723    }
724
725    /// Starts the process of generating the given RandomnessRound.
726    pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
727        self.network_handle
728            .send_partial_signatures(epoch, randomness_round);
729    }
730
731    pub fn dkg_status(&self) -> DkgStatus {
732        match self.dkg_output.get() {
733            Some(Some(_)) => DkgStatus::Successful,
734            Some(None) => DkgStatus::Failed,
735            None => DkgStatus::Pending,
736        }
737    }
738
739    /// Generates a new RandomnessReporter for reporting observed rounds to this
740    /// RandomnessManager.
741    pub fn reporter(&self) -> RandomnessReporter {
742        RandomnessReporter {
743            epoch_store: self.epoch_store.clone(),
744            epoch: self.epoch,
745            network_handle: self.network_handle.clone(),
746            highest_completed_round: self.highest_completed_round.clone(),
747        }
748    }
749
750    fn epoch_store(&self) -> IotaResult<Arc<AuthorityPerEpochStore>> {
751        self.epoch_store
752            .upgrade()
753            .ok_or(IotaError::EpochEnded(self.epoch))
754    }
755
756    fn randomness_dkg_info_from_committee(
757        committee: &Committee,
758    ) -> Vec<(
759        u16,
760        AuthorityName,
761        fastcrypto_tbls::ecies::PublicKey<bls12381::G2Element>,
762        StakeUnit,
763    )> {
764        committee
765            .members()
766            .map(|(name, stake)| {
767                let index: u16 = committee
768                    .authority_index(name)
769                    .expect("lookup of known committee member should succeed")
770                    .try_into()
771                    .expect("authority index should fit in u16");
772                let pk = bls12381::G2Element::from_byte_array(
773                    committee
774                        .public_key(name)
775                        .expect("lookup of known committee member should succeed")
776                        .as_bytes()
777                        .try_into()
778                        .expect("key length should match"),
779                )
780                .expect("should work to convert BLS key to G2Element");
781                (
782                    index,
783                    *name,
784                    fastcrypto_tbls::ecies::PublicKey::from(pk),
785                    *stake,
786                )
787            })
788            .collect()
789    }
790}
791
/// Used by other components to notify the randomness system of observed
/// randomness.
#[derive(Clone)]
pub struct RandomnessReporter {
    /// Weak reference to the epoch store. It is upgraded on each notification;
    /// a failed upgrade is reported as `IotaError::EpochEnded`.
    epoch_store: Weak<AuthorityPerEpochStore>,
    /// Epoch id carried in the `EpochEnded` error when the upgrade fails.
    epoch: EpochId,
    /// Handle from `iota_network::randomness`; `complete_round` is called on it
    /// when a new highest round is recorded.
    network_handle: randomness::Handle,
    /// Highest round recorded so far, behind an `Arc<Mutex<..>>` so that clones
    /// of this reporter observe and update the same value.
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
}
801
802impl RandomnessReporter {
803    /// Notifies the associated randomness manager that randomness for the given
804    /// round has been durably committed in a checkpoint. This completes the
805    /// process of generating randomness for the round.
806    pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> IotaResult {
807        let epoch_store = self
808            .epoch_store
809            .upgrade()
810            .ok_or(IotaError::EpochEnded(self.epoch))?;
811        let mut highest_completed_round = self.highest_completed_round.lock();
812        if Some(round) > *highest_completed_round {
813            *highest_completed_round = Some(round);
814            epoch_store
815                .tables()?
816                .randomness_highest_completed_round
817                .insert(&SINGLETON_KEY, &round)?;
818            self.network_handle
819                .complete_round(epoch_store.committee().epoch(), round);
820        }
821        Ok(())
822    }
823}
824
/// State of the DKG protocol as reported by `dkg_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DkgStatus {
    /// No DKG result has been recorded yet.
    Pending,
    /// A result has been recorded and it carries no DKG output.
    Failed,
    /// A result has been recorded together with a DKG output.
    Successful,
}
831
832#[cfg(test)]
833mod tests {
834    use std::num::NonZeroUsize;
835
836    use consensus_core::{BlockRef, BlockStatus};
837    use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
838    use iota_types::messages_consensus::ConsensusTransactionKind;
839    use tokio::sync::mpsc;
840
841    use crate::{
842        authority::test_authority_builder::TestAuthorityBuilder,
843        consensus_adapter::{
844            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
845            MockConsensusClient,
846        },
847        epoch::randomness::*,
848        mock_consensus::with_block_status,
849    };
850
851    #[tokio::test]
852    async fn test_dkg_v1() {
853        test_dkg(1).await;
854    }
855
856    async fn test_dkg(version: u64) {
857        telemetry_subscribers::init_for_testing();
858
859        let network_config =
860            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
861                .committee_size(NonZeroUsize::new(4).unwrap())
862                .with_reference_gas_price(500)
863                .build();
864
865        let mut protocol_config =
866            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
867        protocol_config.set_random_beacon_dkg_version_for_testing(version);
868
869        let mut epoch_stores = Vec::new();
870        let mut randomness_managers = Vec::new();
871        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
872
873        for validator in network_config.validator_configs.iter() {
874            // Send consensus messages to channel.
875            let mut mock_consensus_client = MockConsensusClient::new();
876            let tx_consensus = tx_consensus.clone();
877            mock_consensus_client
878                .expect_submit()
879                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
880                    tx_consensus.try_send(transactions.to_vec()).unwrap();
881                    true
882                })
883                .returning(|_, _| Ok(with_block_status(BlockStatus::Sequenced(BlockRef::MIN))));
884
885            let state = TestAuthorityBuilder::new()
886                .with_protocol_config(protocol_config.clone())
887                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
888                .build()
889                .await;
890            let consensus_adapter = Arc::new(ConsensusAdapter::new(
891                Arc::new(mock_consensus_client),
892                state.name,
893                Arc::new(ConnectionMonitorStatusForTests {}),
894                100_000,
895                100_000,
896                None,
897                None,
898                ConsensusAdapterMetrics::new_test(),
899            ));
900            let epoch_store = state.epoch_store_for_testing();
901            let randomness_manager = RandomnessManager::try_new(
902                Arc::downgrade(&epoch_store),
903                Box::new(consensus_adapter.clone()),
904                iota_network::randomness::Handle::new_stub(),
905                validator.authority_key_pair(),
906            )
907            .await
908            .unwrap();
909
910            epoch_stores.push(epoch_store);
911            randomness_managers.push(randomness_manager);
912        }
913
914        // Generate and distribute Messages.
915        let mut dkg_messages = Vec::new();
916        for randomness_manager in randomness_managers.iter_mut() {
917            randomness_manager.start_dkg().await.unwrap();
918
919            let mut dkg_message = rx_consensus.recv().await.unwrap();
920            assert!(dkg_message.len() == 1);
921            match dkg_message.remove(0).kind {
922                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
923                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
924                        .expect("DKG message deserialization should not fail");
925                    dkg_messages.push(msg);
926                }
927                _ => panic!("wrong type of message sent"),
928            }
929        }
930        for i in 0..randomness_managers.len() {
931            let mut output = ConsensusCommitOutput::new();
932            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
933                randomness_managers[i]
934                    .add_message(&epoch_stores[j].name, dkg_message)
935                    .unwrap();
936            }
937            randomness_managers[i]
938                .advance_dkg(&mut output, 0)
939                .await
940                .unwrap();
941            let mut batch = epoch_stores[i].db_batch_for_test();
942            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
943            batch.write().unwrap();
944        }
945
946        // Generate and distribute Confirmations.
947        let mut dkg_confirmations = Vec::new();
948        for _ in 0..randomness_managers.len() {
949            let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
950            assert!(dkg_confirmation.len() == 1);
951            match dkg_confirmation.remove(0).kind {
952                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
953                    let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
954                        .expect("DKG message deserialization should not fail");
955                    dkg_confirmations.push(msg);
956                }
957                _ => panic!("wrong type of message sent"),
958            }
959        }
960        for i in 0..randomness_managers.len() {
961            let mut output = ConsensusCommitOutput::new();
962            for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
963                randomness_managers[i]
964                    .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
965                    .unwrap();
966            }
967            randomness_managers[i]
968                .advance_dkg(&mut output, 0)
969                .await
970                .unwrap();
971            let mut batch = epoch_stores[i].db_batch_for_test();
972            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
973            batch.write().unwrap();
974        }
975
976        // Verify DKG completed.
977        for randomness_manager in &randomness_managers {
978            assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
979        }
980    }
981
982    #[tokio::test]
983    async fn test_dkg_expiration_v1() {
984        test_dkg_expiration(1).await;
985    }
986
987    async fn test_dkg_expiration(version: u64) {
988        telemetry_subscribers::init_for_testing();
989
990        let network_config =
991            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
992                .committee_size(NonZeroUsize::new(4).unwrap())
993                .with_reference_gas_price(500)
994                .build();
995
996        let mut epoch_stores = Vec::new();
997        let mut randomness_managers = Vec::new();
998        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
999
1000        let mut protocol_config =
1001            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1002        protocol_config.set_random_beacon_dkg_version_for_testing(version);
1003
1004        for validator in network_config.validator_configs.iter() {
1005            // Send consensus messages to channel.
1006            let mut mock_consensus_client = MockConsensusClient::new();
1007            let tx_consensus = tx_consensus.clone();
1008            mock_consensus_client
1009                .expect_submit()
1010                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1011                    tx_consensus.try_send(transactions.to_vec()).unwrap();
1012                    true
1013                })
1014                .returning(|_, _| {
1015                    Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
1016                        BlockRef::MIN,
1017                    )))
1018                });
1019
1020            let state = TestAuthorityBuilder::new()
1021                .with_protocol_config(protocol_config.clone())
1022                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
1023                .build()
1024                .await;
1025            let consensus_adapter = Arc::new(ConsensusAdapter::new(
1026                Arc::new(mock_consensus_client),
1027                state.name,
1028                Arc::new(ConnectionMonitorStatusForTests {}),
1029                100_000,
1030                100_000,
1031                None,
1032                None,
1033                ConsensusAdapterMetrics::new_test(),
1034            ));
1035            let epoch_store = state.epoch_store_for_testing();
1036            let randomness_manager = RandomnessManager::try_new(
1037                Arc::downgrade(&epoch_store),
1038                Box::new(consensus_adapter.clone()),
1039                iota_network::randomness::Handle::new_stub(),
1040                validator.authority_key_pair(),
1041            )
1042            .await
1043            .unwrap();
1044
1045            epoch_stores.push(epoch_store);
1046            randomness_managers.push(randomness_manager);
1047        }
1048
1049        // Generate and distribute Messages.
1050        let mut dkg_messages = Vec::new();
1051        for randomness_manager in randomness_managers.iter_mut() {
1052            randomness_manager.start_dkg().await.unwrap();
1053
1054            let mut dkg_message = rx_consensus.recv().await.unwrap();
1055            assert!(dkg_message.len() == 1);
1056            match dkg_message.remove(0).kind {
1057                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1058                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1059                        .expect("DKG message deserialization should not fail");
1060                    dkg_messages.push(msg);
1061                }
1062                _ => panic!("wrong type of message sent"),
1063            }
1064        }
1065        for i in 0..randomness_managers.len() {
1066            let mut output = ConsensusCommitOutput::new();
1067            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
1068                randomness_managers[i]
1069                    .add_message(&epoch_stores[j].name, dkg_message)
1070                    .unwrap();
1071            }
1072            randomness_managers[i]
1073                .advance_dkg(&mut output, u64::MAX)
1074                .await
1075                .unwrap();
1076            let mut batch = epoch_stores[i].db_batch_for_test();
1077            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1078            batch.write().unwrap();
1079        }
1080
1081        // Verify DKG failed.
1082        for randomness_manager in &randomness_managers {
1083            assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
1084        }
1085    }
1086}