iota_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    io,
8    net::{IpAddr, SocketAddr},
9    sync::Arc,
10    time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19    api::{Validator, ValidatorServer},
20    tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24    effects::TransactionEffectsAPI,
25    error::*,
26    fp_ensure,
27    iota_system_state::IotaSystemState,
28    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29    messages_consensus::ConsensusTransaction,
30    messages_grpc::{
31        HandleCertificateRequestV1, HandleCertificateResponseV1,
32        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
33        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
34        SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
35        TransactionInfoResponse,
36    },
37    multiaddr::Multiaddr,
38    traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
39    transaction::*,
40};
41use nonempty::{NonEmpty, nonempty};
42use prometheus::{
43    Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
44    register_int_counter_vec_with_registry, register_int_counter_with_registry,
45};
46use tap::TapFallible;
47use tokio::task::JoinHandle;
48use tonic::{
49    metadata::{Ascii, MetadataValue},
50    transport::server::TcpConnectInfo,
51};
52use tracing::{Instrument, error, error_span, info};
53
54use crate::{
55    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
56    consensus_adapter::{
57        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
58    },
59    mysticeti_adapter::LazyMysticetiClient,
60    traffic_controller::{
61        TrafficController, metrics::TrafficControllerMetrics, parse_ip, policies::TrafficTally,
62    },
63};
64
65#[cfg(test)]
66#[path = "unit_tests/server_tests.rs"]
67mod server_tests;
68
69/// A handle to the authority server.
70pub struct AuthorityServerHandle {
71    tx_cancellation: tokio::sync::oneshot::Sender<()>,
72    local_addr: Multiaddr,
73    handle: JoinHandle<Result<(), tonic::transport::Error>>,
74}
75
76impl AuthorityServerHandle {
77    /// Waits for the server to complete.
78    pub async fn join(self) -> Result<(), io::Error> {
79        // Note that dropping `self.complete` would terminate the server.
80        self.handle.await?.map_err(io::Error::other)?;
81        Ok(())
82    }
83
84    /// Kills the server.
85    pub async fn kill(self) -> Result<(), io::Error> {
86        self.tx_cancellation
87            .send(())
88            .map_err(|_e| io::Error::other("could not send cancellation signal!"))?;
89        self.handle.await?.map_err(io::Error::other)?;
90        Ok(())
91    }
92
93    /// Returns the address of the server.
94    pub fn address(&self) -> &Multiaddr {
95        &self.local_addr
96    }
97}
98
99/// An authority server that is used for testing.
100pub struct AuthorityServer {
101    address: Multiaddr,
102    pub state: Arc<AuthorityState>,
103    consensus_adapter: Arc<ConsensusAdapter>,
104    pub metrics: Arc<ValidatorServiceMetrics>,
105}
106
107impl AuthorityServer {
108    /// Creates a new `AuthorityServer` for testing with a consensus adapter.
109    pub fn new_for_test_with_consensus_adapter(
110        state: Arc<AuthorityState>,
111        consensus_adapter: Arc<ConsensusAdapter>,
112    ) -> Self {
113        let address = new_local_tcp_address_for_testing();
114        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
115
116        Self {
117            address,
118            state,
119            consensus_adapter,
120            metrics,
121        }
122    }
123
124    /// Creates a new `AuthorityServer` for testing.
125    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
126        let consensus_adapter = Arc::new(ConsensusAdapter::new(
127            Arc::new(LazyMysticetiClient::new()),
128            state.name,
129            Arc::new(ConnectionMonitorStatusForTests {}),
130            100_000,
131            100_000,
132            None,
133            None,
134            ConsensusAdapterMetrics::new_test(),
135        ));
136        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
137    }
138
139    /// Spawns the server.
140    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
141        let address = self.address.clone();
142        self.spawn_with_bind_address_for_test(address).await
143    }
144
145    /// Spawns the server with a bind address.
146    pub async fn spawn_with_bind_address_for_test(
147        self,
148        address: Multiaddr,
149    ) -> Result<AuthorityServerHandle, io::Error> {
150        let tls_config = iota_tls::create_rustls_server_config(
151            self.state.config.network_key_pair().copy().private(),
152            IOTA_TLS_SERVER_NAME.to_string(),
153            iota_tls::AllowAll,
154        );
155        let mut server = iota_network_stack::config::Config::new()
156            .server_builder()
157            .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
158                self.state,
159                self.consensus_adapter,
160                self.metrics,
161            )))
162            .bind(&address, Some(tls_config))
163            .await
164            .unwrap();
165        let local_addr = server.local_addr().to_owned();
166        info!("Listening to traffic on {local_addr}");
167        let handle = AuthorityServerHandle {
168            tx_cancellation: server.take_cancel_handle().unwrap(),
169            local_addr,
170            handle: spawn_monitored_task!(server.serve()),
171        };
172        Ok(handle)
173    }
174}
175
176/// Metrics for the validator service.
177pub struct ValidatorServiceMetrics {
178    pub signature_errors: IntCounter,
179    pub tx_verification_latency: Histogram,
180    pub cert_verification_latency: Histogram,
181    pub consensus_latency: Histogram,
182    pub handle_transaction_latency: Histogram,
183    pub submit_certificate_consensus_latency: Histogram,
184    pub handle_certificate_consensus_latency: Histogram,
185    pub handle_certificate_non_consensus_latency: Histogram,
186    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
187    pub handle_soft_bundle_certificates_count: Histogram,
188    pub handle_soft_bundle_certificates_size_bytes: Histogram,
189
190    num_rejected_tx_in_epoch_boundary: IntCounter,
191    num_rejected_cert_in_epoch_boundary: IntCounter,
192    num_rejected_tx_during_overload: IntCounterVec,
193    num_rejected_cert_during_overload: IntCounterVec,
194    connection_ip_not_found: IntCounter,
195    forwarded_header_parse_error: IntCounter,
196    forwarded_header_invalid: IntCounter,
197    forwarded_header_not_included: IntCounter,
198    client_id_source_config_mismatch: IntCounter,
199}
200
201impl ValidatorServiceMetrics {
202    /// Creates a new `ValidatorServiceMetrics` with Prometheus registry.
203    pub fn new(registry: &Registry) -> Self {
204        Self {
205            signature_errors: register_int_counter_with_registry!(
206                "total_signature_errors",
207                "Number of transaction signature errors",
208                registry,
209            )
210            .unwrap(),
211            tx_verification_latency: register_histogram_with_registry!(
212                "validator_service_tx_verification_latency",
213                "Latency of verifying a transaction",
214                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
215                registry,
216            )
217            .unwrap(),
218            cert_verification_latency: register_histogram_with_registry!(
219                "validator_service_cert_verification_latency",
220                "Latency of verifying a certificate",
221                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
222                registry,
223            )
224            .unwrap(),
225            consensus_latency: register_histogram_with_registry!(
226                "validator_service_consensus_latency",
227                "Time spent between submitting a shared obj txn to consensus and getting result",
228                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
229                registry,
230            )
231            .unwrap(),
232            handle_transaction_latency: register_histogram_with_registry!(
233                "validator_service_handle_transaction_latency",
234                "Latency of handling a transaction",
235                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
236                registry,
237            )
238            .unwrap(),
239            handle_certificate_consensus_latency: register_histogram_with_registry!(
240                "validator_service_handle_certificate_consensus_latency",
241                "Latency of handling a consensus transaction certificate",
242                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
243                registry,
244            )
245            .unwrap(),
246            submit_certificate_consensus_latency: register_histogram_with_registry!(
247                "validator_service_submit_certificate_consensus_latency",
248                "Latency of submit_certificate RPC handler",
249                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
250                registry,
251            )
252            .unwrap(),
253            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
254                "validator_service_handle_certificate_non_consensus_latency",
255                "Latency of handling a non-consensus transaction certificate",
256                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
257                registry,
258            )
259            .unwrap(),
260            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
261                "validator_service_handle_soft_bundle_certificates_consensus_latency",
262                "Latency of handling a consensus soft bundle",
263                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
264                registry,
265            )
266            .unwrap(),
267            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
268                "validator_service_handle_soft_bundle_certificates_count",
269                "The number of certificates included in a soft bundle",
270                iota_metrics::COUNT_BUCKETS.to_vec(),
271                registry,
272            )
273            .unwrap(),
274            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
275                "validator_service_handle_soft_bundle_certificates_size_bytes",
276                "The size of soft bundle in bytes",
277                iota_metrics::BYTES_BUCKETS.to_vec(),
278                registry,
279            )
280            .unwrap(),
281            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
282                "validator_service_num_rejected_tx_in_epoch_boundary",
283                "Number of rejected transaction during epoch transitioning",
284                registry,
285            )
286            .unwrap(),
287            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
288                "validator_service_num_rejected_cert_in_epoch_boundary",
289                "Number of rejected transaction certificate during epoch transitioning",
290                registry,
291            )
292            .unwrap(),
293            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
294                "validator_service_num_rejected_tx_during_overload",
295                "Number of rejected transaction due to system overload",
296                &["error_type"],
297                registry,
298            )
299            .unwrap(),
300            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
301                "validator_service_num_rejected_cert_during_overload",
302                "Number of rejected transaction certificate due to system overload",
303                &["error_type"],
304                registry,
305            )
306            .unwrap(),
307            connection_ip_not_found: register_int_counter_with_registry!(
308                "validator_service_connection_ip_not_found",
309                "Number of times connection IP was not extractable from request",
310                registry,
311            )
312            .unwrap(),
313            forwarded_header_parse_error: register_int_counter_with_registry!(
314                "validator_service_forwarded_header_parse_error",
315                "Number of times x-forwarded-for header could not be parsed",
316                registry,
317            )
318            .unwrap(),
319            forwarded_header_invalid: register_int_counter_with_registry!(
320                "validator_service_forwarded_header_invalid",
321                "Number of times x-forwarded-for header was invalid",
322                registry,
323            )
324            .unwrap(),
325            forwarded_header_not_included: register_int_counter_with_registry!(
326                "validator_service_forwarded_header_not_included",
327                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
328                registry,
329            )
330            .unwrap(),
331            client_id_source_config_mismatch: register_int_counter_with_registry!(
332                "validator_service_client_id_source_config_mismatch",
333                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
334                registry,
335            )
336            .unwrap(),
337        }
338    }
339
340    /// Creates a new `ValidatorServiceMetrics` for testing.
341    pub fn new_for_tests() -> Self {
342        let registry = Registry::new();
343        Self::new(&registry)
344    }
345}
346
/// The validator service.
///
/// Cloning is cheap for the `Arc` fields: each clone shares the same
/// underlying state, consensus adapter and metrics.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state; exposed through `validator_state()`.
    state: Arc<AuthorityState>,
    /// Adapter used to submit certificates to consensus and for overload checks.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded while serving requests.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Present only when a policy config was supplied to `new`; `None` in tests.
    traffic_controller: Option<Arc<TrafficController>>,
    /// Taken from the policy config's `client_id_source`; `None` when no policy
    /// config was supplied.
    client_id_source: Option<ClientIdSource>,
}
356
357impl ValidatorService {
358    /// Creates a new `ValidatorService`.
359    pub fn new(
360        state: Arc<AuthorityState>,
361        consensus_adapter: Arc<ConsensusAdapter>,
362        validator_metrics: Arc<ValidatorServiceMetrics>,
363        traffic_controller_metrics: TrafficControllerMetrics,
364        policy_config: Option<PolicyConfig>,
365        firewall_config: Option<RemoteFirewallConfig>,
366    ) -> Self {
367        Self {
368            state,
369            consensus_adapter,
370            metrics: validator_metrics,
371            traffic_controller: policy_config.clone().map(|policy| {
372                Arc::new(TrafficController::init(
373                    policy,
374                    traffic_controller_metrics,
375                    firewall_config,
376                ))
377            }),
378            client_id_source: policy_config.map(|policy| policy.client_id_source),
379        }
380    }
381
382    pub fn new_for_tests(
383        state: Arc<AuthorityState>,
384        consensus_adapter: Arc<ConsensusAdapter>,
385        metrics: Arc<ValidatorServiceMetrics>,
386    ) -> Self {
387        Self {
388            state,
389            consensus_adapter,
390            metrics,
391            traffic_controller: None,
392            client_id_source: None,
393        }
394    }
395
396    /// Returns the validator state.
397    pub fn validator_state(&self) -> &Arc<AuthorityState> {
398        &self.state
399    }
400
401    /// Executes a `CertifiedTransaction` for testing.
402    pub async fn execute_certificate_for_testing(
403        &self,
404        cert: CertifiedTransaction,
405    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
406        let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
407        self.handle_certificate_v1(request).await
408    }
409
410    /// Handles a `Transaction` request for benchmarking.
411    pub async fn handle_transaction_for_benchmarking(
412        &self,
413        transaction: Transaction,
414    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
415        let request = make_tonic_request_for_testing(transaction);
416        self.transaction(request).await
417    }
418
419    /// Handles a `Transaction` request.
420    async fn handle_transaction(
421        &self,
422        request: tonic::Request<Transaction>,
423    ) -> WrappedServiceResponse<HandleTransactionResponse> {
424        let Self {
425            state,
426            consensus_adapter,
427            metrics,
428            traffic_controller: _,
429            client_id_source: _,
430        } = self.clone();
431        let transaction = request.into_inner();
432        let epoch_store = state.load_epoch_store_one_call_per_task();
433
434        transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
435
436        // When authority is overloaded and decide to reject this tx, we still lock the
437        // object and ask the client to retry in the future. This is because
438        // without locking, the input objects can be locked by a different tx in
439        // the future, however, the input objects may already be locked by this
440        // tx in other validators. This can cause non of the txes to have enough
441        // quorum to form a certificate, causing the objects to be locked for
442        // the entire epoch. By doing locking but pushback, retrying transaction will
443        // have higher chance to succeed.
444        let mut validator_pushback_error = None;
445        let overload_check_res = state.check_system_overload(
446            &consensus_adapter,
447            transaction.data(),
448            state.check_system_overload_at_signing(),
449        );
450        if let Err(error) = overload_check_res {
451            metrics
452                .num_rejected_tx_during_overload
453                .with_label_values(&[error.as_ref()])
454                .inc();
455            // TODO: consider change the behavior for other types of overload errors.
456            match error {
457                IotaError::ValidatorOverloadedRetryAfter { .. } => {
458                    validator_pushback_error = Some(error)
459                }
460                _ => return Err(error.into()),
461            }
462        }
463
464        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
465
466        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
467        let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
468            metrics.signature_errors.inc();
469        })?;
470        drop(tx_verif_metrics_guard);
471
472        let tx_digest = transaction.digest();
473
474        // Enable Trace Propagation across spans/processes using tx_digest
475        let span = error_span!("validator_state_process_tx", ?tx_digest);
476
477        let info = state
478            .handle_transaction(&epoch_store, transaction.clone())
479            .instrument(span)
480            .await
481            .tap_err(|e| {
482                if let IotaError::ValidatorHaltedAtEpochEnd = e {
483                    metrics.num_rejected_tx_in_epoch_boundary.inc();
484                }
485            })?;
486
487        if let Some(error) = validator_pushback_error {
488            // TODO: right now, we still sign the txn, but just don't return it. We can also
489            // skip signing to save more CPU.
490            return Err(error.into());
491        }
492
493        Ok((tonic::Response::new(info), Weight::zero()))
494    }
495
496    // In addition to the response from handling the certificates,
497    // returns a bool indicating whether the request should be tallied
498    // toward spam count. In general, this should be set to true for
499    // requests that are read-only and thus do not consume gas, such
500    // as when the transaction is already executed.
501    async fn handle_certificates(
502        &self,
503        certificates: NonEmpty<CertifiedTransaction>,
504        include_events: bool,
505        include_input_objects: bool,
506        include_output_objects: bool,
507        _include_auxiliary_data: bool,
508        epoch_store: &Arc<AuthorityPerEpochStore>,
509        wait_for_effects: bool,
510    ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
511        // Validate if cert can be executed
512        // Fullnode does not serve handle_certificate call.
513        fp_ensure!(
514            !self.state.is_fullnode(epoch_store),
515            IotaError::FullNodeCantHandleCertificate.into()
516        );
517
518        let shared_object_tx = certificates
519            .iter()
520            .any(|cert| cert.contains_shared_object());
521
522        let metrics = if certificates.len() == 1 {
523            if wait_for_effects {
524                if shared_object_tx {
525                    &self.metrics.handle_certificate_consensus_latency
526                } else {
527                    &self.metrics.handle_certificate_non_consensus_latency
528                }
529            } else {
530                &self.metrics.submit_certificate_consensus_latency
531            }
532        } else {
533            // `soft_bundle_validity_check` ensured that all certificates contain shared
534            // objects.
535            &self
536                .metrics
537                .handle_soft_bundle_certificates_consensus_latency
538        };
539
540        let _metrics_guard = metrics.start_timer();
541
542        // 1) Check if the certificate is already executed. This is only needed when we
543        //    have only one certificate (not a soft bundle). When multiple certificates
544        //    are provided, we will either submit all of them or none of them to
545        //    consensus.
546        if certificates.len() == 1 {
547            let tx_digest = *certificates[0].digest();
548
549            if let Some(signed_effects) = self
550                .state
551                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
552            {
553                let events = if include_events {
554                    if let Some(digest) = signed_effects.events_digest() {
555                        Some(self.state.get_transaction_events(digest)?)
556                    } else {
557                        None
558                    }
559                } else {
560                    None
561                };
562
563                return Ok((
564                    Some(vec![HandleCertificateResponseV1 {
565                        signed_effects: signed_effects.into_inner(),
566                        events,
567                        input_objects: None,
568                        output_objects: None,
569                        auxiliary_data: None,
570                    }]),
571                    Weight::one(),
572                ));
573            };
574        }
575
576        // 2) Verify the certificates.
577        // Check system overload
578        for certificate in &certificates {
579            let overload_check_res = self.state.check_system_overload(
580                &self.consensus_adapter,
581                certificate.data(),
582                self.state.check_system_overload_at_execution(),
583            );
584            if let Err(error) = overload_check_res {
585                self.metrics
586                    .num_rejected_cert_during_overload
587                    .with_label_values(&[error.as_ref()])
588                    .inc();
589                return Err(error.into());
590            }
591        }
592
593        let verified_certificates = {
594            let _timer = self.metrics.cert_verification_latency.start_timer();
595            epoch_store
596                .signature_verifier
597                .multi_verify_certs(certificates.into())
598                .await
599                .into_iter()
600                .collect::<Result<Vec<_>, _>>()?
601        };
602
603        {
604            // code block within reconfiguration lock
605            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
606            if !reconfiguration_lock.should_accept_user_certs() {
607                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
608                return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
609            }
610
611            // 3) All certificates are sent to consensus (at least by some authorities)
612            // For shared objects this will wait until either timeout or we have heard back
613            // from consensus. For owned objects this will return without
614            // waiting for certificate to be sequenced First do quick dirty
615            // non-async check.
616            if !epoch_store
617                .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
618            {
619                let _metrics_guard = if shared_object_tx {
620                    Some(self.metrics.consensus_latency.start_timer())
621                } else {
622                    None
623                };
624                let transactions = verified_certificates
625                    .iter()
626                    .map(|certificate| {
627                        ConsensusTransaction::new_certificate_message(
628                            &self.state.name,
629                            certificate.clone().into(),
630                        )
631                    })
632                    .collect::<Vec<_>>();
633                self.consensus_adapter.submit_batch(
634                    &transactions,
635                    Some(&reconfiguration_lock),
636                    epoch_store,
637                )?;
638                // Do not wait for the result, because the transaction might
639                // have already executed. Instead, check or wait
640                // for the existence of certificate effects below.
641            }
642        }
643
644        if !wait_for_effects {
645            // It is useful to enqueue owned object transaction for execution locally,
646            // even when we are not returning effects to user
647            let certificates_without_shared_objects = verified_certificates
648                .iter()
649                .filter(|certificate| !certificate.contains_shared_object())
650                .cloned()
651                .collect::<Vec<_>>();
652            if !certificates_without_shared_objects.is_empty() {
653                self.state.enqueue_certificates_for_execution(
654                    certificates_without_shared_objects,
655                    epoch_store,
656                );
657            }
658            return Ok((None, Weight::zero()));
659        }
660
661        // 4) Execute the certificates immediately if they contain only owned object
662        //    transactions,
663        // or wait for the execution results if it contains shared objects.
664        let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
665            |certificate| async move {
666                let effects = self
667                    .state
668                    .execute_certificate(&certificate, epoch_store)
669                    .await?;
670                let events = if include_events {
671                    if let Some(digest) = effects.events_digest() {
672                        Some(self.state.get_transaction_events(digest)?)
673                    } else {
674                        None
675                    }
676                } else {
677                    None
678                };
679
680                let input_objects = include_input_objects
681                    .then(|| self.state.get_transaction_input_objects(&effects))
682                    .and_then(Result::ok);
683
684                let output_objects = include_output_objects
685                    .then(|| self.state.get_transaction_output_objects(&effects))
686                    .and_then(Result::ok);
687
688                let signed_effects = self.state.sign_effects(effects, epoch_store)?;
689                epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
690
691                Ok::<_, IotaError>(HandleCertificateResponseV1 {
692                    signed_effects: signed_effects.into_inner(),
693                    events,
694                    input_objects,
695                    output_objects,
696                    auxiliary_data: None, // We don't have any aux data generated presently
697                })
698            },
699        ))
700        .await?;
701
702        Ok((Some(responses), Weight::zero()))
703    }
704}
705
/// Result of an RPC handler implementation: on success, the tonic response
/// paired with the `Weight` that handlers in this file name `spam_weight`; on
/// failure, a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
707
708impl ValidatorService {
709    async fn transaction_impl(
710        &self,
711        request: tonic::Request<Transaction>,
712    ) -> WrappedServiceResponse<HandleTransactionResponse> {
713        self.handle_transaction(request).await
714    }
715
716    async fn submit_certificate_impl(
717        &self,
718        request: tonic::Request<CertifiedTransaction>,
719    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
720        let epoch_store = self.state.load_epoch_store_one_call_per_task();
721        let certificate = request.into_inner();
722        certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
723
724        let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
725        self.handle_certificates(
726            nonempty![certificate],
727            true,
728            false,
729            false,
730            false,
731            &epoch_store,
732            false,
733        )
734        .instrument(span)
735        .await
736        .map(|(executed, spam_weight)| {
737            (
738                tonic::Response::new(SubmitCertificateResponse {
739                    executed: executed.map(|mut x| x.remove(0)),
740                }),
741                spam_weight,
742            )
743        })
744    }
745
746    async fn handle_certificate_v1_impl(
747        &self,
748        request: tonic::Request<HandleCertificateRequestV1>,
749    ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
750        let epoch_store = self.state.load_epoch_store_one_call_per_task();
751        let request = request.into_inner();
752        request
753            .certificate
754            .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
755
756        let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
757        self.handle_certificates(
758            nonempty![request.certificate],
759            request.include_events,
760            request.include_input_objects,
761            request.include_output_objects,
762            request.include_auxiliary_data,
763            &epoch_store,
764            true,
765        )
766        .instrument(span)
767        .await
768        .map(|(resp, spam_weight)| {
769            (
770                tonic::Response::new(
771                    resp.expect(
772                        "handle_certificate should not return none with wait_for_effects=true",
773                    )
774                    .remove(0),
775                ),
776                spam_weight,
777            )
778        })
779    }
780
781    async fn soft_bundle_validity_check(
782        &self,
783        certificates: &NonEmpty<CertifiedTransaction>,
784        epoch_store: &Arc<AuthorityPerEpochStore>,
785        total_size_bytes: u64,
786    ) -> Result<(), tonic::Status> {
787        let protocol_config = epoch_store.protocol_config();
788
789        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
790        // - All certs must access at least one shared object.
791        // - All certs must not be already executed.
792        // - All certs must have the same gas price.
793        // - Number of certs must not exceed the max allowed.
794        // - Total size of all certs must not exceed the max allowed.
795        fp_ensure!(
796            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
797            IotaError::UserInput {
798                error: UserInputError::TooManyTransactionsInSoftBundle {
799                    limit: protocol_config.max_soft_bundle_size()
800                }
801            }
802            .into()
803        );
804
805        // We set the soft bundle max size to be half of the consensus max transactions
806        // in block size. We do this to account for serialization overheads and
807        // to ensure that the soft bundle is not too large when is attempted to be
808        // posted via consensus. Although half the block size is on the extreme
809        // side, it's should be good enough for now.
810        let soft_bundle_max_size_bytes =
811            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
812        fp_ensure!(
813            total_size_bytes <= soft_bundle_max_size_bytes,
814            IotaError::UserInput {
815                error: UserInputError::SoftBundleTooLarge {
816                    size: total_size_bytes,
817                    limit: soft_bundle_max_size_bytes,
818                },
819            }
820            .into()
821        );
822
823        let mut gas_price = None;
824        for certificate in certificates {
825            let tx_digest = *certificate.digest();
826            fp_ensure!(
827                certificate.contains_shared_object(),
828                IotaError::UserInput {
829                    error: UserInputError::NoSharedObject { digest: tx_digest }
830                }
831                .into()
832            );
833            fp_ensure!(
834                !self.state.is_tx_already_executed(&tx_digest)?,
835                IotaError::UserInput {
836                    error: UserInputError::AlreadyExecuted { digest: tx_digest }
837                }
838                .into()
839            );
840            if let Some(gas) = gas_price {
841                fp_ensure!(
842                    gas == certificate.gas_price(),
843                    IotaError::UserInput {
844                        error: UserInputError::GasPriceMismatch {
845                            digest: tx_digest,
846                            expected: gas,
847                            actual: certificate.gas_price()
848                        }
849                    }
850                    .into()
851                );
852            } else {
853                gas_price = Some(certificate.gas_price());
854            }
855        }
856
857        // For Soft Bundle, if at this point we know at least one certificate has
858        // already been processed, reject the entire bundle.  Otherwise, submit
859        // all certificates in one request. This is not a strict check as there
860        // may be race conditions where one or more certificates are
861        // already being processed by another actor, and we could not know it.
862        fp_ensure!(
863            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
864            IotaError::UserInput {
865                error: UserInputError::CertificateAlreadyProcessed
866            }
867            .into()
868        );
869
870        Ok(())
871    }
872
873    async fn handle_soft_bundle_certificates_v1_impl(
874        &self,
875        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
876    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
877        let epoch_store = self.state.load_epoch_store_one_call_per_task();
878        let client_addr = if self.client_id_source.is_none() {
879            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
880        } else {
881            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
882        };
883        let request = request.into_inner();
884
885        let certificates =
886            NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
887        let mut total_size_bytes = 0;
888        for certificate in &certificates {
889            // We need to check this first because we haven't verified the cert signature.
890            total_size_bytes += certificate
891                .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
892                as u64;
893        }
894
895        self.metrics
896            .handle_soft_bundle_certificates_count
897            .observe(certificates.len() as f64);
898
899        self.metrics
900            .handle_soft_bundle_certificates_size_bytes
901            .observe(total_size_bytes as f64);
902
903        // Now that individual certificates are valid, we check if the bundle is valid.
904        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
905            .await?;
906
907        info!(
908            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
909            certificates.len(),
910            client_addr
911                .map(|x| x.to_string())
912                .unwrap_or_else(|| "unknown".to_string()),
913            certificates
914                .iter()
915                .map(|x| x.digest().to_string())
916                .collect::<Vec<_>>()
917                .join(", "),
918            total_size_bytes
919        );
920
921        let span = error_span!("handle_soft_bundle_certificates_v1");
922        self.handle_certificates(
923            certificates,
924            request.include_events,
925            request.include_input_objects,
926            request.include_output_objects,
927            request.include_auxiliary_data,
928            &epoch_store,
929            request.wait_for_effects,
930        )
931        .instrument(span)
932        .await
933        .map(|(resp, spam_weight)| {
934            (
935                tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
936                    responses: resp.unwrap_or_default(),
937                }),
938                spam_weight,
939            )
940        })
941    }
942
943    async fn object_info_impl(
944        &self,
945        request: tonic::Request<ObjectInfoRequest>,
946    ) -> WrappedServiceResponse<ObjectInfoResponse> {
947        let request = request.into_inner();
948        let response = self.state.handle_object_info_request(request).await?;
949        Ok((tonic::Response::new(response), Weight::one()))
950    }
951
952    async fn transaction_info_impl(
953        &self,
954        request: tonic::Request<TransactionInfoRequest>,
955    ) -> WrappedServiceResponse<TransactionInfoResponse> {
956        let request = request.into_inner();
957        let response = self.state.handle_transaction_info_request(request).await?;
958        Ok((tonic::Response::new(response), Weight::one()))
959    }
960
961    async fn checkpoint_impl(
962        &self,
963        request: tonic::Request<CheckpointRequest>,
964    ) -> WrappedServiceResponse<CheckpointResponse> {
965        let request = request.into_inner();
966        let response = self.state.handle_checkpoint_request(&request)?;
967        Ok((tonic::Response::new(response), Weight::one()))
968    }
969
970    async fn get_system_state_object_impl(
971        &self,
972        _request: tonic::Request<SystemStateRequest>,
973    ) -> WrappedServiceResponse<IotaSystemState> {
974        let response = self
975            .state
976            .get_object_cache_reader()
977            .get_iota_system_state_object_unsafe()?;
978        Ok((tonic::Response::new(response), Weight::one()))
979    }
980
981    fn get_client_ip_addr<T>(
982        &self,
983        request: &tonic::Request<T>,
984        source: &ClientIdSource,
985    ) -> Option<IpAddr> {
986        match source {
987            ClientIdSource::SocketAddr => {
988                let socket_addr: Option<SocketAddr> = request.remote_addr();
989
990                // We will hit this case if the IO type used does not
991                // implement Connected or when using a unix domain socket.
992                // TODO: once we have confirmed that no legitimate traffic
993                // is hitting this case, we should reject such requests that
994                // hit this case.
995                if let Some(socket_addr) = socket_addr {
996                    Some(socket_addr.ip())
997                } else {
998                    if cfg!(msim) {
999                        // Ignore the error from simtests.
1000                    } else if cfg!(test) {
1001                        panic!("Failed to get remote address from request");
1002                    } else {
1003                        self.metrics.connection_ip_not_found.inc();
1004                        error!("Failed to get remote address from request");
1005                    }
1006                    None
1007                }
1008            }
1009            ClientIdSource::XForwardedFor(num_hops) => {
1010                let do_header_parse = |op: &MetadataValue<Ascii>| {
1011                    match op.to_str() {
1012                        Ok(header_val) => {
1013                            let header_contents =
1014                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1015                            if *num_hops == 0 {
1016                                error!(
1017                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1018                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1019                                    to this node. Skipping traffic controller request handling.",
1020                                    header_contents,
1021                                );
1022                                return None;
1023                            }
1024                            let contents_len = header_contents.len();
1025                            if contents_len < *num_hops {
1026                                error!(
1027                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1028                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1029                                    `client-id-source` in the node config.",
1030                                    header_contents, contents_len, num_hops, contents_len,
1031                                );
1032                                self.metrics.client_id_source_config_mismatch.inc();
1033                                return None;
1034                            }
1035                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1036                            else {
1037                                error!(
1038                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1039                                    Expected at least {} values. Skipping traffic controller request handling.",
1040                                    header_contents, contents_len, num_hops, contents_len,
1041                                );
1042                                return None;
1043                            };
1044                            parse_ip(client_ip).or_else(|| {
1045                                self.metrics.forwarded_header_parse_error.inc();
1046                                None
1047                            })
1048                        }
1049                        Err(e) => {
1050                            // TODO: once we have confirmed that no legitimate traffic
1051                            // is hitting this case, we should reject such requests that
1052                            // hit this case.
1053                            self.metrics.forwarded_header_invalid.inc();
1054                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1055                            None
1056                        }
1057                    }
1058                };
1059                if let Some(op) = request.metadata().get("x-forwarded-for") {
1060                    do_header_parse(op)
1061                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1062                    do_header_parse(op)
1063                } else {
1064                    self.metrics.forwarded_header_not_included.inc();
1065                    error!(
1066                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1067                    );
1068                    None
1069                }
1070            }
1071        }
1072    }
1073
1074    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1075        if let Some(traffic_controller) = &self.traffic_controller {
1076            if !traffic_controller.check(&client, &None).await {
1077                // Entity in blocklist
1078                Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1079            } else {
1080                Ok(())
1081            }
1082        } else {
1083            Ok(())
1084        }
1085    }
1086
1087    fn handle_traffic_resp<T>(
1088        &self,
1089        client: Option<IpAddr>,
1090        wrapped_response: WrappedServiceResponse<T>,
1091    ) -> Result<tonic::Response<T>, tonic::Status> {
1092        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1093            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1094            Err(status) => (
1095                Some(IotaError::from(status.clone())),
1096                Weight::zero(),
1097                Err(status.clone()),
1098            ),
1099        };
1100
1101        if let Some(traffic_controller) = self.traffic_controller.clone() {
1102            traffic_controller.tally(TrafficTally {
1103                direct: client,
1104                through_fullnode: None,
1105                error_info: error.map(|e| {
1106                    let error_type = String::from(e.clone().as_ref());
1107                    let error_weight = normalize(e);
1108                    (error_weight, error_type)
1109                }),
1110                spam_weight,
1111                timestamp: SystemTime::now(),
1112            })
1113        }
1114        unwrapped_response
1115    }
1116}
1117
1118fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1119    // simulate a TCP connection, which would have added extensions to
1120    // the request object that would be used downstream
1121    let mut request = tonic::Request::new(message);
1122    let tcp_connect_info = TcpConnectInfo {
1123        local_addr: None,
1124        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1125    };
1126    request.extensions_mut().insert(tcp_connect_info);
1127    request
1128}
1129
1130// TODO: refine error matching here
1131fn normalize(err: IotaError) -> Weight {
1132    match err {
1133        IotaError::UserInput {
1134            error: UserInputError::IncorrectUserSignature { .. },
1135        } => Weight::one(),
1136        IotaError::InvalidSignature { .. }
1137        | IotaError::SignerSignatureAbsent { .. }
1138        | IotaError::SignerSignatureNumberMismatch { .. }
1139        | IotaError::IncorrectSigner { .. }
1140        | IotaError::UnknownSigner { .. }
1141        | IotaError::WrongEpoch { .. } => Weight::one(),
1142        _ => Weight::zero(),
1143    }
1144}
1145
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments:
/// - `$self`: the service value whose handler is invoked,
/// - `$func_name`: the `*_impl` handler method to call with the request,
/// - `$request`: the `tonic::Request` to handle.
///
/// The macro evaluates to `Result<tonic::Response<_>, tonic::Status>`. It
/// contains `return` and `?`, so it must be expanded inside a function or
/// async block with that same return type.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // Without a configured client id source, traffic control is bypassed:
        // call the handler directly and discard the spam weight.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if either IP is blocked, in which case return early
        $self.handle_traffic_req(client).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1166
