iota_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    io,
8    net::{IpAddr, SocketAddr},
9    sync::Arc,
10    time::SystemTime,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use fastcrypto::traits::KeyPair;
16use iota_config::local_ip_utils::new_local_tcp_address_for_testing;
17use iota_metrics::spawn_monitored_task;
18use iota_network::{
19    api::{Validator, ValidatorServer},
20    tonic,
21};
22use iota_network_stack::server::IOTA_TLS_SERVER_NAME;
23use iota_types::{
24    effects::TransactionEffectsAPI,
25    error::*,
26    fp_ensure,
27    iota_system_state::IotaSystemState,
28    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
29    messages_consensus::ConsensusTransaction,
30    messages_grpc::{
31        HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
32        HandleCertificateRequestV1, HandleCertificateResponseV1,
33        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
34        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
35        SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest,
36        TransactionInfoResponse,
37    },
38    multiaddr::Multiaddr,
39    traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight},
40    transaction::*,
41};
42use nonempty::{NonEmpty, nonempty};
43use prometheus::{
44    Histogram, IntCounter, IntCounterVec, Registry, register_histogram_with_registry,
45    register_int_counter_vec_with_registry, register_int_counter_with_registry,
46};
47use tap::TapFallible;
48use tonic::{
49    metadata::{Ascii, MetadataValue},
50    transport::server::TcpConnectInfo,
51};
52use tracing::{Instrument, debug, error, error_span, info};
53
54use crate::{
55    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
56    checkpoints::CheckpointStore,
57    consensus_adapter::{
58        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
59    },
60    mysticeti_adapter::LazyMysticetiClient,
61    traffic_controller::{
62        TrafficController, metrics::TrafficControllerMetrics, parse_ip, policies::TrafficTally,
63    },
64};
65
66#[cfg(test)]
67#[path = "unit_tests/server_tests.rs"]
68mod server_tests;
69
70/// A handle to the authority server.
71pub struct AuthorityServerHandle {
72    server_handle: iota_network_stack::server::Server,
73}
74
75impl AuthorityServerHandle {
76    /// Waits for the server to complete.
77    pub async fn join(self) -> Result<(), io::Error> {
78        self.server_handle.handle().wait_for_shutdown().await;
79        Ok(())
80    }
81
82    /// Kills the server.
83    pub async fn kill(self) -> Result<(), io::Error> {
84        self.server_handle.handle().shutdown().await;
85        Ok(())
86    }
87
88    /// Returns the address of the server.
89    pub fn address(&self) -> &Multiaddr {
90        self.server_handle.local_addr()
91    }
92}
93
94/// An authority server that is used for testing.
95pub struct AuthorityServer {
96    address: Multiaddr,
97    pub state: Arc<AuthorityState>,
98    consensus_adapter: Arc<ConsensusAdapter>,
99    pub metrics: Arc<ValidatorServiceMetrics>,
100}
101
102impl AuthorityServer {
103    /// Creates a new `AuthorityServer` for testing with a consensus adapter.
104    pub fn new_for_test_with_consensus_adapter(
105        state: Arc<AuthorityState>,
106        consensus_adapter: Arc<ConsensusAdapter>,
107    ) -> Self {
108        let address = new_local_tcp_address_for_testing();
109        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
110
111        Self {
112            address,
113            state,
114            consensus_adapter,
115            metrics,
116        }
117    }
118
119    /// Creates a new `AuthorityServer` for testing.
120    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
121        let consensus_adapter = Arc::new(ConsensusAdapter::new(
122            Arc::new(LazyMysticetiClient::new()),
123            CheckpointStore::new_for_tests(),
124            state.name,
125            Arc::new(ConnectionMonitorStatusForTests {}),
126            100_000,
127            100_000,
128            None,
129            None,
130            ConsensusAdapterMetrics::new_test(),
131        ));
132        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
133    }
134
135    /// Spawns the server.
136    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
137        let address = self.address.clone();
138        self.spawn_with_bind_address_for_test(address).await
139    }
140
141    /// Spawns the server with a bind address.
142    pub async fn spawn_with_bind_address_for_test(
143        self,
144        address: Multiaddr,
145    ) -> Result<AuthorityServerHandle, io::Error> {
146        let tls_config = iota_tls::create_rustls_server_config(
147            self.state.config.network_key_pair().copy().private(),
148            IOTA_TLS_SERVER_NAME.to_string(),
149        );
150        let server = iota_network_stack::config::Config::new()
151            .server_builder()
152            .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
153                self.state,
154                self.consensus_adapter,
155                self.metrics,
156            )))
157            .bind(&address, Some(tls_config))
158            .await
159            .unwrap();
160        let local_addr = server.local_addr().to_owned();
161        info!("Listening to traffic on {local_addr}");
162        let handle = AuthorityServerHandle {
163            server_handle: server,
164        };
165        Ok(handle)
166    }
167}
168
169/// Metrics for the validator service.
170pub struct ValidatorServiceMetrics {
171    pub signature_errors: IntCounter,
172    pub tx_verification_latency: Histogram,
173    pub cert_verification_latency: Histogram,
174    pub consensus_latency: Histogram,
175    pub handle_transaction_latency: Histogram,
176    pub submit_certificate_consensus_latency: Histogram,
177    pub handle_certificate_consensus_latency: Histogram,
178    pub handle_certificate_non_consensus_latency: Histogram,
179    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
180    pub handle_soft_bundle_certificates_count: Histogram,
181    pub handle_soft_bundle_certificates_size_bytes: Histogram,
182    pub handle_capability_notification_latency: Histogram,
183
184    num_rejected_tx_in_epoch_boundary: IntCounter,
185    num_rejected_cert_in_epoch_boundary: IntCounter,
186    num_rejected_tx_during_overload: IntCounterVec,
187    num_rejected_cert_during_overload: IntCounterVec,
188    num_rejected_capability_notifications_during_overload: IntCounterVec,
189    connection_ip_not_found: IntCounter,
190    forwarded_header_parse_error: IntCounter,
191    forwarded_header_invalid: IntCounter,
192    forwarded_header_not_included: IntCounter,
193    client_id_source_config_mismatch: IntCounter,
194}
195
196impl ValidatorServiceMetrics {
197    /// Creates a new `ValidatorServiceMetrics` with Prometheus registry.
198    pub fn new(registry: &Registry) -> Self {
199        Self {
200            signature_errors: register_int_counter_with_registry!(
201                "total_signature_errors",
202                "Number of transaction signature errors",
203                registry,
204            )
205            .unwrap(),
206            tx_verification_latency: register_histogram_with_registry!(
207                "validator_service_tx_verification_latency",
208                "Latency of verifying a transaction",
209                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
210                registry,
211            )
212            .unwrap(),
213            cert_verification_latency: register_histogram_with_registry!(
214                "validator_service_cert_verification_latency",
215                "Latency of verifying a certificate",
216                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
217                registry,
218            )
219            .unwrap(),
220            consensus_latency: register_histogram_with_registry!(
221                "validator_service_consensus_latency",
222                "Time spent between submitting a shared obj txn to consensus and getting result",
223                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
224                registry,
225            )
226            .unwrap(),
227            handle_transaction_latency: register_histogram_with_registry!(
228                "validator_service_handle_transaction_latency",
229                "Latency of handling a transaction",
230                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231                registry,
232            )
233            .unwrap(),
234            handle_certificate_consensus_latency: register_histogram_with_registry!(
235                "validator_service_handle_certificate_consensus_latency",
236                "Latency of handling a consensus transaction certificate",
237                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
238                registry,
239            )
240            .unwrap(),
241            submit_certificate_consensus_latency: register_histogram_with_registry!(
242                "validator_service_submit_certificate_consensus_latency",
243                "Latency of submit_certificate RPC handler",
244                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
245                registry,
246            )
247            .unwrap(),
248            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
249                "validator_service_handle_certificate_non_consensus_latency",
250                "Latency of handling a non-consensus transaction certificate",
251                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
252                registry,
253            )
254            .unwrap(),
255            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
256                "validator_service_handle_soft_bundle_certificates_consensus_latency",
257                "Latency of handling a consensus soft bundle",
258                iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
259                registry,
260            )
261            .unwrap(),
262            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
263                "validator_service_handle_soft_bundle_certificates_count",
264                "The number of certificates included in a soft bundle",
265                iota_metrics::COUNT_BUCKETS.to_vec(),
266                registry,
267            )
268            .unwrap(),
269            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
270                "validator_service_handle_soft_bundle_certificates_size_bytes",
271                "The size of soft bundle in bytes",
272                iota_metrics::BYTES_BUCKETS.to_vec(),
273                registry,
274            )
275            .unwrap(),
276            handle_capability_notification_latency: register_histogram_with_registry!(
277                "validator_service_handle_capability_notification_latency",
278                "Latency of handling a capability notification",
279                iota_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
280                registry,
281            )
282            .unwrap(),
283            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
284                "validator_service_num_rejected_tx_in_epoch_boundary",
285                "Number of rejected transaction during epoch transitioning",
286                registry,
287            )
288            .unwrap(),
289            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
290                "validator_service_num_rejected_cert_in_epoch_boundary",
291                "Number of rejected transaction certificate during epoch transitioning",
292                registry,
293            )
294            .unwrap(),
295            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
296                "validator_service_num_rejected_tx_during_overload",
297                "Number of rejected transaction due to system overload",
298                &["error_type"],
299                registry,
300            )
301            .unwrap(),
302            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
303                "validator_service_num_rejected_cert_during_overload",
304                "Number of rejected transaction certificate due to system overload",
305                &["error_type"],
306                registry,
307            )
308            .unwrap(),
309            num_rejected_capability_notifications_during_overload: register_int_counter_vec_with_registry!(
310                "num_rejected_capability_notifications_during_overload",
311                "Number of rejected capability notifications from non-committee active validators due to system overload",
312                &["error_type"],
313                registry,
314            )
315            .unwrap(),
316            connection_ip_not_found: register_int_counter_with_registry!(
317                "validator_service_connection_ip_not_found",
318                "Number of times connection IP was not extractable from request",
319                registry,
320            )
321            .unwrap(),
322            forwarded_header_parse_error: register_int_counter_with_registry!(
323                "validator_service_forwarded_header_parse_error",
324                "Number of times x-forwarded-for header could not be parsed",
325                registry,
326            )
327            .unwrap(),
328            forwarded_header_invalid: register_int_counter_with_registry!(
329                "validator_service_forwarded_header_invalid",
330                "Number of times x-forwarded-for header was invalid",
331                registry,
332            )
333            .unwrap(),
334            forwarded_header_not_included: register_int_counter_with_registry!(
335                "validator_service_forwarded_header_not_included",
336                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
337                registry,
338            )
339            .unwrap(),
340            client_id_source_config_mismatch: register_int_counter_with_registry!(
341                "validator_service_client_id_source_config_mismatch",
342                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
343                registry,
344            )
345            .unwrap(),
346        }
347    }
348
349    /// Creates a new `ValidatorServiceMetrics` for testing.
350    pub fn new_for_tests() -> Self {
351        let registry = Registry::new();
352        Self::new(&registry)
353    }
354}
355
356/// The validator service.
357#[derive(Clone)]
358pub struct ValidatorService {
359    state: Arc<AuthorityState>,
360    consensus_adapter: Arc<ConsensusAdapter>,
361    metrics: Arc<ValidatorServiceMetrics>,
362    traffic_controller: Option<Arc<TrafficController>>,
363    client_id_source: Option<ClientIdSource>,
364}
365
366impl ValidatorService {
367    /// Creates a new `ValidatorService`.
368    pub fn new(
369        state: Arc<AuthorityState>,
370        consensus_adapter: Arc<ConsensusAdapter>,
371        validator_metrics: Arc<ValidatorServiceMetrics>,
372        traffic_controller_metrics: TrafficControllerMetrics,
373        policy_config: Option<PolicyConfig>,
374        firewall_config: Option<RemoteFirewallConfig>,
375    ) -> Self {
376        Self {
377            state,
378            consensus_adapter,
379            metrics: validator_metrics,
380            traffic_controller: policy_config.clone().map(|policy| {
381                Arc::new(TrafficController::init(
382                    policy,
383                    traffic_controller_metrics,
384                    firewall_config,
385                ))
386            }),
387            client_id_source: policy_config.map(|policy| policy.client_id_source),
388        }
389    }
390
391    pub fn new_for_tests(
392        state: Arc<AuthorityState>,
393        consensus_adapter: Arc<ConsensusAdapter>,
394        metrics: Arc<ValidatorServiceMetrics>,
395    ) -> Self {
396        Self {
397            state,
398            consensus_adapter,
399            metrics,
400            traffic_controller: None,
401            client_id_source: None,
402        }
403    }
404
405    /// Returns the validator state.
406    pub fn validator_state(&self) -> &Arc<AuthorityState> {
407        &self.state
408    }
409
410    /// Executes a `CertifiedTransaction` for testing.
411    pub async fn execute_certificate_for_testing(
412        &self,
413        cert: CertifiedTransaction,
414    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
415        let request = make_tonic_request_for_testing(HandleCertificateRequestV1::new(cert));
416        self.handle_certificate_v1(request).await
417    }
418
419    /// Handles a `Transaction` request for benchmarking.
420    pub async fn handle_transaction_for_benchmarking(
421        &self,
422        transaction: Transaction,
423    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
424        let request = make_tonic_request_for_testing(transaction);
425        self.transaction(request).await
426    }
427
428    /// Handles a `Transaction` request.
429    async fn handle_transaction(
430        &self,
431        request: tonic::Request<Transaction>,
432    ) -> WrappedServiceResponse<HandleTransactionResponse> {
433        let Self {
434            state,
435            consensus_adapter,
436            metrics,
437            traffic_controller: _,
438            client_id_source: _,
439        } = self.clone();
440        let transaction = request.into_inner();
441        let epoch_store = state.load_epoch_store_one_call_per_task();
442
443        transaction.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
444
445        // When authority is overloaded and decide to reject this tx, we still lock the
446        // object and ask the client to retry in the future. This is because
447        // without locking, the input objects can be locked by a different tx in
448        // the future, however, the input objects may already be locked by this
449        // tx in other validators. This can cause non of the txes to have enough
450        // quorum to form a certificate, causing the objects to be locked for
451        // the entire epoch. By doing locking but pushback, retrying transaction will
452        // have higher chance to succeed.
453        let mut validator_pushback_error = None;
454        let overload_check_res = state.check_system_overload(
455            &consensus_adapter,
456            transaction.data(),
457            state.check_system_overload_at_signing(),
458        );
459        if let Err(error) = overload_check_res {
460            metrics
461                .num_rejected_tx_during_overload
462                .with_label_values(&[error.as_ref()])
463                .inc();
464            // TODO: consider change the behavior for other types of overload errors.
465            match error {
466                IotaError::ValidatorOverloadedRetryAfter { .. } => {
467                    validator_pushback_error = Some(error)
468                }
469                _ => return Err(error.into()),
470            }
471        }
472
473        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
474
475        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
476        let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
477            metrics.signature_errors.inc();
478        })?;
479        drop(tx_verif_metrics_guard);
480
481        let tx_digest = transaction.digest();
482
483        // Enable Trace Propagation across spans/processes using tx_digest
484        let span = error_span!("validator_state_process_tx", ?tx_digest);
485
486        let info = state
487            .handle_transaction(&epoch_store, transaction.clone())
488            .instrument(span)
489            .await
490            .tap_err(|e| {
491                if let IotaError::ValidatorHaltedAtEpochEnd = e {
492                    metrics.num_rejected_tx_in_epoch_boundary.inc();
493                }
494            })?;
495
496        if let Some(error) = validator_pushback_error {
497            // TODO: right now, we still sign the txn, but just don't return it. We can also
498            // skip signing to save more CPU.
499            return Err(error.into());
500        }
501
502        Ok((tonic::Response::new(info), Weight::zero()))
503    }
504
505    // In addition to the response from handling the certificates,
506    // returns a bool indicating whether the request should be tallied
507    // toward spam count. In general, this should be set to true for
508    // requests that are read-only and thus do not consume gas, such
509    // as when the transaction is already executed.
510    async fn handle_certificates(
511        &self,
512        certificates: NonEmpty<CertifiedTransaction>,
513        include_events: bool,
514        include_input_objects: bool,
515        include_output_objects: bool,
516        _include_auxiliary_data: bool,
517        epoch_store: &Arc<AuthorityPerEpochStore>,
518        wait_for_effects: bool,
519    ) -> Result<(Option<Vec<HandleCertificateResponseV1>>, Weight), tonic::Status> {
520        // Validate if cert can be executed
521        // Fullnode does not serve handle_certificate call.
522        fp_ensure!(
523            !self.state.is_fullnode(epoch_store),
524            IotaError::FullNodeCantHandleCertificate.into()
525        );
526
527        let shared_object_tx = certificates
528            .iter()
529            .any(|cert| cert.contains_shared_object());
530
531        let metrics = if certificates.len() == 1 {
532            if wait_for_effects {
533                if shared_object_tx {
534                    &self.metrics.handle_certificate_consensus_latency
535                } else {
536                    &self.metrics.handle_certificate_non_consensus_latency
537                }
538            } else {
539                &self.metrics.submit_certificate_consensus_latency
540            }
541        } else {
542            // `soft_bundle_validity_check` ensured that all certificates contain shared
543            // objects.
544            &self
545                .metrics
546                .handle_soft_bundle_certificates_consensus_latency
547        };
548
549        let _metrics_guard = metrics.start_timer();
550
551        // 1) Check if the certificate is already executed. This is only needed when we
552        //    have only one certificate (not a soft bundle). When multiple certificates
553        //    are provided, we will either submit all of them or none of them to
554        //    consensus.
555        if certificates.len() == 1 {
556            let tx_digest = *certificates[0].digest();
557
558            if let Some(signed_effects) = self
559                .state
560                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
561            {
562                let events = if include_events {
563                    if let Some(digest) = signed_effects.events_digest() {
564                        Some(self.state.get_transaction_events(digest)?)
565                    } else {
566                        None
567                    }
568                } else {
569                    None
570                };
571
572                return Ok((
573                    Some(vec![HandleCertificateResponseV1 {
574                        signed_effects: signed_effects.into_inner(),
575                        events,
576                        input_objects: None,
577                        output_objects: None,
578                        auxiliary_data: None,
579                    }]),
580                    Weight::one(),
581                ));
582            };
583        }
584
585        // 2) Verify the certificates.
586        // Check system overload
587        for certificate in &certificates {
588            let overload_check_res = self.state.check_system_overload(
589                &self.consensus_adapter,
590                certificate.data(),
591                self.state.check_system_overload_at_execution(),
592            );
593            if let Err(error) = overload_check_res {
594                self.metrics
595                    .num_rejected_cert_during_overload
596                    .with_label_values(&[error.as_ref()])
597                    .inc();
598                return Err(error.into());
599            }
600        }
601
602        let verified_certificates = {
603            let _timer = self.metrics.cert_verification_latency.start_timer();
604            epoch_store
605                .signature_verifier
606                .multi_verify_certs(certificates.into())
607                .await
608                .into_iter()
609                .collect::<Result<Vec<_>, _>>()?
610        };
611
612        {
613            // code block within reconfiguration lock
614            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
615            if !reconfiguration_lock.should_accept_user_certs() {
616                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
617                return Err(IotaError::ValidatorHaltedAtEpochEnd.into());
618            }
619
620            // 3) All certificates are sent to consensus (at least by some authorities)
621            // For shared objects this will wait until either timeout or we have heard back
622            // from consensus. For owned objects this will return without
623            // waiting for certificate to be sequenced First do quick dirty
624            // non-async check.
625            if !epoch_store
626                .is_all_tx_certs_consensus_message_processed(verified_certificates.iter())?
627            {
628                let _metrics_guard = if shared_object_tx {
629                    Some(self.metrics.consensus_latency.start_timer())
630                } else {
631                    None
632                };
633                let transactions = verified_certificates
634                    .iter()
635                    .map(|certificate| {
636                        ConsensusTransaction::new_certificate_message(
637                            &self.state.name,
638                            certificate.clone().into(),
639                        )
640                    })
641                    .collect::<Vec<_>>();
642                self.consensus_adapter.submit_batch(
643                    &transactions,
644                    Some(&reconfiguration_lock),
645                    epoch_store,
646                )?;
647                // Do not wait for the result, because the transaction might
648                // have already executed. Instead, check or wait
649                // for the existence of certificate effects below.
650            }
651        }
652
653        if !wait_for_effects {
654            // It is useful to enqueue owned object transaction for execution locally,
655            // even when we are not returning effects to user
656            let certificates_without_shared_objects = verified_certificates
657                .iter()
658                .filter(|certificate| !certificate.contains_shared_object())
659                .cloned()
660                .collect::<Vec<_>>();
661            if !certificates_without_shared_objects.is_empty() {
662                self.state.enqueue_certificates_for_execution(
663                    certificates_without_shared_objects,
664                    epoch_store,
665                );
666            }
667            return Ok((None, Weight::zero()));
668        }
669
670        // 4) Execute the certificates immediately if they contain only owned object
671        //    transactions,
672        // or wait for the execution results if it contains shared objects.
673        let responses = futures::future::try_join_all(verified_certificates.into_iter().map(
674            |certificate| async move {
675                let effects = self
676                    .state
677                    .execute_certificate(&certificate, epoch_store)
678                    .await?;
679                let events = if include_events {
680                    if let Some(digest) = effects.events_digest() {
681                        Some(self.state.get_transaction_events(digest)?)
682                    } else {
683                        None
684                    }
685                } else {
686                    None
687                };
688
689                let input_objects = include_input_objects
690                    .then(|| self.state.get_transaction_input_objects(&effects))
691                    .and_then(Result::ok);
692
693                let output_objects = include_output_objects
694                    .then(|| self.state.get_transaction_output_objects(&effects))
695                    .and_then(Result::ok);
696
697                let signed_effects = self.state.sign_effects(effects, epoch_store)?;
698                epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
699
700                Ok::<_, IotaError>(HandleCertificateResponseV1 {
701                    signed_effects: signed_effects.into_inner(),
702                    events,
703                    input_objects,
704                    output_objects,
705                    auxiliary_data: None, // We don't have any aux data generated presently
706                })
707            },
708        ))
709        .await?;
710
711        Ok((Some(responses), Weight::zero()))
712    }
713}
714
715type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
716
717impl ValidatorService {
718    async fn transaction_impl(
719        &self,
720        request: tonic::Request<Transaction>,
721    ) -> WrappedServiceResponse<HandleTransactionResponse> {
722        self.handle_transaction(request).await
723    }
724
725    async fn submit_certificate_impl(
726        &self,
727        request: tonic::Request<CertifiedTransaction>,
728    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
729        let epoch_store = self.state.load_epoch_store_one_call_per_task();
730        let certificate = request.into_inner();
731        certificate.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
732
733        let span = error_span!("submit_certificate", tx_digest = ?certificate.digest());
734        self.handle_certificates(
735            nonempty![certificate],
736            true,
737            false,
738            false,
739            false,
740            &epoch_store,
741            false,
742        )
743        .instrument(span)
744        .await
745        .map(|(executed, spam_weight)| {
746            (
747                tonic::Response::new(SubmitCertificateResponse {
748                    executed: executed.map(|mut x| x.remove(0)),
749                }),
750                spam_weight,
751            )
752        })
753    }
754
755    async fn handle_certificate_v1_impl(
756        &self,
757        request: tonic::Request<HandleCertificateRequestV1>,
758    ) -> WrappedServiceResponse<HandleCertificateResponseV1> {
759        let epoch_store = self.state.load_epoch_store_one_call_per_task();
760        let request = request.into_inner();
761        request
762            .certificate
763            .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;
764
765        let span = error_span!("handle_certificate_v1", tx_digest = ?request.certificate.digest());
766        self.handle_certificates(
767            nonempty![request.certificate],
768            request.include_events,
769            request.include_input_objects,
770            request.include_output_objects,
771            request.include_auxiliary_data,
772            &epoch_store,
773            true,
774        )
775        .instrument(span)
776        .await
777        .map(|(resp, spam_weight)| {
778            (
779                tonic::Response::new(
780                    resp.expect(
781                        "handle_certificate should not return none with wait_for_effects=true",
782                    )
783                    .remove(0),
784                ),
785                spam_weight,
786            )
787        })
788    }
789
790    async fn soft_bundle_validity_check(
791        &self,
792        certificates: &NonEmpty<CertifiedTransaction>,
793        epoch_store: &Arc<AuthorityPerEpochStore>,
794        total_size_bytes: u64,
795    ) -> Result<(), tonic::Status> {
796        let protocol_config = epoch_store.protocol_config();
797
798        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
799        // - All certs must access at least one shared object.
800        // - All certs must not be already executed.
801        // - All certs must have the same gas price.
802        // - Number of certs must not exceed the max allowed.
803        // - Total size of all certs must not exceed the max allowed.
804        fp_ensure!(
805            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
806            IotaError::UserInput {
807                error: UserInputError::TooManyTransactionsInSoftBundle {
808                    limit: protocol_config.max_soft_bundle_size()
809                }
810            }
811            .into()
812        );
813
814        // We set the soft bundle max size to be half of the consensus max transactions
815        // in block size. We do this to account for serialization overheads and
816        // to ensure that the soft bundle is not too large when is attempted to be
817        // posted via consensus. Although half the block size is on the extreme
818        // side, it's should be good enough for now.
819        let soft_bundle_max_size_bytes =
820            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
821        fp_ensure!(
822            total_size_bytes <= soft_bundle_max_size_bytes,
823            IotaError::UserInput {
824                error: UserInputError::SoftBundleTooLarge {
825                    size: total_size_bytes,
826                    limit: soft_bundle_max_size_bytes,
827                },
828            }
829            .into()
830        );
831
832        let mut gas_price = None;
833        for certificate in certificates {
834            let tx_digest = *certificate.digest();
835            fp_ensure!(
836                certificate.contains_shared_object(),
837                IotaError::UserInput {
838                    error: UserInputError::NoSharedObject { digest: tx_digest }
839                }
840                .into()
841            );
842            fp_ensure!(
843                !self.state.try_is_tx_already_executed(&tx_digest)?,
844                IotaError::UserInput {
845                    error: UserInputError::AlreadyExecuted { digest: tx_digest }
846                }
847                .into()
848            );
849            if let Some(gas) = gas_price {
850                fp_ensure!(
851                    gas == certificate.gas_price(),
852                    IotaError::UserInput {
853                        error: UserInputError::GasPriceMismatch {
854                            digest: tx_digest,
855                            expected: gas,
856                            actual: certificate.gas_price()
857                        }
858                    }
859                    .into()
860                );
861            } else {
862                gas_price = Some(certificate.gas_price());
863            }
864        }
865
866        // For Soft Bundle, if at this point we know at least one certificate has
867        // already been processed, reject the entire bundle.  Otherwise, submit
868        // all certificates in one request. This is not a strict check as there
869        // may be race conditions where one or more certificates are
870        // already being processed by another actor, and we could not know it.
871        fp_ensure!(
872            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
873            IotaError::UserInput {
874                error: UserInputError::CertificateAlreadyProcessed
875            }
876            .into()
877        );
878
879        Ok(())
880    }
881
882    async fn handle_soft_bundle_certificates_v1_impl(
883        &self,
884        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
885    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV1> {
886        let epoch_store = self.state.load_epoch_store_one_call_per_task();
887        let client_addr = if self.client_id_source.is_none() {
888            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
889        } else {
890            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
891        };
892        let request = request.into_inner();
893
894        let certificates =
895            NonEmpty::from_vec(request.certificates).ok_or(IotaError::NoCertificateProvided)?;
896        let mut total_size_bytes = 0;
897        for certificate in &certificates {
898            // We need to check this first because we haven't verified the cert signature.
899            total_size_bytes += certificate
900                .validity_check(epoch_store.protocol_config(), epoch_store.epoch())?
901                as u64;
902        }
903
904        self.metrics
905            .handle_soft_bundle_certificates_count
906            .observe(certificates.len() as f64);
907
908        self.metrics
909            .handle_soft_bundle_certificates_size_bytes
910            .observe(total_size_bytes as f64);
911
912        // Now that individual certificates are valid, we check if the bundle is valid.
913        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
914            .await?;
915
916        info!(
917            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
918            certificates.len(),
919            client_addr
920                .map(|x| x.to_string())
921                .unwrap_or_else(|| "unknown".to_string()),
922            certificates
923                .iter()
924                .map(|x| x.digest().to_string())
925                .collect::<Vec<_>>()
926                .join(", "),
927            total_size_bytes
928        );
929
930        let span = error_span!("handle_soft_bundle_certificates_v1");
931        self.handle_certificates(
932            certificates,
933            request.include_events,
934            request.include_input_objects,
935            request.include_output_objects,
936            request.include_auxiliary_data,
937            &epoch_store,
938            request.wait_for_effects,
939        )
940        .instrument(span)
941        .await
942        .map(|(resp, spam_weight)| {
943            (
944                tonic::Response::new(HandleSoftBundleCertificatesResponseV1 {
945                    responses: resp.unwrap_or_default(),
946                }),
947                spam_weight,
948            )
949        })
950    }
951
952    async fn object_info_impl(
953        &self,
954        request: tonic::Request<ObjectInfoRequest>,
955    ) -> WrappedServiceResponse<ObjectInfoResponse> {
956        let request = request.into_inner();
957        let response = self.state.handle_object_info_request(request).await?;
958        Ok((tonic::Response::new(response), Weight::one()))
959    }
960
961    async fn transaction_info_impl(
962        &self,
963        request: tonic::Request<TransactionInfoRequest>,
964    ) -> WrappedServiceResponse<TransactionInfoResponse> {
965        let request = request.into_inner();
966        let response = self.state.handle_transaction_info_request(request).await?;
967        Ok((tonic::Response::new(response), Weight::one()))
968    }
969
970    async fn checkpoint_impl(
971        &self,
972        request: tonic::Request<CheckpointRequest>,
973    ) -> WrappedServiceResponse<CheckpointResponse> {
974        let request = request.into_inner();
975        let response = self.state.handle_checkpoint_request(&request)?;
976        Ok((tonic::Response::new(response), Weight::one()))
977    }
978
979    async fn get_system_state_object_impl(
980        &self,
981        _request: tonic::Request<SystemStateRequest>,
982    ) -> WrappedServiceResponse<IotaSystemState> {
983        let response = self
984            .state
985            .get_object_cache_reader()
986            .try_get_iota_system_state_object_unsafe()?;
987        Ok((tonic::Response::new(response), Weight::one()))
988    }
989
990    fn get_client_ip_addr<T>(
991        &self,
992        request: &tonic::Request<T>,
993        source: &ClientIdSource,
994    ) -> Option<IpAddr> {
995        match source {
996            ClientIdSource::SocketAddr => {
997                let socket_addr: Option<SocketAddr> = request.remote_addr();
998
999                // We will hit this case if the IO type used does not
1000                // implement Connected or when using a unix domain socket.
1001                // TODO: once we have confirmed that no legitimate traffic
1002                // is hitting this case, we should reject such requests that
1003                // hit this case.
1004                if let Some(socket_addr) = socket_addr {
1005                    Some(socket_addr.ip())
1006                } else {
1007                    if cfg!(msim) {
1008                        // Ignore the error from simtests.
1009                    } else if cfg!(test) {
1010                        panic!("Failed to get remote address from request");
1011                    } else {
1012                        self.metrics.connection_ip_not_found.inc();
1013                        error!("Failed to get remote address from request");
1014                    }
1015                    None
1016                }
1017            }
1018            ClientIdSource::XForwardedFor(num_hops) => {
1019                let do_header_parse = |op: &MetadataValue<Ascii>| {
1020                    match op.to_str() {
1021                        Ok(header_val) => {
1022                            let header_contents =
1023                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1024                            if *num_hops == 0 {
1025                                error!(
1026                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1027                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1028                                    to this node. Skipping traffic controller request handling.",
1029                                    header_contents,
1030                                );
1031                                return None;
1032                            }
1033                            let contents_len = header_contents.len();
1034                            if contents_len < *num_hops {
1035                                error!(
1036                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1037                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1038                                    `client-id-source` in the node config.",
1039                                    header_contents, contents_len, num_hops, contents_len,
1040                                );
1041                                self.metrics.client_id_source_config_mismatch.inc();
1042                                return None;
1043                            }
1044                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1045                            else {
1046                                error!(
1047                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1048                                    Expected at least {} values. Skipping traffic controller request handling.",
1049                                    header_contents, contents_len, num_hops, contents_len,
1050                                );
1051                                return None;
1052                            };
1053                            parse_ip(client_ip).or_else(|| {
1054                                self.metrics.forwarded_header_parse_error.inc();
1055                                None
1056                            })
1057                        }
1058                        Err(e) => {
1059                            // TODO: once we have confirmed that no legitimate traffic
1060                            // is hitting this case, we should reject such requests that
1061                            // hit this case.
1062                            self.metrics.forwarded_header_invalid.inc();
1063                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1064                            None
1065                        }
1066                    }
1067                };
1068                if let Some(op) = request.metadata().get("x-forwarded-for") {
1069                    do_header_parse(op)
1070                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1071                    do_header_parse(op)
1072                } else {
1073                    self.metrics.forwarded_header_not_included.inc();
1074                    error!(
1075                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1076                    );
1077                    None
1078                }
1079            }
1080        }
1081    }
1082
1083    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1084        if let Some(traffic_controller) = &self.traffic_controller {
1085            if !traffic_controller.check(&client, &None).await {
1086                // Entity in blocklist
1087                Err(tonic::Status::from_error(IotaError::TooManyRequests.into()))
1088            } else {
1089                Ok(())
1090            }
1091        } else {
1092            Ok(())
1093        }
1094    }
1095
1096    fn handle_traffic_resp<T>(
1097        &self,
1098        client: Option<IpAddr>,
1099        wrapped_response: WrappedServiceResponse<T>,
1100    ) -> Result<tonic::Response<T>, tonic::Status> {
1101        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1102            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1103            Err(status) => (
1104                Some(IotaError::from(status.clone())),
1105                Weight::zero(),
1106                Err(status.clone()),
1107            ),
1108        };
1109
1110        if let Some(traffic_controller) = self.traffic_controller.clone() {
1111            traffic_controller.tally(TrafficTally {
1112                direct: client,
1113                through_fullnode: None,
1114                error_info: error.map(|e| {
1115                    let error_type = String::from(e.clone().as_ref());
1116                    let error_weight = normalize(e);
1117                    (error_weight, error_type)
1118                }),
1119                spam_weight,
1120                timestamp: SystemTime::now(),
1121            })
1122        }
1123        unwrapped_response
1124    }
1125
1126    async fn handle_capability_notification_v1_impl(
1127        &self,
1128        request: tonic::Request<HandleCapabilityNotificationRequestV1>,
1129    ) -> WrappedServiceResponse<HandleCapabilityNotificationResponseV1> {
1130        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1131        let request = request.into_inner();
1132        fp_ensure!(
1133            epoch_store
1134                .protocol_config()
1135                .track_non_committee_eligible_validators(),
1136            IotaError::UnsupportedFeature {
1137                error: "capability notification endpoint is not supported in this Protocol Version"
1138                    .to_string()
1139            }
1140            .into()
1141        );
1142        // Validate if cert can be executed
1143        // Fullnode does not serve handle_certificate call.
1144        fp_ensure!(
1145            !self.state.is_fullnode(&epoch_store),
1146            IotaError::FullNodeCantHandleAuthorityCapabilities.into()
1147        );
1148
1149        // Check if the capabilities notification has already been processed
1150        let existing_capabilities = epoch_store.get_capabilities_v1()?;
1151        let incoming_capability = request.message.data();
1152
1153        info!(
1154            "Received capability notification: {:?}",
1155            incoming_capability
1156        );
1157
1158        if let Some(existing) = existing_capabilities
1159            .iter()
1160            .find(|cap| cap.authority == incoming_capability.authority)
1161        {
1162            if incoming_capability.generation <= existing.generation {
1163                // Return successfully if generation is lower or equal - already processed
1164                return Ok((
1165                    tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1166                    Weight::one(),
1167                ));
1168            }
1169        }
1170        if let Err(error) = self.consensus_adapter.check_consensus_overload() {
1171            self.metrics
1172                .num_rejected_capability_notifications_during_overload
1173                .with_label_values(&[error.as_ref()])
1174                .inc();
1175            return Err(error.into());
1176        }
1177
1178        let _handle_tx_metrics_guard = self
1179            .metrics
1180            .handle_capability_notification_latency
1181            .start_timer();
1182
1183        let signed_authority_capabilities = request.message;
1184        // Verify the message signature
1185        let verified_authority_capabilities = epoch_store
1186            .verify_authority_capabilities(signed_authority_capabilities)
1187            .inspect_err(|_e| {
1188                self.metrics.signature_errors.inc();
1189            })?;
1190
1191        let authority_name = verified_authority_capabilities.authority;
1192        // Process the verified capabilities
1193        debug!("Verified capability notification for authority {authority_name:?}");
1194
1195        // Submit the signed capability notification to consensus instead of processing
1196        // directly
1197        let signed_authority_capabilities_transaction =
1198            ConsensusTransaction::new_signed_capability_notification_v1(
1199                verified_authority_capabilities.into_inner(),
1200            );
1201
1202        // Submit to consensus - similar to how certificates are handled
1203        self.consensus_adapter.submit(
1204            signed_authority_capabilities_transaction,
1205            None,
1206            &epoch_store,
1207        )?;
1208
1209        debug!("Submitted capability notification to consensus for authority {authority_name:?}");
1210
1211        Ok((
1212            tonic::Response::new(HandleCapabilityNotificationResponseV1 { _unused: false }),
1213            Weight::one(),
1214        ))
1215    }
1216}
1217
1218fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
1219    // simulate a TCP connection, which would have added extensions to
1220    // the request object that would be used downstream
1221    let mut request = tonic::Request::new(message);
1222    let tcp_connect_info = TcpConnectInfo {
1223        local_addr: None,
1224        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
1225    };
1226    request.extensions_mut().insert(tcp_connect_info);
1227    request
1228}
1229
1230// TODO: refine error matching here
1231fn normalize(err: IotaError) -> Weight {
1232    match err {
1233        IotaError::UserInput {
1234            error: UserInputError::IncorrectUserSignature { .. },
1235        } => Weight::one(),
1236        IotaError::InvalidSignature { .. }
1237        | IotaError::SignerSignatureAbsent { .. }
1238        | IotaError::SignerSignatureNumberMismatch { .. }
1239        | IotaError::IncorrectSigner { .. }
1240        | IotaError::UnknownSigner { .. }
1241        | IotaError::WrongEpoch { .. } => Weight::one(),
1242        _ => Weight::zero(),
1243    }
1244}
1245
/// Wraps a `*_impl` handler with traffic-control pre- and post-processing.
///
/// Without a configured client-id source the handler runs undecorated and its
/// spam weight is discarded. Otherwise the client address is resolved, the
/// request is gated through `handle_traffic_req`, and the handler's outcome is
/// tallied by `handle_traffic_resp`.
///
/// The expansion uses `return` and `?`, so it must be the tail of a function
/// or async block returning `Result<tonic::Response<_>, tonic::Status>`.
///
/// Since this is on the critical path, any heavy lifting should be done in a
/// separate non-blocking task unless it is necessary to override the return
/// value.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self
                .$func_name($request)
                .await
                .map(|(response, _)| response);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // a blocked client is turned away before the handler runs
        $self.handle_traffic_req(client).await?;

        // run the handler, then tally its outcome
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1266
/// gRPC entry points of the validator service. Every method routes its
/// request through `handle_with_decoration!`, which applies traffic control
/// around the matching `*_impl` handler.
#[async_trait]
impl Validator for ValidatorService {
    /// Handles a `Transaction` request.
    async fn transaction(
        &self,
        request: tonic::Request<Transaction>,
    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
        let validator_service = self.clone();

        // Spawns a task which handles the transaction. The task will unconditionally
        // continue processing in the event that the client connection is
        // dropped.
        spawn_monitored_task!(async move {
            // NB: traffic tally wrapping handled within the task rather than on task exit
            // to prevent an attacker from subverting traffic control by severing the
            // connection
            handle_with_decoration!(validator_service, transaction_impl, request)
        })
        .await
        // NOTE(review): presumably a join error (task panicked or was
        // cancelled); it is turned into a panic here — confirm.
        .unwrap()
    }

