iota_core/
safe_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{collections::HashMap, net::SocketAddr, sync::Arc};
7
8use iota_types::{
9    base_types::*,
10    committee::*,
11    crypto::AuthorityPublicKeyBytes,
12    effects::{SignedTransactionEffects, TransactionEffectsAPI},
13    error::{IotaError, IotaResult},
14    fp_ensure,
15    iota_system_state::IotaSystemState,
16    messages_grpc::{
17        HandleCertificateRequestV1, HandleCertificateResponseV1, ObjectInfoRequest,
18        ObjectInfoResponse, SystemStateRequest, TransactionInfoRequest, TransactionStatus,
19        VerifiedObjectInfoResponse,
20    },
21    messages_safe_client::PlainTransactionInfoResponse,
22    transaction::*,
23};
24use prometheus::{
25    Histogram, HistogramVec, IntCounterVec, Registry, core::GenericCounter,
26    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
27};
28use tap::TapFallible;
29use tracing::{debug, error, instrument};
30
31use crate::{authority_client::AuthorityAPI, epoch::committee_store::CommitteeStore};
32
/// Logs the error of a fallible check and hands the `Result` back unchanged.
///
/// * `$authority` - value recorded in the `authority` field of the log event.
/// * `$result` - the `Result` expression to inspect.
/// * `$message` - message used when the failure is reported at error level.
///
/// Errors for which `individual_error_indicates_epoch_change()` returns `true`
/// are logged at debug level; every other error is logged at error level.
macro_rules! check_error {
    ($authority:expr, $result:expr, $message:expr) => {
        $result.tap_err(|err| {
            if err.individual_error_indicates_epoch_change() {
                debug!(?err, authority=?$authority, "Not a real client error");
                return;
            }
            error!(?err, authority=?$authority, $message);
        })
    }
}
44
/// Metric families from which the per-validator `SafeClientMetrics` handles
/// are derived.
#[derive(Clone)]
pub struct SafeClientMetricsBase {
    /// Request counter, labelled by validator address and method name.
    total_requests_by_address_method: IntCounterVec,
    /// Counter of good (OK) responses, labelled by validator address and
    /// method name.
    total_responses_by_address_method: IntCounterVec,
    /// RPC latency histogram, labelled by method name only.
    latency: HistogramVec,
}
51
52impl SafeClientMetricsBase {
53    pub fn new(registry: &Registry) -> Self {
54        Self {
55            total_requests_by_address_method: register_int_counter_vec_with_registry!(
56                "safe_client_total_requests_by_address_method",
57                "Total requests to validators group by address and method",
58                &["address", "method"],
59                registry,
60            )
61            .unwrap(),
62            total_responses_by_address_method: register_int_counter_vec_with_registry!(
63                "safe_client_total_responses_by_address_method",
64                "Total good (OK) responses from validators group by address and method",
65                &["address", "method"],
66                registry,
67            )
68            .unwrap(),
69            // Address label is removed to reduce high cardinality, can be added back if needed
70            latency: register_histogram_vec_with_registry!(
71                "safe_client_latency",
72                "RPC latency observed by safe client aggregator, group by method",
73                &["method"],
74                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
75                registry,
76            )
77            .unwrap(),
78        }
79    }
80}
81
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
///
/// The counters are labelled with a validator address and a method name; the
/// latency histograms are labelled with a method name only.
#[derive(Clone)]
pub struct SafeClientMetrics {
    /// Incremented at the start of every `handle_transaction_info_request` call.
    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Incremented when a `handle_transaction_info_request` response passed its checks.
    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Incremented at the start of every `handle_object_info_request` call.
    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Incremented when a `handle_object_info_request` response passed its checks.
    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Latency histogram for the `handle_transaction` method label.
    handle_transaction_latency: Histogram,
    /// Latency histogram for the `handle_certificate` method label.
    handle_certificate_latency: Histogram,
    /// Latency histogram for the `handle_object_info_request` method label.
    handle_obj_info_latency: Histogram,
    /// Latency histogram for the `handle_transaction_info_request` method label.
    handle_tx_info_latency: Histogram,
}
94
95impl SafeClientMetrics {
96    pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
97        let validator_address = validator_address.to_string();
98
99        let total_requests_handle_transaction_info_request = metrics_base
100            .total_requests_by_address_method
101            .with_label_values(&[
102                validator_address.as_str(),
103                "handle_transaction_info_request",
104            ]);
105        let total_ok_responses_handle_transaction_info_request = metrics_base
106            .total_responses_by_address_method
107            .with_label_values(&[
108                validator_address.as_str(),
109                "handle_transaction_info_request",
110            ]);
111
112        let total_requests_handle_object_info_request = metrics_base
113            .total_requests_by_address_method
114            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
115        let total_ok_responses_handle_object_info_request = metrics_base
116            .total_responses_by_address_method
117            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
118
119        let handle_transaction_latency = metrics_base
120            .latency
121            .with_label_values(&["handle_transaction"]);
122        let handle_certificate_latency = metrics_base
123            .latency
124            .with_label_values(&["handle_certificate"]);
125        let handle_obj_info_latency = metrics_base
126            .latency
127            .with_label_values(&["handle_object_info_request"]);
128        let handle_tx_info_latency = metrics_base
129            .latency
130            .with_label_values(&["handle_transaction_info_request"]);
131
132        Self {
133            total_requests_handle_transaction_info_request,
134            total_ok_responses_handle_transaction_info_request,
135            total_requests_handle_object_info_request,
136            total_ok_responses_handle_object_info_request,
137            handle_transaction_latency,
138            handle_certificate_latency,
139            handle_obj_info_latency,
140            handle_tx_info_latency,
141        }
142    }
143
144    pub fn new_for_tests(validator_address: AuthorityName) -> Self {
145        let registry = Registry::new();
146        let metrics_base = SafeClientMetricsBase::new(&registry);
147        Self::new(&metrics_base, validator_address)
148    }
149}
150
/// Wrapper around an authority client `C` that checks the responses it
/// receives before handing them back to the caller.
///
/// The metrics are per validator client.
#[derive(Clone)]
pub struct SafeClient<C>
where
    C: Clone,
{
    /// The wrapped client that the requests are forwarded to.
    authority_client: C,
    /// Source of committees, looked up by epoch while checking responses.
    committee_store: Arc<CommitteeStore>,
    /// Public key of the validator this client talks to; signed effects are
    /// expected to be signed by it.
    address: AuthorityPublicKeyBytes,
    /// See `SafeClientMetrics::new` for how each metric is labelled.
    metrics: SafeClientMetrics,
}
163
164impl<C: Clone> SafeClient<C> {
165    pub fn new(
166        authority_client: C,
167        committee_store: Arc<CommitteeStore>,
168        address: AuthorityPublicKeyBytes,
169        metrics: SafeClientMetrics,
170    ) -> Self {
171        Self {
172            authority_client,
173            committee_store,
174            address,
175            metrics,
176        }
177    }
178}
179
180impl<C: Clone> SafeClient<C> {
    /// Returns a shared reference to the wrapped authority client.
    pub fn authority_client(&self) -> &C {
        &self.authority_client
    }
184
    /// Returns a mutable reference to the wrapped authority client.
    ///
    /// Only compiled in test builds.
    #[cfg(test)]
    pub fn authority_client_mut(&mut self) -> &mut C {
        &mut self.authority_client
    }
189
190    fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Arc<Committee>> {
191        self.committee_store
192            .get_committee(epoch_id)?
193            .ok_or(IotaError::MissingCommitteeAtEpoch(*epoch_id))
194    }
195
196    fn check_signed_effects_plain(
197        &self,
198        digest: &TransactionDigest,
199        signed_effects: SignedTransactionEffects,
200        expected_effects_digest: Option<&TransactionEffectsDigest>,
201    ) -> IotaResult<SignedTransactionEffects> {
202        // Check it has the right signer
203        fp_ensure!(
204            signed_effects.auth_sig().authority == self.address,
205            IotaError::ByzantineAuthoritySuspicion {
206                authority: self.address,
207                reason: format!(
208                    "Unexpected validator address in the signed effects signature: {:?}",
209                    signed_effects.auth_sig().authority
210                ),
211            }
212        );
213        // Checks it concerns the right tx
214        fp_ensure!(
215            signed_effects.data().transaction_digest() == digest,
216            IotaError::ByzantineAuthoritySuspicion {
217                authority: self.address,
218                reason: "Unexpected tx digest in the signed effects".to_string()
219            }
220        );
221        // check that the effects digest is correct.
222        if let Some(effects_digest) = expected_effects_digest {
223            fp_ensure!(
224                signed_effects.digest() == effects_digest,
225                IotaError::ByzantineAuthoritySuspicion {
226                    authority: self.address,
227                    reason: "Effects digest does not match with expected digest".to_string()
228                }
229            );
230        }
231        self.get_committee(&signed_effects.epoch())?;
232        Ok(signed_effects)
233    }
234
235    fn check_transaction_info(
236        &self,
237        digest: &TransactionDigest,
238        transaction: Transaction,
239        status: TransactionStatus,
240    ) -> IotaResult<PlainTransactionInfoResponse> {
241        fp_ensure!(
242            digest == transaction.digest(),
243            IotaError::ByzantineAuthoritySuspicion {
244                authority: self.address,
245                reason: "Signed transaction digest does not match with expected digest".to_string()
246            }
247        );
248        match status {
249            TransactionStatus::Signed(signed) => {
250                self.get_committee(&signed.epoch)?;
251                Ok(PlainTransactionInfoResponse::Signed(
252                    SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
253                ))
254            }
255            TransactionStatus::Executed(cert_opt, effects, events) => {
256                let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
257                match cert_opt {
258                    Some(cert) => {
259                        let committee = self.get_committee(&cert.epoch)?;
260                        let ct = CertifiedTransaction::new_from_data_and_sig(
261                            transaction.into_data(),
262                            cert,
263                        );
264                        ct.verify_committee_sigs_only(&committee).map_err(|e| {
265                            IotaError::FailedToVerifyTxCertWithExecutedEffects {
266                                validator_name: self.address,
267                                error: e.to_string(),
268                            }
269                        })?;
270                        Ok(PlainTransactionInfoResponse::ExecutedWithCert(
271                            ct,
272                            signed_effects,
273                            events,
274                        ))
275                    }
276                    None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
277                        transaction,
278                        signed_effects,
279                        events,
280                    )),
281                }
282            }
283        }
284    }
285
286    fn check_object_response(
287        &self,
288        request: &ObjectInfoRequest,
289        response: ObjectInfoResponse,
290    ) -> IotaResult<VerifiedObjectInfoResponse> {
291        let ObjectInfoResponse {
292            object,
293            layout: _,
294            lock_for_debugging: _,
295        } = response;
296
297        fp_ensure!(
298            request.object_id == object.id(),
299            IotaError::ByzantineAuthoritySuspicion {
300                authority: self.address,
301                reason: "Object id mismatch in the response".to_string()
302            }
303        );
304
305        Ok(VerifiedObjectInfoResponse { object })
306    }
307
    /// Returns the public key of the validator this client talks to.
    pub fn address(&self) -> &AuthorityPublicKeyBytes {
        &self.address
    }
