1use std::{
6 collections::{HashMap, HashSet},
7 sync::{Arc, RwLock},
8 time::Duration,
9};
10
11use anemo::{
12 Network, Peer, PeerId, Request, Response,
13 types::{PeerEvent, PeerInfo},
14};
15use futures::StreamExt;
16use iota_config::p2p::{AccessType, DiscoveryConfig, P2pConfig, SeedPeer};
17use iota_types::multiaddr::Multiaddr;
18use serde::{Deserialize, Serialize};
19use tap::{Pipe, TapFallible};
20use tokio::{
21 sync::{broadcast::error::RecvError, oneshot, watch},
22 task::{AbortHandle, JoinSet},
23};
24use tracing::{debug, info, trace};
25
/// Timeout attached to every `GetKnownPeers` request sent to a peer.
const TIMEOUT: Duration = Duration::from_millis(1_000);
/// Maximum age of a known-peer entry: received entries older than this are
/// rejected, and stored entries reaching this age are culled on each tick.
const ONE_DAY_MILLISECONDS: u64 = 1_000 * 60 * 60 * 24;
28
mod generated {
    // Code generated at build time (by this crate's build script) into
    // `OUT_DIR`. It provides `DiscoveryClient`, `DiscoveryServer` and the
    // `Discovery` trait that this module re-exports.
    include!(concat!(env!("OUT_DIR"), "/iota.Discovery.rs"));
}
33mod builder;
34mod metrics;
35mod server;
36#[cfg(test)]
37mod tests;
38
39pub use builder::{Builder, Handle, UnstartedDiscovery};
40pub use generated::{
41 discovery_client::DiscoveryClient,
42 discovery_server::{Discovery, DiscoveryServer},
43};
44pub use server::GetKnownPeersResponse;
45
46use self::metrics::Metrics;
47
48struct State {
51 our_info: Option<NodeInfo>,
52 connected_peers: HashMap<PeerId, ()>,
53 known_peers: HashMap<PeerId, NodeInfo>,
54}
55
/// Discovery record describing a node: who it is, where it can be dialed,
/// when the record was produced and how it wishes to be treated.
///
/// NOTE: the derived `Ord` compares fields in declaration order, and the
/// derived serde impls emit fields in declaration order. Reordering the fields
/// therefore changes both the ordering and (for non-self-describing formats)
/// the wire encoding.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Identity of the described node.
    pub peer_id: PeerId,
    /// Addresses at which the node can be dialed. May be empty; a node with
    /// no addresses is never selected as a dial target.
    pub addresses: Vec<Multiaddr>,

    /// Time the record was produced, in milliseconds since the Unix epoch.
    /// Newer records replace older ones. Records too far in the future or
    /// older than a day are rejected.
    pub timestamp_ms: u64,

    /// Access type advertised by the node. `AccessType::Private` records are
    /// only accepted from discovery when the peer is locally allowlisted.
    pub access_type: AccessType,
}
73
/// Notification that the set of trusted peers (the committee) has changed.
///
/// When handled by the discovery event loop:
/// - peers present only in `old_committee` are removed from the network's
///   known peers, unless they are allowlisted;
/// - every peer in `new_committee` other than ourselves is inserted or updated.
#[derive(Clone, Debug, Default)]
pub struct TrustedPeerChangeEvent {
    /// Members of the committee taking effect.
    pub new_committee: Vec<PeerInfo>,
    /// Members of the committee being replaced.
    pub old_committee: Vec<PeerInfo>,
}
80
/// Long-running task that drives peer discovery: it tracks connected peers,
/// periodically gathers known peers from neighbours, dials new peers and
/// reacts to trusted-peer (committee) changes.
struct DiscoveryEventLoop {
    /// General p2p configuration; supplies the external address and seed peers.
    config: P2pConfig,
    /// Discovery tuning: tick period, target connection count, number of
    /// peers to query per tick, access type and allowlisted peers.
    discovery_config: Arc<DiscoveryConfig>,
    /// Allowlisted peers with an optional address. They are never removed on
    /// committee changes and are accepted even when they advertise
    /// `AccessType::Private`.
    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
    /// Handle to the anemo network used for dialing and peer lookups.
    network: Network,
    /// Background tasks (queries and dials) spawned by the loop. Their results
    /// are drained in `start`.
    tasks: JoinSet<()>,
    /// In-flight dials keyed by target peer. Finished entries are pruned on
    /// every tick.
    pending_dials: HashMap<PeerId, AbortHandle>,
    /// Handle of the running seed-peer dialing task, if any.
    dial_seed_peers_task: Option<AbortHandle>,
    /// The loop exits once this receiver resolves.
    shutdown_handle: oneshot::Receiver<()>,
    /// Discovery state, shared with the spawned tasks.
    state: Arc<RwLock<State>>,
    /// Receives trusted-peer (committee) change notifications.
    trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
    /// Discovery metrics.
    metrics: Metrics,
}
94
95impl DiscoveryEventLoop {
96 pub async fn start(mut self) {
98 info!("Discovery started");
99
100 self.construct_our_info();
101 self.configure_preferred_peers();
102
103 let mut interval = tokio::time::interval(self.discovery_config.interval_period());
104 let mut peer_events = {
105 let (subscriber, _peers) = self.network.subscribe().unwrap();
106 subscriber
107 };
108
109 loop {
110 tokio::select! {
111 now = interval.tick() => {
112 let now_unix = now_unix();
113 self.handle_tick(now.into_std(), now_unix);
114 }
115 peer_event = peer_events.recv() => {
116 self.handle_peer_event(peer_event);
117 },
118 Ok(()) = self.trusted_peer_change_rx.changed() => {
120 let event: TrustedPeerChangeEvent = self.trusted_peer_change_rx.borrow_and_update().clone();
121 self.handle_trusted_peer_change_event(event);
122 }
123 Some(task_result) = self.tasks.join_next() => {
125 match task_result {
126 Ok(()) => {},
127 Err(e) => {
128 if e.is_cancelled() {
129 } else if e.is_panic() {
131 std::panic::resume_unwind(e.into_panic());
133 } else {
134 panic!("task failed: {e}");
135 }
136 },
137 };
138 },
139 _ = &mut self.shutdown_handle => {
141 break;
142 }
143 }
144 }
145
146 info!("Discovery ended");
147 }
148
149 fn construct_our_info(&mut self) {
151 if self.state.read().unwrap().our_info.is_some() {
152 return;
153 }
154
155 let address = self
156 .config
157 .external_address
158 .clone()
159 .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
160 .into_iter()
161 .collect();
162 let our_info = NodeInfo {
163 peer_id: self.network.peer_id(),
164 addresses: address,
165 timestamp_ms: now_unix(),
166 access_type: self.discovery_config.access_type(),
167 };
168
169 self.state.write().unwrap().our_info = Some(our_info);
170 }
171
172 fn configure_preferred_peers(&mut self) {
175 for (peer_id, address) in self
179 .discovery_config
180 .allowlisted_peers
181 .iter()
182 .map(|ap| (ap.peer_id, ap.address.clone()))
183 .chain(self.config.seed_peers.iter().filter_map(|sp| {
184 sp.peer_id
185 .map(|peer_id| (peer_id, Some(sp.address.clone())))
186 }))
187 {
188 let anemo_address = if let Some(address) = address {
189 let Ok(address) = address.to_anemo_address() else {
190 debug!(p2p_address=?address, "Can't convert p2p address to anemo address");
191 continue;
192 };
193 Some(address)
194 } else {
195 None
196 };
197
198 let peer_info = anemo::types::PeerInfo {
201 peer_id,
202 affinity: anemo::types::PeerAffinity::High,
203 address: anemo_address.into_iter().collect(),
204 };
205 debug!(?peer_info, "Add configured preferred peer");
206 self.network.known_peers().insert(peer_info);
207 }
208 }
209
210 fn update_our_info_timestamp(&mut self, now_unix: u64) {
211 if let Some(our_info) = &mut self.state.write().unwrap().our_info {
212 our_info.timestamp_ms = now_unix;
213 }
214 }
215
216 fn handle_trusted_peer_change_event(
219 &mut self,
220 trusted_peer_change_event: TrustedPeerChangeEvent,
221 ) {
222 let TrustedPeerChangeEvent {
223 new_committee,
224 old_committee,
225 } = trusted_peer_change_event;
226
227 let new_peer_ids = new_committee
228 .iter()
229 .map(|peer| peer.peer_id)
230 .collect::<HashSet<_>>();
231
232 let to_remove = old_committee
235 .iter()
236 .map(|peer_info| &peer_info.peer_id)
237 .filter(|old_peer_id| {
238 !new_peer_ids.contains(old_peer_id)
239 && !self.allowlisted_peers.contains_key(old_peer_id)
240 });
241
242 let to_insert = new_committee
246 .into_iter()
247 .filter(|peer_info| !self.network.peer_id().eq(&peer_info.peer_id));
248
249 let (removed, updated_or_inserted) = self
250 .network
251 .known_peers()
252 .batch_update(to_remove, to_insert.clone());
253
254 let removed: Vec<_> = removed
256 .into_iter()
257 .filter_map(|removed| removed.map(|info| info.peer_id))
258 .collect();
259 let mut updated = Vec::new();
260 let mut inserted = Vec::new();
261 for (replaced_val, to_insert_val) in updated_or_inserted.into_iter().zip(to_insert) {
262 if replaced_val.is_some() {
263 updated.push(to_insert_val.peer_id);
264 } else {
265 inserted.push(to_insert_val.peer_id);
266 }
267 }
268 debug!(
269 "Trusted peer change event: removed {removed:?}, updated {updated:?}, inserted {inserted:?}",
270 );
271 }
272
273 fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
280 match peer_event {
281 Ok(PeerEvent::NewPeer(peer_id)) => {
282 if let Some(peer) = self.network.peer(peer_id) {
283 self.state
285 .write()
286 .unwrap()
287 .connected_peers
288 .insert(peer_id, ());
289
290 self.tasks.spawn(query_peer_for_their_known_peers(
292 peer,
293 self.state.clone(),
294 self.metrics.clone(),
295 self.allowlisted_peers.clone(),
296 ));
297 }
298 }
299 Ok(PeerEvent::LostPeer(peer_id, _)) => {
300 self.state.write().unwrap().connected_peers.remove(&peer_id);
301 }
302
303 Err(RecvError::Closed) => {
304 panic!("PeerEvent channel shouldn't be able to be closed");
305 }
306
307 Err(RecvError::Lagged(_)) => {
308 trace!("State-Sync fell behind processing PeerEvents");
309 }
310 }
311 }
312
313 fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
324 self.update_our_info_timestamp(now_unix);
325
326 self.tasks
327 .spawn(query_connected_peers_for_their_known_peers(
328 self.network.clone(),
329 self.discovery_config.clone(),
330 self.state.clone(),
331 self.metrics.clone(),
332 self.allowlisted_peers.clone(),
333 ));
334
335 self.state
337 .write()
338 .unwrap()
339 .known_peers
340 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
341
342 self.pending_dials.retain(|_k, v| !v.is_finished());
344 if let Some(abort_handle) = &self.dial_seed_peers_task {
346 if abort_handle.is_finished() {
347 self.dial_seed_peers_task = None;
348 }
349 }
350
351 let state = self.state.read().unwrap();
354 let eligible = state
355 .known_peers
356 .clone()
357 .into_iter()
358 .filter(|(peer_id, info)| {
359 peer_id != &self.network.peer_id() &&
360 !info.addresses.is_empty() && !state.connected_peers.contains_key(peer_id) && !self.pending_dials.contains_key(peer_id) })
366 .collect::<Vec<_>>();
367
368 let number_of_connections = state.connected_peers.len();
370 let number_to_dial = std::cmp::min(
371 eligible.len(),
372 self.discovery_config
373 .target_concurrent_connections()
374 .saturating_sub(number_of_connections),
375 );
376
377 for (peer_id, info) in rand::seq::SliceRandom::choose_multiple(
379 eligible.as_slice(),
380 &mut rand::thread_rng(),
381 number_to_dial,
382 ) {
383 let abort_handle = self.tasks.spawn(try_to_connect_to_peer(
384 self.network.clone(),
385 info.to_owned(),
386 ));
387 self.pending_dials.insert(*peer_id, abort_handle);
388 }
389
390 if self.dial_seed_peers_task.is_none()
393 && state.connected_peers.is_empty()
394 && self.pending_dials.is_empty()
395 && !self.config.seed_peers.is_empty()
396 {
397 let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
398 self.network.clone(),
399 self.discovery_config.clone(),
400 self.config.seed_peers.clone(),
401 ));
402
403 self.dial_seed_peers_task = Some(abort_handle);
404 }
405 }
406}
407
408async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
409 debug!("Connecting to peer {info:?}");
410 for multiaddr in &info.addresses {
411 if let Ok(address) = multiaddr.to_anemo_address() {
412 if network
414 .connect_with_peer_id(address, info.peer_id)
415 .await
416 .tap_err(|e| {
417 debug!(
418 "error dialing {} at address '{}': {e}",
419 info.peer_id.short_display(4),
420 multiaddr
421 )
422 })
423 .is_ok()
424 {
425 return;
426 }
427 }
428 }
429}
430
431async fn try_to_connect_to_seed_peers(
432 network: Network,
433 config: Arc<DiscoveryConfig>,
434 seed_peers: Vec<SeedPeer>,
435) {
436 debug!(?seed_peers, "Connecting to seed peers");
437 let network = &network;
438
439 futures::stream::iter(seed_peers.into_iter().filter_map(|seed| {
440 seed.address
441 .to_anemo_address()
442 .ok()
443 .map(|address| (seed, address))
444 }))
445 .for_each_concurrent(
446 config.target_concurrent_connections(),
447 |(seed, address)| async move {
448 let _ = if let Some(peer_id) = seed.peer_id {
450 network.connect_with_peer_id(address, peer_id).await
451 } else {
452 network.connect(address).await
453 }
454 .tap_err(|e| debug!("error dialing multiaddr '{}': {e}", seed.address));
455 },
456 )
457 .await;
458}
459
460async fn query_peer_for_their_known_peers(
461 peer: Peer,
462 state: Arc<RwLock<State>>,
463 metrics: Metrics,
464 allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
465) {
466 let mut client = DiscoveryClient::new(peer);
467
468 let request = Request::new(()).with_timeout(TIMEOUT);
469 if let Some(found_peers) = client
470 .get_known_peers(request)
471 .await
472 .ok()
473 .map(Response::into_inner)
474 .map(
475 |GetKnownPeersResponse {
476 own_info,
477 mut known_peers,
478 }| {
479 if !own_info.addresses.is_empty() {
480 known_peers.push(own_info)
481 }
482 known_peers
483 },
484 )
485 {
486 update_known_peers(state, metrics, found_peers, allowlisted_peers);
487 }
488}
489
490async fn query_connected_peers_for_their_known_peers(
492 network: Network,
493 config: Arc<DiscoveryConfig>,
494 state: Arc<RwLock<State>>,
495 metrics: Metrics,
496 allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
497) {
498 use rand::seq::IteratorRandom;
499
500 let peers_to_query = network
502 .peers()
503 .into_iter()
504 .flat_map(|id| network.peer(id))
505 .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
506
507 let found_peers = peers_to_query
509 .into_iter()
510 .map(DiscoveryClient::new)
511 .map(|mut client| async move {
512 let request = Request::new(()).with_timeout(TIMEOUT);
513 client
514 .get_known_peers(request)
515 .await
516 .ok()
517 .map(Response::into_inner)
518 .map(
519 |GetKnownPeersResponse {
520 own_info,
521 mut known_peers,
522 }| {
523 known_peers.push(own_info);
524 known_peers
525 },
526 )
527 })
528 .pipe(futures::stream::iter)
529 .buffer_unordered(config.peers_to_query())
530 .filter_map(std::future::ready)
531 .flat_map(futures::stream::iter)
532 .collect::<Vec<_>>()
533 .await;
534
535 update_known_peers(state, metrics, found_peers, allowlisted_peers);
536}
537
538fn update_known_peers(
543 state: Arc<RwLock<State>>,
544 metrics: Metrics,
545 found_peers: Vec<NodeInfo>,
546 allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
547) {
548 use std::collections::hash_map::Entry;
549
550 let now_unix = now_unix();
551 let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
552 let known_peers = &mut state.write().unwrap().known_peers;
553 for peer in found_peers {
554 if peer.timestamp_ms > now_unix.saturating_add(30 * 1_000) || now_unix.saturating_sub(peer.timestamp_ms) > ONE_DAY_MILLISECONDS
558 {
559 continue;
560 }
561
562 if peer.peer_id == our_peer_id {
563 continue;
564 }
565
566 if peer.access_type == AccessType::Private && !allowlisted_peers.contains_key(&peer.peer_id)
568 {
569 continue;
570 }
571
572 match known_peers.entry(peer.peer_id) {
573 Entry::Occupied(mut o) => {
575 if peer.timestamp_ms > o.get().timestamp_ms {
576 if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
577 metrics.inc_num_peers_with_external_address();
578 }
579 if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
580 metrics.dec_num_peers_with_external_address();
581 }
582 o.insert(peer);
583 }
584 }
585 Entry::Vacant(v) => {
587 if !peer.addresses.is_empty() {
588 metrics.inc_num_peers_with_external_address();
589 }
590 v.insert(peer);
591 }
592 }
593 }
594}
595
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_unix() -> u64 {
    let millis = std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap()
        .as_millis();
    millis as u64
}