1use std::{
6 collections::{BTreeMap, HashMap},
7 sync::{Arc, Weak},
8 time::Instant,
9};
10
11use anemo::PeerId;
12use fastcrypto::{
13 encoding::{Encoding, Hex},
14 error::{FastCryptoError, FastCryptoResult},
15 groups::bls12381,
16 serde_helpers::ToFromByteArray,
17 traits::{KeyPair, ToFromBytes},
18};
19use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
20use futures::{StreamExt, stream::FuturesUnordered};
21use iota_macros::fail_point_if;
22use iota_network::randomness;
23use iota_types::{
24 base_types::{AuthorityName, CommitRound},
25 committee::{Committee, EpochId, StakeUnit},
26 crypto::{AuthorityKeyPair, RandomnessRound},
27 error::{IotaError, IotaResult},
28 iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
29 messages_consensus::{ConsensusTransaction, VersionedDkgConfirmation, VersionedDkgMessage},
30};
31use parking_lot::Mutex;
32use rand::{
33 SeedableRng,
34 rngs::{OsRng, StdRng},
35};
36use serde::{Deserialize, Serialize};
37use tokio::{sync::OnceCell, task::JoinHandle};
38use tracing::{debug, error, info, warn};
39use typed_store::Map;
40
41use crate::{
42 authority::{
43 authority_per_epoch_store::{
44 AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
45 },
46 epoch_start_configuration::EpochStartConfigTrait,
47 },
48 consensus_adapter::SubmitToConsensus,
49};
50
51pub type CommitTimestampMs = u64;
53
54type PkG = bls12381::G2Element;
55type EncG = bls12381::G2Element;
56
57pub const SINGLETON_KEY: u64 = 0;
58
59#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
62pub enum VersionedProcessedMessage {
63 V1(dkg_v1::ProcessedMessage<PkG, EncG>),
64}
65
66impl VersionedProcessedMessage {
67 pub fn sender(&self) -> PartyId {
68 match self {
69 VersionedProcessedMessage::V1(msg) => msg.message.sender,
70 }
71 }
72
73 pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
74 match self {
75 VersionedProcessedMessage::V1(msg) => msg,
76 }
77 }
78
79 pub fn process(
80 party: Arc<dkg_v1::Party<PkG, EncG>>,
81 message: VersionedDkgMessage,
82 ) -> FastCryptoResult<VersionedProcessedMessage> {
83 let processed = party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
86 Ok(VersionedProcessedMessage::V1(processed))
87 }
88
89 pub fn merge(
90 party: Arc<dkg_v1::Party<PkG, EncG>>,
91 messages: Vec<Self>,
92 ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
93 let (conf, msgs) = party.merge(
96 &messages
97 .into_iter()
98 .map(|vm| vm.unwrap_v1())
99 .collect::<Vec<_>>(),
100 )?;
101 Ok((
102 VersionedDkgConfirmation::V1(conf),
103 VersionedUsedProcessedMessages::V1(msgs),
104 ))
105 }
106}
107
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109pub enum VersionedUsedProcessedMessages {
110 V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
111}
112
113impl VersionedUsedProcessedMessages {
114 fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
115 &self,
116 party: Arc<dkg_v1::Party<PkG, EncG>>,
117 confirmations: Iter,
118 ) -> FastCryptoResult<Output<PkG, EncG>> {
119 let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
122 let VersionedUsedProcessedMessages::V1(msg) = self;
123 party.complete(
124 msg,
125 &confirmations
126 .map(|vm| vm.unwrap_v1())
127 .cloned()
128 .collect::<Vec<_>>(),
129 rng,
130 )
131 }
132}
133
134pub struct RandomnessManager {
158 epoch_store: Weak<AuthorityPerEpochStore>,
159 epoch: EpochId,
160 consensus_adapter: Box<dyn SubmitToConsensus>,
161 network_handle: randomness::Handle,
162 authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
163
164 dkg_start_time: OnceCell<Instant>,
166 party: Arc<dkg_v1::Party<PkG, EncG>>,
167 enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
168 processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
169 used_messages: OnceCell<VersionedUsedProcessedMessages>,
170 confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
171 dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,
172
173 next_randomness_round: RandomnessRound,
175 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
176}
177
178impl RandomnessManager {
179 pub async fn try_new(
181 epoch_store_weak: Weak<AuthorityPerEpochStore>,
182 consensus_adapter: Box<dyn SubmitToConsensus>,
183 network_handle: randomness::Handle,
184 authority_key_pair: &AuthorityKeyPair,
185 ) -> Option<Self> {
186 let epoch_store = match epoch_store_weak.upgrade() {
187 Some(epoch_store) => epoch_store,
188 None => {
189 error!(
190 "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
191 );
192 return None;
193 }
194 };
195 let tables = match epoch_store.tables() {
196 Ok(tables) => tables,
197 Err(_) => {
198 error!(
199 "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
200 );
201 return None;
202 }
203 };
204 let protocol_config = epoch_store.protocol_config();
205
206 let name: AuthorityName = authority_key_pair.public().into();
207 let committee = epoch_store.committee();
208 let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
209 if tracing::enabled!(tracing::Level::DEBUG) {
210 for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
212 let pk_bytes = pk.as_element().to_byte_array();
213 debug!(
214 "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
215 );
216 }
217 }
218 let authority_ids: HashMap<_, _> =
219 info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
220 let authority_peer_ids = epoch_store
221 .epoch_start_config()
222 .epoch_start_state()
223 .get_authority_names_to_peer_ids();
224 let authority_info = authority_ids
225 .into_iter()
226 .map(|(name, id)| {
227 let peer_id = *authority_peer_ids
228 .get(&name)
229 .expect("authority name should be in peer_ids");
230 (name, (peer_id, id))
231 })
232 .collect();
233 let nodes = info
234 .iter()
235 .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
236 id: *id,
237 pk: pk.clone(),
238 weight: (*stake).try_into().expect("stake should fit in u16"),
239 })
240 .collect();
241 let (nodes, t) = match nodes::Nodes::new_reduced(
242 nodes,
243 committee
244 .validity_threshold()
245 .try_into()
246 .expect("validity threshold should fit in u16"),
247 protocol_config.random_beacon_reduction_allowed_delta(),
248 protocol_config
249 .random_beacon_reduction_lower_bound()
250 .try_into()
251 .expect("should fit u16"),
252 ) {
253 Ok((nodes, t)) => (nodes, t),
254 Err(err) => {
255 error!("random beacon: error while initializing Nodes: {err:?}");
256 return None;
257 }
258 };
259 let total_weight = nodes.total_weight();
260 let num_nodes = nodes.num_nodes();
261 let prefix_str = format!(
262 "dkg {} {}",
263 Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
264 committee.epoch()
265 );
266 let randomness_private_key = bls12381::Scalar::from_byte_array(
267 authority_key_pair
268 .copy()
269 .private()
270 .as_bytes()
271 .try_into()
272 .expect("key length should match"),
273 )
274 .expect("should work to convert BLS key to Scalar");
275 let party = match dkg_v1::Party::<PkG, EncG>::new(
276 fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
277 randomness_private_key,
278 ),
279 nodes,
280 t,
281 fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
282 &mut rand::thread_rng(),
283 ) {
284 Ok(party) => party,
285 Err(err) => {
286 error!("random beacon: error while initializing Party: {err:?}");
287 return None;
288 }
289 };
290 info!(
291 "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
292 );
293
294 let highest_completed_round = tables
296 .randomness_highest_completed_round
297 .get(&SINGLETON_KEY)
298 .expect("typed_store should not fail");
299 let mut rm = RandomnessManager {
300 epoch_store: epoch_store_weak,
301 epoch: committee.epoch(),
302 consensus_adapter,
303 network_handle: network_handle.clone(),
304 authority_info,
305 dkg_start_time: OnceCell::new(),
306 party: Arc::new(party),
307 enqueued_messages: BTreeMap::new(),
308 processed_messages: BTreeMap::new(),
309 used_messages: OnceCell::new(),
310 confirmations: BTreeMap::new(),
311 dkg_output: OnceCell::new(),
312 next_randomness_round: RandomnessRound(0),
313 highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
314 };
315 let dkg_output = tables
316 .dkg_output
317 .get(&SINGLETON_KEY)
318 .expect("typed_store should not fail");
319 if let Some(dkg_output) = dkg_output {
320 info!(
321 "random beacon: loaded existing DKG output for epoch {}",
322 committee.epoch()
323 );
324 epoch_store
325 .metrics
326 .epoch_random_beacon_dkg_num_shares
327 .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
328 rm.dkg_output
329 .set(Some(dkg_output.clone()))
330 .expect("setting new OnceCell should succeed");
331 network_handle.update_epoch(
332 committee.epoch(),
333 rm.authority_info.clone(),
334 dkg_output,
335 rm.party.t(),
336 highest_completed_round,
337 );
338 } else {
339 info!(
340 "random beacon: no existing DKG output found for epoch {}",
341 committee.epoch()
342 );
343
344 assert!(
346 epoch_store.protocol_config().dkg_version() > 0,
347 "BUG: DKG version 0 is deprecated"
348 );
349 rm.processed_messages.extend(
350 tables
351 .dkg_processed_messages
352 .safe_iter()
353 .map(|result| result.expect("typed_store should not fail")),
354 );
355 if let Some(used_messages) = tables
356 .dkg_used_messages
357 .get(&SINGLETON_KEY)
358 .expect("typed_store should not fail")
359 {
360 rm.used_messages
361 .set(used_messages.clone())
362 .expect("setting new OnceCell should succeed");
363 }
364 rm.confirmations.extend(
365 tables
366 .dkg_confirmations
367 .safe_iter()
368 .map(|result| result.expect("typed_store should not fail")),
369 );
370 }
371
372 rm.next_randomness_round = tables
377 .randomness_next_round
378 .get(&SINGLETON_KEY)
379 .expect("typed_store should not fail")
380 .unwrap_or(RandomnessRound(0));
381 info!(
382 "random beacon: starting from next_randomness_round={}",
383 rm.next_randomness_round.0
384 );
385 let first_incomplete_round = highest_completed_round
386 .map(|r| r + 1)
387 .unwrap_or(RandomnessRound(0));
388 if first_incomplete_round < rm.next_randomness_round {
389 info!(
390 "random beacon: resuming generation for randomness rounds from {} to {}",
391 first_incomplete_round,
392 rm.next_randomness_round - 1,
393 );
394 for r in first_incomplete_round.0..rm.next_randomness_round.0 {
395 network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
396 }
397 }
398
399 Some(rm)
400 }
401
402 pub async fn start_dkg(&mut self) -> IotaResult {
404 if self.used_messages.initialized() || self.dkg_output.initialized() {
405 return Ok(());
407 }
408
409 let _ = self.dkg_start_time.set(Instant::now());
410
411 let epoch_store = self.epoch_store()?;
412 let dkg_version = epoch_store.protocol_config().dkg_version();
413 info!("random beacon: starting DKG, version {dkg_version}");
414
415 let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
416 Ok(msg) => msg,
417 Err(FastCryptoError::IgnoredMessage) => {
418 info!(
419 "random beacon: no DKG Message for party id={} (zero weight)",
420 self.party.id
421 );
422 return Ok(());
423 }
424 Err(e) => {
425 error!("random beacon: error while creating a DKG Message: {e:?}");
426 return Ok(());
427 }
428 };
429
430 info!("random beacon: created {msg:?} with dkg version {dkg_version}");
431 let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
432
433 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
434 let mut fail_point_skip_sending = false;
435 fail_point_if!("rb-dkg", || {
436 fail_point_skip_sending = true;
438 });
439 if !fail_point_skip_sending {
440 self.consensus_adapter
441 .submit_to_consensus(&[transaction], &epoch_store)?;
442 }
443
444 epoch_store
445 .metrics
446 .epoch_random_beacon_dkg_message_time_ms
447 .set(
448 self.dkg_start_time
449 .get()
450 .unwrap() .elapsed()
452 .as_millis() as i64,
453 );
454 Ok(())
455 }
456
457 pub(crate) async fn advance_dkg(
461 &mut self,
462 consensus_output: &mut ConsensusCommitOutput,
463 round: CommitRound,
464 ) -> IotaResult {
465 let epoch_store = self.epoch_store()?;
466
467 if !self.dkg_output.initialized() && !self.used_messages.initialized() {
469 let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
471 .into_values()
472 .collect();
473 while let Some(res) = handles.next().await {
474 if let Ok(Some(processed)) = res {
475 self.processed_messages
476 .insert(processed.sender(), processed.clone());
477 consensus_output.insert_dkg_processed_message(processed);
478 }
479 }
480
481 match VersionedProcessedMessage::merge(
483 self.party.clone(),
484 self.processed_messages
485 .values()
486 .cloned()
487 .collect::<Vec<_>>(),
488 ) {
489 Ok((conf, used_msgs)) => {
490 info!(
491 "random beacon: sending DKG Confirmation with {} complaints",
492 conf.num_of_complaints()
493 );
494 if self.used_messages.set(used_msgs.clone()).is_err() {
495 error!("BUG: used_messages should only ever be set once");
496 }
497 consensus_output.insert_dkg_used_messages(used_msgs);
498
499 let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
500 epoch_store.name,
501 &conf,
502 );
503
504 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
505 let mut fail_point_skip_sending = false;
506 fail_point_if!("rb-dkg", || {
507 fail_point_skip_sending = true;
509 });
510 if !fail_point_skip_sending {
511 self.consensus_adapter
512 .submit_to_consensus(&[transaction], &epoch_store)?;
513 }
514
515 let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
516 if let Some(elapsed) = elapsed {
517 epoch_store
518 .metrics
519 .epoch_random_beacon_dkg_confirmation_time_ms
520 .set(elapsed as i64);
521 }
522 }
523 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
525 }
526 }
527
528 if !self.dkg_output.initialized() && self.used_messages.initialized() {
530 match self
531 .used_messages
532 .get()
533 .expect("checked above that `used_messages` is initialized")
534 .complete_dkg(self.party.clone(), self.confirmations.values())
535 {
536 Ok(output) => {
537 let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
538 let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
539 let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
540 info!(
541 "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
542 );
543 epoch_store
544 .metrics
545 .epoch_random_beacon_dkg_num_shares
546 .set(num_shares as i64);
547 epoch_store
548 .metrics
549 .epoch_random_beacon_dkg_epoch_start_completion_time_ms
550 .set(epoch_elapsed as i64);
551 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
552 if let Some(elapsed) = elapsed {
553 epoch_store
554 .metrics
555 .epoch_random_beacon_dkg_completion_time_ms
556 .set(elapsed as i64);
557 }
558 self.dkg_output
559 .set(Some(output.clone()))
560 .expect("checked above that `dkg_output` is uninitialized");
561 self.network_handle.update_epoch(
562 epoch_store.committee().epoch(),
563 self.authority_info.clone(),
564 output.clone(),
565 self.party.t(),
566 None,
567 );
568 consensus_output.set_dkg_output(output);
569 }
570 Err(FastCryptoError::NotEnoughInputs) => (), Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
572 }
573 }
574
575 if !self.dkg_output.initialized()
577 && round
578 > epoch_store
579 .protocol_config()
580 .random_beacon_dkg_timeout_round()
581 .into()
582 {
583 error!(
584 "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
585 );
586 epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
587 self.dkg_output
588 .set(None)
589 .expect("checked above that `dkg_output` is uninitialized");
590 }
591
592 Ok(())
593 }
594
595 pub fn add_message(
597 &mut self,
598 authority: &AuthorityName,
599 msg: VersionedDkgMessage,
600 ) -> IotaResult {
601 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
605 if !msg.is_valid_version(dkg_version) {
606 warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
607 return Ok(());
608 }
609
610 if self.used_messages.initialized() || self.dkg_output.initialized() {
611 return Ok(());
613 }
614 let Some((_, party_id)) = self.authority_info.get(authority) else {
615 error!("random beacon: received DKG Message from unknown authority: {authority:?}");
616 return Ok(());
617 };
618 if *party_id != msg.sender() {
619 warn!(
620 "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id:?}"
621 );
622 return Ok(());
623 }
624 if self.enqueued_messages.contains_key(&msg.sender())
625 || self.processed_messages.contains_key(&msg.sender())
626 {
627 info!("ignoring duplicate DKG Message from authority {authority:?}");
628 return Ok(());
629 }
630
631 let party = self.party.clone();
632 self.enqueued_messages.insert(
635 msg.sender(),
636 tokio::task::spawn_blocking(move || {
637 match VersionedProcessedMessage::process(party, msg) {
638 Ok(processed) => Some(processed),
639 Err(err) => {
640 debug!("random beacon: error while processing DKG Message: {err:?}");
641 None
642 }
643 }
644 }),
645 );
646 Ok(())
647 }
648
649 pub(crate) fn add_confirmation(
651 &mut self,
652 output: &mut ConsensusCommitOutput,
653 authority: &AuthorityName,
654 conf: VersionedDkgConfirmation,
655 ) -> IotaResult {
656 let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
660 if !conf.is_valid_version(dkg_version) {
661 warn!(
662 "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
663 );
664 return Ok(());
665 }
666
667 if self.dkg_output.initialized() {
668 return Ok(());
670 }
671 let Some((_, party_id)) = self.authority_info.get(authority) else {
672 error!(
673 "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
674 );
675 return Ok(());
676 };
677 if *party_id != conf.sender() {
678 warn!(
679 "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id:?}"
680 );
681 return Ok(());
682 }
683 self.confirmations.insert(conf.sender(), conf.clone());
684 output.insert_dkg_confirmation(conf);
685 Ok(())
686 }
687
688 pub(crate) fn reserve_next_randomness(
695 &mut self,
696 commit_timestamp: CommitTimestampMs,
697 output: &mut ConsensusCommitOutput,
698 ) -> IotaResult<Option<RandomnessRound>> {
699 let epoch_store = self.epoch_store()?;
700
701 let last_round_timestamp = epoch_store
702 .get_randomness_last_round_timestamp()
703 .expect("read should not fail");
704
705 if let Some(last_round_timestamp) = last_round_timestamp {
706 if commit_timestamp - last_round_timestamp
707 < epoch_store
708 .protocol_config()
709 .random_beacon_min_round_interval_ms()
710 {
711 return Ok(None);
712 }
713 }
714
715 let randomness_round = self.next_randomness_round;
716 self.next_randomness_round = self
717 .next_randomness_round
718 .checked_add(1)
719 .ok_or_else(|| IotaError::Unknown("RandomnessRound overflow".to_string()))?;
720
721 output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
722
723 Ok(Some(randomness_round))
724 }
725
726 pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
728 self.network_handle
729 .send_partial_signatures(epoch, randomness_round);
730 }
731
732 pub fn dkg_status(&self) -> DkgStatus {
733 match self.dkg_output.get() {
734 Some(Some(_)) => DkgStatus::Successful,
735 Some(None) => DkgStatus::Failed,
736 None => DkgStatus::Pending,
737 }
738 }
739
740 pub fn reporter(&self) -> RandomnessReporter {
743 RandomnessReporter {
744 epoch_store: self.epoch_store.clone(),
745 epoch: self.epoch,
746 network_handle: self.network_handle.clone(),
747 highest_completed_round: self.highest_completed_round.clone(),
748 }
749 }
750
751 fn epoch_store(&self) -> IotaResult<Arc<AuthorityPerEpochStore>> {
752 self.epoch_store
753 .upgrade()
754 .ok_or(IotaError::EpochEnded(self.epoch))
755 }
756
757 fn randomness_dkg_info_from_committee(
758 committee: &Committee,
759 ) -> Vec<(
760 u16,
761 AuthorityName,
762 fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
763 StakeUnit,
764 )> {
765 committee
766 .members()
767 .map(|(name, stake)| {
768 let index: u16 = committee
769 .authority_index(name)
770 .expect("lookup of known committee member should succeed")
771 .try_into()
772 .expect("authority index should fit in u16");
773 let pk = bls12381::G2Element::from_byte_array(
774 committee
775 .public_key(name)
776 .expect("lookup of known committee member should succeed")
777 .as_bytes()
778 .try_into()
779 .expect("key length should match"),
780 )
781 .expect("should work to convert BLS key to G2Element");
782 (
783 index,
784 *name,
785 fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
786 *stake,
787 )
788 })
789 .collect()
790 }
791}
792
793#[derive(Clone)]
796pub struct RandomnessReporter {
797 epoch_store: Weak<AuthorityPerEpochStore>,
798 epoch: EpochId,
799 network_handle: randomness::Handle,
800 highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
801}
802
803impl RandomnessReporter {
804 pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> IotaResult {
808 let epoch_store = self
809 .epoch_store
810 .upgrade()
811 .ok_or(IotaError::EpochEnded(self.epoch))?;
812 let mut highest_completed_round = self.highest_completed_round.lock();
813 if Some(round) > *highest_completed_round {
814 *highest_completed_round = Some(round);
815 epoch_store
816 .tables()?
817 .randomness_highest_completed_round
818 .insert(&SINGLETON_KEY, &round)?;
819 self.network_handle
820 .complete_round(epoch_store.committee().epoch(), round);
821 }
822 Ok(())
823 }
824}
825
826#[derive(Debug, Clone, Copy, PartialEq, Eq)]
827pub enum DkgStatus {
828 Pending,
829 Failed,
830 Successful,
831}
832
833#[cfg(test)]
834mod tests {
835 use std::num::NonZeroUsize;
836
837 use consensus_core::{BlockRef, BlockStatus};
838 use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
839 use iota_types::messages_consensus::ConsensusTransactionKind;
840 use tokio::sync::mpsc;
841
842 use crate::{
843 authority::{
844 authority_per_epoch_store::{ExecutionIndices, ExecutionIndicesWithStats},
845 test_authority_builder::TestAuthorityBuilder,
846 },
847 checkpoints::CheckpointStore,
848 consensus_adapter::{
849 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
850 MockConsensusClient,
851 },
852 epoch::randomness::*,
853 mock_consensus::with_block_status,
854 };
855
856 #[tokio::test]
857 async fn test_dkg_v1() {
858 test_dkg(1).await;
859 }
860
861 async fn test_dkg(version: u64) {
862 telemetry_subscribers::init_for_testing();
863
864 let network_config =
865 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
866 .committee_size(NonZeroUsize::new(4).unwrap())
867 .with_reference_gas_price(500)
868 .build();
869
870 let mut protocol_config =
871 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
872 protocol_config.set_random_beacon_dkg_version_for_testing(version);
873
874 let mut epoch_stores = Vec::new();
875 let mut randomness_managers = Vec::new();
876 let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
877
878 for validator in network_config.validator_configs.iter() {
879 let mut mock_consensus_client = MockConsensusClient::new();
881 let tx_consensus = tx_consensus.clone();
882 mock_consensus_client
883 .expect_submit()
884 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
885 tx_consensus.try_send(transactions.to_vec()).unwrap();
886 true
887 })
888 .returning(|_, _| Ok(with_block_status(BlockStatus::Sequenced(BlockRef::MIN))));
889
890 let state = TestAuthorityBuilder::new()
891 .with_protocol_config(protocol_config.clone())
892 .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
893 .build()
894 .await;
895 let consensus_adapter = Arc::new(ConsensusAdapter::new(
896 Arc::new(mock_consensus_client),
897 CheckpointStore::new_for_tests(),
898 state.name,
899 Arc::new(ConnectionMonitorStatusForTests {}),
900 100_000,
901 100_000,
902 None,
903 None,
904 ConsensusAdapterMetrics::new_test(),
905 ));
906 let epoch_store = state.epoch_store_for_testing();
907 let randomness_manager = RandomnessManager::try_new(
908 Arc::downgrade(&epoch_store),
909 Box::new(consensus_adapter.clone()),
910 iota_network::randomness::Handle::new_stub(),
911 validator.authority_key_pair(),
912 )
913 .await
914 .unwrap();
915
916 epoch_stores.push(epoch_store);
917 randomness_managers.push(randomness_manager);
918 }
919
920 let mut dkg_messages = Vec::new();
922 for randomness_manager in randomness_managers.iter_mut() {
923 randomness_manager.start_dkg().await.unwrap();
924
925 let mut dkg_message = rx_consensus.recv().await.unwrap();
926 assert!(dkg_message.len() == 1);
927 match dkg_message.remove(0).kind {
928 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
929 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
930 .expect("DKG message deserialization should not fail");
931 dkg_messages.push(msg);
932 }
933 _ => panic!("wrong type of message sent"),
934 }
935 }
936 for i in 0..randomness_managers.len() {
937 let mut output = ConsensusCommitOutput::new();
938 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
939 index: ExecutionIndices {
940 last_committed_round: 0,
941 ..Default::default()
942 },
943 ..Default::default()
944 });
945 for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
946 randomness_managers[i]
947 .add_message(&epoch_stores[j].name, dkg_message)
948 .unwrap();
949 }
950 randomness_managers[i]
951 .advance_dkg(&mut output, 0)
952 .await
953 .unwrap();
954 let mut batch = epoch_stores[i].db_batch_for_test();
955 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
956 batch.write().unwrap();
957 }
958
959 let mut dkg_confirmations = Vec::new();
961 for _ in 0..randomness_managers.len() {
962 let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
963 assert!(dkg_confirmation.len() == 1);
964 match dkg_confirmation.remove(0).kind {
965 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
966 let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
967 .expect("DKG message deserialization should not fail");
968 dkg_confirmations.push(msg);
969 }
970 _ => panic!("wrong type of message sent"),
971 }
972 }
973 for i in 0..randomness_managers.len() {
974 let mut output = ConsensusCommitOutput::new();
975 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
976 index: ExecutionIndices {
977 last_committed_round: 1,
978 ..Default::default()
979 },
980 ..Default::default()
981 });
982 for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
983 randomness_managers[i]
984 .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
985 .unwrap();
986 }
987 randomness_managers[i]
988 .advance_dkg(&mut output, 0)
989 .await
990 .unwrap();
991 let mut batch = epoch_stores[i].db_batch_for_test();
992 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
993 batch.write().unwrap();
994 }
995
996 for randomness_manager in &randomness_managers {
998 assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
999 }
1000 }
1001
1002 #[tokio::test]
1003 async fn test_dkg_expiration_v1() {
1004 test_dkg_expiration(1).await;
1005 }
1006
1007 async fn test_dkg_expiration(version: u64) {
1008 telemetry_subscribers::init_for_testing();
1009
1010 let network_config =
1011 iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
1012 .committee_size(NonZeroUsize::new(4).unwrap())
1013 .with_reference_gas_price(500)
1014 .build();
1015
1016 let mut epoch_stores = Vec::new();
1017 let mut randomness_managers = Vec::new();
1018 let (tx_consensus, mut rx_consensus) = mpsc::channel(100);
1019
1020 let mut protocol_config =
1021 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1022 protocol_config.set_random_beacon_dkg_version_for_testing(version);
1023
1024 for validator in network_config.validator_configs.iter() {
1025 let mut mock_consensus_client = MockConsensusClient::new();
1027 let tx_consensus = tx_consensus.clone();
1028 mock_consensus_client
1029 .expect_submit()
1030 .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
1031 tx_consensus.try_send(transactions.to_vec()).unwrap();
1032 true
1033 })
1034 .returning(|_, _| {
1035 Ok(with_block_status(consensus_core::BlockStatus::Sequenced(
1036 BlockRef::MIN,
1037 )))
1038 });
1039
1040 let state = TestAuthorityBuilder::new()
1041 .with_protocol_config(protocol_config.clone())
1042 .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
1043 .build()
1044 .await;
1045 let consensus_adapter = Arc::new(ConsensusAdapter::new(
1046 Arc::new(mock_consensus_client),
1047 CheckpointStore::new_for_tests(),
1048 state.name,
1049 Arc::new(ConnectionMonitorStatusForTests {}),
1050 100_000,
1051 100_000,
1052 None,
1053 None,
1054 ConsensusAdapterMetrics::new_test(),
1055 ));
1056 let epoch_store = state.epoch_store_for_testing();
1057 let randomness_manager = RandomnessManager::try_new(
1058 Arc::downgrade(&epoch_store),
1059 Box::new(consensus_adapter.clone()),
1060 iota_network::randomness::Handle::new_stub(),
1061 validator.authority_key_pair(),
1062 )
1063 .await
1064 .unwrap();
1065
1066 epoch_stores.push(epoch_store);
1067 randomness_managers.push(randomness_manager);
1068 }
1069
1070 let mut dkg_messages = Vec::new();
1072 for randomness_manager in randomness_managers.iter_mut() {
1073 randomness_manager.start_dkg().await.unwrap();
1074
1075 let mut dkg_message = rx_consensus.recv().await.unwrap();
1076 assert!(dkg_message.len() == 1);
1077 match dkg_message.remove(0).kind {
1078 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
1079 let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
1080 .expect("DKG message deserialization should not fail");
1081 dkg_messages.push(msg);
1082 }
1083 _ => panic!("wrong type of message sent"),
1084 }
1085 }
1086 for i in 0..randomness_managers.len() {
1087 let mut output = ConsensusCommitOutput::new();
1088 output.record_consensus_commit_stats(ExecutionIndicesWithStats {
1089 index: ExecutionIndices {
1090 last_committed_round: 0,
1091 ..Default::default()
1092 },
1093 ..Default::default()
1094 });
1095 for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
1096 randomness_managers[i]
1097 .add_message(&epoch_stores[j].name, dkg_message)
1098 .unwrap();
1099 }
1100 randomness_managers[i]
1101 .advance_dkg(&mut output, u64::MAX)
1102 .await
1103 .unwrap();
1104 let mut batch = epoch_stores[i].db_batch_for_test();
1105 output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
1106 batch.write().unwrap();
1107 }
1108
1109 for randomness_manager in &randomness_managers {
1111 assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
1112 }
1113 }
1114}