iota_network_stack/
anemo_ext.rs1use std::time::Instant;
6
7use anemo::{
8 Network, PeerId, Request, Response,
9 codegen::{BoxError, BoxFuture, Service},
10 types::PeerEvent,
11};
12use bytes::Bytes;
13use futures::{FutureExt, future::OptionFuture};
14
15pub trait NetworkExt {
16 fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer;
17}
18
19impl NetworkExt for Network {
20 fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer {
21 WaitingPeer::new(self.clone(), peer_id)
22 }
23}
24
25#[derive(Clone)]
26pub struct WaitingPeer {
27 peer_id: PeerId,
28 network: Network,
29}
30
31impl WaitingPeer {
32 pub fn new(network: Network, peer_id: PeerId) -> Self {
33 Self { peer_id, network }
34 }
35
36 async fn do_rpc(self, mut request: Request<Bytes>) -> Result<Response<Bytes>, BoxError> {
37 use tokio::sync::broadcast::error::RecvError;
38
39 let start = Instant::now();
40 let (mut subscriber, _) = self.network.subscribe()?;
41
42 if let Some(mut peer) = self.network.peer(self.peer_id) {
44 return peer.rpc(request).await.map_err(Into::into);
45 }
46
47 let timeout = request.timeout();
49 let sleep: OptionFuture<_> = timeout.map(tokio::time::sleep).into();
50 tokio::pin!(sleep);
51 loop {
52 if self.network.known_peers().get(&self.peer_id).is_none() {
53 return Err(format!("peer {} is not a known peer", self.peer_id).into());
54 }
55
56 tokio::select! {
57 recv = subscriber.recv() => match recv {
58 Ok(PeerEvent::NewPeer(peer_id)) if peer_id == self.peer_id => {
59 if let Some(mut peer) = self.network.peer(self.peer_id) {
61 if let Some(duration) = timeout {
62 request.set_timeout(duration.saturating_sub(Instant::now().duration_since(start)));
65 }
66 return peer.rpc(request).await.map_err(Into::into);
67 }
68 }
69 Err(RecvError::Closed) => return Err("network is closed".into()),
70 Err(RecvError::Lagged(_)) => {
71 subscriber = subscriber.resubscribe();
72
73 if let Some(mut peer) = self.network.peer(self.peer_id) {
75 return peer.rpc(request).await.map_err(Into::into);
76 }
77 }
78 _ => {}
80 },
81 Some(_) = &mut sleep => {
82 return Err(format!("timed out waiting for peer {}", self.peer_id).into());
83 },
84 }
85 }
86 }
87}
88
89impl Service<Request<Bytes>> for WaitingPeer {
90 type Response = Response<Bytes>;
91 type Error = BoxError;
92 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
93
94 #[inline]
95 fn poll_ready(
96 &mut self,
97 _: &mut std::task::Context<'_>,
98 ) -> std::task::Poll<Result<(), Self::Error>> {
99 std::task::Poll::Ready(Ok(()))
100 }
101
102 #[inline]
103 fn call(&mut self, request: Request<Bytes>) -> Self::Future {
104 let peer = self.clone();
105 peer.do_rpc(request).boxed()
106 }
107}