1use std::{
7 io,
8 net::{IpAddr, SocketAddr},
9 sync::Arc,
10 time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19 api::{Validator, ValidatorServer},
20 tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24 effects::TransactionEffectsAPI,
25 error::*,
26 fp_ensure,
27 iota_system_state::IotaSystemState,
28 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29 messages_consensus::ConsensusTransaction,
30 messages_grpc::{
31 HandleCertificateRequestV1, HandleCertificateResponseV1,
32 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
33 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
34 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
35 TransactionInfoResponse,
36 },
37 multiaddr::Multiaddr,
38 traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
39 transaction::*,
40};
41use nonempty::{NonEmpty, nonempty};
42use prometheus::{
43 Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
44 register_int_counter_vec_with_registry, register_int_counter_with_registry,
45};
46use tap::TapFallible;
47use tonic::{
48 metadata::{Ascii, MetadataValue},
49 transport::server::TcpConnectInfo,
50};
51use tracing::{Instrument, error, error_span, info};
52
53use crate::{
54 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
55 checkpoints::CheckpointStore,
56 consensus_adapter::{
57 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
58 },
59 mysticeti_adapter::LazyMysticetiClient,
60 traffic_controller::{
61 TrafficController, metrics::TrafficControllerMetrics, parse_ip, policies::TrafficTally,
62 },
63};
64
// Unit tests for this module; only compiled for test builds and loaded from
// the explicit path below rather than the default module location.
#[cfg(test)]
#[path = "unit_tests/server_tests.rs"]
mod server_tests;
68
/// Handle to a spawned authority server.
///
/// Wraps the network-stack server so callers can wait for it, shut it down,
/// or query the address it is bound to.
pub struct AuthorityServerHandle {
    /// The underlying network-stack server.
    server_handle: iota_network_stack::server::Server,
}
73
74impl AuthorityServerHandle {
75 pub async fn join(self) -> Result<(), io::Error> {
77 self.server_handle.handle().wait_for_shutdown().await;
78 Ok(())
79 }
80
81 pub async fn kill(self) -> Result<(), io::Error> {
83 self.server_handle.handle().shutdown().await;
84 Ok(())
85 }
86
87 pub fn address(&self) -> &Multiaddr {
89 self.server_handle.local_addr()
90 }
91}
92
/// An authority server that has been configured but not yet spawned.
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` as the bind address.
    address: Multiaddr,
    /// Authority state handed to the validator service when spawning.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the validator service when spawning.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the validator service when spawning.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
100
101impl AuthorityServer {
102 pub fn new_for_test_with_consensus_adapter(
104 state: Arc<AuthorityState>,
105 consensus_adapter: Arc<ConsensusAdapter>,
106 ) -> Self {
107 let address = new_local_tcp_address_for_testing();
108 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
109
110 Self {
111 address,
112 state,
113 consensus_adapter,
114 metrics,
115 }
116 }
117
118 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
120 let consensus_adapter = Arc::new(ConsensusAdapter::new(
121 Arc::new(LazyMysticetiClient::new()),
122 CheckpointStore::new_for_tests(),
123 state.name,
124 Arc::new(ConnectionMonitorStatusForTests {}),
125 100_000,
126 100_000,
127 None,
128 None,
129 ConsensusAdapterMetrics::new_test(),
130 ));
131 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
132 }
133
134 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
136 let address = self.address.clone();
137 self.spawn_with_bind_address_for_test(address).await
138 }
139
140 pub async fn spawn_with_bind_address_for_test(
142 self,
143 address: Multiaddr,
144 ) -> Result<AuthorityServerHandle, io::Error> {
145 let tls_config = iota_tls::create_rustls_server_config(
146 self.state.config.network_key_pair().copy().private(),
147 IOTA_TLS_SERVER_NAME.to_string(),
148 );
149 let server = iota_network_stack::config::Config::new()
150 .server_builder()
151 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
152 self.state,
153 self.consensus_adapter,
154 self.metrics,
155 )))
156 .bind(&address, Some(tls_config))
157 .await
158 .unwrap();
159 let local_addr = server.local_addr().to_owned();
160 info!("Listening to traffic on {local_addr}");
161 let handle = AuthorityServerHandle {
162 server_handle: server,
163 };
164 Ok(handle)
165 }
166}
167
/// Prometheus metrics recorded by the validator service.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a shared-object transaction to consensus and
    /// getting the result.
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the `submit_certificate` RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,

    /// Transactions rejected during epoch transitioning.
    num_rejected_tx_in_epoch_boundary: IntCounter,
    /// Certificates rejected during epoch transitioning.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Certificates rejected due to system overload, labelled by `error_type`.
    num_rejected_cert_during_overload: IntCounterVec,
    /// Times the connection IP could not be extracted from a request.
    connection_ip_not_found: IntCounter,
    /// Times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Times the x-forwarded-for header was unexpectedly missing.
    forwarded_header_not_included: IntCounter,
    /// Times the client id source config disagreed with the x-forwarded-for
    /// header.
    client_id_source_config_mismatch: IntCounter,
}
192
193impl ValidatorServiceMetrics {
    /// Creates all validator service metrics and registers them with
    /// `registry`.
    ///
    /// # Panics
    ///
    /// Panics if any metric fails to register (every registration result is
    /// unwrapped).
    pub fn new(registry: &Registry) -> Self {
        Self {
            // Signature and verification metrics.
            signature_errors: register_int_counter_with_registry!(
                "total_signature_errors",
                "Number of transaction signature errors",
                registry,
            )
            .unwrap(),
            tx_verification_latency: register_histogram_with_registry!(
                "validator_service_tx_verification_latency",
                "Latency of verifying a transaction",
                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            cert_verification_latency: register_histogram_with_registry!(
                "validator_service_cert_verification_latency",
                "Latency of verifying a certificate",
                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            // Request-handling latency histograms.
            consensus_latency: register_histogram_with_registry!(
                "validator_service_consensus_latency",
                "Time spent between submitting a shared obj txn to consensus and getting result",
                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_transaction_latency: register_histogram_with_registry!(
                "validator_service_handle_transaction_latency",
                "Latency of handling a transaction",
                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_certificate_consensus_latency: register_histogram_with_registry!(
                "validator_service_handle_certificate_consensus_latency",
                "Latency of handling a consensus transaction certificate",
                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            submit_certificate_consensus_latency: register_histogram_with_registry!(
                "validator_service_submit_certificate_consensus_latency",
                "Latency of submit_certificate RPC handler",
                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
                "validator_service_handle_certificate_non_consensus_latency",
                "Latency of handling a non-consensus transaction certificate",
                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            // Soft bundle metrics.
            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
                "validator_service_handle_soft_bundle_certificates_consensus_latency",
                "Latency of handling a consensus soft bundle",
                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
                "validator_service_handle_soft_bundle_certificates_count",
                "The number of certificates included in a soft bundle",
                iota_metrics::COUNT_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
                "validator_service_handle_soft_bundle_certificates_size_bytes",
                "The size of soft bundle in bytes",
                iota_metrics::BYTES_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            // Rejection counters.
            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
                "validator_service_num_rejected_tx_in_epoch_boundary",
                "Number of rejected transaction during epoch transitioning",
                registry,
            )
            .unwrap(),
            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
                "validator_service_num_rejected_cert_in_epoch_boundary",
                "Number of rejected transaction certificate during epoch transitioning",
                registry,
            )
            .unwrap(),
            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
                "validator_service_num_rejected_tx_during_overload",
                "Number of rejected transaction due to system overload",
                &["error_type"],
                registry,
            )
            .unwrap(),
            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
                "validator_service_num_rejected_cert_during_overload",
                "Number of rejected transaction certificate due to system overload",
                &["error_type"],
                registry,
            )
            .unwrap(),
            // Client identification counters.
            connection_ip_not_found: register_int_counter_with_registry!(
                "validator_service_connection_ip_not_found",
                "Number of times connection IP was not extractable from request",
                registry,
            )
            .unwrap(),
            forwarded_header_parse_error: register_int_counter_with_registry!(
                "validator_service_forwarded_header_parse_error",
                "Number of times x-forwarded-for header could not be parsed",
                registry,
            )
            .unwrap(),
            forwarded_header_invalid: register_int_counter_with_registry!(
                "validator_service_forwarded_header_invalid",
                "Number of times x-forwarded-for header was invalid",
                registry,
            )
            .unwrap(),
            forwarded_header_not_included: register_int_counter_with_registry!(
                "validator_service_forwarded_header_not_included",
                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
                registry,
            )
            .unwrap(),
            client_id_source_config_mismatch: register_int_counter_with_registry!(
                "validator_service_client_id_source_config_mismatch",
                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
                registry,
            )
            .unwrap(),
        }
    }
331
332 pub fn new_for_tests() -> Self {
334 let registry = Registry::new();
335 Self::new(®istry)
336 }
337}
338
/// The gRPC validator service; implements the `Validator` API on top of the
/// authority state.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state that requests are executed against.
    state: Arc<AuthorityState>,
    /// Adapter used to submit certificates to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded while serving requests.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Traffic controller; `None` when no policy config was supplied.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How the client IP is determined; `None` when no policy config was
    /// supplied, in which case traffic control is bypassed by the handlers.
    client_id_source: Option<ClientIdSource>,
}
348
349impl ValidatorService {
350 pub fn new(
352 state: Arc<AuthorityState>,
353 consensus_adapter: Arc<ConsensusAdapter>,
354 validator_metrics: Arc<ValidatorServiceMetrics>,
355 traffic_controller_metrics: TrafficControllerMetrics,
356 policy_config: Option<PolicyConfig>,
357 firewall_config: Option<RemoteFirewallConfig>,
358 ) -> Self {
359 Self {
360 state,
361 consensus_adapter,
362 metrics: validator_metrics,
363 traffic_controller: policy_config.clone().map(|policy| {
364 Arc::new(TrafficController::init(
365 policy,
366 traffic_controller_metrics,
367 firewall_config,
368 ))
369 }),
370 client_id_source: policy_config.map(|policy| policy.client_id_source),
371 }
372 }
373
374 pub fn new_for_tests(
375 state: Arc<AuthorityState>,
376 consensus_adapter: Arc<ConsensusAdapter>,
377 metrics: Arc<ValidatorServiceMetrics>,
378 ) -> Self {
379 Self {
380 state,
381 consensus_adapter,
382 metrics,
383 traffic_controller: None,
384 client_id_source: None,
385 }
386 }
387
    /// Returns a reference to the authority state backing this service.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
392
393 pub async fn execute_certificate_for_testing(
395 &self,
396 cert: CertifiedTransaction,
397 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
398 let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
399 self.handle_certificate_v1(request).await
400 }
401
402 pub async fn handle_transaction_for_benchmarking(
404 &self,
405 transaction: Transaction,
406 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
407 let request = make_tonic_request_for_testing(transaction);
408 self.transaction(request).await
409 }
410
411 async fn handle_transaction(
413 &self,
414 request: tonic::Request<Transaction>,
415 ) -> WrappedServiceResponse<HandleTransactionResponse> {
416 let Self {
417 state,
418 consensus_adapter,
419 metrics,
420 traffic_controller: _,
421 client_id_source: _,
422 } = self.clone();
423 let transaction = request.into_inner();
424 let epoch_store = state.load_epoch_store_one_call_per_task();
425
426 transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
427
428 let mut validator_pushback_error = None;
437 let overload_check_res = state.check_system_overload(
438 &consensus_adapter,
439 transaction.data(),
440 state.check_system_overload_at_signing(),
441 );
442 if let Err(error) = overload_check_res {
443 metrics
444 .num_rejected_tx_during_overload
445 .with_label_values(&[error.as_ref()])
446 .inc();
447 match error {
449 IotaError::ValidatorOverloadedRetryAfter { .. } => {
450 validator_pushback_error = Some(error)
451 }
452 _ => return Err(error.into()),
453 }
454 }
455
456 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
457
458 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
459 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
460 metrics.signature_errors.inc();
461 })?;
462 drop(tx_verif_metrics_guard);
463
464 let tx_digest = transaction.digest();
465
466 let span = error_span!("validator_state_process_tx", ?tx_digest);
468
469 let info = state
470 .handle_transaction(&epoch_store, transaction.clone())
471 .instrument(span)
472 .await
473 .tap_err(|e| {
474 if let IotaError::ValidatorHaltedAtEpochEnd = e {
475 metrics.num_rejected_tx_in_epoch_boundary.inc();
476 }
477 })?;
478
479 if let Some(error) = validator_pushback_error {
480 return Err(error.into());
483 }
484
485 Ok((tonic::Response::new(info), Weight::zero()))
486 }
487
    /// Shared implementation behind the certificate-handling RPCs.
    ///
    /// Handles one certificate (`certificates.len() == 1`) or a soft bundle
    /// of several.
    ///
    /// * `include_events`, `include_input_objects`, `include_output_objects`
    ///   select which optional parts are filled in on each response.
    /// * `_include_auxiliary_data` is accepted but unused; `auxiliary_data` is
    ///   always `None`.
    /// * `wait_for_effects` selects between executing the certificates and
    ///   returning their responses (`true`), or returning right after
    ///   submission with `None` (`false`).
    ///
    /// Returns the responses (or `None`) together with a spam weight.
    async fn handle_certificates(
        &self,
        certificates: NonEmpty<CertifiedTransaction>,
        include_events: bool,
        include_input_objects: bool,
        include_output_objects: bool,
        _include_auxiliary_data: bool,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        wait_for_effects: bool,
    ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
        // Fullnodes must not handle certificates.
        fp_ensure!(
            !self.state.is_fullnode(epoch_store),
            IotaError::FullNodeCantHandleCertificate.into()
        );

        let shared_object_tx = certificates
            .iter()
            .any(|cert| cert.contains_shared_object());

        // Pick the latency histogram matching the kind of request being served.
        let metrics = if certificates.len() == 1 {
            if wait_for_effects {
                if shared_object_tx {
                    &self.metrics.handle_certificate_consensus_latency
                } else {
                    &self.metrics.handle_certificate_non_consensus_latency
                }
            } else {
                &self.metrics.submit_certificate_consensus_latency
            }
        } else {
            &self
                .metrics
                .handle_soft_bundle_certificates_consensus_latency
        };

        let _metrics_guard = metrics.start_timer();

        // Single-certificate fast path: if signed effects are already
        // available for this digest, return them without going any further.
        // Only events are populated here; input and output objects are left
        // as `None` regardless of the include flags.
        if certificates.len() == 1 {
            let tx_digest = *certificates[0].digest();

            if let Some(signed_effects) = self
                .state
                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
            {
                let events = if include_events {
                    if let Some(digest) = signed_effects.events_digest() {
                        Some(self.state.get_transaction_events(digest)?)
                    } else {
                        None
                    }
                } else {
                    None
                };

                return Ok((
                    Some(vec![HandleCertificateResponseV1 {
                        signed_effects: signed_effects.into_inner(),
                        events,
                        input_objects: None,
                        output_objects: None,
                        auxiliary_data: None,
                    }]),
                    Weight::one(),
                ));
            };
        }

        // Overload check for every certificate; the first failure rejects the
        // whole request.
        for certificate in &certificates {
            let overload_check_res = self.state.check_system_overload(
                &self.consensus_adapter,
                certificate.data(),
                self.state.check_system_overload_at_execution(),
            );
            if let Err(error) = overload_check_res {
                self.metrics
                    .num_rejected_cert_during_overload
                    .with_label_values(&[error.as_ref()])
                    .inc();
                return Err(error.into());
            }
        }

        // Verify all certificates, timing the verification; any single
        // failure fails the request.
        let verified_certificates = {
            let _timer = self.metrics.cert_verification_latency.start_timer();
            epoch_store
                .signature_verifier
                .multi_verify_certs(certificates.into())
                .await
                .into_iter()
                .collect::<Result<Vec<_>, _>>()?
        };

        // The reconfiguration read-lock guard is held only for this block.
        {
            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
            if !reconfiguration_lock.should_accept_user_certs() {
                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
                return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
            }

            // Submit to consensus unless every certificate's consensus message
            // has already been processed.
            if !epoch_store
                .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
            {
                // Consensus latency is only recorded when a shared object is involved.
                let _metrics_guard = if shared_object_tx {
                    Some(self.metrics.consensus_latency.start_timer())
                } else {
                    None
                };
                let transactions = verified_certificates
                    .iter()
                    .map(|certificate| {
                        ConsensusTransaction::new_certificate_message(
                            &self.state.name,
                            certificate.clone().into(),
                        )
                    })
                    .collect::<Vec<_>>();
                self.consensus_adapter.submit_batch(
                    &transactions,
                    Some(&reconfiguration_lock),
                    epoch_store,
                )?;
            }
        }

        // Caller does not want effects: enqueue the certificates that touch no
        // shared object for execution and return without responses.
        if !wait_for_effects {
            let certificates_without_shared_objects = verified_certificates
                .iter()
                .filter(|certificate| !certificate.contains_shared_object())
                .cloned()
                .collect::<Vec<_>>();
            if !certificates_without_shared_objects.is_empty() {
                self.state.enqueue_certificates_for_execution(
                    certificates_without_shared_objects,
                    epoch_store,
                );
            }
            return Ok((None, Weight::zero()));
        }

        // Execute every certificate and build one response per certificate;
        // the first error fails the whole request.
        let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
            |certificate| async move {
                let effects = self
                    .state
                    .execute_certificate(&certificate, epoch_store)
                    .await?;
                let events = if include_events {
                    if let Some(digest) = effects.events_digest() {
                        Some(self.state.get_transaction_events(digest)?)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // A failed input/output object lookup is not an error here: it
                // is turned into `None` by `Result::ok`.
                let input_objects = include_input_objects
                    .then(|| self.state.get_transaction_input_objects(&effects))
                    .and_then(Result::ok);

                let output_objects = include_output_objects
                    .then(|| self.state.get_transaction_output_objects(&effects))
                    .and_then(Result::ok);

                let signed_effects = self.state.sign_effects(effects, epoch_store)?;
                epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;

                Ok::<_, IotaError>(HandleCertificateResponseV1 {
                    signed_effects: signed_effects.into_inner(),
                    events,
                    input_objects,
                    output_objects,
                    auxiliary_data: None,
                })
            },
        ))
        .await?;

        Ok((Some(responses), Weight::zero()))
    }
696}
697
/// Result of an `*_impl` handler: on success, the tonic response paired with
/// the spam `Weight` to be tallied for the request; on failure, the tonic
/// status.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
699
700impl ValidatorService {
    /// Implementation of the `transaction` RPC; delegates to
    /// `handle_transaction`.
    async fn transaction_impl(
        &self,
        request: tonic::Request<Transaction>,
    ) -> WrappedServiceResponse<HandleTransactionResponse> {
        self.handle_transaction(request).await
    }
707
708 async fn submit_certificate_impl(
709 &self,
710 request: tonic::Request<CertifiedTransaction>,
711 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
712 let epoch_store = self.state.load_epoch_store_one_call_per_task();
713 let certificate = request.into_inner();
714 certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
715
716 let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
717 self.handle_certificates(
718 nonempty![certificate],
719 true,
720 false,
721 false,
722 false,
723 &epoch_store,
724 false,
725 )
726 .instrument(span)
727 .await
728 .map(|(executed, spam_weight)| {
729 (
730 tonic::Response::new(SubmitCertificateResponse {
731 executed: executed.map(|mut x| x.remove(0)),
732 }),
733 spam_weight,
734 )
735 })
736 }
737
738 async fn handle_certificate_v1_impl(
739 &self,
740 request: tonic::Request<HandleCertificateRequestV1>,
741 ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
742 let epoch_store = self.state.load_epoch_store_one_call_per_task();
743 let request = request.into_inner();
744 request
745 .certificate
746 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
747
748 let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
749 self.handle_certificates(
750 nonempty![request.certificate],
751 request.include_events,
752 request.include_input_objects,
753 request.include_output_objects,
754 request.include_auxiliary_data,
755 &epoch_store,
756 true,
757 )
758 .instrument(span)
759 .await
760 .map(|(resp, spam_weight)| {
761 (
762 tonic::Response::new(
763 resp.expect(
764 "handle_certificate should not return none with wait_for_effects=true",
765 )
766 .remove(0),
767 ),
768 spam_weight,
769 )
770 })
771 }
772
773 async fn soft_bundle_validity_check(
774 &self,
775 certificates: &NonEmpty<CertifiedTransaction>,
776 epoch_store: &Arc<AuthorityPerEpochStore>,
777 total_size_bytes: u64,
778 ) -> Result<(), tonic::Status> {
779 let protocol_config = epoch_store.protocol_config();
780
781 fp_ensure!(
788 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
789 IotaError::UserInput {
790 error: UserInputError::TooManyTransactionsInSoftBundle {
791 limit: protocol_config.max_soft_bundle_size()
792 }
793 }
794 .into()
795 );
796
797 let soft_bundle_max_size_bytes =
803 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
804 fp_ensure!(
805 total_size_bytes <= soft_bundle_max_size_bytes,
806 IotaError::UserInput {
807 error: UserInputError::SoftBundleTooLarge {
808 size: total_size_bytes,
809 limit: soft_bundle_max_size_bytes,
810 },
811 }
812 .into()
813 );
814
815 let mut gas_price = None;
816 for certificate in certificates {
817 let tx_digest = *certificate.digest();
818 fp_ensure!(
819 certificate.contains_shared_object(),
820 IotaError::UserInput {
821 error: UserInputError::NoSharedObject { digest: tx_digest }
822 }
823 .into()
824 );
825 fp_ensure!(
826 !self.state.try_is_tx_already_executed(&tx_digest)?,
827 IotaError::UserInput {
828 error: UserInputError::AlreadyExecuted { digest: tx_digest }
829 }
830 .into()
831 );
832 if let Some(gas) = gas_price {
833 fp_ensure!(
834 gas == certificate.gas_price(),
835 IotaError::UserInput {
836 error: UserInputError::GasPriceMismatch {
837 digest: tx_digest,
838 expected: gas,
839 actual: certificate.gas_price()
840 }
841 }
842 .into()
843 );
844 } else {
845 gas_price = Some(certificate.gas_price());
846 }
847 }
848
849 fp_ensure!(
855 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
856 IotaError::UserInput {
857 error: UserInputError::CertificateAlreadyProcessed
858 }
859 .into()
860 );
861
862 Ok(())
863 }
864
865 async fn handle_soft_bundle_certificates_v1_impl(
866 &self,
867 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
868 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
869 let epoch_store = self.state.load_epoch_store_one_call_per_task();
870 let client_addr = if self.client_id_source.is_none() {
871 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
872 } else {
873 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
874 };
875 let request = request.into_inner();
876
877 let certificates =
878 NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
879 let mut total_size_bytes = 0;
880 for certificate in &certificates {
881 total_size_bytes += certificate
883 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
884 as u64;
885 }
886
887 self.metrics
888 .handle_soft_bundle_certificates_count
889 .observe(certificates.len() as f64);
890
891 self.metrics
892 .handle_soft_bundle_certificates_size_bytes
893 .observe(total_size_bytes as f64);
894
895 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
897 .await?;
898
899 info!(
900 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
901 certificates.len(),
902 client_addr
903 .map(|x| x.to_string())
904 .unwrap_or_else(|| "unknown".to_string()),
905 certificates
906 .iter()
907 .map(|x| x.digest().to_string())
908 .collect::<Vec<_>>()
909 .join(", "),
910 total_size_bytes
911 );
912
913 let span = error_span!("handle_soft_bundle_certificates_v1");
914 self.handle_certificates(
915 certificates,
916 request.include_events,
917 request.include_input_objects,
918 request.include_output_objects,
919 request.include_auxiliary_data,
920 &epoch_store,
921 request.wait_for_effects,
922 )
923 .instrument(span)
924 .await
925 .map(|(resp, spam_weight)| {
926 (
927 tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
928 responses: resp.unwrap_or_default(),
929 }),
930 spam_weight,
931 )
932 })
933 }
934
935 async fn object_info_impl(
936 &self,
937 request: tonic::Request<ObjectInfoRequest>,
938 ) -> WrappedServiceResponse<ObjectInfoResponse> {
939 let request = request.into_inner();
940 let response = self.state.handle_object_info_request(request).await?;
941 Ok((tonic::Response::new(response), Weight::one()))
942 }
943
944 async fn transaction_info_impl(
945 &self,
946 request: tonic::Request<TransactionInfoRequest>,
947 ) -> WrappedServiceResponse<TransactionInfoResponse> {
948 let request = request.into_inner();
949 let response = self.state.handle_transaction_info_request(request).await?;
950 Ok((tonic::Response::new(response), Weight::one()))
951 }
952
953 async fn checkpoint_impl(
954 &self,
955 request: tonic::Request<CheckpointRequest>,
956 ) -> WrappedServiceResponse<CheckpointResponse> {
957 let request = request.into_inner();
958 let response = self.state.handle_checkpoint_request(&request)?;
959 Ok((tonic::Response::new(response), Weight::one()))
960 }
961
962 async fn get_system_state_object_impl(
963 &self,
964 _request: tonic::Request<SystemStateRequest>,
965 ) -> WrappedServiceResponse<IotaSystemState> {
966 let response = self
967 .state
968 .get_object_cache_reader()
969 .try_get_iota_system_state_object_unsafe()?;
970 Ok((tonic::Response::new(response), Weight::one()))
971 }
972
973 fn get_client_ip_addr<T>(
974 &self,
975 request: &tonic::Request<T>,
976 source: &ClientIdSource,
977 ) -> Option<IpAddr> {
978 match source {
979 ClientIdSource::SocketAddr => {
980 let socket_addr: Option<SocketAddr> = request.remote_addr();
981
982 if let Some(socket_addr) = socket_addr {
988 Some(socket_addr.ip())
989 } else {
990 if cfg!(msim) {
991 } else if cfg!(test) {
993 panic!("Failed to get remote address from request");
994 } else {
995 self.metrics.connection_ip_not_found.inc();
996 error!("Failed to get remote address from request");
997 }
998 None
999 }
1000 }
1001 ClientIdSource::XForwardedFor(num_hops) => {
1002 let do_header_parse = |op: &MetadataValue<Ascii>| {
1003 match op.to_str() {
1004 Ok(header_val) => {
1005 let header_contents =
1006 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1007 if *num_hops == 0 {
1008 error!(
1009 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1010 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1011 to this node. Skipping traffic controller request handling.",
1012 header_contents,
1013 );
1014 return None;
1015 }
1016 let contents_len = header_contents.len();
1017 if contents_len < *num_hops {
1018 error!(
1019 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1020 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1021 `client-id-source` in the node config.",
1022 header_contents, contents_len, num_hops, contents_len,
1023 );
1024 self.metrics.client_id_source_config_mismatch.inc();
1025 return None;
1026 }
1027 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1028 else {
1029 error!(
1030 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1031 Expected at least {} values. Skipping traffic controller request handling.",
1032 header_contents, contents_len, num_hops, contents_len,
1033 );
1034 return None;
1035 };
1036 parse_ip(client_ip).or_else(|| {
1037 self.metrics.forwarded_header_parse_error.inc();
1038 None
1039 })
1040 }
1041 Err(e) => {
1042 self.metrics.forwarded_header_invalid.inc();
1046 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1047 None
1048 }
1049 }
1050 };
1051 if let Some(op) = request.metadata().get("x-forwarded-for") {
1052 do_header_parse(op)
1053 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1054 do_header_parse(op)
1055 } else {
1056 self.metrics.forwarded_header_not_included.inc();
1057 error!(
1058 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1059 );
1060 None
1061 }
1062 }
1063 }
1064 }
1065
1066 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1067 if let Some(traffic_controller) = &self.traffic_controller {
1068 if !traffic_controller.check(&client, &None).await {
1069 Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1071 } else {
1072 Ok(())
1073 }
1074 } else {
1075 Ok(())
1076 }
1077 }
1078
1079 fn handle_traffic_resp<T>(
1080 &self,
1081 client: Option<IpAddr>,
1082 wrapped_response: WrappedServiceResponse<T>,
1083 ) -> Result<tonic::Response<T>, tonic::Status> {
1084 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1085 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1086 Err(status) => (
1087 Some(IotaError::from(status.clone())),
1088 Weight::zero(),
1089 Err(status.clone()),
1090 ),
1091 };
1092
1093 if let Some(traffic_controller) = self.traffic_controller.clone() {
1094 traffic_controller.tally(TrafficTally {
1095 direct: client,
1096 through_fullnode: None,
1097 error_info: error.map(|e| {
1098 let error_type = String::from(e.clone().as_ref());
1099 let error_weight = normalize(e);
1100 (error_weight, error_type)
1101 }),
1102 spam_weight,
1103 timestamp: SystemTime::now(),
1104 })
1105 }
1106 unwrapped_response
1107 }
1108}
1109
1110fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1111 let mut request = tonic::Request::new(message);
1114 let tcp_connect_info = TcpConnectInfo {
1115 local_addr: None,
1116 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1117 };
1118 request.extensions_mut().insert(tcp_connect_info);
1119 request
1120}
1121
1122fn normalize(err: IotaError) -> Weight {
1124 match err {
1125 IotaError::UserInput {
1126 error: UserInputError::IncorrectUserSignature { .. },
1127 } => Weight::one(),
1128 IotaError::InvalidSignature { .. }
1129 | IotaError::SignerSignatureAbsent { .. }
1130 | IotaError::SignerSignatureNumberMismatch { .. }
1131 | IotaError::IncorrectSigner { .. }
1132 | IotaError::UnknownSigner { .. }
1133 | IotaError::WrongEpoch { .. } => Weight::one(),
1134 _ => Weight::zero(),
1135 }
1136}
1137
/// Runs the handler `$func_name` on `$request`, wrapped in traffic control.
///
/// When `$self.client_id_source` is unset, the handler is called directly and
/// its spam weight is discarded. Otherwise the client IP is resolved,
/// `handle_traffic_req` may reject the request up front, the handler runs,
/// and its result is passed through `handle_traffic_resp`.
///
/// The expansion contains `return` and `?`, so it must be used inside a
/// function or async block returning `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let client = match $self.client_id_source.as_ref() {
            Some(source) => $self.get_client_ip_addr(&$request, source),
            // No client id source configured: skip traffic control entirely.
            None => return $self.$func_name($request).await.map(|(result, _)| result),
        };

        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1158
