Skip to main content

iota_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    io,
8    net::{IpAddr, SocketAddr},
9    sync::Arc,
10    time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19    api::{Validator, ValidatorServer},
20    tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24    effects::TransactionEffectsAPI,
25    error::*,
26    fp_ensure,
27    iota_system_state::IotaSystemState,
28    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29    messages_consensus::ConsensusTransaction,
30    messages_grpc::{
31        HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
32        HandleCertificateRequestV1, HandleCertificateResponseV1,
33        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
34        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
35        SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
36        TransactionInfoResponse,
37    },
38    multiaddr::Multiaddr,
39    traffic_control::{ClientIdSource, Weight},
40    transaction::*,
41};
42use nonempty::{NonEmpty, nonempty};
43use prometheus::{
44    Gauge, Histogram, IntCounter, IntCounterVec, Registry, register_gauge_with_registry,
45    register_histogram_with_registry, register_int_counter_vec_with_registry,
46    register_int_counter_with_registry,
47};
48use tap::TapFallible;
49use tonic::{
50    metadata::{Ascii, MetadataValue},
51    transport::server::TcpConnectInfo,
52};
53use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
54
55use crate::{
56    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
57    checkpoints::CheckpointStore,
58    consensus_adapter::{
59        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
60    },
61    starfish_adapter::LazyStarfishClient,
62    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
63};
64
65#[cfg(test)]
66#[path = "unit_tests/server_tests.rs"]
67mod server_tests;
68
69/// A handle to the authority server.
70pub struct AuthorityServerHandle {
71    server_handle: iota_network_stack::server::Server,
72}
73
74impl AuthorityServerHandle {
75    /// Waits for the server to complete.
76    pub async fn join(self) -> Result<(), io::Error> {
77        self.server_handle.handle().wait_for_shutdown().await;
78        Ok(())
79    }
80
81    /// Kills the server.
82    pub async fn kill(self) -> Result<(), io::Error> {
83        self.server_handle.handle().shutdown().await;
84        Ok(())
85    }
86
87    /// Returns the address of the server.
88    pub fn address(&self) -> &Multiaddr {
89        self.server_handle.local_addr()
90    }
91}
92
93/// An authority server that is used for testing.
94pub struct AuthorityServer {
95    address: Multiaddr,
96    pub state: Arc<AuthorityState>,
97    consensus_adapter: Arc<ConsensusAdapter>,
98    pub metrics: Arc<ValidatorServiceMetrics>,
99}
100
101impl AuthorityServer {
102    /// Creates a new `AuthorityServer` for testing with a consensus adapter.
103    pub fn new_for_test_with_consensus_adapter(
104        state: Arc<AuthorityState>,
105        consensus_adapter: Arc<ConsensusAdapter>,
106    ) -> Self {
107        let address = new_local_tcp_address_for_testing();
108        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
109
110        Self {
111            address,
112            state,
113            consensus_adapter,
114            metrics,
115        }
116    }
117
118    /// Creates a new `AuthorityServer` for testing.
119    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
120        let consensus_adapter = Arc::new(ConsensusAdapter::new(
121            Arc::new(LazyStarfishClient::new()),
122            CheckpointStore::new_for_tests(),
123            state.name,
124            Arc::new(ConnectionMonitorStatusForTests {}),
125            100_000,
126            100_000,
127            None,
128            None,
129            ConsensusAdapterMetrics::new_test(),
130        ));
131        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
132    }
133
134    /// Spawns the server.
135    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
136        let address = self.address.clone();
137        self.spawn_with_bind_address_for_test(address).await
138    }
139
140    /// Spawns the server with a bind address.
141    pub async fn spawn_with_bind_address_for_test(
142        self,
143        address: Multiaddr,
144    ) -> Result<AuthorityServerHandle, io::Error> {
145        let tls_config = iota_tls::create_rustls_server_config(
146            self.state.config.network_key_pair().copy().private(),
147            IOTA_TLS_SERVER_NAME.to_string(),
148        );
149        let server = iota_network_stack::config::Config::new()
150            .server_builder()
151            .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
152                self.state,
153                self.consensus_adapter,
154                self.metrics,
155            )))
156            .bind(&address, Some(tls_config))
157            .await
158            .unwrap();
159        let local_addr = server.local_addr().to_owned();
160        info!("Listening to traffic on {local_addr}");
161        let handle = AuthorityServerHandle {
162            server_handle: server,
163        };
164        Ok(handle)
165    }
166}
167
168/// Metrics for the validator service.
169pub struct ValidatorServiceMetrics {
170    pub signature_errors: IntCounter,
171    pub tx_verification_latency: Histogram,
172    pub cert_verification_latency: Histogram,
173    pub consensus_latency: Histogram,
174    pub handle_transaction_latency: Histogram,
175    pub submit_certificate_consensus_latency: Histogram,
176    pub handle_certificate_consensus_latency: Histogram,
177    pub handle_certificate_non_consensus_latency: Histogram,
178    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
179    pub handle_soft_bundle_certificates_count: Histogram,
180    pub handle_soft_bundle_certificates_size_bytes: Histogram,
181    pub handle_capability_notification_latency: Histogram,
182
183    num_rejected_tx_in_epoch_boundary: IntCounter,
184    num_rejected_cert_in_epoch_boundary: IntCounter,
185    num_rejected_tx_during_overload: IntCounterVec,
186    num_rejected_cert_during_overload: IntCounterVec,
187    num_rejected_capability_notifications_during_overload: IntCounterVec,
188    connection_ip_not_found: IntCounter,
189    forwarded_header_parse_error: IntCounter,
190    forwarded_header_invalid: IntCounter,
191    forwarded_header_not_included: IntCounter,
192    client_id_source_config_mismatch: IntCounter,
193    x_forwarded_for_num_hops: Gauge,
194}
195
196impl ValidatorServiceMetrics {
197    /// Creates a new `ValidatorServiceMetrics` with Prometheus registry.
198    pub fn new(registry: &Registry) -> Self {
199        Self {
200            signature_errors: register_int_counter_with_registry!(
201                "total_signature_errors",
202                "Number of transaction signature errors",
203                registry,
204            )
205            .unwrap(),
206            tx_verification_latency: register_histogram_with_registry!(
207                "validator_service_tx_verification_latency",
208                "Latency of verifying a transaction",
209                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
210                registry,
211            )
212            .unwrap(),
213            cert_verification_latency: register_histogram_with_registry!(
214                "validator_service_cert_verification_latency",
215                "Latency of verifying a certificate",
216                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
217                registry,
218            )
219            .unwrap(),
220            consensus_latency: register_histogram_with_registry!(
221                "validator_service_consensus_latency",
222                "Time spent between submitting a shared obj txn to consensus and getting result",
223                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
224                registry,
225            )
226            .unwrap(),
227            handle_transaction_latency: register_histogram_with_registry!(
228                "validator_service_handle_transaction_latency",
229                "Latency of handling a transaction",
230                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231                registry,
232            )
233            .unwrap(),
234            handle_certificate_consensus_latency: register_histogram_with_registry!(
235                "validator_service_handle_certificate_consensus_latency",
236                "Latency of handling a consensus transaction certificate",
237                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
238                registry,
239            )
240            .unwrap(),
241            submit_certificate_consensus_latency: register_histogram_with_registry!(
242                "validator_service_submit_certificate_consensus_latency",
243                "Latency of submit_certificate RPC handler",
244                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
245                registry,
246            )
247            .unwrap(),
248            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
249                "validator_service_handle_certificate_non_consensus_latency",
250                "Latency of handling a non-consensus transaction certificate",
251                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
252                registry,
253            )
254            .unwrap(),
255            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
256                "validator_service_handle_soft_bundle_certificates_consensus_latency",
257                "Latency of handling a consensus soft bundle",
258                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
259                registry,
260            )
261            .unwrap(),
262            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
263                "validator_service_handle_soft_bundle_certificates_count",
264                "The number of certificates included in a soft bundle",
265                iota_metrics::COUNT_BUCKETS.to_vec(),
266                registry,
267            )
268            .unwrap(),
269            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
270                "validator_service_handle_soft_bundle_certificates_size_bytes",
271                "The size of soft bundle in bytes",
272                iota_metrics::BYTES_BUCKETS.to_vec(),
273                registry,
274            )
275            .unwrap(),
276            handle_capability_notification_latency: register_histogram_with_registry!(
277                "validator_service_handle_capability_notification_latency",
278                "Latency of handling a capability notification",
279                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
280                registry,
281            )
282            .unwrap(),
283            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
284                "validator_service_num_rejected_tx_in_epoch_boundary",
285                "Number of rejected transaction during epoch transitioning",
286                registry,
287            )
288            .unwrap(),
289            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
290                "validator_service_num_rejected_cert_in_epoch_boundary",
291                "Number of rejected transaction certificate during epoch transitioning",
292                registry,
293            )
294            .unwrap(),
295            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
296                "validator_service_num_rejected_tx_during_overload",
297                "Number of rejected transaction due to system overload",
298                &["error_type"],
299                registry,
300            )
301            .unwrap(),
302            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
303                "validator_service_num_rejected_cert_during_overload",
304                "Number of rejected transaction certificate due to system overload",
305                &["error_type"],
306                registry,
307            )
308            .unwrap(),
309            num_rejected_capability_notifications_during_overload: register_int_counter_vec_with_registry!(
310                "num_rejected_capability_notifications_during_overload",
311                "Number of rejected capability notifications from non-committee active validators due to system overload",
312                &["error_type"],
313                registry,
314            )
315            .unwrap(),
316            connection_ip_not_found: register_int_counter_with_registry!(
317                "validator_service_connection_ip_not_found",
318                "Number of times connection IP was not extractable from request",
319                registry,
320            )
321            .unwrap(),
322            forwarded_header_parse_error: register_int_counter_with_registry!(
323                "validator_service_forwarded_header_parse_error",
324                "Number of times x-forwarded-for header could not be parsed",
325                registry,
326            )
327            .unwrap(),
328            forwarded_header_invalid: register_int_counter_with_registry!(
329                "validator_service_forwarded_header_invalid",
330                "Number of times x-forwarded-for header was invalid",
331                registry,
332            )
333            .unwrap(),
334            forwarded_header_not_included: register_int_counter_with_registry!(
335                "validator_service_forwarded_header_not_included",
336                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
337                registry,
338            )
339            .unwrap(),
340            client_id_source_config_mismatch: register_int_counter_with_registry!(
341                "validator_service_client_id_source_config_mismatch",
342                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
343                registry,
344            )
345            .unwrap(),
346            x_forwarded_for_num_hops: register_gauge_with_registry!(
347                "validator_service_x_forwarded_for_num_hops",
348                "Number of hops in x-forwarded-for header",
349                registry,
350            )
351            .unwrap(),
352        }
353    }
354
355    /// Creates a new `ValidatorServiceMetrics` for testing.
356    pub fn new_for_tests() -> Self {
357        let registry = Registry::new();
358        Self::new(&registry)
359    }
360}
361
/// The validator service.
///
/// Holds the shared handles the request handlers work with; the derived
/// `Clone` copies the `Arc` handles and the optional settings.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state the handlers operate on.
    state: Arc<AuthorityState>,
    /// Adapter used for overload checks and for submitting certificates to
    /// consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` copies it from the authority state,
    /// `new_for_tests` leaves it unset.
    traffic_controller: Option<Arc<TrafficController>>,
    /// Optional client id source setting; `new_for_tests` leaves it unset.
    client_id_source: Option<ClientIdSource>,
}
371
372impl ValidatorService {
373    /// Creates a new `ValidatorService`.
374    pub fn new(
375        state: Arc<AuthorityState>,
376        consensus_adapter: Arc<ConsensusAdapter>,
377        validator_metrics: Arc<ValidatorServiceMetrics>,
378        client_id_source: Option<ClientIdSource>,
379    ) -> Self {
380        let traffic_controller = state.traffic_controller.clone();
381        Self {
382            state,
383            consensus_adapter,
384            metrics: validator_metrics,
385            traffic_controller,
386            client_id_source,
387        }
388    }
389
390    pub fn new_for_tests(
391        state: Arc<AuthorityState>,
392        consensus_adapter: Arc<ConsensusAdapter>,
393        metrics: Arc<ValidatorServiceMetrics>,
394    ) -> Self {
395        Self {
396            state,
397            consensus_adapter,
398            metrics,
399            traffic_controller: None,
400            client_id_source: None,
401        }
402    }
403
404    /// Returns the validator state.
405    pub fn validator_state(&self) -> &Arc<AuthorityState> {
406        &self.state
407    }
408
409    /// Executes a `CertifiedTransaction` for testing.
410    pub async fn execute_certificate_for_testing(
411        &self,
412        cert: CertifiedTransaction,
413    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
414        let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
415        self.handle_certificate_v1(request).await
416    }
417
418    /// Handles a `Transaction` request for benchmarking.
419    pub async fn handle_transaction_for_benchmarking(
420        &self,
421        transaction: Transaction,
422    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
423        let request = make_tonic_request_for_testing(transaction);
424        self.transaction(request).await
425    }
426
427    /// Handles a `Transaction` request.
428    async fn handle_transaction(
429        &self,
430        request: tonic::Request<Transaction>,
431    ) -> WrappedServiceResponse<HandleTransactionResponse> {
432        let Self {
433            state,
434            consensus_adapter,
435            metrics,
436            traffic_controller: _,
437            client_id_source: _,
438        } = self.clone();
439        let transaction = request.into_inner();
440        let epoch_store = state.load_epoch_store_one_call_per_task();
441
442        transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
443
444        // When authority is overloaded and decide to reject this tx, we still lock the
445        // object and ask the client to retry in the future. This is because
446        // without locking, the input objects can be locked by a different tx in
447        // the future, however, the input objects may already be locked by this
448        // tx in other validators. This can cause non of the txes to have enough
449        // quorum to form a certificate, causing the objects to be locked for
450        // the entire epoch. By doing locking but pushback, retrying transaction will
451        // have higher chance to succeed.
452        let mut validator_pushback_error = None;
453        let overload_check_res = state.check_system_overload(
454            &consensus_adapter,
455            transaction.data(),
456            state.check_system_overload_at_signing(),
457        );
458        if let Err(error) = overload_check_res {
459            metrics
460                .num_rejected_tx_during_overload
461                .with_label_values(&[error.as_ref()])
462                .inc();
463            // TODO: consider change the behavior for other types of overload errors.
464            match error {
465                IotaError::ValidatorOverloadedRetryAfter { .. } => {
466                    validator_pushback_error = Some(error)
467                }
468                _ => return Err(error.into()),
469            }
470        }
471
472        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
473
474        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
475        let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
476            metrics.signature_errors.inc();
477        })?;
478        drop(tx_verif_metrics_guard);
479
480        let tx_digest = transaction.digest();
481
482        // Enable Trace Propagation across spans/processes using tx_digest
483        let span = error_span!("validator_state_process_tx", ?tx_digest);
484
485        let info = state
486            .handle_transaction(&epoch_store, transaction.clone())
487            .instrument(span)
488            .await
489            .tap_err(|e| {
490                if let IotaError::ValidatorHaltedAtEpochEnd = e {
491                    metrics.num_rejected_tx_in_epoch_boundary.inc();
492                }
493            })?;
494
495        if let Some(error) = validator_pushback_error {
496            // TODO: right now, we still sign the txn, but just don't return it. We can also
497            // skip signing to save more CPU.
498            return Err(error.into());
499        }
500
501        Ok((tonic::Response::new(info), Weight::zero()))
502    }
503
504    // In addition to the response from handling the certificates,
505    // returns a bool indicating whether the request should be tallied
506    // toward spam count. In general, this should be set to true for
507    // requests that are read-only and thus do not consume gas, such
508    // as when the transaction is already executed.
509    async fn handle_certificates(
510        &self,
511        certificates: NonEmpty<CertifiedTransaction>,
512        include_events: bool,
513        include_input_objects: bool,
514        include_output_objects: bool,
515        _include_auxiliary_data: bool,
516        epoch_store: &Arc<AuthorityPerEpochStore>,
517        wait_for_effects: bool,
518    ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
519        // Validate if cert can be executed
520        // Fullnode does not serve handle_certificate call.
521        fp_ensure!(
522            !self.state.is_fullnode(epoch_store),
523            IotaError::FullNodeCantHandleCertificate.into()
524        );
525
526        let shared_object_tx = certificates
527            .iter()
528            .any(|cert| cert.contains_shared_object());
529
530        let metrics = if certificates.len() == 1 {
531            if wait_for_effects {
532                if shared_object_tx {
533                    &self.metrics.handle_certificate_consensus_latency
534                } else {
535                    &self.metrics.handle_certificate_non_consensus_latency
536                }
537            } else {
538                &self.metrics.submit_certificate_consensus_latency
539            }
540        } else {
541            // `soft_bundle_validity_check` ensured that all certificates contain shared
542            // objects.
543            &self
544                .metrics
545                .handle_soft_bundle_certificates_consensus_latency
546        };
547
548        let _metrics_guard = metrics.start_timer();
549
550        // 1) Check if the certificate is already executed. This is only needed when we
551        //    have only one certificate (not a soft bundle). When multiple certificates
552        //    are provided, we will either submit all of them or none of them to
553        //    consensus.
554        if certificates.len() == 1 {
555            let tx_digest = *certificates[0].digest();
556
557            if let Some(signed_effects) = self
558                .state
559                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
560            {
561                let events = if include_events {
562                    if signed_effects.events_digest().is_some() {
563                        Some(
564                            self.state
565                                .get_transaction_events(signed_effects.transaction_digest())?,
566                        )
567                    } else {
568                        None
569                    }
570                } else {
571                    None
572                };
573
574                return Ok((
575                    Some(vec![HandleCertificateResponseV1 {
576                        signed_effects: signed_effects.into_inner(),
577                        events,
578                        input_objects: None,
579                        output_objects: None,
580                        auxiliary_data: None,
581                    }]),
582                    Weight::one(),
583                ));
584            };
585        }
586
587        // 2) Verify the certificates.
588        // Check system overload
589        for certificate in &certificates {
590            let overload_check_res = self.state.check_system_overload(
591                &self.consensus_adapter,
592                certificate.data(),
593                self.state.check_system_overload_at_execution(),
594            );
595            if let Err(error) = overload_check_res {
596                self.metrics
597                    .num_rejected_cert_during_overload
598                    .with_label_values(&[error.as_ref()])
599                    .inc();
600                return Err(error.into());
601            }
602        }
603
604        let verified_certificates = {
605            let _timer = self.metrics.cert_verification_latency.start_timer();
606            epoch_store
607                .signature_verifier
608                .multi_verify_certs(certificates.into())
609                .instrument(trace_span!("SignatureVerifier::multi_verify_certs"))
610                .await
611                .into_iter()
612                .collect::<Result<Vec<_>, _>>()?
613        };
614
615        {
616            // code block within reconfiguration lock
617            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
618            if !reconfiguration_lock.should_accept_user_certs() {
619                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
620                return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
621            }
622
623            // 3) All certificates are sent to consensus (at least by some authorities)
624            // For shared objects this will wait until either timeout or we have heard back
625            // from consensus. For owned objects this will return without
626            // waiting for certificate to be sequenced First do quick dirty
627            // non-async check.
628            if !epoch_store
629                .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
630            {
631                let _metrics_guard = if shared_object_tx {
632                    Some(self.metrics.consensus_latency.start_timer())
633                } else {
634                    None
635                };
636                let transactions = verified_certificates
637                    .iter()
638                    .map(|certificate| {
639                        ConsensusTransaction::new_certificate_message(
640                            &self.state.name,
641                            certificate.clone().into(),
642                        )
643                    })
644                    .collect::<Vec<_>>();
645                self.consensus_adapter.submit_batch(
646                    &transactions,
647                    Some(&reconfiguration_lock),
648                    epoch_store,
649                )?;
650                // Do not wait for the result, because the transaction might
651                // have already executed. Instead, check or wait
652                // for the existence of certificate effects below.
653            }
654        }
655
656        if !wait_for_effects {
657            // It is useful to enqueue owned object transaction for execution locally,
658            // even when we are not returning effects to user
659            let certificates_without_shared_objects = verified_certificates
660                .iter()
661                .filter(|certificate| !certificate.contains_shared_object())
662                .cloned()
663                .collect::<Vec<_>>();
664            if !certificates_without_shared_objects.is_empty() {
665                self.state.enqueue_certificates_for_execution(
666                    certificates_without_shared_objects,
667                    epoch_store,
668                );
669            }
670            return Ok((None, Weight::zero()));
671        }
672
673        // 4) Execute the certificates immediately if they contain only owned object
674        //    transactions,
675        // or wait for the execution results if it contains shared objects.
676        let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
677            |certificate| async move {
678                let effects = self
679                    .state
680                    .execute_certificate(&certificate, epoch_store)
681                    .await?;
682                let events = if include_events {
683                    if effects.events_digest().is_some() {
684                        Some(
685                            self.state
686                                .get_transaction_events(effects.transaction_digest())?,
687                        )
688                    } else {
689                        None
690                    }
691                } else {
692                    None
693                };
694
695                let input_objects = include_input_objects
696                    .then(|| self.state.get_transaction_input_objects(&effects))
697                    .and_then(|res| {
698                        res.map_err(|e| {
699                            warn!(
700                                tx_digest = ?effects.transaction_digest(),
701                                error = ?e,
702                                "Failed to load transaction input objects requested by client",
703                            )
704                        })
705                        .ok()
706                    });
707
708                let output_objects = include_output_objects
709                    .then(|| self.state.get_transaction_output_objects(&effects))
710                    .and_then(|res| {
711                        res.map_err(|e| {
712                            warn!(
713                                tx_digest = ?effects.transaction_digest(),
714                                error = ?e,
715                                "Failed to load transaction output objects requested by client",
716                            )
717                        })
718                        .ok()
719                    });
720
721                let signed_effects = self.state.sign_effects(effects, epoch_store)?;
722                epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
723
724                Ok::<_, IotaError>(HandleCertificateResponseV1 {
725                    signed_effects: signed_effects.into_inner(),
726                    events,
727                    input_objects,
728                    output_objects,
729                    auxiliary_data: None, // We don't have any aux data generated presently
730                })
731            },
732        ))
733        .await?;
734
735        Ok((Some(responses), Weight::zero()))
736    }
737}
738
/// Result type of the `*_impl` handlers: on success, the tonic response
/// paired with a [`Weight`]; on failure, a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
740
741impl ValidatorService {
742    async fn transaction_impl(
743        &self,
744        request: tonic::Request<Transaction>,
745    ) -> WrappedServiceResponse<HandleTransactionResponse> {
746        self.handle_transaction(request)
747            .instrument(trace_span!("ValidatorService::handle_transaction"))
748            .await
749    }
750
751    async fn submit_certificate_impl(
752        &self,
753        request: tonic::Request<CertifiedTransaction>,
754    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
755        let epoch_store = self.state.load_epoch_store_one_call_per_task();
756        let certificate = request.into_inner();
757        certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
758
759        let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
760        self.handle_certificates(
761            nonempty![certificate],
762            true,
763            false,
764            false,
765            false,
766            &epoch_store,
767            false,
768        )
769        .instrument(span)
770        .await
771        .map(|(executed, spam_weight)| {
772            (
773                tonic::Response::new(SubmitCertificateResponse {
774                    executed: executed.map(|mut x| x.remove(0)),
775                }),
776                spam_weight,
777            )
778        })
779    }
780
781    async fn handle_certificate_v1_impl(
782        &self,
783        request: tonic::Request<HandleCertificateRequestV1>,
784    ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
785        let epoch_store = self.state.load_epoch_store_one_call_per_task();
786        let request = request.into_inner();
787        request
788            .certificate
789            .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
790
791        let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
792        self.handle_certificates(
793            nonempty![request.certificate],
794            request.include_events,
795            request.include_input_objects,
796            request.include_output_objects,
797            request.include_auxiliary_data,
798            &epoch_store,
799            true,
800        )
801        .instrument(span)
802        .await
803        .map(|(resp, spam_weight)| {
804            (
805                tonic::Response::new(
806                    resp.expect(
807                        "handle_certificate should not return none with wait_for_effects=true",
808                    )
809                    .remove(0),
810                ),
811                spam_weight,
812            )
813        })
814    }
815
816    async fn soft_bundle_validity_check(
817        &self,
818        certificates: &NonEmpty<CertifiedTransaction>,
819        epoch_store: &Arc<AuthorityPerEpochStore>,
820        total_size_bytes: u64,
821    ) -> Result<(), tonic::Status> {
822        let protocol_config = epoch_store.protocol_config();
823
824        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
825        // - All certs must access at least one shared object.
826        // - All certs must not be already executed.
827        // - All certs must have the same gas price.
828        // - Number of certs must not exceed the max allowed.
829        // - Total size of all certs must not exceed the max allowed.
830        fp_ensure!(
831            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
832            IotaError::UserInput {
833                error: UserInputError::TooManyTransactionsInSoftBundle {
834                    limit: protocol_config.max_soft_bundle_size()
835                }
836            }
837            .into()
838        );
839
840        // We set the soft bundle max size to be half of the consensus max transactions
841        // in block size. We do this to account for serialization overheads and
842        // to ensure that the soft bundle is not too large when is attempted to be
843        // posted via consensus. Although half the block size is on the extreme
844        // side, it's should be good enough for now.
845        let soft_bundle_max_size_bytes =
846            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
847        fp_ensure!(
848            total_size_bytes <= soft_bundle_max_size_bytes,
849            IotaError::UserInput {
850                error: UserInputError::SoftBundleTooLarge {
851                    size: total_size_bytes,
852                    limit: soft_bundle_max_size_bytes,
853                },
854            }
855            .into()
856        );
857
858        let mut gas_price = None;
859        for certificate in certificates {
860            let tx_digest = *certificate.digest();
861            fp_ensure!(
862                certificate.contains_shared_object(),
863                IotaError::UserInput {
864                    error: UserInputError::NoSharedObject { digest: tx_digest }
865                }
866                .into()
867            );
868            fp_ensure!(
869                !self.state.try_is_tx_already_executed(&tx_digest)?,
870                IotaError::UserInput {
871                    error: UserInputError::AlreadyExecuted { digest: tx_digest }
872                }
873                .into()
874            );
875            if let Some(gas) = gas_price {
876                fp_ensure!(
877                    gas == certificate.gas_price(),
878                    IotaError::UserInput {
879                        error: UserInputError::GasPriceMismatch {
880                            digest: tx_digest,
881                            expected: gas,
882                            actual: certificate.gas_price()
883                        }
884                    }
885                    .into()
886                );
887            } else {
888                gas_price = Some(certificate.gas_price());
889            }
890        }
891
892        // For Soft Bundle, if at this point we know at least one certificate has
893        // already been processed, reject the entire bundle.  Otherwise, submit
894        // all certificates in one request. This is not a strict check as there
895        // may be race conditions where one or more certificates are
896        // already being processed by another actor, and we could not know it.
897        fp_ensure!(
898            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
899            IotaError::UserInput {
900                error: UserInputError::CertificateAlreadyProcessed
901            }
902            .into()
903        );
904
905        Ok(())
906    }
907
908    async fn handle_soft_bundle_certificates_v1_impl(
909        &self,
910        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
911    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
912        let epoch_store = self.state.load_epoch_store_one_call_per_task();
913        let client_addr = if let Some(client_id_source) = &self.client_id_source {
914            self.get_client_ip_addr(&request, client_id_source)
915        } else {
916            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
917        };
918
919        let request = request.into_inner();
920
921        let certificates =
922            NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
923        let mut total_size_bytes = 0;
924        for certificate in &certificates {
925            // We need to check this first because we haven't verified the cert signature.
926            total_size_bytes += certificate
927                .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
928                as u64;
929        }
930
931        self.metrics
932            .handle_soft_bundle_certificates_count
933            .observe(certificates.len() as f64);
934
935        self.metrics
936            .handle_soft_bundle_certificates_size_bytes
937            .observe(total_size_bytes as f64);
938
939        // Now that individual certificates are valid, we check if the bundle is valid.
940        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
941            .await?;
942
943        info!(
944            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
945            certificates.len(),
946            client_addr
947                .map(|x| x.to_string())
948                .unwrap_or_else(|| "unknown".to_string()),
949            certificates
950                .iter()
951                .map(|x| x.digest().to_string())
952                .collect::<Vec<_>>()
953                .join(", "),
954            total_size_bytes
955        );
956
957        let span = error_span!("handle_soft_bundle_certificates_v1");
958        self.handle_certificates(
959            certificates,
960            request.include_events,
961            request.include_input_objects,
962            request.include_output_objects,
963            request.include_auxiliary_data,
964            &epoch_store,
965            request.wait_for_effects,
966        )
967        .instrument(span)
968        .await
969        .map(|(resp, spam_weight)| {
970            (
971                tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
972                    responses: resp.unwrap_or_default(),
973                }),
974                spam_weight,
975            )
976        })
977    }
978
979    async fn object_info_impl(
980        &self,
981        request: tonic::Request<ObjectInfoRequest>,
982    ) -> WrappedServiceResponse<ObjectInfoResponse> {
983        let request = request.into_inner();
984        let response = self.state.handle_object_info_request(request).await?;
985        Ok((tonic::Response::new(response), Weight::one()))
986    }
987
988    async fn transaction_info_impl(
989        &self,
990        request: tonic::Request<TransactionInfoRequest>,
991    ) -> WrappedServiceResponse<TransactionInfoResponse> {
992        let request = request.into_inner();
993        let response = self.state.handle_transaction_info_request(request).await?;
994        Ok((tonic::Response::new(response), Weight::one()))
995    }
996
997    async fn checkpoint_impl(
998        &self,
999        request: tonic::Request<CheckpointRequest>,
1000    ) -> WrappedServiceResponse<CheckpointResponse> {
1001        let request = request.into_inner();
1002        let response = self.state.handle_checkpoint_request(&request)?;
1003        Ok((tonic::Response::new(response), Weight::one()))
1004    }
1005
1006    async fn get_system_state_object_impl(
1007        &self,
1008        _request: tonic::Request<SystemStateRequest>,
1009    ) -> WrappedServiceResponse<IotaSystemState> {
1010        let response = self
1011            .state
1012            .get_object_cache_reader()
1013            .try_get_iota_system_state_object_unsafe()?;
1014        Ok((tonic::Response::new(response), Weight::one()))
1015    }
1016
1017    fn get_client_ip_addr<T>(
1018        &self,
1019        request: &tonic::Request<T>,
1020        source: &ClientIdSource,
1021    ) -> Option<IpAddr> {
1022        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1023
1024        if let Some(header) = forwarded_header {
1025            let num_hops = header
1026                .to_str()
1027                .map(|h| h.split(',').count().saturating_sub(1))
1028                .unwrap_or(0);
1029
1030            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1031        }
1032
1033        match source {
1034            ClientIdSource::SocketAddr => {
1035                let socket_addr: Option<SocketAddr> = request.remote_addr();
1036
1037                // We will hit this case if the IO type used does not
1038                // implement Connected or when using a unix domain socket.
1039                // TODO: once we have confirmed that no legitimate traffic
1040                // is hitting this case, we should reject such requests that
1041                // hit this case.
1042                if let Some(socket_addr) = socket_addr {
1043                    Some(socket_addr.ip())
1044                } else {
1045                    if cfg!(msim) {
1046                        // Ignore the error from simtests.
1047                    } else if cfg!(test) {
1048                        panic!("Failed to get remote address from request");
1049                    } else {
1050                        self.metrics.connection_ip_not_found.inc();
1051                        error!("Failed to get remote address from request");
1052                    }
1053                    None
1054                }
1055            }
1056            ClientIdSource::XForwardedFor(num_hops) => {
1057                let do_header_parse = |op: &MetadataValue<Ascii>| {
1058                    match op.to_str() {
1059                        Ok(header_val) => {
1060                            let header_contents =
1061                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1062                            if *num_hops == 0 {
1063                                error!(
1064                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1065                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1066                                    to this node. Skipping traffic controller request handling.",
1067                                    header_contents,
1068                                );
1069                                return None;
1070                            }
1071                            let contents_len = header_contents.len();
1072                            if contents_len < *num_hops {
1073                                error!(
1074                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1075                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1076                                    `client-id-source` in the node config.",
1077                                    header_contents, contents_len, num_hops, contents_len,
1078                                );
1079                                self.metrics.client_id_source_config_mismatch.inc();
1080                                return None;
1081                            }
1082                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1083                            else {
1084                                error!(
1085                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1086                                    Expected at least {} values. Skipping traffic controller request handling.",
1087                                    header_contents, contents_len, num_hops, contents_len,
1088                                );
1089                                return None;
1090                            };
1091                            parse_ip(client_ip).or_else(|| {
1092                                self.metrics.forwarded_header_parse_error.inc();
1093                                None
1094                            })
1095                        }
1096                        Err(e) => {
1097                            // TODO: once we have confirmed that no legitimate traffic
1098                            // is hitting this case, we should reject such requests that
1099                            // hit this case.
1100                            self.metrics.forwarded_header_invalid.inc();
1101                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1102                            None
1103                        }
1104                    }
1105                };
1106                if let Some(op) = request.metadata().get("x-forwarded-for") {
1107                    do_header_parse(op)
1108                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1109                    do_header_parse(op)
1110                } else {
1111                    self.metrics.forwarded_header_not_included.inc();
1112                    error!(
1113                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1114                    );
1115                    None
1116                }
1117            }
1118        }
1119    }
1120
1121    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1122        if let Some(traffic_controller) = &self.traffic_controller {
1123            if !traffic_controller.check(&client, &None).await {
1124                // Entity in blocklist
1125                Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1126            } else {
1127                Ok(())
1128            }
1129        } else {
1130            Ok(())
1131        }
1132    }
1133
1134    fn handle_traffic_resp<T>(
1135        &self,
1136        client: Option<IpAddr>,
1137        wrapped_response: WrappedServiceResponse<T>,
1138    ) -> Result<tonic::Response<T>, tonic::Status> {
1139        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1140            Ok((result, spam_weight)) => (None, spam_weight, Ok(result)),
1141            Err(status) => (
1142                Some(IotaError::from(status.clone())),
1143                Weight::zero(),
1144                Err(status),
1145            ),
1146        };
1147
1148        if let Some(traffic_controller) = self.traffic_controller.clone() {
1149            traffic_controller.tally(TrafficTally {
1150                direct: client,
1151                through_fullnode: None,
1152                error_info: error.map(|e| {
1153                    let error_type = String::from(e.as_ref());
1154                    let error_weight = normalize(e);
1155                    (error_weight, error_type)
1156                }),
1157                spam_weight,
1158                timestamp: SystemTime::now(),
1159            })
1160        }
1161        unwrapped_response
1162    }
1163
1164    async fn handle_capability_notification_v1_impl(
1165        &self,
1166        request: tonic::Request<HandleCapabilityNotificationRequestV1>,
1167    ) -> WrappedServiceResponse<HandleCapabilityNotificationResponseV1> {
1168        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1169        let request = request.into_inner();
1170        fp_ensure!(
1171            epoch_store
1172                .protocol_config()
1173                .track_non_committee_eligible_validators(),
1174            IotaError::UnsupportedFeature {
1175                error: "capability notification endpoint is not supported in this Protocol Version"
1176                    .to_string()
1177            }
1178            .into()
1179        );
1180        // Validate if cert can be executed
1181        // Fullnode does not serve handle_certificate call.
1182        fp_ensure!(
1183            !self.state.is_fullnode(&epoch_store),
1184            IotaError::FullNodeCantHandleAuthorityCapabilities.into()
1185        );
1186
1187        // Check if the capabilities notification has already been processed
1188        let existing_capabilities = epoch_store.get_capabilities_v1()?;
1189        let incoming_capability = request.message.data();
1190
1191        info!(
1192            "Received capability notification: {:?}",
1193            incoming_capability
1194        );
1195
1196        if let Some(existing) = existing_capabilities
1197            .iter()
1198            .find(|cap| cap.authority == incoming_capability.authority)
1199        {
1200            if incoming_capability.generation <= existing.generation {
1201                // Return successfully if generation is lower or equal - already processed
1202                return Ok((
1203                    tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1204                    Weight::one(),
1205                ));
1206            }
1207        }
1208        if let Err(error) = self.consensus_adapter.check_consensus_overload() {
1209            self.metrics
1210                .num_rejected_capability_notifications_during_overload
1211                .with_label_values(&[error.as_ref()])
1212                .inc();
1213            return Err(error.into());
1214        }
1215
1216        let _handle_tx_metrics_guard = self
1217            .metrics
1218            .handle_capability_notification_latency
1219            .start_timer();
1220
1221        let signed_authority_capabilities = request.message;
1222        // Verify the message signature
1223        let verified_authority_capabilities = epoch_store
1224            .verify_authority_capabilities(signed_authority_capabilities)
1225            .inspect_err(|_e| {
1226                self.metrics.signature_errors.inc();
1227            })?;
1228
1229        let authority_name = verified_authority_capabilities.authority;
1230        // Process the verified capabilities
1231        debug!("Verified capability notification for authority {authority_name:?}");
1232
1233        // Submit the signed capability notification to consensus instead of processing
1234        // directly
1235        let signed_authority_capabilities_transaction =
1236            ConsensusTransaction::new_signed_capability_notification_v1(
1237                verified_authority_capabilities.into_inner(),
1238            );
1239
1240        // Submit to consensus - similar to how certificates are handled
1241        self.consensus_adapter.submit(
1242            signed_authority_capabilities_transaction,
1243            None,
1244            &epoch_store,
1245        )?;
1246
1247        debug!("Submitted capability notification to consensus for authority {authority_name:?}");
1248
1249        Ok((
1250            tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1251            Weight::one(),
1252        ))
1253    }
1254}
1255
1256fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1257    // simulate a TCP connection, which would have added extensions to
1258    // the request object that would be used downstream
1259    let mut request = tonic::Request::new(message);
1260    let tcp_connect_info = TcpConnectInfo {
1261        local_addr: None,
1262        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1263    };
1264    request.extensions_mut().insert(tcp_connect_info);
1265    request
1266}
1267
1268// TODO: refine error matching here
1269fn normalize(err: IotaError) -> Weight {
1270    match err {
1271        IotaError::UserInput {
1272            error: UserInputError::IncorrectUserSignature { .. },
1273        } => Weight::one(),
1274        IotaError::InvalidSignature { .. }
1275        | IotaError::SignerSignatureAbsent { .. }
1276        | IotaError::SignerSignatureNumberMismatch { .. }
1277        | IotaError::IncorrectSigner { .. }
1278        | IotaError::UnknownSigner { .. }
1279        | IotaError::WrongEpoch { .. } => Weight::one(),
1280        _ => Weight::zero(),
1281    }
1282}
1283
/// Wraps a `*_impl` handler with generic pre- and post-processing.
///
/// When no client id source is configured, the handler is called directly and
/// its weight is discarded. Otherwise the client IP is resolved, the request
/// is rejected early if `handle_traffic_req` refuses it, and the handler's
/// result is tallied through `handle_traffic_resp`.
///
/// Since this is on the critical path, any heavy lifting should be done in a
/// separate non-blocking task unless it is necessary to override the return
/// value.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // Return early if the client is blocked.
        $self.handle_traffic_req(client).await?;

        // Run the handler, then tally the outcome for traffic control.
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1304
1305#[async_trait]
1306impl Validator for ValidatorService {
1307    /// Handles a `Transaction` request.
1308    async fn transaction(
1309        &self,
1310        request: tonic::Request<Transaction>,
1311    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1312        let validator_service = self.clone();
1313
1314        // Spawns a task which handles the transaction. The task will unconditionally
1315        // continue processing in the event that the client connection is
1316        // dropped.
1317        spawn_monitored_task!(async move {
1318            // NB: traffic tally wrapping handled within the task rather than on task exit
1319            // to prevent an attacker from subverting traffic control by severing the
1320            // connection
1321            handle_with_decoration!(validator_service, transaction_impl, request)
1322        })
1323        .await
1324        .unwrap()
1325    }
1326
1327    /// Submits a `CertifiedTransaction` request.
1328    async fn submit_certificate(
1329        &self,
1330        request: tonic::Request<CertifiedTransaction>,
1331    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1332        let validator_service = self.clone();
1333
1334        // Spawns a task which handles the certificate. The task will unconditionally
1335        // continue processing in the event that the client connection is
1336        // dropped.
1337        spawn_monitored_task!(async move {
1338            // NB: traffic tally wrapping handled within the task rather than on task exit
1339            // to prevent an attacker from subverting traffic control by severing the
1340            // connection.
1341            handle_with_decoration!(validator_service, submit_certificate_impl, request)
1342        })
1343        .await
1344        .unwrap()
1345    }
1346
1347    async fn handle_certificate_v1(
1348        &self,
1349        request: tonic::Request<HandleCertificateRequestV1>,
1350    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1351        handle_with_decoration!(self, handle_certificate_v1_impl, request)
1352    }
1353
1354    async fn handle_soft_bundle_certificates_v1(
1355        &self,
1356        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1357    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1358        handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1359    }
1360
1361    /// Handles an `ObjectInfoRequest` request.
1362    async fn object_info(
1363        &self,
1364        request: tonic::Request<ObjectInfoRequest>,
1365    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1366        handle_with_decoration!(self, object_info_impl, request)
1367    }
1368
1369    /// Handles a `TransactionInfoRequest` request.
1370    async fn transaction_info(
1371        &self,
1372        request: tonic::Request<TransactionInfoRequest>,
1373    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1374        handle_with_decoration!(self, transaction_info_impl, request)
1375    }
1376
1377    /// Handles a `CheckpointRequest` request.
1378    async fn checkpoint(
1379        &self,
1380        request: tonic::Request<CheckpointRequest>,
1381    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1382        handle_with_decoration!(self, checkpoint_impl, request)
1383    }
1384
1385    /// Gets the `IotaSystemState` response.
1386    async fn get_system_state_object(
1387        &self,
1388        request: tonic::Request<SystemStateRequest>,
1389    ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1390        handle_with_decoration!(self, get_system_state_object_impl, request)
1391    }
1392
1393    async fn handle_capability_notification_v1(
1394        &self,
1395        request: tonic::Request<HandleCapabilityNotificationRequestV1>,
1396    ) -> Result<tonic::Response<HandleCapabilityNotificationResponseV1>, tonic::Status> {
1397        handle_with_decoration!(self, handle_capability_notification_v1_impl, request)
1398    }
1399}