iota_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    io,
8    net::{IpAddr, SocketAddr},
9    sync::Arc,
10    time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19    api::{Validator, ValidatorServer},
20    tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24    effects::TransactionEffectsAPI,
25    error::*,
26    fp_ensure,
27    iota_system_state::IotaSystemState,
28    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29    messages_consensus::ConsensusTransaction,
30    messages_grpc::{
31        HandleCertificateRequestV1, HandleCertificateResponseV1,
32        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
33        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
34        SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
35        TransactionInfoResponse,
36    },
37    multiaddr::Multiaddr,
38    traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
39    transaction::*,
40};
41use nonempty::{NonEmpty, nonempty};
42use prometheus::{
43    Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
44    register_int_counter_vec_with_registry, register_int_counter_with_registry,
45};
46use tap::TapFallible;
47use tonic::{
48    metadata::{Ascii, MetadataValue},
49    transport::server::TcpConnectInfo,
50};
51use tracing::{Instrument, error, error_span, info};
52
53use crate::{
54    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
55    checkpoints::CheckpointStore,
56    consensus_adapter::{
57        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
58    },
59    mysticeti_adapter::LazyMysticetiClient,
60    traffic_controller::{
61        TrafficController, metrics::TrafficControllerMetrics, parse_ip, policies::TrafficTally,
62    },
63};
64
65#[cfg(test)]
66#[path = "unit_tests/server_tests.rs"]
67mod server_tests;
68
69/// A handle to the authority server.
70pub struct AuthorityServerHandle {
71    server_handle: iota_network_stack::server::Server,
72}
73
74impl AuthorityServerHandle {
75    /// Waits for the server to complete.
76    pub async fn join(self) -> Result<(), io::Error> {
77        self.server_handle.handle().wait_for_shutdown().await;
78        Ok(())
79    }
80
81    /// Kills the server.
82    pub async fn kill(self) -> Result<(), io::Error> {
83        self.server_handle.handle().shutdown().await;
84        Ok(())
85    }
86
87    /// Returns the address of the server.
88    pub fn address(&self) -> &Multiaddr {
89        self.server_handle.local_addr()
90    }
91}
92
93/// An authority server that is used for testing.
94pub struct AuthorityServer {
95    address: Multiaddr,
96    pub state: Arc<AuthorityState>,
97    consensus_adapter: Arc<ConsensusAdapter>,
98    pub metrics: Arc<ValidatorServiceMetrics>,
99}
100
101impl AuthorityServer {
102    /// Creates a new `AuthorityServer` for testing with a consensus adapter.
103    pub fn new_for_test_with_consensus_adapter(
104        state: Arc<AuthorityState>,
105        consensus_adapter: Arc<ConsensusAdapter>,
106    ) -> Self {
107        let address = new_local_tcp_address_for_testing();
108        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
109
110        Self {
111            address,
112            state,
113            consensus_adapter,
114            metrics,
115        }
116    }
117
118    /// Creates a new `AuthorityServer` for testing.
119    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
120        let consensus_adapter = Arc::new(ConsensusAdapter::new(
121            Arc::new(LazyMysticetiClient::new()),
122            CheckpointStore::new_for_tests(),
123            state.name,
124            Arc::new(ConnectionMonitorStatusForTests {}),
125            100_000,
126            100_000,
127            None,
128            None,
129            ConsensusAdapterMetrics::new_test(),
130        ));
131        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
132    }
133
134    /// Spawns the server.
135    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
136        let address = self.address.clone();
137        self.spawn_with_bind_address_for_test(address).await
138    }
139
140    /// Spawns the server with a bind address.
141    pub async fn spawn_with_bind_address_for_test(
142        self,
143        address: Multiaddr,
144    ) -> Result<AuthorityServerHandle, io::Error> {
145        let tls_config = iota_tls::create_rustls_server_config(
146            self.state.config.network_key_pair().copy().private(),
147            IOTA_TLS_SERVER_NAME.to_string(),
148        );
149        let server = iota_network_stack::config::Config::new()
150            .server_builder()
151            .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
152                self.state,
153                self.consensus_adapter,
154                self.metrics,
155            )))
156            .bind(&address, Some(tls_config))
157            .await
158            .unwrap();
159        let local_addr = server.local_addr().to_owned();
160        info!("Listening to traffic on {local_addr}");
161        let handle = AuthorityServerHandle {
162            server_handle: server,
163        };
164        Ok(handle)
165    }
166}
167
168/// Metrics for the validator service.
169pub struct ValidatorServiceMetrics {
170    pub signature_errors: IntCounter,
171    pub tx_verification_latency: Histogram,
172    pub cert_verification_latency: Histogram,
173    pub consensus_latency: Histogram,
174    pub handle_transaction_latency: Histogram,
175    pub submit_certificate_consensus_latency: Histogram,
176    pub handle_certificate_consensus_latency: Histogram,
177    pub handle_certificate_non_consensus_latency: Histogram,
178    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
179    pub handle_soft_bundle_certificates_count: Histogram,
180    pub handle_soft_bundle_certificates_size_bytes: Histogram,
181
182    num_rejected_tx_in_epoch_boundary: IntCounter,
183    num_rejected_cert_in_epoch_boundary: IntCounter,
184    num_rejected_tx_during_overload: IntCounterVec,
185    num_rejected_cert_during_overload: IntCounterVec,
186    connection_ip_not_found: IntCounter,
187    forwarded_header_parse_error: IntCounter,
188    forwarded_header_invalid: IntCounter,
189    forwarded_header_not_included: IntCounter,
190    client_id_source_config_mismatch: IntCounter,
191}
192
193impl ValidatorServiceMetrics {
194    /// Creates a new `ValidatorServiceMetrics` with Prometheus registry.
195    pub fn new(registry: &Registry) -> Self {
196        Self {
197            signature_errors: register_int_counter_with_registry!(
198                "total_signature_errors",
199                "Number of transaction signature errors",
200                registry,
201            )
202            .unwrap(),
203            tx_verification_latency: register_histogram_with_registry!(
204                "validator_service_tx_verification_latency",
205                "Latency of verifying a transaction",
206                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
207                registry,
208            )
209            .unwrap(),
210            cert_verification_latency: register_histogram_with_registry!(
211                "validator_service_cert_verification_latency",
212                "Latency of verifying a certificate",
213                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
214                registry,
215            )
216            .unwrap(),
217            consensus_latency: register_histogram_with_registry!(
218                "validator_service_consensus_latency",
219                "Time spent between submitting a shared obj txn to consensus and getting result",
220                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
221                registry,
222            )
223            .unwrap(),
224            handle_transaction_latency: register_histogram_with_registry!(
225                "validator_service_handle_transaction_latency",
226                "Latency of handling a transaction",
227                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
228                registry,
229            )
230            .unwrap(),
231            handle_certificate_consensus_latency: register_histogram_with_registry!(
232                "validator_service_handle_certificate_consensus_latency",
233                "Latency of handling a consensus transaction certificate",
234                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
235                registry,
236            )
237            .unwrap(),
238            submit_certificate_consensus_latency: register_histogram_with_registry!(
239                "validator_service_submit_certificate_consensus_latency",
240                "Latency of submit_certificate RPC handler",
241                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
242                registry,
243            )
244            .unwrap(),
245            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
246                "validator_service_handle_certificate_non_consensus_latency",
247                "Latency of handling a non-consensus transaction certificate",
248                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
249                registry,
250            )
251            .unwrap(),
252            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
253                "validator_service_handle_soft_bundle_certificates_consensus_latency",
254                "Latency of handling a consensus soft bundle",
255                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
256                registry,
257            )
258            .unwrap(),
259            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
260                "validator_service_handle_soft_bundle_certificates_count",
261                "The number of certificates included in a soft bundle",
262                iota_metrics::COUNT_BUCKETS.to_vec(),
263                registry,
264            )
265            .unwrap(),
266            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
267                "validator_service_handle_soft_bundle_certificates_size_bytes",
268                "The size of soft bundle in bytes",
269                iota_metrics::BYTES_BUCKETS.to_vec(),
270                registry,
271            )
272            .unwrap(),
273            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
274                "validator_service_num_rejected_tx_in_epoch_boundary",
275                "Number of rejected transaction during epoch transitioning",
276                registry,
277            )
278            .unwrap(),
279            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
280                "validator_service_num_rejected_cert_in_epoch_boundary",
281                "Number of rejected transaction certificate during epoch transitioning",
282                registry,
283            )
284            .unwrap(),
285            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
286                "validator_service_num_rejected_tx_during_overload",
287                "Number of rejected transaction due to system overload",
288                &["error_type"],
289                registry,
290            )
291            .unwrap(),
292            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
293                "validator_service_num_rejected_cert_during_overload",
294                "Number of rejected transaction certificate due to system overload",
295                &["error_type"],
296                registry,
297            )
298            .unwrap(),
299            connection_ip_not_found: register_int_counter_with_registry!(
300                "validator_service_connection_ip_not_found",
301                "Number of times connection IP was not extractable from request",
302                registry,
303            )
304            .unwrap(),
305            forwarded_header_parse_error: register_int_counter_with_registry!(
306                "validator_service_forwarded_header_parse_error",
307                "Number of times x-forwarded-for header could not be parsed",
308                registry,
309            )
310            .unwrap(),
311            forwarded_header_invalid: register_int_counter_with_registry!(
312                "validator_service_forwarded_header_invalid",
313                "Number of times x-forwarded-for header was invalid",
314                registry,
315            )
316            .unwrap(),
317            forwarded_header_not_included: register_int_counter_with_registry!(
318                "validator_service_forwarded_header_not_included",
319                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
320                registry,
321            )
322            .unwrap(),
323            client_id_source_config_mismatch: register_int_counter_with_registry!(
324                "validator_service_client_id_source_config_mismatch",
325                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
326                registry,
327            )
328            .unwrap(),
329        }
330    }
331
332    /// Creates a new `ValidatorServiceMetrics` for testing.
333    pub fn new_for_tests() -> Self {
334        let registry = Registry::new();
335        Self::new(&registry)
336    }
337}
338
339/// The validator service.
340#[derive(Clone)]
341pub struct ValidatorService {
342    state: Arc<AuthorityState>,
343    consensus_adapter: Arc<ConsensusAdapter>,
344    metrics: Arc<ValidatorServiceMetrics>,
345    traffic_controller: Option<Arc<TrafficController>>,
346    client_id_source: Option<ClientIdSource>,
347}
348
349impl ValidatorService {
350    /// Creates a new `ValidatorService`.
351    pub fn new(
352        state: Arc<AuthorityState>,
353        consensus_adapter: Arc<ConsensusAdapter>,
354        validator_metrics: Arc<ValidatorServiceMetrics>,
355        traffic_controller_metrics: TrafficControllerMetrics,
356        policy_config: Option<PolicyConfig>,
357        firewall_config: Option<RemoteFirewallConfig>,
358    ) -> Self {
359        Self {
360            state,
361            consensus_adapter,
362            metrics: validator_metrics,
363            traffic_controller: policy_config.clone().map(|policy| {
364                Arc::new(TrafficController::init(
365                    policy,
366                    traffic_controller_metrics,
367                    firewall_config,
368                ))
369            }),
370            client_id_source: policy_config.map(|policy| policy.client_id_source),
371        }
372    }
373
374    pub fn new_for_tests(
375        state: Arc<AuthorityState>,
376        consensus_adapter: Arc<ConsensusAdapter>,
377        metrics: Arc<ValidatorServiceMetrics>,
378    ) -> Self {
379        Self {
380            state,
381            consensus_adapter,
382            metrics,
383            traffic_controller: None,
384            client_id_source: None,
385        }
386    }
387
388    /// Returns the validator state.
389    pub fn validator_state(&self) -> &Arc<AuthorityState> {
390        &self.state
391    }
392
393    /// Executes a `CertifiedTransaction` for testing.
394    pub async fn execute_certificate_for_testing(
395        &self,
396        cert: CertifiedTransaction,
397    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
398        let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
399        self.handle_certificate_v1(request).await
400    }
401
402    /// Handles a `Transaction` request for benchmarking.
403    pub async fn handle_transaction_for_benchmarking(
404        &self,
405        transaction: Transaction,
406    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
407        let request = make_tonic_request_for_testing(transaction);
408        self.transaction(request).await
409    }
410
411    /// Handles a `Transaction` request.
412    async fn handle_transaction(
413        &self,
414        request: tonic::Request<Transaction>,
415    ) -> WrappedServiceResponse<HandleTransactionResponse> {
416        let Self {
417            state,
418            consensus_adapter,
419            metrics,
420            traffic_controller: _,
421            client_id_source: _,
422        } = self.clone();
423        let transaction = request.into_inner();
424        let epoch_store = state.load_epoch_store_one_call_per_task();
425
426        transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
427
428        // When authority is overloaded and decide to reject this tx, we still lock the
429        // object and ask the client to retry in the future. This is because
430        // without locking, the input objects can be locked by a different tx in
431        // the future, however, the input objects may already be locked by this
432        // tx in other validators. This can cause non of the txes to have enough
433        // quorum to form a certificate, causing the objects to be locked for
434        // the entire epoch. By doing locking but pushback, retrying transaction will
435        // have higher chance to succeed.
436        let mut validator_pushback_error = None;
437        let overload_check_res = state.check_system_overload(
438            &consensus_adapter,
439            transaction.data(),
440            state.check_system_overload_at_signing(),
441        );
442        if let Err(error) = overload_check_res {
443            metrics
444                .num_rejected_tx_during_overload
445                .with_label_values(&[error.as_ref()])
446                .inc();
447            // TODO: consider change the behavior for other types of overload errors.
448            match error {
449                IotaError::ValidatorOverloadedRetryAfter { .. } => {
450                    validator_pushback_error = Some(error)
451                }
452                _ => return Err(error.into()),
453            }
454        }
455
456        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
457
458        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
459        let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
460            metrics.signature_errors.inc();
461        })?;
462        drop(tx_verif_metrics_guard);
463
464        let tx_digest = transaction.digest();
465
466        // Enable Trace Propagation across spans/processes using tx_digest
467        let span = error_span!("validator_state_process_tx", ?tx_digest);
468
469        let info = state
470            .handle_transaction(&epoch_store, transaction.clone())
471            .instrument(span)
472            .await
473            .tap_err(|e| {
474                if let IotaError::ValidatorHaltedAtEpochEnd = e {
475                    metrics.num_rejected_tx_in_epoch_boundary.inc();
476                }
477            })?;
478
479        if let Some(error) = validator_pushback_error {
480            // TODO: right now, we still sign the txn, but just don't return it. We can also
481            // skip signing to save more CPU.
482            return Err(error.into());
483        }
484
485        Ok((tonic::Response::new(info), Weight::zero()))
486    }
487
488    // In addition to the response from handling the certificates,
489    // returns a bool indicating whether the request should be tallied
490    // toward spam count. In general, this should be set to true for
491    // requests that are read-only and thus do not consume gas, such
492    // as when the transaction is already executed.
493    async fn handle_certificates(
494        &self,
495        certificates: NonEmpty<CertifiedTransaction>,
496        include_events: bool,
497        include_input_objects: bool,
498        include_output_objects: bool,
499        _include_auxiliary_data: bool,
500        epoch_store: &Arc<AuthorityPerEpochStore>,
501        wait_for_effects: bool,
502    ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
503        // Validate if cert can be executed
504        // Fullnode does not serve handle_certificate call.
505        fp_ensure!(
506            !self.state.is_fullnode(epoch_store),
507            IotaError::FullNodeCantHandleCertificate.into()
508        );
509
510        let shared_object_tx = certificates
511            .iter()
512            .any(|cert| cert.contains_shared_object());
513
514        let metrics = if certificates.len() == 1 {
515            if wait_for_effects {
516                if shared_object_tx {
517                    &self.metrics.handle_certificate_consensus_latency
518                } else {
519                    &self.metrics.handle_certificate_non_consensus_latency
520                }
521            } else {
522                &self.metrics.submit_certificate_consensus_latency
523            }
524        } else {
525            // `soft_bundle_validity_check` ensured that all certificates contain shared
526            // objects.
527            &self
528                .metrics
529                .handle_soft_bundle_certificates_consensus_latency
530        };
531
532        let _metrics_guard = metrics.start_timer();
533
534        // 1) Check if the certificate is already executed. This is only needed when we
535        //    have only one certificate (not a soft bundle). When multiple certificates
536        //    are provided, we will either submit all of them or none of them to
537        //    consensus.
538        if certificates.len() == 1 {
539            let tx_digest = *certificates[0].digest();
540
541            if let Some(signed_effects) = self
542                .state
543                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
544            {
545                let events = if include_events {
546                    if let Some(digest) = signed_effects.events_digest() {
547                        Some(self.state.get_transaction_events(digest)?)
548                    } else {
549                        None
550                    }
551                } else {
552                    None
553                };
554
555                return Ok((
556                    Some(vec![HandleCertificateResponseV1 {
557                        signed_effects: signed_effects.into_inner(),
558                        events,
559                        input_objects: None,
560                        output_objects: None,
561                        auxiliary_data: None,
562                    }]),
563                    Weight::one(),
564                ));
565            };
566        }
567
568        // 2) Verify the certificates.
569        // Check system overload
570        for certificate in &certificates {
571            let overload_check_res = self.state.check_system_overload(
572                &self.consensus_adapter,
573                certificate.data(),
574                self.state.check_system_overload_at_execution(),
575            );
576            if let Err(error) = overload_check_res {
577                self.metrics
578                    .num_rejected_cert_during_overload
579                    .with_label_values(&[error.as_ref()])
580                    .inc();
581                return Err(error.into());
582            }
583        }
584
585        let verified_certificates = {
586            let _timer = self.metrics.cert_verification_latency.start_timer();
587            epoch_store
588                .signature_verifier
589                .multi_verify_certs(certificates.into())
590                .await
591                .into_iter()
592                .collect::<Result<Vec<_>, _>>()?
593        };
594
595        {
596            // code block within reconfiguration lock
597            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
598            if !reconfiguration_lock.should_accept_user_certs() {
599                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
600                return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
601            }
602
603            // 3) All certificates are sent to consensus (at least by some authorities)
604            // For shared objects this will wait until either timeout or we have heard back
605            // from consensus. For owned objects this will return without
606            // waiting for certificate to be sequenced First do quick dirty
607            // non-async check.
608            if !epoch_store
609                .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
610            {
611                let _metrics_guard = if shared_object_tx {
612                    Some(self.metrics.consensus_latency.start_timer())
613                } else {
614                    None
615                };
616                let transactions = verified_certificates
617                    .iter()
618                    .map(|certificate| {
619                        ConsensusTransaction::new_certificate_message(
620                            &self.state.name,
621                            certificate.clone().into(),
622                        )
623                    })
624                    .collect::<Vec<_>>();
625                self.consensus_adapter.submit_batch(
626                    &transactions,
627                    Some(&reconfiguration_lock),
628                    epoch_store,
629                )?;
630                // Do not wait for the result, because the transaction might
631                // have already executed. Instead, check or wait
632                // for the existence of certificate effects below.
633            }
634        }
635
636        if !wait_for_effects {
637            // It is useful to enqueue owned object transaction for execution locally,
638            // even when we are not returning effects to user
639            let certificates_without_shared_objects = verified_certificates
640                .iter()
641                .filter(|certificate| !certificate.contains_shared_object())
642                .cloned()
643                .collect::<Vec<_>>();
644            if !certificates_without_shared_objects.is_empty() {
645                self.state.enqueue_certificates_for_execution(
646                    certificates_without_shared_objects,
647                    epoch_store,
648                );
649            }
650            return Ok((None, Weight::zero()));
651        }
652
653        // 4) Execute the certificates immediately if they contain only owned object
654        //    transactions,
655        // or wait for the execution results if it contains shared objects.
656        let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
657            |certificate| async move {
658                let effects = self
659                    .state
660                    .execute_certificate(&certificate, epoch_store)
661                    .await?;
662                let events = if include_events {
663                    if let Some(digest) = effects.events_digest() {
664                        Some(self.state.get_transaction_events(digest)?)
665                    } else {
666                        None
667                    }
668                } else {
669                    None
670                };
671
672                let input_objects = include_input_objects
673                    .then(|| self.state.get_transaction_input_objects(&effects))
674                    .and_then(Result::ok);
675
676                let output_objects = include_output_objects
677                    .then(|| self.state.get_transaction_output_objects(&effects))
678                    .and_then(Result::ok);
679
680                let signed_effects = self.state.sign_effects(effects, epoch_store)?;
681                epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
682
683                Ok::<_, IotaError>(HandleCertificateResponseV1 {
684                    signed_effects: signed_effects.into_inner(),
685                    events,
686                    input_objects,
687                    output_objects,
688                    auxiliary_data: None, // We don't have any aux data generated presently
689                })
690            },
691        ))
692        .await?;
693
694        Ok((Some(responses), Weight::zero()))
695    }
696}
697
698type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
699
700impl ValidatorService {
701    async fn transaction_impl(
702        &self,
703        request: tonic::Request<Transaction>,
704    ) -> WrappedServiceResponse<HandleTransactionResponse> {
705        self.handle_transaction(request).await
706    }
707
708    async fn submit_certificate_impl(
709        &self,
710        request: tonic::Request<CertifiedTransaction>,
711    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
712        let epoch_store = self.state.load_epoch_store_one_call_per_task();
713        let certificate = request.into_inner();
714        certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
715
716        let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
717        self.handle_certificates(
718            nonempty![certificate],
719            true,
720            false,
721            false,
722            false,
723            &epoch_store,
724            false,
725        )
726        .instrument(span)
727        .await
728        .map(|(executed, spam_weight)| {
729            (
730                tonic::Response::new(SubmitCertificateResponse {
731                    executed: executed.map(|mut x| x.remove(0)),
732                }),
733                spam_weight,
734            )
735        })
736    }
737
738    async fn handle_certificate_v1_impl(
739        &self,
740        request: tonic::Request<HandleCertificateRequestV1>,
741    ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
742        let epoch_store = self.state.load_epoch_store_one_call_per_task();
743        let request = request.into_inner();
744        request
745            .certificate
746            .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
747
748        let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
749        self.handle_certificates(
750            nonempty![request.certificate],
751            request.include_events,
752            request.include_input_objects,
753            request.include_output_objects,
754            request.include_auxiliary_data,
755            &epoch_store,
756            true,
757        )
758        .instrument(span)
759        .await
760        .map(|(resp, spam_weight)| {
761            (
762                tonic::Response::new(
763                    resp.expect(
764                        "handle_certificate should not return none with wait_for_effects=true",
765                    )
766                    .remove(0),
767                ),
768                spam_weight,
769            )
770        })
771    }
772
773    async fn soft_bundle_validity_check(
774        &self,
775        certificates: &NonEmpty<CertifiedTransaction>,
776        epoch_store: &Arc<AuthorityPerEpochStore>,
777        total_size_bytes: u64,
778    ) -> Result<(), tonic::Status> {
779        let protocol_config = epoch_store.protocol_config();
780
781        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
782        // - All certs must access at least one shared object.
783        // - All certs must not be already executed.
784        // - All certs must have the same gas price.
785        // - Number of certs must not exceed the max allowed.
786        // - Total size of all certs must not exceed the max allowed.
787        fp_ensure!(
788            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
789            IotaError::UserInput {
790                error: UserInputError::TooManyTransactionsInSoftBundle {
791                    limit: protocol_config.max_soft_bundle_size()
792                }
793            }
794            .into()
795        );
796
797        // We set the soft bundle max size to be half of the consensus max transactions
798        // in block size. We do this to account for serialization overheads and
799        // to ensure that the soft bundle is not too large when is attempted to be
800        // posted via consensus. Although half the block size is on the extreme
801        // side, it's should be good enough for now.
802        let soft_bundle_max_size_bytes =
803            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
804        fp_ensure!(
805            total_size_bytes <= soft_bundle_max_size_bytes,
806            IotaError::UserInput {
807                error: UserInputError::SoftBundleTooLarge {
808                    size: total_size_bytes,
809                    limit: soft_bundle_max_size_bytes,
810                },
811            }
812            .into()
813        );
814
815        let mut gas_price = None;
816        for certificate in certificates {
817            let tx_digest = *certificate.digest();
818            fp_ensure!(
819                certificate.contains_shared_object(),
820                IotaError::UserInput {
821                    error: UserInputError::NoSharedObject { digest: tx_digest }
822                }
823                .into()
824            );
825            fp_ensure!(
826                !self.state.try_is_tx_already_executed(&tx_digest)?,
827                IotaError::UserInput {
828                    error: UserInputError::AlreadyExecuted { digest: tx_digest }
829                }
830                .into()
831            );
832            if let Some(gas) = gas_price {
833                fp_ensure!(
834                    gas == certificate.gas_price(),
835                    IotaError::UserInput {
836                        error: UserInputError::GasPriceMismatch {
837                            digest: tx_digest,
838                            expected: gas,
839                            actual: certificate.gas_price()
840                        }
841                    }
842                    .into()
843                );
844            } else {
845                gas_price = Some(certificate.gas_price());
846            }
847        }
848
849        // For Soft Bundle, if at this point we know at least one certificate has
850        // already been processed, reject the entire bundle.  Otherwise, submit
851        // all certificates in one request. This is not a strict check as there
852        // may be race conditions where one or more certificates are
853        // already being processed by another actor, and we could not know it.
854        fp_ensure!(
855            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
856            IotaError::UserInput {
857                error: UserInputError::CertificateAlreadyProcessed
858            }
859            .into()
860        );
861
862        Ok(())
863    }
864
865    async fn handle_soft_bundle_certificates_v1_impl(
866        &self,
867        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
868    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
869        let epoch_store = self.state.load_epoch_store_one_call_per_task();
870        let client_addr = if self.client_id_source.is_none() {
871            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
872        } else {
873            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
874        };
875        let request = request.into_inner();
876
877        let certificates =
878            NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
879        let mut total_size_bytes = 0;
880        for certificate in &certificates {
881            // We need to check this first because we haven't verified the cert signature.
882            total_size_bytes += certificate
883                .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
884                as u64;
885        }
886
887        self.metrics
888            .handle_soft_bundle_certificates_count
889            .observe(certificates.len() as f64);
890
891        self.metrics
892            .handle_soft_bundle_certificates_size_bytes
893            .observe(total_size_bytes as f64);
894
895        // Now that individual certificates are valid, we check if the bundle is valid.
896        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
897            .await?;
898
899        info!(
900            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
901            certificates.len(),
902            client_addr
903                .map(|x| x.to_string())
904                .unwrap_or_else(|| "unknown".to_string()),
905            certificates
906                .iter()
907                .map(|x| x.digest().to_string())
908                .collect::<Vec<_>>()
909                .join(", "),
910            total_size_bytes
911        );
912
913        let span = error_span!("handle_soft_bundle_certificates_v1");
914        self.handle_certificates(
915            certificates,
916            request.include_events,
917            request.include_input_objects,
918            request.include_output_objects,
919            request.include_auxiliary_data,
920            &epoch_store,
921            request.wait_for_effects,
922        )
923        .instrument(span)
924        .await
925        .map(|(resp, spam_weight)| {
926            (
927                tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
928                    responses: resp.unwrap_or_default(),
929                }),
930                spam_weight,
931            )
932        })
933    }
934
935    async fn object_info_impl(
936        &self,
937        request: tonic::Request<ObjectInfoRequest>,
938    ) -> WrappedServiceResponse<ObjectInfoResponse> {
939        let request = request.into_inner();
940        let response = self.state.handle_object_info_request(request).await?;
941        Ok((tonic::Response::new(response), Weight::one()))
942    }
943
944    async fn transaction_info_impl(
945        &self,
946        request: tonic::Request<TransactionInfoRequest>,
947    ) -> WrappedServiceResponse<TransactionInfoResponse> {
948        let request = request.into_inner();
949        let response = self.state.handle_transaction_info_request(request).await?;
950        Ok((tonic::Response::new(response), Weight::one()))
951    }
952
953    async fn checkpoint_impl(
954        &self,
955        request: tonic::Request<CheckpointRequest>,
956    ) -> WrappedServiceResponse<CheckpointResponse> {
957        let request = request.into_inner();
958        let response = self.state.handle_checkpoint_request(&request)?;
959        Ok((tonic::Response::new(response), Weight::one()))
960    }
961
962    async fn get_system_state_object_impl(
963        &self,
964        _request: tonic::Request<SystemStateRequest>,
965    ) -> WrappedServiceResponse<IotaSystemState> {
966        let response = self
967            .state
968            .get_object_cache_reader()
969            .try_get_iota_system_state_object_unsafe()?;
970        Ok((tonic::Response::new(response), Weight::one()))
971    }
972
973    fn get_client_ip_addr<T>(
974        &self,
975        request: &tonic::Request<T>,
976        source: &ClientIdSource,
977    ) -> Option<IpAddr> {
978        match source {
979            ClientIdSource::SocketAddr => {
980                let socket_addr: Option<SocketAddr> = request.remote_addr();
981
982                // We will hit this case if the IO type used does not
983                // implement Connected or when using a unix domain socket.
984                // TODO: once we have confirmed that no legitimate traffic
985                // is hitting this case, we should reject such requests that
986                // hit this case.
987                if let Some(socket_addr) = socket_addr {
988                    Some(socket_addr.ip())
989                } else {
990                    if cfg!(msim) {
991                        // Ignore the error from simtests.
992                    } else if cfg!(test) {
993                        panic!("Failed to get remote address from request");
994                    } else {
995                        self.metrics.connection_ip_not_found.inc();
996                        error!("Failed to get remote address from request");
997                    }
998                    None
999                }
1000            }
1001            ClientIdSource::XForwardedFor(num_hops) => {
1002                let do_header_parse = |op: &MetadataValue<Ascii>| {
1003                    match op.to_str() {
1004                        Ok(header_val) => {
1005                            let header_contents =
1006                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1007                            if *num_hops == 0 {
1008                                error!(
1009                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1010                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1011                                    to this node. Skipping traffic controller request handling.",
1012                                    header_contents,
1013                                );
1014                                return None;
1015                            }
1016                            let contents_len = header_contents.len();
1017                            if contents_len < *num_hops {
1018                                error!(
1019                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1020                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1021                                    `client-id-source` in the node config.",
1022                                    header_contents, contents_len, num_hops, contents_len,
1023                                );
1024                                self.metrics.client_id_source_config_mismatch.inc();
1025                                return None;
1026                            }
1027                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1028                            else {
1029                                error!(
1030                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1031                                    Expected at least {} values. Skipping traffic controller request handling.",
1032                                    header_contents, contents_len, num_hops, contents_len,
1033                                );
1034                                return None;
1035                            };
1036                            parse_ip(client_ip).or_else(|| {
1037                                self.metrics.forwarded_header_parse_error.inc();
1038                                None
1039                            })
1040                        }
1041                        Err(e) => {
1042                            // TODO: once we have confirmed that no legitimate traffic
1043                            // is hitting this case, we should reject such requests that
1044                            // hit this case.
1045                            self.metrics.forwarded_header_invalid.inc();
1046                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1047                            None
1048                        }
1049                    }
1050                };
1051                if let Some(op) = request.metadata().get("x-forwarded-for") {
1052                    do_header_parse(op)
1053                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1054                    do_header_parse(op)
1055                } else {
1056                    self.metrics.forwarded_header_not_included.inc();
1057                    error!(
1058                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1059                    );
1060                    None
1061                }
1062            }
1063        }
1064    }
1065
1066    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1067        if let Some(traffic_controller) = &self.traffic_controller {
1068            if !traffic_controller.check(&client, &None).await {
1069                // Entity in blocklist
1070                Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1071            } else {
1072                Ok(())
1073            }
1074        } else {
1075            Ok(())
1076        }
1077    }
1078
1079    fn handle_traffic_resp<T>(
1080        &self,
1081        client: Option<IpAddr>,
1082        wrapped_response: WrappedServiceResponse<T>,
1083    ) -> Result<tonic::Response<T>, tonic::Status> {
1084        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1085            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1086            Err(status) => (
1087                Some(IotaError::from(status.clone())),
1088                Weight::zero(),
1089                Err(status.clone()),
1090            ),
1091        };
1092
1093        if let Some(traffic_controller) = self.traffic_controller.clone() {
1094            traffic_controller.tally(TrafficTally {
1095                direct: client,
1096                through_fullnode: None,
1097                error_info: error.map(|e| {
1098                    let error_type = String::from(e.clone().as_ref());
1099                    let error_weight = normalize(e);
1100                    (error_weight, error_type)
1101                }),
1102                spam_weight,
1103                timestamp: SystemTime::now(),
1104            })
1105        }
1106        unwrapped_response
1107    }
1108}
1109
1110fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1111    // simulate a TCP connection, which would have added extensions to
1112    // the request object that would be used downstream
1113    let mut request = tonic::Request::new(message);
1114    let tcp_connect_info = TcpConnectInfo {
1115        local_addr: None,
1116        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1117    };
1118    request.extensions_mut().insert(tcp_connect_info);
1119    request
1120}
1121
1122// TODO: refine error matching here
1123fn normalize(err: IotaError) -> Weight {
1124    match err {
1125        IotaError::UserInput {
1126            error: UserInputError::IncorrectUserSignature { .. },
1127        } => Weight::one(),
1128        IotaError::InvalidSignature { .. }
1129        | IotaError::SignerSignatureAbsent { .. }
1130        | IotaError::SignerSignatureNumberMismatch { .. }
1131        | IotaError::IncorrectSigner { .. }
1132        | IotaError::UnknownSigner { .. }
1133        | IotaError::WrongEpoch { .. } => Weight::one(),
1134        _ => Weight::zero(),
1135    }
1136}
1137
1138/// Implements generic pre- and post-processing. Since this is on the critical
1139/// path, any heavy lifting should be done in a separate non-blocking task
1140/// unless it is necessary to override the return value.
1141#[macro_export]
1142macro_rules! handle_with_decoration {
1143    ($self:ident, $func_name:ident, $request:ident) => {{
1144        if $self.client_id_source.is_none() {
1145            return $self.$func_name($request).await.map(|(result, _)| result);
1146        }
1147
1148        let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1149
1150        // check if either IP is blocked, in which case return early
1151        $self.handle_traffic_req(client.clone()).await?;
1152
1153        // handle traffic tallying
1154        let wrapped_response = $self.$func_name($request).await;
1155        $self.handle_traffic_resp(client, wrapped_response)
1156    }};
1157}
1158
1159#[async_trait]
1160impl Validator for ValidatorService {
1161    /// Handles a `Transaction` request.
1162    async fn transaction(
1163        &self,
1164        request: tonic::Request<Transaction>,
1165    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1166        let validator_service = self.clone();
1167
1168        // Spawns a task which handles the transaction. The task will unconditionally
1169        // continue processing in the event that the client connection is
1170        // dropped.
1171        spawn_monitored_task!(async move {
1172            // NB: traffic tally wrapping handled within the task rather than on task exit
1173            // to prevent an attacker from subverting traffic control by severing the
1174            // connection
1175            handle_with_decoration!(validator_service, transaction_impl, request)
1176        })
1177        .await
1178        .unwrap()
1179    }
1180
1181    /// Submits a `CertifiedTransaction` request.
1182    async fn submit_certificate(
1183        &self,
1184        request: tonic::Request<CertifiedTransaction>,
1185    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1186        let validator_service = self.clone();
1187
1188        // Spawns a task which handles the certificate. The task will unconditionally
1189        // continue processing in the event that the client connection is
1190        // dropped.
1191        spawn_monitored_task!(async move {
1192            // NB: traffic tally wrapping handled within the task rather than on task exit
1193            // to prevent an attacker from subverting traffic control by severing the
1194            // connection.
1195            handle_with_decoration!(validator_service, submit_certificate_impl, request)
1196        })
1197        .await
1198        .unwrap()
1199    }
1200
1201    async fn handle_certificate_v1(
1202        &self,
1203        request: tonic::Request<HandleCertificateRequestV1>,
1204    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1205        handle_with_decoration!(self, handle_certificate_v1_impl, request)
1206    }
1207
1208    async fn handle_soft_bundle_certificates_v1(
1209        &self,
1210        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1211    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1212        handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1213    }
1214
1215    /// Handles an `ObjectInfoRequest` request.
1216    async fn object_info(
1217        &self,
1218        request: tonic::Request<ObjectInfoRequest>,
1219    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1220        handle_with_decoration!(self, object_info_impl, request)
1221    }
1222
1223    /// Handles a `TransactionInfoRequest` request.
1224    async fn transaction_info(
1225        &self,
1226        request: tonic::Request<TransactionInfoRequest>,
1227    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1228        handle_with_decoration!(self, transaction_info_impl, request)
1229    }
1230
1231    /// Handles a `CheckpointRequest` request.
1232    async fn checkpoint(
1233        &self,
1234        request: tonic::Request<CheckpointRequest>,
1235    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1236        handle_with_decoration!(self, checkpoint_impl, request)
1237    }
1238
1239    /// Gets the `IotaSystemState` response.
1240    async fn get_system_state_object(
1241        &self,
1242        request: tonic::Request<SystemStateRequest>,
1243    ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1244        handle_with_decoration!(self, get_system_state_object_impl, request)
1245    }
1246}