iota_core/
safe_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{collections::HashMap, net::SocketAddr, sync::Arc};
7
8use iota_metrics::histogram::{Histogram, HistogramVec};
9use iota_types::{
10    base_types::*,
11    committee::*,
12    crypto::AuthorityPublicKeyBytes,
13    effects::{SignedTransactionEffects, TransactionEffectsAPI},
14    error::{IotaError, IotaResult},
15    fp_ensure,
16    iota_system_state::IotaSystemState,
17    messages_grpc::{
18        HandleCertificateRequestV1, HandleCertificateResponseV1, ObjectInfoRequest,
19        ObjectInfoResponse, SystemStateRequest, TransactionInfoRequest, TransactionStatus,
20        VerifiedObjectInfoResponse,
21    },
22    messages_safe_client::PlainTransactionInfoResponse,
23    transaction::*,
24};
25use prometheus::{
26    IntCounterVec, Registry, core::GenericCounter, register_int_counter_vec_with_registry,
27};
28use tap::TapFallible;
29use tracing::{debug, error, instrument};
30
31use crate::{authority_client::AuthorityAPI, epoch::committee_store::CommitteeStore};
32
/// Logs the `Err` side of a fallible check and yields the result so the
/// caller can still apply `?` to it.
///
/// * `$address` - authority attached to the log line as `authority`.
/// * `$cond` - expression producing the `Result` to inspect.
/// * `$msg` - message used when the failure is logged at `error` level.
///
/// Errors for which `individual_error_indicates_epoch_change()` returns true
/// are logged at `debug` level instead of `error`.
macro_rules! check_error {
    ($address:expr, $cond:expr, $msg:expr) => {
        $cond.tap_err(|err| {
            if !err.individual_error_indicates_epoch_change() {
                error!(?err, authority=?$address, $msg);
            } else {
                debug!(?err, authority=?$address, "Not a real client error");
            }
        })
    }
}
44
/// Label-parameterised collectors shared by all safe clients; the
/// per-validator `SafeClientMetrics` handles are created from these by
/// fixing the `address` and `method` labels.
#[derive(Clone)]
pub struct SafeClientMetricsBase {
    /// Counter of requests, labelled by `address` and `method`.
    total_requests_by_address_method: IntCounterVec,
    /// Counter of good (OK) responses, labelled by `address` and `method`.
    total_responses_by_address_method: IntCounterVec,
    /// RPC latency histograms, labelled by `address` and `method`.
    latency: HistogramVec,
}
51
52impl SafeClientMetricsBase {
53    pub fn new(registry: &Registry) -> Self {
54        Self {
55            total_requests_by_address_method: register_int_counter_vec_with_registry!(
56                "safe_client_total_requests_by_address_method",
57                "Total requests to validators group by address and method",
58                &["address", "method"],
59                registry,
60            )
61            .unwrap(),
62            total_responses_by_address_method: register_int_counter_vec_with_registry!(
63                "safe_client_total_responses_by_address_method",
64                "Total good (OK) responses from validators group by address and method",
65                &["address", "method"],
66                registry,
67            )
68            .unwrap(),
69            latency: HistogramVec::new_in_registry(
70                "safe_client_latency",
71                "RPC latency observed by safe client aggregator, group by address and method",
72                &["address", "method"],
73                registry,
74            ),
75        }
76    }
77}
78
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
///
/// Each field is a handle with both labels already fixed: the validator
/// address and the method name noted on the field.
#[derive(Clone)]
pub struct SafeClientMetrics {
    /// Requests counter for method `handle_transaction_info_request`.
    total_requests_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// OK-responses counter for method `handle_transaction_info_request`.
    total_ok_responses_handle_transaction_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Requests counter for method `handle_object_info_request`.
    total_requests_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// OK-responses counter for method `handle_object_info_request`.
    total_ok_responses_handle_object_info_request: GenericCounter<prometheus::core::AtomicU64>,
    /// Latency histogram for method `handle_transaction`.
    handle_transaction_latency: Histogram,
    /// Latency histogram for method `handle_certificate`.
    handle_certificate_latency: Histogram,
    /// Latency histogram for method `handle_object_info_request`.
    handle_obj_info_latency: Histogram,
    /// Latency histogram for method `handle_transaction_info_request`.
    handle_tx_info_latency: Histogram,
}
91
92impl SafeClientMetrics {
93    pub fn new(metrics_base: &SafeClientMetricsBase, validator_address: AuthorityName) -> Self {
94        let validator_address = validator_address.to_string();
95
96        let total_requests_handle_transaction_info_request = metrics_base
97            .total_requests_by_address_method
98            .with_label_values(&[
99                validator_address.as_str(),
100                "handle_transaction_info_request",
101            ]);
102        let total_ok_responses_handle_transaction_info_request = metrics_base
103            .total_responses_by_address_method
104            .with_label_values(&[
105                validator_address.as_str(),
106                "handle_transaction_info_request",
107            ]);
108
109        let total_requests_handle_object_info_request = metrics_base
110            .total_requests_by_address_method
111            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
112        let total_ok_responses_handle_object_info_request = metrics_base
113            .total_responses_by_address_method
114            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
115
116        let handle_transaction_latency = metrics_base
117            .latency
118            .with_label_values(&[validator_address.as_str(), "handle_transaction"]);
119        let handle_certificate_latency = metrics_base
120            .latency
121            .with_label_values(&[validator_address.as_str(), "handle_certificate"]);
122        let handle_obj_info_latency = metrics_base
123            .latency
124            .with_label_values(&[validator_address.as_str(), "handle_object_info_request"]);
125        let handle_tx_info_latency = metrics_base.latency.with_label_values(&[
126            validator_address.as_str(),
127            "handle_transaction_info_request",
128        ]);
129
130        Self {
131            total_requests_handle_transaction_info_request,
132            total_ok_responses_handle_transaction_info_request,
133            total_requests_handle_object_info_request,
134            total_ok_responses_handle_object_info_request,
135            handle_transaction_latency,
136            handle_certificate_latency,
137            handle_obj_info_latency,
138            handle_tx_info_latency,
139        }
140    }
141
142    pub fn new_for_tests(validator_address: AuthorityName) -> Self {
143        let registry = Registry::new();
144        let metrics_base = SafeClientMetricsBase::new(&registry);
145        Self::new(&metrics_base, validator_address)
146    }
147}
148
149/// See `SafeClientMetrics::new` for description of each metrics.
150/// The metrics are per validator client.
151#[derive(Clone)]
152pub struct SafeClient<C>
153where
154    C: Clone,
155{
156    authority_client: C,
157    committee_store: Arc<CommitteeStore>,
158    address: AuthorityPublicKeyBytes,
159    metrics: SafeClientMetrics,
160}
161
162impl<C: Clone> SafeClient<C> {
163    pub fn new(
164        authority_client: C,
165        committee_store: Arc<CommitteeStore>,
166        address: AuthorityPublicKeyBytes,
167        metrics: SafeClientMetrics,
168    ) -> Self {
169        Self {
170            authority_client,
171            committee_store,
172            address,
173            metrics,
174        }
175    }
176}
177
178impl<C: Clone> SafeClient<C> {
    /// Returns a shared reference to the wrapped authority client.
    pub fn authority_client(&self) -> &C {
        &self.authority_client
    }
182
    /// Returns a mutable reference to the wrapped authority client.
    /// Only compiled for tests.
    #[cfg(test)]
    pub fn authority_client_mut(&mut self) -> &mut C {
        &mut self.authority_client
    }
187
188    fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Arc<Committee>> {
189        self.committee_store
190            .get_committee(epoch_id)?
191            .ok_or(IotaError::MissingCommitteeAtEpoch(*epoch_id))
192    }
193
194    fn check_signed_effects_plain(
195        &self,
196        digest: &TransactionDigest,
197        signed_effects: SignedTransactionEffects,
198        expected_effects_digest: Option<&TransactionEffectsDigest>,
199    ) -> IotaResult<SignedTransactionEffects> {
200        // Check it has the right signer
201        fp_ensure!(
202            signed_effects.auth_sig().authority == self.address,
203            IotaError::ByzantineAuthoritySuspicion {
204                authority: self.address,
205                reason: format!(
206                    "Unexpected validator address in the signed effects signature: {:?}",
207                    signed_effects.auth_sig().authority
208                ),
209            }
210        );
211        // Checks it concerns the right tx
212        fp_ensure!(
213            signed_effects.data().transaction_digest() == digest,
214            IotaError::ByzantineAuthoritySuspicion {
215                authority: self.address,
216                reason: "Unexpected tx digest in the signed effects".to_string()
217            }
218        );
219        // check that the effects digest is correct.
220        if let Some(effects_digest) = expected_effects_digest {
221            fp_ensure!(
222                signed_effects.digest() == effects_digest,
223                IotaError::ByzantineAuthoritySuspicion {
224                    authority: self.address,
225                    reason: "Effects digest does not match with expected digest".to_string()
226                }
227            );
228        }
229        self.get_committee(&signed_effects.epoch())?;
230        Ok(signed_effects)
231    }
232
233    fn check_transaction_info(
234        &self,
235        digest: &TransactionDigest,
236        transaction: Transaction,
237        status: TransactionStatus,
238    ) -> IotaResult<PlainTransactionInfoResponse> {
239        fp_ensure!(
240            digest == transaction.digest(),
241            IotaError::ByzantineAuthoritySuspicion {
242                authority: self.address,
243                reason: "Signed transaction digest does not match with expected digest".to_string()
244            }
245        );
246        match status {
247            TransactionStatus::Signed(signed) => {
248                self.get_committee(&signed.epoch)?;
249                Ok(PlainTransactionInfoResponse::Signed(
250                    SignedTransaction::new_from_data_and_sig(transaction.into_data(), signed),
251                ))
252            }
253            TransactionStatus::Executed(cert_opt, effects, events) => {
254                let signed_effects = self.check_signed_effects_plain(digest, effects, None)?;
255                match cert_opt {
256                    Some(cert) => {
257                        let committee = self.get_committee(&cert.epoch)?;
258                        let ct = CertifiedTransaction::new_from_data_and_sig(
259                            transaction.into_data(),
260                            cert,
261                        );
262                        ct.verify_committee_sigs_only(&committee).map_err(|e| {
263                            IotaError::FailedToVerifyTxCertWithExecutedEffects {
264                                validator_name: self.address,
265                                error: e.to_string(),
266                            }
267                        })?;
268                        Ok(PlainTransactionInfoResponse::ExecutedWithCert(
269                            ct,
270                            signed_effects,
271                            events,
272                        ))
273                    }
274                    None => Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(
275                        transaction,
276                        signed_effects,
277                        events,
278                    )),
279                }
280            }
281        }
282    }
283
284    fn check_object_response(
285        &self,
286        request: &ObjectInfoRequest,
287        response: ObjectInfoResponse,
288    ) -> IotaResult<VerifiedObjectInfoResponse> {
289        let ObjectInfoResponse {
290            object,
291            layout: _,
292            lock_for_debugging: _,
293        } = response;
294
295        fp_ensure!(
296            request.object_id == object.id(),
297            IotaError::ByzantineAuthoritySuspicion {
298                authority: self.address,
299                reason: "Object id mismatch in the response".to_string()
300            }
301        );
302
303        Ok(VerifiedObjectInfoResponse { object })
304    }
305
    /// Returns the public key of the validator this client is bound to.
    pub fn address(&self) -> &AuthorityPublicKeyBytes {
        &self.address
    }