1167#[async_trait]
1168impl Validator for ValidatorService {
1169    /// Handles a `Transaction` request.
1170    async fn transaction(
1171        &self,
1172        request: tonic::Request<Transaction>,
1173    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1174        let validator_service = self.clone();
1175
1176        // Spawns a task which handles the transaction. The task will unconditionally
1177        // continue processing in the event that the client connection is
1178        // dropped.
1179        spawn_monitored_task!(async move {
1180            // NB: traffic tally wrapping handled within the task rather than on task exit
1181            // to prevent an attacker from subverting traffic control by severing the
1182            // connection
1183            handle_with_decoration!(validator_service, transaction_impl, request)
1184        })
1185        .await
1186        .unwrap()
1187    }
1188
1189    /// Submits a `CertifiedTransaction` request.
1190    async fn submit_certificate(
1191        &self,
1192        request: tonic::Request<CertifiedTransaction>,
1193    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1194        let validator_service = self.clone();
1195
1196        // Spawns a task which handles the certificate. The task will unconditionally
1197        // continue processing in the event that the client connection is
1198        // dropped.
1199        spawn_monitored_task!(async move {
1200            // NB: traffic tally wrapping handled within the task rather than on task exit
1201            // to prevent an attacker from subverting traffic control by severing the
1202            // connection.
1203            handle_with_decoration!(validator_service, submit_certificate_impl, request)
1204        })
1205        .await
1206        .unwrap()
1207    }
1208
1209    async fn handle_certificate_v1(
1210        &self,
1211        request: tonic::Request<HandleCertificateRequestV1>,
1212    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1213        handle_with_decoration!(self, handle_certificate_v1_impl, request)
1214    }
1215
1216    async fn handle_soft_bundle_certificates_v1(
1217        &self,
1218        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1219    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1220        handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1221    }
1222
1223    /// Handles an `ObjectInfoRequest` request.
1224    async fn object_info(
1225        &self,
1226        request: tonic::Request<ObjectInfoRequest>,
1227    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1228        handle_with_decoration!(self, object_info_impl, request)
1229    }
1230
1231    /// Handles a `TransactionInfoRequest` request.
1232    async fn transaction_info(
1233        &self,
1234        request: tonic::Request<TransactionInfoRequest>,
1235    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1236        handle_with_decoration!(self, transaction_info_impl, request)
1237    }
1238
1239    /// Handles a `CheckpointRequest` request.
1240    async fn checkpoint(
1241        &self,
1242        request: tonic::Request<CheckpointRequest>,
1243    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1244        handle_with_decoration!(self, checkpoint_impl, request)
1245    }
1246
1247    /// Gets the `IotaSystemState` response.
1248    async fn get_system_state_object(
1249        &self,
1250        request: tonic::Request<SystemStateRequest>,
1251    ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1252        handle_with_decoration!(self, get_system_state_object_impl, request)
1253    }
1254}