1use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use iota_network::{
11 api::ValidatorClient,
12 tonic,
13 tonic::{metadata::KeyAndValueRef, transport::Channel},
14};
15use iota_network_stack::config::Config;
16use iota_types::{
17 base_types::AuthorityName,
18 committee::CommitteeWithNetworkMetadata,
19 crypto::NetworkPublicKey,
20 error::{IotaError, IotaResult},
21 iota_system_state::IotaSystemState,
22 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
23 messages_grpc::{
24 HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
25 HandleCertificateRequestV1, HandleCertificateResponseV1,
26 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
27 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
28 TransactionInfoRequest, TransactionInfoResponse,
29 },
30 multiaddr::Multiaddr,
31 transaction::*,
32};
33
34use crate::authority_client::tonic::IntoRequest;
35
36#[async_trait]
37pub trait AuthorityAPI {
38 async fn handle_transaction(
40 &self,
41 transaction: Transaction,
42 client_addr: Option<SocketAddr>,
43 ) -> Result<HandleTransactionResponse, IotaError>;
44
45 async fn handle_certificate_v1(
47 &self,
48 request: HandleCertificateRequestV1,
49 client_addr: Option<SocketAddr>,
50 ) -> Result<HandleCertificateResponseV1, IotaError>;
51
52 async fn handle_soft_bundle_certificates_v1(
54 &self,
55 request: HandleSoftBundleCertificatesRequestV1,
56 client_addr: Option<SocketAddr>,
57 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError>;
58
59 async fn handle_object_info_request(
61 &self,
62 request: ObjectInfoRequest,
63 ) -> Result<ObjectInfoResponse, IotaError>;
64
65 async fn handle_transaction_info_request(
67 &self,
68 request: TransactionInfoRequest,
69 ) -> Result<TransactionInfoResponse, IotaError>;
70
71 async fn handle_checkpoint(
73 &self,
74 request: CheckpointRequest,
75 ) -> Result<CheckpointResponse, IotaError>;
76
77 async fn handle_system_state_object(
80 &self,
81 request: SystemStateRequest,
82 ) -> Result<IotaSystemState, IotaError>;
83
84 async fn handle_capability_notification_v1(
86 &self,
87 request: HandleCapabilityNotificationRequestV1,
88 ) -> Result<HandleCapabilityNotificationResponseV1, IotaError>;
89}
90
91#[derive(Clone)]
93pub struct NetworkAuthorityClient {
94 client: IotaResult<ValidatorClient<Channel>>,
95}
96
97impl NetworkAuthorityClient {
98 pub async fn connect(
99 address: &Multiaddr,
100 tls_target: Option<NetworkPublicKey>,
101 ) -> anyhow::Result<Self> {
102 let tls_config = tls_target.map(|tls_target| {
103 iota_tls::create_rustls_client_config(
104 tls_target,
105 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
106 None,
107 )
108 });
109 let channel = iota_network_stack::client::connect(address, tls_config)
110 .await
111 .map_err(|err| anyhow!(err.to_string()))?;
112 Ok(Self::new(channel))
113 }
114
115 pub fn connect_lazy(address: &Multiaddr, tls_target: Option<NetworkPublicKey>) -> Self {
116 let tls_config = tls_target.map(|tls_target| {
117 iota_tls::create_rustls_client_config(
118 tls_target,
119 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
120 None,
121 )
122 });
123 let client: IotaResult<_> = iota_network_stack::client::connect_lazy(address, tls_config)
124 .map(ValidatorClient::new)
125 .map_err(|err| err.to_string().into());
126 Self { client }
127 }
128
129 pub fn new(channel: Channel) -> Self {
131 Self {
132 client: Ok(ValidatorClient::new(channel)),
133 }
134 }
135
136 fn new_lazy(client: IotaResult<Channel>) -> Self {
138 Self {
139 client: client.map(ValidatorClient::new),
140 }
141 }
142
143 fn client(&self) -> IotaResult<ValidatorClient<Channel>> {
144 self.client.clone()
145 }
146}
147
148#[async_trait]
149impl AuthorityAPI for NetworkAuthorityClient {
150 async fn handle_transaction(
152 &self,
153 transaction: Transaction,
154 client_addr: Option<SocketAddr>,
155 ) -> Result<HandleTransactionResponse, IotaError> {
156 let mut request = transaction.into_request();
157 insert_metadata(&mut request, client_addr);
158
159 self.client()?
160 .transaction(request)
161 .await
162 .map(tonic::Response::into_inner)
163 .map_err(Into::into)
164 }
165
166 async fn handle_certificate_v1(
167 &self,
168 request: HandleCertificateRequestV1,
169 client_addr: Option<SocketAddr>,
170 ) -> Result<HandleCertificateResponseV1, IotaError> {
171 let mut request = request.into_request();
172 insert_metadata(&mut request, client_addr);
173
174 let response = self
175 .client()?
176 .handle_certificate_v1(request)
177 .await
178 .map(tonic::Response::into_inner);
179
180 response.map_err(Into::into)
181 }
182
183 async fn handle_soft_bundle_certificates_v1(
184 &self,
185 request: HandleSoftBundleCertificatesRequestV1,
186 client_addr: Option<SocketAddr>,
187 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
188 let mut request = request.into_request();
189 insert_metadata(&mut request, client_addr);
190
191 let response = self
192 .client()?
193 .handle_soft_bundle_certificates_v1(request)
194 .await
195 .map(tonic::Response::into_inner);
196
197 response.map_err(Into::into)
198 }
199
200 async fn handle_object_info_request(
202 &self,
203 request: ObjectInfoRequest,
204 ) -> Result<ObjectInfoResponse, IotaError> {
205 self.client()?
206 .object_info(request)
207 .await
208 .map(tonic::Response::into_inner)
209 .map_err(Into::into)
210 }
211
212 async fn handle_transaction_info_request(
214 &self,
215 request: TransactionInfoRequest,
216 ) -> Result<TransactionInfoResponse, IotaError> {
217 self.client()?
218 .transaction_info(request)
219 .await
220 .map(tonic::Response::into_inner)
221 .map_err(Into::into)
222 }
223
224 async fn handle_checkpoint(
226 &self,
227 request: CheckpointRequest,
228 ) -> Result<CheckpointResponse, IotaError> {
229 self.client()?
230 .checkpoint(request)
231 .await
232 .map(tonic::Response::into_inner)
233 .map_err(Into::into)
234 }
235
236 async fn handle_system_state_object(
238 &self,
239 request: SystemStateRequest,
240 ) -> Result<IotaSystemState, IotaError> {
241 self.client()?
242 .get_system_state_object(request)
243 .await
244 .map(tonic::Response::into_inner)
245 .map_err(Into::into)
246 }
247
248 async fn handle_capability_notification_v1(
249 &self,
250 request: HandleCapabilityNotificationRequestV1,
251 ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
252 self.client()?
253 .handle_capability_notification_v1(request)
254 .await
255 .map(tonic::Response::into_inner)
256 .map_err(Into::into)
257 }
258}
259
260pub fn make_network_authority_clients_with_network_config(
262 committee: &CommitteeWithNetworkMetadata,
263 network_config: &Config,
264) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
265 let mut authority_clients = BTreeMap::new();
266 for (name, (_state, network_metadata)) in committee.validators() {
267 let address = network_metadata.network_address.clone();
268 let address = address.rewrite_udp_to_tcp();
269 let maybe_channel = network_config.connect_lazy(&address, None).map_err(|e| {
281 tracing::error!(
282 address = %address,
283 name = %name,
284 "unable to create authority client: {e}"
285 );
286 e.to_string().into()
287 });
288 let client = NetworkAuthorityClient::new_lazy(maybe_channel);
289 authority_clients.insert(*name, client);
290 }
291 authority_clients
292}
293
294pub fn make_authority_clients_with_timeout_config(
296 committee: &CommitteeWithNetworkMetadata,
297 connect_timeout: Duration,
298 request_timeout: Duration,
299) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
300 let mut network_config = iota_network_stack::config::Config::new();
301 network_config.connect_timeout = Some(connect_timeout);
302 network_config.request_timeout = Some(request_timeout);
303 make_network_authority_clients_with_network_config(committee, &network_config)
304}
305
306fn insert_metadata<T>(request: &mut tonic::Request<T>, client_addr: Option<SocketAddr>) {
307 if let Some(client_addr) = client_addr {
308 let mut metadata = tonic::metadata::MetadataMap::new();
309 metadata.insert("x-forwarded-for", client_addr.to_string().parse().unwrap());
310 metadata
311 .iter()
312 .for_each(|key_and_value| match key_and_value {
313 KeyAndValueRef::Ascii(key, value) => {
314 request.metadata_mut().insert(key, value.clone());
315 }
316 KeyAndValueRef::Binary(key, value) => {
317 request.metadata_mut().insert_bin(key, value.clone());
318 }
319 });
320 }
321}