1use std::{
7 io,
8 net::{IpAddr, SocketAddr},
9 sync::Arc,
10 time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19 api::{Validator, ValidatorServer},
20 tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24 effects::TransactionEffectsAPI,
25 error::*,
26 fp_ensure,
27 iota_system_state::IotaSystemState,
28 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29 messages_consensus::ConsensusTransaction,
30 messages_grpc::{
31 HandleCertificateRequestV1, HandleCertificateResponseV1,
32 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
33 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
34 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
35 TransactionInfoResponse,
36 },
37 multiaddr::Multiaddr,
38 traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
39 transaction::*,
40};
41use nonempty::{NonEmpty, nonempty};
42use prometheus::{
43 Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
44 register_int_counter_vec_with_registry, register_int_counter_with_registry,
45};
46use tap::TapFallible;
47use tokio::task::JoinHandle;
48use tonic::{
49 metadata::{Ascii, MetadataValue},
50 transport::server::TcpConnectInfo,
51};
52use tracing::{Instrument, error, error_span, info};
53
54use crate::{
55 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
56 consensus_adapter::{
57 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
58 },
59 mysticeti_adapter::LazyMysticetiClient,
60 traffic_controller::{
61 TrafficController, metrics::TrafficControllerMetrics, parse_ip, policies::TrafficTally,
62 },
63};
64
65#[cfg(test)]
66#[path = "unit_tests/server_tests.rs"]
67mod server_tests;
68
/// Handle to a spawned authority server task.
///
/// Produced by `AuthorityServer::spawn_with_bind_address_for_test`; it lets the
/// owner wait for the server task (`join`) or ask it to stop (`kill`).
pub struct AuthorityServerHandle {
    /// One-shot channel taken from the server via `take_cancel_handle()`;
    /// `kill` sends `()` on it to request cancellation.
    tx_cancellation: tokio::sync::oneshot::Sender<()>,
    /// Address reported by the bound server (`server.local_addr()`).
    local_addr: Multiaddr,
    /// Join handle of the task that runs `server.serve()`.
    handle: JoinHandle<Result<(), tonic::transport::Error>>,
}
75
76impl AuthorityServerHandle {
77 pub async fn join(self) -> Result<(), io::Error> {
79 self.handle.await?.map_err(io::Error::other)?;
81 Ok(())
82 }
83
84 pub async fn kill(self) -> Result<(), io::Error> {
86 self.tx_cancellation
87 .send(())
88 .map_err(|_e| io::Error::other("could not send cancellation signal!"))?;
89 self.handle.await?.map_err(io::Error::other)?;
90 Ok(())
91 }
92
93 pub fn address(&self) -> &Multiaddr {
95 &self.local_addr
96 }
97}
98
/// Bundles everything needed to spawn a validator gRPC server.
///
/// Only test constructors are visible in this file (`new_for_test*`).
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` as the bind address.
    address: Multiaddr,
    /// Authority state handed to the `ValidatorService`.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the `ValidatorService`.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService`.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
