1use std::{
7 io,
8 net::{IpAddr, SocketAddr},
9 sync::Arc,
10 time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
16use iota_metrics::{histogram::Histogram as IotaHistogram, spawn_monitored_task};
17use iota_network::{
18 api::{Validator, ValidatorServer},
19 tonic,
20};
21use iota_types::{
22 effects::TransactionEffectsAPI,
23 error::*,
24 fp_ensure,
25 iota_system_state::IotaSystemState,
26 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
27 messages_consensus::ConsensusTransaction,
28 messages_grpc::{
29 HandleCertificateRequestV1, HandleCertificateResponseV1,
30 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
31 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
32 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
33 TransactionInfoResponse,
34 },
35 multiaddr::Multiaddr,
36 traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
37 transaction::*,
38};
39use nonempty::{NonEmpty, nonempty};
40use prometheus::{
41 IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
42 register_int_counter_with_registry,
43};
44use tap::TapFallible;
45use tokio::task::JoinHandle;
46use tonic::{
47 metadata::{Ascii, MetadataValue},
48 transport::server::TcpConnectInfo,
49};
50use tracing::{Instrument, error, error_span, info};
51
52use crate::{
53 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
54 consensus_adapter::{
55 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
56 },
57 mysticeti_adapter::LazyMysticetiClient,
58 traffic_controller::{
59 TrafficController, metrics::TrafficControllerMetrics, policies::TrafficTally,
60 },
61};
62
63#[cfg(test)]
64#[path = "unit_tests/server_tests.rs"]
65mod server_tests;
66
67pub struct AuthorityServerHandle {
69 tx_cancellation: tokio::sync::oneshot::Sender<()>,
70 local_addr: Multiaddr,
71 handle: JoinHandle<Result<(), tonic::transport::Error>>,
72}
73
74impl AuthorityServerHandle {
75 pub async fn join(self) -> Result<(), io::Error> {
77 self.handle.await?.map_err(io::Error::other)?;
79 Ok(())
80 }
81
82 pub async fn kill(self) -> Result<(), io::Error> {
84 self.tx_cancellation
85 .send(())
86 .map_err(|_e| io::Error::other("could not send cancellation signal!"))?;
87 self.handle.await?.map_err(io::Error::other)?;
88 Ok(())
89 }
90
91 pub fn address(&self) -> &Multiaddr {
93 &self.local_addr
94 }
95}
96
97pub struct AuthorityServer {
99 address: Multiaddr,
100 pub state: Arc<AuthorityState>,
101 consensus_adapter: Arc<ConsensusAdapter>,
102 pub metrics: Arc<ValidatorServiceMetrics>,
103}
104
105impl AuthorityServer {
106 pub fn new_for_test_with_consensus_adapter(
108 state: Arc<AuthorityState>,
109 consensus_adapter: Arc<ConsensusAdapter>,
110 ) -> Self {
111 let address = new_local_tcp_address_for_testing();
112 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
113
114 Self {
115 address,
116 state,
117 consensus_adapter,
118 metrics,
119 }
120 }
121
122 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
124 let consensus_adapter = Arc::new(ConsensusAdapter::new(
125 Arc::new(LazyMysticetiClient::new()),
126 state.name,
127 Arc::new(ConnectionMonitorStatusForTests {}),
128 100_000,
129 100_000,
130 None,
131 None,
132 ConsensusAdapterMetrics::new_test(),
133 ));
134 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
135 }
136
137 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
139 let address = self.address.clone();
140 self.spawn_with_bind_address_for_test(address).await
141 }
142
143 pub async fn spawn_with_bind_address_for_test(
145 self,
146 address: Multiaddr,
147 ) -> Result<AuthorityServerHandle, io::Error> {
148 let mut server = iota_network_stack::config::Config::new()
149 .server_builder()
150 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
151 self.state,
152 self.consensus_adapter,
153 self.metrics,
154 )))
155 .bind(&address)
156 .await
157 .unwrap();
158 let local_addr = server.local_addr().to_owned();
159 info!("Listening to traffic on {local_addr}");
160 let handle = AuthorityServerHandle {
161 tx_cancellation: server.take_cancel_handle().unwrap(),
162 local_addr,
163 handle: spawn_monitored_task!(server.serve()),
164 };
165 Ok(handle)
166 }
167}
168
169pub struct ValidatorServiceMetrics {
171 pub signature_errors: IntCounter,
172 pub tx_verification_latency: IotaHistogram,
173 pub cert_verification_latency: IotaHistogram,
174 pub consensus_latency: IotaHistogram,
175 pub handle_transaction_latency: IotaHistogram,
176 pub submit_certificate_consensus_latency: IotaHistogram,
177 pub handle_certificate_consensus_latency: IotaHistogram,
178 pub handle_certificate_non_consensus_latency: IotaHistogram,
179 pub handle_soft_bundle_certificates_consensus_latency: IotaHistogram,
180 pub handle_soft_bundle_certificates_count: IotaHistogram,
181 pub handle_soft_bundle_certificates_size_bytes: IotaHistogram,
182
183 num_rejected_tx_in_epoch_boundary: IntCounter,
184 num_rejected_cert_in_epoch_boundary: IntCounter,
185 num_rejected_tx_during_overload: IntCounterVec,
186 num_rejected_cert_during_overload: IntCounterVec,
187 connection_ip_not_found: IntCounter,
188 forwarded_header_parse_error: IntCounter,
189 forwarded_header_invalid: IntCounter,
190 forwarded_header_not_included: IntCounter,
191}
192
193impl ValidatorServiceMetrics {
194 pub fn new(registry: &Registry) -> Self {
196 Self {
197 signature_errors: register_int_counter_with_registry!(
198 "total_signature_errors",
199 "Number of transaction signature errors",
200 registry,
201 )
202 .unwrap(),
203 tx_verification_latency: IotaHistogram::new_in_registry(
204 "validator_service_tx_verification_latency",
205 "Latency of verifying a transaction",
206 registry,
207 ),
208 cert_verification_latency: IotaHistogram::new_in_registry(
209 "validator_service_cert_verification_latency",
210 "Latency of verifying a certificate",
211 registry,
212 ),
213 consensus_latency: IotaHistogram::new_in_registry(
214 "validator_service_consensus_latency",
215 "Time spent between submitting a shared obj txn to consensus and getting result",
216 registry,
217 ),
218 handle_transaction_latency: IotaHistogram::new_in_registry(
219 "validator_service_handle_transaction_latency",
220 "Latency of handling a transaction",
221 registry,
222 ),
223 handle_certificate_consensus_latency: IotaHistogram::new_in_registry(
224 "validator_service_handle_certificate_consensus_latency",
225 "Latency of handling a consensus transaction certificate",
226 registry,
227 ),
228 submit_certificate_consensus_latency: IotaHistogram::new_in_registry(
229 "validator_service_submit_certificate_consensus_latency",
230 "Latency of submit_certificate RPC handler",
231 registry,
232 ),
233 handle_certificate_non_consensus_latency: IotaHistogram::new_in_registry(
234 "validator_service_handle_certificate_non_consensus_latency",
235 "Latency of handling a non-consensus transaction certificate",
236 registry,
237 ),
238 handle_soft_bundle_certificates_consensus_latency: IotaHistogram::new_in_registry(
239 "validator_service_handle_soft_bundle_certificates_consensus_latency",
240 "Latency of handling a consensus soft bundle",
241 registry,
242 ),
243 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
244 "validator_service_num_rejected_tx_in_epoch_boundary",
245 "Number of rejected transaction during epoch transitioning",
246 registry,
247 )
248 .unwrap(),
249 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
250 "validator_service_num_rejected_cert_in_epoch_boundary",
251 "Number of rejected transaction certificate during epoch transitioning",
252 registry,
253 )
254 .unwrap(),
255 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
256 "validator_service_num_rejected_tx_during_overload",
257 "Number of rejected transaction due to system overload",
258 &["error_type"],
259 registry,
260 )
261 .unwrap(),
262 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
263 "validator_service_num_rejected_cert_during_overload",
264 "Number of rejected transaction certificate due to system overload",
265 &["error_type"],
266 registry,
267 )
268 .unwrap(),
269 handle_soft_bundle_certificates_count: IotaHistogram::new_in_registry(
270 "handle_soft_bundle_certificates_count",
271 "The number of certificates included in a soft bundle",
272 registry,
273 ),
274 handle_soft_bundle_certificates_size_bytes: IotaHistogram::new_in_registry(
275 "handle_soft_bundle_certificates_size_bytes",
276 "The size of soft bundle in bytes",
277 registry,
278 ),
279 connection_ip_not_found: register_int_counter_with_registry!(
280 "validator_service_connection_ip_not_found",
281 "Number of times connection IP was not extractable from request",
282 registry,
283 )
284 .unwrap(),
285 forwarded_header_parse_error: register_int_counter_with_registry!(
286 "validator_service_forwarded_header_parse_error",
287 "Number of times x-forwarded-for header could not be parsed",
288 registry,
289 )
290 .unwrap(),
291 forwarded_header_invalid: register_int_counter_with_registry!(
292 "validator_service_forwarded_header_invalid",
293 "Number of times x-forwarded-for header was invalid",
294 registry,
295 )
296 .unwrap(),
297 forwarded_header_not_included: register_int_counter_with_registry!(
298 "validator_service_forwarded_header_not_included",
299 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
300 registry,
301 )
302 .unwrap(),
303 }
304 }
305
306 pub fn new_for_tests() -> Self {
308 let registry = Registry::new();
309 Self::new(®istry)
310 }
311}
312
313#[derive(Clone)]
315pub struct ValidatorService {
316 state: Arc<AuthorityState>,
317 consensus_adapter: Arc<ConsensusAdapter>,
318 metrics: Arc<ValidatorServiceMetrics>,
319 traffic_controller: Option<Arc<TrafficController>>,
320 client_id_source: Option<ClientIdSource>,
321}
322
323impl ValidatorService {
324 pub fn new(
326 state: Arc<AuthorityState>,
327 consensus_adapter: Arc<ConsensusAdapter>,
328 validator_metrics: Arc<ValidatorServiceMetrics>,
329 traffic_controller_metrics: TrafficControllerMetrics,
330 policy_config: Option<PolicyConfig>,
331 firewall_config: Option<RemoteFirewallConfig>,
332 ) -> Self {
333 Self {
334 state,
335 consensus_adapter,
336 metrics: validator_metrics,
337 traffic_controller: policy_config.clone().map(|policy| {
338 Arc::new(TrafficController::spawn(
339 policy,
340 traffic_controller_metrics,
341 firewall_config,
342 ))
343 }),
344 client_id_source: policy_config.map(|policy| policy.client_id_source),
345 }
346 }
347
348 pub fn new_for_tests(
349 state: Arc<AuthorityState>,
350 consensus_adapter: Arc<ConsensusAdapter>,
351 metrics: Arc<ValidatorServiceMetrics>,
352 ) -> Self {
353 Self {
354 state,
355 consensus_adapter,
356 metrics,
357 traffic_controller: None,
358 client_id_source: None,
359 }
360 }
361
362 pub fn validator_state(&self) -> &Arc<AuthorityState> {
364 &self.state
365 }
366
367 pub async fn execute_certificate_for_testing(
369 &self,
370 cert: CertifiedTransaction,
371 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
372 let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
373 self.handle_certificate_v1(request).await
374 }
375
376 pub async fn handle_transaction_for_benchmarking(
378 &self,
379 transaction: Transaction,
380 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
381 let request = make_tonic_request_for_testing(transaction);
382 self.transaction(request).await
383 }
384
385 async fn handle_transaction(
387 &self,
388 request: tonic::Request<Transaction>,
389 ) -> WrappedServiceResponse<HandleTransactionResponse> {
390 let Self {
391 state,
392 consensus_adapter,
393 metrics,
394 traffic_controller: _,
395 client_id_source: _,
396 } = self.clone();
397 let transaction = request.into_inner();
398 let epoch_store = state.load_epoch_store_one_call_per_task();
399
400 transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
401
402 let mut validator_pushback_error = None;
411 let overload_check_res = state.check_system_overload(
412 &consensus_adapter,
413 transaction.data(),
414 state.check_system_overload_at_signing(),
415 );
416 if let Err(error) = overload_check_res {
417 metrics
418 .num_rejected_tx_during_overload
419 .with_label_values(&[error.as_ref()])
420 .inc();
421 match error {
423 IotaError::ValidatorOverloadedRetryAfter { .. } => {
424 validator_pushback_error = Some(error)
425 }
426 _ => return Err(error.into()),
427 }
428 }
429
430 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
431
432 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
433 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
434 metrics.signature_errors.inc();
435 })?;
436 drop(tx_verif_metrics_guard);
437
438 let tx_digest = transaction.digest();
439
440 let span = error_span!("validator_state_process_tx", ?tx_digest);
442
443 let info = state
444 .handle_transaction(&epoch_store, transaction.clone())
445 .instrument(span)
446 .await
447 .tap_err(|e| {
448 if let IotaError::ValidatorHaltedAtEpochEnd = e {
449 metrics.num_rejected_tx_in_epoch_boundary.inc();
450 }
451 })?;
452
453 if let Some(error) = validator_pushback_error {
454 return Err(error.into());
457 }
458
459 Ok((tonic::Response::new(info), Weight::zero()))
460 }
461
462 async fn handle_certificates(
468 &self,
469 certificates: NonEmpty<CertifiedTransaction>,
470 include_events: bool,
471 include_input_objects: bool,
472 include_output_objects: bool,
473 _include_auxiliary_data: bool,
474 epoch_store: &Arc<AuthorityPerEpochStore>,
475 wait_for_effects: bool,
476 ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
477 fp_ensure!(
480 !self.state.is_fullnode(epoch_store),
481 IotaError::FullNodeCantHandleCertificate.into()
482 );
483
484 let shared_object_tx = certificates
485 .iter()
486 .any(|cert| cert.contains_shared_object());
487
488 let metrics = if certificates.len() == 1 {
489 if wait_for_effects {
490 if shared_object_tx {
491 &self.metrics.handle_certificate_consensus_latency
492 } else {
493 &self.metrics.handle_certificate_non_consensus_latency
494 }
495 } else {
496 &self.metrics.submit_certificate_consensus_latency
497 }
498 } else {
499 &self
502 .metrics
503 .handle_soft_bundle_certificates_consensus_latency
504 };
505
506 let _metrics_guard = metrics.start_timer();
507
508 if certificates.len() == 1 {
513 let tx_digest = *certificates[0].digest();
514
515 if let Some(signed_effects) = self
516 .state
517 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
518 {
519 let events = if include_events {
520 if let Some(digest) = signed_effects.events_digest() {
521 Some(self.state.get_transaction_events(digest)?)
522 } else {
523 None
524 }
525 } else {
526 None
527 };
528
529 return Ok((
530 Some(vec![HandleCertificateResponseV1 {
531 signed_effects: signed_effects.into_inner(),
532 events,
533 input_objects: None,
534 output_objects: None,
535 auxiliary_data: None,
536 }]),
537 Weight::one(),
538 ));
539 };
540 }
541
542 for certificate in &certificates {
545 let overload_check_res = self.state.check_system_overload(
546 &self.consensus_adapter,
547 certificate.data(),
548 self.state.check_system_overload_at_execution(),
549 );
550 if let Err(error) = overload_check_res {
551 self.metrics
552 .num_rejected_cert_during_overload
553 .with_label_values(&[error.as_ref()])
554 .inc();
555 return Err(error.into());
556 }
557 }
558
559 let verified_certificates = {
560 let _timer = self.metrics.cert_verification_latency.start_timer();
561 epoch_store
562 .signature_verifier
563 .multi_verify_certs(certificates.into())
564 .await
565 .into_iter()
566 .collect::<Result<Vec<_>, _>>()?
567 };
568
569 {
570 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
572 if !reconfiguration_lock.should_accept_user_certs() {
573 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
574 return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
575 }
576
577 if !epoch_store
583 .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
584 {
585 let _metrics_guard = if shared_object_tx {
586 Some(self.metrics.consensus_latency.start_timer())
587 } else {
588 None
589 };
590 let transactions = verified_certificates
591 .iter()
592 .map(|certificate| {
593 ConsensusTransaction::new_certificate_message(
594 &self.state.name,
595 certificate.clone().into(),
596 )
597 })
598 .collect::<Vec<_>>();
599 self.consensus_adapter.submit_batch(
600 &transactions,
601 Some(&reconfiguration_lock),
602 epoch_store,
603 )?;
604 }
608 }
609
610 if !wait_for_effects {
611 let certificates_without_shared_objects = verified_certificates
614 .iter()
615 .filter(|certificate| !certificate.contains_shared_object())
616 .cloned()
617 .collect::<Vec<_>>();
618 if !certificates_without_shared_objects.is_empty() {
619 self.state.enqueue_certificates_for_execution(
620 certificates_without_shared_objects,
621 epoch_store,
622 );
623 }
624 return Ok((None, Weight::zero()));
625 }
626
627 let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
631 |certificate| async move {
632 let effects = self
633 .state
634 .execute_certificate(&certificate, epoch_store)
635 .await?;
636 let events = if include_events {
637 if let Some(digest) = effects.events_digest() {
638 Some(self.state.get_transaction_events(digest)?)
639 } else {
640 None
641 }
642 } else {
643 None
644 };
645
646 let input_objects = include_input_objects
647 .then(|| self.state.get_transaction_input_objects(&effects))
648 .and_then(Result::ok);
649
650 let output_objects = include_output_objects
651 .then(|| self.state.get_transaction_output_objects(&effects))
652 .and_then(Result::ok);
653
654 let signed_effects = self.state.sign_effects(effects, epoch_store)?;
655 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
656
657 Ok::<_, IotaError>(HandleCertificateResponseV1 {
658 signed_effects: signed_effects.into_inner(),
659 events,
660 input_objects,
661 output_objects,
662 auxiliary_data: None, })
664 },
665 ))
666 .await?;
667
668 Ok((Some(responses), Weight::zero()))
669 }
670}
671
672type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
673
674impl ValidatorService {
675 async fn transaction_impl(
676 &self,
677 request: tonic::Request<Transaction>,
678 ) -> WrappedServiceResponse<HandleTransactionResponse> {
679 self.handle_transaction(request).await
680 }
681
682 async fn submit_certificate_impl(
683 &self,
684 request: tonic::Request<CertifiedTransaction>,
685 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
686 let epoch_store = self.state.load_epoch_store_one_call_per_task();
687 let certificate = request.into_inner();
688 certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
689
690 let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
691 self.handle_certificates(
692 nonempty![certificate],
693 true,
694 false,
695 false,
696 false,
697 &epoch_store,
698 false,
699 )
700 .instrument(span)
701 .await
702 .map(|(executed, spam_weight)| {
703 (
704 tonic::Response::new(SubmitCertificateResponse {
705 executed: executed.map(|mut x| x.remove(0)),
706 }),
707 spam_weight,
708 )
709 })
710 }
711
712 async fn handle_certificate_v1_impl(
713 &self,
714 request: tonic::Request<HandleCertificateRequestV1>,
715 ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
716 let epoch_store = self.state.load_epoch_store_one_call_per_task();
717 let request = request.into_inner();
718 request
719 .certificate
720 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
721
722 let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
723 self.handle_certificates(
724 nonempty![request.certificate],
725 request.include_events,
726 request.include_input_objects,
727 request.include_output_objects,
728 request.include_auxiliary_data,
729 &epoch_store,
730 true,
731 )
732 .instrument(span)
733 .await
734 .map(|(resp, spam_weight)| {
735 (
736 tonic::Response::new(
737 resp.expect(
738 "handle_certificate should not return none with wait_for_effects=true",
739 )
740 .remove(0),
741 ),
742 spam_weight,
743 )
744 })
745 }
746
747 async fn soft_bundle_validity_check(
748 &self,
749 certificates: &NonEmpty<CertifiedTransaction>,
750 epoch_store: &Arc<AuthorityPerEpochStore>,
751 ) -> Result<(), tonic::Status> {
752 let protocol_config = epoch_store.protocol_config();
753
754 fp_ensure!(
760 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
761 IotaError::UserInput {
762 error: UserInputError::TooManyTransactionsInSoftBundle {
763 limit: protocol_config.max_soft_bundle_size()
764 }
765 }
766 .into()
767 );
768 let mut gas_price = None;
769 for certificate in certificates {
770 let tx_digest = *certificate.digest();
771 fp_ensure!(
772 certificate.contains_shared_object(),
773 IotaError::UserInput {
774 error: UserInputError::NoSharedObject { digest: tx_digest }
775 }
776 .into()
777 );
778 fp_ensure!(
779 !self.state.is_tx_already_executed(&tx_digest)?,
780 IotaError::UserInput {
781 error: UserInputError::AlreadyExecuted { digest: tx_digest }
782 }
783 .into()
784 );
785 if let Some(gas) = gas_price {
786 fp_ensure!(
787 gas == certificate.gas_price(),
788 IotaError::UserInput {
789 error: UserInputError::GasPriceMismatch {
790 digest: tx_digest,
791 expected: gas,
792 actual: certificate.gas_price()
793 }
794 }
795 .into()
796 );
797 } else {
798 gas_price = Some(certificate.gas_price());
799 }
800 }
801
802 fp_ensure!(
808 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
809 IotaError::UserInput {
810 error: UserInputError::CertificateAlreadyProcessed
811 }
812 .into()
813 );
814
815 Ok(())
816 }
817
818 async fn handle_soft_bundle_certificates_v1_impl(
819 &self,
820 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
821 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
822 let epoch_store = self.state.load_epoch_store_one_call_per_task();
823 let client_addr = if self.client_id_source.is_none() {
824 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
825 } else {
826 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
827 };
828 let request = request.into_inner();
829
830 let certificates =
831 NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
832 let mut total_size_bytes = 0;
833 for certificate in &certificates {
834 total_size_bytes +=
836 certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
837 }
838
839 self.metrics
840 .handle_soft_bundle_certificates_count
841 .observe(certificates.len() as u64);
842
843 self.metrics
844 .handle_soft_bundle_certificates_size_bytes
845 .observe(total_size_bytes as u64);
846
847 self.soft_bundle_validity_check(&certificates, &epoch_store)
849 .await?;
850
851 info!(
852 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}]",
853 certificates.len(),
854 client_addr
855 .map(|x| x.to_string())
856 .unwrap_or_else(|| "unknown".to_string()),
857 certificates
858 .iter()
859 .map(|x| x.digest().to_string())
860 .collect::<Vec<_>>()
861 .join(", ")
862 );
863
864 let span = error_span!("handle_soft_bundle_certificates_v1");
865 self.handle_certificates(
866 certificates,
867 request.include_events,
868 request.include_input_objects,
869 request.include_output_objects,
870 request.include_auxiliary_data,
871 &epoch_store,
872 request.wait_for_effects,
873 )
874 .instrument(span)
875 .await
876 .map(|(resp, spam_weight)| {
877 (
878 tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
879 responses: resp.unwrap_or_default(),
880 }),
881 spam_weight,
882 )
883 })
884 }
885
886 async fn object_info_impl(
887 &self,
888 request: tonic::Request<ObjectInfoRequest>,
889 ) -> WrappedServiceResponse<ObjectInfoResponse> {
890 let request = request.into_inner();
891 let response = self.state.handle_object_info_request(request).await?;
892 Ok((tonic::Response::new(response), Weight::one()))
893 }
894
895 async fn transaction_info_impl(
896 &self,
897 request: tonic::Request<TransactionInfoRequest>,
898 ) -> WrappedServiceResponse<TransactionInfoResponse> {
899 let request = request.into_inner();
900 let response = self.state.handle_transaction_info_request(request).await?;
901 Ok((tonic::Response::new(response), Weight::one()))
902 }
903
904 async fn checkpoint_impl(
905 &self,
906 request: tonic::Request<CheckpointRequest>,
907 ) -> WrappedServiceResponse<CheckpointResponse> {
908 let request = request.into_inner();
909 let response = self.state.handle_checkpoint_request(&request)?;
910 Ok((tonic::Response::new(response), Weight::one()))
911 }
912
913 async fn get_system_state_object_impl(
914 &self,
915 _request: tonic::Request<SystemStateRequest>,
916 ) -> WrappedServiceResponse<IotaSystemState> {
917 let response = self
918 .state
919 .get_object_cache_reader()
920 .get_iota_system_state_object_unsafe()?;
921 Ok((tonic::Response::new(response), Weight::one()))
922 }
923
924 fn get_client_ip_addr<T>(
925 &self,
926 request: &tonic::Request<T>,
927 source: &ClientIdSource,
928 ) -> Option<IpAddr> {
929 match source {
930 ClientIdSource::SocketAddr => {
931 let socket_addr: Option<SocketAddr> = request.remote_addr();
932
933 if let Some(socket_addr) = socket_addr {
939 Some(socket_addr.ip())
940 } else {
941 if cfg!(msim) {
942 } else if cfg!(test) {
944 panic!("Failed to get remote address from request");
945 } else {
946 self.metrics.connection_ip_not_found.inc();
947 error!("Failed to get remote address from request");
948 }
949 None
950 }
951 }
952 ClientIdSource::XForwardedFor(num_hops) => {
953 let do_header_parse = |op: &MetadataValue<Ascii>| {
954 match op.to_str() {
955 Ok(header_val) => {
956 let header_contents =
957 header_val.split(',').map(str::trim).collect::<Vec<_>>();
958 if *num_hops == 0 {
959 error!(
960 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
961 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
962 to this node. Skipping traffic controller request handling.",
963 header_contents,
964 );
965 return None;
966 }
967 let contents_len = header_contents.len();
968 let Some(client_ip) = header_contents.get(contents_len - num_hops)
969 else {
970 error!(
971 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
972 Expected at least {} values. Skipping traffic controller request handling.",
973 header_contents, contents_len, num_hops, contents_len,
974 );
975 return None;
976 };
977 client_ip.parse::<IpAddr>().ok().or_else(|| {
978 client_ip.parse::<SocketAddr>().ok().map(|socket_addr| socket_addr.ip()).or_else(|| {
979 self.metrics.forwarded_header_parse_error.inc();
980 error!(
981 "Failed to parse x-forwarded-for header value of {:?} to ip address or socket. \
982 Please ensure that your proxy is configured to resolve client domains to an \
983 IP address before writing header",
984 client_ip,
985 );
986 None
987 })
988 })
989 }
990 Err(e) => {
991 self.metrics.forwarded_header_invalid.inc();
995 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
996 None
997 }
998 }
999 };
1000 if let Some(op) = request.metadata().get("x-forwarded-for") {
1001 do_header_parse(op)
1002 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1003 do_header_parse(op)
1004 } else {
1005 self.metrics.forwarded_header_not_included.inc();
1006 error!(
1007 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1008 );
1009 None
1010 }
1011 }
1012 }
1013 }
1014
1015 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1016 if let Some(traffic_controller) = &self.traffic_controller {
1017 if !traffic_controller.check(&client, &None).await {
1018 Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1020 } else {
1021 Ok(())
1022 }
1023 } else {
1024 Ok(())
1025 }
1026 }
1027
1028 fn handle_traffic_resp<T>(
1029 &self,
1030 client: Option<IpAddr>,
1031 wrapped_response: WrappedServiceResponse<T>,
1032 ) -> Result<tonic::Response<T>, tonic::Status> {
1033 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1034 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1035 Err(status) => (
1036 Some(IotaError::from(status.clone())),
1037 Weight::zero(),
1038 Err(status.clone()),
1039 ),
1040 };
1041
1042 if let Some(traffic_controller) = self.traffic_controller.clone() {
1043 traffic_controller.tally(TrafficTally {
1044 direct: client,
1045 through_fullnode: None,
1046 error_weight: error.map(normalize).unwrap_or(Weight::zero()),
1047 spam_weight,
1048 timestamp: SystemTime::now(),
1049 })
1050 }
1051 unwrapped_response
1052 }
1053}
1054
1055fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1056 let mut request = tonic::Request::new(message);
1059 let tcp_connect_info = TcpConnectInfo {
1060 local_addr: None,
1061 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1062 };
1063 request.extensions_mut().insert(tcp_connect_info);
1064 request
1065}
1066
1067fn normalize(err: IotaError) -> Weight {
1069 match err {
1070 IotaError::UserInput { .. }
1071 | IotaError::InvalidSignature { .. }
1072 | IotaError::SignerSignatureAbsent { .. }
1073 | IotaError::SignerSignatureNumberMismatch { .. }
1074 | IotaError::IncorrectSigner { .. }
1075 | IotaError::UnknownSigner { .. }
1076 | IotaError::WrongEpoch { .. } => Weight::one(),
1077 _ => Weight::zero(),
1078 }
1079}
1080
1081#[macro_export]
1085macro_rules! handle_with_decoration {
1086 ($self:ident, $func_name:ident, $request:ident) => {{
1087 if $self.client_id_source.is_none() {
1088 return $self.$func_name($request).await.map(|(result, _)| result);
1089 }
1090
1091 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1092
1093 $self.handle_traffic_req(client.clone()).await?;
1095
1096 let wrapped_response = $self.$func_name($request).await;
1098 $self.handle_traffic_resp(client, wrapped_response)
1099 }};
1100}
1101
1102#[async_trait]
1103impl Validator for ValidatorService {
1104 async fn transaction(
1106 &self,
1107 request: tonic::Request<Transaction>,
1108 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1109 let validator_service = self.clone();
1110
1111 spawn_monitored_task!(async move {
1115 handle_with_decoration!(validator_service, transaction_impl, request)
1119 })
1120 .await
1121 .unwrap()
1122 }
1123
1124 async fn submit_certificate(
1126 &self,
1127 request: tonic::Request<CertifiedTransaction>,
1128 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1129 let validator_service = self.clone();
1130
1131 spawn_monitored_task!(async move {
1135 handle_with_decoration!(validator_service, submit_certificate_impl, request)
1139 })
1140 .await
1141 .unwrap()
1142 }
1143
1144 async fn handle_certificate_v1(
1145 &self,
1146 request: tonic::Request<HandleCertificateRequestV1>,
1147 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1148 handle_with_decoration!(self, handle_certificate_v1_impl, request)
1149 }
1150
1151 async fn handle_soft_bundle_certificates_v1(
1152 &self,
1153 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1154 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1155 handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1156 }
1157
1158 async fn object_info(
1160 &self,
1161 request: tonic::Request<ObjectInfoRequest>,
1162 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1163 handle_with_decoration!(self, object_info_impl, request)
1164 }
1165
1166 async fn transaction_info(
1168 &self,
1169 request: tonic::Request<TransactionInfoRequest>,
1170 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1171 handle_with_decoration!(self, transaction_info_impl, request)
1172 }
1173
1174 async fn checkpoint(
1176 &self,
1177 request: tonic::Request<CheckpointRequest>,
1178 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1179 handle_with_decoration!(self, checkpoint_impl, request)
1180 }
1181
1182 async fn get_system_state_object(
1184 &self,
1185 request: tonic::Request<SystemStateRequest>,
1186 ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1187 handle_with_decoration!(self, get_system_state_object_impl, request)
1188 }
1189}