309}
310
311impl<C> SafeClient<C>
312where
313    C: AuthorityAPI + Send + Sync + Clone + 'static,
314{
315    /// Initiate a new transfer to an IOTA or Primary account.
316    pub async fn handle_transaction(
317        &self,
318        transaction: Transaction,
319        client_addr: Option<SocketAddr>,
320    ) -> Result<PlainTransactionInfoResponse, IotaError> {
321        let _timer = self.metrics.handle_transaction_latency.start_timer();
322        let digest = *transaction.digest();
323        let response = self
324            .authority_client
325            .handle_transaction(transaction.clone(), client_addr)
326            .await?;
327        let response = check_error!(
328            self.address,
329            self.check_transaction_info(&digest, transaction, response.status),
330            "Client error in handle_transaction"
331        )?;
332        Ok(response)
333    }
334
335    fn verify_certificate_response_v1(
336        &self,
337        digest: &TransactionDigest,
338        HandleCertificateResponseV1 {
339            signed_effects,
340            events,
341            input_objects,
342            output_objects,
343            auxiliary_data,
344        }: HandleCertificateResponseV1,
345    ) -> IotaResult<HandleCertificateResponseV1> {
346        let signed_effects = self.check_signed_effects_plain(digest, signed_effects, None)?;
347
348        // Check Events
349        match (&events, signed_effects.events_digest()) {
350            (None, None) | (None, Some(_)) => {}
351            (Some(events), None) => {
352                if !events.data.is_empty() {
353                    return Err(IotaError::ByzantineAuthoritySuspicion {
354                        authority: self.address,
355                        reason: "Returned events but no event digest present in the signed effects"
356                            .to_string(),
357                    });
358                }
359            }
360            (Some(events), Some(events_digest)) => {
361                fp_ensure!(
362                    &events.digest() == events_digest,
363                    IotaError::ByzantineAuthoritySuspicion {
364                        authority: self.address,
365                        reason: "Returned events don't match events digest in the signed effects"
366                            .to_string()
367                    }
368                );
369            }
370        }
371
372        // Check Input Objects
373        if let Some(input_objects) = &input_objects {
374            let expected: HashMap<_, _> = signed_effects
375                .old_object_metadata()
376                .into_iter()
377                .map(|(object_ref, _owner)| (object_ref.0, object_ref))
378                .collect();
379
380            for object in input_objects {
381                let object_ref = object.compute_object_reference();
382                if expected
383                    .get(&object_ref.0)
384                    .is_none_or(|expect| &object_ref != expect)
385                {
386                    return Err(IotaError::ByzantineAuthoritySuspicion {
387                        authority: self.address,
388                        reason: "Returned input object that wasn't present in the signed effects"
389                            .to_string(),
390                    });
391                }
392            }
393        }
394
395        // Check Output Objects
396        if let Some(output_objects) = &output_objects {
397            let expected: HashMap<_, _> = signed_effects
398                .all_changed_objects()
399                .into_iter()
400                .map(|(object_ref, _, _)| (object_ref.0, object_ref))
401                .collect();
402
403            for object in output_objects {
404                let object_ref = object.compute_object_reference();
405                if expected
406                    .get(&object_ref.0)
407                    .is_none_or(|expect| &object_ref != expect)
408                {
409                    return Err(IotaError::ByzantineAuthoritySuspicion {
410                        authority: self.address,
411                        reason: "Returned output object that wasn't present in the signed effects"
412                            .to_string(),
413                    });
414                }
415            }
416        }
417
418        Ok(HandleCertificateResponseV1 {
419            signed_effects,
420            events,
421            input_objects,
422            output_objects,
423            auxiliary_data,
424        })
425    }
426
427    /// Execute a certificate.
428    pub async fn handle_certificate_v1(
429        &self,
430        request: HandleCertificateRequestV1,
431        client_addr: Option<SocketAddr>,
432    ) -> Result<HandleCertificateResponseV1, IotaError> {
433        let digest = *request.certificate.digest();
434        let _timer = self.metrics.handle_certificate_latency.start_timer();
435        let response = self
436            .authority_client
437            .handle_certificate_v1(request, client_addr)
438            .await?;
439
440        let verified = check_error!(
441            self.address,
442            self.verify_certificate_response_v1(&digest, response),
443            "Client error in handle_certificate"
444        )?;
445        Ok(verified)
446    }
447
448    pub async fn handle_object_info_request(
449        &self,
450        request: ObjectInfoRequest,
451    ) -> Result<VerifiedObjectInfoResponse, IotaError> {
452        self.metrics.total_requests_handle_object_info_request.inc();
453
454        let _timer = self.metrics.handle_obj_info_latency.start_timer();
455        let response = self
456            .authority_client
457            .handle_object_info_request(request.clone())
458            .await?;
459        let response = self
460            .check_object_response(&request, response)
461            .tap_err(|err| error!(?err, authority=?self.address, "Client error in handle_object_info_request"))?;
462
463        self.metrics
464            .total_ok_responses_handle_object_info_request
465            .inc();
466        Ok(response)
467    }
468
469    /// Handle Transaction information requests for a given digest.
470    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
471    pub async fn handle_transaction_info_request(
472        &self,
473        request: TransactionInfoRequest,
474    ) -> Result<PlainTransactionInfoResponse, IotaError> {
475        self.metrics
476            .total_requests_handle_transaction_info_request
477            .inc();
478
479        let _timer = self.metrics.handle_tx_info_latency.start_timer();
480
481        let transaction_info = self
482            .authority_client
483            .handle_transaction_info_request(request.clone())
484            .await?;
485
486        let transaction = Transaction::new(transaction_info.transaction);
487        let transaction_info = self.check_transaction_info(
488            &request.transaction_digest,
489            transaction,
490            transaction_info.status,
491        ).tap_err(|err| {
492            error!(?err, authority=?self.address, "Client error in handle_transaction_info_request");
493        })?;
494        self.metrics
495            .total_ok_responses_handle_transaction_info_request
496            .inc();
497        Ok(transaction_info)
498    }
499
500    #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
501    pub async fn handle_system_state_object(&self) -> Result<IotaSystemState, IotaError> {
502        self.authority_client
503            .handle_system_state_object(SystemStateRequest { _unused: false })
504            .await
505    }
506}