1use std::{
6 collections::{BTreeMap, HashMap},
7 sync::{Arc, Weak},
8 time::Instant,
9};
10
11use anemo::PeerId;
12use fastcrypto::{
13 encoding::{Encoding, Hex},
14 error::{FastCryptoError, FastCryptoResult},
15 groups::bls12381,
16 serde_helpers::ToFromByteArray,
17 traits::{KeyPair, ToFromBytes},
18};
19use fastcrypto_tbls::{dkg, dkg::Output, dkg_v1, nodes, nodes::PartyId};
20use futures::{StreamExt, stream::FuturesUnordered};
21use iota_macros::fail_point_if;
22use iota_network::randomness;
23use iota_types::{
24 base_types::{AuthorityName, CommitRound},
25 committee::{Committee, EpochId, StakeUnit},
26 crypto::{AuthorityKeyPair, RandomnessRound},
27 error::{IotaError, IotaResult},
28 iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
29 messages_consensus::{ConsensusTransaction, VersionedDkgConfirmation, VersionedDkgMessage},
30};
31use parking_lot::Mutex;
32use rand::{
33 SeedableRng,
34 rngs::{OsRng, StdRng},
35};
36use serde::{Deserialize, Serialize};
37use tokio::{sync::OnceCell, task::JoinHandle};
38use tracing::{debug, error, info, warn};
39use typed_store::Map;
40
41use crate::{
42 authority::{
43 authority_per_epoch_store::{AuthorityPerEpochStore, ConsensusCommitOutput},
44 epoch_start_configuration::EpochStartConfigTrait,
45 },
46 consensus_adapter::SubmitToConsensus,
47};
48
/// Timestamp of a consensus commit, stored as a `u64`.
/// NOTE(review): the name indicates milliseconds — confirm against the producer.
pub type CommitTimestampMs = u64;

/// First group type parameter supplied to the `dkg` types used in this file
/// (presumably the group of the DKG public key — TODO confirm).
type PkG = bls12381::G2Element;
/// Second group type parameter supplied to the `dkg` types; also the group of
/// the per-node ECIES key used when building `nodes::Node`.
type EncG = bls12381::G2Element;

/// Key under which the single-entry tables read and written in this file
/// (DKG output, used messages, next/highest round, last round timestamp)
/// store their one value.
pub const SINGLETON_KEY: u64 = 0;
56
/// Version-tagged wrapper around a processed DKG message.
///
/// Only a `V1` variant exists; it wraps `dkg_v1::ProcessedMessage`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedProcessedMessage {
    /// A message processed with the `dkg_v1` types.
    V1(dkg_v1::ProcessedMessage<PkG, EncG>),
}
63
64impl VersionedProcessedMessage {
65 pub fn sender(&self) -> PartyId {
66 match self {
67 VersionedProcessedMessage::V1(msg) => msg.message.sender,
68 }
69 }
70
71 pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
72 match self {
73 VersionedProcessedMessage::V1(msg) => msg,
74 }
75 }
76
77 pub fn process(
78 party: Arc<dkg::Party<PkG, EncG>>,
79 message: VersionedDkgMessage,
80 ) -> FastCryptoResult<VersionedProcessedMessage> {
81 let processed = party.process_message_v1(message.unwrap_v1(), &mut rand::thread_rng())?;
84 Ok(VersionedProcessedMessage::V1(processed))
85 }
86
87 pub fn merge(
88 party: Arc<dkg::Party<PkG, EncG>>,
89 messages: Vec<Self>,
90 ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
91 let (conf, msgs) = party.merge_v1(
94 &messages
95 .into_iter()
96 .map(|vm| vm.unwrap_v1())
97 .collect::<Vec<_>>(),
98 )?;
99 Ok((
100 VersionedDkgConfirmation::V1(conf),
101 VersionedUsedProcessedMessages::V1(msgs),
102 ))
103 }
104}
105
/// Version-tagged wrapper around the set of processed messages that a merge
/// step selected for use.
///
/// Only a `V1` variant exists; it wraps `dkg_v1::UsedProcessedMessages`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedUsedProcessedMessages {
    /// Used messages produced by `merge_v1`.
    V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
}
110
111impl VersionedUsedProcessedMessages {
112 fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
113 &self,
114 party: Arc<dkg::Party<PkG, EncG>>,
115 confirmations: Iter,
116 ) -> FastCryptoResult<Output<PkG, EncG>> {
117 let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
120 let VersionedUsedProcessedMessages::V1(msg) = self;
121 party.complete_v1(
122 msg,
123 &confirmations
124 .map(|vm| vm.unwrap_v1())
125 .cloned()
126 .collect::<Vec<_>>(),
127 rng,
128 )
129 }
130}
131
/// Per-epoch state for running the DKG and reserving randomness rounds.
pub struct RandomnessManager {
    /// Weak handle to the epoch store; upgraded on demand via `epoch_store()`.
    epoch_store: Weak<AuthorityPerEpochStore>,
    /// Epoch taken from the committee at construction time.
    epoch: EpochId,
    /// Used to submit DKG messages and confirmations to consensus.
    consensus_adapter: Box<dyn SubmitToConsensus>,
    /// Handle to the randomness network component (`update_epoch`,
    /// `send_partial_signatures`).
    network_handle: randomness::Handle,
    /// Maps each authority to its peer id and DKG party id.
    authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,

