Skip to main content

iota_core/epoch/
randomness.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    sync::{Arc, Weak},
8    time::Instant,
9};
10
11use anemo::PeerId;
12use fastcrypto::{
13    encoding::{Encoding, Hex},
14    error::{FastCryptoError, FastCryptoResult},
15    groups::bls12381,
16    serde_helpers::ToFromByteArray,
17    traits::{KeyPair, ToFromBytes},
18};
19use fastcrypto_tbls::{dkg_v1, dkg_v1::Output, nodes, nodes::PartyId};
20use futures::{StreamExt, stream::FuturesUnordered};
21use iota_macros::fail_point_if;
22use iota_network::randomness;
23use iota_sdk_types::RandomnessRound;
24use iota_types::{
25    base_types::{AuthorityName, CommitRound},
26    committee::{Committee, EpochId, StakeUnit},
27    crypto::AuthorityKeyPair,
28    error::{IotaError, IotaResult},
29    iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
30    messages_consensus::{ConsensusTransaction, VersionedDkgConfirmation, VersionedDkgMessage},
31};
32use parking_lot::Mutex;
33use rand::{
34    SeedableRng,
35    rngs::{OsRng, StdRng},
36};
37use serde::{Deserialize, Serialize};
38use tokio::{sync::OnceCell, task::JoinHandle};
39use tracing::{debug, error, info, warn};
40use typed_store::Map;
41
42use crate::{
43    authority::{
44        authority_per_epoch_store::{
45            AuthorityPerEpochStore, consensus_quarantine::ConsensusCommitOutput,
46        },
47        epoch_start_configuration::EpochStartConfigTrait,
48    },
49    consensus_adapter::SubmitToConsensus,
50};
51
52/// The epoch UNIX timestamp in milliseconds
53pub type CommitTimestampMs = u64;
54
55type PkG = bls12381::G2Element;
56type EncG = bls12381::G2Element;
57
58pub const SINGLETON_KEY: u64 = 0;
59
60// Wrappers for DKG messages (to simplify upgrades).
61
62#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
63pub enum VersionedProcessedMessage {
64    V1(dkg_v1::ProcessedMessage<PkG, EncG>),
65}
66
67impl VersionedProcessedMessage {
68    pub fn sender(&self) -> PartyId {
69        match self {
70            VersionedProcessedMessage::V1(msg) => msg.message.sender,
71        }
72    }
73
74    pub fn unwrap_v1(self) -> dkg_v1::ProcessedMessage<PkG, EncG> {
75        match self {
76            VersionedProcessedMessage::V1(msg) => msg,
77        }
78    }
79
80    pub fn process(
81        party: Arc<dkg_v1::Party<PkG, EncG>>,
82        message: VersionedDkgMessage,
83    ) -> FastCryptoResult<VersionedProcessedMessage> {
84        // All inputs are verified in add_message, so we can assume they are of the
85        // correct version.
86        let processed = party.process_message(message.unwrap_v1(), &mut rand::thread_rng())?;
87        Ok(VersionedProcessedMessage::V1(processed))
88    }
89
90    pub fn merge(
91        party: Arc<dkg_v1::Party<PkG, EncG>>,
92        messages: Vec<Self>,
93    ) -> FastCryptoResult<(VersionedDkgConfirmation, VersionedUsedProcessedMessages)> {
94        // All inputs were created by this validator, so we can assume they are of the
95        // correct version.
96        let (conf, msgs) = party.merge(
97            &messages
98                .into_iter()
99                .map(|vm| vm.unwrap_v1())
100                .collect::<Vec<_>>(),
101        )?;
102        Ok((
103            VersionedDkgConfirmation::V1(conf),
104            VersionedUsedProcessedMessages::V1(msgs),
105        ))
106    }
107}
108
109#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
110pub enum VersionedUsedProcessedMessages {
111    V1(dkg_v1::UsedProcessedMessages<PkG, EncG>),
112}
113
114impl VersionedUsedProcessedMessages {
115    fn complete_dkg<'a, Iter: Iterator<Item = &'a VersionedDkgConfirmation>>(
116        &self,
117        party: Arc<dkg_v1::Party<PkG, EncG>>,
118        confirmations: Iter,
119    ) -> FastCryptoResult<Output<PkG, EncG>> {
120        // All inputs are verified in add_confirmation, so we can assume they are of the
121        // correct version.
122        let rng = &mut StdRng::from_rng(OsRng).expect("RNG construction should not fail");
123        let VersionedUsedProcessedMessages::V1(msg) = self;
124        party.complete(
125            msg,
126            &confirmations
127                .map(|vm| vm.unwrap_v1())
128                .cloned()
129                .collect::<Vec<_>>(),
130            rng,
131        )
132    }
133}
134
135// State machine for randomness DKG and generation.
136//
137// DKG protocol:
138// 1. This validator sends out a `Message` to all other validators.
139// 2. Once sufficient valid `Message`s are received from other validators via
140//    consensus and processed, this validator sends out a `Confirmation` to all
141//    other validators.
142// 3. Once sufficient `Confirmation`s are received from other validators via
143//    consensus and processed, they are combined to form a public VSS key and
144//    local private key shares.
145// 4. Randomness generation begins.
146//
147// Randomness generation:
148// 1. For each new round, AuthorityPerEpochStore eventually calls
149//    `generate_randomness`.
150// 2. This kicks off a process in RandomnessEventLoop to send partial signatures
151//    for the new round to all other validators.
152// 3. Once enough partial signatures for the round are collected, a
153//    RandomnessStateUpdate transaction is generated and injected into the
154//    TransactionManager.
155// 4. Once the RandomnessStateUpdate transaction is seen in a certified
156//    checkpoint, `notify_randomness_in_checkpoint` is called to complete the
157//    round and stop sending partial signatures for it.
158pub struct RandomnessManager {
159    epoch_store: Weak<AuthorityPerEpochStore>,
160    epoch: EpochId,
161    consensus_adapter: Box<dyn SubmitToConsensus>,
162    network_handle: randomness::Handle,
163    authority_info: HashMap<AuthorityName, (PeerId, PartyId)>,
164
165    // State for DKG.
166    dkg_start_time: OnceCell<Instant>,
167    party: Arc<dkg_v1::Party<PkG, EncG>>,
168    enqueued_messages: BTreeMap<PartyId, JoinHandle<Option<VersionedProcessedMessage>>>,
169    processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
170    used_messages: OnceCell<VersionedUsedProcessedMessages>,
171    confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
172    dkg_output: OnceCell<Option<dkg_v1::Output<PkG, EncG>>>,
173
174    // State for randomness generation.
175    next_randomness_round: RandomnessRound,
176    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
177}
178
179impl RandomnessManager {
180    // Returns None in case of invalid input or other failure to initialize DKG.
181    pub async fn try_new(
182        epoch_store_weak: Weak<AuthorityPerEpochStore>,
183        consensus_adapter: Box<dyn SubmitToConsensus>,
184        network_handle: randomness::Handle,
185        authority_key_pair: &AuthorityKeyPair,
186    ) -> Option<Self> {
187        let epoch_store = match epoch_store_weak.upgrade() {
188            Some(epoch_store) => epoch_store,
189            None => {
190                error!(
191                    "could not construct RandomnessManager: AuthorityPerEpochStore already gone"
192                );
193                return None;
194            }
195        };
196        let tables = match epoch_store.tables() {
197            Ok(tables) => tables,
198            Err(_) => {
199                error!(
200                    "could not construct RandomnessManager: AuthorityPerEpochStore tables already gone"
201                );
202                return None;
203            }
204        };
205        let protocol_config = epoch_store.protocol_config();
206
207        let name: AuthorityName = authority_key_pair.public().into();
208        let committee = epoch_store.committee();
209        let info = RandomnessManager::randomness_dkg_info_from_committee(committee);
210        if tracing::enabled!(tracing::Level::DEBUG) {
211            // Log first few entries in DKG info for debugging.
212            for (id, name, pk, stake) in info.iter().filter(|(id, _, _, _)| *id < 3) {
213                let pk_bytes = pk.as_element().to_byte_array();
214                debug!(
215                    "random beacon: DKG info: id={id}, stake={stake}, name={name}, pk={pk_bytes:x?}"
216                );
217            }
218        }
219        let authority_ids: HashMap<_, _> =
220            info.iter().map(|(id, name, _, _)| (*name, *id)).collect();
221        let authority_peer_ids = epoch_store
222            .epoch_start_config()
223            .epoch_start_state()
224            .get_authority_names_to_peer_ids();
225        let authority_info = authority_ids
226            .into_iter()
227            .map(|(name, id)| {
228                let peer_id = *authority_peer_ids
229                    .get(&name)
230                    .expect("authority name should be in peer_ids");
231                (name, (peer_id, id))
232            })
233            .collect();
234        let nodes = info
235            .iter()
236            .map(|(id, _, pk, stake)| nodes::Node::<EncG> {
237                id: *id,
238                pk: pk.clone(),
239                weight: (*stake).try_into().expect("stake should fit in u16"),
240            })
241            .collect();
242        let (nodes, t) = match nodes::Nodes::new_reduced(
243            nodes,
244            committee
245                .validity_threshold()
246                .try_into()
247                .expect("validity threshold should fit in u16"),
248            protocol_config.random_beacon_reduction_allowed_delta(),
249            protocol_config
250                .random_beacon_reduction_lower_bound()
251                .try_into()
252                .expect("should fit u16"),
253        ) {
254            Ok((nodes, t)) => (nodes, t),
255            Err(err) => {
256                error!("random beacon: error while initializing Nodes: {err:?}");
257                return None;
258            }
259        };
260        let total_weight = nodes.total_weight();
261        let num_nodes = nodes.num_nodes();
262        let prefix_str = format!(
263            "dkg {} {}",
264            Hex::encode(epoch_store.get_chain_identifier().as_bytes()),
265            committee.epoch()
266        );
267        let randomness_private_key = bls12381::Scalar::from_byte_array(
268            authority_key_pair
269                .copy()
270                .private()
271                .as_bytes()
272                .try_into()
273                .expect("key length should match"),
274        )
275        .expect("should work to convert BLS key to Scalar");
276        let party = match dkg_v1::Party::<PkG, EncG>::new(
277            fastcrypto_tbls::ecies_v1::PrivateKey::<bls12381::G2Element>::from(
278                randomness_private_key,
279            ),
280            nodes,
281            t,
282            fastcrypto_tbls::random_oracle::RandomOracle::new(prefix_str.as_str()),
283            &mut rand::thread_rng(),
284        ) {
285            Ok(party) => party,
286            Err(err) => {
287                error!("random beacon: error while initializing Party: {err:?}");
288                return None;
289            }
290        };
291        info!(
292            "random beacon: state initialized with authority={name}, total_weight={total_weight}, t={t}, num_nodes={num_nodes}, oracle initial_prefix={prefix_str:?}",
293        );
294
295        // Reset DKG metrics to a clean baseline for this epoch. The success/failure
296        // branches below restore the right value if a persisted verdict exists;
297        // pending starts at 0 across the board.
298        epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
299        epoch_store
300            .metrics
301            .epoch_random_beacon_dkg_num_shares
302            .set(0);
303
304        // Load existing data from store.
305        let highest_completed_round = tables
306            .randomness_highest_completed_round
307            .get(&SINGLETON_KEY)
308            .expect("typed_store should not fail");
309        let mut rm = RandomnessManager {
310            epoch_store: epoch_store_weak,
311            epoch: committee.epoch(),
312            consensus_adapter,
313            network_handle: network_handle.clone(),
314            authority_info,
315            dkg_start_time: OnceCell::new(),
316            party: Arc::new(party),
317            enqueued_messages: BTreeMap::new(),
318            processed_messages: BTreeMap::new(),
319            used_messages: OnceCell::new(),
320            confirmations: BTreeMap::new(),
321            dkg_output: OnceCell::new(),
322            next_randomness_round: RandomnessRound::new(0),
323            highest_completed_round: Arc::new(Mutex::new(highest_completed_round)),
324        };
325        // Load the persisted DKG verdict. Read the v2 table first so a terminal
326        // failure (`Some(None)`) is restored across restarts; fall back to the
327        // legacy `dkg_output` table for stores written before this fix.
328        let dkg_output = match tables
329            .dkg_output_v2
330            .get(&SINGLETON_KEY)
331            .expect("typed_store should not fail")
332        {
333            Some(persisted) => Some(persisted),
334            None => tables
335                .dkg_output
336                .get(&SINGLETON_KEY)
337                .expect("typed_store should not fail")
338                .map(Some),
339        };
340        match dkg_output {
341            Some(Some(dkg_output)) => {
342                info!(
343                    "random beacon: loaded existing DKG output for epoch {}",
344                    committee.epoch()
345                );
346                epoch_store
347                    .metrics
348                    .epoch_random_beacon_dkg_num_shares
349                    .set(dkg_output.shares.as_ref().map_or(0, |shares| shares.len()) as i64);
350                rm.dkg_output
351                    .set(Some(dkg_output.clone()))
352                    .expect("setting new OnceCell should succeed");
353                network_handle.update_epoch(
354                    committee.epoch(),
355                    rm.authority_info.clone(),
356                    dkg_output,
357                    rm.party.t(),
358                    highest_completed_round,
359                );
360            }
361            Some(None) => {
362                // Terminal-failure verdict was persisted to `dkg_output_v2`. Restore
363                // it so DKG isn't re-run and randomness stays disabled for the epoch.
364                error!(
365                    "random beacon: loaded failed DKG for epoch {}. Randomness disabled for this epoch. All randomness-using transactions will fail.",
366                    committee.epoch()
367                );
368                epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
369                rm.dkg_output
370                    .set(None)
371                    .expect("setting new OnceCell should succeed");
372            }
373            None => {
374                info!(
375                    "random beacon: no existing DKG output found for epoch {}",
376                    committee.epoch()
377                );
378
379                // Load intermediate data.
380                assert!(
381                    epoch_store.protocol_config().dkg_version() > 0,
382                    "BUG: DKG version 0 is deprecated"
383                );
384                rm.processed_messages.extend(
385                    tables
386                        .dkg_processed_messages
387                        .safe_iter()
388                        .map(|result| result.expect("typed_store should not fail")),
389                );
390                if let Some(used_messages) = tables
391                    .dkg_used_messages
392                    .get(&SINGLETON_KEY)
393                    .expect("typed_store should not fail")
394                {
395                    rm.used_messages
396                        .set(used_messages)
397                        .expect("setting new OnceCell should succeed");
398                }
399                rm.confirmations.extend(
400                    tables
401                        .dkg_confirmations
402                        .safe_iter()
403                        .map(|result| result.expect("typed_store should not fail")),
404                );
405            }
406        }
407
408        // Resume randomness generation from where we left off.
409        // This must be loaded regardless of whether DKG has finished yet, since the
410        // RandomnessEventLoop and commit-handling logic in AuthorityPerEpochStore both
411        // depend on this state.
412        rm.next_randomness_round = tables
413            .randomness_next_round
414            .get(&SINGLETON_KEY)
415            .expect("typed_store should not fail")
416            .unwrap_or(RandomnessRound::new(0));
417        info!(
418            "random beacon: starting from next_randomness_round={}",
419            rm.next_randomness_round
420        );
421        let first_incomplete_round = highest_completed_round
422            .map(|r| r + 1)
423            .unwrap_or(RandomnessRound::new(0));
424        if first_incomplete_round < rm.next_randomness_round {
425            info!(
426                "random beacon: resuming generation for randomness rounds from {first_incomplete_round} to {}",
427                rm.next_randomness_round - 1,
428            );
429            for r in first_incomplete_round.value()..rm.next_randomness_round.value() {
430                network_handle.send_partial_signatures(committee.epoch(), RandomnessRound::new(r));
431            }
432        }
433
434        Some(rm)
435    }
436
437    /// Sends the initial dkg::Message to begin the randomness DKG protocol.
438    pub async fn start_dkg(&mut self) -> IotaResult {
439        if self.used_messages.initialized() || self.dkg_output.initialized() {
440            // DKG already started (or completed or failed).
441            return Ok(());
442        }
443
444        let _ = self.dkg_start_time.set(Instant::now());
445
446        let epoch_store = self.epoch_store()?;
447        let dkg_version = epoch_store.protocol_config().dkg_version();
448        info!("random beacon: starting DKG, version {dkg_version}");
449
450        let msg = match VersionedDkgMessage::create(dkg_version, self.party.clone()) {
451            Ok(msg) => msg,
452            Err(FastCryptoError::IgnoredMessage) => {
453                info!(
454                    "random beacon: no DKG Message for party id={} (zero weight)",
455                    self.party.id
456                );
457                return Ok(());
458            }
459            Err(e) => {
460                error!("random beacon: error while creating a DKG Message: {e:?}");
461                return Ok(());
462            }
463        };
464
465        info!("random beacon: created {msg:?} with dkg version {dkg_version}");
466        let transaction = ConsensusTransaction::new_randomness_dkg_message(epoch_store.name, &msg);
467
468        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
469        let mut fail_point_skip_sending = false;
470        fail_point_if!("rb-dkg", || {
471            // maybe skip sending in simtests
472            fail_point_skip_sending = true;
473        });
474        if !fail_point_skip_sending {
475            self.consensus_adapter
476                .submit_to_consensus(&[transaction], &epoch_store)?;
477        }
478
479        epoch_store
480            .metrics
481            .epoch_random_beacon_dkg_message_time_ms
482            .set(
483                self.dkg_start_time
484                    .get()
485                    .unwrap() // already set above
486                    .elapsed()
487                    .as_millis() as i64,
488            );
489        Ok(())
490    }
491
492    /// Processes all received messages and advances the randomness DKG state
493    /// machine when possible, sending out a dkg::Confirmation and
494    /// generating final output.
495    pub(crate) async fn advance_dkg(
496        &mut self,
497        consensus_output: &mut ConsensusCommitOutput,
498        round: CommitRound,
499    ) -> IotaResult {
500        let epoch_store = self.epoch_store()?;
501
502        // Once we have enough Messages, send a Confirmation.
503        if !self.dkg_output.initialized() && !self.used_messages.initialized() {
504            // Process all enqueued messages.
505            let mut handles: FuturesUnordered<_> = std::mem::take(&mut self.enqueued_messages)
506                .into_values()
507                .collect();
508            while let Some(res) = handles.next().await {
509                if let Ok(Some(processed)) = res {
510                    self.processed_messages
511                        .insert(processed.sender(), processed.clone());
512                    consensus_output.insert_dkg_processed_message(processed);
513                }
514            }
515
516            // Attempt to generate the Confirmation.
517            match VersionedProcessedMessage::merge(
518                self.party.clone(),
519                self.processed_messages
520                    .values()
521                    .cloned()
522                    .collect::<Vec<_>>(),
523            ) {
524                Ok((conf, used_msgs)) => {
525                    info!(
526                        "random beacon: sending DKG Confirmation with {} complaints",
527                        conf.num_of_complaints()
528                    );
529                    if self.used_messages.set(used_msgs.clone()).is_err() {
530                        error!("BUG: used_messages should only ever be set once");
531                    }
532                    consensus_output.insert_dkg_used_messages(used_msgs);
533
534                    let transaction = ConsensusTransaction::new_randomness_dkg_confirmation(
535                        epoch_store.name,
536                        &conf,
537                    );
538
539                    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
540                    let mut fail_point_skip_sending = false;
541                    fail_point_if!("rb-dkg", || {
542                        // maybe skip sending in simtests
543                        fail_point_skip_sending = true;
544                    });
545                    if !fail_point_skip_sending {
546                        self.consensus_adapter
547                            .submit_to_consensus(&[transaction], &epoch_store)?;
548                    }
549
550                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
551                    if let Some(elapsed) = elapsed {
552                        epoch_store
553                            .metrics
554                            .epoch_random_beacon_dkg_confirmation_time_ms
555                            .set(elapsed as i64);
556                    }
557                }
558                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
559                Err(e) => debug!("random beacon: error while merging DKG Messages: {e:?}"),
560            }
561        }
562
563        // Once we have enough Confirmations, process them and update shares.
564        if !self.dkg_output.initialized() && self.used_messages.initialized() {
565            match self
566                .used_messages
567                .get()
568                .expect("checked above that `used_messages` is initialized")
569                .complete_dkg(self.party.clone(), self.confirmations.values())
570            {
571                Ok(output) => {
572                    let num_shares = output.shares.as_ref().map_or(0, |shares| shares.len());
573                    let epoch_elapsed = epoch_store.epoch_open_time.elapsed().as_millis();
574                    let elapsed = self.dkg_start_time.get().map(|t| t.elapsed().as_millis());
575                    info!(
576                        "random beacon: DKG complete in {epoch_elapsed}ms since epoch start, {elapsed:?}ms since DKG start, with {num_shares} shares for this node"
577                    );
578                    epoch_store
579                        .metrics
580                        .epoch_random_beacon_dkg_num_shares
581                        .set(num_shares as i64);
582                    epoch_store
583                        .metrics
584                        .epoch_random_beacon_dkg_epoch_start_completion_time_ms
585                        .set(epoch_elapsed as i64);
586                    epoch_store.metrics.epoch_random_beacon_dkg_failed.set(0);
587                    if let Some(elapsed) = elapsed {
588                        epoch_store
589                            .metrics
590                            .epoch_random_beacon_dkg_completion_time_ms
591                            .set(elapsed as i64);
592                    }
593                    self.dkg_output
594                        .set(Some(output.clone()))
595                        .expect("checked above that `dkg_output` is uninitialized");
596                    self.network_handle.update_epoch(
597                        epoch_store.committee().epoch(),
598                        self.authority_info.clone(),
599                        output.clone(),
600                        self.party.t(),
601                        None,
602                    );
603                    consensus_output.set_dkg_output(Some(output));
604                }
605                Err(FastCryptoError::NotEnoughInputs) => (), // wait for more input
606                Err(e) => error!("random beacon: error while processing DKG Confirmations: {e:?}"),
607            }
608        }
609
610        // If we ran out of time, mark DKG as failed.
611        if !self.dkg_output.initialized()
612            && round
613                > epoch_store
614                    .protocol_config()
615                    .random_beacon_dkg_timeout_round() as u64
616        {
617            error!(
618                "random beacon: DKG timed out. Randomness disabled for this epoch. All randomness-using transactions will fail."
619            );
620            epoch_store.metrics.epoch_random_beacon_dkg_failed.set(1);
621            self.dkg_output
622                .set(None)
623                .expect("checked above that `dkg_output` is uninitialized");
624            // Persist the terminal-failure verdict so a restart past the
625            // timeout round doesn't drop back to a "pending" view of DKG and
626            // defer randomness-using transactions indefinitely.
627            consensus_output.set_dkg_output(None);
628        }
629
630        Ok(())
631    }
632
633    /// Adds a received VersionedDkgMessage to the randomness DKG state machine.
634    pub fn add_message(
635        &mut self,
636        authority: &AuthorityName,
637        msg: VersionedDkgMessage,
638    ) -> IotaResult {
639        // message was received from other validators, so we need to ensure it uses a
640        // supported version before we call other functions that assume the
641        // version is correct
642        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
643        if !msg.is_valid_version(dkg_version) {
644            warn!("ignoring DKG Message from authority {authority:?} with unsupported version");
645            return Ok(());
646        }
647
648        if self.used_messages.initialized() || self.dkg_output.initialized() {
649            // We've already sent a `Confirmation`, so we can't add any more messages.
650            return Ok(());
651        }
652        let Some((_, party_id)) = self.authority_info.get(authority) else {
653            error!("random beacon: received DKG Message from unknown authority: {authority:?}");
654            return Ok(());
655        };
656        if *party_id != msg.sender() {
657            warn!(
658                "ignoring equivocating DKG Message from authority {authority:?} pretending to be PartyId {party_id}"
659            );
660            return Ok(());
661        }
662        if self.enqueued_messages.contains_key(&msg.sender())
663            || self.processed_messages.contains_key(&msg.sender())
664        {
665            info!("ignoring duplicate DKG Message from authority {authority:?}");
666            return Ok(());
667        }
668
669        let party = self.party.clone();
670        // TODO: Could save some CPU by not processing messages if we already have
671        // enough to merge.
672        self.enqueued_messages.insert(
673            msg.sender(),
674            tokio::task::spawn_blocking(move || {
675                match VersionedProcessedMessage::process(party, msg) {
676                    Ok(processed) => Some(processed),
677                    Err(err) => {
678                        debug!("random beacon: error while processing DKG Message: {err:?}");
679                        None
680                    }
681                }
682            }),
683        );
684        Ok(())
685    }
686
687    /// Adds a received dkg::Confirmation to the randomness DKG state machine.
688    pub(crate) fn add_confirmation(
689        &mut self,
690        output: &mut ConsensusCommitOutput,
691        authority: &AuthorityName,
692        conf: VersionedDkgConfirmation,
693    ) -> IotaResult {
694        // confirmation was received from other validators, so we need to ensure it uses
695        // a supported version before we call other functions that assume the
696        // version is correct
697        let dkg_version = self.epoch_store()?.protocol_config().dkg_version();
698        if !conf.is_valid_version(dkg_version) {
699            warn!(
700                "ignoring DKG Confirmation from authority {authority:?} with unsupported version"
701            );
702            return Ok(());
703        }
704
705        if self.dkg_output.initialized() {
706            // Once we have completed DKG, no more `Confirmation`s are needed.
707            return Ok(());
708        }
709        let Some((_, party_id)) = self.authority_info.get(authority) else {
710            error!(
711                "random beacon: received DKG Confirmation from unknown authority: {authority:?}"
712            );
713            return Ok(());
714        };
715        if *party_id != conf.sender() {
716            warn!(
717                "ignoring equivocating DKG Confirmation from authority {authority:?} pretending to be PartyId {party_id}"
718            );
719            return Ok(());
720        }
721        self.confirmations.insert(conf.sender(), conf.clone());
722        output.insert_dkg_confirmation(conf);
723        Ok(())
724    }
725
726    /// Reserves the next available round number for randomness generation if
727    /// enough time has elapsed, or returns None if not yet ready (based on
728    /// ProtocolConfig setting). Once the given batch is written,
729    /// `generate_randomness` must be called to start the process. On restart,
730    /// any reserved rounds for which the batch was written will automatically
731    /// be resumed.
732    pub(crate) fn reserve_next_randomness(
733        &mut self,
734        commit_timestamp: CommitTimestampMs,
735        output: &mut ConsensusCommitOutput,
736    ) -> IotaResult<Option<RandomnessRound>> {
737        let epoch_store = self.epoch_store()?;
738
739        let last_round_timestamp = epoch_store
740            .get_randomness_last_round_timestamp()
741            .expect("read should not fail");
742
743        if let Some(last_round_timestamp) = last_round_timestamp {
744            if commit_timestamp - last_round_timestamp
745                < epoch_store
746                    .protocol_config()
747                    .random_beacon_min_round_interval_ms()
748            {
749                return Ok(None);
750            }
751        }
752
753        let randomness_round = self.next_randomness_round;
754        self.next_randomness_round = self
755            .next_randomness_round
756            .checked_add(1)
757            .ok_or_else(|| IotaError::Unknown("RandomnessRound overflow".to_string()))?;
758
759        output.reserve_next_randomness_round(self.next_randomness_round, commit_timestamp);
760
761        Ok(Some(randomness_round))
762    }
763
764    /// Starts the process of generating the given RandomnessRound.
765    pub fn generate_randomness(&self, epoch: EpochId, randomness_round: RandomnessRound) {
766        self.network_handle
767            .send_partial_signatures(epoch, randomness_round);
768    }
769
770    pub fn dkg_status(&self) -> DkgStatus {
771        match self.dkg_output.get() {
772            Some(Some(_)) => DkgStatus::Successful,
773            Some(None) => DkgStatus::Failed,
774            None => DkgStatus::Pending,
775        }
776    }
777
778    /// Generates a new RandomnessReporter for reporting observed rounds to this
779    /// RandomnessManager.
780    pub fn reporter(&self) -> RandomnessReporter {
781        RandomnessReporter {
782            epoch_store: self.epoch_store.clone(),
783            epoch: self.epoch,
784            network_handle: self.network_handle.clone(),
785            highest_completed_round: self.highest_completed_round.clone(),
786        }
787    }
788
789    fn epoch_store(&self) -> IotaResult<Arc<AuthorityPerEpochStore>> {
790        self.epoch_store
791            .upgrade()
792            .ok_or(IotaError::EpochEnded(self.epoch))
793    }
794
795    fn randomness_dkg_info_from_committee(
796        committee: &Committee,
797    ) -> Vec<(
798        u16,
799        AuthorityName,
800        fastcrypto_tbls::ecies_v1::PublicKey<bls12381::G2Element>,
801        StakeUnit,
802    )> {
803        committee
804            .members()
805            .map(|(name, stake)| {
806                let index: u16 = committee
807                    .authority_index(name)
808                    .expect("lookup of known committee member should succeed")
809                    .try_into()
810                    .expect("authority index should fit in u16");
811                let pk = bls12381::G2Element::from_byte_array(
812                    committee
813                        .public_key(name)
814                        .expect("lookup of known committee member should succeed")
815                        .as_bytes()
816                        .try_into()
817                        .expect("key length should match"),
818                )
819                .expect("should work to convert BLS key to G2Element");
820                (
821                    index,
822                    *name,
823                    fastcrypto_tbls::ecies_v1::PublicKey::from(pk),
824                    *stake,
825                )
826            })
827            .collect()
828    }
829}
830
831// Used by other components to notify the randomness system of observed
832// randomness.
833#[derive(Clone)]
834pub struct RandomnessReporter {
835    epoch_store: Weak<AuthorityPerEpochStore>,
836    epoch: EpochId,
837    network_handle: randomness::Handle,
838    highest_completed_round: Arc<Mutex<Option<RandomnessRound>>>,
839}
840
841impl RandomnessReporter {
842    /// Notifies the associated randomness manager that randomness for the given
843    /// round has been durably committed in a checkpoint. This completes the
844    /// process of generating randomness for the round.
845    pub fn notify_randomness_in_checkpoint(&self, round: RandomnessRound) -> IotaResult {
846        let epoch_store = self
847            .epoch_store
848            .upgrade()
849            .ok_or(IotaError::EpochEnded(self.epoch))?;
850        let mut highest_completed_round = self.highest_completed_round.lock();
851        if Some(round) > *highest_completed_round {
852            *highest_completed_round = Some(round);
853            epoch_store
854                .tables()?
855                .randomness_highest_completed_round
856                .insert(&SINGLETON_KEY, &round)?;
857            self.network_handle
858                .complete_round(epoch_store.committee().epoch(), round);
859        }
860        Ok(())
861    }
862}
863
/// Coarse state of the DKG protocol, as reported by
/// `RandomnessManager::dkg_status`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DkgStatus {
    /// No DKG outcome has been recorded yet.
    Pending,
    /// DKG finished without producing an output.
    Failed,
    /// DKG finished and produced an output.
    Successful,
}
870
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
    use iota_types::{crypto::AuthorityKeyPair, messages_consensus::ConsensusTransactionKind};
    use starfish_core::{BlockRef, BlockStatus};
    use tokio::sync::mpsc;

    use crate::{
        authority::{
            authority_per_epoch_store::{
                AuthorityPerEpochStore, ExecutionIndices, ExecutionIndicesWithStats,
            },
            test_authority_builder::TestAuthorityBuilder,
        },
        checkpoints::CheckpointStore,
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
            MockConsensusClient,
        },
        epoch::randomness::*,
        mock_consensus::with_block_status,
    };

    /// Builds a `ConsensusAdapter` for authority `name` backed by a mock
    /// consensus client.
    ///
    /// The mock forwards every submitted transaction batch to `tx_consensus`
    /// and reports each submission as sequenced. This lets a test observe
    /// exactly what the `RandomnessManager` submits to consensus.
    fn new_forwarding_consensus_adapter(
        name: AuthorityName,
        tx_consensus: mpsc::Sender<Vec<ConsensusTransaction>>,
    ) -> Arc<ConsensusAdapter> {
        let mut mock_consensus_client = MockConsensusClient::new();
        mock_consensus_client
            .expect_submit()
            .withf(move |transactions: &[ConsensusTransaction], _epoch_store| {
                tx_consensus.try_send(transactions.to_vec()).unwrap();
                true
            })
            .returning(|_, _| {
                Ok(with_block_status(BlockStatus::Sequenced(
                    starfish_core::GenericTransactionRef::BlockRef(BlockRef::MIN),
                )))
            });
        Arc::new(ConsensusAdapter::new(
            Arc::new(mock_consensus_client),
            CheckpointStore::new_for_tests(),
            name,
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        ))
    }

    /// Calls `start_dkg` on each manager in turn and returns the DKG message
    /// each one submitted to consensus, in manager order.
    ///
    /// Panics if a manager submits anything other than exactly one
    /// `RandomnessDkgMessage` transaction.
    async fn start_dkg_and_collect_messages(
        randomness_managers: &mut [RandomnessManager],
        rx_consensus: &mut mpsc::Receiver<Vec<ConsensusTransaction>>,
    ) -> Vec<VersionedDkgMessage> {
        let mut dkg_messages = Vec::with_capacity(randomness_managers.len());
        for randomness_manager in randomness_managers.iter_mut() {
            randomness_manager.start_dkg().await.unwrap();

            let mut dkg_message = rx_consensus.recv().await.unwrap();
            assert_eq!(1, dkg_message.len());
            match dkg_message.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
                    let msg: VersionedDkgMessage = bcs::from_bytes(&bytes)
                        .expect("DKG message deserialization should not fail");
                    dkg_messages.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        dkg_messages
    }

    #[tokio::test]
    async fn test_dkg_v1() {
        test_dkg(1).await;
    }

    /// Runs a full DKG (messages, then confirmations) across a 4-validator
    /// committee using DKG `version`, and asserts every manager succeeds.
    async fn test_dkg(version: u64) {
        telemetry_subscribers::init_for_testing();

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .committee_size(NonZeroUsize::new(4).unwrap())
                .with_reference_gas_price(500)
                .build();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_random_beacon_dkg_version_for_testing(version);

        let mut epoch_stores = Vec::new();
        let mut randomness_managers = Vec::new();
        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);

        for validator in network_config.validator_configs.iter() {
            let state = TestAuthorityBuilder::new()
                .with_protocol_config(protocol_config.clone())
                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
                .build()
                .await;
            // Everything this validator submits to consensus lands in `rx_consensus`.
            let consensus_adapter =
                new_forwarding_consensus_adapter(state.name, tx_consensus.clone());
            let epoch_store = state.epoch_store_for_testing();
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter),
                iota_network::randomness::Handle::new_stub(),
                validator.authority_key_pair(),
            )
            .await
            .unwrap();

            epoch_stores.push(epoch_store);
            randomness_managers.push(randomness_manager);
        }

        // Generate and distribute Messages.
        let dkg_messages =
            start_dkg_and_collect_messages(&mut randomness_managers, &mut rx_consensus).await;
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new(0);
            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
                index: ExecutionIndices {
                    last_committed_round: 0,
                    ..Default::default()
                },
                ..Default::default()
            });
            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_message(&epoch_stores[j].name, dkg_message)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, 0)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Generate and distribute Confirmations.
        let mut dkg_confirmations = Vec::new();
        for _ in 0..randomness_managers.len() {
            let mut dkg_confirmation = rx_consensus.recv().await.unwrap();
            assert_eq!(1, dkg_confirmation.len());
            match dkg_confirmation.remove(0).kind {
                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
                    let msg: VersionedDkgConfirmation = bcs::from_bytes(&bytes)
                        .expect("DKG confirmation deserialization should not fail");
                    dkg_confirmations.push(msg);
                }
                _ => panic!("wrong type of message sent"),
            }
        }
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new(0);
            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
                index: ExecutionIndices {
                    last_committed_round: 1,
                    ..Default::default()
                },
                ..Default::default()
            });
            for (j, dkg_confirmation) in dkg_confirmations.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_confirmation(&mut output, &epoch_stores[j].name, dkg_confirmation)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, 0)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Verify DKG completed.
        for randomness_manager in &randomness_managers {
            assert_eq!(DkgStatus::Successful, randomness_manager.dkg_status());
        }
    }

    #[tokio::test]
    async fn test_dkg_expiration_v1() {
        test_dkg_expiration(1).await;
    }

    /// Distributes DKG messages but advances DKG with a `u64::MAX` timestamp.
    ///
    /// Asserts that every manager reports `Failed`, and that the verdict
    /// survives a simulated restart.
    async fn test_dkg_expiration(version: u64) {
        telemetry_subscribers::init_for_testing();

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .committee_size(NonZeroUsize::new(4).unwrap())
                .with_reference_gas_price(500)
                .build();

        let mut epoch_stores = Vec::new();
        let mut randomness_managers = Vec::new();
        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_random_beacon_dkg_version_for_testing(version);

        for validator in network_config.validator_configs.iter() {
            let state = TestAuthorityBuilder::new()
                .with_protocol_config(protocol_config.clone())
                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
                .build()
                .await;
            // Everything this validator submits to consensus lands in `rx_consensus`.
            let consensus_adapter =
                new_forwarding_consensus_adapter(state.name, tx_consensus.clone());
            let epoch_store = state.epoch_store_for_testing();
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter),
                iota_network::randomness::Handle::new_stub(),
                validator.authority_key_pair(),
            )
            .await
            .unwrap();

            epoch_stores.push(epoch_store);
            randomness_managers.push(randomness_manager);
        }

        // Generate and distribute Messages.
        let dkg_messages =
            start_dkg_and_collect_messages(&mut randomness_managers, &mut rx_consensus).await;
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new(0);
            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
                index: ExecutionIndices {
                    last_committed_round: 0,
                    ..Default::default()
                },
                ..Default::default()
            });
            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_message(&epoch_stores[j].name, dkg_message)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, u64::MAX)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // Verify DKG failed.
        for randomness_manager in &randomness_managers {
            assert_eq!(DkgStatus::Failed, randomness_manager.dkg_status());
        }

        // Simulate a restart: rebuild each manager from its persisted epoch store
        // and verify the failure verdict is durably restored (not Pending). This
        // exercises the `dkg_output_v2` `Some(None)` failure-load path in
        // `try_new`; before the fix the verdict lived only in the in-memory
        // `OnceCell`, so a restart re-loaded the manager as Pending and
        // randomness-using transactions were deferred forever.
        for (epoch_store, validator) in epoch_stores
            .iter()
            .zip(network_config.validator_configs.iter())
        {
            let recovered = recover_randomness_manager(
                epoch_store,
                validator.authority_key_pair(),
                tx_consensus.clone(),
            )
            .await;
            assert_eq!(DkgStatus::Failed, recovered.dkg_status());
        }
    }

    /// Rebuilds a `RandomnessManager` from an existing (already-persisted)
    /// epoch store, simulating a validator restart. Only on-disk DKG state
    /// is carried over: the in-memory state machine starts fresh and must
    /// be restored from the persisted tables by `try_new`.
    async fn recover_randomness_manager(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        authority_key_pair: &AuthorityKeyPair,
        tx_consensus: mpsc::Sender<Vec<ConsensusTransaction>>,
    ) -> RandomnessManager {
        let consensus_adapter = new_forwarding_consensus_adapter(epoch_store.name, tx_consensus);
        RandomnessManager::try_new(
            Arc::downgrade(epoch_store),
            Box::new(consensus_adapter),
            iota_network::randomness::Handle::new_stub(),
            authority_key_pair,
        )
        .await
        .unwrap()
    }

    /// Distributes DKG messages (but no confirmations) so DKG is still pending,
    /// then reconstructs every `RandomnessManager` from the same epoch stores
    /// and asserts the in-progress state (processed and used messages) is
    /// restored and DKG is still reported as pending. This exercises the
    /// no-output (`None`) load branch in `try_new`.
    #[tokio::test]
    async fn test_dkg_recovers_processed_messages_after_restart() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .committee_size(NonZeroUsize::new(4).unwrap())
                .with_reference_gas_price(500)
                .build();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_random_beacon_dkg_version_for_testing(1);

        let mut epoch_stores = Vec::new();
        let mut randomness_managers = Vec::new();
        let (tx_consensus, mut rx_consensus) = mpsc::channel(100);

        for validator in network_config.validator_configs.iter() {
            let state = TestAuthorityBuilder::new()
                .with_protocol_config(protocol_config.clone())
                .with_genesis_and_keypair(&network_config.genesis, validator.authority_key_pair())
                .build()
                .await;
            // Everything this validator submits to consensus lands in `rx_consensus`.
            let consensus_adapter =
                new_forwarding_consensus_adapter(state.name, tx_consensus.clone());
            let epoch_store = state.epoch_store_for_testing();
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter),
                iota_network::randomness::Handle::new_stub(),
                validator.authority_key_pair(),
            )
            .await
            .unwrap();

            epoch_stores.push(epoch_store);
            randomness_managers.push(randomness_manager);
        }

        // Generate and distribute Messages.
        let dkg_messages =
            start_dkg_and_collect_messages(&mut randomness_managers, &mut rx_consensus).await;
        for i in 0..randomness_managers.len() {
            let mut output = ConsensusCommitOutput::new(0);
            output.record_consensus_commit_stats(ExecutionIndicesWithStats {
                index: ExecutionIndices {
                    last_committed_round: 0,
                    ..Default::default()
                },
                ..Default::default()
            });
            for (j, dkg_message) in dkg_messages.iter().cloned().enumerate() {
                randomness_managers[i]
                    .add_message(&epoch_stores[j].name, dkg_message)
                    .unwrap();
            }
            randomness_managers[i]
                .advance_dkg(&mut output, 0)
                .await
                .unwrap();
            let mut batch = epoch_stores[i].db_batch_for_test();
            output.write_to_batch(&epoch_stores[i], &mut batch).unwrap();
            batch.write().unwrap();
        }

        // All messages processed and a confirmation emitted, but no confirmations
        // applied yet: DKG is still pending.
        for randomness_manager in &randomness_managers {
            assert_eq!(DkgStatus::Pending, randomness_manager.dkg_status());
            assert_eq!(
                network_config.validator_configs.len(),
                randomness_manager.processed_messages.len()
            );
            assert!(randomness_manager.used_messages.initialized());
        }

        // Simulate a restart: rebuild each manager and verify the in-progress DKG
        // state is restored and still pending.
        for (epoch_store, validator) in epoch_stores
            .iter()
            .zip(network_config.validator_configs.iter())
        {
            let recovered = recover_randomness_manager(
                epoch_store,
                validator.authority_key_pair(),
                tx_consensus.clone(),
            )
            .await;
            assert_eq!(DkgStatus::Pending, recovered.dkg_status());
            assert_eq!(
                network_config.validator_configs.len(),
                recovered.processed_messages.len()
            );
            assert!(recovered.used_messages.initialized());
        }
    }
}