106
107impl AuthorityServer {
108 pub fn new_for_test_with_consensus_adapter(
110 state: Arc<AuthorityState>,
111 consensus_adapter: Arc<ConsensusAdapter>,
112 ) -> Self {
113 let address = new_local_tcp_address_for_testing();
114 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
115
116 Self {
117 address,
118 state,
119 consensus_adapter,
120 metrics,
121 }
122 }
123
124 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
126 let consensus_adapter = Arc::new(ConsensusAdapter::new(
127 Arc::new(LazyMysticetiClient::new()),
128 state.name,
129 Arc::new(ConnectionMonitorStatusForTests {}),
130 100_000,
131 100_000,
132 None,
133 None,
134 ConsensusAdapterMetrics::new_test(),
135 ));
136 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
137 }
138
139 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
141 let address = self.address.clone();
142 self.spawn_with_bind_address_for_test(address).await
143 }
144
145 pub async fn spawn_with_bind_address_for_test(
147 self,
148 address: Multiaddr,
149 ) -> Result<AuthorityServerHandle, io::Error> {
150 let tls_config = iota_tls::create_rustls_server_config(
151 self.state.config.network_key_pair().copy().private(),
152 IOTA_TLS_SERVER_NAME.to_string(),
153 iota_tls::AllowAll,
154 );
155 let mut server = iota_network_stack::config::Config::new()
156 .server_builder()
157 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
158 self.state,
159 self.consensus_adapter,
160 self.metrics,
161 )))
162 .bind(&address, Some(tls_config))
163 .await
164 .unwrap();
165 let local_addr = server.local_addr().to_owned();
166 info!("Listening to traffic on {local_addr}");
167 let handle = AuthorityServerHandle {
168 tx_cancellation: server.take_cancel_handle().unwrap(),
169 local_addr,
170 handle: spawn_monitored_task!(server.serve()),
171 };
172 Ok(handle)
173 }
174}
175
/// Prometheus metrics recorded by the validator gRPC service.
///
/// Field descriptions mirror the help strings used when the metrics are
/// registered in `ValidatorServiceMetrics::new`.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time spent between submitting a shared object transaction to consensus
    /// and getting the result.
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,

    /// Transactions rejected during epoch transitioning.
    num_rejected_tx_in_epoch_boundary: IntCounter,
    /// Transaction certificates rejected during epoch transitioning.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Certificates rejected due to system overload, labelled by `error_type`.
    num_rejected_cert_during_overload: IntCounterVec,
    /// Times the connection IP could not be extracted from a request.
    connection_ip_not_found: IntCounter,
    /// Times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Times the x-forwarded-for header was unexpectedly missing.
    forwarded_header_not_included: IntCounter,
    /// Times the client id source config disagreed with the x-forwarded-for
    /// header.
    client_id_source_config_mismatch: IntCounter,
}
200
201impl ValidatorServiceMetrics {
202 pub fn new(registry: &Registry) -> Self {
204 Self {
205 signature_errors: register_int_counter_with_registry!(
206 "total_signature_errors",
207 "Number of transaction signature errors",
208 registry,
209 )
210 .unwrap(),
211 tx_verification_latency: register_histogram_with_registry!(
212 "validator_service_tx_verification_latency",
213 "Latency of verifying a transaction",
214 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
215 registry,
216 )
217 .unwrap(),
218 cert_verification_latency: register_histogram_with_registry!(
219 "validator_service_cert_verification_latency",
220 "Latency of verifying a certificate",
221 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
222 registry,
223 )
224 .unwrap(),
225 consensus_latency: register_histogram_with_registry!(
226 "validator_service_consensus_latency",
227 "Time spent between submitting a shared obj txn to consensus and getting result",
228 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
229 registry,
230 )
231 .unwrap(),
232 handle_transaction_latency: register_histogram_with_registry!(
233 "validator_service_handle_transaction_latency",
234 "Latency of handling a transaction",
235 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
236 registry,
237 )
238 .unwrap(),
239 handle_certificate_consensus_latency: register_histogram_with_registry!(
240 "validator_service_handle_certificate_consensus_latency",
241 "Latency of handling a consensus transaction certificate",
242 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
243 registry,
244 )
245 .unwrap(),
246 submit_certificate_consensus_latency: register_histogram_with_registry!(
247 "validator_service_submit_certificate_consensus_latency",
248 "Latency of submit_certificate RPC handler",
249 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
250 registry,
251 )
252 .unwrap(),
253 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
254 "validator_service_handle_certificate_non_consensus_latency",
255 "Latency of handling a non-consensus transaction certificate",
256 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
257 registry,
258 )
259 .unwrap(),
260 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
261 "validator_service_handle_soft_bundle_certificates_consensus_latency",
262 "Latency of handling a consensus soft bundle",
263 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
264 registry,
265 )
266 .unwrap(),
267 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
268 "validator_service_handle_soft_bundle_certificates_count",
269 "The number of certificates included in a soft bundle",
270 iota_metrics::COUNT_BUCKETS.to_vec(),
271 registry,
272 )
273 .unwrap(),
274 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
275 "validator_service_handle_soft_bundle_certificates_size_bytes",
276 "The size of soft bundle in bytes",
277 iota_metrics::BYTES_BUCKETS.to_vec(),
278 registry,
279 )
280 .unwrap(),
281 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
282 "validator_service_num_rejected_tx_in_epoch_boundary",
283 "Number of rejected transaction during epoch transitioning",
284 registry,
285 )
286 .unwrap(),
287 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
288 "validator_service_num_rejected_cert_in_epoch_boundary",
289 "Number of rejected transaction certificate during epoch transitioning",
290 registry,
291 )
292 .unwrap(),
293 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
294 "validator_service_num_rejected_tx_during_overload",
295 "Number of rejected transaction due to system overload",
296 &["error_type"],
297 registry,
298 )
299 .unwrap(),
300 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
301 "validator_service_num_rejected_cert_during_overload",
302 "Number of rejected transaction certificate due to system overload",
303 &["error_type"],
304 registry,
305 )
306 .unwrap(),
307 connection_ip_not_found: register_int_counter_with_registry!(
308 "validator_service_connection_ip_not_found",
309 "Number of times connection IP was not extractable from request",
310 registry,
311 )
312 .unwrap(),
313 forwarded_header_parse_error: register_int_counter_with_registry!(
314 "validator_service_forwarded_header_parse_error",
315 "Number of times x-forwarded-for header could not be parsed",
316 registry,
317 )
318 .unwrap(),
319 forwarded_header_invalid: register_int_counter_with_registry!(
320 "validator_service_forwarded_header_invalid",
321 "Number of times x-forwarded-for header was invalid",
322 registry,
323 )
324 .unwrap(),
325 forwarded_header_not_included: register_int_counter_with_registry!(
326 "validator_service_forwarded_header_not_included",
327 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
328 registry,
329 )
330 .unwrap(),
331 client_id_source_config_mismatch: register_int_counter_with_registry!(
332 "validator_service_client_id_source_config_mismatch",
333 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
334 registry,
335 )
336 .unwrap(),
337 }
338 }
339
340 pub fn new_for_tests() -> Self {
342 let registry = Registry::new();
343 Self::new(®istry)
344 }
345}
346
/// gRPC service implementation backing the `Validator` API.
///
/// Cloning is cheap with respect to the `Arc` fields: they are shared, not
/// deep-copied.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state that actually processes transactions and certificates.
    state: Arc<AuthorityState>,
    /// Adapter used to submit certificates to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Service-level metrics.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Present only when a `PolicyConfig` was supplied to `new`.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How the client IP is determined; taken from the `PolicyConfig` when
    /// one was supplied, otherwise `None`.
    client_id_source: Option<ClientIdSource>,
}
356
357impl ValidatorService {
358 pub fn new(
360 state: Arc<AuthorityState>,
361 consensus_adapter: Arc<ConsensusAdapter>,
362 validator_metrics: Arc<ValidatorServiceMetrics>,
363 traffic_controller_metrics: TrafficControllerMetrics,
364 policy_config: Option<PolicyConfig>,
365 firewall_config: Option<RemoteFirewallConfig>,
366 ) -> Self {
367 Self {
368 state,
369 consensus_adapter,
370 metrics: validator_metrics,
371 traffic_controller: policy_config.clone().map(|policy| {
372 Arc::new(TrafficController::init(
373 policy,
374 traffic_controller_metrics,
375 firewall_config,
376 ))
377 }),
378 client_id_source: policy_config.map(|policy| policy.client_id_source),
379 }
380 }
381
/// Creates a service for tests: no traffic controller and no client id
/// source, so requests skip the traffic-control decoration entirely.
pub fn new_for_tests(
    state: Arc<AuthorityState>,
    consensus_adapter: Arc<ConsensusAdapter>,
    metrics: Arc<ValidatorServiceMetrics>,
) -> Self {
    Self {
        state,
        consensus_adapter,
        metrics,
        traffic_controller: None,
        client_id_source: None,
    }
}
395
/// Returns the shared authority state used by this service.
pub fn validator_state(&self) -> &Arc<AuthorityState> {
    &self.state
}
400
401 pub async fn execute_certificate_for_testing(
403 &self,
404 cert: CertifiedTransaction,
405 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
406 let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
407 self.handle_certificate_v1(request).await
408 }
409
410 pub async fn handle_transaction_for_benchmarking(
412 &self,
413 transaction: Transaction,
414 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
415 let request = make_tonic_request_for_testing(transaction);
416 self.transaction(request).await
417 }
418
419 async fn handle_transaction(
421 &self,
422 request: tonic::Request<Transaction>,
423 ) -> WrappedServiceResponse<HandleTransactionResponse> {
424 let Self {
425 state,
426 consensus_adapter,
427 metrics,
428 traffic_controller: _,
429 client_id_source: _,
430 } = self.clone();
431 let transaction = request.into_inner();
432 let epoch_store = state.load_epoch_store_one_call_per_task();
433
434 transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
435
436 let mut validator_pushback_error = None;
445 let overload_check_res = state.check_system_overload(
446 &consensus_adapter,
447 transaction.data(),
448 state.check_system_overload_at_signing(),
449 );
450 if let Err(error) = overload_check_res {
451 metrics
452 .num_rejected_tx_during_overload
453 .with_label_values(&[error.as_ref()])
454 .inc();
455 match error {
457 IotaError::ValidatorOverloadedRetryAfter { .. } => {
458 validator_pushback_error = Some(error)
459 }
460 _ => return Err(error.into()),
461 }
462 }
463
464 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
465
466 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
467 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
468 metrics.signature_errors.inc();
469 })?;
470 drop(tx_verif_metrics_guard);
471
472 let tx_digest = transaction.digest();
473
474 let span = error_span!("validator_state_process_tx", ?tx_digest);
476
477 let info = state
478 .handle_transaction(&epoch_store, transaction.clone())
479 .instrument(span)
480 .await
481 .tap_err(|e| {
482 if let IotaError::ValidatorHaltedAtEpochEnd = e {
483 metrics.num_rejected_tx_in_epoch_boundary.inc();
484 }
485 })?;
486
487 if let Some(error) = validator_pushback_error {
488 return Err(error.into());
491 }
492
493 Ok((tonic::Response::new(info), Weight::zero()))
494 }
495
/// Shared implementation behind the certificate RPCs (single certificate and
/// soft bundle).
///
/// Returns `Some(responses)` when effects are available (either the single
/// certificate was already executed, or `wait_for_effects` is true) and
/// `None` when `wait_for_effects` is false and execution was only scheduled.
/// The second tuple element is the spam weight reported to traffic control.
///
/// `_include_auxiliary_data` is accepted but unused; `auxiliary_data` is
/// always `None` in the responses built here.
async fn handle_certificates(
    &self,
    certificates: NonEmpty<CertifiedTransaction>,
    include_events: bool,
    include_input_objects: bool,
    include_output_objects: bool,
    _include_auxiliary_data: bool,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    wait_for_effects: bool,
) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
    // Fullnodes must not handle certificates.
    fp_ensure!(
        !self.state.is_fullnode(epoch_store),
        IotaError::FullNodeCantHandleCertificate.into()
    );

    let shared_object_tx = certificates
        .iter()
        .any(|cert| cert.contains_shared_object());

    // Pick the latency histogram matching the calling path: single
    // certificate (waiting / not waiting for effects) or soft bundle.
    let metrics = if certificates.len() == 1 {
        if wait_for_effects {
            if shared_object_tx {
                &self.metrics.handle_certificate_consensus_latency
            } else {
                &self.metrics.handle_certificate_non_consensus_latency
            }
        } else {
            &self.metrics.submit_certificate_consensus_latency
        }
    } else {
        &self
            .metrics
            .handle_soft_bundle_certificates_consensus_latency
    };

    let _metrics_guard = metrics.start_timer();

    // Fast path for a single certificate whose signed effects already exist:
    // return them right away. Input/output objects are not populated on this
    // path, and the request is given a spam weight of one.
    if certificates.len() == 1 {
        let tx_digest = *certificates[0].digest();

        if let Some(signed_effects) = self
            .state
            .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
        {
            let events = if include_events {
                if let Some(digest) = signed_effects.events_digest() {
                    Some(self.state.get_transaction_events(digest)?)
                } else {
                    None
                }
            } else {
                None
            };

            return Ok((
                Some(vec![HandleCertificateResponseV1 {
                    signed_effects: signed_effects.into_inner(),
                    events,
                    input_objects: None,
                    output_objects: None,
                    auxiliary_data: None,
                }]),
                Weight::one(),
            ));
        };
    }

    // Overload check per certificate; the first failure rejects the whole
    // request and is counted under its error type.
    for certificate in &certificates {
        let overload_check_res = self.state.check_system_overload(
            &self.consensus_adapter,
            certificate.data(),
            self.state.check_system_overload_at_execution(),
        );
        if let Err(error) = overload_check_res {
            self.metrics
                .num_rejected_cert_during_overload
                .with_label_values(&[error.as_ref()])
                .inc();
            return Err(error.into());
        }
    }

    // Verify all certificates; any verification error aborts the request.
    let verified_certificates = {
        let _timer = self.metrics.cert_verification_latency.start_timer();
        epoch_store
            .signature_verifier
            .multi_verify_certs(certificates.into())
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
    };

    {
        // The reconfiguration read guard is held for this whole block, i.e.
        // across both the acceptance check and the consensus submission.
        let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
        if !reconfiguration_lock.should_accept_user_certs() {
            self.metrics.num_rejected_cert_in_epoch_boundary.inc();
            return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
        }

        // Submit to consensus unless every certificate's consensus message
        // has already been processed.
        if !epoch_store
            .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
        {
            // Timer only runs when a shared-object transaction is present;
            // it is dropped at the end of this `if` block, right after
            // `submit_batch` returns.
            let _metrics_guard = if shared_object_tx {
                Some(self.metrics.consensus_latency.start_timer())
            } else {
                None
            };
            let transactions = verified_certificates
                .iter()
                .map(|certificate| {
                    ConsensusTransaction::new_certificate_message(
                        &self.state.name,
                        certificate.clone().into(),
                    )
                })
                .collect::<Vec<_>>();
            self.consensus_adapter.submit_batch(
                &transactions,
                Some(&reconfiguration_lock),
                epoch_store,
            )?;
        }
    }

    // Caller does not want to wait: enqueue the certificates without shared
    // objects for execution and return without effects.
    if !wait_for_effects {
        let certificates_without_shared_objects = verified_certificates
            .iter()
            .filter(|certificate| !certificate.contains_shared_object())
            .cloned()
            .collect::<Vec<_>>();
        if !certificates_without_shared_objects.is_empty() {
            self.state.enqueue_certificates_for_execution(
                certificates_without_shared_objects,
                epoch_store,
            );
        }
        return Ok((None, Weight::zero()));
    }

    // Execute all certificates concurrently; the first error aborts the
    // whole join.
    let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
        |certificate| async move {
            let effects = self
                .state
                .execute_certificate(&certificate, epoch_store)
                .await?;
            let events = if include_events {
                if let Some(digest) = effects.events_digest() {
                    Some(self.state.get_transaction_events(digest)?)
                } else {
                    None
                }
            } else {
                None
            };

            // Input/output objects are best effort: a lookup error is
            // discarded and the field is simply left as `None`.
            let input_objects = include_input_objects
                .then(|| self.state.get_transaction_input_objects(&effects))
                .and_then(Result::ok);

            let output_objects = include_output_objects
                .then(|| self.state.get_transaction_output_objects(&effects))
                .and_then(Result::ok);

            let signed_effects = self.state.sign_effects(effects, epoch_store)?;
            epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;

            Ok::<_, IotaError>(HandleCertificateResponseV1 {
                signed_effects: signed_effects.into_inner(),
                events,
                input_objects,
                output_objects,
                auxiliary_data: None,
            })
        },
    ))
    .await?;

    Ok((Some(responses), Weight::zero()))
}
704}
705
/// Result of an `*_impl` handler: the gRPC response paired with the spam
/// `Weight` that is forwarded to the traffic controller tally.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
707
708impl ValidatorService {
/// Handler body for the `transaction` RPC; delegates to
/// `handle_transaction`.
async fn transaction_impl(
    &self,
    request: tonic::Request<Transaction>,
) -> WrappedServiceResponse<HandleTransactionResponse> {
    self.handle_transaction(request).await
}
715
716 async fn submit_certificate_impl(
717 &self,
718 request: tonic::Request<CertifiedTransaction>,
719 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
720 let epoch_store = self.state.load_epoch_store_one_call_per_task();
721 let certificate = request.into_inner();
722 certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
723
724 let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
725 self.handle_certificates(
726 nonempty![certificate],
727 true,
728 false,
729 false,
730 false,
731 &epoch_store,
732 false,
733 )
734 .instrument(span)
735 .await
736 .map(|(executed, spam_weight)| {
737 (
738 tonic::Response::new(SubmitCertificateResponse {
739 executed: executed.map(|mut x| x.remove(0)),
740 }),
741 spam_weight,
742 )
743 })
744 }
745
746 async fn handle_certificate_v1_impl(
747 &self,
748 request: tonic::Request<HandleCertificateRequestV1>,
749 ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
750 let epoch_store = self.state.load_epoch_store_one_call_per_task();
751 let request = request.into_inner();
752 request
753 .certificate
754 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
755
756 let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
757 self.handle_certificates(
758 nonempty![request.certificate],
759 request.include_events,
760 request.include_input_objects,
761 request.include_output_objects,
762 request.include_auxiliary_data,
763 &epoch_store,
764 true,
765 )
766 .instrument(span)
767 .await
768 .map(|(resp, spam_weight)| {
769 (
770 tonic::Response::new(
771 resp.expect(
772 "handle_certificate should not return none with wait_for_effects=true",
773 )
774 .remove(0),
775 ),
776 spam_weight,
777 )
778 })
779 }
780
781 async fn soft_bundle_validity_check(
782 &self,
783 certificates: &NonEmpty<CertifiedTransaction>,
784 epoch_store: &Arc<AuthorityPerEpochStore>,
785 total_size_bytes: u64,
786 ) -> Result<(), tonic::Status> {
787 let protocol_config = epoch_store.protocol_config();
788
789 fp_ensure!(
796 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
797 IotaError::UserInput {
798 error: UserInputError::TooManyTransactionsInSoftBundle {
799 limit: protocol_config.max_soft_bundle_size()
800 }
801 }
802 .into()
803 );
804
805 let soft_bundle_max_size_bytes =
811 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
812 fp_ensure!(
813 total_size_bytes <= soft_bundle_max_size_bytes,
814 IotaError::UserInput {
815 error: UserInputError::SoftBundleTooLarge {
816 size: total_size_bytes,
817 limit: soft_bundle_max_size_bytes,
818 },
819 }
820 .into()
821 );
822
823 let mut gas_price = None;
824 for certificate in certificates {
825 let tx_digest = *certificate.digest();
826 fp_ensure!(
827 certificate.contains_shared_object(),
828 IotaError::UserInput {
829 error: UserInputError::NoSharedObject { digest: tx_digest }
830 }
831 .into()
832 );
833 fp_ensure!(
834 !self.state.is_tx_already_executed(&tx_digest)?,
835 IotaError::UserInput {
836 error: UserInputError::AlreadyExecuted { digest: tx_digest }
837 }
838 .into()
839 );
840 if let Some(gas) = gas_price {
841 fp_ensure!(
842 gas == certificate.gas_price(),
843 IotaError::UserInput {
844 error: UserInputError::GasPriceMismatch {
845 digest: tx_digest,
846 expected: gas,
847 actual: certificate.gas_price()
848 }
849 }
850 .into()
851 );
852 } else {
853 gas_price = Some(certificate.gas_price());
854 }
855 }
856
857 fp_ensure!(
863 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
864 IotaError::UserInput {
865 error: UserInputError::CertificateAlreadyProcessed
866 }
867 .into()
868 );
869
870 Ok(())
871 }
872
873 async fn handle_soft_bundle_certificates_v1_impl(
874 &self,
875 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
876 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
877 let epoch_store = self.state.load_epoch_store_one_call_per_task();
878 let client_addr = if self.client_id_source.is_none() {
879 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
880 } else {
881 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
882 };
883 let request = request.into_inner();
884
885 let certificates =
886 NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
887 let mut total_size_bytes = 0;
888 for certificate in &certificates {
889 total_size_bytes += certificate
891 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
892 as u64;
893 }
894
895 self.metrics
896 .handle_soft_bundle_certificates_count
897 .observe(certificates.len() as f64);
898
899 self.metrics
900 .handle_soft_bundle_certificates_size_bytes
901 .observe(total_size_bytes as f64);
902
903 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
905 .await?;
906
907 info!(
908 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
909 certificates.len(),
910 client_addr
911 .map(|x| x.to_string())
912 .unwrap_or_else(|| "unknown".to_string()),
913 certificates
914 .iter()
915 .map(|x| x.digest().to_string())
916 .collect::<Vec<_>>()
917 .join(", "),
918 total_size_bytes
919 );
920
921 let span = error_span!("handle_soft_bundle_certificates_v1");
922 self.handle_certificates(
923 certificates,
924 request.include_events,
925 request.include_input_objects,
926 request.include_output_objects,
927 request.include_auxiliary_data,
928 &epoch_store,
929 request.wait_for_effects,
930 )
931 .instrument(span)
932 .await
933 .map(|(resp, spam_weight)| {
934 (
935 tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
936 responses: resp.unwrap_or_default(),
937 }),
938 spam_weight,
939 )
940 })
941 }
942
943 async fn object_info_impl(
944 &self,
945 request: tonic::Request<ObjectInfoRequest>,
946 ) -> WrappedServiceResponse<ObjectInfoResponse> {
947 let request = request.into_inner();
948 let response = self.state.handle_object_info_request(request).await?;
949 Ok((tonic::Response::new(response), Weight::one()))
950 }
951
952 async fn transaction_info_impl(
953 &self,
954 request: tonic::Request<TransactionInfoRequest>,
955 ) -> WrappedServiceResponse<TransactionInfoResponse> {
956 let request = request.into_inner();
957 let response = self.state.handle_transaction_info_request(request).await?;
958 Ok((tonic::Response::new(response), Weight::one()))
959 }
960
961 async fn checkpoint_impl(
962 &self,
963 request: tonic::Request<CheckpointRequest>,
964 ) -> WrappedServiceResponse<CheckpointResponse> {
965 let request = request.into_inner();
966 let response = self.state.handle_checkpoint_request(&request)?;
967 Ok((tonic::Response::new(response), Weight::one()))
968 }
969
970 async fn get_system_state_object_impl(
971 &self,
972 _request: tonic::Request<SystemStateRequest>,
973 ) -> WrappedServiceResponse<IotaSystemState> {
974 let response = self
975 .state
976 .get_object_cache_reader()
977 .get_iota_system_state_object_unsafe()?;
978 Ok((tonic::Response::new(response), Weight::one()))
979 }
980
981 fn get_client_ip_addr<T>(
982 &self,
983 request: &tonic::Request<T>,
984 source: &ClientIdSource,
985 ) -> Option<IpAddr> {
986 match source {
987 ClientIdSource::SocketAddr => {
988 let socket_addr: Option<SocketAddr> = request.remote_addr();
989
990 if let Some(socket_addr) = socket_addr {
996 Some(socket_addr.ip())
997 } else {
998 if cfg!(msim) {
999 } else if cfg!(test) {
1001 panic!("Failed to get remote address from request");
1002 } else {
1003 self.metrics.connection_ip_not_found.inc();
1004 error!("Failed to get remote address from request");
1005 }
1006 None
1007 }
1008 }
1009 ClientIdSource::XForwardedFor(num_hops) => {
1010 let do_header_parse = |op: &MetadataValue<Ascii>| {
1011 match op.to_str() {
1012 Ok(header_val) => {
1013 let header_contents =
1014 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1015 if *num_hops == 0 {
1016 error!(
1017 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1018 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1019 to this node. Skipping traffic controller request handling.",
1020 header_contents,
1021 );
1022 return None;
1023 }
1024 let contents_len = header_contents.len();
1025 if contents_len < *num_hops {
1026 error!(
1027 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1028 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1029 `client-id-source` in the node config.",
1030 header_contents, contents_len, num_hops, contents_len,
1031 );
1032 self.metrics.client_id_source_config_mismatch.inc();
1033 return None;
1034 }
1035 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1036 else {
1037 error!(
1038 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1039 Expected at least {} values. Skipping traffic controller request handling.",
1040 header_contents, contents_len, num_hops, contents_len,
1041 );
1042 return None;
1043 };
1044 parse_ip(client_ip).or_else(|| {
1045 self.metrics.forwarded_header_parse_error.inc();
1046 None
1047 })
1048 }
1049 Err(e) => {
1050 self.metrics.forwarded_header_invalid.inc();
1054 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1055 None
1056 }
1057 }
1058 };
1059 if let Some(op) = request.metadata().get("x-forwarded-for") {
1060 do_header_parse(op)
1061 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1062 do_header_parse(op)
1063 } else {
1064 self.metrics.forwarded_header_not_included.inc();
1065 error!(
1066 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1067 );
1068 None
1069 }
1070 }
1071 }
1072 }
1073
1074 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1075 if let Some(traffic_controller) = &self.traffic_controller {
1076 if !traffic_controller.check(&client, &None).await {
1077 Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1079 } else {
1080 Ok(())
1081 }
1082 } else {
1083 Ok(())
1084 }
1085 }
1086
1087 fn handle_traffic_resp<T>(
1088 &self,
1089 client: Option<IpAddr>,
1090 wrapped_response: WrappedServiceResponse<T>,
1091 ) -> Result<tonic::Response<T>, tonic::Status> {
1092 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1093 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1094 Err(status) => (
1095 Some(IotaError::from(status.clone())),
1096 Weight::zero(),
1097 Err(status.clone()),
1098 ),
1099 };
1100
1101 if let Some(traffic_controller) = self.traffic_controller.clone() {
1102 traffic_controller.tally(TrafficTally {
1103 direct: client,
1104 through_fullnode: None,
1105 error_info: error.map(|e| {
1106 let error_type = String::from(e.clone().as_ref());
1107 let error_weight = normalize(e);
1108 (error_weight, error_type)
1109 }),
1110 spam_weight,
1111 timestamp: SystemTime::now(),
1112 })
1113 }
1114 unwrapped_response
1115 }
1116}
1117
1118fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1119 let mut request = tonic::Request::new(message);
1122 let tcp_connect_info = TcpConnectInfo {
1123 local_addr: None,
1124 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1125 };
1126 request.extensions_mut().insert(tcp_connect_info);
1127 request
1128}
1129
1130fn normalize(err: IotaError) -> Weight {
1132 match err {
1133 IotaError::UserInput {
1134 error: UserInputError::IncorrectUserSignature { .. },
1135 } => Weight::one(),
1136 IotaError::InvalidSignature { .. }
1137 | IotaError::SignerSignatureAbsent { .. }
1138 | IotaError::SignerSignatureNumberMismatch { .. }
1139 | IotaError::IncorrectSigner { .. }
1140 | IotaError::UnknownSigner { .. }
1141 | IotaError::WrongEpoch { .. } => Weight::one(),
1142 _ => Weight::zero(),
1143 }
1144}
1145
/// Runs `$self.$func_name($request)` with traffic-control decoration.
///
/// When `$self.client_id_source` is `None` the handler is called directly
/// and its spam weight is discarded. Otherwise the client IP is resolved,
/// the request is checked with `handle_traffic_req`, the handler runs, and
/// the outcome is reported through `handle_traffic_resp`.
///
/// Note that the undecorated path uses `return`, and the traffic check uses
/// `?`, so the macro must be expanded inside an `async` function or block
/// whose output is `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // `Option<IpAddr>` is `Copy`, so it can be passed to both calls.
        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1166