    /// Set when `start_dkg` proceeds past its early-return check; used to
    /// report elapsed-time metrics.
    dkg_start_time: OnceCell<Instant>,
    /// This node's DKG party.
    party: Arc<dkg::Party<PkG, EncG>>,
    /// Blocking tasks processing received DKG messages, keyed by sender.
    enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
    /// Successfully processed DKG messages, keyed by sender.
    processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    /// Set once the processed messages have been merged successfully.
    used_messages: OnceCell<VersionedUsedProcessedMessages>,
    /// Received DKG confirmations, keyed by sender.
    confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    /// Unset while the DKG is pending, `Some(output)` on success, `None` once
    /// the DKG has timed out (see `dkg_status`).
    dkg_output: OnceCell<Option<dkg::Output<PkG, EncG>>>,

    /// The round that the next call to `reserve_next_randomness` hands out.
    next_randomness_round: RandomnessRound,
    /// Highest completed round; shared with every `RandomnessReporter`
    /// created through `reporter()`.
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
}
175
176impl RandomnessManager {
    /// Builds a `RandomnessManager` for the epoch of the given epoch store.
    ///
    /// Returns `None` (after logging an error) when the epoch store or its
    /// tables are no longer available, or when the DKG `Nodes` or `Party`
    /// cannot be initialized.
    ///
    /// State persisted in the epoch tables is loaded: if a DKG output is
    /// stored it is installed and passed to the network handle via
    /// `update_epoch`; otherwise stored processed messages, used messages and
    /// confirmations are restored. Finally, partial signatures are requested
    /// again for every round after the highest completed one and before
    /// `next_randomness_round`.
    ///
    /// # Panics
    /// Panics if a table read fails, if a committee member has no peer id, if
    /// a stake/threshold/bound does not fit in `u16`, if key conversion fails,
    /// or if no DKG output is stored and the protocol's DKG version is 0.
    pub async fn try_new(
        epoch_store_weak: Weak<AuthorityPerEpochStore>,
        consensus_adapter: Box<dyn SubmitToConsensus>,
        network_handle: randomness::Handle,
        authority_key_pair: &AuthorityKeyPair,
    ) -> Option<Self> {
        let epoch_store = match epoch_store_weak.upgrade() {
            Some(epoch_store) => epoch_store,
            None => {
                error!(
                    "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
                );
                return None;
            }
        };
        let tables = match epoch_store.tables() {
            Ok(tables) => tables,
            Err(_) => {
                error!(
                    "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
                );
                return None;
            }
        };
        let protocol_config = epoch_store.protocol_config();

        let name: AuthorityName = authority_key_pair.public().into();
        let committee = epoch_store.committee();
        let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
        if tracing::enabled!(tracing::Level::DEBUG) {
            // Only entries whose party id is below 3 are logged.
            for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
                let pk_bytes = pk.as_element().to_byte_array();
                debug!(
                    "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
                );
            }
        }
        // Authority name -> party id, later joined with peer ids.
        let authority_ids: HashMap<_, _> =
            info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
        let authority_peer_ids = epoch_store
            .epoch_start_config()
            .epoch_start_state()
            .get_authority_names_to_peer_ids();
        let authority_info = authority_ids
            .into_iter()
            .map(|(name, id)| {
                let peer_id = *authority_peer_ids
                    .get(&name)
                    .expect("authority name should be in peer_ids");
                (name, (peer_id, id))
            })
            .collect();
        // One DKG node per committee member, weighted by its stake.
        let nodes = info
            .iter()
            .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
                id: *id,
                pk: pk.clone(),
                weight: (*stake).try_into().expect("stake should fit in u16"),
            })
            .collect();
        // `new_reduced` returns the node set together with the threshold `t`
        // that is used for the party below.
        let (nodes, t) = match nodes::Nodes::new_reduced(
            nodes,
            committee
                .validity_threshold()
                .try_into()
                .expect("validity threshold should fit in u16"),
            protocol_config.random_beacon_reduction_allowed_delta(),
            protocol_config
                .random_beacon_reduction_lower_bound()
                .try_into()
                .expect("should fit u16"),
        ) {
            Ok((nodes, t)) => (nodes, t),
            Err(err) => {
                error!("random beacon: error while initializing Nodes: {err:?}");
                return None;
            }
        };
        let total_weight = nodes.total_weight();
        let num_nodes = nodes.num_nodes();
        // Random-oracle prefix built from the chain identifier and the epoch.
        let prefix_str = format!(
            "dkg {} {}",
            Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
            committee.epoch()
        );
        // The authority's private key bytes are reinterpreted as a BLS12-381
        // scalar and used as the ECIES private key of the party.
        let randomness_private_key = bls12381::Scalar::from_byte_array(
            authority_key_pair
                .copy()
                .private()
                .as_bytes()
                .try_into()
                .expect("key length should match"),
        )
        .expect("should work to convert BLS key to Scalar");
        let party = match dkg::Party::<PkG, EncG>::new(
            fastcrypto_tbls::ecies::PrivateKey::<bls12381::G2Element>::from(randomness_private_key),
            nodes,
            t,
            fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
            &mut rand::thread_rng(),
        ) {
            Ok(party) => party,
            Err(err) => {
                error!("random beacon: error while initializing Party: {err:?}");
                return None;
            }
        };
        info!(
            "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
        );

        // Load the highest completed round persisted for this epoch, if any.
        let highest_completed_round = tables
            .randomness_highest_completed_round
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail");
        let mut rm = RandomnessManager {
            epoch_store: epoch_store_weak,
            epoch: committee.epoch(),
            consensus_adapter,
            network_handle: network_handle.clone(),
            authority_info,
            dkg_start_time: OnceCell::new(),
            party: Arc::new(party),
            enqueued_messages: BTreeMap::new(),
            processed_messages: BTreeMap::new(),
            used_messages: OnceCell::new(),
            confirmations: BTreeMap::new(),
            dkg_output: OnceCell::new(),
            next_randomness_round: RandomnessRound(0),
            highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
        };
        let dkg_output = tables
            .dkg_output
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail");
        if let Some(dkg_output) = dkg_output {
            // A DKG output is already stored: install it and inform the
            // network component.
            info!(
                "random beacon: loaded existing DKG output for epoch {}",
                committee.epoch()
            );
            epoch_store
                .metrics
                .epoch_random_beacon_dkg_num_shares
                .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
            rm.dkg_output
                .set(Some(dkg_output.clone()))
                .expect("setting new OnceCell should succeed");
            network_handle.update_epoch(
                committee.epoch(),
                rm.authority_info.clone(),
                dkg_output,
                rm.party.t(),
                highest_completed_round,
            );
        } else {
            // No stored output: restore whatever intermediate DKG state was
            // persisted so the protocol can continue from there.
            info!(
                "random beacon: no existing DKG output found for epoch {}",
                committee.epoch()
            );

            assert!(
                epoch_store.protocol_config().dkg_version() > 0,
                "BUG: DKG version 0 is deprecated"
            );
            rm.processed_messages.extend(
                tables
                    .dkg_processed_messages
                    .safe_iter()
                    .map(|result| result.expect("typed_store should not fail")),
            );
            if let Some(used_messages) = tables
                .dkg_used_messages
                .get(&SINGLETON_KEY)
                .expect("typed_store should not fail")
            {
                rm.used_messages
                    .set(used_messages.clone())
                    .expect("setting new OnceCell should succeed");
            }
            rm.confirmations.extend(
                tables
                    .dkg_confirmations
                    .safe_iter()
                    .map(|result| result.expect("typed_store should not fail")),
            );
        }

        // Resume round numbering from the persisted value (0 if none).
        rm.next_randomness_round = tables
            .randomness_next_round
            .get(&SINGLETON_KEY)
            .expect("typed_store should not fail")
            .unwrap_or(RandomnessRound(0));
        info!(
            "random beacon: starting from next_randomness_round={}",
            rm.next_randomness_round.0
        );
        // Rounds in [first_incomplete_round, next_randomness_round) were
        // reserved but are not recorded as completed; request partial
        // signatures for each of them again.
        let first_incomplete_round = highest_completed_round
            .map(|r| r + 1)
            .unwrap_or(RandomnessRound(0));
        if first_incomplete_round < rm.next_randomness_round {
            info!(
                "random beacon: resuming generation for randomness rounds from {} to {}",
                first_incomplete_round,
                rm.next_randomness_round - 1,
            );
            for r in first_incomplete_round.0..rm.next_randomness_round.0 {
                network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
            }
        }

        Some(rm)
    }