1159#[async_trait]
1160impl Validator for ValidatorService {
1161 async fn transaction(
1163 &self,
1164 request: tonic::Request<Transaction>,
1165 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1166 let validator_service = self.clone();
1167
1168 spawn_monitored_task!(async move {
1172 handle_with_decoration!(validator_service, transaction_impl, request)
1176 })
1177 .await
1178 .unwrap()
1179 }
1180
1181 async fn submit_certificate(
1183 &self,
1184 request: tonic::Request<CertifiedTransaction>,
1185 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1186 let validator_service = self.clone();
1187
1188 spawn_monitored_task!(async move {
1192 handle_with_decoration!(validator_service, submit_certificate_impl, request)
1196 })
1197 .await
1198 .unwrap()
1199 }
1200
1201 async fn handle_certificate_v1(
1202 &self,
1203 request: tonic::Request<HandleCertificateRequestV1>,
1204 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1205 handle_with_decoration!(self, handle_certificate_v1_impl, request)
1206 }
1207
1208 async fn handle_soft_bundle_certificates_v1(
1209 &self,
1210 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1211 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1212 handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1213 }
1214
1215 async fn object_info(
1217 &self,
1218 request: tonic::Request<ObjectInfoRequest>,
1219 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1220 handle_with_decoration!(self, object_info_impl, request)
1221 }
1222
1223 async fn transaction_info(
1225 &self,
1226 request: tonic::Request<TransactionInfoRequest>,
1227 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1228 handle_with_decoration!(self, transaction_info_impl, request)
1229 }
1230
1231 async fn checkpoint(
1233 &self,
1234 request: tonic::Request<CheckpointRequest>,
1235 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1236 handle_with_decoration!(self, checkpoint_impl, request)
1237 }
1238
1239 async fn get_system_state_object(
1241 &self,
1242 request: tonic::Request<SystemStateRequest>,
1243 ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1244 handle_with_decoration!(self, get_system_state_object_impl, request)
1245 }
1246}