1use std::{collections::HashMap, net::SocketAddr, sync::Arc};
7
8use iota_metrics::histogram::{Histogram, HistogramVec};
9use iota_types::{
10 base_types::*,
11 committee::*,
12 crypto::AuthorityPublicKeyBytes,
13 effects::{SignedTransactionEffects, TransactionEffectsAPI},
14 error::{IotaError, IotaResult},
15 fp_ensure,
16 iota_system_state::IotaSystemState,
17 messages_grpc::{
18 HandleCertificateRequestV1, HandleCertificateResponseV1, ObjectInfoRequest,
19 ObjectInfoResponse, SystemStateRequest, TransactionInfoRequest, TransactionStatus,
20 VerifiedObjectInfoResponse,
21 },
22 messages_safe_client::PlainTransactionInfoResponse,
23 transaction::*,
24};
25use prometheus::{
26 IntCounterVec, Registry, core::GenericCounter, register_int_counter_vec_with_registry,
27};
28use tap::TapFallible;
29use tracing::{debug, error, instrument};
30
31use crate::{authority_client::AuthorityAPI, epoch::committee_store::CommitteeStore};
32
33macro_rules! check_error {
34 ($address:expr, $cond:expr, $msg:expr) => {
35 $cond.tap_err(|err| {
36 if err.individual_error_indicates_epoch_change() {
37 debug!(?err, authority=?$address, "Not a real client error");
38 } else {
39 error!(?err, authority=?$address, $msg);
40 }
41 })
42 }
43}
44
45#[derive(Clone)]
46pub struct SafeClientMetricsBase {
47 total_requests_by_address_method: IntCounterVec,
48 total_responses_by_address_method: IntCounterVec,
49 latency: HistogramVec,
50}
51
52impl SafeClientMetricsBase {
53 pub fn new(registry: &Registry) -> Self {
54 Self {
55 total_requests_by_address_method: register_int_counter_vec_with_registry!(
56 "safe_client_total_requests_by_address_method",
57 "Total requests to validators group by address and method",
58 &["address", "method"],
59 registry,
60 )
61 .unwrap(),
62 total_responses_by_address_method: register_int_counter_vec_with_registry!(
63 "safe_client_total_responses_by_address_method",
64 "Total good (OK) responses from validators group by address and method",
65 &["address", "method"],
66 registry,
67 )
68 .unwrap(),
69 latency: HistogramVec::new_in_registry(
70 "safe_client_latency",
71 "RPC latency observed by safe client aggregator, group by address and method",
72 &["address", "method"],
73 registry,
74 ),
75 }
76 }
77}
78
79#[derive(Clone)]
81pub struct SafeClientMetrics {
82 total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
83 total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
84 total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
85 total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
86 handle_transaction_latency: Histogram,
87 handle_certificate_latency: Histogram,
88 handle_obj_info_latency: Histogram,
89 handle_tx_info_latency: Histogram,
90}
91
92impl SafeClientMetrics {
93 pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
94 let validator_address = validator_address.to_string();
95
96 let total_requests_handle_transaction_info_request = metrics_base
97 .total_requests_by_address_method
98 .with_label_values(&[
99 validator_address.as_str(),
100 "handle_transaction_info_request",
101 ]);
102 let total_ok_responses_handle_transaction_info_request = metrics_base
103 .total_responses_by_address_method
104 .with_label_values(&[
105 validator_address.as_str(),
106 "handle_transaction_info_request",
107 ]);
108
109 let total_requests_handle_object_info_request = metrics_base
110 .total_requests_by_address_method
111 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
112 let total_ok_responses_handle_object_info_request = metrics_base
113 .total_responses_by_address_method
114 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
115
116 let handle_transaction_latency = metrics_base
117 .latency
118 .with_label_values(&[validator_address.as_str(), "handle_transaction"]);
119 let handle_certificate_latency = metrics_base
120 .latency
121 .with_label_values(&[validator_address.as_str(), "handle_certificate"]);
122 let handle_obj_info_latency = metrics_base
123 .latency
124 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
125 let handle_tx_info_latency = metrics_base.latency.with_label_values(&[
126 validator_address.as_str(),
127 "handle_transaction_info_request",
128 ]);
129
130 Self {
131 total_requests_handle_transaction_info_request,
132 total_ok_responses_handle_transaction_info_request,
133 total_requests_handle_object_info_request,
134 total_ok_responses_handle_object_info_request,
135 handle_transaction_latency,
136 handle_certificate_latency,
137 handle_obj_info_latency,
138 handle_tx_info_latency,
139 }
140 }
141
142 pub fn new_for_tests(validator_address: AuthorityName) -> Self {
143 let registry = Registry::new();
144 let metrics_base = SafeClientMetricsBase::new(®istry);
145 Self::new(&metrics_base, validator_address)
146 }
147}
148
149#[derive(Clone)]
152pub struct SafeClient<C>
153where
154 C: Clone,
155{
156 authority_client: C,
157 committee_store: Arc<CommitteeStore>,
158 address: AuthorityPublicKeyBytes,
159 metrics: SafeClientMetrics,
160}
161
162impl<C: Clone> SafeClient<C> {
163 pub fn new(
164 authority_client: C,
165 committee_store: Arc<CommitteeStore>,
166 address: AuthorityPublicKeyBytes,
167 metrics: SafeClientMetrics,
168 ) -> Self {
169 Self {
170 authority_client,
171 committee_store,
172 address,
173 metrics,
174 }
175 }
176}
177
178impl<C: Clone> SafeClient<C> {
179 pub fn authority_client(&self) -> &C {
180 &self.authority_client
181 }
182
183 #[cfg(test)]
184 pub fn authority_client_mut(&mut self) -> &mut C {
185 &mut self.authority_client
186 }
187
188 fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Arc<Committee>> {
189 self.committee_store
190 .get_committee(epoch_id)?
191 .ok_or(IotaError::MissingCommitteeAtEpoch(*epoch_id))
192 }
193
194 fn check_signed_effects_plain(
195 &self,
196 digest: &TransactionDigest,
197 signed_effects: SignedTransactionEffects,
198 expected_effects_digest: Option<&TransactionEffectsDigest>,
199 ) -> IotaResult<SignedTransactionEffects> {
200 fp_ensure!(
202 signed_effects.auth_sig().authority == self.address,
203 IotaError::ByzantineAuthoritySuspicion {
204 authority: self.address,
205 reason: format!(
206 "Unexpected validator address in the signed effects signature: {:?}",
207 signed_effects.auth_sig().authority
208 ),
209 }
210 );
211 fp_ensure!(
213 signed_effects.data().transaction_digest() == digest,
214 IotaError::ByzantineAuthoritySuspicion {
215 authority: self.address,
216 reason: "Unexpected tx digest in the signed effects".to_string()
217 }
218 );
219 if let Some(effects_digest) = expected_effects_digest {
221 fp_ensure!(
222 signed_effects.digest() == effects_digest,
223 IotaError::ByzantineAuthoritySuspicion {
224 authority: self.address,
225 reason: "Effects digest does not match with expected digest".to_string()
226 }
227 );
228 }
229 self.get_committee(&signed_effects.epoch())?;
230 Ok(signed_effects)
231 }
232
233 fn check_transaction_info(
234 &self,
235 digest: &TransactionDigest,
236 transaction: Transaction,
237 status: TransactionStatus,
238 ) -> IotaResult<PlainTransactionInfoResponse> {
239 fp_ensure!(
240 digest == transaction.digest(),
241 IotaError::ByzantineAuthoritySuspicion {
242 authority: self.address,
243 reason: "Signed transaction digest does not match with expected digest".to_string()
244 }
245 );
246 match status {
247 TransactionStatus::Signed(signed) => {
248 self.get_committee(&signed.epoch)?;
249 Ok(PlainTransactionInfoResponse::Signed(
250 SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
251 ))
252 }
253 TransactionStatus::Executed(cert_opt, effects, events) => {
254 let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
255 match cert_opt {
256 Some(cert) => {
257 let committee = self.get_committee(&cert.epoch)?;
258 let ct = CertifiedTransaction::new_from_data_and_sig(
259 transaction.into_data(),
260 cert,
261 );
262 ct.verify_committee_sigs_only(&committee).map_err(|e| {
263 IotaError::FailedToVerifyTxCertWithExecutedEffects {
264 validator_name: self.address,
265 error: e.to_string(),
266 }
267 })?;
268 Ok(PlainTransactionInfoResponse::ExecutedWithCert(
269 ct,
270 signed_effects,
271 events,
272 ))
273 }
274 None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
275 transaction,
276 signed_effects,
277 events,
278 )),
279 }
280 }
281 }
282 }
283
284 fn check_object_response(
285 &self,
286 request: &ObjectInfoRequest,
287 response: ObjectInfoResponse,
288 ) -> IotaResult<VerifiedObjectInfoResponse> {
289 let ObjectInfoResponse {
290 object,
291 layout: _,
292 lock_for_debugging: _,
293 } = response;
294
295 fp_ensure!(
296 request.object_id == object.id(),
297 IotaError::ByzantineAuthoritySuspicion {
298 authority: self.address,
299 reason: "Object id mismatch in the response".to_string()
300 }
301 );
302
303 Ok(VerifiedObjectInfoResponse { object })
304 }
305
306 pub fn address(&self) -> &AuthorityPublicKeyBytes {
307 &self.address
308 }
309}
310
311impl<C> SafeClient<C>
312where
313 C: AuthorityAPI + Send + Sync + Clone + 'static,
314{
315 pub async fn handle_transaction(
317 &self,
318 transaction: Transaction,
319 client_addr: Option<SocketAddr>,
320 ) -> Result<PlainTransactionInfoResponse, IotaError> {
321 let _timer = self.metrics.handle_transaction_latency.start_timer();
322 let digest = *transaction.digest();
323 let response = self
324 .authority_client
325 .handle_transaction(transaction.clone(), client_addr)
326 .await?;
327 let response = check_error!(
328 self.address,
329 self.check_transaction_info(&digest, transaction, response.status),
330 "Client error in handle_transaction"
331 )?;
332 Ok(response)
333 }
334
335 fn verify_certificate_response_v1(
336 &self,
337 digest: &TransactionDigest,
338 HandleCertificateResponseV1 {
339 signed_effects,
340 events,
341 input_objects,
342 output_objects,
343 auxiliary_data,
344 }: HandleCertificateResponseV1,
345 ) -> IotaResult<HandleCertificateResponseV1> {
346 let signed_effects = self.check_signed_effects_plain(digest, signed_effects, None)?;
347
348 match (&events, signed_effects.events_digest()) {
350 (None, None) | (None, Some(_)) => {}
351 (Some(events), None) => {
352 if !events.data.is_empty() {
353 return Err(IotaError::ByzantineAuthoritySuspicion {
354 authority: self.address,
355 reason: "Returned events but no event digest present in the signed effects"
356 .to_string(),
357 });
358 }
359 }
360 (Some(events), Some(events_digest)) => {
361 fp_ensure!(
362 &events.digest() == events_digest,
363 IotaError::ByzantineAuthoritySuspicion {
364 authority: self.address,
365 reason: "Returned events don't match events digest in the signed effects"
366 .to_string()
367 }
368 );
369 }
370 }
371
372 if let Some(input_objects) = &input_objects {
374 let expected: HashMap<_, _> = signed_effects
375 .old_object_metadata()
376 .into_iter()
377 .map(|(object_ref, _owner)| (object_ref.0, object_ref))
378 .collect();
379
380 for object in input_objects {
381 let object_ref = object.compute_object_reference();
382 if expected
383 .get(&object_ref.0)
384 .is_none_or(|expect| &object_ref != expect)
385 {
386 return Err(IotaError::ByzantineAuthoritySuspicion {
387 authority: self.address,
388 reason: "Returned input object that wasn't present in the signed effects"
389 .to_string(),
390 });
391 }
392 }
393 }
394
395 if let Some(output_objects) = &output_objects {
397 let expected: HashMap<_, _> = signed_effects
398 .all_changed_objects()
399 .into_iter()
400 .map(|(object_ref, _, _)| (object_ref.0, object_ref))
401 .collect();
402
403 for object in output_objects {
404 let object_ref = object.compute_object_reference();
405 if expected
406 .get(&object_ref.0)
407 .is_none_or(|expect| &object_ref != expect)
408 {
409 return Err(IotaError::ByzantineAuthoritySuspicion {
410 authority: self.address,
411 reason: "Returned output object that wasn't present in the signed effects"
412 .to_string(),
413 });
414 }
415 }
416 }
417
418 Ok(HandleCertificateResponseV1 {
419 signed_effects,
420 events,
421 input_objects,
422 output_objects,
423 auxiliary_data,
424 })
425 }
426
427 pub async fn handle_certificate_v1(
429 &self,
430 request: HandleCertificateRequestV1,
431 client_addr: Option<SocketAddr>,
432 ) -> Result<HandleCertificateResponseV1, IotaError> {
433 let digest = *request.certificate.digest();
434 let _timer = self.metrics.handle_certificate_latency.start_timer();
435 let response = self
436 .authority_client
437 .handle_certificate_v1(request, client_addr)
438 .await?;
439
440 let verified = check_error!(
441 self.address,
442 self.verify_certificate_response_v1(&digest, response),
443 "Client error in handle_certificate"
444 )?;
445 Ok(verified)
446 }
447
448 pub async fn handle_object_info_request(
449 &self,
450 request: ObjectInfoRequest,
451 ) -> Result<VerifiedObjectInfoResponse, IotaError> {
452 self.metrics.total_requests_handle_object_info_request.inc();
453
454 let _timer = self.metrics.handle_obj_info_latency.start_timer();
455 let response = self
456 .authority_client
457 .handle_object_info_request(request.clone())
458 .await?;
459 let response = self
460 .check_object_response(&request, response)
461 .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
462
463 self.metrics
464 .total_ok_responses_handle_object_info_request
465 .inc();
466 Ok(response)
467 }
468
469 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
471 pub async fn handle_transaction_info_request(
472 &self,
473 request: TransactionInfoRequest,
474 ) -> Result<PlainTransactionInfoResponse, IotaError> {
475 self.metrics
476 .total_requests_handle_transaction_info_request
477 .inc();
478
479 let _timer = self.metrics.handle_tx_info_latency.start_timer();
480
481 let transaction_info = self
482 .authority_client
483 .handle_transaction_info_request(request.clone())
484 .await?;
485
486 let transaction = Transaction::new(transaction_info.transaction);
487 let transaction_info = self.check_transaction_info(
488 &request.transaction_digest,
489 transaction,
490 transaction_info.status,
491 ).tap_err(|err| {
492 error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
493 })?;
494 self.metrics
495 .total_ok_responses_handle_transaction_info_request
496 .inc();
497 Ok(transaction_info)
498 }
499
500 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
501 pub async fn handle_system_state_object(&self) -> Result<IotaSystemState, IotaError> {
502 self.authority_client
503 .handle_system_state_object(SystemStateRequest { _unused: false })
504 .await
505 }
506}