397
398 pub async fn start_dkg(&mut self) -> IotaResult {
400 if self.used_messages.initialized() || self.dkg_output.initialized() {
401 return Ok(());
403 }
404
405 let _ = self.dkg_start_time.set(Instant::now());
406
407 let epoch_store = self.epoch_store()?;
408 let dkg_version = epoch_store.protocol_config().dkg_version();
409 info!("random beacon: starting DKG, version {dkg_version}");
410
411 let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
412 Ok(msg) => msg,
413 Err(FastCryptoError::IgnoredMessage) => {
414 info!(
415 "random beacon: no DKG Message for party id={} (zero weight)",
416 self.party.id
417 );
418 return Ok(());
419 }
420 Err(e) => {
421 error!("random beacon: error while creating a DKG Message: {e:?}");
422 return Ok(());
423 }
424 };
425
426 info!("random beacon: created {msg:?} with dkg version {dkg_version}");
427 let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
428
429 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
430 let mut fail_point_skip_sending = false;
431 fail_point_if!("rb-dkg", || {
432 fail_point_skip_sending = true;
434 });
435 if !fail_point_skip_sending {
436 self.consensus_adapter
437 .submit_to_consensus(&[transaction], &epoch_store)
438 .await?;
439 }
440
441 epoch_store
442 .metrics
443 .epoch_random_beacon_dkg_message_time_ms
444 .set(
445 self.dkg_start_time
446 .get()
447 .unwrap() .elapsed()
449 .as_millis() as i64,
450 );
451 Ok(())
452 }
453
    /// Advances the DKG as far as the currently available inputs allow.
    ///
    /// Steps, each skipped once its result is already set:
    /// 1. While neither used messages nor an output exist: awaits all
    ///    enqueued message-processing tasks, records the successful ones, and
    ///    tries to merge the processed messages. On success the used messages
    ///    are stored and a DKG confirmation is submitted to consensus.
    /// 2. Once used messages exist but no output does: tries to complete the
    ///    DKG from the collected confirmations. On success the output is
    ///    stored, metrics are updated and the network handle is informed.
    /// 3. If there is still no output and `round` is greater than the
    ///    protocol's `random_beacon_dkg_timeout_round`, the DKG is marked as
    ///    failed by storing `None` as the output.
    ///
    /// State changes that need persisting are recorded in `consensus_output`.
    ///
    /// # Errors
    /// Fails if the epoch store is gone or if consensus submission fails.
    pub(crate) async fn advance_dkg(
        &mut self,
        consensus_output: &mut ConsensusCommitOutput,
        round: CommitRound,
    ) -> IotaResult {
        let epoch_store = self.epoch_store()?;

        if !self.dkg_output.initialized() && !self.used_messages.initialized() {
            // Drain every pending processing task; the map is left empty.
            let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
                .into_values()
                .collect();
            while let Some(res) = handles.next().await {
                // Join errors and messages that failed processing are dropped.
                if let Ok(Some(processed)) = res {
                    self.processed_messages
                        .insert(processed.sender(), processed.clone());
                    consensus_output.insert_dkg_processed_message(processed);
                }
            }

            match VersionedProcessedMessage::merge(
                self.party.clone(),
                self.processed_messages
                    .values()
                    .cloned()
                    .collect::<Vec<_>>(),
            ) {
                Ok((conf, used_msgs)) => {
                    info!(
                        "random beacon: sending DKG Confirmation with {} complaints",
                        conf.num_of_complaints()
                    );
                    if self.used_messages.set(used_msgs.clone()).is_err() {
                        error!("BUG: used_messages should only ever be set once");
                    }
                    consensus_output.insert_dkg_used_messages(used_msgs);

                    let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
                        epoch_store.name,
                        &conf,
                    );

                    // The "rb-dkg" fail point can suppress the submission.
                    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
                    let mut fail_point_skip_sending = false;
                    fail_point_if!("rb-dkg", || {
                        fail_point_skip_sending = true;
                    });
                    if !fail_point_skip_sending {
                        self.consensus_adapter
                            .submit_to_consensus(&[transaction], &epoch_store)
                            .await?;
                    }

                    // The start time is absent if `start_dkg` never recorded
                    // one on this instance; the metric is skipped then.
                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
                    if let Some(elapsed) = elapsed {
                        epoch_store
                            .metrics
                            .epoch_random_beacon_dkg_confirmation_time_ms
                            .set(elapsed as i64);
                    }
                }
                // Not enough messages yet: nothing to do on this call.
                Err(FastCryptoError::NotEnoughInputs) => (),
                Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
            }
        }

        if !self.dkg_output.initialized() && self.used_messages.initialized() {
            match self
                .used_messages
                .get()
                .expect("checked above that `used_messages` is initialized")
                .complete_dkg(self.party.clone(), self.confirmations.values())
            {
                Ok(output) => {
                    let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
                    let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
                    info!(
                        "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
                    );
                    epoch_store
                        .metrics
                        .epoch_random_beacon_dkg_num_shares
                        .set(num_shares as i64);
                    epoch_store
                        .metrics
                        .epoch_random_beacon_dkg_epoch_start_completion_time_ms
                        .set(epoch_elapsed as i64);
                    epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
                    if let Some(elapsed) = elapsed {
                        epoch_store
                            .metrics
                            .epoch_random_beacon_dkg_completion_time_ms
                            .set(elapsed as i64);
                    }
                    self.dkg_output
                        .set(Some(output.clone()))
                        .expect("checked above that `dkg_output` is uninitialized");
                    self.network_handle.update_epoch(
                        epoch_store.committee().epoch(),
                        self.authority_info.clone(),
                        output.clone(),
                        self.party.t(),
                        None,
                    );
                    consensus_output.set_dkg_output(output);
                }
                // Not enough confirmations yet: nothing to do on this call.
                Err(FastCryptoError::NotEnoughInputs) => (),
                Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
            }
        }

        // Give up once the commit round passes the configured timeout round.
        if !self.dkg_output.initialized()
            && round
                > epoch_store
                    .protocol_config()
                    .random_beacon_dkg_timeout_round()
                    .into()
        {
            error!(
                "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
            );
            epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
            self.dkg_output
                .set(None)
                .expect("checked above that `dkg_output` is uninitialized");
        }

        Ok(())
    }
