1use std::{collections::HashMap, net::SocketAddr, sync::Arc};
7
8use iota_types::{
9 base_types::*,
10 committee::*,
11 crypto::AuthorityPublicKeyBytes,
12 effects::{SignedTransactionEffects, TransactionEffectsAPI},
13 error::{IotaError, IotaResult},
14 fp_ensure,
15 iota_system_state::IotaSystemState,
16 messages_grpc::{
17 HandleCertificateRequestV1, HandleCertificateResponseV1, ObjectInfoRequest,
18 ObjectInfoResponse, SystemStateRequest, TransactionInfoRequest, TransactionStatus,
19 VerifiedObjectInfoResponse,
20 },
21 messages_safe_client::PlainTransactionInfoResponse,
22 transaction::*,
23};
24use prometheus::{
25 Histogram, HistogramVec, IntCounterVec, Registry, core::GenericCounter,
26 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
27};
28use tap::TapFallible;
29use tracing::{debug, error, instrument};
30
31use crate::{authority_client::AuthorityAPI, epoch::committee_store::CommitteeStore};
32
/// Logs the error of a failed `Result` via `tap_err` and evaluates to the
/// result of that call.
///
/// * `$authority` - value recorded in the `authority` field of the log event.
/// * `$result` - the fallible expression to inspect.
/// * `$message` - message used when the error is reported at `error` level.
///
/// Errors for which `individual_error_indicates_epoch_change()` returns `true`
/// are only logged at `debug` level; everything else is logged at `error`
/// level.
macro_rules! check_error {
    ($authority:expr, $result:expr, $message:expr) => {
        $result.tap_err(|err| {
            if !err.individual_error_indicates_epoch_change() {
                error!(?err, authority=?$authority, $message);
            } else {
                debug!(?err, authority=?$authority, "Not a real client error");
            }
        })
    }
}
44
45#[derive(Clone)]
46pub struct SafeClientMetricsBase {
47 total_requests_by_address_method: IntCounterVec,
48 total_responses_by_address_method: IntCounterVec,
49 latency: HistogramVec,
50}
51
52impl SafeClientMetricsBase {
53 pub fn new(registry: &Registry) -> Self {
54 Self {
55 total_requests_by_address_method: register_int_counter_vec_with_registry!(
56 "safe_client_total_requests_by_address_method",
57 "Total requests to validators group by address and method",
58 &["address", "method"],
59 registry,
60 )
61 .unwrap(),
62 total_responses_by_address_method: register_int_counter_vec_with_registry!(
63 "safe_client_total_responses_by_address_method",
64 "Total good (OK) responses from validators group by address and method",
65 &["address", "method"],
66 registry,
67 )
68 .unwrap(),
69 latency: register_histogram_vec_with_registry!(
71 "safe_client_latency",
72 "RPC latency observed by safe client aggregator, group by method",
73 &["method"],
74 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
75 registry,
76 )
77 .unwrap(),
78 }
79 }
80}
81
82#[derive(Clone)]
84pub struct SafeClientMetrics {
85 total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
86 total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
87 total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
88 total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
89 handle_transaction_latency: Histogram,
90 handle_certificate_latency: Histogram,
91 handle_obj_info_latency: Histogram,
92 handle_tx_info_latency: Histogram,
93}
94
95impl SafeClientMetrics {
96 pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
97 let validator_address = validator_address.to_string();
98
99 let total_requests_handle_transaction_info_request = metrics_base
100 .total_requests_by_address_method
101 .with_label_values(&[
102 validator_address.as_str(),
103 "handle_transaction_info_request",
104 ]);
105 let total_ok_responses_handle_transaction_info_request = metrics_base
106 .total_responses_by_address_method
107 .with_label_values(&[
108 validator_address.as_str(),
109 "handle_transaction_info_request",
110 ]);
111
112 let total_requests_handle_object_info_request = metrics_base
113 .total_requests_by_address_method
114 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
115 let total_ok_responses_handle_object_info_request = metrics_base
116 .total_responses_by_address_method
117 .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
118
119 let handle_transaction_latency = metrics_base
120 .latency
121 .with_label_values(&["handle_transaction"]);
122 let handle_certificate_latency = metrics_base
123 .latency
124 .with_label_values(&["handle_certificate"]);
125 let handle_obj_info_latency = metrics_base
126 .latency
127 .with_label_values(&["handle_object_info_request"]);
128 let handle_tx_info_latency = metrics_base
129 .latency
130 .with_label_values(&["handle_transaction_info_request"]);
131
132 Self {
133 total_requests_handle_transaction_info_request,
134 total_ok_responses_handle_transaction_info_request,
135 total_requests_handle_object_info_request,
136 total_ok_responses_handle_object_info_request,
137 handle_transaction_latency,
138 handle_certificate_latency,
139 handle_obj_info_latency,
140 handle_tx_info_latency,
141 }
142 }
143
144 pub fn new_for_tests(validator_address: AuthorityName) -> Self {
145 let registry = Registry::new();
146 let metrics_base = SafeClientMetricsBase::new(®istry);
147 Self::new(&metrics_base, validator_address)
148 }
149}
150
151#[derive(Clone)]
154pub struct SafeClient<C>
155where
156 C: Clone,
157{
158 authority_client: C,
159 committee_store: Arc<CommitteeStore>,
160 address: AuthorityPublicKeyBytes,
161 metrics: SafeClientMetrics,
162}
163
164impl<C: Clone> SafeClient<C> {
165 pub fn new(
166 authority_client: C,
167 committee_store: Arc<CommitteeStore>,
168 address: AuthorityPublicKeyBytes,
169 metrics: SafeClientMetrics,
170 ) -> Self {
171 Self {
172 authority_client,
173 committee_store,
174 address,
175 metrics,
176 }
177 }
178}
179
180impl<C: Clone> SafeClient<C> {
181 pub fn authority_client(&self) -> &C {
182 &self.authority_client
183 }
184
185 #[cfg(test)]
186 pub fn authority_client_mut(&mut self) -> &mut C {
187 &mut self.authority_client
188 }
189
190 fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Arc<Committee>> {
191 self.committee_store
192 .get_committee(epoch_id)?
193 .ok_or(IotaError::MissingCommitteeAtEpoch(*epoch_id))
194 }
195
196 fn check_signed_effects_plain(
197 &self,
198 digest: &TransactionDigest,
199 signed_effects: SignedTransactionEffects,
200 expected_effects_digest: Option<&TransactionEffectsDigest>,
201 ) -> IotaResult<SignedTransactionEffects> {
202 fp_ensure!(
204 signed_effects.auth_sig().authority == self.address,
205 IotaError::ByzantineAuthoritySuspicion {
206 authority: self.address,
207 reason: format!(
208 "Unexpected validator address in the signed effects signature: {:?}",
209 signed_effects.auth_sig().authority
210 ),
211 }
212 );
213 fp_ensure!(
215 signed_effects.data().transaction_digest() == digest,
216 IotaError::ByzantineAuthoritySuspicion {
217 authority: self.address,
218 reason: "Unexpected tx digest in the signed effects".to_string()
219 }
220 );
221 if let Some(effects_digest) = expected_effects_digest {
223 fp_ensure!(
224 signed_effects.digest() == effects_digest,
225 IotaError::ByzantineAuthoritySuspicion {
226 authority: self.address,
227 reason: "Effects digest does not match with expected digest".to_string()
228 }
229 );
230 }
231 self.get_committee(&signed_effects.epoch())?;
232 Ok(signed_effects)
233 }
234
235 fn check_transaction_info(
236 &self,
237 digest: &TransactionDigest,
238 transaction: Transaction,
239 status: TransactionStatus,
240 ) -> IotaResult<PlainTransactionInfoResponse> {
241 fp_ensure!(
242 digest == transaction.digest(),
243 IotaError::ByzantineAuthoritySuspicion {
244 authority: self.address,
245 reason: "Signed transaction digest does not match with expected digest".to_string()
246 }
247 );
248 match status {
249 TransactionStatus::Signed(signed) => {
250 self.get_committee(&signed.epoch)?;
251 Ok(PlainTransactionInfoResponse::Signed(
252 SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
253 ))
254 }
255 TransactionStatus::Executed(cert_opt, effects, events) => {
256 let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
257 match cert_opt {
258 Some(cert) => {
259 let committee = self.get_committee(&cert.epoch)?;
260 let ct = CertifiedTransaction::new_from_data_and_sig(
261 transaction.into_data(),
262 cert,
263 );
264 ct.verify_committee_sigs_only(&committee).map_err(|e| {
265 IotaError::FailedToVerifyTxCertWithExecutedEffects {
266 validator_name: self.address,
267 error: e.to_string(),
268 }
269 })?;
270 Ok(PlainTransactionInfoResponse::ExecutedWithCert(
271 ct,
272 signed_effects,
273 events,
274 ))
275 }
276 None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
277 transaction,
278 signed_effects,
279 events,
280 )),
281 }
282 }
283 }
284 }
285
286 fn check_object_response(
287 &self,
288 request: &ObjectInfoRequest,
289 response: ObjectInfoResponse,
290 ) -> IotaResult<VerifiedObjectInfoResponse> {
291 let ObjectInfoResponse {
292 object,
293 layout: _,
294 lock_for_debugging: _,
295 } = response;
296
297 fp_ensure!(
298 request.object_id == object.id(),
299 IotaError::ByzantineAuthoritySuspicion {
300 authority: self.address,
301 reason: "Object id mismatch in the response".to_string()
302 }
303 );
304
305 Ok(VerifiedObjectInfoResponse { object })
306 }
307
308 pub fn address(&self) -> &AuthorityPublicKeyBytes {
309 &self.address
310 }
311}
312
313impl<C> SafeClient<C>
314where
315 C: AuthorityAPI + Send + Sync + Clone + 'static,
316{
317 pub async fn handle_transaction(
319 &self,
320 transaction: Transaction,
321 client_addr: Option<SocketAddr>,
322 ) -> Result<PlainTransactionInfoResponse, IotaError> {
323 let _timer = self.metrics.handle_transaction_latency.start_timer();
324 let digest = *transaction.digest();
325 let response = self
326 .authority_client
327 .handle_transaction(transaction.clone(), client_addr)
328 .await?;
329 let response = check_error!(
330 self.address,
331 self.check_transaction_info(&digest, transaction, response.status),
332 "Client error in handle_transaction"
333 )?;
334 Ok(response)
335 }
336
337 fn verify_certificate_response_v1(
338 &self,
339 digest: &TransactionDigest,
340 HandleCertificateResponseV1 {
341 signed_effects,
342 events,
343 input_objects,
344 output_objects,
345 auxiliary_data,
346 }: HandleCertificateResponseV1,
347 ) -> IotaResult<HandleCertificateResponseV1> {
348 let signed_effects = self.check_signed_effects_plain(digest, signed_effects, None)?;
349
350 match (&events, signed_effects.events_digest()) {
352 (None, None) | (None, Some(_)) => {}
353 (Some(events), None) => {
354 if !events.data.is_empty() {
355 return Err(IotaError::ByzantineAuthoritySuspicion {
356 authority: self.address,
357 reason: "Returned events but no event digest present in the signed effects"
358 .to_string(),
359 });
360 }
361 }
362 (Some(events), Some(events_digest)) => {
363 fp_ensure!(
364 &events.digest() == events_digest,
365 IotaError::ByzantineAuthoritySuspicion {
366 authority: self.address,
367 reason: "Returned events don't match events digest in the signed effects"
368 .to_string()
369 }
370 );
371 }
372 }
373
374 if let Some(input_objects) = &input_objects {
376 let expected: HashMap<_, _> = signed_effects
377 .old_object_metadata()
378 .into_iter()
379 .map(|(object_ref, _owner)| (object_ref.0, object_ref))
380 .collect();
381
382 for object in input_objects {
383 let object_ref = object.compute_object_reference();
384 if expected
385 .get(&object_ref.0)
386 .is_none_or(|expect| &object_ref != expect)
387 {
388 return Err(IotaError::ByzantineAuthoritySuspicion {
389 authority: self.address,
390 reason: "Returned input object that wasn't present in the signed effects"
391 .to_string(),
392 });
393 }
394 }
395 }
396
397 if let Some(output_objects) = &output_objects {
399 let expected: HashMap<_, _> = signed_effects
400 .all_changed_objects()
401 .into_iter()
402 .map(|(object_ref, _, _)| (object_ref.0, object_ref))
403 .collect();
404
405 for object in output_objects {
406 let object_ref = object.compute_object_reference();
407 if expected
408 .get(&object_ref.0)
409 .is_none_or(|expect| &object_ref != expect)
410 {
411 return Err(IotaError::ByzantineAuthoritySuspicion {
412 authority: self.address,
413 reason: "Returned output object that wasn't present in the signed effects"
414 .to_string(),
415 });
416 }
417 }
418 }
419
420 Ok(HandleCertificateResponseV1 {
421 signed_effects,
422 events,
423 input_objects,
424 output_objects,
425 auxiliary_data,
426 })
427 }
428
429 pub async fn handle_certificate_v1(
431 &self,
432 request: HandleCertificateRequestV1,
433 client_addr: Option<SocketAddr>,
434 ) -> Result<HandleCertificateResponseV1, IotaError> {
435 let digest = *request.certificate.digest();
436 let _timer = self.metrics.handle_certificate_latency.start_timer();
437 let response = self
438 .authority_client
439 .handle_certificate_v1(request, client_addr)
440 .await?;
441
442 let verified = check_error!(
443 self.address,
444 self.verify_certificate_response_v1(&digest, response),
445 "Client error in handle_certificate"
446 )?;
447 Ok(verified)
448 }
449
450 pub async fn handle_object_info_request(
451 &self,
452 request: ObjectInfoRequest,
453 ) -> Result<VerifiedObjectInfoResponse, IotaError> {
454 self.metrics.total_requests_handle_object_info_request.inc();
455
456 let _timer = self.metrics.handle_obj_info_latency.start_timer();
457 let response = self
458 .authority_client
459 .handle_object_info_request(request.clone())
460 .await?;
461 let response = self
462 .check_object_response(&request, response)
463 .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
464
465 self.metrics
466 .total_ok_responses_handle_object_info_request
467 .inc();
468 Ok(response)
469 }
470
471 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
473 pub async fn handle_transaction_info_request(
474 &self,
475 request: TransactionInfoRequest,
476 ) -> Result<PlainTransactionInfoResponse, IotaError> {
477 self.metrics
478 .total_requests_handle_transaction_info_request
479 .inc();
480
481 let _timer = self.metrics.handle_tx_info_latency.start_timer();
482
483 let transaction_info = self
484 .authority_client
485 .handle_transaction_info_request(request.clone())
486 .await?;
487
488 let transaction = Transaction::new(transaction_info.transaction);
489 let transaction_info = self.check_transaction_info(
490 &request.transaction_digest,
491 transaction,
492 transaction_info.status,
493 ).tap_err(|err| {
494 error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
495 })?;
496 self.metrics
497 .total_ok_responses_handle_transaction_info_request
498 .inc();
499 Ok(transaction_info)
500 }
501
502 #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
503 pub async fn handle_system_state_object(&self) -> Result<IotaSystemState, IotaError> {
504 self.authority_client
505 .handle_system_state_object(SystemStateRequest { _unused: false })
506 .await
507 }
508}