// iota_network/discovery/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::{Arc, RwLock},
8    time::Duration,
9};
10
11use anemo::{
12    Network, Peer, PeerId, Request, Response,
13    types::{PeerEvent, PeerInfo},
14};
15use futures::StreamExt;
16use iota_config::p2p::{AccessType, DiscoveryConfig, P2pConfig, SeedPeer};
17use iota_types::multiaddr::Multiaddr;
18use serde::{Deserialize, Serialize};
19use tap::{Pipe, TapFallible};
20use tokio::{
21    sync::{broadcast::error::RecvError, oneshot, watch},
22    task::{AbortHandle, JoinSet},
23};
24use tracing::{debug, info, trace};
25
/// Timeout applied to every `GetKnownPeers` request sent to another peer.
const TIMEOUT: Duration = Duration::from_millis(1_000);
/// Maximum age of a peer's `NodeInfo` (24 hours, in milliseconds): older
/// infos are rejected when received and culled from the known peers.
const ONE_DAY_MILLISECONDS: u64 = 86_400 * 1_000;
28
// Pulls in the Discovery service code generated at build time into OUT_DIR
// (`iota.Discovery.rs`). Its `discovery_client` and `discovery_server`
// modules are re-exported from this module.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/iota.Discovery.rs"));
}
mod builder;
mod metrics;
mod server;
#[cfg(test)]
mod tests;
38
39pub use builder::{Builder, Handle, UnstartedDiscovery};
40pub use generated::{
41    discovery_client::DiscoveryClient,
42    discovery_server::{Discovery, DiscoveryServer},
43};
44pub use server::GetKnownPeersResponse;
45
46use self::metrics::Metrics;
47
/// The internal discovery state shared (behind an `Arc<RwLock<_>>`) between
/// the main event loop and the request handler.
struct State {
    /// Our own advertised info. The event loop fills it in at startup
    /// (`construct_our_info`) unless it is already set.
    our_info: Option<NodeInfo>,
    /// Peers we currently hold a connection to. Used as a set: it is inserted
    /// on `PeerEvent::NewPeer` and removed on `PeerEvent::LostPeer`, and the
    /// value is always `()`.
    connected_peers: HashMap<PeerId, ()>,
    /// Peers learned through discovery, keyed by peer id. Entries that are a
    /// day old or older are culled on every event-loop tick.
    known_peers: HashMap<PeerId, NodeInfo>,
}
55
/// The information necessary to dial another peer.
///
/// `NodeInfo` contains all the information that is shared with other nodes via
/// the discovery service to advertise how a node can be reached.
///
/// NOTE(review): this type derives `Ord` (lexicographic over fields in
/// declaration order) and `Serialize`/`Deserialize`. Reordering fields likely
/// changes both the ordering and the wire format, so do not reorder.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Identity of the advertised node.
    pub peer_id: PeerId,
    /// Addresses the node can be dialed at. This may be empty; discovery never
    /// dials a known peer whose address list is empty.
    pub addresses: Vec<Multiaddr>,

    /// Creation time, in milliseconds since the Unix epoch.
    ///
    /// This is used to determine which of two NodeInfo's from the same PeerId
    /// should be retained: only a strictly newer timestamp replaces a known
    /// entry. Infos that are more than a day old are discarded.
    pub timestamp_ms: u64,

    /// Access type advertised by the node. Peers advertising
    /// `AccessType::Private` are only accepted by nodes that have them
    /// allowlisted.
    pub access_type: AccessType,
}
73
/// Contains a new list of available trusted peers (committee members),
/// together with the previous list.
///
/// When the discovery event loop handles this event:
/// - peers that are only in `old_committee` and are not allowlisted are removed
///   from the network's known peers;
/// - every peer in `new_committee` other than ourselves is inserted, or
///   updated if already known.
#[derive(Clone, Debug, Default)]
pub struct TrustedPeerChangeEvent {
    /// Trusted peers after the change.
    pub new_committee: Vec<PeerInfo>,
    /// Trusted peers before the change.
    pub old_committee: Vec<PeerInfo>,
}
80
/// State owned by the discovery event loop; it is driven by
/// `DiscoveryEventLoop::start`.
struct DiscoveryEventLoop {
    /// Node P2P configuration; discovery reads the external address and the
    /// seed peers from it.
    config: P2pConfig,
    /// Discovery configuration: tick interval, connection target, query
    /// fan-out, allowlisted peers and our access type.
    discovery_config: Arc<DiscoveryConfig>,
    /// Allowlisted peers with their optional address. These peers survive
    /// committee changes, and they are the only private peers that are accepted.
    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
    /// Handle to the anemo network, used to dial and query peers.
    network: Network,
    /// Background tasks (peer queries and dials) spawned by the loop.
    tasks: JoinSet<()>,
    /// In-flight dials, keyed by the peer being dialed. Finished entries are
    /// pruned on each tick.
    pending_dials: HashMap<PeerId, AbortHandle>,
    /// In-flight seed-peer dialing task, if any.
    dial_seed_peers_task: Option<AbortHandle>,
    /// Resolves when the event loop should terminate.
    shutdown_handle: oneshot::Receiver<()>,
    /// State shared with the discovery request handler.
    state: Arc<RwLock<State>>,
    /// Receives trusted peer (committee) change notifications.
    trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
    /// Discovery metrics.
    metrics: Metrics,
}
94
95impl DiscoveryEventLoop {
96    /// Starts the discovery event loop.
97    pub async fn start(mut self) {
98        info!("Discovery started");
99
100        self.construct_our_info();
101        self.configure_preferred_peers();
102
103        let mut interval = tokio::time::interval(self.discovery_config.interval_period());
104        let mut peer_events = {
105            let (subscriber, _peers) = self.network.subscribe().unwrap();
106            subscriber
107        };
108
109        loop {
110            tokio::select! {
111                now = interval.tick() => {
112                    let now_unix = now_unix();
113                    self.handle_tick(now.into_std(), now_unix);
114                }
115                peer_event = peer_events.recv() => {
116                    self.handle_peer_event(peer_event);
117                },
118                // This is signaled when new trusted peer (committee member) is added.
119                Ok(()) = self.trusted_peer_change_rx.changed() => {
120                    let event: TrustedPeerChangeEvent = self.trusted_peer_change_rx.borrow_and_update().clone();
121                    self.handle_trusted_peer_change_event(event);
122                }
123                // Handles the result of a task from tasks.
124                Some(task_result) = self.tasks.join_next() => {
125                    match task_result {
126                        Ok(()) => {},
127                        Err(e) => {
128                            if e.is_cancelled() {
129                                // avoid crashing on ungraceful shutdown.
130                            } else if e.is_panic() {
131                                // propagate panics.
132                                std::panic::resume_unwind(e.into_panic());
133                            } else {
134                                panic!("task failed: {e}");
135                            }
136                        },
137                    };
138                },
139                // Once the shutdown notification is resolved we can terminate the event loop.
140                _ = &mut self.shutdown_handle => {
141                    break;
142                }
143            }
144        }
145
146        info!("Discovery ended");
147    }
148
149    /// Constructs [`NodeInfo`] of the node.
150    fn construct_our_info(&mut self) {
151        if self.state.read().unwrap().our_info.is_some() {
152            return;
153        }
154
155        let address = self
156            .config
157            .external_address
158            .clone()
159            .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
160            .into_iter()
161            .collect();
162        let our_info = NodeInfo {
163            peer_id: self.network.peer_id(),
164            addresses: address,
165            timestamp_ms: now_unix(),
166            access_type: self.discovery_config.access_type(),
167        };
168
169        self.state.write().unwrap().our_info = Some(our_info);
170    }
171
172    /// Configures known peers list in [`Network`] using allowlisted peers and
173    /// seed peers.
174    fn configure_preferred_peers(&mut self) {
175        // Iterates over the allowlisted peers and seed peers to check if they have
176        // an address that can be converted to anemo address. If they do, they are added
177        // to the known peers list.
178        for (peer_id, address) in self
179            .discovery_config
180            .allowlisted_peers
181            .iter()
182            .map(|ap| (ap.peer_id, ap.address.clone()))
183            .chain(self.config.seed_peers.iter().filter_map(|sp| {
184                sp.peer_id
185                    .map(|peer_id| (peer_id, Some(sp.address.clone())))
186            }))
187        {
188            let anemo_address = if let Some(address) = address {
189                let Ok(address) = address.to_anemo_address() else {
190                    debug!(p2p_address=?address, "Can't convert p2p address to anemo address");
191                    continue;
192                };
193                Some(address)
194            } else {
195                None
196            };
197
198            // TODO: once we have `PeerAffinity::Allowlisted` we should update allowlisted
199            // peers' affinity.
200            let peer_info = anemo::types::PeerInfo {
201                peer_id,
202                affinity: anemo::types::PeerAffinity::High,
203                address: anemo_address.into_iter().collect(),
204            };
205            debug!(?peer_info, "Add configured preferred peer");
206            self.network.known_peers().insert(peer_info);
207        }
208    }
209
210    fn update_our_info_timestamp(&mut self, now_unix: u64) {
211        if let Some(our_info) = &mut self.state.write().unwrap().our_info {
212            our_info.timestamp_ms = now_unix;
213        }
214    }
215
216    /// Handles a [`TrustedPeerChangeEvent`] by updating the known peers with
217    /// the latest trusted new peers without deleting the allowlisted peers.
218    fn handle_trusted_peer_change_event(
219        &mut self,
220        trusted_peer_change_event: TrustedPeerChangeEvent,
221    ) {
222        let TrustedPeerChangeEvent {
223            new_committee,
224            old_committee,
225        } = trusted_peer_change_event;
226
227        let new_peer_ids = new_committee
228            .iter()
229            .map(|peer| peer.peer_id)
230            .collect::<HashSet<_>>();
231
232        // Remove peers from old_committee who are not in new_committee and are not in
233        // self.allowlisted_peers.
234        let to_remove = old_committee
235            .iter()
236            .map(|peer_info| &peer_info.peer_id)
237            .filter(|old_peer_id| {
238                !new_peer_ids.contains(old_peer_id)
239                    && !self.allowlisted_peers.contains_key(old_peer_id)
240            });
241
242        // Add the new_committee to the known peers skipping self peer.
243        // This will update the PeerInfo for those who are already in the
244        // committee and have updated their PeerInfo.
245        let to_insert = new_committee
246            .into_iter()
247            .filter(|peer_info| !self.network.peer_id().eq(&peer_info.peer_id));
248
249        let (removed, updated_or_inserted) = self
250            .network
251            .known_peers()
252            .batch_update(to_remove, to_insert.clone());
253
254        // Actually removed, may differ from `to_remove`
255        let removed: Vec<_> = removed
256            .into_iter()
257            .filter_map(|removed| removed.map(|info| info.peer_id))
258            .collect();
259        let mut updated = Vec::new();
260        let mut inserted = Vec::new();
261        for (replaced_val, to_insert_val) in updated_or_inserted.into_iter().zip(to_insert) {
262            if replaced_val.is_some() {
263                updated.push(to_insert_val.peer_id);
264            } else {
265                inserted.push(to_insert_val.peer_id);
266            }
267        }
268        debug!(
269            "Trusted peer change event: removed {removed:?}, updated {updated:?}, inserted {inserted:?}",
270        );
271    }
272
273    /// Handles a [`PeerEvent`].
274    ///
275    /// * NewPeer: Adds the peer to the connected peers list and queries the
276    ///   peer for their known peers.
277    /// * LostPeer: Removes the peer from the connected peers list.
278    /// * Closed: Panics if the channel is closed.
279    fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
280        match peer_event {
281            Ok(PeerEvent::NewPeer(peer_id)) => {
282                if let Some(peer) = self.network.peer(peer_id) {
283                    // Adds the peer to the connected peers list.
284                    self.state
285                        .write()
286                        .unwrap()
287                        .connected_peers
288                        .insert(peer_id, ());
289
290                    // Queries the new node for any peers.
291                    self.tasks.spawn(query_peer_for_their_known_peers(
292                        peer,
293                        self.state.clone(),
294                        self.metrics.clone(),
295                        self.allowlisted_peers.clone(),
296                    ));
297                }
298            }
299            Ok(PeerEvent::LostPeer(peer_id, _)) => {
300                self.state.write().unwrap().connected_peers.remove(&peer_id);
301            }
302
303            Err(RecvError::Closed) => {
304                panic!("PeerEvent channel shouldn't be able to be closed");
305            }
306
307            Err(RecvError::Lagged(_)) => {
308                trace!("State-Sync fell behind processing PeerEvents");
309            }
310        }
311    }
312
313    /// This function performs several tasks:
314    ///
315    /// 1. Update the timestamp of our own info.
316    /// 2. Queries a subset of connected peers for their known peers.
317    /// 3. Culls old known peers older than a day.
318    /// 4. Cleans out the pending_dials, dial_seed_peers_task if it's done.
319    /// 5. Selects a subset of known peers to dial if we're not connected to
320    ///    enough peers.
321    /// 6. If we have no neighbors and we aren't presently trying to connect to
322    ///    anyone we need to try the seed peers.
323    fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
324        self.update_our_info_timestamp(now_unix);
325
326        self.tasks
327            .spawn(query_connected_peers_for_their_known_peers(
328                self.network.clone(),
329                self.discovery_config.clone(),
330                self.state.clone(),
331                self.metrics.clone(),
332                self.allowlisted_peers.clone(),
333            ));
334
335        // Culls old known peers older than a day.
336        self.state
337            .write()
338            .unwrap()
339            .known_peers
340            .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
341
342        // Cleans out the pending_dials.
343        self.pending_dials.retain(|_k, v| !v.is_finished());
344        // Cleans out the dial_seed_peers_task if it's done.
345        if let Some(abort_handle) = &self.dial_seed_peers_task {
346            if abort_handle.is_finished() {
347                self.dial_seed_peers_task = None;
348            }
349        }
350
351        // Selects a subset of known peers to dial if we're not connected to enough
352        // peers.
353        let state = self.state.read().unwrap();
354        let eligible = state
355            .known_peers
356            .clone()
357            .into_iter()
358            .filter(|(peer_id, info)| {
359                peer_id != &self.network.peer_id() &&
360                !info.addresses.is_empty() // Peer has addresses we can dial
361                && !state.connected_peers.contains_key(peer_id) // We're not already connected
362                && !self.pending_dials.contains_key(peer_id) // There is no
363                // pending dial to
364                // this node
365            })
366            .collect::<Vec<_>>();
367
368        // No need to connect to any more peers if we're already connected to a bunch
369        let number_of_connections = state.connected_peers.len();
370        let number_to_dial = std::cmp::min(
371            eligible.len(),
372            self.discovery_config
373                .target_concurrent_connections()
374                .saturating_sub(number_of_connections),
375        );
376
377        // Randomly selects the number_to_dial of peers to connect to.
378        for (peer_id, info) in rand::seq::SliceRandom::choose_multiple(
379            eligible.as_slice(),
380            &mut rand::thread_rng(),
381            number_to_dial,
382        ) {
383            let abort_handle = self.tasks.spawn(try_to_connect_to_peer(
384                self.network.clone(),
385                info.to_owned(),
386            ));
387            self.pending_dials.insert(*peer_id, abort_handle);
388        }
389
390        // If we aren't connected to anything and we aren't presently trying to connect
391        // to anyone we need to try the seed peers
392        if self.dial_seed_peers_task.is_none()
393            && state.connected_peers.is_empty()
394            && self.pending_dials.is_empty()
395            && !self.config.seed_peers.is_empty()
396        {
397            let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
398                self.network.clone(),
399                self.discovery_config.clone(),
400                self.config.seed_peers.clone(),
401            ));
402
403            self.dial_seed_peers_task = Some(abort_handle);
404        }
405    }
406}
407
408async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
409    debug!("Connecting to peer {info:?}");
410    for multiaddr in &info.addresses {
411        if let Ok(address) = multiaddr.to_anemo_address() {
412            // Ignore the result and just log the error if there is one
413            if network
414                .connect_with_peer_id(address, info.peer_id)
415                .await
416                .tap_err(|e| {
417                    debug!(
418                        "error dialing {} at address '{}': {e}",
419                        info.peer_id.short_display(4),
420                        multiaddr
421                    )
422                })
423                .is_ok()
424            {
425                return;
426            }
427        }
428    }
429}
430
431async fn try_to_connect_to_seed_peers(
432    network: Network,
433    config: Arc<DiscoveryConfig>,
434    seed_peers: Vec<SeedPeer>,
435) {
436    debug!(?seed_peers, "Connecting to seed peers");
437    let network = &network;
438
439    futures::stream::iter(seed_peers.into_iter().filter_map(|seed| {
440        seed.address
441            .to_anemo_address()
442            .ok()
443            .map(|address| (seed, address))
444    }))
445    .for_each_concurrent(
446        config.target_concurrent_connections(),
447        |(seed, address)| async move {
448            // Ignores the result and just logs the error if there is one.
449            let _ = if let Some(peer_id) = seed.peer_id {
450                network.connect_with_peer_id(address, peer_id).await
451            } else {
452                network.connect(address).await
453            }
454            .tap_err(|e| debug!("error dialing multiaddr '{}': {e}", seed.address));
455        },
456    )
457    .await;
458}
459
460async fn query_peer_for_their_known_peers(
461    peer: Peer,
462    state: Arc<RwLock<State>>,
463    metrics: Metrics,
464    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
465) {
466    let mut client = DiscoveryClient::new(peer);
467
468    let request = Request::new(()).with_timeout(TIMEOUT);
469    if let Some(found_peers) = client
470        .get_known_peers(request)
471        .await
472        .ok()
473        .map(Response::into_inner)
474        .map(
475            |GetKnownPeersResponse {
476                 own_info,
477                 mut known_peers,
478             }| {
479                if !own_info.addresses.is_empty() {
480                    known_peers.push(own_info)
481                }
482                known_peers
483            },
484        )
485    {
486        update_known_peers(state, metrics, found_peers, allowlisted_peers);
487    }
488}
489
490/// Queries a subset of neighbors for their known peers.
491async fn query_connected_peers_for_their_known_peers(
492    network: Network,
493    config: Arc<DiscoveryConfig>,
494    state: Arc<RwLock<State>>,
495    metrics: Metrics,
496    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
497) {
498    use rand::seq::IteratorRandom;
499
500    // Randomly selects a subset of neighbors to query.
501    let peers_to_query = network
502        .peers()
503        .into_iter()
504        .flat_map(|id| network.peer(id))
505        .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
506
507    // Queries the selected neighbors for their known peers in parallel.
508    let found_peers = peers_to_query
509        .into_iter()
510        .map(DiscoveryClient::new)
511        .map(|mut client| async move {
512            let request = Request::new(()).with_timeout(TIMEOUT);
513            client
514                .get_known_peers(request)
515                .await
516                .ok()
517                .map(Response::into_inner)
518                .map(
519                    |GetKnownPeersResponse {
520                         own_info,
521                         mut known_peers,
522                     }| {
523                        known_peers.push(own_info);
524                        known_peers
525                    },
526                )
527        })
528        .pipe(futures::stream::iter)
529        .buffer_unordered(config.peers_to_query())
530        .filter_map(std::future::ready)
531        .flat_map(futures::stream::iter)
532        .collect::<Vec<_>>()
533        .await;
534
535    update_known_peers(state, metrics, found_peers, allowlisted_peers);
536}
537
538/// Updates the known peers list with the found peers. The found peer is ignored
539/// if it is too old or too far in the future from our clock.
540/// If a peer is already known, the NodeInfo is updated, otherwise the peer is
541/// inserted.
542fn update_known_peers(
543    state: Arc<RwLock<State>>,
544    metrics: Metrics,
545    found_peers: Vec<NodeInfo>,
546    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
547) {
548    use std::collections::hash_map::Entry;
549
550    let now_unix = now_unix();
551    let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
552    let known_peers = &mut state.write().unwrap().known_peers;
553    for peer in found_peers {
554        // Skip peers whose timestamp is too far in the future from our clock
555        // or that are too old
556        if peer.timestamp_ms > now_unix.saturating_add(30 * 1_000) // 30 seconds
557            || now_unix.saturating_sub(peer.timestamp_ms) > ONE_DAY_MILLISECONDS
558        {
559            continue;
560        }
561
562        if peer.peer_id == our_peer_id {
563            continue;
564        }
565
566        // If Peer is Private, and not in our allowlist, skip it.
567        if peer.access_type == AccessType::Private && !allowlisted_peers.contains_key(&peer.peer_id)
568        {
569            continue;
570        }
571
572        match known_peers.entry(peer.peer_id) {
573            // Updates the NodeInfo of the peer if it exists.
574            Entry::Occupied(mut o) => {
575                if peer.timestamp_ms > o.get().timestamp_ms {
576                    if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
577                        metrics.inc_num_peers_with_external_address();
578                    }
579                    if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
580                        metrics.dec_num_peers_with_external_address();
581                    }
582                    o.insert(peer);
583                }
584            }
585            // Inserts the peer if it doesn't exist.
586            Entry::Vacant(v) => {
587                if !peer.addresses.is_empty() {
588                    metrics.inc_num_peers_with_external_address();
589                }
590                v.insert(peer);
591            }
592        }
593    }
594}
595
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_unix() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    // Narrowing u128 -> u64 cast. The millisecond count only exceeds u64::MAX
    // hundreds of millions of years after the epoch.
    since_epoch.as_millis() as u64
}