592
593 pub fn add_message(
595 &mut self,
596 authority: &AuthorityName,
597 msg: VersionedDkgMessage,
598 ) -> IotaResult {
599 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
603 if !msg.is_valid_version(dkg_version) {
604 warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
605 return Ok(());
606 }
607
608 if self.used_messages.initialized() || self.dkg_output.initialized() {
609 return Ok(());
611 }
612 let Some((_, party_id)) = self.authority_info.get(authority) else {
613 error!("random beacon: received DKG Message from unknown authority: {authority:?}");
614 return Ok(());
615 };
616 if *party_id != msg.sender() {
617 warn!(
618 "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
619 );
620 return Ok(());
621 }
622 if self.enqueued_messages.contains_key(&msg.sender())
623 || self.processed_messages.contains_key(&msg.sender())
624 {
625 info!("ignoring duplicate DKG Message from authority {authority:?}");
626 return Ok(());
627 }
628
629 let party = self.party.clone();
630 self.enqueued_messages.insert(
633 msg.sender(),
634 tokio::task::spawn_blocking(move || {
635 match VersionedProcessedMessage::process(party, msg) {
636 Ok(processed) => Some(processed),
637 Err(err) => {
638 debug!("random beacon: error while processing DKG Message: {err:?}");
639 None
640 }
641 }
642 }),
643 );
644 Ok(())
645 }
646
647 pub(crate) fn add_confirmation(
649 &mut self,
650 output: &mut ConsensusCommitOutput,
651 authority: &AuthorityName,
652 conf: VersionedDkgConfirmation,
653 ) -> IotaResult {
654 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
658 if !conf.is_valid_version(dkg_version) {
659 warn!(
660 "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
661 );
662 return Ok(());
663 }
664
665 if self.dkg_output.initialized() {
666 return Ok(());
668 }
669 let Some((_, party_id)) = self.authority_info.get(authority) else {
670 error!(
671 "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
672 );
673 return Ok(());
674 };
675 if *party_id != conf.sender() {
676 warn!(
677 "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
678 );
679 return Ok(());
680 }
681 self.confirmations.insert(conf.sender(), conf.clone());
682 output.insert_dkg_confirmation(conf);
683 Ok(())
684 }
685
686 pub(crate) fn reserve_next_randomness(
693 &mut self,
694 commit_timestamp: CommitTimestampMs,
695 output: &mut ConsensusCommitOutput,
696 ) -> IotaResult<Option<RandomnessRound>> {
697 let epoch_store = self.epoch_store()?;
698 let tables = epoch_store.tables()?;
699
700 let last_round_timestamp = tables
701 .randomness_last_round_timestamp
702 .get(&SINGLETON_KEY)
703 .expect("typed_store should not fail");
704 if let Some(last_round_timestamp) = last_round_timestamp {
705 if commit_timestamp - last_round_timestamp
706 < epoch_store
707 .protocol_config()
708 .random_beacon_min_round_interval_ms()
709 {
710 return Ok(None);
711 }
712 }
713
714 let randomness_round = self.next_randomness_round;
715 self.next_randomness_round = self
716 .next_randomness_round
717 .checked_add(1)
718 .expect("RandomnessRound should not overflow");
719
720 output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
721
722 Ok(Some(randomness_round))
723 }
724
    /// Asks the network handle to send partial signatures for
    /// `randomness_round` of `epoch`.
    pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
        self.network_handle
            .send_partial_signatures(epoch, randomness_round);
    }
