1use std::{
6 collections::{BTreeMap, HashMap},
7 sync::{Arc, Weak},
8 time::Instant,
9};
10
11use anemo::PeerId;
12use fastcrypto::{
13 encoding::{Encoding, Hex},
14 error::{FastCryptoError, FastCryptoResult},
15 groups::bls12381,
16 serde_helpers::ToFromByteArray,
17 traits::{KeyPair, ToFromBytes},
18};
19use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
20use futures::{StreamExt, stream::FuturesUnordered};
21use iota_macros::fail_point_if;
22use iota_network::randomness;
23use iota_sdk_types::RandomnessRound;
24use iota_types::{
25 base_types::{AuthorityName, CommitRound},
26 committee::{Committee, EpochId, StakeUnit},
27 crypto::AuthorityKeyPair,
28 error::{IotaError, IotaResult},
29 iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
30 messages_consensus::{ConsensusTransaction, VersionedDkgConfirmation, VersionedDkgMessage},
31};
32use parking_lot::Mutex;
33use rand::{
34 SeedableRng,
35 rngs::{OsRng, StdRng},
36};
37use serde::{Deserialize, Serialize};
38use tokio::{sync::OnceCell, task::JoinHandle};
39use tracing::{debug, error, info, warn};
40use typed_store::Map;
41
42use crate::{
43 authority::{
44 authority_per_epoch_store::{
45 AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
46 },
47 epoch_start_configuration::EpochStartConfigTrait,
48 },
49 consensus_adapter::SubmitToConsensus,
50};
51
52pub type CommitTimestampMs = u64;
54
55type PkG = bls12381::G2Element;
56type EncG = bls12381::G2Element;
57
58pub const SINGLETON_KEY: u64 = 0;
59
60#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
63pub enum VersionedProcessedMessage {
64 V1(dkg_v1::ProcessedMessage<PkG, EncG>),
65}
66
67impl VersionedProcessedMessage {
68 pub fn sender(&self) -> PartyId {
69 match self {
70 VersionedProcessedMessage::V1(msg) => msg.message.sender,
71 }
72 }
73
74 pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
75 match self {
76 VersionedProcessedMessage::V1(msg) => msg,
77 }
78 }
79
80 pub fn process(
81 party: Arc<dkg_v1::Party<PkG, EncG>>,
82 message: VersionedDkgMessage,
83 ) -> FastCryptoResult<VersionedProcessedMessage> {
84 let processed = party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
87 Ok(VersionedProcessedMessage::V1(processed))
88 }
89
90 pub fn merge(
91 party: Arc<dkg_v1::Party<PkG, EncG>>,
92 messages: Vec<Self>,
93 ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
94 let (conf, msgs) = party.merge(
97 &messages
98 .into_iter()
99 .map(|vm| vm.unwrap_v1())
100 .collect::<Vec<_>>(),
101 )?;
102 Ok((
103 VersionedDkgConfirmation::V1(conf),
104 VersionedUsedProcessedMessages::V1(msgs),
105 ))
106 }
107}
108
109#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
110pub enum VersionedUsedProcessedMessages {
111 V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
112}
113
114impl VersionedUsedProcessedMessages {
115 fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
116 &self,
117 party: Arc<dkg_v1::Party<PkG, EncG>>,
118 confirmations: Iter,
119 ) -> FastCryptoResult<Output<PkG, EncG>> {
120 let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
123 let VersionedUsedProcessedMessages::V1(msg) = self;
124 party.complete(
125 msg,
126 &confirmations
127 .map(|vm| vm.unwrap_v1())
128 .cloned()
129 .collect::<Vec<_>>(),
130 rng,
131 )
132 }
133}
134
135pub struct RandomnessManager {
159 epoch_store: Weak<AuthorityPerEpochStore>,
160 epoch: EpochId,
161 consensus_adapter: Box<dyn SubmitToConsensus>,
162 network_handle: randomness::Handle,
163 authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
164
165 dkg_start_time: OnceCell<Instant>,
167 party: Arc<dkg_v1::Party<PkG, EncG>>,
168 enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
169 processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
170 used_messages: OnceCell<VersionedUsedProcessedMessages>,
171 confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
172 dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,
173
174 next_randomness_round: RandomnessRound,
176 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
177}
178
179impl RandomnessManager {
180 pub async fn try_new(
182 epoch_store_weak: Weak<AuthorityPerEpochStore>,
183 consensus_adapter: Box<dyn SubmitToConsensus>,
184 network_handle: randomness::Handle,
185 authority_key_pair: &AuthorityKeyPair,
186 ) -> Option<Self> {
187 let epoch_store = match epoch_store_weak.upgrade() {
188 Some(epoch_store) => epoch_store,
189 None => {
190 error!(
191 "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
192 );
193 return None;
194 }
195 };
196 let tables = match epoch_store.tables() {
197 Ok(tables) => tables,
198 Err(_) => {
199 error!(
200 "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
201 );
202 return None;
203 }
204 };
205 let protocol_config = epoch_store.protocol_config();
206
207 let name: AuthorityName = authority_key_pair.public().into();
208 let committee = epoch_store.committee();
209 let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
210 if tracing::enabled!(tracing::Level::DEBUG) {
211 for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
213 let pk_bytes = pk.as_element().to_byte_array();
214 debug!(
215 "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
216 );
217 }
218 }
219 let authority_ids: HashMap<_, _> =
220 info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
221 let authority_peer_ids = epoch_store
222 .epoch_start_config()
223 .epoch_start_state()
224 .get_authority_names_to_peer_ids();
225 let authority_info = authority_ids
226 .into_iter()
227 .map(|(name, id)| {
228 let peer_id = *authority_peer_ids
229 .get(&name)
230 .expect("authority name should be in peer_ids");
231 (name, (peer_id, id))
232 })
233 .collect();
234 let nodes = info
235 .iter()
236 .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
237 id: *id,
238 pk: pk.clone(),
239 weight: (*stake).try_into().expect("stake should fit in u16"),
240 })
241 .collect();
242 let (nodes, t) = match nodes::Nodes::new_reduced(
243 nodes,
244 committee
245 .validity_threshold()
246 .try_into()
247 .expect("validity threshold should fit in u16"),
248 protocol_config.random_beacon_reduction_allowed_delta(),
249 protocol_config
250 .random_beacon_reduction_lower_bound()
251 .try_into()
252 .expect("should fit u16"),
253 ) {
254 Ok((nodes, t)) => (nodes, t),
255 Err(err) => {
256 error!("random beacon: error while initializing Nodes: {err:?}");
257 return None;
258 }
259 };
260 let total_weight = nodes.total_weight();
261 let num_nodes = nodes.num_nodes();
262 let prefix_str = format!(
263 "dkg {} {}",
264 Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
265 committee.epoch()
266 );
267 let randomness_private_key = bls12381::Scalar::from_byte_array(
268 authority_key_pair
269 .copy()
270 .private()
271 .as_bytes()
272 .try_into()
273 .expect("key length should match"),
274 )
275 .expect("should work to convert BLS key to Scalar");
276 let party = match dkg_v1::Party::<PkG, EncG>::new(
277 fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
278 randomness_private_key,
279 ),
280 nodes,
281 t,
282 fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
283 &mut rand::thread_rng(),
284 ) {
285 Ok(party) => party,
286 Err(err) => {
287 error!("random beacon: error while initializing Party: {err:?}");
288 return None;
289 }
290 };
291 info!(
292 "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
293 );
294
295 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
299 epoch_store
300 .metrics
301 .epoch_random_beacon_dkg_num_shares
302 .set(0);
303
304 let highest_completed_round = tables
306 .randomness_highest_completed_round
307 .get(&SINGLETON_KEY)
308 .expect("typed_store should not fail");
309 let mut rm = RandomnessManager {
310 epoch_store: epoch_store_weak,
311 epoch: committee.epoch(),
312 consensus_adapter,
313 network_handle: network_handle.clone(),
314 authority_info,
315 dkg_start_time: OnceCell::new(),
316 party: Arc::new(party),
317 enqueued_messages: BTreeMap::new(),
318 processed_messages: BTreeMap::new(),
319 used_messages: OnceCell::new(),
320 confirmations: BTreeMap::new(),
321 dkg_output: OnceCell::new(),
322 next_randomness_round: RandomnessRound::new(0),
323 highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
324 };
325 let dkg_output = match tables
329 .dkg_output_v2
330 .get(&SINGLETON_KEY)
331 .expect("typed_store should not fail")
332 {
333 Some(persisted) => Some(persisted),
334 None => tables
335 .dkg_output
336 .get(&SINGLETON_KEY)
337 .expect("typed_store should not fail")
338 .map(Some),
339 };
340 match dkg_output {
341 Some(Some(dkg_output)) => {
342 info!(
343 "random beacon: loaded existing DKG output for epoch {}",
344 committee.epoch()
345 );
346 epoch_store
347 .metrics
348 .epoch_random_beacon_dkg_num_shares
349 .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
350 rm.dkg_output
351 .set(Some(dkg_output.clone()))
352 .expect("setting new OnceCell should succeed");
353 network_handle.update_epoch(
354 committee.epoch(),
355 rm.authority_info.clone(),
356 dkg_output,
357 rm.party.t(),
358 highest_completed_round,
359 );
360 }
361 Some(None) => {
362 error!(
365 "random beacon: loaded failed DKG for epoch {}. Randomness disabled for this epoch. All randomness-using transactions will fail.",
366 committee.epoch()
367 );
368 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
369 rm.dkg_output
370 .set(None)
371 .expect("setting new OnceCell should succeed");
372 }
373 None => {
374 info!(
375 "random beacon: no existing DKG output found for epoch {}",
376 committee.epoch()
377 );
378
379 assert!(
381 epoch_store.protocol_config().dkg_version() > 0,
382 "BUG: DKG version 0 is deprecated"
383 );
384 rm.processed_messages.extend(
385 tables
386 .dkg_processed_messages
387 .safe_iter()
388 .map(|result| result.expect("typed_store should not fail")),
389 );
390 if let Some(used_messages) = tables
391 .dkg_used_messages
392 .get(&SINGLETON_KEY)
393 .expect("typed_store should not fail")
394 {
395 rm.used_messages
396 .set(used_messages)
397 .expect("setting new OnceCell should succeed");
398 }
399 rm.confirmations.extend(
400 tables
401 .dkg_confirmations
402 .safe_iter()
403 .map(|result| result.expect("typed_store should not fail")),
404 );
405 }
406 }
407
408 rm.next_randomness_round = tables
413 .randomness_next_round
414 .get(&SINGLETON_KEY)
415 .expect("typed_store should not fail")
416 .unwrap_or(RandomnessRound::new(0));
417 info!(
418 "random beacon: starting from next_randomness_round={}",
419 rm.next_randomness_round
420 );
421 let first_incomplete_round = highest_completed_round
422 .map(|r| r + 1)
423 .unwrap_or(RandomnessRound::new(0));
424 if first_incomplete_round < rm.next_randomness_round {
425 info!(
426 "random beacon: resuming generation for randomness rounds from {first_incomplete_round} to {}",
427 rm.next_randomness_round - 1,
428 );
429 for r in first_incomplete_round.value()..rm.next_randomness_round.value() {
430 network_handle.send_partial_signatures(committee.epoch(), RandomnessRound::new(r));
431 }
432 }
433
434 Some(rm)
435 }
436
437 pub async fn start_dkg(&mut self) -> IotaResult {
439 if self.used_messages.initialized() || self.dkg_output.initialized() {
440 return Ok(());
442 }
443
444 let _ = self.dkg_start_time.set(Instant::now());
445
446 let epoch_store = self.epoch_store()?;
447 let dkg_version = epoch_store.protocol_config().dkg_version();
448 info!("random beacon: starting DKG, version {dkg_version}");
449
450 let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
451 Ok(msg) => msg,
452 Err(FastCryptoError::IgnoredMessage) => {
453 info!(
454 "random beacon: no DKG Message for party id={} (zero weight)",
455 self.party.id
456 );
457 return Ok(());
458 }
459 Err(e) => {
460 error!("random beacon: error while creating a DKG Message: {e:?}");
461 return Ok(());
462 }
463 };
464
465 info!("random beacon: created {msg:?} with dkg version {dkg_version}");
466 let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
467
468 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
469 let mut fail_point_skip_sending = false;
470 fail_point_if!("rb-dkg", || {
471 fail_point_skip_sending = true;
473 });
474 if !fail_point_skip_sending {
475 self.consensus_adapter
476 .submit_to_consensus(&[transaction], &epoch_store)?;
477 }
478
479 epoch_store
480 .metrics
481 .epoch_random_beacon_dkg_message_time_ms
482 .set(
483 self.dkg_start_time
484 .get()
485 .unwrap() .elapsed()
487 .as_millis() as i64,
488 );
489 Ok(())
490 }
491
492 pub(crate) async fn advance_dkg(
496 &mut self,
497 consensus_output: &mut ConsensusCommitOutput,
498 round: CommitRound,
499 ) -> IotaResult {
500 let epoch_store = self.epoch_store()?;
501
502 if !self.dkg_output.initialized() && !self.used_messages.initialized() {
504 let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
506 .into_values()
507 .collect();
508 while let Some(res) = handles.next().await {
509 if let Ok(Some(processed)) = res {
510 self.processed_messages
511 .insert(processed.sender(), processed.clone());
512 consensus_output.insert_dkg_processed_message(processed);
513 }
514 }
515
516 match VersionedProcessedMessage::merge(
518 self.party.clone(),
519 self.processed_messages
520 .values()
521 .cloned()
522 .collect::<Vec<_>>(),
523 ) {
524 Ok((conf, used_msgs)) => {
525 info!(
526 "random beacon: sending DKG Confirmation with {} complaints",
527 conf.num_of_complaints()
528 );
529 if self.used_messages.set(used_msgs.clone()).is_err() {
530 error!("BUG: used_messages should only ever be set once");
531 }
532 consensus_output.insert_dkg_used_messages(used_msgs);
533
534 let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
535 epoch_store.name,
536 &conf,
537 );
538
539 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
540 let mut fail_point_skip_sending = false;
541 fail_point_if!("rb-dkg", || {
542 fail_point_skip_sending = true;
544 });
545 if !fail_point_skip_sending {
546 self.consensus_adapter
547 .submit_to_consensus(&[transaction], &epoch_store)?;
548 }
549
550 let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
551 if let Some(elapsed) = elapsed {
552 epoch_store
553 .metrics
554 .epoch_random_beacon_dkg_confirmation_time_ms
555 .set(elapsed as i64);
556 }
557 }
558 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
560 }
561 }
562
563 if !self.dkg_output.initialized() && self.used_messages.initialized() {
565 match self
566 .used_messages
567 .get()
568 .expect("checked above that `used_messages` is initialized")
569 .complete_dkg(self.party.clone(), self.confirmations.values())
570 {
571 Ok(output) => {
572 let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
573 let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
574 let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
575 info!(
576 "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
577 );
578 epoch_store
579 .metrics
580 .epoch_random_beacon_dkg_num_shares
581 .set(num_shares as i64);
582 epoch_store
583 .metrics
584 .epoch_random_beacon_dkg_epoch_start_completion_time_ms
585 .set(epoch_elapsed as i64);
586 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
587 if let Some(elapsed) = elapsed {
588 epoch_store
589 .metrics
590 .epoch_random_beacon_dkg_completion_time_ms
591 .set(elapsed as i64);
592 }
593 self.dkg_output
594 .set(Some(output.clone()))
595 .expect("checked above that `dkg_output` is uninitialized");
596 self.network_handle.update_epoch(
597 epoch_store.committee().epoch(),
598 self.authority_info.clone(),
599 output.clone(),
600 self.party.t(),
601 None,
602 );
603 consensus_output.set_dkg_output(Some(output));
604 }
605 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
607 }
608 }
609
610 if !self.dkg_output.initialized()
612 && round
613 > epoch_store
614 .protocol_config()
615 .random_beacon_dkg_timeout_round() as u64
616 {
617 error!(
618 "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
619 );
620 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
621 self.dkg_output
622 .set(None)
623 .expect("checked above that `dkg_output` is uninitialized");
624 consensus_output.set_dkg_output(None);
628 }
629
630 Ok(())
631 }
632
633 pub fn add_message(
635 &mut self,
636 authority: &AuthorityName,
637 msg: VersionedDkgMessage,
638 ) -> IotaResult {
639 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
643 if !msg.is_valid_version(dkg_version) {
644 warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
645 return Ok(());
646 }
647
648 if self.used_messages.initialized() || self.dkg_output.initialized() {
649 return Ok(());
651 }
652 let Some((_, party_id)) = self.authority_info.get(authority) else {
653 error!("random beacon: received DKG Message from unknown authority: {authority:?}");
654 return Ok(());
655 };
656 if *party_id != msg.sender() {
657 warn!(
658 "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id}"
659 );
660 return Ok(());
661 }
662 if self.enqueued_messages.contains_key(&msg.sender())
663 || self.processed_messages.contains_key(&msg.sender())
664 {
665 info!("ignoring duplicate DKG Message from authority {authority:?}");
666 return Ok(());
667 }
668
669 let party = self.party.clone();
670 self.enqueued_messages.insert(
673 msg.sender(),
674 tokio::task::spawn_blocking(move || {
675 match VersionedProcessedMessage::process(party, msg) {
676 Ok(processed) => Some(processed),
677 Err(err) => {
678 debug!("random beacon: error while processing DKG Message: {err:?}");
679 None
680 }
681 }
682 }),
683 );
684 Ok(())
685 }
686
687 pub(crate) fn add_confirmation(
689 &mut self,
690 output: &mut ConsensusCommitOutput,
691 authority: &AuthorityName,
692 conf: VersionedDkgConfirmation,
693 ) -> IotaResult {
694 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
698 if !conf.is_valid_version(dkg_version) {
699 warn!(
700 "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
701 );
702 return Ok(());
703 }
704
705 if self.dkg_output.initialized() {
706 return Ok(());
708 }
709 let Some((_, party_id)) = self.authority_info.get(authority) else {
710 error!(
711 "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
712 );
713 return Ok(());
714 };
715 if *party_id != conf.sender() {
716 warn!(
717 "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id}"
718 );
719 return Ok(());
720 }
721 self.confirmations.insert(conf.sender(), conf.clone());
722 output.insert_dkg_confirmation(conf);
723 Ok(())
724 }
725
726 pub(crate) fn reserve_next_randomness(
733 &mut self,
734 commit_timestamp: CommitTimestampMs,
735 output: &mut ConsensusCommitOutput,
736 ) -> IotaResult<Option<RandomnessRound>> {
737 let epoch_store = self.epoch_store()?;
738
739 let last_round_timestamp = epoch_store
740 .get_randomness_last_round_timestamp()
741 .expect("read should not fail");
742
743 if let Some(last_round_timestamp) = last_round_timestamp {
744 if commit_timestamp - last_round_timestamp
745 < epoch_store
746 .protocol_config()
747 .random_beacon_min_round_interval_ms()
748 {
749 return Ok(None);
750 }
751 }
752
753 let randomness_round = self.next_randomness_round;
754 self.next_randomness_round = self
755 .next_randomness_round
756 .checked_add(1)
757 .ok_or_else(|| IotaError::Unknown("RandomnessRound overflow".to_string()))?;
758
759 output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
760
761 Ok(Some(randomness_round))
762 }
763
764 pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
766 self.network_handle
767 .send_partial_signatures(epoch, randomness_round);
768 }
769
770 pub fn dkg_status(&self) -> DkgStatus {
771 match self.dkg_output.get() {
772 Some(Some(_)) => DkgStatus::Successful,
773 Some(None) => DkgStatus::Failed,
774 None => DkgStatus::Pending,
775 }
776 }
777
778 pub fn reporter(&self) -> RandomnessReporter {
781 RandomnessReporter {
782 epoch_store: self.epoch_store.clone(),
783 epoch: self.epoch,
784 network_handle: self.network_handle.clone(),
785 highest_completed_round: self.highest_completed_round.clone(),
786 }
787 }
788
789 fn epoch_store(&self) -> IotaResult<Arc<AuthorityPerEpochStore>> {
790 self.epoch_store
791 .upgrade()
792 .ok_or(IotaError::EpochEnded(self.epoch))
793 }
794
795 fn randomness_dkg_info_from_committee(
796 committee: &Committee,
797 ) -> Vec<(
798 u16,
799 AuthorityName,
800 fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
801 StakeUnit,
802 )> {
803 committee
804 .members()
805 .map(|(name, stake)| {
806 let index: u16 = committee
807 .authority_index(name)
808 .expect("lookup of known committee member should succeed")
809 .try_into()
810 .expect("authority index should fit in u16");
811 let pk = bls12381::G2Element::from_byte_array(
812 committee
813 .public_key(name)
814 .expect("lookup of known committee member should succeed")
815 .as_bytes()
816 .try_into()
817 .expect("key length should match"),
818 )
819 .expect("should work to convert BLS key to G2Element");
820 (
821 index,
822 *name,
823 fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
824 *stake,
825 )
826 })
827 .collect()
828 }
829}
830
831#[derive(Clone)]
834pub struct RandomnessReporter {
835 epoch_store: Weak<AuthorityPerEpochStore>,
836 epoch: EpochId,
837 network_handle: randomness::Handle,
838 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
839}
840
841impl RandomnessReporter {
842 pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> IotaResult {
846 let epoch_store = self
847 .epoch_store
848 .upgrade()
849 .ok_or(IotaError::EpochEnded(self.epoch))?;
850 let mut highest_completed_round = self.highest_completed_round.lock();
851 if Some(round) > *highest_completed_round {
852 *highest_completed_round = Some(round);
853 epoch_store
854 .tables()?
855 .randomness_highest_completed_round
856 .insert(&SINGLETON_KEY, &round)?;
857 self.network_handle
858 .complete_round(epoch_store.committee().epoch(), round);
859 }
860 Ok(())
861 }
862}
863
864#[derive(Debug, Clone, Copy, PartialEq, Eq)]
865pub enum DkgStatus {
866 Pending,
867 Failed,
868 Successful,
869}
870
871#[cfg(test)]
872mod tests {
873 use std::num::NonZeroUsize;
874
875 use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
876 use iota_types::{crypto::AuthorityKeyPair, messages_consensus::ConsensusTransactionKind};
877 use starfish_core::{BlockRef, BlockStatus};
878 use tokio::sync::mpsc;
879
880 use crate::{
881 authority::{
882 authority_per_epoch_store::{
883 AuthorityPerEpochStore, ExecutionIndices, ExecutionIndicesWithStats,
884 },
885 test_authority_builder::TestAuthorityBuilder,
886 },
887 checkpoints::CheckpointStore,
888 consensus_adapter::{
889 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
890 MockConsensusClient,
891 },
892 epoch::randomness::*,
893 mock_consensus::with_block_status,
894 };
895
896 #[tokio::test]
897 async fn test_dkg_v1() {
898 test_dkg(1).await;
899 }
900
901 async fn test_dkg(version: u64) {
902 telemetry_subscribers::init_for_testing();
903
904 let network_config =
905 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
906 .committee_size(NonZeroUsize::new(4).unwrap())
907 .with_reference_gas_price(500)
908 .build();
909
910 let mut protocol_config =
911 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
912 protocol_config.set_random_beacon_dkg_version_for_testing(version);
913
914 let mut epoch_stores = Vec::new();
915 let mut randomness_managers = Vec::new();
916 let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
917
918 for validator in network_config.validator_configs.iter() {
919 let mut mock_consensus_client = MockConsensusClient::new();
921 let tx_consensus = tx_consensus.clone();
922 mock_consensus_client
923 .expect_submit()
924 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
925 tx_consensus.try_send(transactions.to_vec()).unwrap();
926 true
927 })
928 .returning(|_, _| {
929 Ok(with_block_status(BlockStatus::Sequenced(
930 starfish_core::GenericTransactionRef::BlockRef(BlockRef::MIN),
931 )))
932 });
933
934 let state = TestAuthorityBuilder::new()
935 .with_protocol_config(protocol_config.clone())
936 .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
937 .build()
938 .await;
939 let consensus_adapter = Arc::new(ConsensusAdapter::new(
940 Arc::new(mock_consensus_client),
941 CheckpointStore::new_for_tests(),
942 state.name,
943 Arc::new(ConnectionMonitorStatusForTests {}),
944 100_000,
945 100_000,
946 None,
947 None,
948 ConsensusAdapterMetrics::new_test(),
949 ));
950 let epoch_store = state.epoch_store_for_testing();
951 let randomness_manager = RandomnessManager::try_new(
952 Arc::downgrade(&epoch_store),
953 Box::new(consensus_adapter.clone()),
954 iota_network::randomness::Handle::new_stub(),
955 validator.authority_key_pair(),
956 )
957 .await
958 .unwrap();
959
960 epoch_stores.push(epoch_store);
961 randomness_managers.push(randomness_manager);
962 }
963
964 let mut dkg_messages = Vec::new();
966 for randomness_manager in randomness_managers.iter_mut() {
967 randomness_manager.start_dkg().await.unwrap();
968
969 let mut dkg_message = rx_consensus.recv().await.unwrap();
970 assert!(dkg_message.len() == 1);
971 match dkg_message.remove(0).kind {
972 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
973 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
974 .expect("DKG message deserialization should not fail");
975 dkg_messages.push(msg);
976 }
977 _ => panic!("wrong type of message sent"),
978 }
979 }
980 for i in 0..randomness_managers.len() {
981 let mut output = ConsensusCommitOutput::new(0);
982 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
983 index: ExecutionIndices {
984 last_committed_round: 0,
985 ..Default::default()
986 },
987 ..Default::default()
988 });
989 for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
990 randomness_managers[i]
991 .add_message(&epoch_stores[j].name, dkg_message)
992 .unwrap();
993 }
994 randomness_managers[i]
995 .advance_dkg(&mut output, 0)
996 .await
997 .unwrap();
998 let mut batch = epoch_stores[i].db_batch_for_test();
999 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1000 batch.write().unwrap();
1001 }
1002
1003 let mut dkg_confirmations = Vec::new();
1005 for _ in 0..randomness_managers.len() {
1006 let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
1007 assert!(dkg_confirmation.len() == 1);
1008 match dkg_confirmation.remove(0).kind {
1009 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
1010 let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
1011 .expect("DKG message deserialization should not fail");
1012 dkg_confirmations.push(msg);
1013 }
1014 _ => panic!("wrong type of message sent"),
1015 }
1016 }
1017 for i in 0..randomness_managers.len() {
1018 let mut output = ConsensusCommitOutput::new(0);
1019 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
1020 index: ExecutionIndices {
1021 last_committed_round: 1,
1022 ..Default::default()
1023 },
1024 ..Default::default()
1025 });
1026 for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
1027 randomness_managers[i]
1028 .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
1029 .unwrap();
1030 }
1031 randomness_managers[i]
1032 .advance_dkg(&mut output, 0)
1033 .await
1034 .unwrap();
1035 let mut batch = epoch_stores[i].db_batch_for_test();
1036 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1037 batch.write().unwrap();
1038 }
1039
1040 for randomness_manager in &randomness_managers {
1042 assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
1043 }
1044 }
1045
1046 #[tokio::test]
1047 async fn test_dkg_expiration_v1() {
1048 test_dkg_expiration(1).await;
1049 }
1050
1051 async fn test_dkg_expiration(version: u64) {
1052 telemetry_subscribers::init_for_testing();
1053
1054 let network_config =
1055 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1056 .committee_size(NonZeroUsize::new(4).unwrap())
1057 .with_reference_gas_price(500)
1058 .build();
1059
1060 let mut epoch_stores = Vec::new();
1061 let mut randomness_managers = Vec::new();
1062 let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
1063
1064 let mut protocol_config =
1065 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1066 protocol_config.set_random_beacon_dkg_version_for_testing(version);
1067
1068 for validator in network_config.validator_configs.iter() {
1069 let mut mock_consensus_client = MockConsensusClient::new();
1071 let tx_consensus = tx_consensus.clone();
1072 mock_consensus_client
1073 .expect_submit()
1074 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1075 tx_consensus.try_send(transactions.to_vec()).unwrap();
1076 true
1077 })
1078 .returning(|_, _| {
1079 Ok(with_block_status(starfish_core::BlockStatus::Sequenced(
1080 starfish_core::GenericTransactionRef::BlockRef(BlockRef::MIN),
1081 )))
1082 });
1083
1084 let state = TestAuthorityBuilder::new()
1085 .with_protocol_config(protocol_config.clone())
1086 .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
1087 .build()
1088 .await;
1089 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1090 Arc::new(mock_consensus_client),
1091 CheckpointStore::new_for_tests(),
1092 state.name,
1093 Arc::new(ConnectionMonitorStatusForTests {}),
1094 100_000,
1095 100_000,
1096 None,
1097 None,
1098 ConsensusAdapterMetrics::new_test(),
1099 ));
1100 let epoch_store = state.epoch_store_for_testing();
1101 let randomness_manager = RandomnessManager::try_new(
1102 Arc::downgrade(&epoch_store),
1103 Box::new(consensus_adapter.clone()),
1104 iota_network::randomness::Handle::new_stub(),
1105 validator.authority_key_pair(),
1106 )
1107 .await
1108 .unwrap();
1109
1110 epoch_stores.push(epoch_store);
1111 randomness_managers.push(randomness_manager);
1112 }
1113
1114 let mut dkg_messages = Vec::new();
1116 for randomness_manager in randomness_managers.iter_mut() {
1117 randomness_manager.start_dkg().await.unwrap();
1118
1119 let mut dkg_message = rx_consensus.recv().await.unwrap();
1120 assert!(dkg_message.len() == 1);
1121 match dkg_message.remove(0).kind {
1122 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1123 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1124 .expect("DKG message deserialization should not fail");
1125 dkg_messages.push(msg);
1126 }
1127 _ => panic!("wrong type of message sent"),
1128 }
1129 }
1130 for i in 0..randomness_managers.len() {
1131 let mut output = ConsensusCommitOutput::new(0);
1132 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
1133 index: ExecutionIndices {
1134 last_committed_round: 0,
1135 ..Default::default()
1136 },
1137 ..Default::default()
1138 });
1139 for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
1140 randomness_managers[i]
1141 .add_message(&epoch_stores[j].name, dkg_message)
1142 .unwrap();
1143 }
1144 randomness_managers[i]
1145 .advance_dkg(&mut output, u64::MAX)
1146 .await
1147 .unwrap();
1148 let mut batch = epoch_stores[i].db_batch_for_test();
1149 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1150 batch.write().unwrap();
1151 }
1152
1153 for randomness_manager in &randomness_managers {
1155 assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
1156 }
1157
1158 for (epoch_store, validator) in epoch_stores
1165 .iter()
1166 .zip(network_config.validator_configs.iter())
1167 {
1168 let recovered = recover_randomness_manager(
1169 epoch_store,
1170 validator.authority_key_pair(),
1171 tx_consensus.clone(),
1172 )
1173 .await;
1174 assert_eq!(DkgStatus::Failed, recovered.dkg_status());
1175 }
1176 }
1177
1178 async fn recover_randomness_manager(
1183 epoch_store: &Arc<AuthorityPerEpochStore>,
1184 authority_key_pair: &AuthorityKeyPair,
1185 tx_consensus: mpsc::Sender<Vec<ConsensusTransaction>>,
1186 ) -> RandomnessManager {
1187 let mut mock_consensus_client = MockConsensusClient::new();
1188 mock_consensus_client
1189 .expect_submit()
1190 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1191 tx_consensus.try_send(transactions.to_vec()).unwrap();
1192 true
1193 })
1194 .returning(|_, _| {
1195 Ok(with_block_status(BlockStatus::Sequenced(
1196 starfish_core::GenericTransactionRef::BlockRef(BlockRef::MIN),
1197 )))
1198 });
1199 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1200 Arc::new(mock_consensus_client),
1201 CheckpointStore::new_for_tests(),
1202 epoch_store.name,
1203 Arc::new(ConnectionMonitorStatusForTests {}),
1204 100_000,
1205 100_000,
1206 None,
1207 None,
1208 ConsensusAdapterMetrics::new_test(),
1209 ));
1210 RandomnessManager::try_new(
1211 Arc::downgrade(epoch_store),
1212 Box::new(consensus_adapter),
1213 iota_network::randomness::Handle::new_stub(),
1214 authority_key_pair,
1215 )
1216 .await
1217 .unwrap()
1218 }
1219
1220 #[tokio::test]
1226 async fn test_dkg_recovers_processed_messages_after_restart() {
1227 telemetry_subscribers::init_for_testing();
1228
1229 let network_config =
1230 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1231 .committee_size(NonZeroUsize::new(4).unwrap())
1232 .with_reference_gas_price(500)
1233 .build();
1234
1235 let mut protocol_config =
1236 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1237 protocol_config.set_random_beacon_dkg_version_for_testing(1);
1238
1239 let mut epoch_stores = Vec::new();
1240 let mut randomness_managers = Vec::new();
1241 let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
1242
1243 for validator in network_config.validator_configs.iter() {
1244 let mut mock_consensus_client = MockConsensusClient::new();
1245 let tx_consensus = tx_consensus.clone();
1246 mock_consensus_client
1247 .expect_submit()
1248 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1249 tx_consensus.try_send(transactions.to_vec()).unwrap();
1250 true
1251 })
1252 .returning(|_, _| {
1253 Ok(with_block_status(BlockStatus::Sequenced(
1254 starfish_core::GenericTransactionRef::BlockRef(BlockRef::MIN),
1255 )))
1256 });
1257
1258 let state = TestAuthorityBuilder::new()
1259 .with_protocol_config(protocol_config.clone())
1260 .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
1261 .build()
1262 .await;
1263 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1264 Arc::new(mock_consensus_client),
1265 CheckpointStore::new_for_tests(),
1266 state.name,
1267 Arc::new(ConnectionMonitorStatusForTests {}),
1268 100_000,
1269 100_000,
1270 None,
1271 None,
1272 ConsensusAdapterMetrics::new_test(),
1273 ));
1274 let epoch_store = state.epoch_store_for_testing();
1275 let randomness_manager = RandomnessManager::try_new(
1276 Arc::downgrade(&epoch_store),
1277 Box::new(consensus_adapter.clone()),
1278 iota_network::randomness::Handle::new_stub(),
1279 validator.authority_key_pair(),
1280 )
1281 .await
1282 .unwrap();
1283
1284 epoch_stores.push(epoch_store);
1285 randomness_managers.push(randomness_manager);
1286 }
1287
1288 let mut dkg_messages = Vec::new();
1290 for randomness_manager in randomness_managers.iter_mut() {
1291 randomness_manager.start_dkg().await.unwrap();
1292
1293 let mut dkg_message = rx_consensus.recv().await.unwrap();
1294 assert!(dkg_message.len() == 1);
1295 match dkg_message.remove(0).kind {
1296 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1297 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1298 .expect("DKG message deserialization should not fail");
1299 dkg_messages.push(msg);
1300 }
1301 _ => panic!("wrong type of message sent"),
1302 }
1303 }
1304 for i in 0..randomness_managers.len() {
1305 let mut output = ConsensusCommitOutput::new(0);
1306 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
1307 index: ExecutionIndices {
1308 last_committed_round: 0,
1309 ..Default::default()
1310 },
1311 ..Default::default()
1312 });
1313 for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
1314 randomness_managers[i]
1315 .add_message(&epoch_stores[j].name, dkg_message)
1316 .unwrap();
1317 }
1318 randomness_managers[i]
1319 .advance_dkg(&mut output, 0)
1320 .await
1321 .unwrap();
1322 let mut batch = epoch_stores[i].db_batch_for_test();
1323 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1324 batch.write().unwrap();
1325 }
1326
1327 for randomness_manager in &randomness_managers {
1330 assert_eq!(DkgStatus::Pending, randomness_manager.dkg_status());
1331 assert_eq!(
1332 network_config.validator_configs.len(),
1333 randomness_manager.processed_messages.len()
1334 );
1335 assert!(randomness_manager.used_messages.initialized());
1336 }
1337
1338 for (epoch_store, validator) in epoch_stores
1341 .iter()
1342 .zip(network_config.validator_configs.iter())
1343 {
1344 let recovered = recover_randomness_manager(
1345 epoch_store,
1346 validator.authority_key_pair(),
1347 tx_consensus.clone(),
1348 )
1349 .await;
1350 assert_eq!(DkgStatus::Pending, recovered.dkg_status());
1351 assert_eq!(
1352 network_config.validator_configs.len(),
1353 recovered.processed_messages.len()
1354 );
1355 assert!(recovered.used_messages.initialized());
1356 }
1357 }
1358}