311}
312
313impl<C> SafeClient<C>
314where
315    C: AuthorityAPI + Send + Sync + Clone + 'static,
316{
317    /// Initiate a new transfer to an IOTA or Primary account.
318    pub async fn handle_transaction(
319        &self,
320        transaction: Transaction,
321        client_addr: Option<SocketAddr>,
322    ) -> Result<PlainTransactionInfoResponse, IotaError> {
323        let _timer = self.metrics.handle_transaction_latency.start_timer();
324        let digest = *transaction.digest();
325        let response = self
326            .authority_client
327            .handle_transaction(transaction.clone(), client_addr)
328            .await?;
329        let response = check_error!(
330            self.address,
331            self.check_transaction_info(&digest, transaction, response.status),
332            "Client error in handle_transaction"
333        )?;
334        Ok(response)
335    }
336
337    fn verify_certificate_response_v1(
338        &self,
339        digest: &TransactionDigest,
340        HandleCertificateResponseV1 {
341            signed_effects,
342            events,
343            input_objects,
344            output_objects,
345            auxiliary_data,
346        }: HandleCertificateResponseV1,
347    ) -> IotaResult<HandleCertificateResponseV1> {
348        let signed_effects = self.check_signed_effects_plain(digest, signed_effects, None)?;
349
350        // Check Events
351        match (&events, signed_effects.events_digest()) {
352            (None, None) | (None, Some(_)) => {}
353            (Some(events), None) => {
354                if !events.data.is_empty() {
355                    return Err(IotaError::ByzantineAuthoritySuspicion {
356                        authority: self.address,
357                        reason: "Returned events but no event digest present in the signed effects"
358                            .to_string(),
359                    });
360                }
361            }
362            (Some(events), Some(events_digest)) => {
363                fp_ensure!(
364                    &events.digest() == events_digest,
365                    IotaError::ByzantineAuthoritySuspicion {
366                        authority: self.address,
367                        reason: "Returned events don't match events digest in the signed effects"
368                            .to_string()
369                    }
370                );
371            }
372        }
373
374        // Check Input Objects
375        if let Some(input_objects) = &input_objects {
376            let expected: HashMap<_, _> = signed_effects
377                .old_object_metadata()
378                .into_iter()
379                .map(|(object_ref, _owner)| (object_ref.0, object_ref))
380                .collect();
381
382            for object in input_objects {
383                let object_ref = object.compute_object_reference();
384                if expected
385                    .get(&object_ref.0)
386                    .is_none_or(|expect| &object_ref != expect)
387                {
388                    return Err(IotaError::ByzantineAuthoritySuspicion {
389                        authority: self.address,
390                        reason: "Returned input object that wasn't present in the signed effects"
391                            .to_string(),
392                    });
393                }
394            }
395        }
396
397        // Check Output Objects
398        if let Some(output_objects) = &output_objects {
399            let expected: HashMap<_, _> = signed_effects
400                .all_changed_objects()
401                .into_iter()
402                .map(|(object_ref, _, _)| (object_ref.0, object_ref))
403                .collect();
404
405            for object in output_objects {
406                let object_ref = object.compute_object_reference();
407                if expected
408                    .get(&object_ref.0)
409                    .is_none_or(|expect| &object_ref != expect)
410                {
411                    return Err(IotaError::ByzantineAuthoritySuspicion {
412                        authority: self.address,
413                        reason: "Returned output object that wasn't present in the signed effects"
414                            .to_string(),
415                    });
416                }
417            }
418        }
419
420        Ok(HandleCertificateResponseV1 {
421            signed_effects,
422            events,
423            input_objects,
424            output_objects,
425            auxiliary_data,
426        })
427    }
428
429    /// Execute a certificate.
430    pub async fn handle_certificate_v1(
431        &self,
432        request: HandleCertificateRequestV1,
433        client_addr: Option<SocketAddr>,
434    ) -> Result<HandleCertificateResponseV1, IotaError> {
435        let digest = *request.certificate.digest();
436        let _timer = self.metrics.handle_certificate_latency.start_timer();
437        let response = self
438            .authority_client
439            .handle_certificate_v1(request, client_addr)
440            .await?;
441
442        let verified = check_error!(
443            self.address,
444            self.verify_certificate_response_v1(&digest, response),
445            "Client error in handle_certificate"
446        )?;
447        Ok(verified)
448    }
449
450    pub async fn handle_object_info_request(
451        &self,
452        request: ObjectInfoRequest,
453    ) -> Result<VerifiedObjectInfoResponse, IotaError> {
454        self.metrics.total_requests_handle_object_info_request.inc();
455
456        let _timer = self.metrics.handle_obj_info_latency.start_timer();
457        let response = self
458            .authority_client
459            .handle_object_info_request(request.clone())
460            .await?;
461        let response = self
462            .check_object_response(&request, response)
463            .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
464
465        self.metrics
466            .total_ok_responses_handle_object_info_request
467            .inc();
468        Ok(response)
469    }
470
471    /// Handle Transaction information requests for a given digest.
472    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
473    pub async fn handle_transaction_info_request(
474        &self,
475        request: TransactionInfoRequest,
476    ) -> Result<PlainTransactionInfoResponse, IotaError> {
477        self.metrics
478            .total_requests_handle_transaction_info_request
479            .inc();
480
481        let _timer = self.metrics.handle_tx_info_latency.start_timer();
482
483        let transaction_info = self
484            .authority_client
485            .handle_transaction_info_request(request.clone())
486            .await?;
487
488        let transaction = Transaction::new(transaction_info.transaction);
489        let transaction_info = self.check_transaction_info(
490            &request.transaction_digest,
491            transaction,
492            transaction_info.status,
493        ).tap_err(|err| {
494            error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
495        })?;
496        self.metrics
497            .total_ok_responses_handle_transaction_info_request
498            .inc();
499        Ok(transaction_info)
500    }
501
502    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
503    pub async fn handle_system_state_object(&self) -> Result<IotaSystemState, IotaError> {
504        self.authority_client
505            .handle_system_state_object(SystemStateRequest { _unused: false })
506            .await
507    }
508}