iota_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    io,
8    net::{IpAddr, SocketAddr},
9    sync::Arc,
10    time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
16use iota_metrics::spawn_monitored_task;
17use iota_network::{
18    api::{Validator, ValidatorServer},
19    tonic,
20};
21use iota_types::{
22    effects::TransactionEffectsAPI,
23    error::*,
24    fp_ensure,
25    iota_system_state::IotaSystemState,
26    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
27    messages_consensus::ConsensusTransaction,
28    messages_grpc::{
29        HandleCertificateRequestV1, HandleCertificateResponseV1,
30        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
31        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
32        SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
33        TransactionInfoResponse,
34    },
35    multiaddr::Multiaddr,
36    traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
37    transaction::*,
38};
39use nonempty::{NonEmpty, nonempty};
40use prometheus::{
41    Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
42    register_int_counter_vec_with_registry, register_int_counter_with_registry,
43};
44use tap::TapFallible;
45use tokio::task::JoinHandle;
46use tonic::{
47    metadata::{Ascii, MetadataValue},
48    transport::server::TcpConnectInfo,
49};
50use tracing::{Instrument, error, error_span, info};
51
52use crate::{
53    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
54    consensus_adapter::{
55        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
56    },
57    mysticeti_adapter::LazyMysticetiClient,
58    traffic_controller::{
59        TrafficController, metrics::TrafficControllerMetrics, policies::TrafficTally,
60    },
61};
62
63#[cfg(test)]
64#[path = "unit_tests/server_tests.rs"]
65mod server_tests;
66
/// A handle to a spawned authority server.
///
/// The handle can be used to wait for the server task (`join`), to stop it
/// (`kill`), or to look up the address the server reported (`address`).
pub struct AuthorityServerHandle {
    /// Sender side of the cancellation channel; `kill` sends `()` on it.
    tx_cancellation: tokio::sync::oneshot::Sender<()>,
    /// Address the server reported via `local_addr()` when it was spawned.
    local_addr: Multiaddr,
    /// Join handle of the spawned task that runs `server.serve()`.
    handle: JoinHandle<Result<(), tonic::transport::Error>>,
}
73
74impl AuthorityServerHandle {
75    /// Waits for the server to complete.
76    pub async fn join(self) -> Result<(), io::Error> {
77        // Note that dropping `self.complete` would terminate the server.
78        self.handle.await?.map_err(io::Error::other)?;
79        Ok(())
80    }
81
82    /// Kills the server.
83    pub async fn kill(self) -> Result<(), io::Error> {
84        self.tx_cancellation
85            .send(())
86            .map_err(|_e| io::Error::other("could not send cancellation signal!"))?;
87        self.handle.await?.map_err(io::Error::other)?;
88        Ok(())
89    }
90
91    /// Returns the address of the server.
92    pub fn address(&self) -> &Multiaddr {
93        &self.local_addr
94    }
95}
96
/// An authority server that is used for testing.
///
/// Bundles everything needed to spawn a `ValidatorService` behind a network
/// server: the address to bind, the authority state, the consensus adapter
/// and the service metrics.
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` as the bind address.
    address: Multiaddr,
    /// Shared authority state handed to the validator service.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the validator service.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the validator service.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
104
105impl AuthorityServer {
106    /// Creates a new `AuthorityServer` for testing with a consensus adapter.
107    pub fn new_for_test_with_consensus_adapter(
108        state: Arc<AuthorityState>,
109        consensus_adapter: Arc<ConsensusAdapter>,
110    ) -> Self {
111        let address = new_local_tcp_address_for_testing();
112        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
113
114        Self {
115            address,
116            state,
117            consensus_adapter,
118            metrics,
119        }
120    }
121
122    /// Creates a new `AuthorityServer` for testing.
123    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
124        let consensus_adapter = Arc::new(ConsensusAdapter::new(
125            Arc::new(LazyMysticetiClient::new()),
126            state.name,
127            Arc::new(ConnectionMonitorStatusForTests {}),
128            100_000,
129            100_000,
130            None,
131            None,
132            ConsensusAdapterMetrics::new_test(),
133        ));
134        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
135    }
136
137    /// Spawns the server.
138    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
139        let address = self.address.clone();
140        self.spawn_with_bind_address_for_test(address).await
141    }
142
143    /// Spawns the server with a bind address.
144    pub async fn spawn_with_bind_address_for_test(
145        self,
146        address: Multiaddr,
147    ) -> Result<AuthorityServerHandle, io::Error> {
148        let mut server = iota_network_stack::config::Config::new()
149            .server_builder()
150            .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
151                self.state,
152                self.consensus_adapter,
153                self.metrics,
154            )))
155            .bind(&address)
156            .await
157            .unwrap();
158        let local_addr = server.local_addr().to_owned();
159        info!("Listening to traffic on {local_addr}");
160        let handle = AuthorityServerHandle {
161            tx_cancellation: server.take_cancel_handle().unwrap(),
162            local_addr,
163            handle: spawn_monitored_task!(server.serve()),
164        };
165        Ok(handle)
166    }
167}
168
/// Metrics for the validator service.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time spent between submitting a shared object transaction to consensus
    /// and getting the result.
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the `submit_certificate` RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// The number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// The size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,

    /// Number of transactions rejected during epoch transitioning.
    num_rejected_tx_in_epoch_boundary: IntCounter,
    /// Number of transaction certificates rejected during epoch transitioning.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Number of transactions rejected due to system overload, labelled by
    /// `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transaction certificates rejected due to system overload,
    /// labelled by `error_type`.
    num_rejected_cert_during_overload: IntCounterVec,
    /// Number of times the connection IP was not extractable from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was (unexpectedly) not
    /// included in a request.
    forwarded_header_not_included: IntCounter,
}
192
193impl ValidatorServiceMetrics {
194    /// Creates a new `ValidatorServiceMetrics` with Prometheus registry.
195    pub fn new(registry: &Registry) -> Self {
196        Self {
197            signature_errors: register_int_counter_with_registry!(
198                "total_signature_errors",
199                "Number of transaction signature errors",
200                registry,
201            )
202            .unwrap(),
203            tx_verification_latency: register_histogram_with_registry!(
204                "validator_service_tx_verification_latency",
205                "Latency of verifying a transaction",
206                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
207                registry,
208            )
209            .unwrap(),
210            cert_verification_latency: register_histogram_with_registry!(
211                "validator_service_cert_verification_latency",
212                "Latency of verifying a certificate",
213                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
214                registry,
215            )
216            .unwrap(),
217            consensus_latency: register_histogram_with_registry!(
218                "validator_service_consensus_latency",
219                "Time spent between submitting a shared obj txn to consensus and getting result",
220                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
221                registry,
222            )
223            .unwrap(),
224            handle_transaction_latency: register_histogram_with_registry!(
225                "validator_service_handle_transaction_latency",
226                "Latency of handling a transaction",
227                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
228                registry,
229            )
230            .unwrap(),
231            handle_certificate_consensus_latency: register_histogram_with_registry!(
232                "validator_service_handle_certificate_consensus_latency",
233                "Latency of handling a consensus transaction certificate",
234                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
235                registry,
236            )
237            .unwrap(),
238            submit_certificate_consensus_latency: register_histogram_with_registry!(
239                "validator_service_submit_certificate_consensus_latency",
240                "Latency of submit_certificate RPC handler",
241                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
242                registry,
243            )
244            .unwrap(),
245            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
246                "validator_service_handle_certificate_non_consensus_latency",
247                "Latency of handling a non-consensus transaction certificate",
248                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
249                registry,
250            )
251            .unwrap(),
252            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
253                "validator_service_handle_soft_bundle_certificates_consensus_latency",
254                "Latency of handling a consensus soft bundle",
255                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
256                registry,
257            )
258            .unwrap(),
259            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
260                "validator_service_handle_soft_bundle_certificates_count",
261                "The number of certificates included in a soft bundle",
262                iota_metrics::COUNT_BUCKETS.to_vec(),
263                registry,
264            )
265            .unwrap(),
266            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
267                "validator_service_handle_soft_bundle_certificates_size_bytes",
268                "The size of soft bundle in bytes",
269                iota_metrics::BYTES_BUCKETS.to_vec(),
270                registry,
271            )
272            .unwrap(),
273            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
274                "validator_service_num_rejected_tx_in_epoch_boundary",
275                "Number of rejected transaction during epoch transitioning",
276                registry,
277            )
278            .unwrap(),
279            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
280                "validator_service_num_rejected_cert_in_epoch_boundary",
281                "Number of rejected transaction certificate during epoch transitioning",
282                registry,
283            )
284            .unwrap(),
285            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
286                "validator_service_num_rejected_tx_during_overload",
287                "Number of rejected transaction due to system overload",
288                &["error_type"],
289                registry,
290            )
291            .unwrap(),
292            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
293                "validator_service_num_rejected_cert_during_overload",
294                "Number of rejected transaction certificate due to system overload",
295                &["error_type"],
296                registry,
297            )
298            .unwrap(),
299            connection_ip_not_found: register_int_counter_with_registry!(
300                "validator_service_connection_ip_not_found",
301                "Number of times connection IP was not extractable from request",
302                registry,
303            )
304            .unwrap(),
305            forwarded_header_parse_error: register_int_counter_with_registry!(
306                "validator_service_forwarded_header_parse_error",
307                "Number of times x-forwarded-for header could not be parsed",
308                registry,
309            )
310            .unwrap(),
311            forwarded_header_invalid: register_int_counter_with_registry!(
312                "validator_service_forwarded_header_invalid",
313                "Number of times x-forwarded-for header was invalid",
314                registry,
315            )
316            .unwrap(),
317            forwarded_header_not_included: register_int_counter_with_registry!(
318                "validator_service_forwarded_header_not_included",
319                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
320                registry,
321            )
322            .unwrap(),
323        }
324    }
325
326    /// Creates a new `ValidatorServiceMetrics` for testing.
327    pub fn new_for_tests() -> Self {
328        let registry = Registry::new();
329        Self::new(&registry)
330    }
331}
332
/// The validator service.
///
/// Cloning is cheap for the shared parts: state, consensus adapter, metrics
/// and the optional traffic controller are all held behind `Arc`.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state used to handle transactions and certificates.
    state: Arc<AuthorityState>,
    /// Adapter used for overload checks and for submitting certificates to
    /// consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the request handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Traffic controller; `None` when no policy config was supplied.
    traffic_controller: Option<Arc<TrafficController>>,
    /// Client id source taken from the policy config; `None` when no policy
    /// config was supplied.
    client_id_source: Option<ClientIdSource>,
}
342
343impl ValidatorService {
344    /// Creates a new `ValidatorService`.
345    pub fn new(
346        state: Arc<AuthorityState>,
347        consensus_adapter: Arc<ConsensusAdapter>,
348        validator_metrics: Arc<ValidatorServiceMetrics>,
349        traffic_controller_metrics: TrafficControllerMetrics,
350        policy_config: Option<PolicyConfig>,
351        firewall_config: Option<RemoteFirewallConfig>,
352    ) -> Self {
353        Self {
354            state,
355            consensus_adapter,
356            metrics: validator_metrics,
357            traffic_controller: policy_config.clone().map(|policy| {
358                Arc::new(TrafficController::spawn(
359                    policy,
360                    traffic_controller_metrics,
361                    firewall_config,
362                ))
363            }),
364            client_id_source: policy_config.map(|policy| policy.client_id_source),
365        }
366    }
367
    /// Creates a new `ValidatorService` for tests.
    ///
    /// No traffic controller is created and no client id source is set.
    pub fn new_for_tests(
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        metrics: Arc<ValidatorServiceMetrics>,
    ) -> Self {
        Self {
            state,
            consensus_adapter,
            metrics,
            traffic_controller: None,
            client_id_source: None,
        }
    }
381
    /// Returns a reference to the shared `AuthorityState` held by this
    /// service.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
386
387    /// Executes a `CertifiedTransaction` for testing.
388    pub async fn execute_certificate_for_testing(
389        &self,
390        cert: CertifiedTransaction,
391    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
392        let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
393        self.handle_certificate_v1(request).await
394    }
395
396    /// Handles a `Transaction` request for benchmarking.
397    pub async fn handle_transaction_for_benchmarking(
398        &self,
399        transaction: Transaction,
400    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
401        let request = make_tonic_request_for_testing(transaction);
402        self.transaction(request).await
403    }
404
405    /// Handles a `Transaction` request.
406    async fn handle_transaction(
407        &self,
408        request: tonic::Request<Transaction>,
409    ) -> WrappedServiceResponse<HandleTransactionResponse> {
410        let Self {
411            state,
412            consensus_adapter,
413            metrics,
414            traffic_controller: _,
415            client_id_source: _,
416        } = self.clone();
417        let transaction = request.into_inner();
418        let epoch_store = state.load_epoch_store_one_call_per_task();
419
420        transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
421
422        // When authority is overloaded and decide to reject this tx, we still lock the
423        // object and ask the client to retry in the future. This is because
424        // without locking, the input objects can be locked by a different tx in
425        // the future, however, the input objects may already be locked by this
426        // tx in other validators. This can cause non of the txes to have enough
427        // quorum to form a certificate, causing the objects to be locked for
428        // the entire epoch. By doing locking but pushback, retrying transaction will
429        // have higher chance to succeed.
430        let mut validator_pushback_error = None;
431        let overload_check_res = state.check_system_overload(
432            &consensus_adapter,
433            transaction.data(),
434            state.check_system_overload_at_signing(),
435        );
436        if let Err(error) = overload_check_res {
437            metrics
438                .num_rejected_tx_during_overload
439                .with_label_values(&[error.as_ref()])
440                .inc();
441            // TODO: consider change the behavior for other types of overload errors.
442            match error {
443                IotaError::ValidatorOverloadedRetryAfter { .. } => {
444                    validator_pushback_error = Some(error)
445                }
446                _ => return Err(error.into()),
447            }
448        }
449
450        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
451
452        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
453        let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
454            metrics.signature_errors.inc();
455        })?;
456        drop(tx_verif_metrics_guard);
457
458        let tx_digest = transaction.digest();
459
460        // Enable Trace Propagation across spans/processes using tx_digest
461        let span = error_span!("validator_state_process_tx", ?tx_digest);
462
463        let info = state
464            .handle_transaction(&epoch_store, transaction.clone())
465            .instrument(span)
466            .await
467            .tap_err(|e| {
468                if let IotaError::ValidatorHaltedAtEpochEnd = e {
469                    metrics.num_rejected_tx_in_epoch_boundary.inc();
470                }
471            })?;
472
473        if let Some(error) = validator_pushback_error {
474            // TODO: right now, we still sign the txn, but just don't return it. We can also
475            // skip signing to save more CPU.
476            return Err(error.into());
477        }
478
479        Ok((tonic::Response::new(info), Weight::zero()))
480    }
481
    /// Handles one certificate, or several certificates forming a soft bundle.
    ///
    /// In addition to the responses from handling the certificates, returns a
    /// `Weight` indicating how much the request should be tallied toward the
    /// spam count. It is `Weight::one()` when the single certificate was
    /// already executed (a read-only request that does not consume gas) and
    /// `Weight::zero()` on the other success paths.
    ///
    /// The responses are `None` only when `wait_for_effects` is `false` and the
    /// certificate was not already executed.
    ///
    /// # Errors
    ///
    /// Returns a `tonic::Status` if this node is a fullnode, the system is
    /// overloaded, certificate verification fails, user certificates are not
    /// accepted by the current reconfiguration state, or submission/execution
    /// fails.
    async fn handle_certificates(
        &self,
        certificates: NonEmpty<CertifiedTransaction>,
        include_events: bool,
        include_input_objects: bool,
        include_output_objects: bool,
        _include_auxiliary_data: bool,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        wait_for_effects: bool,
    ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
        // Validate if cert can be executed
        // Fullnode does not serve handle_certificate call.
        fp_ensure!(
            !self.state.is_fullnode(epoch_store),
            IotaError::FullNodeCantHandleCertificate.into()
        );

        let shared_object_tx = certificates
            .iter()
            .any(|cert| cert.contains_shared_object());

        // Pick the latency histogram that matches the kind of request.
        let metrics = if certificates.len() == 1 {
            if wait_for_effects {
                if shared_object_tx {
                    &self.metrics.handle_certificate_consensus_latency
                } else {
                    &self.metrics.handle_certificate_non_consensus_latency
                }
            } else {
                &self.metrics.submit_certificate_consensus_latency
            }
        } else {
            // `soft_bundle_validity_check` ensured that all certificates contain shared
            // objects.
            &self
                .metrics
                .handle_soft_bundle_certificates_consensus_latency
        };

        let _metrics_guard = metrics.start_timer();

        // 1) Check if the certificate is already executed. This is only needed when we
        //    have only one certificate (not a soft bundle). When multiple certificates
        //    are provided, we will either submit all of them or none of them to
        //    consensus.
        if certificates.len() == 1 {
            let tx_digest = *certificates[0].digest();

            if let Some(signed_effects) = self
                .state
                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
            {
                let events = if include_events {
                    if let Some(digest) = signed_effects.events_digest() {
                        Some(self.state.get_transaction_events(digest)?)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Input/output objects are not returned on this path.
                return Ok((
                    Some(vec![HandleCertificateResponseV1 {
                        signed_effects: signed_effects.into_inner(),
                        events,
                        input_objects: None,
                        output_objects: None,
                        auxiliary_data: None,
                    }]),
                    Weight::one(),
                ));
            };
        }

        // 2) Verify the certificates.
        // Check system overload
        for certificate in &certificates {
            let overload_check_res = self.state.check_system_overload(
                &self.consensus_adapter,
                certificate.data(),
                self.state.check_system_overload_at_execution(),
            );
            if let Err(error) = overload_check_res {
                self.metrics
                    .num_rejected_cert_during_overload
                    .with_label_values(&[error.as_ref()])
                    .inc();
                return Err(error.into());
            }
        }

        let verified_certificates = {
            let _timer = self.metrics.cert_verification_latency.start_timer();
            epoch_store
                .signature_verifier
                .multi_verify_certs(certificates.into())
                .await
                .into_iter()
                .collect::<Result<Vec<_>, _>>()?
        };

        {
            // code block within reconfiguration lock
            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
            if !reconfiguration_lock.should_accept_user_certs() {
                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
                return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
            }

            // 3) All certificates are sent to consensus (at least by some authorities).
            // For shared objects this will wait until either timeout or we have heard back
            // from consensus. For owned objects this will return without
            // waiting for the certificate to be sequenced. First do a quick,
            // dirty non-async check.
            if !epoch_store
                .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
            {
                let _metrics_guard = if shared_object_tx {
                    Some(self.metrics.consensus_latency.start_timer())
                } else {
                    None
                };
                let transactions = verified_certificates
                    .iter()
                    .map(|certificate| {
                        ConsensusTransaction::new_certificate_message(
                            &self.state.name,
                            certificate.clone().into(),
                        )
                    })
                    .collect::<Vec<_>>();
                self.consensus_adapter.submit_batch(
                    &transactions,
                    Some(&reconfiguration_lock),
                    epoch_store,
                )?;
                // Do not wait for the result, because the transaction might
                // have already executed. Instead, check or wait
                // for the existence of certificate effects below.
            }
        }

        if !wait_for_effects {
            // It is useful to enqueue owned object transaction for execution locally,
            // even when we are not returning effects to user
            let certificates_without_shared_objects = verified_certificates
                .iter()
                .filter(|certificate| !certificate.contains_shared_object())
                .cloned()
                .collect::<Vec<_>>();
            if !certificates_without_shared_objects.is_empty() {
                self.state.enqueue_certificates_for_execution(
                    certificates_without_shared_objects,
                    epoch_store,
                );
            }
            return Ok((None, Weight::zero()));
        }

        // 4) Execute the certificates immediately if they contain only owned object
        //    transactions,
        // or wait for the execution results if it contains shared objects.
        let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
            |certificate| async move {
                let effects = self
                    .state
                    .execute_certificate(&certificate, epoch_store)
                    .await?;
                let events = if include_events {
                    if let Some(digest) = effects.events_digest() {
                        Some(self.state.get_transaction_events(digest)?)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Best effort: a failure to load the input objects yields `None`
                // instead of failing the whole request.
                let input_objects = include_input_objects
                    .then(|| self.state.get_transaction_input_objects(&effects))
                    .and_then(Result::ok);

                // Best effort, same as for the input objects above.
                let output_objects = include_output_objects
                    .then(|| self.state.get_transaction_output_objects(&effects))
                    .and_then(Result::ok);

                let signed_effects = self.state.sign_effects(effects, epoch_store)?;
                epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;

                Ok::<_, IotaError>(HandleCertificateResponseV1 {
                    signed_effects: signed_effects.into_inner(),
                    events,
                    input_objects,
                    output_objects,
                    auxiliary_data: None, // We don't have any aux data generated presently
                })
            },
        ))
        .await?;

        Ok((Some(responses), Weight::zero()))
    }
690}
691
/// Result type of the service handler implementations: on success the tonic
/// response paired with the `Weight` to tally for the request (callers bind
/// it as `spam_weight`), on failure a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
693
694impl ValidatorService {
    /// Implementation of the transaction handler; delegates directly to
    /// `handle_transaction`.
    async fn transaction_impl(
        &self,
        request: tonic::Request<Transaction>,
    ) -> WrappedServiceResponse<HandleTransactionResponse> {
        self.handle_transaction(request).await
    }
701
702    async fn submit_certificate_impl(
703        &self,
704        request: tonic::Request<CertifiedTransaction>,
705    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
706        let epoch_store = self.state.load_epoch_store_one_call_per_task();
707        let certificate = request.into_inner();
708        certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
709
710        let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
711        self.handle_certificates(
712            nonempty![certificate],
713            true,
714            false,
715            false,
716            false,
717            &epoch_store,
718            false,
719        )
720        .instrument(span)
721        .await
722        .map(|(executed, spam_weight)| {
723            (
724                tonic::Response::new(SubmitCertificateResponse {
725                    executed: executed.map(|mut x| x.remove(0)),
726                }),
727                spam_weight,
728            )
729        })
730    }
731
732    async fn handle_certificate_v1_impl(
733        &self,
734        request: tonic::Request<HandleCertificateRequestV1>,
735    ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
736        let epoch_store = self.state.load_epoch_store_one_call_per_task();
737        let request = request.into_inner();
738        request
739            .certificate
740            .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
741
742        let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
743        self.handle_certificates(
744            nonempty![request.certificate],
745            request.include_events,
746            request.include_input_objects,
747            request.include_output_objects,
748            request.include_auxiliary_data,
749            &epoch_store,
750            true,
751        )
752        .instrument(span)
753        .await
754        .map(|(resp, spam_weight)| {
755            (
756                tonic::Response::new(
757                    resp.expect(
758                        "handle_certificate should not return none with wait_for_effects=true",
759                    )
760                    .remove(0),
761                ),
762                spam_weight,
763            )
764        })
765    }
766
767    async fn soft_bundle_validity_check(
768        &self,
769        certificates: &NonEmpty<CertifiedTransaction>,
770        epoch_store: &Arc<AuthorityPerEpochStore>,
771        total_size_bytes: u64,
772    ) -> Result<(), tonic::Status> {
773        let protocol_config = epoch_store.protocol_config();
774
775        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
776        // - All certs must access at least one shared object.
777        // - All certs must not be already executed.
778        // - All certs must have the same gas price.
779        // - Number of certs must not exceed the max allowed.
780        // - Total size of all certs must not exceed the max allowed.
781        fp_ensure!(
782            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
783            IotaError::UserInput {
784                error: UserInputError::TooManyTransactionsInSoftBundle {
785                    limit: protocol_config.max_soft_bundle_size()
786                }
787            }
788            .into()
789        );
790
791        // We set the soft bundle max size to be half of the consensus max transactions
792        // in block size. We do this to account for serialization overheads and
793        // to ensure that the soft bundle is not too large when is attempted to be
794        // posted via consensus. Although half the block size is on the extreme
795        // side, it's should be good enough for now.
796        let soft_bundle_max_size_bytes =
797            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
798        fp_ensure!(
799            total_size_bytes <= soft_bundle_max_size_bytes,
800            IotaError::UserInput {
801                error: UserInputError::SoftBundleTooLarge {
802                    size: total_size_bytes,
803                    limit: soft_bundle_max_size_bytes,
804                },
805            }
806            .into()
807        );
808
809        let mut gas_price = None;
810        for certificate in certificates {
811            let tx_digest = *certificate.digest();
812            fp_ensure!(
813                certificate.contains_shared_object(),
814                IotaError::UserInput {
815                    error: UserInputError::NoSharedObject { digest: tx_digest }
816                }
817                .into()
818            );
819            fp_ensure!(
820                !self.state.is_tx_already_executed(&tx_digest)?,
821                IotaError::UserInput {
822                    error: UserInputError::AlreadyExecuted { digest: tx_digest }
823                }
824                .into()
825            );
826            if let Some(gas) = gas_price {
827                fp_ensure!(
828                    gas == certificate.gas_price(),
829                    IotaError::UserInput {
830                        error: UserInputError::GasPriceMismatch {
831                            digest: tx_digest,
832                            expected: gas,
833                            actual: certificate.gas_price()
834                        }
835                    }
836                    .into()
837                );
838            } else {
839                gas_price = Some(certificate.gas_price());
840            }
841        }
842
843        // For Soft Bundle, if at this point we know at least one certificate has
844        // already been processed, reject the entire bundle.  Otherwise, submit
845        // all certificates in one request. This is not a strict check as there
846        // may be race conditions where one or more certificates are
847        // already being processed by another actor, and we could not know it.
848        fp_ensure!(
849            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
850            IotaError::UserInput {
851                error: UserInputError::CertificateAlreadyProcessed
852            }
853            .into()
854        );
855
856        Ok(())
857    }
858
859    async fn handle_soft_bundle_certificates_v1_impl(
860        &self,
861        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
862    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
863        let epoch_store = self.state.load_epoch_store_one_call_per_task();
864        let client_addr = if self.client_id_source.is_none() {
865            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
866        } else {
867            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
868        };
869        let request = request.into_inner();
870
871        let certificates =
872            NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
873        let mut total_size_bytes = 0;
874        for certificate in &certificates {
875            // We need to check this first because we haven't verified the cert signature.
876            total_size_bytes += certificate
877                .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
878                as u64;
879        }
880
881        self.metrics
882            .handle_soft_bundle_certificates_count
883            .observe(certificates.len() as f64);
884
885        self.metrics
886            .handle_soft_bundle_certificates_size_bytes
887            .observe(total_size_bytes as f64);
888
889        // Now that individual certificates are valid, we check if the bundle is valid.
890        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
891            .await?;
892
893        info!(
894            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
895            certificates.len(),
896            client_addr
897                .map(|x| x.to_string())
898                .unwrap_or_else(|| "unknown".to_string()),
899            certificates
900                .iter()
901                .map(|x| x.digest().to_string())
902                .collect::<Vec<_>>()
903                .join(", "),
904            total_size_bytes
905        );
906
907        let span = error_span!("handle_soft_bundle_certificates_v1");
908        self.handle_certificates(
909            certificates,
910            request.include_events,
911            request.include_input_objects,
912            request.include_output_objects,
913            request.include_auxiliary_data,
914            &epoch_store,
915            request.wait_for_effects,
916        )
917        .instrument(span)
918        .await
919        .map(|(resp, spam_weight)| {
920            (
921                tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
922                    responses: resp.unwrap_or_default(),
923                }),
924                spam_weight,
925            )
926        })
927    }
928
929    async fn object_info_impl(
930        &self,
931        request: tonic::Request<ObjectInfoRequest>,
932    ) -> WrappedServiceResponse<ObjectInfoResponse> {
933        let request = request.into_inner();
934        let response = self.state.handle_object_info_request(request).await?;
935        Ok((tonic::Response::new(response), Weight::one()))
936    }
937
938    async fn transaction_info_impl(
939        &self,
940        request: tonic::Request<TransactionInfoRequest>,
941    ) -> WrappedServiceResponse<TransactionInfoResponse> {
942        let request = request.into_inner();
943        let response = self.state.handle_transaction_info_request(request).await?;
944        Ok((tonic::Response::new(response), Weight::one()))
945    }
946
947    async fn checkpoint_impl(
948        &self,
949        request: tonic::Request<CheckpointRequest>,
950    ) -> WrappedServiceResponse<CheckpointResponse> {
951        let request = request.into_inner();
952        let response = self.state.handle_checkpoint_request(&request)?;
953        Ok((tonic::Response::new(response), Weight::one()))
954    }
955
956    async fn get_system_state_object_impl(
957        &self,
958        _request: tonic::Request<SystemStateRequest>,
959    ) -> WrappedServiceResponse<IotaSystemState> {
960        let response = self
961            .state
962            .get_object_cache_reader()
963            .get_iota_system_state_object_unsafe()?;
964        Ok((tonic::Response::new(response), Weight::one()))
965    }
966
967    fn get_client_ip_addr<T>(
968        &self,
969        request: &tonic::Request<T>,
970        source: &ClientIdSource,
971    ) -> Option<IpAddr> {
972        match source {
973            ClientIdSource::SocketAddr => {
974                let socket_addr: Option<SocketAddr> = request.remote_addr();
975
976                // We will hit this case if the IO type used does not
977                // implement Connected or when using a unix domain socket.
978                // TODO: once we have confirmed that no legitimate traffic
979                // is hitting this case, we should reject such requests that
980                // hit this case.
981                if let Some(socket_addr) = socket_addr {
982                    Some(socket_addr.ip())
983                } else {
984                    if cfg!(msim) {
985                        // Ignore the error from simtests.
986                    } else if cfg!(test) {
987                        panic!("Failed to get remote address from request");
988                    } else {
989                        self.metrics.connection_ip_not_found.inc();
990                        error!("Failed to get remote address from request");
991                    }
992                    None
993                }
994            }
995            ClientIdSource::XForwardedFor(num_hops) => {
996                let do_header_parse = |op: &MetadataValue<Ascii>| {
997                    match op.to_str() {
998                        Ok(header_val) => {
999                            let header_contents =
1000                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1001                            if *num_hops == 0 {
1002                                error!(
1003                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1004                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1005                                    to this node. Skipping traffic controller request handling.",
1006                                    header_contents,
1007                                );
1008                                return None;
1009                            }
1010                            let contents_len = header_contents.len();
1011                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1012                            else {
1013                                error!(
1014                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1015                                    Expected at least {} values. Skipping traffic controller request handling.",
1016                                    header_contents, contents_len, num_hops, contents_len,
1017                                );
1018                                return None;
1019                            };
1020                            client_ip.parse::<IpAddr>().ok().or_else(|| {
1021                                client_ip.parse::<SocketAddr>().ok().map(|socket_addr| socket_addr.ip()).or_else(|| {
1022                                    self.metrics.forwarded_header_parse_error.inc();
1023                                    error!(
1024                                        "Failed to parse x-forwarded-for header value of {:?} to ip address or socket. \
1025                                        Please ensure that your proxy is configured to resolve client domains to an \
1026                                        IP address before writing header",
1027                                        client_ip,
1028                                    );
1029                                    None
1030                                })
1031                            })
1032                        }
1033                        Err(e) => {
1034                            // TODO: once we have confirmed that no legitimate traffic
1035                            // is hitting this case, we should reject such requests that
1036                            // hit this case.
1037                            self.metrics.forwarded_header_invalid.inc();
1038                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1039                            None
1040                        }
1041                    }
1042                };
1043                if let Some(op) = request.metadata().get("x-forwarded-for") {
1044                    do_header_parse(op)
1045                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1046                    do_header_parse(op)
1047                } else {
1048                    self.metrics.forwarded_header_not_included.inc();
1049                    error!(
1050                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1051                    );
1052                    None
1053                }
1054            }
1055        }
1056    }
1057
1058    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1059        if let Some(traffic_controller) = &self.traffic_controller {
1060            if !traffic_controller.check(&client, &None).await {
1061                // Entity in blocklist
1062                Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1063            } else {
1064                Ok(())
1065            }
1066        } else {
1067            Ok(())
1068        }
1069    }
1070
1071    fn handle_traffic_resp<T>(
1072        &self,
1073        client: Option<IpAddr>,
1074        wrapped_response: WrappedServiceResponse<T>,
1075    ) -> Result<tonic::Response<T>, tonic::Status> {
1076        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1077            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1078            Err(status) => (
1079                Some(IotaError::from(status.clone())),
1080                Weight::zero(),
1081                Err(status.clone()),
1082            ),
1083        };
1084
1085        if let Some(traffic_controller) = self.traffic_controller.clone() {
1086            traffic_controller.tally(TrafficTally {
1087                direct: client,
1088                through_fullnode: None,
1089                error_weight: error.map(normalize).unwrap_or(Weight::zero()),
1090                spam_weight,
1091                timestamp: SystemTime::now(),
1092            })
1093        }
1094        unwrapped_response
1095    }
1096}
1097
1098fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1099    // simulate a TCP connection, which would have added extensions to
1100    // the request object that would be used downstream
1101    let mut request = tonic::Request::new(message);
1102    let tcp_connect_info = TcpConnectInfo {
1103        local_addr: None,
1104        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1105    };
1106    request.extensions_mut().insert(tcp_connect_info);
1107    request
1108}
1109
1110// TODO: refine error matching here
1111fn normalize(err: IotaError) -> Weight {
1112    match err {
1113        IotaError::UserInput { .. }
1114        | IotaError::InvalidSignature { .. }
1115        | IotaError::SignerSignatureAbsent { .. }
1116        | IotaError::SignerSignatureNumberMismatch { .. }
1117        | IotaError::IncorrectSigner { .. }
1118        | IotaError::UnknownSigner { .. }
1119        | IotaError::WrongEpoch { .. } => Weight::one(),
1120        _ => Weight::zero(),
1121    }
1122}
1123
/// Wraps a `*_impl` handler with generic pre- and post-processing.
///
/// When a client id source is configured, the client is resolved and checked
/// with `handle_traffic_req` before the handler runs, and the outcome is
/// tallied with `handle_traffic_resp` afterwards. Without one, the handler
/// is called directly and its spam weight is discarded.
///
/// Since this is on the critical path, any heavy lifting should be done in a
/// separate non-blocking task unless it is necessary to override the return
/// value.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // No client id source configured: skip traffic control entirely.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // return early if the client is blocked
        $self.handle_traffic_req(client).await?;

        // run the handler, then tally its outcome against the client
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1144
1145#[async_trait]
1146impl Validator for ValidatorService {
1147    /// Handles a `Transaction` request.
1148    async fn transaction(
1149        &self,
1150        request: tonic::Request<Transaction>,
1151    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1152        let validator_service = self.clone();
1153
1154        // Spawns a task which handles the transaction. The task will unconditionally
1155        // continue processing in the event that the client connection is
1156        // dropped.
1157        spawn_monitored_task!(async move {
1158            // NB: traffic tally wrapping handled within the task rather than on task exit
1159            // to prevent an attacker from subverting traffic control by severing the
1160            // connection
1161            handle_with_decoration!(validator_service, transaction_impl, request)
1162        })
1163        .await
1164        .unwrap()
1165    }
1166
1167    /// Submits a `CertifiedTransaction` request.
1168    async fn submit_certificate(
1169        &self,
1170        request: tonic::Request<CertifiedTransaction>,
1171    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1172        let validator_service = self.clone();
1173
1174        // Spawns a task which handles the certificate. The task will unconditionally
1175        // continue processing in the event that the client connection is
1176        // dropped.
1177        spawn_monitored_task!(async move {
1178            // NB: traffic tally wrapping handled within the task rather than on task exit
1179            // to prevent an attacker from subverting traffic control by severing the
1180            // connection.
1181            handle_with_decoration!(validator_service, submit_certificate_impl, request)
1182        })
1183        .await
1184        .unwrap()
1185    }
1186
1187    async fn handle_certificate_v1(
1188        &self,
1189        request: tonic::Request<HandleCertificateRequestV1>,
1190    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1191        handle_with_decoration!(self, handle_certificate_v1_impl, request)
1192    }
1193
1194    async fn handle_soft_bundle_certificates_v1(
1195        &self,
1196        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1197    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1198        handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1199    }
1200
1201    /// Handles an `ObjectInfoRequest` request.
1202    async fn object_info(
1203        &self,
1204        request: tonic::Request<ObjectInfoRequest>,
1205    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1206        handle_with_decoration!(self, object_info_impl, request)
1207    }
1208
1209    /// Handles a `TransactionInfoRequest` request.
1210    async fn transaction_info(
1211        &self,
1212        request: tonic::Request<TransactionInfoRequest>,
1213    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1214        handle_with_decoration!(self, transaction_info_impl, request)
1215    }
1216
1217    /// Handles a `CheckpointRequest` request.
1218    async fn checkpoint(
1219        &self,
1220        request: tonic::Request<CheckpointRequest>,
1221    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1222        handle_with_decoration!(self, checkpoint_impl, request)
1223    }
1224
1225    /// Gets the `IotaSystemState` response.
1226    async fn get_system_state_object(
1227        &self,
1228        request: tonic::Request<SystemStateRequest>,
1229    ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1230        handle_with_decoration!(self, get_system_state_object_impl, request)
1231    }
1232}