iota_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    io,
8    net::{IpAddr, SocketAddr},
9    sync::Arc,
10    time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
16use iota_metrics::{histogram::Histogram as IotaHistogram, spawn_monitored_task};
17use iota_network::{
18    api::{Validator, ValidatorServer},
19    tonic,
20};
21use iota_types::{
22    effects::TransactionEffectsAPI,
23    error::*,
24    fp_ensure,
25    iota_system_state::IotaSystemState,
26    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
27    messages_consensus::ConsensusTransaction,
28    messages_grpc::{
29        HandleCertificateRequestV1, HandleCertificateResponseV1,
30        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
31        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
32        SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
33        TransactionInfoResponse,
34    },
35    multiaddr::Multiaddr,
36    traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
37    transaction::*,
38};
39use nonempty::{NonEmpty, nonempty};
40use prometheus::{
41    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
42    register_int_counter_with_registry,
43};
44use tap::TapFallible;
45use tokio::task::JoinHandle;
46use tonic::{
47    metadata::{Ascii, MetadataValue},
48    transport::server::TcpConnectInfo,
49};
50use tracing::{Instrument, error, error_span, info};
51
52use crate::{
53    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
54    consensus_adapter::{
55        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
56    },
57    mysticeti_adapter::LazyMysticetiClient,
58    traffic_controller::{
59        TrafficController, metrics::TrafficControllerMetrics, policies::TrafficTally,
60    },
61};
62
63#[cfg(test)]
64#[path = "unit_tests/server_tests.rs"]
65mod server_tests;
66
/// A handle to the authority server.
///
/// Produced by `AuthorityServer::spawn_with_bind_address_for_test`; it bundles
/// what is needed to wait for (`join`) or stop (`kill`) the spawned server.
pub struct AuthorityServerHandle {
    /// Cancellation handle taken from the server; `kill` sends `()` on it.
    tx_cancellation: tokio::sync::oneshot::Sender<()>,
    /// Address the server reported via `local_addr()` after binding.
    local_addr: Multiaddr,
    /// Join handle of the task that runs `server.serve()`.
    handle: JoinHandle<Result<(), tonic::transport::Error>>,
}
73
74impl AuthorityServerHandle {
75    /// Waits for the server to complete.
76    pub async fn join(self) -> Result<(), io::Error> {
77        // Note that dropping `self.complete` would terminate the server.
78        self.handle.await?.map_err(io::Error::other)?;
79        Ok(())
80    }
81
82    /// Kills the server.
83    pub async fn kill(self) -> Result<(), io::Error> {
84        self.tx_cancellation
85            .send(())
86            .map_err(|_e| io::Error::other("could not send cancellation signal!"))?;
87        self.handle.await?.map_err(io::Error::other)?;
88        Ok(())
89    }
90
91    /// Returns the address of the server.
92    pub fn address(&self) -> &Multiaddr {
93        &self.local_addr
94    }
95}
96
/// An authority server that is used for testing.
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` as the bind address; generated with
    /// `new_local_tcp_address_for_testing` by the test constructors.
    address: Multiaddr,
    /// Authority state handed to the `ValidatorService` when spawning.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the `ValidatorService` when spawning.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService` when spawning.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
