iota_network/discovery/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    sync::{Arc, RwLock},
8};
9
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::rate_limit;
12use iota_config::p2p::P2pConfig;
13use tap::Pipe;
14use tokio::{
15    sync::{oneshot, watch},
16    task::JoinSet,
17};
18
19use super::{
20    Discovery, DiscoveryEventLoop, DiscoveryServer, State, metrics::Metrics, server::Server,
21};
22use crate::discovery::TrustedPeerChangeEvent;
23
24/// Discovery Service Builder.
25pub struct Builder {
26    config: Option<P2pConfig>,
27    metrics: Option<Metrics>,
28    trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
29}
30
31impl Builder {
32    pub fn new(trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>) -> Self {
33        Self {
34            config: None,
35            metrics: None,
36            trusted_peer_change_rx,
37        }
38    }
39
40    pub fn config(mut self, config: P2pConfig) -> Self {
41        self.config = Some(config);
42        self
43    }
44
45    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
46        self.metrics = Some(Metrics::enabled(registry));
47        self
48    }
49
50    pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
51        let discovery_config = self
52            .config
53            .clone()
54            .and_then(|config| config.discovery)
55            .unwrap_or_default();
56        let (builder, server) = self.build_internal();
57        let mut discovery_server = DiscoveryServer::new(server);
58
59        // Apply rate limits from configuration as needed.
60        if let Some(limit) = discovery_config.get_known_peers_rate_limit {
61            discovery_server = discovery_server.add_layer_for_get_known_peers(
62                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
63                    governor::Quota::per_second(limit),
64                    rate_limit::WaitMode::Block,
65                )),
66            );
67        }
68        (builder, discovery_server)
69    }
70
71    pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
72        let Builder {
73            config,
74            metrics,
75            trusted_peer_change_rx,
76        } = self;
77        let config = config.unwrap();
78        let metrics = metrics.unwrap_or_else(Metrics::disabled);
79        let (sender, receiver) = oneshot::channel();
80
81        let handle = Handle {
82            _shutdown_handle: Arc::new(sender),
83        };
84
85        let state = State {
86            our_info: None,
87            connected_peers: HashMap::default(),
88            known_peers: HashMap::default(),
89        }
90        .pipe(RwLock::new)
91        .pipe(Arc::new);
92
93        let server = Server {
94            state: state.clone(),
95        };
96
97        (
98            UnstartedDiscovery {
99                handle,
100                config,
101                shutdown_handle: receiver,
102                state,
103                trusted_peer_change_rx,
104                metrics,
105            },
106            server,
107        )
108    }
109}
110
111/// Handle to an unstarted discovery system
112pub struct UnstartedDiscovery {
113    pub(super) handle: Handle,
114    pub(super) config: P2pConfig,
115    pub(super) shutdown_handle: oneshot::Receiver<()>,
116    pub(super) state: Arc<RwLock<State>>,
117    pub(super) trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
118    pub(super) metrics: Metrics,
119}
120
121impl UnstartedDiscovery {
122    pub(super) fn build(self, network: anemo::Network) -> (DiscoveryEventLoop, Handle) {
123        let Self {
124            handle,
125            config,
126            shutdown_handle,
127            state,
128            trusted_peer_change_rx,
129            metrics,
130        } = self;
131
132        let discovery_config = config.discovery.clone().unwrap_or_default();
133        // Builds the set of allowlisted peers from allowlisted_peers and seed_peers.
134        let allowlisted_peers = Arc::new(
135            discovery_config
136                .allowlisted_peers
137                .clone()
138                .into_iter()
139                .map(|ap| (ap.peer_id, ap.address))
140                .chain(config.seed_peers.iter().filter_map(|peer| {
141                    peer.peer_id
142                        .map(|peer_id| (peer_id, Some(peer.address.clone())))
143                }))
144                .collect::<HashMap<_, _>>(),
145        );
146        (
147            DiscoveryEventLoop {
148                config,
149                discovery_config: Arc::new(discovery_config),
150                allowlisted_peers,
151                network,
152                tasks: JoinSet::new(),
153                pending_dials: Default::default(),
154                dial_seed_peers_task: None,
155                shutdown_handle,
156                state,
157                trusted_peer_change_rx,
158                metrics,
159            },
160            handle,
161        )
162    }
163
164    pub fn start(self, network: anemo::Network) -> Handle {
165        let (event_loop, handle) = self.build(network);
166        tokio::spawn(event_loop.start());
167
168        handle
169    }
170}
171
172/// A Handle to the Discovery subsystem. The Discovery system will be shutdown
173/// once its Handle has been dropped.
174pub struct Handle {
175    _shutdown_handle: Arc<oneshot::Sender<()>>,
176}