    /// Submits a `CertifiedTransaction` request.
    async fn submit_certificate(
        &self,
        request: tonic::Request<CertifiedTransaction>,
    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
        let validator_service = self.clone();

        // Spawns a task which handles the certificate. The task will unconditionally
        // continue processing in the event that the client connection is
        // dropped.
        spawn_monitored_task!(async move {
            // NB: traffic tally wrapping handled within the task rather than on task exit
            // to prevent an attacker from subverting traffic control by severing the
            // connection.
            handle_with_decoration!(validator_service, submit_certificate_impl, request)
        })
        .await
        // NOTE(review): same as in `transaction` — a failed task surfaces as
        // a panic here; confirm this is intended.
        .unwrap()
    }

    /// Handles a `HandleCertificateRequestV1` request. Unlike `transaction`
    /// and `submit_certificate`, the handler runs inline rather than in a
    /// spawned task.
    async fn handle_certificate_v1(
        &self,
        request: tonic::Request<HandleCertificateRequestV1>,
    ) -> Result<tonic::Response<HandleCertificateResponseV1>, tonic::Status> {
        handle_with_decoration!(self, handle_certificate_v1_impl, request)
    }

    /// Handles a `HandleSoftBundleCertificatesRequestV1` request (a Soft
    /// Bundle of certificates).
    async fn handle_soft_bundle_certificates_v1(
        &self,
        request: tonic::Request<HandleSoftBundleCertificatesRequestV1>,
    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV1>, tonic::Status> {
        handle_with_decoration!(self, handle_soft_bundle_certificates_v1_impl, request)
    }

    /// Handles an `ObjectInfoRequest` request.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request)
    }

    /// Handles a `TransactionInfoRequest` request.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request)
    }

    /// Handles a `CheckpointRequest` request.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request)
    }

    /// Gets the `IotaSystemState` response.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<IotaSystemState>, tonic::Status> {
        handle_with_decoration!(self, get_system_state_object_impl, request)
    }

    /// Handles a `HandleCapabilityNotificationRequestV1` request.
    async fn handle_capability_notification_v1(
        &self,
        request: tonic::Request<HandleCapabilityNotificationRequestV1>,
    ) -> Result<tonic::Response<HandleCapabilityNotificationResponseV1>, tonic::Status> {
        handle_with_decoration!(self, handle_capability_notification_v1_impl, request)
    }
}