104
105impl AuthorityServer {
106    /// Creates a new `AuthorityServer` for testing with a consensus adapter.
107    pub fn new_for_test_with_consensus_adapter(
108        state: Arc<AuthorityState>,
109        consensus_adapter: Arc<ConsensusAdapter>,
110    ) -> Self {
111        let address = new_local_tcp_address_for_testing();
112        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
113
114        Self {
115            address,
116            state,
117            consensus_adapter,
118            metrics,
119        }
120    }
121
122    /// Creates a new `AuthorityServer` for testing.
123    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
124        let consensus_adapter = Arc::new(ConsensusAdapter::new(
125            Arc::new(LazyMysticetiClient::new()),
126            state.name,
127            Arc::new(ConnectionMonitorStatusForTests {}),
128            100_000,
129            100_000,
130            None,
131            None,
132            ConsensusAdapterMetrics::new_test(),
133        ));
134        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
135    }
136
137    /// Spawns the server.
138    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
139        let address = self.address.clone();
140        self.spawn_with_bind_address_for_test(address).await
141    }
142
143    /// Spawns the server with a bind address.
144    pub async fn spawn_with_bind_address_for_test(
145        self,
146        address: Multiaddr,
147    ) -> Result<AuthorityServerHandle, io::Error> {
148        let mut server = iota_network_stack::config::Config::new()
149            .server_builder()
150            .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
151                self.state,
152                self.consensus_adapter,
153                self.metrics,
154            )))
155            .bind(&address)
156            .await
157            .unwrap();
158        let local_addr = server.local_addr().to_owned();
159        info!("Listening to traffic on {local_addr}");
160        let handle = AuthorityServerHandle {
161            tx_cancellation: server.take_cancel_handle().unwrap(),
162            local_addr,
163            handle: spawn_monitored_task!(server.serve()),
164        };
165        Ok(handle)
166    }
167}
168
/// Metrics for the validator service.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: IotaHistogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: IotaHistogram,
    /// Time spent between submitting a shared object transaction to consensus
    /// and getting the result.
    pub consensus_latency: IotaHistogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: IotaHistogram,
    /// Latency of the `submit_certificate` RPC handler.
    pub submit_certificate_consensus_latency: IotaHistogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: IotaHistogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: IotaHistogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: IotaHistogram,
    /// The number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: IotaHistogram,
    /// The size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: IotaHistogram,

    /// Number of transactions rejected during epoch transitioning.
    num_rejected_tx_in_epoch_boundary: IntCounter,
    /// Number of transaction certificates rejected during epoch transitioning.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Number of transactions rejected due to system overload, labelled by
    /// `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transaction certificates rejected due to system overload,
    /// labelled by `error_type`.
    num_rejected_cert_during_overload: IntCounterVec,
    /// Number of times the connection IP was not extractable from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was (unexpectedly) not
    /// included in a request.
    forwarded_header_not_included: IntCounter,
}
192
193impl ValidatorServiceMetrics {
194    /// Creates a new `ValidatorServiceMetrics` with Prometheus registry.
195    pub fn new(registry: &Registry) -> Self {
196        Self {
197            signature_errors: register_int_counter_with_registry!(
198                "total_signature_errors",
199                "Number of transaction signature errors",
200                registry,
201            )
202            .unwrap(),
203            tx_verification_latency: IotaHistogram::new_in_registry(
204                "validator_service_tx_verification_latency",
205                "Latency of verifying a transaction",
206                registry,
207            ),
208            cert_verification_latency: IotaHistogram::new_in_registry(
209                "validator_service_cert_verification_latency",
210                "Latency of verifying a certificate",
211                registry,
212            ),
213            consensus_latency: IotaHistogram::new_in_registry(
214                "validator_service_consensus_latency",
215                "Time spent between submitting a shared obj txn to consensus and getting result",
216                registry,
217            ),
218            handle_transaction_latency: IotaHistogram::new_in_registry(
219                "validator_service_handle_transaction_latency",
220                "Latency of handling a transaction",
221                registry,
222            ),
223            handle_certificate_consensus_latency: IotaHistogram::new_in_registry(
224                "validator_service_handle_certificate_consensus_latency",
225                "Latency of handling a consensus transaction certificate",
226                registry,
227            ),
228            submit_certificate_consensus_latency: IotaHistogram::new_in_registry(
229                "validator_service_submit_certificate_consensus_latency",
230                "Latency of submit_certificate RPC handler",
231                registry,
232            ),
233            handle_certificate_non_consensus_latency: IotaHistogram::new_in_registry(
234                "validator_service_handle_certificate_non_consensus_latency",
235                "Latency of handling a non-consensus transaction certificate",
236                registry,
237            ),
238            handle_soft_bundle_certificates_consensus_latency: IotaHistogram::new_in_registry(
239                "validator_service_handle_soft_bundle_certificates_consensus_latency",
240                "Latency of handling a consensus soft bundle",
241                registry,
242            ),
243            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
244                "validator_service_num_rejected_tx_in_epoch_boundary",
245                "Number of rejected transaction during epoch transitioning",
246                registry,
247            )
248            .unwrap(),
249            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
250                "validator_service_num_rejected_cert_in_epoch_boundary",
251                "Number of rejected transaction certificate during epoch transitioning",
252                registry,
253            )
254            .unwrap(),
255            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
256                "validator_service_num_rejected_tx_during_overload",
257                "Number of rejected transaction due to system overload",
258                &["error_type"],
259                registry,
260            )
261            .unwrap(),
262            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
263                "validator_service_num_rejected_cert_during_overload",
264                "Number of rejected transaction certificate due to system overload",
265                &["error_type"],
266                registry,
267            )
268            .unwrap(),
269            handle_soft_bundle_certificates_count: IotaHistogram::new_in_registry(
270                "handle_soft_bundle_certificates_count",
271                "The number of certificates included in a soft bundle",
272                registry,
273            ),
274            handle_soft_bundle_certificates_size_bytes: IotaHistogram::new_in_registry(
275                "handle_soft_bundle_certificates_size_bytes",
276                "The size of soft bundle in bytes",
277                registry,
278            ),
279            connection_ip_not_found: register_int_counter_with_registry!(
280                "validator_service_connection_ip_not_found",
281                "Number of times connection IP was not extractable from request",
282                registry,
283            )
284            .unwrap(),
285            forwarded_header_parse_error: register_int_counter_with_registry!(
286                "validator_service_forwarded_header_parse_error",
287                "Number of times x-forwarded-for header could not be parsed",
288                registry,
289            )
290            .unwrap(),
291            forwarded_header_invalid: register_int_counter_with_registry!(
292                "validator_service_forwarded_header_invalid",
293                "Number of times x-forwarded-for header was invalid",
294                registry,
295            )
296            .unwrap(),
297            forwarded_header_not_included: register_int_counter_with_registry!(
298                "validator_service_forwarded_header_not_included",
299                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
300                registry,
301            )
302            .unwrap(),
303        }
304    }
305
306    /// Creates a new `ValidatorServiceMetrics` for testing.
307    pub fn new_for_tests() -> Self {
308        let registry = Registry::new();
309        Self::new(&registry)
310    }
311}
312
/// The validator service.
///
/// Cloning is cheap with respect to the shared components: state, consensus
/// adapter, metrics and traffic controller are all held behind `Arc`.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state used to process transactions and certificates.
    state: Arc<AuthorityState>,
    /// Adapter used for overload checks and for submitting certificates to
    /// consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics updated by the request handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Traffic controller; `Some` only when a policy config was passed to `new`.
    traffic_controller: Option<Arc<TrafficController>>,
    /// `client_id_source` taken from the policy config passed to `new`, if any.
    client_id_source: Option<ClientIdSource>,
}
322
323impl ValidatorService {
324    /// Creates a new `ValidatorService`.
325    pub fn new(
326        state: Arc<AuthorityState>,
327        consensus_adapter: Arc<ConsensusAdapter>,
328        validator_metrics: Arc<ValidatorServiceMetrics>,
329        traffic_controller_metrics: TrafficControllerMetrics,
330        policy_config: Option<PolicyConfig>,
331        firewall_config: Option<RemoteFirewallConfig>,
332    ) -> Self {
333        Self {
334            state,
335            consensus_adapter,
336            metrics: validator_metrics,
337            traffic_controller: policy_config.clone().map(|policy| {
338                Arc::new(TrafficController::spawn(
339                    policy,
340                    traffic_controller_metrics,
341                    firewall_config,
342                ))
343            }),
344            client_id_source: policy_config.map(|policy| policy.client_id_source),
345        }
346    }
347
348    pub fn new_for_tests(
349        state: Arc<AuthorityState>,
350        consensus_adapter: Arc<ConsensusAdapter>,
351        metrics: Arc<ValidatorServiceMetrics>,
352    ) -> Self {
353        Self {
354            state,
355            consensus_adapter,
356            metrics,
357            traffic_controller: None,
358            client_id_source: None,
359        }
360    }
361
362    /// Returns the validator state.
363    pub fn validator_state(&self) -> &Arc<AuthorityState> {
364        &self.state
365    }
366
367    /// Executes a `CertifiedTransaction` for testing.
368    pub async fn execute_certificate_for_testing(
369        &self,
370        cert: CertifiedTransaction,
371    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
372        let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
373        self.handle_certificate_v1(request).await
374    }
375
376    /// Handles a `Transaction` request for benchmarking.
377    pub async fn handle_transaction_for_benchmarking(
378        &self,
379        transaction: Transaction,
380    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
381        let request = make_tonic_request_for_testing(transaction);
382        self.transaction(request).await
383    }
384
385    /// Handles a `Transaction` request.
386    async fn handle_transaction(
387        &self,
388        request: tonic::Request<Transaction>,
389    ) -> WrappedServiceResponse<HandleTransactionResponse> {
390        let Self {
391            state,
392            consensus_adapter,
393            metrics,
394            traffic_controller: _,
395            client_id_source: _,
396        } = self.clone();
397        let transaction = request.into_inner();
398        let epoch_store = state.load_epoch_store_one_call_per_task();
399
400        transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
401
402        // When authority is overloaded and decide to reject this tx, we still lock the
403        // object and ask the client to retry in the future. This is because
404        // without locking, the input objects can be locked by a different tx in
405        // the future, however, the input objects may already be locked by this
406        // tx in other validators. This can cause non of the txes to have enough
407        // quorum to form a certificate, causing the objects to be locked for
408        // the entire epoch. By doing locking but pushback, retrying transaction will
409        // have higher chance to succeed.
410        let mut validator_pushback_error = None;
411        let overload_check_res = state.check_system_overload(
412            &consensus_adapter,
413            transaction.data(),
414            state.check_system_overload_at_signing(),
415        );
416        if let Err(error) = overload_check_res {
417            metrics
418                .num_rejected_tx_during_overload
419                .with_label_values(&[error.as_ref()])
420                .inc();
421            // TODO: consider change the behavior for other types of overload errors.
422            match error {
423                IotaError::ValidatorOverloadedRetryAfter { .. } => {
424                    validator_pushback_error = Some(error)
425                }
426                _ => return Err(error.into()),
427            }
428        }
429
430        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
431
432        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
433        let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
434            metrics.signature_errors.inc();
435        })?;
436        drop(tx_verif_metrics_guard);
437
438        let tx_digest = transaction.digest();
439
440        // Enable Trace Propagation across spans/processes using tx_digest
441        let span = error_span!("validator_state_process_tx", ?tx_digest);
442
443        let info = state
444            .handle_transaction(&epoch_store, transaction.clone())
445            .instrument(span)
446            .await
447            .tap_err(|e| {
448                if let IotaError::ValidatorHaltedAtEpochEnd = e {
449                    metrics.num_rejected_tx_in_epoch_boundary.inc();
450                }
451            })?;
452
453        if let Some(error) = validator_pushback_error {
454            // TODO: right now, we still sign the txn, but just don't return it. We can also
455            // skip signing to save more CPU.
456            return Err(error.into());
457        }
458
459        Ok((tonic::Response::new(info), Weight::zero()))
460    }
461
    /// Handles one certificate or a soft bundle of certificates.
    ///
    /// In addition to the response from handling the certificates, returns a
    /// `Weight` indicating how much the request should be tallied toward the
    /// spam count. `Weight::one()` is returned when the (single) certificate
    /// was already executed, i.e. the request was effectively read-only and
    /// did not consume gas; all other successful paths return `Weight::zero()`.
    ///
    /// The response part is `None` when `wait_for_effects` is false and the
    /// certificate was not already executed; otherwise it holds one response
    /// per certificate.
    ///
    /// `_include_auxiliary_data` is accepted but unused: no auxiliary data is
    /// generated at present.
    async fn handle_certificates(
        &self,
        certificates: NonEmpty<CertifiedTransaction>,
        include_events: bool,
        include_input_objects: bool,
        include_output_objects: bool,
        _include_auxiliary_data: bool,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        wait_for_effects: bool,
    ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
        // Validate if cert can be executed
        // Fullnode does not serve handle_certificate call.
        fp_ensure!(
            !self.state.is_fullnode(epoch_store),
            IotaError::FullNodeCantHandleCertificate.into()
        );

        let shared_object_tx = certificates
            .iter()
            .any(|cert| cert.contains_shared_object());

        // Pick the latency histogram matching the kind of request being served.
        let metrics = if certificates.len() == 1 {
            if wait_for_effects {
                if shared_object_tx {
                    &self.metrics.handle_certificate_consensus_latency
                } else {
                    &self.metrics.handle_certificate_non_consensus_latency
                }
            } else {
                &self.metrics.submit_certificate_consensus_latency
            }
        } else {
            // `soft_bundle_validity_check` ensured that all certificates contain shared
            // objects.
            &self
                .metrics
                .handle_soft_bundle_certificates_consensus_latency
        };

        let _metrics_guard = metrics.start_timer();

        // 1) Check if the certificate is already executed. This is only needed when we
        //    have only one certificate (not a soft bundle). When multiple certificates
        //    are provided, we will either submit all of them or none of them to
        //    consensus.
        if certificates.len() == 1 {
            let tx_digest = *certificates[0].digest();

            if let Some(signed_effects) = self
                .state
                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
            {
                let events = if include_events {
                    if let Some(digest) = signed_effects.events_digest() {
                        Some(self.state.get_transaction_events(digest)?)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Input/output objects are not returned on this already-executed
                // path, regardless of the include flags.
                return Ok((
                    Some(vec![HandleCertificateResponseV1 {
                        signed_effects: signed_effects.into_inner(),
                        events,
                        input_objects: None,
                        output_objects: None,
                        auxiliary_data: None,
                    }]),
                    Weight::one(),
                ));
            };
        }

        // 2) Verify the certificates.
        // Check system overload
        for certificate in &certificates {
            let overload_check_res = self.state.check_system_overload(
                &self.consensus_adapter,
                certificate.data(),
                self.state.check_system_overload_at_execution(),
            );
            if let Err(error) = overload_check_res {
                self.metrics
                    .num_rejected_cert_during_overload
                    .with_label_values(&[error.as_ref()])
                    .inc();
                return Err(error.into());
            }
        }

        let verified_certificates = {
            let _timer = self.metrics.cert_verification_latency.start_timer();
            epoch_store
                .signature_verifier
                .multi_verify_certs(certificates.into())
                .await
                .into_iter()
                .collect::<Result<Vec<_>, _>>()?
        };

        {
            // code block within reconfiguration lock
            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
            if !reconfiguration_lock.should_accept_user_certs() {
                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
                return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
            }

            // 3) All certificates are sent to consensus (at least by some authorities).
            // For shared objects this will wait until either timeout or we have heard
            // back from consensus. For owned objects this will return without
            // waiting for the certificate to be sequenced. First do a quick dirty
            // non-async check.
            if !epoch_store
                .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
            {
                // The consensus latency timer is only started when at least one
                // certificate touches a shared object.
                let _metrics_guard = if shared_object_tx {
                    Some(self.metrics.consensus_latency.start_timer())
                } else {
                    None
                };
                let transactions = verified_certificates
                    .iter()
                    .map(|certificate| {
                        ConsensusTransaction::new_certificate_message(
                            &self.state.name,
                            certificate.clone().into(),
                        )
                    })
                    .collect::<Vec<_>>();
                self.consensus_adapter.submit_batch(
                    &transactions,
                    Some(&reconfiguration_lock),
                    epoch_store,
                )?;
                // Do not wait for the result, because the transaction might
                // have already executed. Instead, check or wait
                // for the existence of certificate effects below.
            }
        }

        if !wait_for_effects {
            // It is useful to enqueue owned object transaction for execution locally,
            // even when we are not returning effects to user
            let certificates_without_shared_objects = verified_certificates
                .iter()
                .filter(|certificate| !certificate.contains_shared_object())
                .cloned()
                .collect::<Vec<_>>();
            if !certificates_without_shared_objects.is_empty() {
                self.state.enqueue_certificates_for_execution(
                    certificates_without_shared_objects,
                    epoch_store,
                );
            }
            return Ok((None, Weight::zero()));
        }

        // 4) Execute the certificates immediately if they contain only owned object
        //    transactions, or wait for the execution results if they contain shared
        //    objects.
        let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
            |certificate| async move {
                let effects = self
                    .state
                    .execute_certificate(&certificate, epoch_store)
                    .await?;
                let events = if include_events {
                    if let Some(digest) = effects.events_digest() {
                        Some(self.state.get_transaction_events(digest)?)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Best effort: a failure to load input/output objects yields
                // `None` instead of failing the whole request.
                let input_objects = include_input_objects
                    .then(|| self.state.get_transaction_input_objects(&effects))
                    .and_then(Result::ok);

                let output_objects = include_output_objects
                    .then(|| self.state.get_transaction_output_objects(&effects))
                    .and_then(Result::ok);

                let signed_effects = self.state.sign_effects(effects, epoch_store)?;
                epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;

                Ok::<_, IotaError>(HandleCertificateResponseV1 {
                    signed_effects: signed_effects.into_inner(),
                    events,
                    input_objects,
                    output_objects,
                    auxiliary_data: None, // We don't have any aux data generated presently
                })
            },
        ))
        .await?;

        Ok((Some(responses), Weight::zero()))
    }
670}
671
/// Result type of the service handler implementations: on success the tonic
/// response paired with the `Weight` to tally toward the spam count, on
/// failure a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
673
674impl ValidatorService {
    /// Implementation of the `transaction` handler; delegates directly to
    /// `handle_transaction`.
    async fn transaction_impl(
        &self,
        request: tonic::Request<Transaction>,
    ) -> WrappedServiceResponse<HandleTransactionResponse> {
        self.handle_transaction(request).await
    }
681
682    async fn submit_certificate_impl(
683        &self,
684        request: tonic::Request<CertifiedTransaction>,
685    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
686        let epoch_store = self.state.load_epoch_store_one_call_per_task();
687        let certificate = request.into_inner();
688        certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
689
690        let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
691        self.handle_certificates(
692            nonempty![certificate],
693            true,
694            false,
695            false,
696            false,
697            &epoch_store,
698            false,
699        )
700        .instrument(span)
701        .await
702        .map(|(executed, spam_weight)| {
703            (
704                tonic::Response::new(SubmitCertificateResponse {
705                    executed: executed.map(|mut x| x.remove(0)),
706                }),
707                spam_weight,
708            )
709        })
710    }
711
712    async fn handle_certificate_v1_impl(
713        &self,
714        request: tonic::Request<HandleCertificateRequestV1>,
715    ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
716        let epoch_store = self.state.load_epoch_store_one_call_per_task();
717        let request = request.into_inner();
718        request
719            .certificate
720            .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
721
722        let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
723        self.handle_certificates(
724            nonempty![request.certificate],
725            request.include_events,
726            request.include_input_objects,
727            request.include_output_objects,
728            request.include_auxiliary_data,
729            &epoch_store,
730            true,
731        )
732        .instrument(span)
733        .await
734        .map(|(resp, spam_weight)| {
735            (
736                tonic::Response::new(
737                    resp.expect(
738                        "handle_certificate should not return none with wait_for_effects=true",
739                    )
740                    .remove(0),
741                ),
742                spam_weight,
743            )
744        })
745    }
746
747    async fn soft_bundle_validity_check(
748        &self,
749        certificates: &NonEmpty<CertifiedTransaction>,
750        epoch_store: &Arc<AuthorityPerEpochStore>,
751    ) -> Result<(), tonic::Status> {
752        let protocol_config = epoch_store.protocol_config();
753
754        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
755        // - All certs must access at least one shared object.
756        // - All certs must not be already executed.
757        // - All certs must have the same gas price.
758        // - Number of certs must not exceed the max allowed.
759        fp_ensure!(
760            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
761            IotaError::UserInput {
762                error: UserInputError::TooManyTransactionsInSoftBundle {
763                    limit: protocol_config.max_soft_bundle_size()
764                }
765            }
766            .into()
767        );
768        let mut gas_price = None;
769        for certificate in certificates {
770            let tx_digest = *certificate.digest();
771            fp_ensure!(
772                certificate.contains_shared_object(),
773                IotaError::UserInput {
774                    error: UserInputError::NoSharedObject { digest: tx_digest }
775                }
776                .into()
777            );
778            fp_ensure!(
779                !self.state.is_tx_already_executed(&tx_digest)?,
780                IotaError::UserInput {
781                    error: UserInputError::AlreadyExecuted { digest: tx_digest }
782                }
783                .into()
784            );
785            if let Some(gas) = gas_price {
786                fp_ensure!(
787                    gas == certificate.gas_price(),
788                    IotaError::UserInput {
789                        error: UserInputError::GasPriceMismatch {
790                            digest: tx_digest,
791                            expected: gas,
792                            actual: certificate.gas_price()
793                        }
794                    }
795                    .into()
796                );
797            } else {
798                gas_price = Some(certificate.gas_price());
799            }
800        }
801
802        // For Soft Bundle, if at this point we know at least one certificate has
803        // already been processed, reject the entire bundle.  Otherwise, submit
804        // all certificates in one request. This is not a strict check as there
805        // may be race conditions where one or more certificates are
806        // already being processed by another actor, and we could not know it.
807        fp_ensure!(
808            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
809            IotaError::UserInput {
810                error: UserInputError::CertificateAlreadyProcessed
811            }
812            .into()
813        );
814
815        Ok(())
816    }
817
818    async fn handle_soft_bundle_certificates_v1_impl(
819        &self,
820        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
821    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
822        let epoch_store = self.state.load_epoch_store_one_call_per_task();
823        let client_addr = if self.client_id_source.is_none() {
824            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
825        } else {
826            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
827        };
828        let request = request.into_inner();
829
830        let certificates =
831            NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
832        let mut total_size_bytes = 0;
833        for certificate in &certificates {
834            // We need to check this first because we haven't verified the cert signature.
835            total_size_bytes +=
836                certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
837        }
838
839        self.metrics
840            .handle_soft_bundle_certificates_count
841            .observe(certificates.len() as u64);
842
843        self.metrics
844            .handle_soft_bundle_certificates_size_bytes
845            .observe(total_size_bytes as u64);
846
847        // Now that individual certificates are valid, we check if the bundle is valid.
848        self.soft_bundle_validity_check(&certificates, &epoch_store)
849            .await?;
850
851        info!(
852            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}]",
853            certificates.len(),
854            client_addr
855                .map(|x| x.to_string())
856                .unwrap_or_else(|| "unknown".to_string()),
857            certificates
858                .iter()
859                .map(|x| x.digest().to_string())
860                .collect::<Vec<_>>()
861                .join(", ")
862        );
863
864        let span = error_span!("handle_soft_bundle_certificates_v1");
865        self.handle_certificates(
866            certificates,
867            request.include_events,
868            request.include_input_objects,
869            request.include_output_objects,
870            request.include_auxiliary_data,
871            &epoch_store,
872            request.wait_for_effects,
873        )
874        .instrument(span)
875        .await
876        .map(|(resp, spam_weight)| {
877            (
878                tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
879                    responses: resp.unwrap_or_default(),
880                }),
881                spam_weight,
882            )
883        })
884    }
885
886    async fn object_info_impl(
887        &self,
888        request: tonic::Request<ObjectInfoRequest>,
889    ) -> WrappedServiceResponse<ObjectInfoResponse> {
890        let request = request.into_inner();
891        let response = self.state.handle_object_info_request(request).await?;
892        Ok((tonic::Response::new(response), Weight::one()))
893    }
894
895    async fn transaction_info_impl(
896        &self,
897        request: tonic::Request<TransactionInfoRequest>,
898    ) -> WrappedServiceResponse<TransactionInfoResponse> {
899        let request = request.into_inner();
900        let response = self.state.handle_transaction_info_request(request).await?;
901        Ok((tonic::Response::new(response), Weight::one()))
902    }
903
904    async fn checkpoint_impl(
905        &self,
906        request: tonic::Request<CheckpointRequest>,
907    ) -> WrappedServiceResponse<CheckpointResponse> {
908        let request = request.into_inner();
909        let response = self.state.handle_checkpoint_request(&request)?;
910        Ok((tonic::Response::new(response), Weight::one()))
911    }
912
913    async fn get_system_state_object_impl(
914        &self,
915        _request: tonic::Request<SystemStateRequest>,
916    ) -> WrappedServiceResponse<IotaSystemState> {
917        let response = self
918            .state
919            .get_object_cache_reader()
920            .get_iota_system_state_object_unsafe()?;
921        Ok((tonic::Response::new(response), Weight::one()))
922    }
923
924    fn get_client_ip_addr<T>(
925        &self,
926        request: &tonic::Request<T>,
927        source: &ClientIdSource,
928    ) -> Option<IpAddr> {
929        match source {
930            ClientIdSource::SocketAddr => {
931                let socket_addr: Option<SocketAddr> = request.remote_addr();
932
933                // We will hit this case if the IO type used does not
934                // implement Connected or when using a unix domain socket.
935                // TODO: once we have confirmed that no legitimate traffic
936                // is hitting this case, we should reject such requests that
937                // hit this case.
938                if let Some(socket_addr) = socket_addr {
939                    Some(socket_addr.ip())
940                } else {
941                    if cfg!(msim) {
942                        // Ignore the error from simtests.
943                    } else if cfg!(test) {
944                        panic!("Failed to get remote address from request");
945                    } else {
946                        self.metrics.connection_ip_not_found.inc();
947                        error!("Failed to get remote address from request");
948                    }
949                    None
950                }
951            }
952            ClientIdSource::XForwardedFor(num_hops) => {
953                let do_header_parse = |op: &MetadataValue<Ascii>| {
954                    match op.to_str() {
955                        Ok(header_val) => {
956                            let header_contents =
957                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
958                            if *num_hops == 0 {
959                                error!(
960                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
961                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
962                                    to this node. Skipping traffic controller request handling.",
963                                    header_contents,
964                                );
965                                return None;
966                            }
967                            let contents_len = header_contents.len();
968                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
969                            else {
970                                error!(
971                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
972                                    Expected at least {} values. Skipping traffic controller request handling.",
973                                    header_contents, contents_len, num_hops, contents_len,
974                                );
975                                return None;
976                            };
977                            client_ip.parse::<IpAddr>().ok().or_else(|| {
978                                client_ip.parse::<SocketAddr>().ok().map(|socket_addr| socket_addr.ip()).or_else(|| {
979                                    self.metrics.forwarded_header_parse_error.inc();
980                                    error!(
981                                        "Failed to parse x-forwarded-for header value of {:?} to ip address or socket. \
982                                        Please ensure that your proxy is configured to resolve client domains to an \
983                                        IP address before writing header",
984                                        client_ip,
985                                    );
986                                    None
987                                })
988                            })
989                        }
990                        Err(e) => {
991                            // TODO: once we have confirmed that no legitimate traffic
992                            // is hitting this case, we should reject such requests that
993                            // hit this case.
994                            self.metrics.forwarded_header_invalid.inc();
995                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
996                            None
997                        }
998                    }
999                };
1000                if let Some(op) = request.metadata().get("x-forwarded-for") {
1001                    do_header_parse(op)
1002                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1003                    do_header_parse(op)
1004                } else {
1005                    self.metrics.forwarded_header_not_included.inc();
1006                    error!(
1007                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1008                    );
1009                    None
1010                }
1011            }
1012        }
1013    }
1014
1015    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1016        if let Some(traffic_controller) = &self.traffic_controller {
1017            if !traffic_controller.check(&client, &None).await {
1018                // Entity in blocklist
1019                Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1020            } else {
1021                Ok(())
1022            }
1023        } else {
1024            Ok(())
1025        }
1026    }
1027
1028    fn handle_traffic_resp<T>(
1029        &self,
1030        client: Option<IpAddr>,
1031        wrapped_response: WrappedServiceResponse<T>,
1032    ) -> Result<tonic::Response<T>, tonic::Status> {
1033        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1034            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1035            Err(status) => (
1036                Some(IotaError::from(status.clone())),
1037                Weight::zero(),
1038                Err(status.clone()),
1039            ),
1040        };
1041
1042        if let Some(traffic_controller) = self.traffic_controller.clone() {
1043            traffic_controller.tally(TrafficTally {
1044                direct: client,
1045                through_fullnode: None,
1046                error_weight: error.map(normalize).unwrap_or(Weight::zero()),
1047                spam_weight,
1048                timestamp: SystemTime::now(),
1049            })
1050        }
1051        unwrapped_response
1052    }
1053}
1054
1055fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1056    // simulate a TCP connection, which would have added extensions to
1057    // the request object that would be used downstream
1058    let mut request = tonic::Request::new(message);
1059    let tcp_connect_info = TcpConnectInfo {
1060        local_addr: None,
1061        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1062    };
1063    request.extensions_mut().insert(tcp_connect_info);
1064    request
1065}
1066
1067// TODO: refine error matching here
1068fn normalize(err: IotaError) -> Weight {
1069    match err {
1070        IotaError::UserInput { .. }
1071        | IotaError::InvalidSignature { .. }
1072        | IotaError::SignerSignatureAbsent { .. }
1073        | IotaError::SignerSignatureNumberMismatch { .. }
1074        | IotaError::IncorrectSigner { .. }
1075        | IotaError::UnknownSigner { .. }
1076        | IotaError::WrongEpoch { .. } => Weight::one(),
1077        _ => Weight::zero(),
1078    }
1079}
1080
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments:
/// - `$self`: the service value the handler is invoked on.
/// - `$func_name`: the `*_impl` handler method to call with the request.
/// - `$request`: the incoming `tonic::Request`.
///
/// Behavior of the expansion:
/// - When no client id source is configured, the handler is called directly
///   and its result (minus the spam weight) is returned early.
/// - Otherwise the client IP is resolved, the request is rejected early if
///   `handle_traffic_req` fails, the handler runs, and its wrapped response
///   is passed through `handle_traffic_resp` for tallying.
///
/// The expansion uses `return` and `?`, so it must be placed in an async
/// function or block whose output is `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if either IP is blocked, in which case return early
        $self.handle_traffic_req(client).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1101
