1use std::{
7 io,
8 net::{IpAddr, SocketAddr},
9 sync::Arc,
10 time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
16use iota_metrics::spawn_monitored_task;
17use iota_network::{
18 api::{Validator, ValidatorServer},
19 tonic,
20};
21use iota_types::{
22 effects::TransactionEffectsAPI,
23 error::*,
24 fp_ensure,
25 iota_system_state::IotaSystemState,
26 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
27 messages_consensus::ConsensusTransaction,
28 messages_grpc::{
29 HandleCertificateRequestV1, HandleCertificateResponseV1,
30 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
31 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
32 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
33 TransactionInfoResponse,
34 },
35 multiaddr::Multiaddr,
36 traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
37 transaction::*,
38};
39use nonempty::{NonEmpty, nonempty};
40use prometheus::{
41 Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
42 register_int_counter_vec_with_registry, register_int_counter_with_registry,
43};
44use tap::TapFallible;
45use tokio::task::JoinHandle;
46use tonic::{
47 metadata::{Ascii, MetadataValue},
48 transport::server::TcpConnectInfo,
49};
50use tracing::{Instrument, error, error_span, info};
51
52use crate::{
53 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
54 consensus_adapter::{
55 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
56 },
57 mysticeti_adapter::LazyMysticetiClient,
58 traffic_controller::{
59 TrafficController, metrics::TrafficControllerMetrics, policies::TrafficTally,
60 },
61};
62
63#[cfg(test)]
64#[path = "unit_tests/server_tests.rs"]
65mod server_tests;
66
67pub struct AuthorityServerHandle {
69 tx_cancellation: tokio::sync::oneshot::Sender<()>,
70 local_addr: Multiaddr,
71 handle: JoinHandle<Result<(), tonic::transport::Error>>,
72}
73
74impl AuthorityServerHandle {
75 pub async fn join(self) -> Result<(), io::Error> {
77 self.handle.await?.map_err(io::Error::other)?;
79 Ok(())
80 }
81
82 pub async fn kill(self) -> Result<(), io::Error> {
84 self.tx_cancellation
85 .send(())
86 .map_err(|_e| io::Error::other("could not send cancellation signal!"))?;
87 self.handle.await?.map_err(io::Error::other)?;
88 Ok(())
89 }
90
91 pub fn address(&self) -> &Multiaddr {
93 &self.local_addr
94 }
95}
96
97pub struct AuthorityServer {
99 address: Multiaddr,
100 pub state: Arc<AuthorityState>,
101 consensus_adapter: Arc<ConsensusAdapter>,
102 pub metrics: Arc<ValidatorServiceMetrics>,
103}
104
105impl AuthorityServer {
106 pub fn new_for_test_with_consensus_adapter(
108 state: Arc<AuthorityState>,
109 consensus_adapter: Arc<ConsensusAdapter>,
110 ) -> Self {
111 let address = new_local_tcp_address_for_testing();
112 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
113
114 Self {
115 address,
116 state,
117 consensus_adapter,
118 metrics,
119 }
120 }
121
122 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
124 let consensus_adapter = Arc::new(ConsensusAdapter::new(
125 Arc::new(LazyMysticetiClient::new()),
126 state.name,
127 Arc::new(ConnectionMonitorStatusForTests {}),
128 100_000,
129 100_000,
130 None,
131 None,
132 ConsensusAdapterMetrics::new_test(),
133 ));
134 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
135 }
136
137 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
139 let address = self.address.clone();
140 self.spawn_with_bind_address_for_test(address).await
141 }
142
143 pub async fn spawn_with_bind_address_for_test(
145 self,
146 address: Multiaddr,
147 ) -> Result<AuthorityServerHandle, io::Error> {
148 let mut server = iota_network_stack::config::Config::new()
149 .server_builder()
150 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
151 self.state,
152 self.consensus_adapter,
153 self.metrics,
154 )))
155 .bind(&address)
156 .await
157 .unwrap();
158 let local_addr = server.local_addr().to_owned();
159 info!("Listening to traffic on {local_addr}");
160 let handle = AuthorityServerHandle {
161 tx_cancellation: server.take_cancel_handle().unwrap(),
162 local_addr,
163 handle: spawn_monitored_task!(server.serve()),
164 };
165 Ok(handle)
166 }
167}
168
169pub struct ValidatorServiceMetrics {
171 pub signature_errors: IntCounter,
172 pub tx_verification_latency: Histogram,
173 pub cert_verification_latency: Histogram,
174 pub consensus_latency: Histogram,
175 pub handle_transaction_latency: Histogram,
176 pub submit_certificate_consensus_latency: Histogram,
177 pub handle_certificate_consensus_latency: Histogram,
178 pub handle_certificate_non_consensus_latency: Histogram,
179 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
180 pub handle_soft_bundle_certificates_count: Histogram,
181 pub handle_soft_bundle_certificates_size_bytes: Histogram,
182
183 num_rejected_tx_in_epoch_boundary: IntCounter,
184 num_rejected_cert_in_epoch_boundary: IntCounter,
185 num_rejected_tx_during_overload: IntCounterVec,
186 num_rejected_cert_during_overload: IntCounterVec,
187 connection_ip_not_found: IntCounter,
188 forwarded_header_parse_error: IntCounter,
189 forwarded_header_invalid: IntCounter,
190 forwarded_header_not_included: IntCounter,
191}
192
193impl ValidatorServiceMetrics {
194 pub fn new(registry: &Registry) -> Self {
196 Self {
197 signature_errors: register_int_counter_with_registry!(
198 "total_signature_errors",
199 "Number of transaction signature errors",
200 registry,
201 )
202 .unwrap(),
203 tx_verification_latency: register_histogram_with_registry!(
204 "validator_service_tx_verification_latency",
205 "Latency of verifying a transaction",
206 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
207 registry,
208 )
209 .unwrap(),
210 cert_verification_latency: register_histogram_with_registry!(
211 "validator_service_cert_verification_latency",
212 "Latency of verifying a certificate",
213 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
214 registry,
215 )
216 .unwrap(),
217 consensus_latency: register_histogram_with_registry!(
218 "validator_service_consensus_latency",
219 "Time spent between submitting a shared obj txn to consensus and getting result",
220 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
221 registry,
222 )
223 .unwrap(),
224 handle_transaction_latency: register_histogram_with_registry!(
225 "validator_service_handle_transaction_latency",
226 "Latency of handling a transaction",
227 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
228 registry,
229 )
230 .unwrap(),
231 handle_certificate_consensus_latency: register_histogram_with_registry!(
232 "validator_service_handle_certificate_consensus_latency",
233 "Latency of handling a consensus transaction certificate",
234 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
235 registry,
236 )
237 .unwrap(),
238 submit_certificate_consensus_latency: register_histogram_with_registry!(
239 "validator_service_submit_certificate_consensus_latency",
240 "Latency of submit_certificate RPC handler",
241 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
242 registry,
243 )
244 .unwrap(),
245 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
246 "validator_service_handle_certificate_non_consensus_latency",
247 "Latency of handling a non-consensus transaction certificate",
248 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
249 registry,
250 )
251 .unwrap(),
252 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
253 "validator_service_handle_soft_bundle_certificates_consensus_latency",
254 "Latency of handling a consensus soft bundle",
255 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
256 registry,
257 )
258 .unwrap(),
259 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
260 "validator_service_handle_soft_bundle_certificates_count",
261 "The number of certificates included in a soft bundle",
262 iota_metrics::COUNT_BUCKETS.to_vec(),
263 registry,
264 )
265 .unwrap(),
266 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
267 "validator_service_handle_soft_bundle_certificates_size_bytes",
268 "The size of soft bundle in bytes",
269 iota_metrics::BYTES_BUCKETS.to_vec(),
270 registry,
271 )
272 .unwrap(),
273 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
274 "validator_service_num_rejected_tx_in_epoch_boundary",
275 "Number of rejected transaction during epoch transitioning",
276 registry,
277 )
278 .unwrap(),
279 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
280 "validator_service_num_rejected_cert_in_epoch_boundary",
281 "Number of rejected transaction certificate during epoch transitioning",
282 registry,
283 )
284 .unwrap(),
285 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
286 "validator_service_num_rejected_tx_during_overload",
287 "Number of rejected transaction due to system overload",
288 &["error_type"],
289 registry,
290 )
291 .unwrap(),
292 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
293 "validator_service_num_rejected_cert_during_overload",
294 "Number of rejected transaction certificate due to system overload",
295 &["error_type"],
296 registry,
297 )
298 .unwrap(),
299 connection_ip_not_found: register_int_counter_with_registry!(
300 "validator_service_connection_ip_not_found",
301 "Number of times connection IP was not extractable from request",
302 registry,
303 )
304 .unwrap(),
305 forwarded_header_parse_error: register_int_counter_with_registry!(
306 "validator_service_forwarded_header_parse_error",
307 "Number of times x-forwarded-for header could not be parsed",
308 registry,
309 )
310 .unwrap(),
311 forwarded_header_invalid: register_int_counter_with_registry!(
312 "validator_service_forwarded_header_invalid",
313 "Number of times x-forwarded-for header was invalid",
314 registry,
315 )
316 .unwrap(),
317 forwarded_header_not_included: register_int_counter_with_registry!(
318 "validator_service_forwarded_header_not_included",
319 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
320 registry,
321 )
322 .unwrap(),
323 }
324 }
325
326 pub fn new_for_tests() -> Self {
328 let registry = Registry::new();
329 Self::new(®istry)
330 }
331}
332
333#[derive(Clone)]
335pub struct ValidatorService {
336 state: Arc<AuthorityState>,
337 consensus_adapter: Arc<ConsensusAdapter>,
338 metrics: Arc<ValidatorServiceMetrics>,
339 traffic_controller: Option<Arc<TrafficController>>,
340 client_id_source: Option<ClientIdSource>,
341}
342
343impl ValidatorService {
344 pub fn new(
346 state: Arc<AuthorityState>,
347 consensus_adapter: Arc<ConsensusAdapter>,
348 validator_metrics: Arc<ValidatorServiceMetrics>,
349 traffic_controller_metrics: TrafficControllerMetrics,
350 policy_config: Option<PolicyConfig>,
351 firewall_config: Option<RemoteFirewallConfig>,
352 ) -> Self {
353 Self {
354 state,
355 consensus_adapter,
356 metrics: validator_metrics,
357 traffic_controller: policy_config.clone().map(|policy| {
358 Arc::new(TrafficController::spawn(
359 policy,
360 traffic_controller_metrics,
361 firewall_config,
362 ))
363 }),
364 client_id_source: policy_config.map(|policy| policy.client_id_source),
365 }
366 }
367
368 pub fn new_for_tests(
369 state: Arc<AuthorityState>,
370 consensus_adapter: Arc<ConsensusAdapter>,
371 metrics: Arc<ValidatorServiceMetrics>,
372 ) -> Self {
373 Self {
374 state,
375 consensus_adapter,
376 metrics,
377 traffic_controller: None,
378 client_id_source: None,
379 }
380 }
381
382 pub fn validator_state(&self) -> &Arc<AuthorityState> {
384 &self.state
385 }
386
387 pub async fn execute_certificate_for_testing(
389 &self,
390 cert: CertifiedTransaction,
391 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
392 let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
393 self.handle_certificate_v1(request).await
394 }
395
396 pub async fn handle_transaction_for_benchmarking(
398 &self,
399 transaction: Transaction,
400 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
401 let request = make_tonic_request_for_testing(transaction);
402 self.transaction(request).await
403 }
404
405 async fn handle_transaction(
407 &self,
408 request: tonic::Request<Transaction>,
409 ) -> WrappedServiceResponse<HandleTransactionResponse> {
410 let Self {
411 state,
412 consensus_adapter,
413 metrics,
414 traffic_controller: _,
415 client_id_source: _,
416 } = self.clone();
417 let transaction = request.into_inner();
418 let epoch_store = state.load_epoch_store_one_call_per_task();
419
420 transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
421
422 let mut validator_pushback_error = None;
431 let overload_check_res = state.check_system_overload(
432 &consensus_adapter,
433 transaction.data(),
434 state.check_system_overload_at_signing(),
435 );
436 if let Err(error) = overload_check_res {
437 metrics
438 .num_rejected_tx_during_overload
439 .with_label_values(&[error.as_ref()])
440 .inc();
441 match error {
443 IotaError::ValidatorOverloadedRetryAfter { .. } => {
444 validator_pushback_error = Some(error)
445 }
446 _ => return Err(error.into()),
447 }
448 }
449
450 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
451
452 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
453 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
454 metrics.signature_errors.inc();
455 })?;
456 drop(tx_verif_metrics_guard);
457
458 let tx_digest = transaction.digest();
459
460 let span = error_span!("validator_state_process_tx", ?tx_digest);
462
463 let info = state
464 .handle_transaction(&epoch_store, transaction.clone())
465 .instrument(span)
466 .await
467 .tap_err(|e| {
468 if let IotaError::ValidatorHaltedAtEpochEnd = e {
469 metrics.num_rejected_tx_in_epoch_boundary.inc();
470 }
471 })?;
472
473 if let Some(error) = validator_pushback_error {
474 return Err(error.into());
477 }
478
479 Ok((tonic::Response::new(info), Weight::zero()))
480 }
481
482 async fn handle_certificates(
488 &self,
489 certificates: NonEmpty<CertifiedTransaction>,
490 include_events: bool,
491 include_input_objects: bool,
492 include_output_objects: bool,
493 _include_auxiliary_data: bool,
494 epoch_store: &Arc<AuthorityPerEpochStore>,
495 wait_for_effects: bool,
496 ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
497 fp_ensure!(
500 !self.state.is_fullnode(epoch_store),
501 IotaError::FullNodeCantHandleCertificate.into()
502 );
503
504 let shared_object_tx = certificates
505 .iter()
506 .any(|cert| cert.contains_shared_object());
507
508 let metrics = if certificates.len() == 1 {
509 if wait_for_effects {
510 if shared_object_tx {
511 &self.metrics.handle_certificate_consensus_latency
512 } else {
513 &self.metrics.handle_certificate_non_consensus_latency
514 }
515 } else {
516 &self.metrics.submit_certificate_consensus_latency
517 }
518 } else {
519 &self
522 .metrics
523 .handle_soft_bundle_certificates_consensus_latency
524 };
525
526 let _metrics_guard = metrics.start_timer();
527
528 if certificates.len() == 1 {
533 let tx_digest = *certificates[0].digest();
534
535 if let Some(signed_effects) = self
536 .state
537 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
538 {
539 let events = if include_events {
540 if let Some(digest) = signed_effects.events_digest() {
541 Some(self.state.get_transaction_events(digest)?)
542 } else {
543 None
544 }
545 } else {
546 None
547 };
548
549 return Ok((
550 Some(vec![HandleCertificateResponseV1 {
551 signed_effects: signed_effects.into_inner(),
552 events,
553 input_objects: None,
554 output_objects: None,
555 auxiliary_data: None,
556 }]),
557 Weight::one(),
558 ));
559 };
560 }
561
562 for certificate in &certificates {
565 let overload_check_res = self.state.check_system_overload(
566 &self.consensus_adapter,
567 certificate.data(),
568 self.state.check_system_overload_at_execution(),
569 );
570 if let Err(error) = overload_check_res {
571 self.metrics
572 .num_rejected_cert_during_overload
573 .with_label_values(&[error.as_ref()])
574 .inc();
575 return Err(error.into());
576 }
577 }
578
579 let verified_certificates = {
580 let _timer = self.metrics.cert_verification_latency.start_timer();
581 epoch_store
582 .signature_verifier
583 .multi_verify_certs(certificates.into())
584 .await
585 .into_iter()
586 .collect::<Result<Vec<_>, _>>()?
587 };
588
589 {
590 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
592 if !reconfiguration_lock.should_accept_user_certs() {
593 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
594 return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
595 }
596
597 if !epoch_store
603 .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
604 {
605 let _metrics_guard = if shared_object_tx {
606 Some(self.metrics.consensus_latency.start_timer())
607 } else {
608 None
609 };
610 let transactions = verified_certificates
611 .iter()
612 .map(|certificate| {
613 ConsensusTransaction::new_certificate_message(
614 &self.state.name,
615 certificate.clone().into(),
616 )
617 })
618 .collect::<Vec<_>>();
619 self.consensus_adapter.submit_batch(
620 &transactions,
621 Some(&reconfiguration_lock),
622 epoch_store,
623 )?;
624 }
628 }
629
630 if !wait_for_effects {
631 let certificates_without_shared_objects = verified_certificates
634 .iter()
635 .filter(|certificate| !certificate.contains_shared_object())
636 .cloned()
637 .collect::<Vec<_>>();
638 if !certificates_without_shared_objects.is_empty() {
639 self.state.enqueue_certificates_for_execution(
640 certificates_without_shared_objects,
641 epoch_store,
642 );
643 }
644 return Ok((None, Weight::zero()));
645 }
646
647 let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
651 |certificate| async move {
652 let effects = self
653 .state
654 .execute_certificate(&certificate, epoch_store)
655 .await?;
656 let events = if include_events {
657 if let Some(digest) = effects.events_digest() {
658 Some(self.state.get_transaction_events(digest)?)
659 } else {
660 None
661 }
662 } else {
663 None
664 };
665
666 let input_objects = include_input_objects
667 .then(|| self.state.get_transaction_input_objects(&effects))
668 .and_then(Result::ok);
669
670 let output_objects = include_output_objects
671 .then(|| self.state.get_transaction_output_objects(&effects))
672 .and_then(Result::ok);
673
674 let signed_effects = self.state.sign_effects(effects, epoch_store)?;
675 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
676
677 Ok::<_, IotaError>(HandleCertificateResponseV1 {
678 signed_effects: signed_effects.into_inner(),
679 events,
680 input_objects,
681 output_objects,
682 auxiliary_data: None, })
684 },
685 ))
686 .await?;
687
688 Ok((Some(responses), Weight::zero()))
689 }
690}
691
692type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
693
694impl ValidatorService {
695 async fn transaction_impl(
696 &self,
697 request: tonic::Request<Transaction>,
698 ) -> WrappedServiceResponse<HandleTransactionResponse> {
699 self.handle_transaction(request).await
700 }
701
702 async fn submit_certificate_impl(
703 &self,
704 request: tonic::Request<CertifiedTransaction>,
705 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
706 let epoch_store = self.state.load_epoch_store_one_call_per_task();
707 let certificate = request.into_inner();
708 certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
709
710 let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
711 self.handle_certificates(
712 nonempty![certificate],
713 true,
714 false,
715 false,
716 false,
717 &epoch_store,
718 false,
719 )
720 .instrument(span)
721 .await
722 .map(|(executed, spam_weight)| {
723 (
724 tonic::Response::new(SubmitCertificateResponse {
725 executed: executed.map(|mut x| x.remove(0)),
726 }),
727 spam_weight,
728 )
729 })
730 }
731
732 async fn handle_certificate_v1_impl(
733 &self,
734 request: tonic::Request<HandleCertificateRequestV1>,
735 ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
736 let epoch_store = self.state.load_epoch_store_one_call_per_task();
737 let request = request.into_inner();
738 request
739 .certificate
740 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
741
742 let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
743 self.handle_certificates(
744 nonempty![request.certificate],
745 request.include_events,
746 request.include_input_objects,
747 request.include_output_objects,
748 request.include_auxiliary_data,
749 &epoch_store,
750 true,
751 )
752 .instrument(span)
753 .await
754 .map(|(resp, spam_weight)| {
755 (
756 tonic::Response::new(
757 resp.expect(
758 "handle_certificate should not return none with wait_for_effects=true",
759 )
760 .remove(0),
761 ),
762 spam_weight,
763 )
764 })
765 }
766
767 async fn soft_bundle_validity_check(
768 &self,
769 certificates: &NonEmpty<CertifiedTransaction>,
770 epoch_store: &Arc<AuthorityPerEpochStore>,
771 total_size_bytes: u64,
772 ) -> Result<(), tonic::Status> {
773 let protocol_config = epoch_store.protocol_config();
774
775 fp_ensure!(
782 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
783 IotaError::UserInput {
784 error: UserInputError::TooManyTransactionsInSoftBundle {
785 limit: protocol_config.max_soft_bundle_size()
786 }
787 }
788 .into()
789 );
790
791 let soft_bundle_max_size_bytes =
797 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
798 fp_ensure!(
799 total_size_bytes <= soft_bundle_max_size_bytes,
800 IotaError::UserInput {
801 error: UserInputError::SoftBundleTooLarge {
802 size: total_size_bytes,
803 limit: soft_bundle_max_size_bytes,
804 },
805 }
806 .into()
807 );
808
809 let mut gas_price = None;
810 for certificate in certificates {
811 let tx_digest = *certificate.digest();
812 fp_ensure!(
813 certificate.contains_shared_object(),
814 IotaError::UserInput {
815 error: UserInputError::NoSharedObject { digest: tx_digest }
816 }
817 .into()
818 );
819 fp_ensure!(
820 !self.state.is_tx_already_executed(&tx_digest)?,
821 IotaError::UserInput {
822 error: UserInputError::AlreadyExecuted { digest: tx_digest }
823 }
824 .into()
825 );
826 if let Some(gas) = gas_price {
827 fp_ensure!(
828 gas == certificate.gas_price(),
829 IotaError::UserInput {
830 error: UserInputError::GasPriceMismatch {
831 digest: tx_digest,
832 expected: gas,
833 actual: certificate.gas_price()
834 }
835 }
836 .into()
837 );
838 } else {
839 gas_price = Some(certificate.gas_price());
840 }
841 }
842
843 fp_ensure!(
849 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
850 IotaError::UserInput {
851 error: UserInputError::CertificateAlreadyProcessed
852 }
853 .into()
854 );
855
856 Ok(())
857 }
858
859 async fn handle_soft_bundle_certificates_v1_impl(
860 &self,
861 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
862 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
863 let epoch_store = self.state.load_epoch_store_one_call_per_task();
864 let client_addr = if self.client_id_source.is_none() {
865 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
866 } else {
867 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
868 };
869 let request = request.into_inner();
870
871 let certificates =
872 NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
873 let mut total_size_bytes = 0;
874 for certificate in &certificates {
875 total_size_bytes += certificate
877 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
878 as u64;
879 }
880
881 self.metrics
882 .handle_soft_bundle_certificates_count
883 .observe(certificates.len() as f64);
884
885 self.metrics
886 .handle_soft_bundle_certificates_size_bytes
887 .observe(total_size_bytes as f64);
888
889 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
891 .await?;
892
893 info!(
894 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
895 certificates.len(),
896 client_addr
897 .map(|x| x.to_string())
898 .unwrap_or_else(|| "unknown".to_string()),
899 certificates
900 .iter()
901 .map(|x| x.digest().to_string())
902 .collect::<Vec<_>>()
903 .join(", "),
904 total_size_bytes
905 );
906
907 let span = error_span!("handle_soft_bundle_certificates_v1");
908 self.handle_certificates(
909 certificates,
910 request.include_events,
911 request.include_input_objects,
912 request.include_output_objects,
913 request.include_auxiliary_data,
914 &epoch_store,
915 request.wait_for_effects,
916 )
917 .instrument(span)
918 .await
919 .map(|(resp, spam_weight)| {
920 (
921 tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
922 responses: resp.unwrap_or_default(),
923 }),
924 spam_weight,
925 )
926 })
927 }
928
929 async fn object_info_impl(
930 &self,
931 request: tonic::Request<ObjectInfoRequest>,
932 ) -> WrappedServiceResponse<ObjectInfoResponse> {
933 let request = request.into_inner();
934 let response = self.state.handle_object_info_request(request).await?;
935 Ok((tonic::Response::new(response), Weight::one()))
936 }
937
938 async fn transaction_info_impl(
939 &self,
940 request: tonic::Request<TransactionInfoRequest>,
941 ) -> WrappedServiceResponse<TransactionInfoResponse> {
942 let request = request.into_inner();
943 let response = self.state.handle_transaction_info_request(request).await?;
944 Ok((tonic::Response::new(response), Weight::one()))
945 }
946
947 async fn checkpoint_impl(
948 &self,
949 request: tonic::Request<CheckpointRequest>,
950 ) -> WrappedServiceResponse<CheckpointResponse> {
951 let request = request.into_inner();
952 let response = self.state.handle_checkpoint_request(&request)?;
953 Ok((tonic::Response::new(response), Weight::one()))
954 }
955
956 async fn get_system_state_object_impl(
957 &self,
958 _request: tonic::Request<SystemStateRequest>,
959 ) -> WrappedServiceResponse<IotaSystemState> {
960 let response = self
961 .state
962 .get_object_cache_reader()
963 .get_iota_system_state_object_unsafe()?;
964 Ok((tonic::Response::new(response), Weight::one()))
965 }
966
967 fn get_client_ip_addr<T>(
968 &self,
969 request: &tonic::Request<T>,
970 source: &ClientIdSource,
971 ) -> Option<IpAddr> {
972 match source {
973 ClientIdSource::SocketAddr => {
974 let socket_addr: Option<SocketAddr> = request.remote_addr();
975
976 if let Some(socket_addr) = socket_addr {
982 Some(socket_addr.ip())
983 } else {
984 if cfg!(msim) {
985 } else if cfg!(test) {
987 panic!("Failed to get remote address from request");
988 } else {
989 self.metrics.connection_ip_not_found.inc();
990 error!("Failed to get remote address from request");
991 }
992 None
993 }
994 }
995 ClientIdSource::XForwardedFor(num_hops) => {
996 let do_header_parse = |op: &MetadataValue<Ascii>| {
997 match op.to_str() {
998 Ok(header_val) => {
999 let header_contents =
1000 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1001 if *num_hops == 0 {
1002 error!(
1003 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1004 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1005 to this node. Skipping traffic controller request handling.",
1006 header_contents,
1007 );
1008 return None;
1009 }
1010 let contents_len = header_contents.len();
1011 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1012 else {
1013 error!(
1014 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1015 Expected at least {} values. Skipping traffic controller request handling.",
1016 header_contents, contents_len, num_hops, contents_len,
1017 );
1018 return None;
1019 };
1020 client_ip.parse::<IpAddr>().ok().or_else(|| {
1021 client_ip.parse::<SocketAddr>().ok().map(|socket_addr| socket_addr.ip()).or_else(|| {
1022 self.metrics.forwarded_header_parse_error.inc();
1023 error!(
1024 "Failed to parse x-forwarded-for header value of {:?} to ip address or socket. \
1025 Please ensure that your proxy is configured to resolve client domains to an \
1026 IP address before writing header",
1027 client_ip,
1028 );
1029 None
1030 })
1031 })
1032 }
1033 Err(e) => {
1034 self.metrics.forwarded_header_invalid.inc();
1038 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1039 None
1040 }
1041 }
1042 };
1043 if let Some(op) = request.metadata().get("x-forwarded-for") {
1044 do_header_parse(op)
1045 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1046 do_header_parse(op)
1047 } else {
1048 self.metrics.forwarded_header_not_included.inc();
1049 error!(
1050 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1051 );
1052 None
1053 }
1054 }
1055 }
1056 }
1057
1058 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1059 if let Some(traffic_controller) = &self.traffic_controller {
1060 if !traffic_controller.check(&client, &None).await {
1061 Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1063 } else {
1064 Ok(())
1065 }
1066 } else {
1067 Ok(())
1068 }
1069 }
1070
1071 fn handle_traffic_resp<T>(
1072 &self,
1073 client: Option<IpAddr>,
1074 wrapped_response: WrappedServiceResponse<T>,
1075 ) -> Result<tonic::Response<T>, tonic::Status> {
1076 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1077 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1078 Err(status) => (
1079 Some(IotaError::from(status.clone())),
1080 Weight::zero(),
1081 Err(status.clone()),
1082 ),
1083 };
1084
1085 if let Some(traffic_controller) = self.traffic_controller.clone() {
1086 traffic_controller.tally(TrafficTally {
1087 direct: client,
1088 through_fullnode: None,
1089 error_weight: error.map(normalize).unwrap_or(Weight::zero()),
1090 spam_weight,
1091 timestamp: SystemTime::now(),
1092 })
1093 }
1094 unwrapped_response
1095 }
1096}
1097
1098fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1099 let mut request = tonic::Request::new(message);
1102 let tcp_connect_info = TcpConnectInfo {
1103 local_addr: None,
1104 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1105 };
1106 request.extensions_mut().insert(tcp_connect_info);
1107 request
1108}
1109
1110fn normalize(err: IotaError) -> Weight {
1112 match err {
1113 IotaError::UserInput { .. }
1114 | IotaError::InvalidSignature { .. }
1115 | IotaError::SignerSignatureAbsent { .. }
1116 | IotaError::SignerSignatureNumberMismatch { .. }
1117 | IotaError::IncorrectSigner { .. }
1118 | IotaError::UnknownSigner { .. }
1119 | IotaError::WrongEpoch { .. } => Weight::one(),
1120 _ => Weight::zero(),
1121 }
1122}
1123
/// Runs the handler `$func_name` on `$self` with `$request`, wrapped in
/// traffic-control handling.
///
/// When `$self.client_id_source` is `None`, the handler is called directly
/// and only its response (without the weight) is returned. Otherwise the
/// client IP is resolved, `handle_traffic_req` may reject the request up
/// front, and the handler's result is passed through `handle_traffic_resp`.
///
/// Note that the expansion uses `return` and `?`, so it must be used as the
/// body of an async function or block returning
/// `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // `Option<IpAddr>` is `Copy`, so `client` can be reused below.
        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1144
