iota_network_stack/
client.rs1use eyre::{Context, Result, eyre};
6use tonic::transport::{Channel, Endpoint, Uri};
7
8use crate::{
9 config::Config,
10 multiaddr::{Multiaddr, Protocol, parse_dns, parse_ip4, parse_ip6},
11};
12
13pub async fn connect(address: &Multiaddr) -> Result<Channel> {
14 let channel = endpoint_from_multiaddr(address)?.connect().await?;
15 Ok(channel)
16}
17
18pub fn connect_lazy(address: &Multiaddr) -> Result<Channel> {
19 let channel = endpoint_from_multiaddr(address)?.connect_lazy();
20 Ok(channel)
21}
22
23pub(crate) async fn connect_with_config(address: &Multiaddr, config: &Config) -> Result<Channel> {
24 let channel = endpoint_from_multiaddr(address)?
25 .apply_config(config)
26 .connect()
27 .await?;
28 Ok(channel)
29}
30
31pub(crate) fn connect_lazy_with_config(address: &Multiaddr, config: &Config) -> Result<Channel> {
32 let channel = endpoint_from_multiaddr(address)?
33 .apply_config(config)
34 .connect_lazy();
35 Ok(channel)
36}
37
38fn endpoint_from_multiaddr(addr: &Multiaddr) -> Result<MyEndpoint> {
39 let mut iter = addr.iter();
40
41 let channel = match iter.next().ok_or_else(|| eyre!("address is empty"))? {
42 Protocol::Dns(_) => {
43 let (dns_name, tcp_port, http_or_https) = parse_dns(addr)?;
44 let uri = format!("{http_or_https}://{dns_name}:{tcp_port}");
45 MyEndpoint::try_from_uri(uri)?
46 }
47 Protocol::Ip4(_) => {
48 let (socket_addr, http_or_https) = parse_ip4(addr)?;
49 let uri = format!("{http_or_https}://{socket_addr}");
50 MyEndpoint::try_from_uri(uri)?
51 }
52 Protocol::Ip6(_) => {
53 let (socket_addr, http_or_https) = parse_ip6(addr)?;
54 let uri = format!("{http_or_https}://{socket_addr}");
55 MyEndpoint::try_from_uri(uri)?
56 }
57 unsupported => return Err(eyre!("unsupported protocol {unsupported}")),
58 };
59
60 Ok(channel)
61}
62
63struct MyEndpoint {
64 endpoint: Endpoint,
65}
66
67impl MyEndpoint {
68 fn new(endpoint: Endpoint) -> Self {
69 Self { endpoint }
70 }
71
72 fn try_from_uri(uri: String) -> Result<Self> {
73 let uri: Uri = uri
74 .parse()
75 .with_context(|| format!("unable to create Uri from '{uri}'"))?;
76 let endpoint = Endpoint::from(uri);
77 Ok(Self::new(endpoint))
78 }
79
80 fn apply_config(mut self, config: &Config) -> Self {
81 self.endpoint = apply_config_to_endpoint(config, self.endpoint);
82 self
83 }
84
85 fn connect_lazy(self) -> Channel {
86 self.endpoint.connect_lazy()
87 }
88
89 async fn connect(self) -> Result<Channel> {
90 self.endpoint.connect().await.map_err(Into::into)
91 }
92}
93
94fn apply_config_to_endpoint(config: &Config, mut endpoint: Endpoint) -> Endpoint {
95 if let Some(limit) = config.concurrency_limit_per_connection {
96 endpoint = endpoint.concurrency_limit(limit);
97 }
98
99 if let Some(timeout) = config.request_timeout {
100 endpoint = endpoint.timeout(timeout);
101 }
102
103 if let Some(timeout) = config.connect_timeout {
104 endpoint = endpoint.connect_timeout(timeout);
105 }
106
107 if let Some(tcp_nodelay) = config.tcp_nodelay {
108 endpoint = endpoint.tcp_nodelay(tcp_nodelay);
109 }
110
111 if let Some(http2_keepalive_interval) = config.http2_keepalive_interval {
112 endpoint = endpoint.http2_keep_alive_interval(http2_keepalive_interval);
113 }
114
115 if let Some(http2_keepalive_timeout) = config.http2_keepalive_timeout {
116 endpoint = endpoint.keep_alive_timeout(http2_keepalive_timeout);
117 }
118
119 if let Some((limit, duration)) = config.rate_limit {
120 endpoint = endpoint.rate_limit(limit, duration);
121 }
122
123 endpoint
124 .initial_stream_window_size(config.http2_initial_stream_window_size)
125 .initial_connection_window_size(config.http2_initial_connection_window_size)
126 .tcp_keepalive(config.tcp_keepalive)
127}