1use std::{collections::HashMap, sync::Arc, time::Duration};
6
7use anemo::{PeerId, types::PeerEvent};
8use dashmap::DashMap;
9use futures::future;
10use iota_metrics::{metrics_network::NetworkConnectionMetrics, spawn_logged_monitored_task};
11use quinn_proto::ConnectionStats;
12use tokio::{sync::broadcast, task::JoinHandle, time};
13
/// Period of the timer that drives connection-statistics collection in
/// `ConnectionMonitor::run` (one tick every 60 seconds).
const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_secs(60);
15
16#[derive(Debug)]
17pub struct ConditionalBroadcastReceiver {
18 pub receiver: broadcast::Receiver<()>,
19}
20
21impl ConditionalBroadcastReceiver {
27 pub async fn received_signal(&mut self) -> bool {
28 futures::future::poll_immediate(&mut Box::pin(self.receiver.recv()))
29 .await
30 .is_some()
31 }
32}
33
/// Connectivity state recorded for a peer by `ConnectionMonitor`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Stored when a `PeerEvent::NewPeer` has been handled for the peer.
    Connected,
    /// Stored when a `PeerEvent::LostPeer` has been handled for the peer.
    Disconnected,
}
39
40pub struct ConnectionMonitor {
41 network: anemo::NetworkRef,
42 connection_metrics: NetworkConnectionMetrics,
43 peer_id_types: HashMap<PeerId, String>,
44 connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
45 rx_shutdown: Option<ConditionalBroadcastReceiver>,
46}
47
48impl ConnectionMonitor {
49 #[must_use]
50 pub fn spawn(
51 network: anemo::NetworkRef,
52 connection_metrics: NetworkConnectionMetrics,
53 peer_id_types: HashMap<PeerId, String>,
54 rx_shutdown: Option<ConditionalBroadcastReceiver>,
55 ) -> (JoinHandle<()>, Arc<DashMap<PeerId, ConnectionStatus>>) {
56 let connection_statuses_outer = Arc::new(DashMap::new());
57 let connection_statuses = connection_statuses_outer.clone();
58 (
59 spawn_logged_monitored_task!(
60 Self {
61 network,
62 connection_metrics,
63 peer_id_types,
64 connection_statuses,
65 rx_shutdown
66 }
67 .run(),
68 "ConnectionMonitor"
69 ),
70 connection_statuses_outer,
71 )
72 }
73
74 async fn run(mut self) {
75 let (mut subscriber, connected_peers) = {
76 if let Some(network) = self.network.upgrade() {
77 let Ok((subscriber, active_peers)) = network.subscribe() else {
78 return;
79 };
80 (subscriber, active_peers)
81 } else {
82 return;
83 }
84 };
85
86 let mut known_peers = Vec::new();
89 for (peer_id, ty) in &self.peer_id_types {
90 known_peers.push(*peer_id);
91 self.connection_metrics
92 .network_peer_connected
93 .with_label_values(&[&format!("{peer_id}"), ty])
94 .set(0)
95 }
96
97 for peer_id in connected_peers.iter() {
99 self.handle_peer_event(PeerEvent::NewPeer(*peer_id)).await;
100 }
101
102 let mut connection_stat_collection_interval =
103 time::interval(CONNECTION_STAT_COLLECTION_INTERVAL);
104
105 async fn wait_for_shutdown(
106 rx_shutdown: &mut Option<ConditionalBroadcastReceiver>,
107 ) -> Result<(), tokio::sync::broadcast::error::RecvError> {
108 if let Some(rx) = rx_shutdown.as_mut() {
109 rx.receiver.recv().await
110 } else {
111 future::pending::<()>().await;
113 Ok(())
114 }
115 }
116
117 loop {
118 tokio::select! {
119 _ = connection_stat_collection_interval.tick() => {
120 if let Some(network) = self.network.upgrade() {
121 self.connection_metrics.socket_receive_buffer_size.set(
122 network.socket_receive_buf_size() as i64
123 );
124 self.connection_metrics.socket_send_buffer_size.set(
125 network.socket_send_buf_size() as i64
126 );
127 for peer_id in known_peers.iter() {
128 if let Some(connection) = network.peer(*peer_id) {
129 let stats = connection.connection_stats();
130 self.update_quinn_metrics_for_peer(&format!("{peer_id}"), &stats);
131 }
132 }
133 } else {
134 continue;
135 }
136 }
137 Ok(event) = subscriber.recv() => {
138 self.handle_peer_event(event).await;
139 }
140 _ = wait_for_shutdown(&mut self.rx_shutdown) => {
141 return;
142 }
143 }
144 }
145 }
146
147 async fn handle_peer_event(&self, peer_event: PeerEvent) {
148 if let Some(network) = self.network.upgrade() {
149 self.connection_metrics
150 .network_peers
151 .set(network.peers().len() as i64);
152 } else {
153 return;
154 }
155
156 let (peer_id, status, int_status) = match peer_event {
157 PeerEvent::NewPeer(peer_id) => (peer_id, ConnectionStatus::Connected, 1),
158 PeerEvent::LostPeer(peer_id, _) => (peer_id, ConnectionStatus::Disconnected, 0),
159 };
160 self.connection_statuses.insert(peer_id, status);
161
162 let peer_id_str = if self.peer_id_types.contains_key(&peer_id) {
164 format!("{peer_id}")
165 } else {
166 "other_peer".to_string()
167 };
168
169 if let Some(ty) = self.peer_id_types.get(&peer_id) {
170 self.connection_metrics
171 .network_peer_connected
172 .with_label_values(&[&peer_id_str, ty])
173 .set(int_status);
174 }
175
176 if let PeerEvent::LostPeer(_, reason) = peer_event {
177 self.connection_metrics
178 .network_peer_disconnects
179 .with_label_values(&[&peer_id_str, &format!("{reason:?}")])
180 .inc();
181 }
182 }
183
184 fn update_quinn_metrics_for_peer(&self, peer_id: &str, stats: &ConnectionStats) {
186 self.connection_metrics
188 .network_peer_rtt
189 .with_label_values(&[peer_id])
190 .set(stats.path.rtt.as_millis() as i64);
191 self.connection_metrics
192 .network_peer_lost_packets
193 .with_label_values(&[peer_id])
194 .set(stats.path.lost_packets as i64);
195 self.connection_metrics
196 .network_peer_lost_bytes
197 .with_label_values(&[peer_id])
198 .set(stats.path.lost_bytes as i64);
199 self.connection_metrics
200 .network_peer_sent_packets
201 .with_label_values(&[peer_id])
202 .set(stats.path.sent_packets as i64);
203 self.connection_metrics
204 .network_peer_congestion_events
205 .with_label_values(&[peer_id])
206 .set(stats.path.congestion_events as i64);
207 self.connection_metrics
208 .network_peer_congestion_window
209 .with_label_values(&[peer_id])
210 .set(stats.path.cwnd as i64);
211
212 self.connection_metrics
214 .network_peer_max_data
215 .with_label_values(&[peer_id, "transmitted"])
216 .set(stats.frame_tx.max_data as i64);
217 self.connection_metrics
218 .network_peer_max_data
219 .with_label_values(&[peer_id, "received"])
220 .set(stats.frame_rx.max_data as i64);
221 self.connection_metrics
222 .network_peer_closed_connections
223 .with_label_values(&[peer_id, "transmitted"])
224 .set(stats.frame_tx.connection_close as i64);
225 self.connection_metrics
226 .network_peer_closed_connections
227 .with_label_values(&[peer_id, "received"])
228 .set(stats.frame_rx.connection_close as i64);
229 self.connection_metrics
230 .network_peer_data_blocked
231 .with_label_values(&[peer_id, "transmitted"])
232 .set(stats.frame_tx.data_blocked as i64);
233 self.connection_metrics
234 .network_peer_data_blocked
235 .with_label_values(&[peer_id, "received"])
236 .set(stats.frame_rx.data_blocked as i64);
237
238 self.connection_metrics
240 .network_peer_udp_datagrams
241 .with_label_values(&[peer_id, "transmitted"])
242 .set(stats.udp_tx.datagrams as i64);
243 self.connection_metrics
244 .network_peer_udp_datagrams
245 .with_label_values(&[peer_id, "received"])
246 .set(stats.udp_rx.datagrams as i64);
247 self.connection_metrics
248 .network_peer_udp_bytes
249 .with_label_values(&[peer_id, "transmitted"])
250 .set(stats.udp_tx.bytes as i64);
251 self.connection_metrics
252 .network_peer_udp_bytes
253 .with_label_values(&[peer_id, "received"])
254 .set(stats.udp_rx.bytes as i64);
255 self.connection_metrics
256 .network_peer_udp_transmits
257 .with_label_values(&[peer_id, "transmitted"])
258 .set(stats.udp_tx.ios as i64);
259 self.connection_metrics
260 .network_peer_udp_transmits
261 .with_label_values(&[peer_id, "received"])
262 .set(stats.udp_rx.ios as i64);
263 }
264}
265
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, convert::Infallible, time::Duration};

    use anemo::{Network, Request, Response};
    use bytes::Bytes;
    use iota_metrics::metrics_network::NetworkConnectionMetrics;
    use prometheus::Registry;
    use tokio::{
        sync::{broadcast, broadcast::error::SendError},
        time::{sleep, timeout},
    };
    use tower::util::BoxCloneService;

    use crate::connection_monitor::{
        ConditionalBroadcastReceiver, ConnectionMonitor, ConnectionStatus,
    };

    /// Test helper: a broadcast sender bundled with a fixed pool of receivers
    /// that were all subscribed when the helper was constructed.
    struct PreSubscribedBroadcastSender {
        sender: broadcast::Sender<()>,
        receivers: Vec<ConditionalBroadcastReceiver>,
    }

    impl PreSubscribedBroadcastSender {
        /// Creates a channel of capacity 1 and subscribes `num_subscribers`
        /// receivers to it right away.
        fn new(num_subscribers: u64) -> Self {
            let (sender, _) = broadcast::channel(1);
            let receivers = (0..num_subscribers)
                .map(|_| ConditionalBroadcastReceiver {
                    receiver: sender.subscribe(),
                })
                .collect();

            Self { sender, receivers }
        }

        /// Hands out one of the pre-subscribed receivers, or `None` once the
        /// pool is exhausted.
        fn try_subscribe(&mut self) -> Option<ConditionalBroadcastReceiver> {
            self.receivers.pop()
        }

        /// Broadcasts a unit signal to the receivers.
        fn send(&self) -> Result<usize, SendError<()>> {
            self.sender.send(())
        }
    }

    /// Both pre-subscribed receivers observe the signal, including the one
    /// whose task is only spawned after the signal was sent.
    #[tokio::test]
    async fn test_pre_subscribed_broadcast() {
        let mut tx_shutdown = PreSubscribedBroadcastSender::new(2);
        let mut rx_shutdown_a = tx_shutdown.try_subscribe().unwrap();

        let a = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = rx_shutdown_a.receiver.recv() => {
                        return 1
                    }

                    _ = async{}, if true => {
                        if rx_shutdown_a.received_signal().await {
                            return 1
                        }
                    }
                }
            }
        });

        let mut rx_shutdown_b = tx_shutdown.try_subscribe().unwrap();
        let rx_shutdown_c = tx_shutdown.try_subscribe();

        // Only two receivers were pre-subscribed, so a third request fails.
        assert!(rx_shutdown_c.is_none());

        // Send the signal before task `b` exists.
        assert!(tx_shutdown.send().is_ok());

        let b = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = rx_shutdown_b.receiver.recv() => {
                        return 2
                    }

                    _ = async{}, if true => {
                        if rx_shutdown_b.received_signal().await {
                            return 2
                        }
                    }
                }
            }
        });

        // Both tasks must have terminated through their shutdown path.
        assert_eq!(a.await.unwrap() + b.await.unwrap(), 3);
    }

    /// `received_signal` alone is enough to notice a broadcast signal.
    #[tokio::test]
    async fn test_conditional_broadcast_receiver() {
        let mut tx_shutdown: PreSubscribedBroadcastSender = PreSubscribedBroadcastSender::new(2);
        let mut rx_shutdown = tx_shutdown.try_subscribe().unwrap();

        let a = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = async{}, if true => {
                        if rx_shutdown.received_signal().await {
                            return 1
                        }
                    }
                }
            }
        });

        assert!(tx_shutdown.send().is_ok());

        assert_eq!(a.await.unwrap(), 1);
    }

    /// Connecting and disconnecting peers is reflected in the `network_peers`
    /// metric and in the status map returned by `ConnectionMonitor::spawn`.
    #[tokio::test]
    async fn test_connectivity() {
        let network_1 = build_network().unwrap();
        let network_2 = build_network().unwrap();
        let network_3 = build_network().unwrap();

        let registry = Registry::new();
        let metrics = NetworkConnectionMetrics::new("primary", &registry);

        // Peer 2 is connected before the monitor is started.
        let peer_2 = network_1.connect(network_2.local_addr()).await.unwrap();

        let mut peer_types = HashMap::new();
        peer_types.insert(network_2.peer_id(), "other_network".to_string());
        peer_types.insert(network_3.peer_id(), "other_network".to_string());

        let (_h, statuses) =
            ConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), peer_types, None);

        // The already-connected peer must be picked up by the monitor.
        assert_network_peers(metrics.clone(), 1).await;

        // Connection statistics for peer 2 must have been recorded.
        let mut labels = HashMap::new();
        let peer_2_str = peer_2.to_string();
        labels.insert("peer_id", peer_2_str.as_str());
        assert_ne!(
            metrics
                .network_peer_rtt
                .get_metric_with(&labels)
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            *statuses.get(&peer_2).unwrap().value(),
            ConnectionStatus::Connected
        );

        // Connect peer 3 while the monitor is running.
        let peer_3 = network_1.connect(network_3.local_addr()).await.unwrap();

        assert_network_peers(metrics.clone(), 2).await;
        assert_eq!(
            *statuses.get(&peer_3).unwrap().value(),
            ConnectionStatus::Connected
        );

        // Disconnect peer 2.
        network_1.disconnect(peer_2).unwrap();

        assert_network_peers(metrics.clone(), 1).await;
        assert_eq!(
            *statuses.get(&peer_2).unwrap().value(),
            ConnectionStatus::Disconnected
        );

        // Disconnect peer 3.
        network_1.disconnect(peer_3).unwrap();

        assert_network_peers(metrics.clone(), 0).await;
        assert_eq!(
            *statuses.get(&peer_3).unwrap().value(),
            ConnectionStatus::Disconnected
        );
    }

    /// Polls the `network_peers` metric every 500 ms until it equals `value`,
    /// panicking if that does not happen within 5 seconds.
    async fn assert_network_peers(metrics: NetworkConnectionMetrics, value: i64) {
        let m = metrics.clone();
        timeout(Duration::from_secs(5), async move {
            while m.network_peers.get() != value {
                sleep(Duration::from_millis(500)).await;
            }
        })
        .await
        .unwrap_or_else(|_| {
            panic!(
                "Timeout while waiting for connectivity results for value {}",
                value
            )
        });

        assert_eq!(metrics.network_peers.get(), value);
    }

    /// Starts an anemo network on an OS-assigned localhost port with a random
    /// private key and the echo service.
    fn build_network() -> anyhow::Result<Network> {
        let network = Network::bind("localhost:0")
            .private_key(random_private_key())
            .server_name("test")
            .start(echo_service())?;
        Ok(network)
    }

    /// Service that answers every request with a response carrying the
    /// request body.
    fn echo_service() -> BoxCloneService<Request<Bytes>, Response<Bytes>, Infallible> {
        let handle = move |request: Request<Bytes>| async move {
            let response = Response::new(request.into_body());
            Result::<Response<Bytes>, Infallible>::Ok(response)
        };

        tower::ServiceExt::boxed_clone(tower::service_fn(handle))
    }

    /// Returns 32 random bytes for use as a private key.
    fn random_private_key() -> [u8; 32] {
        let mut rng = rand::thread_rng();
        let mut bytes = [0u8; 32];
        rand::RngCore::fill_bytes(&mut rng, &mut bytes[..]);

        bytes
    }
}