1145#[async_trait]
1146impl Validator for ValidatorService {
1147 async fn transaction(
1149 &self,
1150 request: tonic::Request<Transaction>,
1151 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1152 let validator_service = self.clone();
1153
1154 spawn_monitored_task!(async move {
1158 handle_with_decoration!(validator_service, transaction_impl, request)
1162 })
1163 .await
1164 .unwrap()
1165 }
1166
1167 async fn submit_certificate(
1169 &self,
1170 request: tonic::Request<CertifiedTransaction>,
1171 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1172 let validator_service = self.clone();
1173
1174 spawn_monitored_task!(async move {
1178 handle_with_decoration!(validator_service, submit_certificate_impl, request)
1182 })
1183 .await
1184 .unwrap()
1185 }
1186
1187 async fn handle_certificate_v1(
1188 &self,
1189 request: tonic::Request<HandleCertificateRequestV1>,
1190 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1191 handle_with_decoration!(self, handle_certificate_v1_impl, request)
1192 }
1193
1194 async fn handle_soft_bundle_certificates_v1(
1195 &self,
1196 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1197 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1198 handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1199 }
1200
1201 async fn object_info(
1203 &self,
1204 request: tonic::Request<ObjectInfoRequest>,
1205 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1206 handle_with_decoration!(self, object_info_impl, request)
1207 }
1208
1209 async fn transaction_info(
1211 &self,
1212 request: tonic::Request<TransactionInfoRequest>,
1213 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1214 handle_with_decoration!(self, transaction_info_impl, request)
1215 }
1216
1217 async fn checkpoint(
1219 &self,
1220 request: tonic::Request<CheckpointRequest>,
1221 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1222 handle_with_decoration!(self, checkpoint_impl, request)
1223 }
1224
1225 async fn get_system_state_object(
1227 &self,
1228 request: tonic::Request<SystemStateRequest>,
1229 ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1230 handle_with_decoration!(self, get_system_state_object_impl, request)
1231 }
1232}