iota_network/discovery/
builder.rs1use std::{
6 collections::HashMap,
7 sync::{Arc, RwLock},
8};
9
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::rate_limit;
12use fastcrypto::traits::KeyPair;
13use iota_config::p2p::P2pConfig;
14use iota_types::crypto::NetworkKeyPair;
15use tap::Pipe;
16use tokio::{
17 sync::{oneshot, watch},
18 task::JoinSet,
19};
20
21use super::{
22 Discovery, DiscoveryEventLoop, DiscoveryServer, State, metrics::Metrics, server::Server,
23};
24use crate::discovery::TrustedPeerChangeEvent;
25
26pub struct Builder {
28 config: Option<P2pConfig>,
29 metrics: Option<Metrics>,
30 trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
31}
32
33impl Builder {
34 pub fn new(trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>) -> Self {
35 Self {
36 config: None,
37 metrics: None,
38 trusted_peer_change_rx,
39 }
40 }
41
42 pub fn config(mut self, config: P2pConfig) -> Self {
43 self.config = Some(config);
44 self
45 }
46
47 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
48 self.metrics = Some(Metrics::enabled(registry));
49 self
50 }
51
52 pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
53 let discovery_config = self
54 .config
55 .clone()
56 .and_then(|config| config.discovery)
57 .unwrap_or_default();
58 let (builder, server) = self.build_internal();
59 let mut discovery_server = DiscoveryServer::new(server);
60
61 if let Some(limit) = discovery_config.get_known_peers_rate_limit {
63 discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
64 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
65 governor::Quota::per_second(limit),
66 rate_limit::WaitMode::Block,
67 )),
68 );
69 }
70 (builder, discovery_server)
71 }
72
73 pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
74 let Builder {
75 config,
76 metrics,
77 trusted_peer_change_rx,
78 } = self;
79 let config = config.unwrap();
80 let metrics = metrics.unwrap_or_else(Metrics::disabled);
81 let (sender, receiver) = oneshot::channel();
82
83 let handle = Handle {
84 _shutdown_handle: Arc::new(sender),
85 };
86
87 let state = State {
88 our_info: None,
89 connected_peers: HashMap::default(),
90 known_peers: HashMap::default(),
91 address_verification_cooldown: HashMap::default(),
92 }
93 .pipe(RwLock::new)
94 .pipe(Arc::new);
95
96 let server = Server {
97 state: state.clone(),
98 };
99
100 (
101 UnstartedDiscovery {
102 handle,
103 config,
104 shutdown_handle: receiver,
105 state,
106 trusted_peer_change_rx,
107 metrics,
108 },
109 server,
110 )
111 }
112}
113
114pub struct UnstartedDiscovery {
116 pub(super) handle: Handle,
117 pub(super) config: P2pConfig,
118 pub(super) shutdown_handle: oneshot::Receiver<()>,
119 pub(super) state: Arc<RwLock<State>>,
120 pub(super) trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
121 pub(super) metrics: Metrics,
122}
123
124impl UnstartedDiscovery {
125 pub(super) fn build(
126 self,
127 network: anemo::Network,
128 keypair: NetworkKeyPair,
129 ) -> (DiscoveryEventLoop, Handle) {
130 let Self {
131 handle,
132 config,
133 shutdown_handle,
134 state,
135 trusted_peer_change_rx,
136 metrics,
137 } = self;
138
139 let discovery_config = config.discovery.clone().unwrap_or_default();
140 let allowlisted_peers = Arc::new(
142 discovery_config
143 .allowlisted_peers
144 .clone()
145 .into_iter()
146 .map(|ap| (ap.peer_id, ap.address))
147 .chain(config.seed_peers.iter().filter_map(|peer| {
148 peer.peer_id
149 .map(|peer_id| (peer_id, Some(peer.address.clone())))
150 }))
151 .collect::<HashMap<_, _>>(),
152 );
153 (
154 DiscoveryEventLoop {
155 config,
156 discovery_config: Arc::new(discovery_config),
157 allowlisted_peers,
158 network,
159 keypair,
160 tasks: JoinSet::new(),
161 pending_dials: Default::default(),
162 dial_seed_peers_task: None,
163 shutdown_handle,
164 state,
165 trusted_peer_change_rx,
166 metrics,
167 },
168 handle,
169 )
170 }
171
172 pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
173 assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
174 let (event_loop, handle) = self.build(network, keypair);
175 tokio::spawn(event_loop.start());
176
177 handle
178 }
179}
180
181pub struct Handle {
184 _shutdown_handle: Arc<oneshot::Sender<()>>,
185}