iota_core/connection_monitor.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, sync::Arc, time::Duration};

use anemo::{PeerId, types::PeerEvent};
use dashmap::DashMap;
use futures::future;
use iota_metrics::{metrics_network::NetworkConnectionMetrics, spawn_logged_monitored_task};
use quinn_proto::ConnectionStats;
use tokio::{sync::broadcast, task::JoinHandle, time};

const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_secs(60);
#[derive(Debug)]
pub struct ConditionalBroadcastReceiver {
    pub receiver: broadcast::Receiver<()>,
}

/// ConditionalBroadcastReceiver adds a convenience method for conditionally
/// checking for shutdown in all branches of a select statement. Using this
/// method allows the shutdown signal to propagate faster, since we no longer
/// wait until the branch that checks the receiver is randomly selected by the
/// select macro.
impl ConditionalBroadcastReceiver {
    pub async fn received_signal(&mut self) -> bool {
        futures::future::poll_immediate(&mut Box::pin(self.receiver.recv()))
            .await
            .is_some()
    }
}
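
// A minimal usage sketch for `received_signal` (hypothetical `do_work` and
// `rx_shutdown`): polling it in every branch lets a loop notice shutdown even
// when the `select!` macro never happens to pick a dedicated shutdown branch.
//
//     loop {
//         tokio::select! {
//             _ = do_work() => {
//                 if rx_shutdown.received_signal().await {
//                     return;
//                 }
//             }
//         }
//     }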

#[derive(Eq, PartialEq, Clone, Debug)]
pub enum ConnectionStatus {
    Connected,
    Disconnected,
}

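/// Watches an [`anemo`] network: reports peer connect/disconnect events and
/// periodic QUIC connection statistics as metrics, and maintains a shared map
/// of per-peer [`ConnectionStatus`].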
pub struct ConnectionMonitor {
    network: anemo::NetworkRef,
    connection_metrics: NetworkConnectionMetrics,
    peer_id_types: HashMap<PeerId, String>,
    connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    rx_shutdown: Option<ConditionalBroadcastReceiver>,
}

impl ConnectionMonitor {
    #[must_use]
    pub fn spawn(
        network: anemo::NetworkRef,
        connection_metrics: NetworkConnectionMetrics,
        peer_id_types: HashMap<PeerId, String>,
        rx_shutdown: Option<ConditionalBroadcastReceiver>,
    ) -> (JoinHandle<()>, Arc<DashMap<PeerId, ConnectionStatus>>) {
        let connection_statuses_outer = Arc::new(DashMap::new());
        let connection_statuses = connection_statuses_outer.clone();
        (
            spawn_logged_monitored_task!(
                Self {
                    network,
                    connection_metrics,
                    peer_id_types,
                    connection_statuses,
                    rx_shutdown
                }
                .run(),
                "ConnectionMonitor"
            ),
            connection_statuses_outer,
        )
    }
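
    // A call-site sketch (hypothetical `network`, `registry` and `peer_types`):
    //
    //     let metrics = NetworkConnectionMetrics::new("primary", &registry);
    //     let (handle, statuses) =
    //         ConnectionMonitor::spawn(network.downgrade(), metrics, peer_types, None);
    //
    // `spawn` takes the weak `NetworkRef` from `Network::downgrade`, so the
    // monitor does not keep the network alive on its own.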

    async fn run(mut self) {
        let (mut subscriber, connected_peers) = {
            if let Some(network) = self.network.upgrade() {
                let Ok((subscriber, active_peers)) = network.subscribe() else {
                    return;
                };
                (subscriber, active_peers)
            } else {
                return;
            }
        };

        // First report all known peers as disconnected, so that their labels
        // show up in the metrics reporting tool.
        let mut known_peers = Vec::new();
        for (peer_id, ty) in &self.peer_id_types {
            known_peers.push(*peer_id);
            self.connection_metrics
                .network_peer_connected
                .with_label_values(&[&format!("{peer_id}"), ty])
                .set(0);
        }

        // Now report the connected peers.
        for peer_id in connected_peers.iter() {
            self.handle_peer_event(PeerEvent::NewPeer(*peer_id)).await;
        }

        let mut connection_stat_collection_interval =
            time::interval(CONNECTION_STAT_COLLECTION_INTERVAL);

        async fn wait_for_shutdown(
            rx_shutdown: &mut Option<ConditionalBroadcastReceiver>,
        ) -> Result<(), tokio::sync::broadcast::error::RecvError> {
            if let Some(rx) = rx_shutdown.as_mut() {
                rx.receiver.recv().await
            } else {
                // If no shutdown receiver is provided, wait forever.
                future::pending::<()>().await;
                Ok(())
            }
        }

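        // Main loop: on each tick, export socket buffer sizes and per-peer
        // QUIC stats; on each peer event, update the status map and metrics;
        // stop when the (optional) shutdown signal fires.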
        loop {
            tokio::select! {
                _ = connection_stat_collection_interval.tick() => {
                    if let Some(network) = self.network.upgrade() {
                        self.connection_metrics.socket_receive_buffer_size.set(
                            network.socket_receive_buf_size() as i64
                        );
                        self.connection_metrics.socket_send_buffer_size.set(
                            network.socket_send_buf_size() as i64
                        );
                        for peer_id in known_peers.iter() {
                            if let Some(connection) = network.peer(*peer_id) {
                                let stats = connection.connection_stats();
                                self.update_quinn_metrics_for_peer(&format!("{peer_id}"), &stats);
                            }
                        }
                    } else {
                        continue;
                    }
                }
                Ok(event) = subscriber.recv() => {
                    self.handle_peer_event(event).await;
                }
                _ = wait_for_shutdown(&mut self.rx_shutdown) => {
                    return;
                }
            }
        }
    }

    async fn handle_peer_event(&self, peer_event: PeerEvent) {
        if let Some(network) = self.network.upgrade() {
            self.connection_metrics
                .network_peers
                .set(network.peers().len() as i64);
        } else {
            return;
        }

        let (peer_id, status, int_status) = match peer_event {
            PeerEvent::NewPeer(peer_id) => (peer_id, ConnectionStatus::Connected, 1),
            PeerEvent::LostPeer(peer_id, _) => (peer_id, ConnectionStatus::Disconnected, 0),
        };
        self.connection_statuses.insert(peer_id, status);

        // Only report peer IDs for known peers to prevent unlimited cardinality.
        let peer_id_str = if self.peer_id_types.contains_key(&peer_id) {
            format!("{peer_id}")
        } else {
            "other_peer".to_string()
        };

        if let Some(ty) = self.peer_id_types.get(&peer_id) {
            self.connection_metrics
                .network_peer_connected
                .with_label_values(&[&peer_id_str, ty])
                .set(int_status);
        }

        if let PeerEvent::LostPeer(_, reason) = peer_event {
            self.connection_metrics
                .network_peer_disconnects
                .with_label_values(&[&peer_id_str, &format!("{reason:?}")])
                .inc();
        }
    }

    // TODO: Replace this with ClosureMetric
    fn update_quinn_metrics_for_peer(&self, peer_id: &str, stats: &ConnectionStats) {
        // Update PathStats
        self.connection_metrics
            .network_peer_rtt
            .with_label_values(&[peer_id])
            .set(stats.path.rtt.as_millis() as i64);
        self.connection_metrics
            .network_peer_lost_packets
            .with_label_values(&[peer_id])
            .set(stats.path.lost_packets as i64);
        self.connection_metrics
            .network_peer_lost_bytes
            .with_label_values(&[peer_id])
            .set(stats.path.lost_bytes as i64);
        self.connection_metrics
            .network_peer_sent_packets
            .with_label_values(&[peer_id])
            .set(stats.path.sent_packets as i64);
        self.connection_metrics
            .network_peer_congestion_events
            .with_label_values(&[peer_id])
            .set(stats.path.congestion_events as i64);
        self.connection_metrics
            .network_peer_congestion_window
            .with_label_values(&[peer_id])
            .set(stats.path.cwnd as i64);

        // Update FrameStats
        self.connection_metrics
            .network_peer_max_data
            .with_label_values(&[peer_id, "transmitted"])
            .set(stats.frame_tx.max_data as i64);
        self.connection_metrics
            .network_peer_max_data
            .with_label_values(&[peer_id, "received"])
            .set(stats.frame_rx.max_data as i64);
        self.connection_metrics
            .network_peer_closed_connections
            .with_label_values(&[peer_id, "transmitted"])
            .set(stats.frame_tx.connection_close as i64);
        self.connection_metrics
            .network_peer_closed_connections
            .with_label_values(&[peer_id, "received"])
            .set(stats.frame_rx.connection_close as i64);
        self.connection_metrics
            .network_peer_data_blocked
            .with_label_values(&[peer_id, "transmitted"])
            .set(stats.frame_tx.data_blocked as i64);
        self.connection_metrics
            .network_peer_data_blocked
            .with_label_values(&[peer_id, "received"])
            .set(stats.frame_rx.data_blocked as i64);

        // Update UDPStats
        self.connection_metrics
            .network_peer_udp_datagrams
            .with_label_values(&[peer_id, "transmitted"])
            .set(stats.udp_tx.datagrams as i64);
        self.connection_metrics
            .network_peer_udp_datagrams
            .with_label_values(&[peer_id, "received"])
            .set(stats.udp_rx.datagrams as i64);
        self.connection_metrics
            .network_peer_udp_bytes
            .with_label_values(&[peer_id, "transmitted"])
            .set(stats.udp_tx.bytes as i64);
        self.connection_metrics
            .network_peer_udp_bytes
            .with_label_values(&[peer_id, "received"])
            .set(stats.udp_rx.bytes as i64);
        self.connection_metrics
            .network_peer_udp_transmits
            .with_label_values(&[peer_id, "transmitted"])
            .set(stats.udp_tx.ios as i64);
        self.connection_metrics
            .network_peer_udp_transmits
            .with_label_values(&[peer_id, "received"])
            .set(stats.udp_rx.ios as i64);
    }
}
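
// Reading the shared status map returned by `spawn` (illustrative; `statuses`
// and `peer_id` are assumed to come from the caller's context):
//
//     if let Some(entry) = statuses.get(&peer_id) {
//         if *entry.value() == ConnectionStatus::Connected {
//             // the peer is currently reachable
//         }
//     }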

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, convert::Infallible, time::Duration};

    use anemo::{Network, Request, Response};
    use bytes::Bytes;
    use iota_metrics::metrics_network::NetworkConnectionMetrics;
    use prometheus::Registry;
    use tokio::{
        sync::{broadcast, broadcast::error::SendError},
        time::{sleep, timeout},
    };
    use tower::util::BoxCloneService;

    use crate::connection_monitor::{
        ConditionalBroadcastReceiver, ConnectionMonitor, ConnectionStatus,
    };

    /// PreSubscribedBroadcastSender is a wrapped broadcast channel that limits
    /// subscription to initialization time. It is designed to deliver a
    /// cancellation signal to all components; the limitation prevents a
    /// component from missing the shutdown signal because it subscribed after
    /// the signal was sent. The receivers have a special peek method which can
    /// be used to conditionally check for the shutdown signal on the channel.
    struct PreSubscribedBroadcastSender {
        sender: broadcast::Sender<()>,
        receivers: Vec<ConditionalBroadcastReceiver>,
    }

    impl PreSubscribedBroadcastSender {
        fn new(num_subscribers: u64) -> Self {
            let (tx_init, _) = broadcast::channel(1);
            let mut receivers = Vec::new();
            for _i in 0..num_subscribers {
                receivers.push(ConditionalBroadcastReceiver {
                    receiver: tx_init.subscribe(),
                });
            }

            PreSubscribedBroadcastSender {
                sender: tx_init,
                receivers,
            }
        }

        fn try_subscribe(&mut self) -> Option<ConditionalBroadcastReceiver> {
            self.receivers.pop()
        }

        fn send(&self) -> Result<usize, SendError<()>> {
            self.sender.send(())
        }
    }

    #[tokio::test]
    async fn test_pre_subscribed_broadcast() {
        let mut tx_shutdown = PreSubscribedBroadcastSender::new(2);
        let mut rx_shutdown_a = tx_shutdown.try_subscribe().unwrap();

        let a = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = rx_shutdown_a.receiver.recv() => {
                        return 1
                    }

                    _ = async {}, if true => {
                        if rx_shutdown_a.received_signal().await {
                            return 1
                        }
                    }
                }
            }
        });

        let mut rx_shutdown_b = tx_shutdown.try_subscribe().unwrap();
        let rx_shutdown_c = tx_shutdown.try_subscribe();

        assert!(rx_shutdown_c.is_none());

        // Send the shutdown signal before component b starts and begins
        // listening for shutdown.
        assert!(tx_shutdown.send().is_ok());

        let b = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = rx_shutdown_b.receiver.recv() => {
                        return 2
                    }

                    _ = async {}, if true => {
                        if rx_shutdown_b.received_signal().await {
                            return 2
                        }
                    }
                }
            }
        });

        // Assert that both component a and b loops have exited, effectively
        // shutting down.
        assert_eq!(a.await.unwrap() + b.await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_conditional_broadcast_receiver() {
        let mut tx_shutdown = PreSubscribedBroadcastSender::new(2);
        let mut rx_shutdown = tx_shutdown.try_subscribe().unwrap();

        let a = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = async {}, if true => {
                        if rx_shutdown.received_signal().await {
                            return 1
                        }
                    }
                }
            }
        });

        assert!(tx_shutdown.send().is_ok());

        assert_eq!(a.await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_connectivity() {
        // GIVEN
        let network_1 = build_network().unwrap();
        let network_2 = build_network().unwrap();
        let network_3 = build_network().unwrap();

        let registry = Registry::new();
        let metrics = NetworkConnectionMetrics::new("primary", &registry);

        // AND we connect to peer 2
        let peer_2 = network_1.connect(network_2.local_addr()).await.unwrap();

        let mut peer_types = HashMap::new();
        peer_types.insert(network_2.peer_id(), "other_network".to_string());
        peer_types.insert(network_3.peer_id(), "other_network".to_string());

        // WHEN we bring up the monitor
        let (_h, statuses) =
            ConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), peer_types, None);

        // THEN peer 2 should already be connected
        assert_network_peers(metrics.clone(), 1).await;

        // AND we should have collected connection stats
        let mut labels = HashMap::new();
        let peer_2_str = format!("{peer_2}");
        labels.insert("peer_id", peer_2_str.as_str());
        assert_ne!(
            metrics
                .network_peer_rtt
                .get_metric_with(&labels)
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            *statuses.get(&peer_2).unwrap().value(),
            ConnectionStatus::Connected
        );

        // WHEN we connect to peer 3
        let peer_3 = network_1.connect(network_3.local_addr()).await.unwrap();

        // THEN
        assert_network_peers(metrics.clone(), 2).await;
        assert_eq!(
            *statuses.get(&peer_3).unwrap().value(),
            ConnectionStatus::Connected
        );

        // AND disconnect peer 2
        network_1.disconnect(peer_2).unwrap();

        // THEN
        assert_network_peers(metrics.clone(), 1).await;
        assert_eq!(
            *statuses.get(&peer_2).unwrap().value(),
            ConnectionStatus::Disconnected
        );

        // AND disconnect peer 3
        network_1.disconnect(peer_3).unwrap();

        // THEN
        assert_network_peers(metrics.clone(), 0).await;
        assert_eq!(
            *statuses.get(&peer_3).unwrap().value(),
            ConnectionStatus::Disconnected
        );
    }

    async fn assert_network_peers(metrics: NetworkConnectionMetrics, value: i64) {
        let m = metrics.clone();
        timeout(Duration::from_secs(5), async move {
            while m.network_peers.get() != value {
                sleep(Duration::from_millis(500)).await;
            }
        })
        .await
        .unwrap_or_else(|_| {
            panic!(
                "Timeout while waiting for connectivity results for value {}",
                value
            )
        });

        assert_eq!(metrics.network_peers.get(), value);
    }

    fn build_network() -> anyhow::Result<Network> {
        let network = Network::bind("localhost:0")
            .private_key(random_private_key())
            .server_name("test")
            .start(echo_service())?;
        Ok(network)
    }

    fn echo_service() -> BoxCloneService<Request<Bytes>, Response<Bytes>, Infallible> {
        let handle = move |request: Request<Bytes>| async move {
            let response = Response::new(request.into_body());
            Result::<Response<Bytes>, Infallible>::Ok(response)
        };

        tower::ServiceExt::boxed_clone(tower::service_fn(handle))
    }

    fn random_private_key() -> [u8; 32] {
        let mut rng = rand::thread_rng();
        let mut bytes = [0u8; 32];
        rand::RngCore::fill_bytes(&mut rng, &mut bytes[..]);

        bytes
    }
}