iota_network/discovery/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    sync::{Arc, RwLock},
8};
9
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::rate_limit;
12use fastcrypto::traits::KeyPair;
13use iota_config::p2p::P2pConfig;
14use iota_types::crypto::NetworkKeyPair;
15use tap::Pipe;
16use tokio::{
17    sync::{oneshot, watch},
18    task::JoinSet,
19};
20
21use super::{
22    Discovery, DiscoveryEventLoop, DiscoveryServer, State, metrics::Metrics, server::Server,
23};
24use crate::discovery::TrustedPeerChangeEvent;
25
26/// Discovery Service Builder.
27pub struct Builder {
28    config: Option<P2pConfig>,
29    metrics: Option<Metrics>,
30    trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
31}
32
33impl Builder {
34    pub fn new(trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>) -> Self {
35        Self {
36            config: None,
37            metrics: None,
38            trusted_peer_change_rx,
39        }
40    }
41
42    pub fn config(mut self, config: P2pConfig) -> Self {
43        self.config = Some(config);
44        self
45    }
46
47    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
48        self.metrics = Some(Metrics::enabled(registry));
49        self
50    }
51
52    pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
53        let discovery_config = self
54            .config
55            .clone()
56            .and_then(|config| config.discovery)
57            .unwrap_or_default();
58        let (builder, server) = self.build_internal();
59        let mut discovery_server = DiscoveryServer::new(server);
60
61        // Apply rate limits from configuration as needed.
62        if let Some(limit) = discovery_config.get_known_peers_rate_limit {
63            discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
64                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
65                    governor::Quota::per_second(limit),
66                    rate_limit::WaitMode::Block,
67                )),
68            );
69        }
70        (builder, discovery_server)
71    }
72
73    pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
74        let Builder {
75            config,
76            metrics,
77            trusted_peer_change_rx,
78        } = self;
79        let config = config.unwrap();
80        let metrics = metrics.unwrap_or_else(Metrics::disabled);
81        let (sender, receiver) = oneshot::channel();
82
83        let handle = Handle {
84            _shutdown_handle: Arc::new(sender),
85        };
86
87        let state = State {
88            our_info: None,
89            connected_peers: HashMap::default(),
90            known_peers: HashMap::default(),
91            address_verification_cooldown: HashMap::default(),
92        }
93        .pipe(RwLock::new)
94        .pipe(Arc::new);
95
96        let server = Server {
97            state: state.clone(),
98        };
99
100        (
101            UnstartedDiscovery {
102                handle,
103                config,
104                shutdown_handle: receiver,
105                state,
106                trusted_peer_change_rx,
107                metrics,
108            },
109            server,
110        )
111    }
112}
113
114/// Handle to an unstarted discovery system
115pub struct UnstartedDiscovery {
116    pub(super) handle: Handle,
117    pub(super) config: P2pConfig,
118    pub(super) shutdown_handle: oneshot::Receiver<()>,
119    pub(super) state: Arc<RwLock<State>>,
120    pub(super) trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
121    pub(super) metrics: Metrics,
122}
123
124impl UnstartedDiscovery {
125    pub(super) fn build(
126        self,
127        network: anemo::Network,
128        keypair: NetworkKeyPair,
129    ) -> (DiscoveryEventLoop, Handle) {
130        let Self {
131            handle,
132            config,
133            shutdown_handle,
134            state,
135            trusted_peer_change_rx,
136            metrics,
137        } = self;
138
139        let discovery_config = config.discovery.clone().unwrap_or_default();
140        // Builds the set of allowlisted peers from allowlisted_peers and seed_peers.
141        let allowlisted_peers = Arc::new(
142            discovery_config
143                .allowlisted_peers
144                .clone()
145                .into_iter()
146                .map(|ap| (ap.peer_id, ap.address))
147                .chain(config.seed_peers.iter().filter_map(|peer| {
148                    peer.peer_id
149                        .map(|peer_id| (peer_id, Some(peer.address.clone())))
150                }))
151                .collect::<HashMap<_, _>>(),
152        );
153        (
154            DiscoveryEventLoop {
155                config,
156                discovery_config: Arc::new(discovery_config),
157                allowlisted_peers,
158                network,
159                keypair,
160                tasks: JoinSet::new(),
161                pending_dials: Default::default(),
162                dial_seed_peers_task: None,
163                shutdown_handle,
164                state,
165                trusted_peer_change_rx,
166                metrics,
167            },
168            handle,
169        )
170    }
171
172    pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
173        assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
174        let (event_loop, handle) = self.build(network, keypair);
175        tokio::spawn(event_loop.start());
176
177        handle
178    }
179}
180
181/// A Handle to the Discovery subsystem. The Discovery system will be shutdown
182/// once its Handle has been dropped.
183pub struct Handle {
184    _shutdown_handle: Arc<oneshot::Sender<()>>,
185}