iota_network/discovery/
builder.rs1use std::{
6 collections::HashMap,
7 sync::{Arc, RwLock},
8};
9
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::rate_limit;
12use iota_config::p2p::P2pConfig;
13use tap::Pipe;
14use tokio::{
15 sync::{oneshot, watch},
16 task::JoinSet,
17};
18
19use super::{
20 Discovery, DiscoveryEventLoop, DiscoveryServer, State, metrics::Metrics, server::Server,
21};
22use crate::discovery::TrustedPeerChangeEvent;
23
24pub struct Builder {
26 config: Option<P2pConfig>,
27 metrics: Option<Metrics>,
28 trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
29}
30
31impl Builder {
32 pub fn new(trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>) -> Self {
33 Self {
34 config: None,
35 metrics: None,
36 trusted_peer_change_rx,
37 }
38 }
39
40 pub fn config(mut self, config: P2pConfig) -> Self {
41 self.config = Some(config);
42 self
43 }
44
45 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
46 self.metrics = Some(Metrics::enabled(registry));
47 self
48 }
49
50 pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
51 let discovery_config = self
52 .config
53 .clone()
54 .and_then(|config| config.discovery)
55 .unwrap_or_default();
56 let (builder, server) = self.build_internal();
57 let mut discovery_server = DiscoveryServer::new(server);
58
59 if let Some(limit) = discovery_config.get_known_peers_rate_limit {
61 discovery_server = discovery_server.add_layer_for_get_known_peers(
62 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
63 governor::Quota::per_second(limit),
64 rate_limit::WaitMode::Block,
65 )),
66 );
67 }
68 (builder, discovery_server)
69 }
70
71 pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
72 let Builder {
73 config,
74 metrics,
75 trusted_peer_change_rx,
76 } = self;
77 let config = config.unwrap();
78 let metrics = metrics.unwrap_or_else(Metrics::disabled);
79 let (sender, receiver) = oneshot::channel();
80
81 let handle = Handle {
82 _shutdown_handle: Arc::new(sender),
83 };
84
85 let state = State {
86 our_info: None,
87 connected_peers: HashMap::default(),
88 known_peers: HashMap::default(),
89 }
90 .pipe(RwLock::new)
91 .pipe(Arc::new);
92
93 let server = Server {
94 state: state.clone(),
95 };
96
97 (
98 UnstartedDiscovery {
99 handle,
100 config,
101 shutdown_handle: receiver,
102 state,
103 trusted_peer_change_rx,
104 metrics,
105 },
106 server,
107 )
108 }
109}
110
111pub struct UnstartedDiscovery {
113 pub(super) handle: Handle,
114 pub(super) config: P2pConfig,
115 pub(super) shutdown_handle: oneshot::Receiver<()>,
116 pub(super) state: Arc<RwLock<State>>,
117 pub(super) trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
118 pub(super) metrics: Metrics,
119}
120
121impl UnstartedDiscovery {
122 pub(super) fn build(self, network: anemo::Network) -> (DiscoveryEventLoop, Handle) {
123 let Self {
124 handle,
125 config,
126 shutdown_handle,
127 state,
128 trusted_peer_change_rx,
129 metrics,
130 } = self;
131
132 let discovery_config = config.discovery.clone().unwrap_or_default();
133 let allowlisted_peers = Arc::new(
135 discovery_config
136 .allowlisted_peers
137 .clone()
138 .into_iter()
139 .map(|ap| (ap.peer_id, ap.address))
140 .chain(config.seed_peers.iter().filter_map(|peer| {
141 peer.peer_id
142 .map(|peer_id| (peer_id, Some(peer.address.clone())))
143 }))
144 .collect::<HashMap<_, _>>(),
145 );
146 (
147 DiscoveryEventLoop {
148 config,
149 discovery_config: Arc::new(discovery_config),
150 allowlisted_peers,
151 network,
152 tasks: JoinSet::new(),
153 pending_dials: Default::default(),
154 dial_seed_peers_task: None,
155 shutdown_handle,
156 state,
157 trusted_peer_change_rx,
158 metrics,
159 },
160 handle,
161 )
162 }
163
164 pub fn start(self, network: anemo::Network) -> Handle {
165 let (event_loop, handle) = self.build(network);
166 tokio::spawn(event_loop.start());
167
168 handle
169 }
170}
171
172pub struct Handle {
175 _shutdown_handle: Arc<oneshot::Sender<()>>,
176}