1use std::{
7 io,
8 net::{IpAddr, SocketAddr},
9 sync::Arc,
10 time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19 api::{Validator, ValidatorServer},
20 tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24 effects::TransactionEffectsAPI,
25 error::*,
26 fp_ensure,
27 iota_system_state::IotaSystemState,
28 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29 messages_consensus::ConsensusTransaction,
30 messages_grpc::{
31 HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
32 HandleCertificateRequestV1, HandleCertificateResponseV1,
33 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
34 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
35 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
36 TransactionInfoResponse,
37 },
38 multiaddr::Multiaddr,
39 traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
40 transaction::*,
41};
42use nonempty::{NonEmpty, nonempty};
43use prometheus::{
44 Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
45 register_int_counter_vec_with_registry, register_int_counter_with_registry,
46};
47use tap::TapFallible;
48use tonic::{
49 metadata::{Ascii, MetadataValue},
50 transport::server::TcpConnectInfo,
51};
52use tracing::{Instrument, debug, error, error_span, info};
53
54use crate::{
55 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
56 checkpoints::CheckpointStore,
57 consensus_adapter::{
58 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
59 },
60 mysticeti_adapter::LazyMysticetiClient,
61 traffic_controller::{
62 TrafficController, metrics::TrafficControllerMetrics, parse_ip, policies::TrafficTally,
63 },
64};
65
66#[cfg(test)]
67#[path = "unit_tests/server_tests.rs"]
68mod server_tests;
69
70pub struct AuthorityServerHandle {
72 server_handle: iota_network_stack::server::Server,
73}
74
75impl AuthorityServerHandle {
76 pub async fn join(self) -> Result<(), io::Error> {
78 self.server_handle.handle().wait_for_shutdown().await;
79 Ok(())
80 }
81
82 pub async fn kill(self) -> Result<(), io::Error> {
84 self.server_handle.handle().shutdown().await;
85 Ok(())
86 }
87
88 pub fn address(&self) -> &Multiaddr {
90 self.server_handle.local_addr()
91 }
92}
93
94pub struct AuthorityServer {
96 address: Multiaddr,
97 pub state: Arc<AuthorityState>,
98 consensus_adapter: Arc<ConsensusAdapter>,
99 pub metrics: Arc<ValidatorServiceMetrics>,
100}
101
102impl AuthorityServer {
103 pub fn new_for_test_with_consensus_adapter(
105 state: Arc<AuthorityState>,
106 consensus_adapter: Arc<ConsensusAdapter>,
107 ) -> Self {
108 let address = new_local_tcp_address_for_testing();
109 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
110
111 Self {
112 address,
113 state,
114 consensus_adapter,
115 metrics,
116 }
117 }
118
119 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
121 let consensus_adapter = Arc::new(ConsensusAdapter::new(
122 Arc::new(LazyMysticetiClient::new()),
123 CheckpointStore::new_for_tests(),
124 state.name,
125 Arc::new(ConnectionMonitorStatusForTests {}),
126 100_000,
127 100_000,
128 None,
129 None,
130 ConsensusAdapterMetrics::new_test(),
131 ));
132 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
133 }
134
135 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
137 let address = self.address.clone();
138 self.spawn_with_bind_address_for_test(address).await
139 }
140
141 pub async fn spawn_with_bind_address_for_test(
143 self,
144 address: Multiaddr,
145 ) -> Result<AuthorityServerHandle, io::Error> {
146 let tls_config = iota_tls::create_rustls_server_config(
147 self.state.config.network_key_pair().copy().private(),
148 IOTA_TLS_SERVER_NAME.to_string(),
149 );
150 let server = iota_network_stack::config::Config::new()
151 .server_builder()
152 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
153 self.state,
154 self.consensus_adapter,
155 self.metrics,
156 )))
157 .bind(&address, Some(tls_config))
158 .await
159 .unwrap();
160 let local_addr = server.local_addr().to_owned();
161 info!("Listening to traffic on {local_addr}");
162 let handle = AuthorityServerHandle {
163 server_handle: server,
164 };
165 Ok(handle)
166 }
167}
168
169pub struct ValidatorServiceMetrics {
171 pub signature_errors: IntCounter,
172 pub tx_verification_latency: Histogram,
173 pub cert_verification_latency: Histogram,
174 pub consensus_latency: Histogram,
175 pub handle_transaction_latency: Histogram,
176 pub submit_certificate_consensus_latency: Histogram,
177 pub handle_certificate_consensus_latency: Histogram,
178 pub handle_certificate_non_consensus_latency: Histogram,
179 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
180 pub handle_soft_bundle_certificates_count: Histogram,
181 pub handle_soft_bundle_certificates_size_bytes: Histogram,
182 pub handle_capability_notification_latency: Histogram,
183
184 num_rejected_tx_in_epoch_boundary: IntCounter,
185 num_rejected_cert_in_epoch_boundary: IntCounter,
186 num_rejected_tx_during_overload: IntCounterVec,
187 num_rejected_cert_during_overload: IntCounterVec,
188 num_rejected_capability_notifications_during_overload: IntCounterVec,
189 connection_ip_not_found: IntCounter,
190 forwarded_header_parse_error: IntCounter,
191 forwarded_header_invalid: IntCounter,
192 forwarded_header_not_included: IntCounter,
193 client_id_source_config_mismatch: IntCounter,
194}
195
196impl ValidatorServiceMetrics {
197 pub fn new(registry: &Registry) -> Self {
199 Self {
200 signature_errors: register_int_counter_with_registry!(
201 "total_signature_errors",
202 "Number of transaction signature errors",
203 registry,
204 )
205 .unwrap(),
206 tx_verification_latency: register_histogram_with_registry!(
207 "validator_service_tx_verification_latency",
208 "Latency of verifying a transaction",
209 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
210 registry,
211 )
212 .unwrap(),
213 cert_verification_latency: register_histogram_with_registry!(
214 "validator_service_cert_verification_latency",
215 "Latency of verifying a certificate",
216 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
217 registry,
218 )
219 .unwrap(),
220 consensus_latency: register_histogram_with_registry!(
221 "validator_service_consensus_latency",
222 "Time spent between submitting a shared obj txn to consensus and getting result",
223 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
224 registry,
225 )
226 .unwrap(),
227 handle_transaction_latency: register_histogram_with_registry!(
228 "validator_service_handle_transaction_latency",
229 "Latency of handling a transaction",
230 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231 registry,
232 )
233 .unwrap(),
234 handle_certificate_consensus_latency: register_histogram_with_registry!(
235 "validator_service_handle_certificate_consensus_latency",
236 "Latency of handling a consensus transaction certificate",
237 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
238 registry,
239 )
240 .unwrap(),
241 submit_certificate_consensus_latency: register_histogram_with_registry!(
242 "validator_service_submit_certificate_consensus_latency",
243 "Latency of submit_certificate RPC handler",
244 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
245 registry,
246 )
247 .unwrap(),
248 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
249 "validator_service_handle_certificate_non_consensus_latency",
250 "Latency of handling a non-consensus transaction certificate",
251 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
252 registry,
253 )
254 .unwrap(),
255 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
256 "validator_service_handle_soft_bundle_certificates_consensus_latency",
257 "Latency of handling a consensus soft bundle",
258 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
259 registry,
260 )
261 .unwrap(),
262 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
263 "validator_service_handle_soft_bundle_certificates_count",
264 "The number of certificates included in a soft bundle",
265 iota_metrics::COUNT_BUCKETS.to_vec(),
266 registry,
267 )
268 .unwrap(),
269 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
270 "validator_service_handle_soft_bundle_certificates_size_bytes",
271 "The size of soft bundle in bytes",
272 iota_metrics::BYTES_BUCKETS.to_vec(),
273 registry,
274 )
275 .unwrap(),
276 handle_capability_notification_latency: register_histogram_with_registry!(
277 "validator_service_handle_capability_notification_latency",
278 "Latency of handling a capability notification",
279 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
280 registry,
281 )
282 .unwrap(),
283 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
284 "validator_service_num_rejected_tx_in_epoch_boundary",
285 "Number of rejected transaction during epoch transitioning",
286 registry,
287 )
288 .unwrap(),
289 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
290 "validator_service_num_rejected_cert_in_epoch_boundary",
291 "Number of rejected transaction certificate during epoch transitioning",
292 registry,
293 )
294 .unwrap(),
295 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
296 "validator_service_num_rejected_tx_during_overload",
297 "Number of rejected transaction due to system overload",
298 &["error_type"],
299 registry,
300 )
301 .unwrap(),
302 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
303 "validator_service_num_rejected_cert_during_overload",
304 "Number of rejected transaction certificate due to system overload",
305 &["error_type"],
306 registry,
307 )
308 .unwrap(),
309 num_rejected_capability_notifications_during_overload: register_int_counter_vec_with_registry!(
310 "num_rejected_capability_notifications_during_overload",
311 "Number of rejected capability notifications from non-committee active validators due to system overload",
312 &["error_type"],
313 registry,
314 )
315 .unwrap(),
316 connection_ip_not_found: register_int_counter_with_registry!(
317 "validator_service_connection_ip_not_found",
318 "Number of times connection IP was not extractable from request",
319 registry,
320 )
321 .unwrap(),
322 forwarded_header_parse_error: register_int_counter_with_registry!(
323 "validator_service_forwarded_header_parse_error",
324 "Number of times x-forwarded-for header could not be parsed",
325 registry,
326 )
327 .unwrap(),
328 forwarded_header_invalid: register_int_counter_with_registry!(
329 "validator_service_forwarded_header_invalid",
330 "Number of times x-forwarded-for header was invalid",
331 registry,
332 )
333 .unwrap(),
334 forwarded_header_not_included: register_int_counter_with_registry!(
335 "validator_service_forwarded_header_not_included",
336 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
337 registry,
338 )
339 .unwrap(),
340 client_id_source_config_mismatch: register_int_counter_with_registry!(
341 "validator_service_client_id_source_config_mismatch",
342 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
343 registry,
344 )
345 .unwrap(),
346 }
347 }
348
349 pub fn new_for_tests() -> Self {
351 let registry = Registry::new();
352 Self::new(®istry)
353 }
354}
355
356#[derive(Clone)]
358pub struct ValidatorService {
359 state: Arc<AuthorityState>,
360 consensus_adapter: Arc<ConsensusAdapter>,
361 metrics: Arc<ValidatorServiceMetrics>,
362 traffic_controller: Option<Arc<TrafficController>>,
363 client_id_source: Option<ClientIdSource>,
364}
365
366impl ValidatorService {
367 pub fn new(
369 state: Arc<AuthorityState>,
370 consensus_adapter: Arc<ConsensusAdapter>,
371 validator_metrics: Arc<ValidatorServiceMetrics>,
372 traffic_controller_metrics: TrafficControllerMetrics,
373 policy_config: Option<PolicyConfig>,
374 firewall_config: Option<RemoteFirewallConfig>,
375 ) -> Self {
376 Self {
377 state,
378 consensus_adapter,
379 metrics: validator_metrics,
380 traffic_controller: policy_config.clone().map(|policy| {
381 Arc::new(TrafficController::init(
382 policy,
383 traffic_controller_metrics,
384 firewall_config,
385 ))
386 }),
387 client_id_source: policy_config.map(|policy| policy.client_id_source),
388 }
389 }
390
391 pub fn new_for_tests(
392 state: Arc<AuthorityState>,
393 consensus_adapter: Arc<ConsensusAdapter>,
394 metrics: Arc<ValidatorServiceMetrics>,
395 ) -> Self {
396 Self {
397 state,
398 consensus_adapter,
399 metrics,
400 traffic_controller: None,
401 client_id_source: None,
402 }
403 }
404
405 pub fn validator_state(&self) -> &Arc<AuthorityState> {
407 &self.state
408 }
409
410 pub async fn execute_certificate_for_testing(
412 &self,
413 cert: CertifiedTransaction,
414 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
415 let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
416 self.handle_certificate_v1(request).await
417 }
418
419 pub async fn handle_transaction_for_benchmarking(
421 &self,
422 transaction: Transaction,
423 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
424 let request = make_tonic_request_for_testing(transaction);
425 self.transaction(request).await
426 }
427
428 async fn handle_transaction(
430 &self,
431 request: tonic::Request<Transaction>,
432 ) -> WrappedServiceResponse<HandleTransactionResponse> {
433 let Self {
434 state,
435 consensus_adapter,
436 metrics,
437 traffic_controller: _,
438 client_id_source: _,
439 } = self.clone();
440 let transaction = request.into_inner();
441 let epoch_store = state.load_epoch_store_one_call_per_task();
442
443 transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
444
445 let mut validator_pushback_error = None;
454 let overload_check_res = state.check_system_overload(
455 &consensus_adapter,
456 transaction.data(),
457 state.check_system_overload_at_signing(),
458 );
459 if let Err(error) = overload_check_res {
460 metrics
461 .num_rejected_tx_during_overload
462 .with_label_values(&[error.as_ref()])
463 .inc();
464 match error {
466 IotaError::ValidatorOverloadedRetryAfter { .. } => {
467 validator_pushback_error = Some(error)
468 }
469 _ => return Err(error.into()),
470 }
471 }
472
473 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
474
475 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
476 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
477 metrics.signature_errors.inc();
478 })?;
479 drop(tx_verif_metrics_guard);
480
481 let tx_digest = transaction.digest();
482
483 let span = error_span!("validator_state_process_tx", ?tx_digest);
485
486 let info = state
487 .handle_transaction(&epoch_store, transaction.clone())
488 .instrument(span)
489 .await
490 .tap_err(|e| {
491 if let IotaError::ValidatorHaltedAtEpochEnd = e {
492 metrics.num_rejected_tx_in_epoch_boundary.inc();
493 }
494 })?;
495
496 if let Some(error) = validator_pushback_error {
497 return Err(error.into());
500 }
501
502 Ok((tonic::Response::new(info), Weight::zero()))
503 }
504
505 async fn handle_certificates(
511 &self,
512 certificates: NonEmpty<CertifiedTransaction>,
513 include_events: bool,
514 include_input_objects: bool,
515 include_output_objects: bool,
516 _include_auxiliary_data: bool,
517 epoch_store: &Arc<AuthorityPerEpochStore>,
518 wait_for_effects: bool,
519 ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
520 fp_ensure!(
523 !self.state.is_fullnode(epoch_store),
524 IotaError::FullNodeCantHandleCertificate.into()
525 );
526
527 let shared_object_tx = certificates
528 .iter()
529 .any(|cert| cert.contains_shared_object());
530
531 let metrics = if certificates.len() == 1 {
532 if wait_for_effects {
533 if shared_object_tx {
534 &self.metrics.handle_certificate_consensus_latency
535 } else {
536 &self.metrics.handle_certificate_non_consensus_latency
537 }
538 } else {
539 &self.metrics.submit_certificate_consensus_latency
540 }
541 } else {
542 &self
545 .metrics
546 .handle_soft_bundle_certificates_consensus_latency
547 };
548
549 let _metrics_guard = metrics.start_timer();
550
551 if certificates.len() == 1 {
556 let tx_digest = *certificates[0].digest();
557
558 if let Some(signed_effects) = self
559 .state
560 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
561 {
562 let events = if include_events {
563 if let Some(digest) = signed_effects.events_digest() {
564 Some(self.state.get_transaction_events(digest)?)
565 } else {
566 None
567 }
568 } else {
569 None
570 };
571
572 return Ok((
573 Some(vec![HandleCertificateResponseV1 {
574 signed_effects: signed_effects.into_inner(),
575 events,
576 input_objects: None,
577 output_objects: None,
578 auxiliary_data: None,
579 }]),
580 Weight::one(),
581 ));
582 };
583 }
584
585 for certificate in &certificates {
588 let overload_check_res = self.state.check_system_overload(
589 &self.consensus_adapter,
590 certificate.data(),
591 self.state.check_system_overload_at_execution(),
592 );
593 if let Err(error) = overload_check_res {
594 self.metrics
595 .num_rejected_cert_during_overload
596 .with_label_values(&[error.as_ref()])
597 .inc();
598 return Err(error.into());
599 }
600 }
601
602 let verified_certificates = {
603 let _timer = self.metrics.cert_verification_latency.start_timer();
604 epoch_store
605 .signature_verifier
606 .multi_verify_certs(certificates.into())
607 .await
608 .into_iter()
609 .collect::<Result<Vec<_>, _>>()?
610 };
611
612 {
613 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
615 if !reconfiguration_lock.should_accept_user_certs() {
616 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
617 return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
618 }
619
620 if !epoch_store
626 .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
627 {
628 let _metrics_guard = if shared_object_tx {
629 Some(self.metrics.consensus_latency.start_timer())
630 } else {
631 None
632 };
633 let transactions = verified_certificates
634 .iter()
635 .map(|certificate| {
636 ConsensusTransaction::new_certificate_message(
637 &self.state.name,
638 certificate.clone().into(),
639 )
640 })
641 .collect::<Vec<_>>();
642 self.consensus_adapter.submit_batch(
643 &transactions,
644 Some(&reconfiguration_lock),
645 epoch_store,
646 )?;
647 }
651 }
652
653 if !wait_for_effects {
654 let certificates_without_shared_objects = verified_certificates
657 .iter()
658 .filter(|certificate| !certificate.contains_shared_object())
659 .cloned()
660 .collect::<Vec<_>>();
661 if !certificates_without_shared_objects.is_empty() {
662 self.state.enqueue_certificates_for_execution(
663 certificates_without_shared_objects,
664 epoch_store,
665 );
666 }
667 return Ok((None, Weight::zero()));
668 }
669
670 let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
674 |certificate| async move {
675 let effects = self
676 .state
677 .execute_certificate(&certificate, epoch_store)
678 .await?;
679 let events = if include_events {
680 if let Some(digest) = effects.events_digest() {
681 Some(self.state.get_transaction_events(digest)?)
682 } else {
683 None
684 }
685 } else {
686 None
687 };
688
689 let input_objects = include_input_objects
690 .then(|| self.state.get_transaction_input_objects(&effects))
691 .and_then(Result::ok);
692
693 let output_objects = include_output_objects
694 .then(|| self.state.get_transaction_output_objects(&effects))
695 .and_then(Result::ok);
696
697 let signed_effects = self.state.sign_effects(effects, epoch_store)?;
698 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
699
700 Ok::<_, IotaError>(HandleCertificateResponseV1 {
701 signed_effects: signed_effects.into_inner(),
702 events,
703 input_objects,
704 output_objects,
705 auxiliary_data: None, })
707 },
708 ))
709 .await?;
710
711 Ok((Some(responses), Weight::zero()))
712 }
713}
714
715type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
716
717impl ValidatorService {
718 async fn transaction_impl(
719 &self,
720 request: tonic::Request<Transaction>,
721 ) -> WrappedServiceResponse<HandleTransactionResponse> {
722 self.handle_transaction(request).await
723 }
724
725 async fn submit_certificate_impl(
726 &self,
727 request: tonic::Request<CertifiedTransaction>,
728 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
729 let epoch_store = self.state.load_epoch_store_one_call_per_task();
730 let certificate = request.into_inner();
731 certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
732
733 let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
734 self.handle_certificates(
735 nonempty![certificate],
736 true,
737 false,
738 false,
739 false,
740 &epoch_store,
741 false,
742 )
743 .instrument(span)
744 .await
745 .map(|(executed, spam_weight)| {
746 (
747 tonic::Response::new(SubmitCertificateResponse {
748 executed: executed.map(|mut x| x.remove(0)),
749 }),
750 spam_weight,
751 )
752 })
753 }
754
755 async fn handle_certificate_v1_impl(
756 &self,
757 request: tonic::Request<HandleCertificateRequestV1>,
758 ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
759 let epoch_store = self.state.load_epoch_store_one_call_per_task();
760 let request = request.into_inner();
761 request
762 .certificate
763 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
764
765 let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
766 self.handle_certificates(
767 nonempty![request.certificate],
768 request.include_events,
769 request.include_input_objects,
770 request.include_output_objects,
771 request.include_auxiliary_data,
772 &epoch_store,
773 true,
774 )
775 .instrument(span)
776 .await
777 .map(|(resp, spam_weight)| {
778 (
779 tonic::Response::new(
780 resp.expect(
781 "handle_certificate should not return none with wait_for_effects=true",
782 )
783 .remove(0),
784 ),
785 spam_weight,
786 )
787 })
788 }
789
790 async fn soft_bundle_validity_check(
791 &self,
792 certificates: &NonEmpty<CertifiedTransaction>,
793 epoch_store: &Arc<AuthorityPerEpochStore>,
794 total_size_bytes: u64,
795 ) -> Result<(), tonic::Status> {
796 let protocol_config = epoch_store.protocol_config();
797
798 fp_ensure!(
805 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
806 IotaError::UserInput {
807 error: UserInputError::TooManyTransactionsInSoftBundle {
808 limit: protocol_config.max_soft_bundle_size()
809 }
810 }
811 .into()
812 );
813
814 let soft_bundle_max_size_bytes =
820 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
821 fp_ensure!(
822 total_size_bytes <= soft_bundle_max_size_bytes,
823 IotaError::UserInput {
824 error: UserInputError::SoftBundleTooLarge {
825 size: total_size_bytes,
826 limit: soft_bundle_max_size_bytes,
827 },
828 }
829 .into()
830 );
831
832 let mut gas_price = None;
833 for certificate in certificates {
834 let tx_digest = *certificate.digest();
835 fp_ensure!(
836 certificate.contains_shared_object(),
837 IotaError::UserInput {
838 error: UserInputError::NoSharedObject { digest: tx_digest }
839 }
840 .into()
841 );
842 fp_ensure!(
843 !self.state.try_is_tx_already_executed(&tx_digest)?,
844 IotaError::UserInput {
845 error: UserInputError::AlreadyExecuted { digest: tx_digest }
846 }
847 .into()
848 );
849 if let Some(gas) = gas_price {
850 fp_ensure!(
851 gas == certificate.gas_price(),
852 IotaError::UserInput {
853 error: UserInputError::GasPriceMismatch {
854 digest: tx_digest,
855 expected: gas,
856 actual: certificate.gas_price()
857 }
858 }
859 .into()
860 );
861 } else {
862 gas_price = Some(certificate.gas_price());
863 }
864 }
865
866 fp_ensure!(
872 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
873 IotaError::UserInput {
874 error: UserInputError::CertificateAlreadyProcessed
875 }
876 .into()
877 );
878
879 Ok(())
880 }
881
882 async fn handle_soft_bundle_certificates_v1_impl(
883 &self,
884 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
885 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
886 let epoch_store = self.state.load_epoch_store_one_call_per_task();
887 let client_addr = if self.client_id_source.is_none() {
888 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
889 } else {
890 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
891 };
892 let request = request.into_inner();
893
894 let certificates =
895 NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
896 let mut total_size_bytes = 0;
897 for certificate in &certificates {
898 total_size_bytes += certificate
900 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
901 as u64;
902 }
903
904 self.metrics
905 .handle_soft_bundle_certificates_count
906 .observe(certificates.len() as f64);
907
908 self.metrics
909 .handle_soft_bundle_certificates_size_bytes
910 .observe(total_size_bytes as f64);
911
912 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
914 .await?;
915
916 info!(
917 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
918 certificates.len(),
919 client_addr
920 .map(|x| x.to_string())
921 .unwrap_or_else(|| "unknown".to_string()),
922 certificates
923 .iter()
924 .map(|x| x.digest().to_string())
925 .collect::<Vec<_>>()
926 .join(", "),
927 total_size_bytes
928 );
929
930 let span = error_span!("handle_soft_bundle_certificates_v1");
931 self.handle_certificates(
932 certificates,
933 request.include_events,
934 request.include_input_objects,
935 request.include_output_objects,
936 request.include_auxiliary_data,
937 &epoch_store,
938 request.wait_for_effects,
939 )
940 .instrument(span)
941 .await
942 .map(|(resp, spam_weight)| {
943 (
944 tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
945 responses: resp.unwrap_or_default(),
946 }),
947 spam_weight,
948 )
949 })
950 }
951
952 async fn object_info_impl(
953 &self,
954 request: tonic::Request<ObjectInfoRequest>,
955 ) -> WrappedServiceResponse<ObjectInfoResponse> {
956 let request = request.into_inner();
957 let response = self.state.handle_object_info_request(request).await?;
958 Ok((tonic::Response::new(response), Weight::one()))
959 }
960
961 async fn transaction_info_impl(
962 &self,
963 request: tonic::Request<TransactionInfoRequest>,
964 ) -> WrappedServiceResponse<TransactionInfoResponse> {
965 let request = request.into_inner();
966 let response = self.state.handle_transaction_info_request(request).await?;
967 Ok((tonic::Response::new(response), Weight::one()))
968 }
969
970 async fn checkpoint_impl(
971 &self,
972 request: tonic::Request<CheckpointRequest>,
973 ) -> WrappedServiceResponse<CheckpointResponse> {
974 let request = request.into_inner();
975 let response = self.state.handle_checkpoint_request(&request)?;
976 Ok((tonic::Response::new(response), Weight::one()))
977 }
978
979 async fn get_system_state_object_impl(
980 &self,
981 _request: tonic::Request<SystemStateRequest>,
982 ) -> WrappedServiceResponse<IotaSystemState> {
983 let response = self
984 .state
985 .get_object_cache_reader()
986 .try_get_iota_system_state_object_unsafe()?;
987 Ok((tonic::Response::new(response), Weight::one()))
988 }
989
990 fn get_client_ip_addr<T>(
991 &self,
992 request: &tonic::Request<T>,
993 source: &ClientIdSource,
994 ) -> Option<IpAddr> {
995 match source {
996 ClientIdSource::SocketAddr => {
997 let socket_addr: Option<SocketAddr> = request.remote_addr();
998
999 if let Some(socket_addr) = socket_addr {
1005 Some(socket_addr.ip())
1006 } else {
1007 if cfg!(msim) {
1008 } else if cfg!(test) {
1010 panic!("Failed to get remote address from request");
1011 } else {
1012 self.metrics.connection_ip_not_found.inc();
1013 error!("Failed to get remote address from request");
1014 }
1015 None
1016 }
1017 }
1018 ClientIdSource::XForwardedFor(num_hops) => {
1019 let do_header_parse = |op: &MetadataValue<Ascii>| {
1020 match op.to_str() {
1021 Ok(header_val) => {
1022 let header_contents =
1023 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1024 if *num_hops == 0 {
1025 error!(
1026 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1027 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1028 to this node. Skipping traffic controller request handling.",
1029 header_contents,
1030 );
1031 return None;
1032 }
1033 let contents_len = header_contents.len();
1034 if contents_len < *num_hops {
1035 error!(
1036 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1037 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1038 `client-id-source` in the node config.",
1039 header_contents, contents_len, num_hops, contents_len,
1040 );
1041 self.metrics.client_id_source_config_mismatch.inc();
1042 return None;
1043 }
1044 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1045 else {
1046 error!(
1047 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1048 Expected at least {} values. Skipping traffic controller request handling.",
1049 header_contents, contents_len, num_hops, contents_len,
1050 );
1051 return None;
1052 };
1053 parse_ip(client_ip).or_else(|| {
1054 self.metrics.forwarded_header_parse_error.inc();
1055 None
1056 })
1057 }
1058 Err(e) => {
1059 self.metrics.forwarded_header_invalid.inc();
1063 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1064 None
1065 }
1066 }
1067 };
1068 if let Some(op) = request.metadata().get("x-forwarded-for") {
1069 do_header_parse(op)
1070 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1071 do_header_parse(op)
1072 } else {
1073 self.metrics.forwarded_header_not_included.inc();
1074 error!(
1075 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1076 );
1077 None
1078 }
1079 }
1080 }
1081 }
1082
1083 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1084 if let Some(traffic_controller) = &self.traffic_controller {
1085 if !traffic_controller.check(&client, &None).await {
1086 Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1088 } else {
1089 Ok(())
1090 }
1091 } else {
1092 Ok(())
1093 }
1094 }
1095
1096 fn handle_traffic_resp<T>(
1097 &self,
1098 client: Option<IpAddr>,
1099 wrapped_response: WrappedServiceResponse<T>,
1100 ) -> Result<tonic::Response<T>, tonic::Status> {
1101 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1102 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1103 Err(status) => (
1104 Some(IotaError::from(status.clone())),
1105 Weight::zero(),
1106 Err(status.clone()),
1107 ),
1108 };
1109
1110 if let Some(traffic_controller) = self.traffic_controller.clone() {
1111 traffic_controller.tally(TrafficTally {
1112 direct: client,
1113 through_fullnode: None,
1114 error_info: error.map(|e| {
1115 let error_type = String::from(e.clone().as_ref());
1116 let error_weight = normalize(e);
1117 (error_weight, error_type)
1118 }),
1119 spam_weight,
1120 timestamp: SystemTime::now(),
1121 })
1122 }
1123 unwrapped_response
1124 }
1125
1126 async fn handle_capability_notification_v1_impl(
1127 &self,
1128 request: tonic::Request<HandleCapabilityNotificationRequestV1>,
1129 ) -> WrappedServiceResponse<HandleCapabilityNotificationResponseV1> {
1130 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1131 let request = request.into_inner();
1132 fp_ensure!(
1133 epoch_store
1134 .protocol_config()
1135 .track_non_committee_eligible_validators(),
1136 IotaError::UnsupportedFeature {
1137 error: "capability notification endpoint is not supported in this Protocol Version"
1138 .to_string()
1139 }
1140 .into()
1141 );
1142 fp_ensure!(
1145 !self.state.is_fullnode(&epoch_store),
1146 IotaError::FullNodeCantHandleAuthorityCapabilities.into()
1147 );
1148
1149 let existing_capabilities = epoch_store.get_capabilities_v1()?;
1151 let incoming_capability = request.message.data();
1152
1153 info!(
1154 "Received capability notification: {:?}",
1155 incoming_capability
1156 );
1157
1158 if let Some(existing) = existing_capabilities
1159 .iter()
1160 .find(|cap| cap.authority == incoming_capability.authority)
1161 {
1162 if incoming_capability.generation <= existing.generation {
1163 return Ok((
1165 tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1166 Weight::one(),
1167 ));
1168 }
1169 }
1170 if let Err(error) = self.consensus_adapter.check_consensus_overload() {
1171 self.metrics
1172 .num_rejected_capability_notifications_during_overload
1173 .with_label_values(&[error.as_ref()])
1174 .inc();
1175 return Err(error.into());
1176 }
1177
1178 let _handle_tx_metrics_guard = self
1179 .metrics
1180 .handle_capability_notification_latency
1181 .start_timer();
1182
1183 let signed_authority_capabilities = request.message;
1184 let verified_authority_capabilities = epoch_store
1186 .verify_authority_capabilities(signed_authority_capabilities)
1187 .inspect_err(|_e| {
1188 self.metrics.signature_errors.inc();
1189 })?;
1190
1191 let authority_name = verified_authority_capabilities.authority;
1192 debug!("Verified capability notification for authority {authority_name:?}");
1194
1195 let signed_authority_capabilities_transaction =
1198 ConsensusTransaction::new_signed_capability_notification_v1(
1199 verified_authority_capabilities.into_inner(),
1200 );
1201
1202 self.consensus_adapter.submit(
1204 signed_authority_capabilities_transaction,
1205 None,
1206 &epoch_store,
1207 )?;
1208
1209 debug!("Submitted capability notification to consensus for authority {authority_name:?}");
1210
1211 Ok((
1212 tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1213 Weight::one(),
1214 ))
1215 }
1216}
1217
1218fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1219 let mut request = tonic::Request::new(message);
1222 let tcp_connect_info = TcpConnectInfo {
1223 local_addr: None,
1224 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1225 };
1226 request.extensions_mut().insert(tcp_connect_info);
1227 request
1228}
1229
1230fn normalize(err: IotaError) -> Weight {
1232 match err {
1233 IotaError::UserInput {
1234 error: UserInputError::IncorrectUserSignature { .. },
1235 } => Weight::one(),
1236 IotaError::InvalidSignature { .. }
1237 | IotaError::SignerSignatureAbsent { .. }
1238 | IotaError::SignerSignatureNumberMismatch { .. }
1239 | IotaError::IncorrectSigner { .. }
1240 | IotaError::UnknownSigner { .. }
1241 | IotaError::WrongEpoch { .. } => Weight::one(),
1242 _ => Weight::zero(),
1243 }
1244}
1245
1246#[macro_export]
1250macro_rules! handle_with_decoration {
1251 ($self:ident, $func_name:ident, $request:ident) => {{
1252 if $self.client_id_source.is_none() {
1253 return $self.$func_name($request).await.map(|(result, _)| result);
1254 }
1255
1256 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1257
1258 $self.handle_traffic_req(client.clone()).await?;
1260
1261 let wrapped_response = $self.$func_name($request).await;
1263 $self.handle_traffic_resp(client, wrapped_response)
1264 }};
1265}
1266
1267#[async_trait]
1268impl Validator for ValidatorService {
1269 async fn transaction(
1271 &self,
1272 request: tonic::Request<Transaction>,
1273 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1274 let validator_service = self.clone();
1275
1276 spawn_monitored_task!(async move {
1280 handle_with_decoration!(validator_service, transaction_impl, request)
1284 })
1285 .await
1286 .unwrap()
1287 }
1288
1289 async fn submit_certificate(
1291 &self,
1292 request: tonic::Request<CertifiedTransaction>,
1293 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1294 let validator_service = self.clone();
1295
1296 spawn_monitored_task!(async move {
1300 handle_with_decoration!(validator_service, submit_certificate_impl, request)
1304 })
1305 .await
1306 .unwrap()
1307 }
1308
1309 async fn handle_certificate_v1(
1310 &self,
1311 request: tonic::Request<HandleCertificateRequestV1>,
1312 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1313 handle_with_decoration!(self, handle_certificate_v1_impl, request)
1314 }
1315
1316 async fn handle_soft_bundle_certificates_v1(
1317 &self,
1318 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1319 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1320 handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1321 }
1322
1323 async fn object_info(
1325 &self,
1326 request: tonic::Request<ObjectInfoRequest>,
1327 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1328 handle_with_decoration!(self, object_info_impl, request)
1329 }
1330
1331 async fn transaction_info(
1333 &self,
1334 request: tonic::Request<TransactionInfoRequest>,
1335 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1336 handle_with_decoration!(self, transaction_info_impl, request)
1337 }
1338
1339 async fn checkpoint(
1341 &self,
1342 request: tonic::Request<CheckpointRequest>,
1343 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1344 handle_with_decoration!(self, checkpoint_impl, request)
1345 }
1346
1347 async fn get_system_state_object(
1349 &self,
1350 request: tonic::Request<SystemStateRequest>,
1351 ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1352 handle_with_decoration!(self, get_system_state_object_impl, request)
1353 }
1354
1355 async fn handle_capability_notification_v1(
1356 &self,
1357 request: tonic::Request<HandleCapabilityNotificationRequestV1>,
1358 ) -> Result<tonic::Response<HandleCapabilityNotificationResponseV1>, tonic::Status> {
1359 handle_with_decoration!(self, handle_capability_notification_v1_impl, request)
1360 }
1361}