730
731 pub fn dkg_status(&self) -> DkgStatus {
732 match self.dkg_output.get() {
733 Some(Some(_)) => DkgStatus::Successful,
734 Some(None) => DkgStatus::Failed,
735 None => DkgStatus::Pending,
736 }
737 }
738
739 pub fn reporter(&self) -> RandomnessReporter {
742 RandomnessReporter {
743 epoch_store: self.epoch_store.clone(),
744 epoch: self.epoch,
745 network_handle: self.network_handle.clone(),
746 highest_completed_round: self.highest_completed_round.clone(),
747 }
748 }
749
750 fn epoch_store(&self) -> IotaResult<Arc<AuthorityPerEpochStore>> {
751 self.epoch_store
752 .upgrade()
753 .ok_or(IotaError::EpochEnded(self.epoch))
754 }
755
756 fn randomness_dkg_info_from_committee(
757 committee: &Committee,
758 ) -> Vec<(
759 u16,
760 AuthorityName,
761 fastcrypto_tbls::ecies::PublicKey<bls12381::G2Element>,
762 StakeUnit,
763 )> {
764 committee
765 .members()
766 .map(|(name, stake)| {
767 let index: u16 = committee
768 .authority_index(name)
769 .expect("lookup of known committee member should succeed")
770 .try_into()
771 .expect("authority index should fit in u16");
772 let pk = bls12381::G2Element::from_byte_array(
773 committee
774 .public_key(name)
775 .expect("lookup of known committee member should succeed")
776 .as_bytes()
777 .try_into()
778 .expect("key length should match"),
779 )
780 .expect("should work to convert BLS key to G2Element");
781 (
782 index,
783 *name,
784 fastcrypto_tbls::ecies::PublicKey::from(pk),
785 *stake,
786 )
787 })
788 .collect()
789 }
790}
791
/// Cloneable handle for reporting completed randomness rounds; created by
/// `RandomnessManager::reporter`.
#[derive(Clone)]
pub struct RandomnessReporter {
    /// Weak handle to the epoch store whose tables are updated.
    epoch_store: Weak<AuthorityPerEpochStore>,
    /// Epoch reported in `IotaError::EpochEnded` when the store is gone.
    epoch: EpochId,
    /// Handle to the randomness network component (`complete_round`).
    network_handle: randomness::Handle,
    /// Highest completed round, shared with the originating manager.
    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
}
801
802impl RandomnessReporter {
803 pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> IotaResult {
807 let epoch_store = self
808 .epoch_store
809 .upgrade()
810 .ok_or(IotaError::EpochEnded(self.epoch))?;
811 let mut highest_completed_round = self.highest_completed_round.lock();
812 if Some(round) > *highest_completed_round {
813 *highest_completed_round = Some(round);
814 epoch_store
815 .tables()?
816 .randomness_highest_completed_round
817 .insert(&SINGLETON_KEY, &round)?;
818 self.network_handle
819 .complete_round(epoch_store.committee().epoch(), round);
820 }
821 Ok(())
822 }
823}
824
/// Outcome of the DKG as reported by `RandomnessManager::dkg_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DkgStatus {
    /// No DKG output has been set yet.
    Pending,
    /// The DKG output was set to `None` (the DKG timed out).
    Failed,
    /// A DKG output is available.
    Successful,
}
831
832#[cfg(test)]
833mod tests {
834 use std::num::NonZeroUsize;
835
836 use consensus_core::{BlockRef, BlockStatus};
837 use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
838 use iota_types::messages_consensus::ConsensusTransactionKind;
839 use tokio::sync::mpsc;
840
841 use crate::{
842 authority::test_authority_builder::TestAuthorityBuilder,
843 consensus_adapter::{
844 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
845 MockConsensusClient,
846 },
847 epoch::randomness::*,
848 mock_consensus::with_block_status,
849 };
850
    /// Runs the full DKG scenario with DKG version 1.
    #[tokio::test]
    async fn test_dkg_v1() {
        test_dkg(1).await;
    }
855
    /// Runs a complete DKG among four validators with the given DKG version
    /// and checks that every manager ends in `DkgStatus::Successful`.
    ///
    /// Consensus is mocked: every submitted transaction batch is forwarded
    /// into an mpsc channel, from which the test then delivers the messages
    /// and confirmations to all managers by hand.
    async fn test_dkg(version: u64) {
        telemetry_subscribers::init_for_testing();

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .committee_size(NonZeroUsize::new(4).unwrap())
                .with_reference_gas_price(500)
                .build();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_random_beacon_dkg_version_for_testing(version);

        let mut epoch_stores = Vec::new();
        let mut randomness_managers = Vec::new();
        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);

        // One authority state, consensus adapter and manager per validator.
        for validator in network_config.validator_configs.iter() {
            let mut mock_consensus_client = MockConsensusClient::new();
            let tx_consensus = tx_consensus.clone();
            // The matcher doubles as a tap: it forwards each submitted batch
            // into the channel and accepts every call.
            mock_consensus_client
                .expect_submit()
                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
                    tx_consensus.try_send(transactions.to_vec()).unwrap();
                    true
                })
                .returning(|_, _| Ok(with_block_status(BlockStatus::Sequenced(BlockRef::MIN))));

            let state = TestAuthorityBuilder::new()
                .with_protocol_config(protocol_config.clone())
                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
                .build()
                .await;
            let consensus_adapter = Arc::new(ConsensusAdapter::new(
                Arc::new(mock_consensus_client),
                state.name,
                Arc::new(ConnectionMonitorStatusForTests {}),
                100_000,
                100_000,
                None,
                None,
                ConsensusAdapterMetrics::new_test(),
            ));
            let epoch_store = state.epoch_store_for_testing();
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                iota_network::randomness::Handle::new_stub(),
                validator.authority_key_pair(),
            )
            .await
            .unwrap();

            epoch_stores.push(epoch_store);
            randomness_managers.push(randomness_manager);
        }

        // Start the DKG on every manager and collect the message each sends.
        let mut dkg_messages = Vec::new();
        for randomness_manager in randomness_managers.iter_mut() {
            randomness_manager.start_dkg().await.unwrap();

            let mut dkg_message = rx_consensus.recv().await.unwrap();
            assert!(dkg_message.len() == 1);
            match dkg_message.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
                        .expect("DKG message deserialization should not fail");
                    dkg_messages.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        // Deliver all messages to every manager, advance at round 0, and
        // persist the resulting output.
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new();
            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_message(&epoch_stores[j].name, dkg_message)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, 0)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Each manager should have submitted exactly one confirmation.
        let mut dkg_confirmations = Vec::new();
        for _ in 0..randomness_managers.len() {
            let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
            assert!(dkg_confirmation.len() == 1);
            match dkg_confirmation.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
                    let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
                        .expect("DKG message deserialization should not fail");
                    dkg_confirmations.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        // Deliver all confirmations to every manager and advance again.
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new();
            for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, 0)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Every manager must now report a successful DKG.
        for randomness_manager in &randomness_managers {
            assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
        }
    }
