iota_network_stack/
client.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use eyre::{Context, Result, eyre};
6use tonic::transport::{Channel, Endpoint, Uri};
7
8use crate::{
9    config::Config,
10    multiaddr::{Multiaddr, Protocol, parse_dns, parse_ip4, parse_ip6},
11};
12
13pub async fn connect(address: &Multiaddr) -> Result<Channel> {
14    let channel = endpoint_from_multiaddr(address)?.connect().await?;
15    Ok(channel)
16}
17
18pub fn connect_lazy(address: &Multiaddr) -> Result<Channel> {
19    let channel = endpoint_from_multiaddr(address)?.connect_lazy();
20    Ok(channel)
21}
22
23pub(crate) async fn connect_with_config(address: &Multiaddr, config: &Config) -> Result<Channel> {
24    let channel = endpoint_from_multiaddr(address)?
25        .apply_config(config)
26        .connect()
27        .await?;
28    Ok(channel)
29}
30
31pub(crate) fn connect_lazy_with_config(address: &Multiaddr, config: &Config) -> Result<Channel> {
32    let channel = endpoint_from_multiaddr(address)?
33        .apply_config(config)
34        .connect_lazy();
35    Ok(channel)
36}
37
38fn endpoint_from_multiaddr(addr: &Multiaddr) -> Result<MyEndpoint> {
39    let mut iter = addr.iter();
40
41    let channel = match iter.next().ok_or_else(|| eyre!("address is empty"))? {
42        Protocol::Dns(_) => {
43            let (dns_name, tcp_port, http_or_https) = parse_dns(addr)?;
44            let uri = format!("{http_or_https}://{dns_name}:{tcp_port}");
45            MyEndpoint::try_from_uri(uri)?
46        }
47        Protocol::Ip4(_) => {
48            let (socket_addr, http_or_https) = parse_ip4(addr)?;
49            let uri = format!("{http_or_https}://{socket_addr}");
50            MyEndpoint::try_from_uri(uri)?
51        }
52        Protocol::Ip6(_) => {
53            let (socket_addr, http_or_https) = parse_ip6(addr)?;
54            let uri = format!("{http_or_https}://{socket_addr}");
55            MyEndpoint::try_from_uri(uri)?
56        }
57        unsupported => return Err(eyre!("unsupported protocol {unsupported}")),
58    };
59
60    Ok(channel)
61}
62
63struct MyEndpoint {
64    endpoint: Endpoint,
65}
66
67impl MyEndpoint {
68    fn new(endpoint: Endpoint) -> Self {
69        Self { endpoint }
70    }
71
72    fn try_from_uri(uri: String) -> Result<Self> {
73        let uri: Uri = uri
74            .parse()
75            .with_context(|| format!("unable to create Uri from '{uri}'"))?;
76        let endpoint = Endpoint::from(uri);
77        Ok(Self::new(endpoint))
78    }
79
80    fn apply_config(mut self, config: &Config) -> Self {
81        self.endpoint = apply_config_to_endpoint(config, self.endpoint);
82        self
83    }
84
85    fn connect_lazy(self) -> Channel {
86        self.endpoint.connect_lazy()
87    }
88
89    async fn connect(self) -> Result<Channel> {
90        self.endpoint.connect().await.map_err(Into::into)
91    }
92}
93
94fn apply_config_to_endpoint(config: &Config, mut endpoint: Endpoint) -> Endpoint {
95    if let Some(limit) = config.concurrency_limit_per_connection {
96        endpoint = endpoint.concurrency_limit(limit);
97    }
98
99    if let Some(timeout) = config.request_timeout {
100        endpoint = endpoint.timeout(timeout);
101    }
102
103    if let Some(timeout) = config.connect_timeout {
104        endpoint = endpoint.connect_timeout(timeout);
105    }
106
107    if let Some(tcp_nodelay) = config.tcp_nodelay {
108        endpoint = endpoint.tcp_nodelay(tcp_nodelay);
109    }
110
111    if let Some(http2_keepalive_interval) = config.http2_keepalive_interval {
112        endpoint = endpoint.http2_keep_alive_interval(http2_keepalive_interval);
113    }
114
115    if let Some(http2_keepalive_timeout) = config.http2_keepalive_timeout {
116        endpoint = endpoint.keep_alive_timeout(http2_keepalive_timeout);
117    }
118
119    if let Some((limit, duration)) = config.rate_limit {
120        endpoint = endpoint.rate_limit(limit, duration);
121    }
122
123    endpoint
124        .initial_stream_window_size(config.http2_initial_stream_window_size)
125        .initial_connection_window_size(config.http2_initial_connection_window_size)
126        .tcp_keepalive(config.tcp_keepalive)
127}