1use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use iota_network::{
11 api::ValidatorClient,
12 tonic,
13 tonic::{metadata::KeyAndValueRef, transport::Channel},
14};
15use iota_network_stack::config::Config;
16use iota_types::{
17 base_types::AuthorityName,
18 committee::CommitteeWithNetworkMetadata,
19 error::{IotaError, IotaResult},
20 iota_system_state::IotaSystemState,
21 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
22 messages_grpc::{
23 HandleCertificateRequestV1, HandleCertificateResponseV1,
24 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
25 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
26 TransactionInfoRequest, TransactionInfoResponse,
27 },
28 multiaddr::Multiaddr,
29 transaction::*,
30};
31
32use crate::authority_client::tonic::IntoRequest;
33
34#[async_trait]
35pub trait AuthorityAPI {
36 async fn handle_transaction(
38 &self,
39 transaction: Transaction,
40 client_addr: Option<SocketAddr>,
41 ) -> Result<HandleTransactionResponse, IotaError>;
42
43 async fn handle_certificate_v1(
45 &self,
46 request: HandleCertificateRequestV1,
47 client_addr: Option<SocketAddr>,
48 ) -> Result<HandleCertificateResponseV1, IotaError>;
49
50 async fn handle_soft_bundle_certificates_v1(
52 &self,
53 request: HandleSoftBundleCertificatesRequestV1,
54 client_addr: Option<SocketAddr>,
55 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError>;
56
57 async fn handle_object_info_request(
59 &self,
60 request: ObjectInfoRequest,
61 ) -> Result<ObjectInfoResponse, IotaError>;
62
63 async fn handle_transaction_info_request(
65 &self,
66 request: TransactionInfoRequest,
67 ) -> Result<TransactionInfoResponse, IotaError>;
68
69 async fn handle_checkpoint(
71 &self,
72 request: CheckpointRequest,
73 ) -> Result<CheckpointResponse, IotaError>;
74
75 async fn handle_system_state_object(
78 &self,
79 request: SystemStateRequest,
80 ) -> Result<IotaSystemState, IotaError>;
81}
82
83#[derive(Clone)]
85pub struct NetworkAuthorityClient {
86 client: IotaResult<ValidatorClient<Channel>>,
87}
88
89impl NetworkAuthorityClient {
90 pub async fn connect(address: &Multiaddr) -> anyhow::Result<Self> {
92 let channel = iota_network_stack::client::connect(address)
93 .await
94 .map_err(|err| anyhow!(err.to_string()))?;
95 Ok(Self::new(channel))
96 }
97
98 pub fn connect_lazy(address: &Multiaddr) -> Self {
100 let client: IotaResult<_> = iota_network_stack::client::connect_lazy(address)
101 .map(ValidatorClient::new)
102 .map_err(|err| err.to_string().into());
103 Self { client }
104 }
105
106 pub fn new(channel: Channel) -> Self {
108 Self {
109 client: Ok(ValidatorClient::new(channel)),
110 }
111 }
112
113 fn new_lazy(client: IotaResult<Channel>) -> Self {
115 Self {
116 client: client.map(ValidatorClient::new),
117 }
118 }
119
120 fn client(&self) -> IotaResult<ValidatorClient<Channel>> {
121 self.client.clone()
122 }
123}
124
125#[async_trait]
126impl AuthorityAPI for NetworkAuthorityClient {
127 async fn handle_transaction(
129 &self,
130 transaction: Transaction,
131 client_addr: Option<SocketAddr>,
132 ) -> Result<HandleTransactionResponse, IotaError> {
133 let mut request = transaction.into_request();
134 insert_metadata(&mut request, client_addr);
135
136 self.client()?
137 .transaction(request)
138 .await
139 .map(tonic::Response::into_inner)
140 .map_err(Into::into)
141 }
142
143 async fn handle_certificate_v1(
144 &self,
145 request: HandleCertificateRequestV1,
146 client_addr: Option<SocketAddr>,
147 ) -> Result<HandleCertificateResponseV1, IotaError> {
148 let mut request = request.into_request();
149 insert_metadata(&mut request, client_addr);
150
151 let response = self
152 .client()?
153 .handle_certificate_v1(request)
154 .await
155 .map(tonic::Response::into_inner);
156
157 response.map_err(Into::into)
158 }
159
160 async fn handle_soft_bundle_certificates_v1(
161 &self,
162 request: HandleSoftBundleCertificatesRequestV1,
163 client_addr: Option<SocketAddr>,
164 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
165 let mut request = request.into_request();
166 insert_metadata(&mut request, client_addr);
167
168 let response = self
169 .client()?
170 .handle_soft_bundle_certificates_v1(request)
171 .await
172 .map(tonic::Response::into_inner);
173
174 response.map_err(Into::into)
175 }
176
177 async fn handle_object_info_request(
179 &self,
180 request: ObjectInfoRequest,
181 ) -> Result<ObjectInfoResponse, IotaError> {
182 self.client()?
183 .object_info(request)
184 .await
185 .map(tonic::Response::into_inner)
186 .map_err(Into::into)
187 }
188
189 async fn handle_transaction_info_request(
191 &self,
192 request: TransactionInfoRequest,
193 ) -> Result<TransactionInfoResponse, IotaError> {
194 self.client()?
195 .transaction_info(request)
196 .await
197 .map(tonic::Response::into_inner)
198 .map_err(Into::into)
199 }
200
201 async fn handle_checkpoint(
203 &self,
204 request: CheckpointRequest,
205 ) -> Result<CheckpointResponse, IotaError> {
206 self.client()?
207 .checkpoint(request)
208 .await
209 .map(tonic::Response::into_inner)
210 .map_err(Into::into)
211 }
212
213 async fn handle_system_state_object(
215 &self,
216 request: SystemStateRequest,
217 ) -> Result<IotaSystemState, IotaError> {
218 self.client()?
219 .get_system_state_object(request)
220 .await
221 .map(tonic::Response::into_inner)
222 .map_err(Into::into)
223 }
224}
225
226pub fn make_network_authority_clients_with_network_config(
228 committee: &CommitteeWithNetworkMetadata,
229 network_config: &Config,
230) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
231 let mut authority_clients = BTreeMap::new();
232 for (name, (_state, network_metadata)) in committee.validators() {
233 let address = network_metadata.network_address.clone();
234 let address = address.rewrite_udp_to_tcp();
235 let maybe_channel = network_config.connect_lazy(&address).map_err(|e| {
236 tracing::error!(
237 address = %address,
238 name = %name,
239 "unable to create authority client: {e}"
240 );
241 e.to_string().into()
242 });
243 let client = NetworkAuthorityClient::new_lazy(maybe_channel);
244 authority_clients.insert(*name, client);
245 }
246 authority_clients
247}
248
249pub fn make_authority_clients_with_timeout_config(
251 committee: &CommitteeWithNetworkMetadata,
252 connect_timeout: Duration,
253 request_timeout: Duration,
254) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
255 let mut network_config = iota_network_stack::config::Config::new();
256 network_config.connect_timeout = Some(connect_timeout);
257 network_config.request_timeout = Some(request_timeout);
258 make_network_authority_clients_with_network_config(committee, &network_config)
259}
260
261fn insert_metadata<T>(request: &mut tonic::Request<T>, client_addr: Option<SocketAddr>) {
262 if let Some(client_addr) = client_addr {
263 let mut metadata = tonic::metadata::MetadataMap::new();
264 metadata.insert("x-forwarded-for", client_addr.to_string().parse().unwrap());
265 metadata
266 .iter()
267 .for_each(|key_and_value| match key_and_value {
268 KeyAndValueRef::Ascii(key, value) => {
269 request.metadata_mut().insert(key, value.clone());
270 }
271 KeyAndValueRef::Binary(key, value) => {
272 request.metadata_mut().insert_bin(key, value.clone());
273 }
274 });
275 }
276}