981
    /// Runs the DKG timeout scenario with DKG version 1.
    #[tokio::test]
    async fn test_dkg_expiration_v1() {
        test_dkg_expiration(1).await;
    }
986
    /// Checks that the DKG is marked as failed when it is advanced at a
    /// commit round past the timeout before it has completed.
    ///
    /// Four validators exchange DKG messages through a mocked consensus, then
    /// each manager is advanced once with round `u64::MAX`; every manager
    /// must end in `DkgStatus::Failed`.
    async fn test_dkg_expiration(version: u64) {
        telemetry_subscribers::init_for_testing();

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .committee_size(NonZeroUsize::new(4).unwrap())
                .with_reference_gas_price(500)
                .build();

        let mut epoch_stores = Vec::new();
        let mut randomness_managers = Vec::new();
        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_random_beacon_dkg_version_for_testing(version);

        // One authority state, consensus adapter and manager per validator.
        for validator in network_config.validator_configs.iter() {
            let mut mock_consensus_client = MockConsensusClient::new();
            let tx_consensus = tx_consensus.clone();
            // The matcher doubles as a tap: it forwards each submitted batch
            // into the channel and accepts every call.
            mock_consensus_client
                .expect_submit()
                .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
                    tx_consensus.try_send(transactions.to_vec()).unwrap();
                    true
                })
                .returning(|_, _| {
                    Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
                        BlockRef::MIN,
                    )))
                });

            let state = TestAuthorityBuilder::new()
                .with_protocol_config(protocol_config.clone())
                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
                .build()
                .await;
            let consensus_adapter = Arc::new(ConsensusAdapter::new(
                Arc::new(mock_consensus_client),
                state.name,
                Arc::new(ConnectionMonitorStatusForTests {}),
                100_000,
                100_000,
                None,
                None,
                ConsensusAdapterMetrics::new_test(),
            ));
            let epoch_store = state.epoch_store_for_testing();
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                iota_network::randomness::Handle::new_stub(),
                validator.authority_key_pair(),
            )
            .await
            .unwrap();

            epoch_stores.push(epoch_store);
            randomness_managers.push(randomness_manager);
        }

        // Start the DKG on every manager and collect the message each sends.
        let mut dkg_messages = Vec::new();
        for randomness_manager in randomness_managers.iter_mut() {
            randomness_manager.start_dkg().await.unwrap();

            let mut dkg_message = rx_consensus.recv().await.unwrap();
            assert!(dkg_message.len() == 1);
            match dkg_message.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
                        .expect("DKG message deserialization should not fail");
                    dkg_messages.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        // Deliver all messages, then advance at the maximum round so the
        // timeout check fires while no DKG output exists yet.
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new();
            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_message(&epoch_stores[j].name, dkg_message)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, u64::MAX)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Every manager must now report a failed DKG.
        for randomness_manager in &randomness_managers {
            assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
        }
    }
1086}