1use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use iota_network::{
11 api::ValidatorClient,
12 tonic,
13 tonic::{metadata::KeyAndValueRef, transport::Channel},
14};
15use iota_network_stack::config::Config;
16use iota_types::{
17 base_types::AuthorityName,
18 committee::CommitteeWithNetworkMetadata,
19 crypto::NetworkPublicKey,
20 error::{IotaError, IotaResult},
21 iota_system_state::IotaSystemState,
22 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
23 messages_grpc::{
24 HandleCertificateRequestV1, HandleCertificateResponseV1,
25 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
26 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
27 TransactionInfoRequest, TransactionInfoResponse,
28 },
29 multiaddr::Multiaddr,
30 transaction::*,
31};
32
33use crate::authority_client::tonic::IntoRequest;
34
35#[async_trait]
36pub trait AuthorityAPI {
37 async fn handle_transaction(
39 &self,
40 transaction: Transaction,
41 client_addr: Option<SocketAddr>,
42 ) -> Result<HandleTransactionResponse, IotaError>;
43
44 async fn handle_certificate_v1(
46 &self,
47 request: HandleCertificateRequestV1,
48 client_addr: Option<SocketAddr>,
49 ) -> Result<HandleCertificateResponseV1, IotaError>;
50
51 async fn handle_soft_bundle_certificates_v1(
53 &self,
54 request: HandleSoftBundleCertificatesRequestV1,
55 client_addr: Option<SocketAddr>,
56 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError>;
57
58 async fn handle_object_info_request(
60 &self,
61 request: ObjectInfoRequest,
62 ) -> Result<ObjectInfoResponse, IotaError>;
63
64 async fn handle_transaction_info_request(
66 &self,
67 request: TransactionInfoRequest,
68 ) -> Result<TransactionInfoResponse, IotaError>;
69
70 async fn handle_checkpoint(
72 &self,
73 request: CheckpointRequest,
74 ) -> Result<CheckpointResponse, IotaError>;
75
76 async fn handle_system_state_object(
79 &self,
80 request: SystemStateRequest,
81 ) -> Result<IotaSystemState, IotaError>;
82}
83
84#[derive(Clone)]
86pub struct NetworkAuthorityClient {
87 client: IotaResult<ValidatorClient<Channel>>,
88}
89
90impl NetworkAuthorityClient {
91 pub async fn connect(
92 address: &Multiaddr,
93 tls_target: Option<NetworkPublicKey>,
94 ) -> anyhow::Result<Self> {
95 let tls_config = tls_target.map(|tls_target| {
96 iota_tls::create_rustls_client_config(
97 tls_target,
98 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
99 None,
100 )
101 });
102 let channel = iota_network_stack::client::connect(address, tls_config)
103 .await
104 .map_err(|err| anyhow!(err.to_string()))?;
105 Ok(Self::new(channel))
106 }
107
108 pub fn connect_lazy(address: &Multiaddr, tls_target: Option<NetworkPublicKey>) -> Self {
109 let tls_config = tls_target.map(|tls_target| {
110 iota_tls::create_rustls_client_config(
111 tls_target,
112 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
113 None,
114 )
115 });
116 let client: IotaResult<_> = iota_network_stack::client::connect_lazy(address, tls_config)
117 .map(ValidatorClient::new)
118 .map_err(|err| err.to_string().into());
119 Self { client }
120 }
121
122 pub fn new(channel: Channel) -> Self {
124 Self {
125 client: Ok(ValidatorClient::new(channel)),
126 }
127 }
128
129 fn new_lazy(client: IotaResult<Channel>) -> Self {
131 Self {
132 client: client.map(ValidatorClient::new),
133 }
134 }
135
136 fn client(&self) -> IotaResult<ValidatorClient<Channel>> {
137 self.client.clone()
138 }
139}
140
141#[async_trait]
142impl AuthorityAPI for NetworkAuthorityClient {
143 async fn handle_transaction(
145 &self,
146 transaction: Transaction,
147 client_addr: Option<SocketAddr>,
148 ) -> Result<HandleTransactionResponse, IotaError> {
149 let mut request = transaction.into_request();
150 insert_metadata(&mut request, client_addr);
151
152 self.client()?
153 .transaction(request)
154 .await
155 .map(tonic::Response::into_inner)
156 .map_err(Into::into)
157 }
158
159 async fn handle_certificate_v1(
160 &self,
161 request: HandleCertificateRequestV1,
162 client_addr: Option<SocketAddr>,
163 ) -> Result<HandleCertificateResponseV1, IotaError> {
164 let mut request = request.into_request();
165 insert_metadata(&mut request, client_addr);
166
167 let response = self
168 .client()?
169 .handle_certificate_v1(request)
170 .await
171 .map(tonic::Response::into_inner);
172
173 response.map_err(Into::into)
174 }
175
176 async fn handle_soft_bundle_certificates_v1(
177 &self,
178 request: HandleSoftBundleCertificatesRequestV1,
179 client_addr: Option<SocketAddr>,
180 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
181 let mut request = request.into_request();
182 insert_metadata(&mut request, client_addr);
183
184 let response = self
185 .client()?
186 .handle_soft_bundle_certificates_v1(request)
187 .await
188 .map(tonic::Response::into_inner);
189
190 response.map_err(Into::into)
191 }
192
193 async fn handle_object_info_request(
195 &self,
196 request: ObjectInfoRequest,
197 ) -> Result<ObjectInfoResponse, IotaError> {
198 self.client()?
199 .object_info(request)
200 .await
201 .map(tonic::Response::into_inner)
202 .map_err(Into::into)
203 }
204
205 async fn handle_transaction_info_request(
207 &self,
208 request: TransactionInfoRequest,
209 ) -> Result<TransactionInfoResponse, IotaError> {
210 self.client()?
211 .transaction_info(request)
212 .await
213 .map(tonic::Response::into_inner)
214 .map_err(Into::into)
215 }
216
217 async fn handle_checkpoint(
219 &self,
220 request: CheckpointRequest,
221 ) -> Result<CheckpointResponse, IotaError> {
222 self.client()?
223 .checkpoint(request)
224 .await
225 .map(tonic::Response::into_inner)
226 .map_err(Into::into)
227 }
228
229 async fn handle_system_state_object(
231 &self,
232 request: SystemStateRequest,
233 ) -> Result<IotaSystemState, IotaError> {
234 self.client()?
235 .get_system_state_object(request)
236 .await
237 .map(tonic::Response::into_inner)
238 .map_err(Into::into)
239 }
240}
241
242pub fn make_network_authority_clients_with_network_config(
244 committee: &CommitteeWithNetworkMetadata,
245 network_config: &Config,
246) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
247 let mut authority_clients = BTreeMap::new();
248 for (name, (_state, network_metadata)) in committee.validators() {
249 let address = network_metadata.network_address.clone();
250 let address = address.rewrite_udp_to_tcp();
251 let maybe_channel = network_config.connect_lazy(&address, None).map_err(|e| {
263 tracing::error!(
264 address = %address,
265 name = %name,
266 "unable to create authority client: {e}"
267 );
268 e.to_string().into()
269 });
270 let client = NetworkAuthorityClient::new_lazy(maybe_channel);
271 authority_clients.insert(*name, client);
272 }
273 authority_clients
274}
275
276pub fn make_authority_clients_with_timeout_config(
278 committee: &CommitteeWithNetworkMetadata,
279 connect_timeout: Duration,
280 request_timeout: Duration,
281) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
282 let mut network_config = iota_network_stack::config::Config::new();
283 network_config.connect_timeout = Some(connect_timeout);
284 network_config.request_timeout = Some(request_timeout);
285 make_network_authority_clients_with_network_config(committee, &network_config)
286}
287
288fn insert_metadata<T>(request: &mut tonic::Request<T>, client_addr: Option<SocketAddr>) {
289 if let Some(client_addr) = client_addr {
290 let mut metadata = tonic::metadata::MetadataMap::new();
291 metadata.insert("x-forwarded-for", client_addr.to_string().parse().unwrap());
292 metadata
293 .iter()
294 .for_each(|key_and_value| match key_and_value {
295 KeyAndValueRef::Ascii(key, value) => {
296 request.metadata_mut().insert(key, value.clone());
297 }
298 KeyAndValueRef::Binary(key, value) => {
299 request.metadata_mut().insert_bin(key, value.clone());
300 }
301 });
302 }
303}