iota_network_stack/
anemo_ext.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Instant;
6
7use anemo::{
8    Network, PeerId, Request, Response,
9    codegen::{BoxError, BoxFuture, Service},
10    types::PeerEvent,
11};
12use bytes::Bytes;
13use futures::{FutureExt, future::OptionFuture};
14
15pub trait NetworkExt {
16    fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer;
17}
18
19impl NetworkExt for Network {
20    fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer {
21        WaitingPeer::new(self.clone(), peer_id)
22    }
23}
24
25#[derive(Clone)]
26pub struct WaitingPeer {
27    peer_id: PeerId,
28    network: Network,
29}
30
31impl WaitingPeer {
32    pub fn new(network: Network, peer_id: PeerId) -> Self {
33        Self { peer_id, network }
34    }
35
36    async fn do_rpc(self, mut request: Request<Bytes>) -> Result<Response<Bytes>, BoxError> {
37        use tokio::sync::broadcast::error::RecvError;
38
39        let start = Instant::now();
40        let (mut subscriber, _) = self.network.subscribe()?;
41
42        // If we're connected with the peer immediately make the request
43        if let Some(mut peer) = self.network.peer(self.peer_id) {
44            return peer.rpc(request).await.map_err(Into::into);
45        }
46
47        // If we're not connected we'll need to check to see if the Peer is a KnownPeer
48        let timeout = request.timeout();
49        let sleep: OptionFuture<_> = timeout.map(tokio::time::sleep).into();
50        tokio::pin!(sleep);
51        loop {
52            if self.network.known_peers().get(&self.peer_id).is_none() {
53                return Err(format!("peer {} is not a known peer", self.peer_id).into());
54            }
55
56            tokio::select! {
57                recv = subscriber.recv() => match recv {
58                    Ok(PeerEvent::NewPeer(peer_id)) if peer_id == self.peer_id => {
59                        // We're now connected with the peer, lets try to make a network request
60                        if let Some(mut peer) = self.network.peer(self.peer_id) {
61                            if let Some(duration) = timeout {
62                                // Reduce timeout to account for time already spent waiting
63                                // for the peer.
64                                request.set_timeout(duration.saturating_sub(Instant::now().duration_since(start)));
65                            }
66                            return peer.rpc(request).await.map_err(Into::into);
67                        }
68                    }
69                    Err(RecvError::Closed) => return Err("network is closed".into()),
70                    Err(RecvError::Lagged(_)) => {
71                        subscriber = subscriber.resubscribe();
72
73                        // We lagged behind so we may have missed the connection event
74                        if let Some(mut peer) = self.network.peer(self.peer_id) {
75                            return peer.rpc(request).await.map_err(Into::into);
76                        }
77                    }
78                    // Just do another iteration
79                    _ => {}
80                },
81                Some(_) = &mut sleep => {
82                    return Err(format!("timed out waiting for peer {}", self.peer_id).into());
83                },
84            }
85        }
86    }
87}
88
89impl Service<Request<Bytes>> for WaitingPeer {
90    type Response = Response<Bytes>;
91    type Error = BoxError;
92    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
93
94    #[inline]
95    fn poll_ready(
96        &mut self,
97        _: &mut std::task::Context<'_>,
98    ) -> std::task::Poll<Result<(), Self::Error>> {
99        std::task::Poll::Ready(Ok(()))
100    }
101
102    #[inline]
103    fn call(&mut self, request: Request<Bytes>) -> Self::Future {
104        let peer = self.clone();
105        peer.do_rpc(request).boxed()
106    }
107}