/// gRPC entry points. Every method forwards to the matching `*_impl` handler
/// through `handle_with_decoration!`, which adds the traffic-control checks
/// when a client id source is configured.
#[async_trait]
impl Validator for ValidatorService {
    /// Handles a user transaction.
    ///
    /// The work runs in a separately spawned task on a clone of the service
    /// and this method awaits that task.
    /// NOTE(review): presumably so processing continues even if the client
    /// connection is dropped — confirm against `spawn_monitored_task!`.
    ///
    /// # Panics
    /// Panics if joining the spawned task fails (`unwrap` on the join
    /// result).
    async fn transaction(
        &self,
        request: tonic::Request<Transaction>,
    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
        let validator_service = self.clone();

        spawn_monitored_task!(async move {
            // The macro's early `return` exits this async block, not the
            // enclosing method.
            handle_with_decoration!(validator_service, transaction_impl, request)
        })
        .await
        .unwrap()
    }

    /// Submits a certificate without waiting for its effects.
    ///
    /// Runs in a separately spawned task, like `transaction`.
    ///
    /// # Panics
    /// Panics if joining the spawned task fails (`unwrap` on the join
    /// result).
    async fn submit_certificate(
        &self,
        request: tonic::Request<CertifiedTransaction>,
    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
        let validator_service = self.clone();

        spawn_monitored_task!(async move {
            // The macro's early `return` exits this async block, not the
            // enclosing method.
            handle_with_decoration!(validator_service, submit_certificate_impl, request)
        })
        .await
        .unwrap()
    }

    /// Handles a single certificate and waits for its effects.
    async fn handle_certificate_v1(
        &self,
        request: tonic::Request<HandleCertificateRequestV1>,
    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
        handle_with_decoration!(self, handle_certificate_v1_impl, request)
    }

    /// Handles a soft bundle of certificates.
    async fn handle_soft_bundle_certificates_v1(
        &self,
        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
        handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
    }

    /// Answers an object info request.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request)
    }

    /// Answers a transaction info request.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request)
    }

    /// Answers a checkpoint request.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request)
    }

    /// Returns the current system state object.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
        handle_with_decoration!(self, get_system_state_object_impl, request)
    }
}