1use std::{
6 collections::{BTreeMap, HashMap, HashSet},
7 sync::Arc,
8};
9
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::{auth::RequireAuthorizationLayer, inflight_limit};
12use iota_config::p2p::RandomnessConfig;
13use iota_types::{base_types::AuthorityName, committee::EpochId, crypto::RandomnessRound};
14use tokio::sync::mpsc;
15
16use super::{
17 Handle, RandomnessEventLoop, RandomnessMessage, RandomnessServer, auth::AllowedPeersUpdatable,
18 metrics::Metrics, server::Server,
19};
20
21pub struct Builder {
23 name: AuthorityName,
24 config: Option<RandomnessConfig>,
25 metrics: Option<Metrics>,
26 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
27}
28
29impl Builder {
30 pub fn new(
31 name: AuthorityName,
32 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
33 ) -> Self {
34 Self {
35 name,
36 config: None,
37 metrics: None,
38 randomness_tx,
39 }
40 }
41
42 pub fn config(mut self, config: RandomnessConfig) -> Self {
43 self.config = Some(config);
44 self
45 }
46
47 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
48 self.metrics = Some(Metrics::enabled(registry));
49 self
50 }
51
52 pub fn build(self) -> (UnstartedRandomness, anemo::Router) {
53 let Builder {
54 name,
55 config,
56 metrics,
57 randomness_tx,
58 } = self;
59 let config = config.unwrap_or_default();
60 let metrics = metrics.unwrap_or_else(Metrics::disabled);
61 let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
62 let mailbox_sender = sender.downgrade();
63 let handle = Handle {
64 sender: sender.clone(),
65 };
66 let server = Server {
67 sender: sender.downgrade(),
68 };
69 let randomness_server = RandomnessServer::new(server).add_layer_for_send_signatures(
70 InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
71 config.send_partial_signatures_inflight_limit(),
72 inflight_limit::WaitMode::ReturnError,
73 )),
74 );
75
76 let allowed_peers = AllowedPeersUpdatable::new(Arc::new(HashSet::new()));
77 let router = anemo::Router::new()
78 .route_layer(RequireAuthorizationLayer::new(allowed_peers.clone()))
79 .add_rpc_service(randomness_server);
80
81 (
82 UnstartedRandomness {
83 name,
84 config,
85 handle,
86 mailbox,
87 mailbox_sender,
88 allowed_peers,
89 metrics,
90 randomness_tx,
91 },
92 router,
93 )
94 }
95}
96
97pub struct UnstartedRandomness {
99 pub(super) name: AuthorityName,
100 pub(super) config: RandomnessConfig,
101 pub(super) handle: Handle,
102 pub(super) mailbox: mpsc::Receiver<RandomnessMessage>,
103 pub(super) mailbox_sender: mpsc::WeakSender<RandomnessMessage>,
104 pub(super) allowed_peers: AllowedPeersUpdatable,
105 pub(super) metrics: Metrics,
106 pub(super) randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
107}
108
109impl UnstartedRandomness {
110 pub(super) fn build(self, network: anemo::Network) -> (RandomnessEventLoop, Handle) {
111 let Self {
112 name,
113 config,
114 handle,
115 mailbox,
116 mailbox_sender,
117 allowed_peers,
118 metrics,
119 randomness_tx,
120 } = self;
121 (
122 RandomnessEventLoop {
123 name,
124 config,
125 mailbox,
126 mailbox_sender,
127 network,
128 allowed_peers,
129 allowed_peers_set: HashSet::new(),
130 metrics,
131 randomness_tx,
132
133 epoch: 0,
134 authority_info: Arc::new(HashMap::new()),
135 peer_share_ids: None,
136 blocked_share_id_count: 0,
137 dkg_output: None,
138 aggregation_threshold: 0,
139 highest_requested_round: BTreeMap::new(),
140 send_tasks: BTreeMap::new(),
141 round_request_time: BTreeMap::new(),
142 future_epoch_partial_sigs: BTreeMap::new(),
143 received_partial_sigs: BTreeMap::new(),
144 completed_sigs: BTreeMap::new(),
145 highest_completed_round: BTreeMap::new(),
146 },
147 handle,
148 )
149 }
150
151 pub fn start(self, network: anemo::Network) -> Handle {
152 let (event_loop, handle) = self.build(network);
153 tokio::spawn(event_loop.start());
154
155 handle
156 }
157}