1102#[async_trait]
1103impl Validator for ValidatorService {
1104    /// Handles a `Transaction` request.
1105    async fn transaction(
1106        &self,
1107        request: tonic::Request<Transaction>,
1108    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1109        let validator_service = self.clone();
1110
1111        // Spawns a task which handles the transaction. The task will unconditionally
1112        // continue processing in the event that the client connection is
1113        // dropped.
1114        spawn_monitored_task!(async move {
1115            // NB: traffic tally wrapping handled within the task rather than on task exit
1116            // to prevent an attacker from subverting traffic control by severing the
1117            // connection
1118            handle_with_decoration!(validator_service, transaction_impl, request)
1119        })
1120        .await
1121        .unwrap()
1122    }
1123
1124    /// Submits a `CertifiedTransaction` request.
1125    async fn submit_certificate(
1126        &self,
1127        request: tonic::Request<CertifiedTransaction>,
1128    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1129        let validator_service = self.clone();
1130
1131        // Spawns a task which handles the certificate. The task will unconditionally
1132        // continue processing in the event that the client connection is
1133        // dropped.
1134        spawn_monitored_task!(async move {
1135            // NB: traffic tally wrapping handled within the task rather than on task exit
1136            // to prevent an attacker from subverting traffic control by severing the
1137            // connection.
1138            handle_with_decoration!(validator_service, submit_certificate_impl, request)
1139        })
1140        .await
1141        .unwrap()
1142    }
1143
1144    async fn handle_certificate_v1(
1145        &self,
1146        request: tonic::Request<HandleCertificateRequestV1>,
1147    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1148        handle_with_decoration!(self, handle_certificate_v1_impl, request)
1149    }
1150
1151    async fn handle_soft_bundle_certificates_v1(
1152        &self,
1153        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1154    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1155        handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1156    }
1157
1158    /// Handles an `ObjectInfoRequest` request.
1159    async fn object_info(
1160        &self,
1161        request: tonic::Request<ObjectInfoRequest>,
1162    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1163        handle_with_decoration!(self, object_info_impl, request)
1164    }
1165
1166    /// Handles a `TransactionInfoRequest` request.
1167    async fn transaction_info(
1168        &self,
1169        request: tonic::Request<TransactionInfoRequest>,
1170    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1171        handle_with_decoration!(self, transaction_info_impl, request)
1172    }
1173
1174    /// Handles a `CheckpointRequest` request.
1175    async fn checkpoint(
1176        &self,
1177        request: tonic::Request<CheckpointRequest>,
1178    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1179        handle_with_decoration!(self, checkpoint_impl, request)
1180    }
1181
1182    /// Gets the `IotaSystemState` response.
1183    async fn get_system_state_object(
1184        &self,
1185        request: tonic::Request<SystemStateRequest>,
1186    ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1187        handle_with_decoration!(self, get_system_state_object_impl, request)
1188    }
1189}