1use std::{
7 io,
8 net::{IpAddr, SocketAddr},
9 sync::Arc,
10 time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19 api::{Validator, ValidatorServer},
20 tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24 effects::TransactionEffectsAPI,
25 error::*,
26 fp_ensure,
27 iota_system_state::IotaSystemState,
28 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29 messages_consensus::ConsensusTransaction,
30 messages_grpc::{
31 HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
32 HandleCertificateRequestV1, HandleCertificateResponseV1,
33 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
34 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
35 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
36 TransactionInfoResponse,
37 },
38 multiaddr::Multiaddr,
39 traffic_control::{ClientIdSource, Weight},
40 transaction::*,
41};
42use nonempty::{NonEmpty, nonempty};
43use prometheus::{
44 Gauge, Histogram, IntCounter, IntCounterVec, Registry, register_gauge_with_registry,
45 register_histogram_with_registry, register_int_counter_vec_with_registry,
46 register_int_counter_with_registry,
47};
48use tap::TapFallible;
49use tonic::{
50 metadata::{Ascii, MetadataValue},
51 transport::server::TcpConnectInfo,
52};
53use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
54
55use crate::{
56 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
57 checkpoints::CheckpointStore,
58 consensus_adapter::{
59 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
60 },
61 starfish_adapter::LazyStarfishClient,
62 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
63};
64
65#[cfg(test)]
66#[path = "unit_tests/server_tests.rs"]
67mod server_tests;
68
69pub struct AuthorityServerHandle {
71 server_handle: iota_network_stack::server::Server,
72}
73
74impl AuthorityServerHandle {
75 pub async fn join(self) -> Result<(), io::Error> {
77 self.server_handle.handle().wait_for_shutdown().await;
78 Ok(())
79 }
80
81 pub async fn kill(self) -> Result<(), io::Error> {
83 self.server_handle.handle().shutdown().await;
84 Ok(())
85 }
86
87 pub fn address(&self) -> &Multiaddr {
89 self.server_handle.local_addr()
90 }
91}
92
93pub struct AuthorityServer {
95 address: Multiaddr,
96 pub state: Arc<AuthorityState>,
97 consensus_adapter: Arc<ConsensusAdapter>,
98 pub metrics: Arc<ValidatorServiceMetrics>,
99}
100
101impl AuthorityServer {
102 pub fn new_for_test_with_consensus_adapter(
104 state: Arc<AuthorityState>,
105 consensus_adapter: Arc<ConsensusAdapter>,
106 ) -> Self {
107 let address = new_local_tcp_address_for_testing();
108 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
109
110 Self {
111 address,
112 state,
113 consensus_adapter,
114 metrics,
115 }
116 }
117
118 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
120 let consensus_adapter = Arc::new(ConsensusAdapter::new(
121 Arc::new(LazyStarfishClient::new()),
122 CheckpointStore::new_for_tests(),
123 state.name,
124 Arc::new(ConnectionMonitorStatusForTests {}),
125 100_000,
126 100_000,
127 None,
128 None,
129 ConsensusAdapterMetrics::new_test(),
130 ));
131 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
132 }
133
134 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
136 let address = self.address.clone();
137 self.spawn_with_bind_address_for_test(address).await
138 }
139
140 pub async fn spawn_with_bind_address_for_test(
142 self,
143 address: Multiaddr,
144 ) -> Result<AuthorityServerHandle, io::Error> {
145 let tls_config = iota_tls::create_rustls_server_config(
146 self.state.config.network_key_pair().copy().private(),
147 IOTA_TLS_SERVER_NAME.to_string(),
148 );
149 let server = iota_network_stack::config::Config::new()
150 .server_builder()
151 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
152 self.state,
153 self.consensus_adapter,
154 self.metrics,
155 )))
156 .bind(&address, Some(tls_config))
157 .await
158 .unwrap();
159 let local_addr = server.local_addr().to_owned();
160 info!("Listening to traffic on {local_addr}");
161 let handle = AuthorityServerHandle {
162 server_handle: server,
163 };
164 Ok(handle)
165 }
166}
167
168pub struct ValidatorServiceMetrics {
170 pub signature_errors: IntCounter,
171 pub tx_verification_latency: Histogram,
172 pub cert_verification_latency: Histogram,
173 pub consensus_latency: Histogram,
174 pub handle_transaction_latency: Histogram,
175 pub submit_certificate_consensus_latency: Histogram,
176 pub handle_certificate_consensus_latency: Histogram,
177 pub handle_certificate_non_consensus_latency: Histogram,
178 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
179 pub handle_soft_bundle_certificates_count: Histogram,
180 pub handle_soft_bundle_certificates_size_bytes: Histogram,
181 pub handle_capability_notification_latency: Histogram,
182
183 num_rejected_tx_in_epoch_boundary: IntCounter,
184 num_rejected_cert_in_epoch_boundary: IntCounter,
185 num_rejected_tx_during_overload: IntCounterVec,
186 num_rejected_cert_during_overload: IntCounterVec,
187 num_rejected_capability_notifications_during_overload: IntCounterVec,
188 connection_ip_not_found: IntCounter,
189 forwarded_header_parse_error: IntCounter,
190 forwarded_header_invalid: IntCounter,
191 forwarded_header_not_included: IntCounter,
192 client_id_source_config_mismatch: IntCounter,
193 x_forwarded_for_num_hops: Gauge,
194}
195
196impl ValidatorServiceMetrics {
197 pub fn new(registry: &Registry) -> Self {
199 Self {
200 signature_errors: register_int_counter_with_registry!(
201 "total_signature_errors",
202 "Number of transaction signature errors",
203 registry,
204 )
205 .unwrap(),
206 tx_verification_latency: register_histogram_with_registry!(
207 "validator_service_tx_verification_latency",
208 "Latency of verifying a transaction",
209 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
210 registry,
211 )
212 .unwrap(),
213 cert_verification_latency: register_histogram_with_registry!(
214 "validator_service_cert_verification_latency",
215 "Latency of verifying a certificate",
216 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
217 registry,
218 )
219 .unwrap(),
220 consensus_latency: register_histogram_with_registry!(
221 "validator_service_consensus_latency",
222 "Time spent between submitting a shared obj txn to consensus and getting result",
223 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
224 registry,
225 )
226 .unwrap(),
227 handle_transaction_latency: register_histogram_with_registry!(
228 "validator_service_handle_transaction_latency",
229 "Latency of handling a transaction",
230 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231 registry,
232 )
233 .unwrap(),
234 handle_certificate_consensus_latency: register_histogram_with_registry!(
235 "validator_service_handle_certificate_consensus_latency",
236 "Latency of handling a consensus transaction certificate",
237 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
238 registry,
239 )
240 .unwrap(),
241 submit_certificate_consensus_latency: register_histogram_with_registry!(
242 "validator_service_submit_certificate_consensus_latency",
243 "Latency of submit_certificate RPC handler",
244 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
245 registry,
246 )
247 .unwrap(),
248 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
249 "validator_service_handle_certificate_non_consensus_latency",
250 "Latency of handling a non-consensus transaction certificate",
251 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
252 registry,
253 )
254 .unwrap(),
255 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
256 "validator_service_handle_soft_bundle_certificates_consensus_latency",
257 "Latency of handling a consensus soft bundle",
258 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
259 registry,
260 )
261 .unwrap(),
262 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
263 "validator_service_handle_soft_bundle_certificates_count",
264 "The number of certificates included in a soft bundle",
265 iota_metrics::COUNT_BUCKETS.to_vec(),
266 registry,
267 )
268 .unwrap(),
269 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
270 "validator_service_handle_soft_bundle_certificates_size_bytes",
271 "The size of soft bundle in bytes",
272 iota_metrics::BYTES_BUCKETS.to_vec(),
273 registry,
274 )
275 .unwrap(),
276 handle_capability_notification_latency: register_histogram_with_registry!(
277 "validator_service_handle_capability_notification_latency",
278 "Latency of handling a capability notification",
279 iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
280 registry,
281 )
282 .unwrap(),
283 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
284 "validator_service_num_rejected_tx_in_epoch_boundary",
285 "Number of rejected transaction during epoch transitioning",
286 registry,
287 )
288 .unwrap(),
289 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
290 "validator_service_num_rejected_cert_in_epoch_boundary",
291 "Number of rejected transaction certificate during epoch transitioning",
292 registry,
293 )
294 .unwrap(),
295 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
296 "validator_service_num_rejected_tx_during_overload",
297 "Number of rejected transaction due to system overload",
298 &["error_type"],
299 registry,
300 )
301 .unwrap(),
302 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
303 "validator_service_num_rejected_cert_during_overload",
304 "Number of rejected transaction certificate due to system overload",
305 &["error_type"],
306 registry,
307 )
308 .unwrap(),
309 num_rejected_capability_notifications_during_overload: register_int_counter_vec_with_registry!(
310 "num_rejected_capability_notifications_during_overload",
311 "Number of rejected capability notifications from non-committee active validators due to system overload",
312 &["error_type"],
313 registry,
314 )
315 .unwrap(),
316 connection_ip_not_found: register_int_counter_with_registry!(
317 "validator_service_connection_ip_not_found",
318 "Number of times connection IP was not extractable from request",
319 registry,
320 )
321 .unwrap(),
322 forwarded_header_parse_error: register_int_counter_with_registry!(
323 "validator_service_forwarded_header_parse_error",
324 "Number of times x-forwarded-for header could not be parsed",
325 registry,
326 )
327 .unwrap(),
328 forwarded_header_invalid: register_int_counter_with_registry!(
329 "validator_service_forwarded_header_invalid",
330 "Number of times x-forwarded-for header was invalid",
331 registry,
332 )
333 .unwrap(),
334 forwarded_header_not_included: register_int_counter_with_registry!(
335 "validator_service_forwarded_header_not_included",
336 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
337 registry,
338 )
339 .unwrap(),
340 client_id_source_config_mismatch: register_int_counter_with_registry!(
341 "validator_service_client_id_source_config_mismatch",
342 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
343 registry,
344 )
345 .unwrap(),
346 x_forwarded_for_num_hops: register_gauge_with_registry!(
347 "validator_service_x_forwarded_for_num_hops",
348 "Number of hops in x-forwarded-for header",
349 registry,
350 )
351 .unwrap(),
352 }
353 }
354
355 pub fn new_for_tests() -> Self {
357 let registry = Registry::new();
358 Self::new(®istry)
359 }
360}
361
362#[derive(Clone)]
364pub struct ValidatorService {
365 state: Arc<AuthorityState>,
366 consensus_adapter: Arc<ConsensusAdapter>,
367 metrics: Arc<ValidatorServiceMetrics>,
368 traffic_controller: Option<Arc<TrafficController>>,
369 client_id_source: Option<ClientIdSource>,
370}
371
372impl ValidatorService {
373 pub fn new(
375 state: Arc<AuthorityState>,
376 consensus_adapter: Arc<ConsensusAdapter>,
377 validator_metrics: Arc<ValidatorServiceMetrics>,
378 client_id_source: Option<ClientIdSource>,
379 ) -> Self {
380 let traffic_controller = state.traffic_controller.clone();
381 Self {
382 state,
383 consensus_adapter,
384 metrics: validator_metrics,
385 traffic_controller,
386 client_id_source,
387 }
388 }
389
390 pub fn new_for_tests(
391 state: Arc<AuthorityState>,
392 consensus_adapter: Arc<ConsensusAdapter>,
393 metrics: Arc<ValidatorServiceMetrics>,
394 ) -> Self {
395 Self {
396 state,
397 consensus_adapter,
398 metrics,
399 traffic_controller: None,
400 client_id_source: None,
401 }
402 }
403
404 pub fn validator_state(&self) -> &Arc<AuthorityState> {
406 &self.state
407 }
408
409 pub async fn execute_certificate_for_testing(
411 &self,
412 cert: CertifiedTransaction,
413 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
414 let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
415 self.handle_certificate_v1(request).await
416 }
417
418 pub async fn handle_transaction_for_benchmarking(
420 &self,
421 transaction: Transaction,
422 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
423 let request = make_tonic_request_for_testing(transaction);
424 self.transaction(request).await
425 }
426
427 async fn handle_transaction(
429 &self,
430 request: tonic::Request<Transaction>,
431 ) -> WrappedServiceResponse<HandleTransactionResponse> {
432 let Self {
433 state,
434 consensus_adapter,
435 metrics,
436 traffic_controller: _,
437 client_id_source: _,
438 } = self.clone();
439 let transaction = request.into_inner();
440 let epoch_store = state.load_epoch_store_one_call_per_task();
441
442 transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
443
444 let mut validator_pushback_error = None;
453 let overload_check_res = state.check_system_overload(
454 &consensus_adapter,
455 transaction.data(),
456 state.check_system_overload_at_signing(),
457 );
458 if let Err(error) = overload_check_res {
459 metrics
460 .num_rejected_tx_during_overload
461 .with_label_values(&[error.as_ref()])
462 .inc();
463 match error {
465 IotaError::ValidatorOverloadedRetryAfter { .. } => {
466 validator_pushback_error = Some(error)
467 }
468 _ => return Err(error.into()),
469 }
470 }
471
472 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
473
474 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
475 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
476 metrics.signature_errors.inc();
477 })?;
478 drop(tx_verif_metrics_guard);
479
480 let tx_digest = transaction.digest();
481
482 let span = error_span!("validator_state_process_tx", ?tx_digest);
484
485 let info = state
486 .handle_transaction(&epoch_store, transaction.clone())
487 .instrument(span)
488 .await
489 .tap_err(|e| {
490 if let IotaError::ValidatorHaltedAtEpochEnd = e {
491 metrics.num_rejected_tx_in_epoch_boundary.inc();
492 }
493 })?;
494
495 if let Some(error) = validator_pushback_error {
496 return Err(error.into());
499 }
500
501 Ok((tonic::Response::new(info), Weight::zero()))
502 }
503
504 async fn handle_certificates(
510 &self,
511 certificates: NonEmpty<CertifiedTransaction>,
512 include_events: bool,
513 include_input_objects: bool,
514 include_output_objects: bool,
515 _include_auxiliary_data: bool,
516 epoch_store: &Arc<AuthorityPerEpochStore>,
517 wait_for_effects: bool,
518 ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
519 fp_ensure!(
522 !self.state.is_fullnode(epoch_store),
523 IotaError::FullNodeCantHandleCertificate.into()
524 );
525
526 let shared_object_tx = certificates
527 .iter()
528 .any(|cert| cert.contains_shared_object());
529
530 let metrics = if certificates.len() == 1 {
531 if wait_for_effects {
532 if shared_object_tx {
533 &self.metrics.handle_certificate_consensus_latency
534 } else {
535 &self.metrics.handle_certificate_non_consensus_latency
536 }
537 } else {
538 &self.metrics.submit_certificate_consensus_latency
539 }
540 } else {
541 &self
544 .metrics
545 .handle_soft_bundle_certificates_consensus_latency
546 };
547
548 let _metrics_guard = metrics.start_timer();
549
550 if certificates.len() == 1 {
555 let tx_digest = *certificates[0].digest();
556
557 if let Some(signed_effects) = self
558 .state
559 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
560 {
561 let events = if include_events {
562 if signed_effects.events_digest().is_some() {
563 Some(
564 self.state
565 .get_transaction_events(signed_effects.transaction_digest())?,
566 )
567 } else {
568 None
569 }
570 } else {
571 None
572 };
573
574 return Ok((
575 Some(vec![HandleCertificateResponseV1 {
576 signed_effects: signed_effects.into_inner(),
577 events,
578 input_objects: None,
579 output_objects: None,
580 auxiliary_data: None,
581 }]),
582 Weight::one(),
583 ));
584 };
585 }
586
587 for certificate in &certificates {
590 let overload_check_res = self.state.check_system_overload(
591 &self.consensus_adapter,
592 certificate.data(),
593 self.state.check_system_overload_at_execution(),
594 );
595 if let Err(error) = overload_check_res {
596 self.metrics
597 .num_rejected_cert_during_overload
598 .with_label_values(&[error.as_ref()])
599 .inc();
600 return Err(error.into());
601 }
602 }
603
604 let verified_certificates = {
605 let _timer = self.metrics.cert_verification_latency.start_timer();
606 epoch_store
607 .signature_verifier
608 .multi_verify_certs(certificates.into())
609 .instrument(trace_span!("SignatureVerifier::multi_verify_certs"))
610 .await
611 .into_iter()
612 .collect::<Result<Vec<_>, _>>()?
613 };
614
615 {
616 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
618 if !reconfiguration_lock.should_accept_user_certs() {
619 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
620 return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
621 }
622
623 if !epoch_store
629 .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
630 {
631 let _metrics_guard = if shared_object_tx {
632 Some(self.metrics.consensus_latency.start_timer())
633 } else {
634 None
635 };
636 let transactions = verified_certificates
637 .iter()
638 .map(|certificate| {
639 ConsensusTransaction::new_certificate_message(
640 &self.state.name,
641 certificate.clone().into(),
642 )
643 })
644 .collect::<Vec<_>>();
645 self.consensus_adapter.submit_batch(
646 &transactions,
647 Some(&reconfiguration_lock),
648 epoch_store,
649 )?;
650 }
654 }
655
656 if !wait_for_effects {
657 let certificates_without_shared_objects = verified_certificates
660 .iter()
661 .filter(|certificate| !certificate.contains_shared_object())
662 .cloned()
663 .collect::<Vec<_>>();
664 if !certificates_without_shared_objects.is_empty() {
665 self.state.enqueue_certificates_for_execution(
666 certificates_without_shared_objects,
667 epoch_store,
668 );
669 }
670 return Ok((None, Weight::zero()));
671 }
672
673 let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
677 |certificate| async move {
678 let effects = self
679 .state
680 .execute_certificate(&certificate, epoch_store)
681 .await?;
682 let events = if include_events {
683 if effects.events_digest().is_some() {
684 Some(
685 self.state
686 .get_transaction_events(effects.transaction_digest())?,
687 )
688 } else {
689 None
690 }
691 } else {
692 None
693 };
694
695 let input_objects = include_input_objects
696 .then(|| self.state.get_transaction_input_objects(&effects))
697 .and_then(|res| {
698 res.map_err(|e| {
699 warn!(
700 tx_digest = ?effects.transaction_digest(),
701 error = ?e,
702 "Failed to load transaction input objects requested by client",
703 )
704 })
705 .ok()
706 });
707
708 let output_objects = include_output_objects
709 .then(|| self.state.get_transaction_output_objects(&effects))
710 .and_then(|res| {
711 res.map_err(|e| {
712 warn!(
713 tx_digest = ?effects.transaction_digest(),
714 error = ?e,
715 "Failed to load transaction output objects requested by client",
716 )
717 })
718 .ok()
719 });
720
721 let signed_effects = self.state.sign_effects(effects, epoch_store)?;
722 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
723
724 Ok::<_, IotaError>(HandleCertificateResponseV1 {
725 signed_effects: signed_effects.into_inner(),
726 events,
727 input_objects,
728 output_objects,
729 auxiliary_data: None, })
731 },
732 ))
733 .await?;
734
735 Ok((Some(responses), Weight::zero()))
736 }
737}
738
739type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
740
741impl ValidatorService {
742 async fn transaction_impl(
743 &self,
744 request: tonic::Request<Transaction>,
745 ) -> WrappedServiceResponse<HandleTransactionResponse> {
746 self.handle_transaction(request)
747 .instrument(trace_span!("ValidatorService::handle_transaction"))
748 .await
749 }
750
751 async fn submit_certificate_impl(
752 &self,
753 request: tonic::Request<CertifiedTransaction>,
754 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
755 let epoch_store = self.state.load_epoch_store_one_call_per_task();
756 let certificate = request.into_inner();
757 certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
758
759 let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
760 self.handle_certificates(
761 nonempty![certificate],
762 true,
763 false,
764 false,
765 false,
766 &epoch_store,
767 false,
768 )
769 .instrument(span)
770 .await
771 .map(|(executed, spam_weight)| {
772 (
773 tonic::Response::new(SubmitCertificateResponse {
774 executed: executed.map(|mut x| x.remove(0)),
775 }),
776 spam_weight,
777 )
778 })
779 }
780
781 async fn handle_certificate_v1_impl(
782 &self,
783 request: tonic::Request<HandleCertificateRequestV1>,
784 ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
785 let epoch_store = self.state.load_epoch_store_one_call_per_task();
786 let request = request.into_inner();
787 request
788 .certificate
789 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
790
791 let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
792 self.handle_certificates(
793 nonempty![request.certificate],
794 request.include_events,
795 request.include_input_objects,
796 request.include_output_objects,
797 request.include_auxiliary_data,
798 &epoch_store,
799 true,
800 )
801 .instrument(span)
802 .await
803 .map(|(resp, spam_weight)| {
804 (
805 tonic::Response::new(
806 resp.expect(
807 "handle_certificate should not return none with wait_for_effects=true",
808 )
809 .remove(0),
810 ),
811 spam_weight,
812 )
813 })
814 }
815
816 async fn soft_bundle_validity_check(
817 &self,
818 certificates: &NonEmpty<CertifiedTransaction>,
819 epoch_store: &Arc<AuthorityPerEpochStore>,
820 total_size_bytes: u64,
821 ) -> Result<(), tonic::Status> {
822 let protocol_config = epoch_store.protocol_config();
823
824 fp_ensure!(
831 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
832 IotaError::UserInput {
833 error: UserInputError::TooManyTransactionsInSoftBundle {
834 limit: protocol_config.max_soft_bundle_size()
835 }
836 }
837 .into()
838 );
839
840 let soft_bundle_max_size_bytes =
846 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
847 fp_ensure!(
848 total_size_bytes <= soft_bundle_max_size_bytes,
849 IotaError::UserInput {
850 error: UserInputError::SoftBundleTooLarge {
851 size: total_size_bytes,
852 limit: soft_bundle_max_size_bytes,
853 },
854 }
855 .into()
856 );
857
858 let mut gas_price = None;
859 for certificate in certificates {
860 let tx_digest = *certificate.digest();
861 fp_ensure!(
862 certificate.contains_shared_object(),
863 IotaError::UserInput {
864 error: UserInputError::NoSharedObject { digest: tx_digest }
865 }
866 .into()
867 );
868 fp_ensure!(
869 !self.state.try_is_tx_already_executed(&tx_digest)?,
870 IotaError::UserInput {
871 error: UserInputError::AlreadyExecuted { digest: tx_digest }
872 }
873 .into()
874 );
875 if let Some(gas) = gas_price {
876 fp_ensure!(
877 gas == certificate.gas_price(),
878 IotaError::UserInput {
879 error: UserInputError::GasPriceMismatch {
880 digest: tx_digest,
881 expected: gas,
882 actual: certificate.gas_price()
883 }
884 }
885 .into()
886 );
887 } else {
888 gas_price = Some(certificate.gas_price());
889 }
890 }
891
892 fp_ensure!(
898 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
899 IotaError::UserInput {
900 error: UserInputError::CertificateAlreadyProcessed
901 }
902 .into()
903 );
904
905 Ok(())
906 }
907
908 async fn handle_soft_bundle_certificates_v1_impl(
909 &self,
910 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
911 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
912 let epoch_store = self.state.load_epoch_store_one_call_per_task();
913 let client_addr = if let Some(client_id_source) = &self.client_id_source {
914 self.get_client_ip_addr(&request, client_id_source)
915 } else {
916 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
917 };
918
919 let request = request.into_inner();
920
921 let certificates =
922 NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
923 let mut total_size_bytes = 0;
924 for certificate in &certificates {
925 total_size_bytes += certificate
927 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
928 as u64;
929 }
930
931 self.metrics
932 .handle_soft_bundle_certificates_count
933 .observe(certificates.len() as f64);
934
935 self.metrics
936 .handle_soft_bundle_certificates_size_bytes
937 .observe(total_size_bytes as f64);
938
939 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
941 .await?;
942
943 info!(
944 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
945 certificates.len(),
946 client_addr
947 .map(|x| x.to_string())
948 .unwrap_or_else(|| "unknown".to_string()),
949 certificates
950 .iter()
951 .map(|x| x.digest().to_string())
952 .collect::<Vec<_>>()
953 .join(", "),
954 total_size_bytes
955 );
956
957 let span = error_span!("handle_soft_bundle_certificates_v1");
958 self.handle_certificates(
959 certificates,
960 request.include_events,
961 request.include_input_objects,
962 request.include_output_objects,
963 request.include_auxiliary_data,
964 &epoch_store,
965 request.wait_for_effects,
966 )
967 .instrument(span)
968 .await
969 .map(|(resp, spam_weight)| {
970 (
971 tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
972 responses: resp.unwrap_or_default(),
973 }),
974 spam_weight,
975 )
976 })
977 }
978
979 async fn object_info_impl(
980 &self,
981 request: tonic::Request<ObjectInfoRequest>,
982 ) -> WrappedServiceResponse<ObjectInfoResponse> {
983 let request = request.into_inner();
984 let response = self.state.handle_object_info_request(request).await?;
985 Ok((tonic::Response::new(response), Weight::one()))
986 }
987
988 async fn transaction_info_impl(
989 &self,
990 request: tonic::Request<TransactionInfoRequest>,
991 ) -> WrappedServiceResponse<TransactionInfoResponse> {
992 let request = request.into_inner();
993 let response = self.state.handle_transaction_info_request(request).await?;
994 Ok((tonic::Response::new(response), Weight::one()))
995 }
996
997 async fn checkpoint_impl(
998 &self,
999 request: tonic::Request<CheckpointRequest>,
1000 ) -> WrappedServiceResponse<CheckpointResponse> {
1001 let request = request.into_inner();
1002 let response = self.state.handle_checkpoint_request(&request)?;
1003 Ok((tonic::Response::new(response), Weight::one()))
1004 }
1005
1006 async fn get_system_state_object_impl(
1007 &self,
1008 _request: tonic::Request<SystemStateRequest>,
1009 ) -> WrappedServiceResponse<IotaSystemState> {
1010 let response = self
1011 .state
1012 .get_object_cache_reader()
1013 .try_get_iota_system_state_object_unsafe()?;
1014 Ok((tonic::Response::new(response), Weight::one()))
1015 }
1016
1017 fn get_client_ip_addr<T>(
1018 &self,
1019 request: &tonic::Request<T>,
1020 source: &ClientIdSource,
1021 ) -> Option<IpAddr> {
1022 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1023
1024 if let Some(header) = forwarded_header {
1025 let num_hops = header
1026 .to_str()
1027 .map(|h| h.split(',').count().saturating_sub(1))
1028 .unwrap_or(0);
1029
1030 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1031 }
1032
1033 match source {
1034 ClientIdSource::SocketAddr => {
1035 let socket_addr: Option<SocketAddr> = request.remote_addr();
1036
1037 if let Some(socket_addr) = socket_addr {
1043 Some(socket_addr.ip())
1044 } else {
1045 if cfg!(msim) {
1046 } else if cfg!(test) {
1048 panic!("Failed to get remote address from request");
1049 } else {
1050 self.metrics.connection_ip_not_found.inc();
1051 error!("Failed to get remote address from request");
1052 }
1053 None
1054 }
1055 }
1056 ClientIdSource::XForwardedFor(num_hops) => {
1057 let do_header_parse = |op: &MetadataValue<Ascii>| {
1058 match op.to_str() {
1059 Ok(header_val) => {
1060 let header_contents =
1061 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1062 if *num_hops == 0 {
1063 error!(
1064 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1065 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1066 to this node. Skipping traffic controller request handling.",
1067 header_contents,
1068 );
1069 return None;
1070 }
1071 let contents_len = header_contents.len();
1072 if contents_len < *num_hops {
1073 error!(
1074 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1075 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1076 `client-id-source` in the node config.",
1077 header_contents, contents_len, num_hops, contents_len,
1078 );
1079 self.metrics.client_id_source_config_mismatch.inc();
1080 return None;
1081 }
1082 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1083 else {
1084 error!(
1085 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1086 Expected at least {} values. Skipping traffic controller request handling.",
1087 header_contents, contents_len, num_hops, contents_len,
1088 );
1089 return None;
1090 };
1091 parse_ip(client_ip).or_else(|| {
1092 self.metrics.forwarded_header_parse_error.inc();
1093 None
1094 })
1095 }
1096 Err(e) => {
1097 self.metrics.forwarded_header_invalid.inc();
1101 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1102 None
1103 }
1104 }
1105 };
1106 if let Some(op) = request.metadata().get("x-forwarded-for") {
1107 do_header_parse(op)
1108 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1109 do_header_parse(op)
1110 } else {
1111 self.metrics.forwarded_header_not_included.inc();
1112 error!(
1113 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1114 );
1115 None
1116 }
1117 }
1118 }
1119 }
1120
1121 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1122 if let Some(traffic_controller) = &self.traffic_controller {
1123 if !traffic_controller.check(&client, &None).await {
1124 Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1126 } else {
1127 Ok(())
1128 }
1129 } else {
1130 Ok(())
1131 }
1132 }
1133
1134 fn handle_traffic_resp<T>(
1135 &self,
1136 client: Option<IpAddr>,
1137 wrapped_response: WrappedServiceResponse<T>,
1138 ) -> Result<tonic::Response<T>, tonic::Status> {
1139 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1140 Ok((result, spam_weight)) => (None, spam_weight, Ok(result)),
1141 Err(status) => (
1142 Some(IotaError::from(status.clone())),
1143 Weight::zero(),
1144 Err(status),
1145 ),
1146 };
1147
1148 if let Some(traffic_controller) = self.traffic_controller.clone() {
1149 traffic_controller.tally(TrafficTally {
1150 direct: client,
1151 through_fullnode: None,
1152 error_info: error.map(|e| {
1153 let error_type = String::from(e.as_ref());
1154 let error_weight = normalize(e);
1155 (error_weight, error_type)
1156 }),
1157 spam_weight,
1158 timestamp: SystemTime::now(),
1159 })
1160 }
1161 unwrapped_response
1162 }
1163
1164 async fn handle_capability_notification_v1_impl(
1165 &self,
1166 request: tonic::Request<HandleCapabilityNotificationRequestV1>,
1167 ) -> WrappedServiceResponse<HandleCapabilityNotificationResponseV1> {
1168 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1169 let request = request.into_inner();
1170 fp_ensure!(
1171 epoch_store
1172 .protocol_config()
1173 .track_non_committee_eligible_validators(),
1174 IotaError::UnsupportedFeature {
1175 error: "capability notification endpoint is not supported in this Protocol Version"
1176 .to_string()
1177 }
1178 .into()
1179 );
1180 fp_ensure!(
1183 !self.state.is_fullnode(&epoch_store),
1184 IotaError::FullNodeCantHandleAuthorityCapabilities.into()
1185 );
1186
1187 let existing_capabilities = epoch_store.get_capabilities_v1()?;
1189 let incoming_capability = request.message.data();
1190
1191 info!(
1192 "Received capability notification: {:?}",
1193 incoming_capability
1194 );
1195
1196 if let Some(existing) = existing_capabilities
1197 .iter()
1198 .find(|cap| cap.authority == incoming_capability.authority)
1199 {
1200 if incoming_capability.generation <= existing.generation {
1201 return Ok((
1203 tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1204 Weight::one(),
1205 ));
1206 }
1207 }
1208 if let Err(error) = self.consensus_adapter.check_consensus_overload() {
1209 self.metrics
1210 .num_rejected_capability_notifications_during_overload
1211 .with_label_values(&[error.as_ref()])
1212 .inc();
1213 return Err(error.into());
1214 }
1215
1216 let _handle_tx_metrics_guard = self
1217 .metrics
1218 .handle_capability_notification_latency
1219 .start_timer();
1220
1221 let signed_authority_capabilities = request.message;
1222 let verified_authority_capabilities = epoch_store
1224 .verify_authority_capabilities(signed_authority_capabilities)
1225 .inspect_err(|_e| {
1226 self.metrics.signature_errors.inc();
1227 })?;
1228
1229 let authority_name = verified_authority_capabilities.authority;
1230 debug!("Verified capability notification for authority {authority_name:?}");
1232
1233 let signed_authority_capabilities_transaction =
1236 ConsensusTransaction::new_signed_capability_notification_v1(
1237 verified_authority_capabilities.into_inner(),
1238 );
1239
1240 self.consensus_adapter.submit(
1242 signed_authority_capabilities_transaction,
1243 None,
1244 &epoch_store,
1245 )?;
1246
1247 debug!("Submitted capability notification to consensus for authority {authority_name:?}");
1248
1249 Ok((
1250 tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1251 Weight::one(),
1252 ))
1253 }
1254}
1255
1256fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1257 let mut request = tonic::Request::new(message);
1260 let tcp_connect_info = TcpConnectInfo {
1261 local_addr: None,
1262 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1263 };
1264 request.extensions_mut().insert(tcp_connect_info);
1265 request
1266}
1267
1268fn normalize(err: IotaError) -> Weight {
1270 match err {
1271 IotaError::UserInput {
1272 error: UserInputError::IncorrectUserSignature { .. },
1273 } => Weight::one(),
1274 IotaError::InvalidSignature { .. }
1275 | IotaError::SignerSignatureAbsent { .. }
1276 | IotaError::SignerSignatureNumberMismatch { .. }
1277 | IotaError::IncorrectSigner { .. }
1278 | IotaError::UnknownSigner { .. }
1279 | IotaError::WrongEpoch { .. } => Weight::one(),
1280 _ => Weight::zero(),
1281 }
1282}
1283
1284#[macro_export]
1288macro_rules! handle_with_decoration {
1289 ($self:ident, $func_name:ident, $request:ident) => {{
1290 if $self.client_id_source.is_none() {
1291 return $self.$func_name($request).await.map(|(result, _)| result);
1292 }
1293
1294 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1295
1296 $self.handle_traffic_req(client.clone()).await?;
1298
1299 let wrapped_response = $self.$func_name($request).await;
1301 $self.handle_traffic_resp(client, wrapped_response)
1302 }};
1303}
1304
1305#[async_trait]
1306impl Validator for ValidatorService {
1307 async fn transaction(
1309 &self,
1310 request: tonic::Request<Transaction>,
1311 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
1312 let validator_service = self.clone();
1313
1314 spawn_monitored_task!(async move {
1318 handle_with_decoration!(validator_service, transaction_impl, request)
1322 })
1323 .await
1324 .unwrap()
1325 }
1326
1327 async fn submit_certificate(
1329 &self,
1330 request: tonic::Request<CertifiedTransaction>,
1331 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
1332 let validator_service = self.clone();
1333
1334 spawn_monitored_task!(async move {
1338 handle_with_decoration!(validator_service, submit_certificate_impl, request)
1342 })
1343 .await
1344 .unwrap()
1345 }
1346
1347 async fn handle_certificate_v1(
1348 &self,
1349 request: tonic::Request<HandleCertificateRequestV1>,
1350 ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
1351 handle_with_decoration!(self, handle_certificate_v1_impl, request)
1352 }
1353
1354 async fn handle_soft_bundle_certificates_v1(
1355 &self,
1356 request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
1357 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
1358 handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
1359 }
1360
1361 async fn object_info(
1363 &self,
1364 request: tonic::Request<ObjectInfoRequest>,
1365 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1366 handle_with_decoration!(self, object_info_impl, request)
1367 }
1368
1369 async fn transaction_info(
1371 &self,
1372 request: tonic::Request<TransactionInfoRequest>,
1373 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1374 handle_with_decoration!(self, transaction_info_impl, request)
1375 }
1376
1377 async fn checkpoint(
1379 &self,
1380 request: tonic::Request<CheckpointRequest>,
1381 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1382 handle_with_decoration!(self, checkpoint_impl, request)
1383 }
1384
1385 async fn get_system_state_object(
1387 &self,
1388 request: tonic::Request<SystemStateRequest>,
1389 ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
1390 handle_with_decoration!(self, get_system_state_object_impl, request)
1391 }
1392
1393 async fn handle_capability_notification_v1(
1394 &self,
1395 request: tonic::Request<HandleCapabilityNotificationRequestV1>,
1396 ) -> Result<tonic::Response<HandleCapabilityNotificationResponseV1>, tonic::Status> {
1397 handle_with_decoration!(self, handle_capability_notification_v1_impl, request)
1398 }
1399}