iota_core/
authority_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use iota_network::{
11    api::ValidatorClient,
12    tonic,
13    tonic::{metadata::KeyAndValueRef, transport::Channel},
14};
15use iota_network_stack::config::Config;
16use iota_types::{
17    base_types::AuthorityName,
18    committee::CommitteeWithNetworkMetadata,
19    error::{IotaError, IotaResult},
20    iota_system_state::IotaSystemState,
21    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
22    messages_grpc::{
23        HandleCertificateRequestV1, HandleCertificateResponseV1,
24        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
25        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
26        TransactionInfoRequest, TransactionInfoResponse,
27    },
28    multiaddr::Multiaddr,
29    transaction::*,
30};
31
32use crate::authority_client::tonic::IntoRequest;
33
34#[async_trait]
35pub trait AuthorityAPI {
36    /// Handles a `Transaction` for this account.
37    async fn handle_transaction(
38        &self,
39        transaction: Transaction,
40        client_addr: Option<SocketAddr>,
41    ) -> Result<HandleTransactionResponse, IotaError>;
42
43    /// Execute a certificate.
44    async fn handle_certificate_v1(
45        &self,
46        request: HandleCertificateRequestV1,
47        client_addr: Option<SocketAddr>,
48    ) -> Result<HandleCertificateResponseV1, IotaError>;
49
50    /// Execute a Soft Bundle with multiple certificates.
51    async fn handle_soft_bundle_certificates_v1(
52        &self,
53        request: HandleSoftBundleCertificatesRequestV1,
54        client_addr: Option<SocketAddr>,
55    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError>;
56
57    /// Handle Object information requests for this account.
58    async fn handle_object_info_request(
59        &self,
60        request: ObjectInfoRequest,
61    ) -> Result<ObjectInfoResponse, IotaError>;
62
63    /// Handles a `TransactionInfoRequest` for this account.
64    async fn handle_transaction_info_request(
65        &self,
66        request: TransactionInfoRequest,
67    ) -> Result<TransactionInfoResponse, IotaError>;
68
69    /// Handles a `CheckpointRequest` for this account.
70    async fn handle_checkpoint(
71        &self,
72        request: CheckpointRequest,
73    ) -> Result<CheckpointResponse, IotaError>;
74
75    // This API is exclusively used by the benchmark code.
76    // Hence it's OK to return a fixed system state type.
77    async fn handle_system_state_object(
78        &self,
79        request: SystemStateRequest,
80    ) -> Result<IotaSystemState, IotaError>;
81}
82
83/// A client for the network authority.
84#[derive(Clone)]
85pub struct NetworkAuthorityClient {
86    client: IotaResult<ValidatorClient<Channel>>,
87}
88
89impl NetworkAuthorityClient {
90    /// Connects to a client address.
91    pub async fn connect(address: &Multiaddr) -> anyhow::Result<Self> {
92        let channel = iota_network_stack::client::connect(address)
93            .await
94            .map_err(|err| anyhow!(err.to_string()))?;
95        Ok(Self::new(channel))
96    }
97
98    /// Connects to a client address lazily.
99    pub fn connect_lazy(address: &Multiaddr) -> Self {
100        let client: IotaResult<_> = iota_network_stack::client::connect_lazy(address)
101            .map(ValidatorClient::new)
102            .map_err(|err| err.to_string().into());
103        Self { client }
104    }
105
106    /// Creates a new client with a `transport` channel.
107    pub fn new(channel: Channel) -> Self {
108        Self {
109            client: Ok(ValidatorClient::new(channel)),
110        }
111    }
112
113    /// Creates a new client with a lazy `transport` channel.
114    fn new_lazy(client: IotaResult<Channel>) -> Self {
115        Self {
116            client: client.map(ValidatorClient::new),
117        }
118    }
119
120    fn client(&self) -> IotaResult<ValidatorClient<Channel>> {
121        self.client.clone()
122    }
123}
124
125#[async_trait]
126impl AuthorityAPI for NetworkAuthorityClient {
127    /// Handles a `Transaction` for this account.
128    async fn handle_transaction(
129        &self,
130        transaction: Transaction,
131        client_addr: Option<SocketAddr>,
132    ) -> Result<HandleTransactionResponse, IotaError> {
133        let mut request = transaction.into_request();
134        insert_metadata(&mut request, client_addr);
135
136        self.client()?
137            .transaction(request)
138            .await
139            .map(tonic::Response::into_inner)
140            .map_err(Into::into)
141    }
142
143    async fn handle_certificate_v1(
144        &self,
145        request: HandleCertificateRequestV1,
146        client_addr: Option<SocketAddr>,
147    ) -> Result<HandleCertificateResponseV1, IotaError> {
148        let mut request = request.into_request();
149        insert_metadata(&mut request, client_addr);
150
151        let response = self
152            .client()?
153            .handle_certificate_v1(request)
154            .await
155            .map(tonic::Response::into_inner);
156
157        response.map_err(Into::into)
158    }
159
160    async fn handle_soft_bundle_certificates_v1(
161        &self,
162        request: HandleSoftBundleCertificatesRequestV1,
163        client_addr: Option<SocketAddr>,
164    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
165        let mut request = request.into_request();
166        insert_metadata(&mut request, client_addr);
167
168        let response = self
169            .client()?
170            .handle_soft_bundle_certificates_v1(request)
171            .await
172            .map(tonic::Response::into_inner);
173
174        response.map_err(Into::into)
175    }
176
177    /// Handles a `ObjectInfoRequest` for this account.
178    async fn handle_object_info_request(
179        &self,
180        request: ObjectInfoRequest,
181    ) -> Result<ObjectInfoResponse, IotaError> {
182        self.client()?
183            .object_info(request)
184            .await
185            .map(tonic::Response::into_inner)
186            .map_err(Into::into)
187    }
188
189    /// Handles a `TransactionInfoRequest` for this account.
190    async fn handle_transaction_info_request(
191        &self,
192        request: TransactionInfoRequest,
193    ) -> Result<TransactionInfoResponse, IotaError> {
194        self.client()?
195            .transaction_info(request)
196            .await
197            .map(tonic::Response::into_inner)
198            .map_err(Into::into)
199    }
200
201    /// Handles a `CheckpointRequest` for this account.
202    async fn handle_checkpoint(
203        &self,
204        request: CheckpointRequest,
205    ) -> Result<CheckpointResponse, IotaError> {
206        self.client()?
207            .checkpoint(request)
208            .await
209            .map(tonic::Response::into_inner)
210            .map_err(Into::into)
211    }
212
213    /// This API is exclusively used by the benchmark code.
214    async fn handle_system_state_object(
215        &self,
216        request: SystemStateRequest,
217    ) -> Result<IotaSystemState, IotaError> {
218        self.client()?
219            .get_system_state_object(request)
220            .await
221            .map(tonic::Response::into_inner)
222            .map_err(Into::into)
223    }
224}
225
226/// Creates authority clients with network configuration.
227pub fn make_network_authority_clients_with_network_config(
228    committee: &CommitteeWithNetworkMetadata,
229    network_config: &Config,
230) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
231    let mut authority_clients = BTreeMap::new();
232    for (name, (_state, network_metadata)) in committee.validators() {
233        let address = network_metadata.network_address.clone();
234        let address = address.rewrite_udp_to_tcp();
235        let maybe_channel = network_config.connect_lazy(&address).map_err(|e| {
236            tracing::error!(
237                address = %address,
238                name = %name,
239                "unable to create authority client: {e}"
240            );
241            e.to_string().into()
242        });
243        let client = NetworkAuthorityClient::new_lazy(maybe_channel);
244        authority_clients.insert(*name, client);
245    }
246    authority_clients
247}
248
249/// Creates authority clients with a timeout configuration.
250pub fn make_authority_clients_with_timeout_config(
251    committee: &CommitteeWithNetworkMetadata,
252    connect_timeout: Duration,
253    request_timeout: Duration,
254) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
255    let mut network_config = iota_network_stack::config::Config::new();
256    network_config.connect_timeout = Some(connect_timeout);
257    network_config.request_timeout = Some(request_timeout);
258    make_network_authority_clients_with_network_config(committee, &network_config)
259}
260
261fn insert_metadata<T>(request: &mut tonic::Request<T>, client_addr: Option<SocketAddr>) {
262    if let Some(client_addr) = client_addr {
263        let mut metadata = tonic::metadata::MetadataMap::new();
264        metadata.insert("x-forwarded-for", client_addr.to_string().parse().unwrap());
265        metadata
266            .iter()
267            .for_each(|key_and_value| match key_and_value {
268                KeyAndValueRef::Ascii(key, value) => {
269                    request.metadata_mut().insert(key, value.clone());
270                }
271                KeyAndValueRef::Binary(key, value) => {
272                    request.metadata_mut().insert_bin(key, value.clone());
273                }
274            });
275    }
276}