1use std::{
7 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
8 convert::AsRef,
9 net::SocketAddr,
10 string::ToString,
11 sync::Arc,
12 time::Duration,
13};
14
15use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
16use iota_authority_aggregation::{AsyncResult, ReduceOutput, quorum_map_then_reduce_with_timeout};
17use iota_config::genesis::Genesis;
18use iota_metrics::{GaugeGuard, MonitorCancellation, monitored_future, spawn_monitored_task};
19use iota_network::{
20 DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_iota_network_config,
21};
22use iota_network_stack::config::Config;
23use iota_swarm_config::network_config::NetworkConfig;
24use iota_types::{
25 base_types::*,
26 committee::{Committee, CommitteeTrait, CommitteeWithNetworkMetadata, StakeUnit},
27 crypto::{AuthorityPublicKeyBytes, AuthoritySignInfo},
28 effects::{
29 CertifiedTransactionEffects, SignedTransactionEffects, TransactionEffects,
30 TransactionEvents, VerifiedCertifiedTransactionEffects,
31 },
32 error::{IotaError, IotaResult, UserInputError},
33 fp_ensure,
34 iota_system_state::{
35 IotaSystemState, IotaSystemStateTrait,
36 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
37 },
38 message_envelope::Message,
39 messages_grpc::{
40 HandleCapabilityNotificationRequestV1, HandleCertificateRequestV1,
41 HandleCertificateResponseV1, LayoutGenerationOption, ObjectInfoRequest,
42 TransactionInfoRequest,
43 },
44 messages_safe_client::PlainTransactionInfoResponse,
45 object::Object,
46 quorum_driver_types::{GroupedErrors, QuorumDriverResponse},
47 transaction::*,
48};
49use prometheus::{
50 Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry,
51 register_int_counter_vec_with_registry, register_int_counter_with_registry,
52 register_int_gauge_with_registry,
53};
54use thiserror::Error;
55use tokio::time::{sleep, timeout};
56use tracing::{Instrument, debug, error, info, instrument, trace, trace_span, warn};
57
58use crate::{
59 authority_client::{
60 AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
61 make_network_authority_clients_with_network_config,
62 },
63 epoch::committee_store::CommitteeStore,
64 safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase},
65 stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator},
66};
67
/// Default retry count exported by this module.
///
/// NOTE(review): nothing in the visible part of this file reads this
/// constant; confirm which callers depend on it before changing the value.
pub const DEFAULT_RETRIES: usize = 4;
69
// The unit tests for this module live in a separate file and are only
// compiled for test builds.
#[cfg(test)]
#[path = "unit_tests/authority_aggregator_tests.rs"]
pub mod authority_aggregator_tests;
73
/// Timing knobs used by the authority aggregator.
#[derive(Clone)]
pub struct TimeoutConfig {
    /// Overall time budget handed to the quorum map/reduce helpers while a
    /// quorum is still being gathered.
    pub pre_quorum_timeout: Duration,
    /// Time budget applied once a quorum has already been reached.
    /// NOTE(review): not read in the visible part of this file; the name
    /// suggests post-quorum certificate broadcasting - confirm at the use site.
    pub post_quorum_timeout: Duration,

    /// Pause between "start the next authority" ticks when authorities are
    /// queried one after another in `quorum_once_inner`.
    pub serial_authority_request_interval: Duration,
}

impl Default for TimeoutConfig {
    /// Returns the stock configuration: 60s pre-quorum, 7s post-quorum and a
    /// 1000ms serial request interval.
    fn default() -> Self {
        let pre_quorum_timeout = Duration::from_secs(60);
        let post_quorum_timeout = Duration::from_secs(7);
        let serial_authority_request_interval = Duration::from_millis(1000);

        Self {
            pre_quorum_timeout,
            post_quorum_timeout,
            serial_authority_request_interval,
        }
    }
}
95
/// Prometheus metrics recorded by the authority aggregator.
///
/// Every field is registered in `AuthAggMetrics::new`; the descriptions below
/// mirror the help strings used there.
#[derive(Clone)]
pub struct AuthAggMetrics {
    /// Total number of certificates made in the authority aggregator.
    pub total_tx_certificates_created: IntCounter,
    /// Errors returned by validators while processing a transaction,
    /// labelled by validator name and error type.
    pub process_tx_errors: IntCounterVec,
    /// Errors returned by validators while processing a certificate,
    /// labelled by validator name and error type.
    pub process_cert_errors: IntCounterVec,
    /// Number of detected client double spend attempts.
    pub total_client_double_spend_attempts_detected: IntCounter,
    /// Aggregated validator errors, labelled by error and `tx_recoverable`.
    pub total_aggregated_err: IntCounterVec,
    /// RPC errors returned by validators, labelled by validator name and
    /// `error_message`.
    pub total_rpc_err: IntCounterVec,
    /// Transactions currently gathering signatures.
    pub inflight_transactions: IntGauge,
    /// Certificates currently gathering effects.
    pub inflight_certificates: IntGauge,
    /// In-flight `handle_transaction` requests.
    pub inflight_transaction_requests: IntGauge,
    /// In-flight `handle_certificate` requests.
    pub inflight_certificate_requests: IntGauge,

    /// Number of timeouts hit during post-quorum certificate processing.
    pub cert_broadcasting_post_quorum_timeout: IntCounter,
    /// Number of remaining tasks at the moment certificate quorum is reached.
    pub remaining_tasks_when_reaching_cert_quorum: Histogram,
    /// Number of remaining tasks when post-quorum certificate broadcasting
    /// times out.
    pub remaining_tasks_when_cert_broadcasting_post_quorum_timeout: Histogram,
    /// Times quorum was reached without any validator returning the
    /// requested objects.
    pub quorum_reached_without_requested_objects: IntCounter,

    /// Successful capability notifications sent to committee validators.
    pub capability_notification_success: IntCounter,
    /// Errors returned by validators when sending capability notifications.
    pub capability_notification_errors: IntCounter,
}
118
119impl AuthAggMetrics {
120 pub fn new(registry: &prometheus::Registry) -> Self {
122 Self {
123 total_tx_certificates_created: register_int_counter_with_registry!(
124 "total_tx_certificates_created",
125 "Total number of certificates made in the authority_aggregator",
126 registry,
127 )
128 .unwrap(),
129 process_tx_errors: register_int_counter_vec_with_registry!(
130 "process_tx_errors",
131 "Number of errors returned from validators when processing transaction, group by validator name and error type",
132 &["name","error"],
133 registry,
134 )
135 .unwrap(),
136 process_cert_errors: register_int_counter_vec_with_registry!(
137 "process_cert_errors",
138 "Number of errors returned from validators when processing certificate, group by validator name and error type",
139 &["name", "error"],
140 registry,
141 )
142 .unwrap(),
143 total_client_double_spend_attempts_detected: register_int_counter_with_registry!(
144 "total_client_double_spend_attempts_detected",
145 "Total number of client double spend attempts that are detected",
146 registry,
147 )
148 .unwrap(),
149 total_aggregated_err: register_int_counter_vec_with_registry!(
150 "total_aggregated_err",
151 "Total number of errors returned from validators, grouped by error type",
152 &["error", "tx_recoverable"],
153 registry,
154 )
155 .unwrap(),
156 total_rpc_err: register_int_counter_vec_with_registry!(
157 "total_rpc_err",
158 "Total number of rpc errors returned from validators, grouped by validator short name and RPC error message",
159 &["name", "error_message"],
160 registry,
161 )
162 .unwrap(),
163 inflight_transactions: register_int_gauge_with_registry!(
164 "auth_agg_inflight_transactions",
165 "Inflight transaction gathering signatures",
166 registry,
167 )
168 .unwrap(),
169 inflight_certificates: register_int_gauge_with_registry!(
170 "auth_agg_inflight_certificates",
171 "Inflight certificates gathering effects",
172 registry,
173 )
174 .unwrap(),
175 inflight_transaction_requests: register_int_gauge_with_registry!(
176 "auth_agg_inflight_transaction_requests",
177 "Inflight handle_transaction requests",
178 registry,
179 )
180 .unwrap(),
181 inflight_certificate_requests: register_int_gauge_with_registry!(
182 "auth_agg_inflight_certificate_requests",
183 "Inflight handle_certificate requests",
184 registry,
185 )
186 .unwrap(),
187 cert_broadcasting_post_quorum_timeout: register_int_counter_with_registry!(
188 "auth_agg_cert_broadcasting_post_quorum_timeout",
189 "Total number of timeout in cert processing post quorum",
190 registry,
191 )
192 .unwrap(),
193 remaining_tasks_when_reaching_cert_quorum: register_histogram_with_registry!(
194 "auth_agg_remaining_tasks_when_reaching_cert_quorum",
195 "Number of remaining tasks when reaching certificate quorum",
196 registry,
197 ).unwrap(),
198 remaining_tasks_when_cert_broadcasting_post_quorum_timeout: register_histogram_with_registry!(
199 "auth_agg_remaining_tasks_when_cert_broadcasting_post_quorum_timeout",
200 "Number of remaining tasks when post quorum certificate broadcasting times out",
201 registry,
202 ).unwrap(),
203 quorum_reached_without_requested_objects: register_int_counter_with_registry!(
204 "auth_agg_quorum_reached_without_requested_objects",
205 "Number of times quorum was reached without getting the requested objects back from at least 1 validator",
206 registry,
207 )
208 .unwrap(),
209 capability_notification_success: register_int_counter_with_registry!(
210 "capability_notification_success",
211 "Total number of successful capability notifications sent to committee validators",
212 registry,
213 )
214 .unwrap(),
215 capability_notification_errors: register_int_counter_with_registry!(
216 "capability_notification_errors",
217 "Number of errors returned from validators when sending capability notifications",
218 registry,
219 )
220 .unwrap(),
221 }
222 }
223
224 pub fn new_for_tests() -> Self {
226 let registry = prometheus::Registry::new();
227 Self::new(®istry)
228 }
229}
230
/// Errors returned by `AuthorityAggregator::process_transaction` when no
/// usable result could be produced from the committee.
#[derive(Error, Debug, Eq, PartialEq)]
pub enum AggregatorProcessTransactionError {
    /// The transaction failed and the collected validator errors are not
    /// retryable.
    #[error(
        "Failed to execute transaction on a quorum of validators due to non-retryable errors. Validator errors: {:?}",
        errors
    )]
    FatalTransaction { errors: GroupedErrors },

    /// The transaction failed, but the aggregated state is still retryable.
    #[error(
        "Failed to execute transaction on a quorum of validators but state is still retryable. Validator errors: {:?}",
        errors
    )]
    RetryableTransaction { errors: GroupedErrors },

    /// The transaction failed because objects are locked by other
    /// transactions. `conflicting_tx_digests` maps each conflicting
    /// transaction digest to the `(validator, object ref)` lock records that
    /// reported it and the total stake of those validators.
    #[error(
        "Failed to execute transaction on a quorum of validators due to conflicting transactions. Locked objects: {:?}. Validator errors: {:?}",
        conflicting_tx_digests,
        errors
    )]
    FatalConflictingTransaction {
        errors: GroupedErrors,
        conflicting_tx_digests:
            BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    },

    /// Validators holding `overloaded_stake` reported being overloaded.
    #[error(
        "{} of the validators by stake are overloaded with transactions pending execution. Validator errors: {:?}",
        overloaded_stake,
        errors
    )]
    SystemOverload {
        overloaded_stake: StakeUnit,
        errors: GroupedErrors,
    },

    /// The transaction was already finalized, but with a different set of
    /// user signatures than the one submitted.
    #[error("Transaction is already finalized but with different user signatures")]
    TxAlreadyFinalizedWithDifferentUserSignatures,

    /// Validators holding `overload_stake` are overloaded and asked the
    /// client to retry after `retry_after_secs` seconds.
    #[error(
        "{} of the validators by stake are overloaded and requested the client to retry after {} seconds. Validator errors: {:?}",
        overload_stake,
        retry_after_secs,
        errors
    )]
    SystemOverloadRetryAfter {
        overload_stake: StakeUnit,
        errors: GroupedErrors,
        retry_after_secs: u64,
    },
}
282
/// Errors produced when a capability notification could not be delivered to
/// a quorum of validators.
#[derive(Error, Debug)]
pub enum AggregatorSendCapabilityNotificationError {
    /// Delivery failed with errors that are not retryable.
    #[error(
        "Failed to send capability notification to a quorum of validators due to non-retryable errors. Validator errors: {:?}",
        errors
    )]
    NonRetryableNotification { errors: GroupedErrors },

    /// Delivery failed, but the state is still retryable.
    #[error(
        "Failed to send capability notification to a quorum of validators but state is still retryable. Validator errors: {:?}",
        errors
    )]
    RetryableNotification { errors: GroupedErrors },
}
297
/// Errors produced when a certificate could not be executed on a quorum of
/// validators.
#[derive(Error, Debug)]
pub enum AggregatorProcessCertificateError {
    /// Execution failed; carries the grouped non-retryable validator errors.
    #[error(
        "Failed to execute certificate on a quorum of validators. Non-retryable errors: {:?}",
        non_retryable_errors
    )]
    FatalExecuteCertificate { non_retryable_errors: GroupedErrors },

    /// Execution failed but is still retryable; carries the grouped
    /// retryable validator errors.
    #[error(
        "Failed to execute certificate on a quorum of validators but state is still retryable. Retryable errors: {:?}",
        retryable_errors
    )]
    RetryableExecuteCertificate { retryable_errors: GroupedErrors },
}
312
313pub fn group_errors(errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>) -> GroupedErrors {
315 #[expect(clippy::mutable_key_type)]
316 let mut grouped_errors = HashMap::new();
317 for (error, names, stake) in errors {
318 let entry = grouped_errors.entry(error).or_insert((0, vec![]));
319 entry.0 += stake;
320 entry.1.extend(
321 names
322 .into_iter()
323 .map(|n| n.concise_owned())
324 .collect::<Vec<_>>(),
325 );
326 }
327 grouped_errors
328 .into_iter()
329 .map(|(e, (s, n))| (e, s, n))
330 .collect()
331}
332
333#[derive(Debug, Default)]
336pub struct RetryableOverloadInfo {
337 pub total_stake: StakeUnit,
339
340 pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
342}
343
344impl RetryableOverloadInfo {
345 pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
346 self.total_stake += stake;
347 self.stake_requested_retry_after
348 .entry(retry_after)
349 .and_modify(|s| *s += stake)
350 .or_insert(stake);
351 }
352
353 pub fn get_quorum_retry_after(
356 &self,
357 good_stake: StakeUnit,
358 quorum_threshold: StakeUnit,
359 ) -> Duration {
360 if self.stake_requested_retry_after.is_empty() {
361 return Duration::from_secs(0);
362 }
363
364 let mut quorum_stake = good_stake;
365 for (retry_after, stake) in self.stake_requested_retry_after.iter() {
366 quorum_stake += *stake;
367 if quorum_stake >= quorum_threshold {
368 return *retry_after;
369 }
370 }
371 *self.stake_requested_retry_after.last_key_value().unwrap().0
372 }
373}
374
375#[derive(Debug)]
376struct ProcessTransactionState {
377 tx_signatures: StakeAggregator<AuthoritySignInfo, true>,
379 effects_map: MultiStakeAggregator<TransactionEffectsDigest, TransactionEffects, true>,
380 errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
382 non_retryable_stake: StakeUnit,
384 object_or_package_not_found_stake: StakeUnit,
386 overloaded_stake: StakeUnit,
388 retryable_overload_info: RetryableOverloadInfo,
390 conflicting_tx_digests:
392 BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
393 retryable: bool,
398 tx_finalized_with_different_user_sig: bool,
399}
400
401impl ProcessTransactionState {
402 pub fn record_conflicting_transaction_if_any(
405 &mut self,
406 validator_name: AuthorityName,
407 weight: StakeUnit,
408 err: &IotaError,
409 ) {
410 if let IotaError::ObjectLockConflict {
411 obj_ref,
412 pending_transaction: transaction,
413 } = err
414 {
415 let (lock_records, total_stake) = self
416 .conflicting_tx_digests
417 .entry(*transaction)
418 .or_insert((Vec::new(), 0));
419 lock_records.push((validator_name, *obj_ref));
420 *total_stake += weight;
421 }
422 }
423
424 pub fn check_if_error_indicates_tx_finalized_with_different_user_sig(
426 &self,
427 validity_threshold: StakeUnit,
428 ) -> bool {
429 let invalid_sig_stake: StakeUnit = self
440 .errors
441 .iter()
442 .filter_map(|(e, _, stake)| {
443 if matches!(e, IotaError::FailedToVerifyTxCertWithExecutedEffects { .. }) {
444 Some(stake)
445 } else {
446 None
447 }
448 })
449 .sum();
450 invalid_sig_stake >= validity_threshold
451 }
452}
453
/// Accumulator for certificate processing.
///
/// NOTE(review): the code that reads and writes this state is outside the
/// visible part of the file; the field notes below are derived from names and
/// types only and should be confirmed against that code.
struct ProcessCertificateState {
    // Vote aggregator keyed by `(epoch, effects digest)`.
    effects_map:
        MultiStakeAggregator<(EpochId, TransactionEffectsDigest), TransactionEffects, true>,
    // Presumably the stake behind `non_retryable_errors` - confirm.
    non_retryable_stake: StakeUnit,
    non_retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    retryable: bool,

    // Optional extras; presumably filled from validator responses - confirm.
    events: Option<TransactionEvents>,
    input_objects: Option<Vec<Object>>,
    output_objects: Option<Vec<Object>>,
    auxiliary_data: Option<Vec<u8>>,
    // The certificate request this state belongs to.
    request: HandleCertificateRequestV1,
}
478
479#[derive(Debug)]
481pub enum ProcessTransactionResult {
482 Certified {
483 certificate: CertifiedTransaction,
484 newly_formed: bool,
490 },
491 Executed(VerifiedCertifiedTransactionEffects, TransactionEvents),
492}
493
494impl ProcessTransactionResult {
495 pub fn into_cert_for_testing(self) -> CertifiedTransaction {
497 match self {
498 Self::Certified { certificate, .. } => certificate,
499 Self::Executed(..) => panic!("Wrong type"),
500 }
501 }
502
503 pub fn into_effects_for_testing(self) -> VerifiedCertifiedTransactionEffects {
506 match self {
507 Self::Certified { .. } => panic!("Wrong type"),
508 Self::Executed(effects, ..) => effects,
509 }
510 }
511}
512
/// Client-side handle on a committee of authorities: holds one `SafeClient`
/// per authority plus the committee, metrics and timeouts used when talking
/// to them.
#[derive(Clone)]
pub struct AuthorityAggregator<A: Clone> {
    /// The committee this aggregator talks to.
    pub committee: Arc<Committee>,
    /// Display names per authority; used as metric label values when
    /// recording per-validator errors, falling back to the concise authority
    /// name when an entry is missing.
    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
    /// One `SafeClient` per authority, keyed by authority name.
    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
    /// Aggregator-level metrics.
    pub metrics: Arc<AuthAggMetrics>,
    /// Base from which per-authority `SafeClientMetrics` are created.
    pub safe_client_metrics_base: SafeClientMetricsBase,
    /// Timeouts applied to quorum operations.
    pub timeouts: TimeoutConfig,
    /// Committee store shared with every `SafeClient`.
    pub committee_store: Arc<CommitteeStore>,
}
534
535impl<A: Clone> AuthorityAggregator<A> {
536 pub fn new(
538 committee: Committee,
539 committee_store: Arc<CommitteeStore>,
540 authority_clients: BTreeMap<AuthorityName, A>,
541 safe_client_metrics_base: SafeClientMetricsBase,
542 auth_agg_metrics: Arc<AuthAggMetrics>,
543 validator_display_names: Arc<HashMap<AuthorityName, String>>,
544 timeouts: TimeoutConfig,
545 ) -> Self {
546 Self {
547 committee: Arc::new(committee),
548 authority_clients: create_safe_clients(
549 authority_clients,
550 &committee_store,
551 &safe_client_metrics_base,
552 ),
553 metrics: auth_agg_metrics,
554 safe_client_metrics_base,
555 timeouts,
556 committee_store,
557 validator_display_names,
558 }
559 }
560
561 pub fn recreate_with_net_addresses(
568 &self,
569 committee: CommitteeWithNetworkMetadata,
570 network_config: &Config,
571 disallow_missing_intermediate_committees: bool,
572 ) -> IotaResult<AuthorityAggregator<NetworkAuthorityClient>> {
573 let network_clients =
574 make_network_authority_clients_with_network_config(&committee, network_config);
575
576 let safe_clients = network_clients
577 .into_iter()
578 .map(|(name, api)| {
579 (
580 name,
581 Arc::new(SafeClient::new(
582 api,
583 self.committee_store.clone(),
584 name,
585 SafeClientMetrics::new(&self.safe_client_metrics_base, name),
586 )),
587 )
588 })
589 .collect::<BTreeMap<_, _>>();
590
591 let new_committee = committee.committee().clone();
595 if disallow_missing_intermediate_committees {
596 fp_ensure!(
597 self.committee.epoch + 1 == new_committee.epoch,
598 IotaError::AdvanceEpoch {
599 error: format!(
600 "Trying to advance from epoch {} to epoch {}",
601 self.committee.epoch, new_committee.epoch
602 )
603 }
604 );
605 }
606 let _ = self.committee_store.insert_new_committee(&new_committee);
612 Ok(AuthorityAggregator {
613 committee: Arc::new(new_committee),
614 authority_clients: Arc::new(safe_clients),
615 metrics: self.metrics.clone(),
616 timeouts: self.timeouts.clone(),
617 safe_client_metrics_base: self.safe_client_metrics_base.clone(),
618 committee_store: self.committee_store.clone(),
619 validator_display_names: Arc::new(HashMap::new()),
620 })
621 }
622
623 pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
625 self.authority_clients.get(name)
626 }
627
628 pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
630 where
631 A: Clone,
632 {
633 self.authority_clients[name].clone()
634 }
635
636 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
638 self.committee_store.clone()
639 }
640
641 pub fn clone_inner_committee_test_only(&self) -> Committee {
643 (*self.committee).clone()
644 }
645
646 pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
648 (*self.authority_clients)
649 .clone()
650 .into_iter()
651 .map(|(k, v)| (k, (*v).clone()))
652 .collect()
653 }
654}
655
656fn create_safe_clients<A: Clone>(
658 authority_clients: BTreeMap<AuthorityName, A>,
659 committee_store: &Arc<CommitteeStore>,
660 safe_client_metrics_base: &SafeClientMetricsBase,
661) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
662 Arc::new(
663 authority_clients
664 .into_iter()
665 .map(|(name, api)| {
666 (
667 name,
668 Arc::new(SafeClient::new(
669 api,
670 committee_store.clone(),
671 name,
672 SafeClientMetrics::new(safe_client_metrics_base, name),
673 )),
674 )
675 })
676 .collect(),
677 )
678}
679
680impl AuthorityAggregator<NetworkAuthorityClient> {
681 pub fn new_from_epoch_start_state(
685 epoch_start_state: &EpochStartSystemState,
686 committee_store: &Arc<CommitteeStore>,
687 safe_client_metrics_base: SafeClientMetricsBase,
688 auth_agg_metrics: Arc<AuthAggMetrics>,
689 ) -> Self {
690 let committee = epoch_start_state.get_iota_committee_with_network_metadata();
691 let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
692 Self::new_from_committee(
693 committee,
694 committee_store,
695 safe_client_metrics_base,
696 auth_agg_metrics,
697 Arc::new(validator_display_names),
698 )
699 }
700
701 pub fn recreate_with_new_epoch_start_state(
706 &self,
707 epoch_start_state: &EpochStartSystemState,
708 ) -> Self {
709 Self::new_from_epoch_start_state(
710 epoch_start_state,
711 &self.committee_store,
712 self.safe_client_metrics_base.clone(),
713 self.metrics.clone(),
714 )
715 }
716
717 pub fn new_from_committee(
718 committee: CommitteeWithNetworkMetadata,
719 committee_store: &Arc<CommitteeStore>,
720 safe_client_metrics_base: SafeClientMetricsBase,
721 auth_agg_metrics: Arc<AuthAggMetrics>,
722 validator_display_names: Arc<HashMap<AuthorityName, String>>,
723 ) -> Self {
724 let net_config = default_iota_network_config();
725 let authority_clients =
726 make_network_authority_clients_with_network_config(&committee, &net_config);
727 Self::new(
728 committee.committee().clone(),
729 committee_store.clone(),
730 authority_clients,
731 safe_client_metrics_base,
732 auth_agg_metrics,
733 validator_display_names,
734 Default::default(),
735 )
736 }
737}
738
739impl<A> AuthorityAggregator<A>
740where
741 A: AuthorityAPI + Send + Sync + 'static + Clone,
742{
    /// Queries authorities one after another until one of them returns `Ok`.
    ///
    /// Authorities are tried in the order produced by
    /// `Committee::shuffle_by_stake(preferences, restrict_to)`. A new request
    /// is started whenever the previous one fails or times out, and also each
    /// time the `serial_authority_request_interval` timer fires, so slow
    /// authorities do not block progress. Each request is bounded by
    /// `timeout_each_authority`.
    ///
    /// Failures are written into `authority_errors` (a timeout is recorded as
    /// `IotaError::Timeout`). When a round over the authorities ends without
    /// success, the function sleeps (1s initially, doubling up to 5 minutes)
    /// and starts another round; it only returns on success or when no
    /// authority is available at all. Callers bound the total time from the
    /// outside (see `quorum_once_with_timeout`).
    ///
    /// # Errors
    ///
    /// Returns an error if the shuffled authority list is empty.
    async fn quorum_once_inner<'a, S, FMap>(
        &'a self,
        preferences: Option<&BTreeSet<AuthorityName>>,
        restrict_to: Option<&BTreeSet<AuthorityName>>,
        map_each_authority: FMap,
        timeout_each_authority: Duration,
        authority_errors: &mut HashMap<AuthorityName, IotaError>,
    ) -> Result<S, IotaError>
    where
        FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
            + Send
            + Clone
            + 'a,
        S: Send,
    {
        let start = tokio::time::Instant::now();
        // Back-off between full rounds over the authority list.
        let mut delay = Duration::from_secs(1);
        loop {
            let authorities_shuffled = self.committee.shuffle_by_stake(preferences, restrict_to);
            let mut authorities_shuffled = authorities_shuffled.iter();

            // Outer `Result` is the per-request timeout, inner one the
            // authority's own answer.
            type RequestResult<S> = Result<Result<S, IotaError>, tokio::time::error::Elapsed>;

            #[expect(clippy::large_enum_variant)]
            enum Event<S> {
                // The serial-request timer fired.
                StartNext,
                // A request to the named authority finished or timed out.
                Request(AuthorityName, RequestResult<S>),
            }

            let mut futures = FuturesUnordered::<BoxFuture<'a, Event<S>>>::new();

            // Builds the future that sends one request, bounded by
            // `timeout_each_authority`.
            let start_req = |name: AuthorityName, client: Arc<SafeClient<A>>| {
                let map_each_authority = map_each_authority.clone();
                Box::pin(monitored_future!(async move {
                    trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start, "new request");
                    let map = map_each_authority(name, client);
                    Event::Request(name, timeout(timeout_each_authority, map).await)
                }))
            };

            // Builds the timer future; note that this `delay` shadows the
            // back-off `delay` above inside the closure only.
            let schedule_next = || {
                let delay = self.timeouts.serial_authority_request_interval;
                Box::pin(monitored_future!(async move {
                    sleep(delay).await;
                    Event::StartNext
                }))
            };

            // Fail fast when there is nobody to ask.
            let name = authorities_shuffled.next().ok_or_else(|| {
                error!(
                    ?preferences,
                    ?restrict_to,
                    "Available authorities list is empty."
                );
                IotaError::from("Available authorities list is empty")
            })?;
            futures.push(start_req(*name, self.authority_clients[name].clone()));
            futures.push(schedule_next());

            while let Some(res) = futures.next().await {
                match res {
                    Event::StartNext => {
                        trace!(now = ?tokio::time::Instant::now() - start, "eagerly beginning next request");
                        // Re-arm the timer; the next request itself is
                        // started below.
                        futures.push(schedule_next());
                    }
                    Event::Request(name, res) => {
                        match res {
                            // The per-authority timeout elapsed.
                            Err(_) => {
                                debug!(name=?name.concise(), "authority request timed out");
                                authority_errors.insert(name, IotaError::Timeout);
                            }
                            // The request finished; this branch is also
                            // taken when the authority answered with an
                            // error.
                            Ok(inner_res) => {
                                trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start,
                                    "request completed successfully");
                                match inner_res {
                                    Err(e) => authority_errors.insert(name, e),
                                    Ok(res) => return Ok(res),
                                };
                            }
                        };
                    }
                }

                // Every event (timer tick or failed request) starts the next
                // authority, if any is left.
                if let Some(next_authority) = authorities_shuffled.next() {
                    futures.push(start_req(
                        *next_authority,
                        self.authority_clients[next_authority].clone(),
                    ));
                } else {
                    // NOTE(review): leaving the loop here drops any requests
                    // that are still in flight in `futures` - confirm this is
                    // intended.
                    break;
                }
            }

            info!(
                ?authority_errors,
                "quorum_once_with_timeout failed on all authorities, retrying in {:?}", delay
            );
            sleep(delay).await;
            // Exponential back-off, capped at five minutes.
            delay = std::cmp::min(delay * 2, Duration::from_secs(5 * 60));
        }
    }
876
877 pub(crate) async fn quorum_once_with_timeout<'a, S, FMap>(
884 &'a self,
885 preferences: Option<&BTreeSet<AuthorityName>>,
887 restrict_to: Option<&BTreeSet<AuthorityName>>,
889 map_each_authority: FMap,
892 timeout_each_authority: Duration,
893 timeout_total: Option<Duration>,
895 description: String,
897 ) -> Result<S, IotaError>
898 where
899 FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
900 + Send
901 + Clone
902 + 'a,
903 S: Send,
904 {
905 let mut authority_errors = HashMap::new();
906
907 let fut = self.quorum_once_inner(
908 preferences,
909 restrict_to,
910 map_each_authority,
911 timeout_each_authority,
912 &mut authority_errors,
913 );
914
915 if let Some(t) = timeout_total {
916 timeout(t, fut).await.map_err(|_timeout_error| {
917 if authority_errors.is_empty() {
918 IotaError::Timeout
919 } else {
920 IotaError::TooManyIncorrectAuthorities {
921 errors: authority_errors
922 .iter()
923 .map(|(a, b)| (*a, b.clone()))
924 .collect(),
925 action: description,
926 }
927 }
928 })?
929 } else {
930 fut.await
931 }
932 }
933
934 pub async fn get_latest_object_version_for_testing(
941 &self,
942 object_id: ObjectID,
943 ) -> IotaResult<Object> {
944 #[derive(Debug, Default)]
945 struct State {
946 latest_object_version: Option<Object>,
947 total_weight: StakeUnit,
948 }
949 let initial_state = State::default();
950 let result = quorum_map_then_reduce_with_timeout(
951 self.committee.clone(),
952 self.authority_clients.clone(),
953 initial_state,
954 |_name, client| {
955 Box::pin(async move {
956 let request =
957 ObjectInfoRequest::latest_object_info_request(object_id, LayoutGenerationOption::None);
958 client.handle_object_info_request(request).await
959 })
960 },
961 |mut state, name, weight, result| {
962 Box::pin(async move {
963 state.total_weight += weight;
964 match result {
965 Ok(object_info) => {
966 debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
967 if state.latest_object_version.as_ref().is_none_or(|latest| {
968 object_info.object.version() > latest.version()
969 }) {
970 state.latest_object_version = Some(object_info.object);
971 }
972 }
973 Err(err) => {
974 debug!("Received error from validator {:?}: {:?}", name.concise(), err);
975 }
976 };
977 if state.total_weight >= self.committee.quorum_threshold() {
978 if let Some(object) = state.latest_object_version {
979 return ReduceOutput::Success(object);
980 } else {
981 return ReduceOutput::Failed(state);
982 }
983 }
984 ReduceOutput::Continue(state)
985 })
986 },
987 self.timeouts.pre_quorum_timeout,
989 )
990 .await.map_err(|_state| IotaError::from(UserInputError::ObjectNotFound {
991 object_id,
992 version: None,
993 }))?;
994 Ok(result.0)
995 }
996
997 pub async fn get_latest_system_state_object_for_testing(
1001 &self,
1002 ) -> anyhow::Result<IotaSystemState> {
1003 #[derive(Debug, Default)]
1004 struct State {
1005 latest_system_state: Option<IotaSystemState>,
1006 total_weight: StakeUnit,
1007 }
1008 let initial_state = State::default();
1009 let result = quorum_map_then_reduce_with_timeout(
1010 self.committee.clone(),
1011 self.authority_clients.clone(),
1012 initial_state,
1013 |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
1014 |mut state, name, weight, result| {
1015 Box::pin(async move {
1016 state.total_weight += weight;
1017 match result {
1018 Ok(system_state) => {
1019 debug!(
1020 "Received system state object from validator {:?} with epoch: {:?}",
1021 name.concise(),
1022 system_state.epoch()
1023 );
1024 if state
1025 .latest_system_state
1026 .as_ref()
1027 .is_none_or(|latest| system_state.epoch() > latest.epoch())
1028 {
1029 state.latest_system_state = Some(system_state);
1030 }
1031 }
1032 Err(err) => {
1033 debug!(
1034 "Received error from validator {:?}: {:?}",
1035 name.concise(),
1036 err
1037 );
1038 }
1039 };
1040 if state.total_weight >= self.committee.quorum_threshold() {
1041 if let Some(system_state) = state.latest_system_state {
1042 return ReduceOutput::Success(system_state);
1043 } else {
1044 return ReduceOutput::Failed(state);
1045 }
1046 }
1047 ReduceOutput::Continue(state)
1048 })
1049 },
1050 self.timeouts.pre_quorum_timeout,
1052 )
1053 .await
1054 .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
1055 Ok(result.0)
1056 }
1057
1058 #[instrument(level = "trace", skip_all)]
1060 pub async fn process_transaction(
1061 &self,
1062 transaction: Transaction,
1063 client_addr: Option<SocketAddr>,
1064 ) -> Result<ProcessTransactionResult, AggregatorProcessTransactionError> {
1065 let tx_digest = transaction.digest();
1067 debug!(
1068 tx_digest = ?tx_digest,
1069 "Broadcasting transaction request to authorities"
1070 );
1071 trace!(
1072 "Transaction data: {:?}",
1073 transaction.data().intent_message().value
1074 );
1075 let committee = self.committee.clone();
1076 let state = ProcessTransactionState {
1077 tx_signatures: StakeAggregator::new(committee.clone()),
1078 effects_map: MultiStakeAggregator::new(committee.clone()),
1079 errors: vec![],
1080 object_or_package_not_found_stake: 0,
1081 non_retryable_stake: 0,
1082 overloaded_stake: 0,
1083 retryable_overload_info: Default::default(),
1084 retryable: true,
1085 conflicting_tx_digests: Default::default(),
1086 tx_finalized_with_different_user_sig: false,
1087 };
1088
1089 let transaction_ref = &transaction;
1090 let validity_threshold = committee.validity_threshold();
1091 let quorum_threshold = committee.quorum_threshold();
1092 let validator_display_names = self.validator_display_names.clone();
1093 let result = quorum_map_then_reduce_with_timeout(
1094 committee.clone(),
1095 self.authority_clients.clone(),
1096 state,
1097 |name, client| {
1098 Box::pin(
1099 async move {
1100 let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
1101 let concise_name = name.concise_owned();
1102 client.handle_transaction(transaction_ref.clone(), client_addr)
1103 .monitor_cancellation()
1104 .instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name))
1105 .await
1106 },
1107 )
1108 },
1109 |mut state, name, weight, response| {
1110 let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1111 Box::pin(async move {
1112 match self.handle_process_transaction_response(
1113 tx_digest, &mut state, response, name, weight,
1114 ) {
1115 Ok(Some(result)) => {
1116 self.record_process_transaction_metrics(tx_digest, &state);
1117 return ReduceOutput::Success(result);
1118 }
1119 Ok(None) => {},
1120 Err(err) => {
1121 let concise_name = name.concise();
1122 debug!(?tx_digest, name=?concise_name, weight, "Error processing transaction from validator: {:?}", err);
1123 self.metrics
1124 .process_tx_errors
1125 .with_label_values(&[display_name.as_str(), err.as_ref()])
1126 .inc();
1127 Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
1128 state.record_conflicting_transaction_if_any(name, weight, &err);
1130 let (retryable, categorized) = err.is_retryable();
1131 if !categorized {
1132 error!(?tx_digest, "uncategorized tx error: {err}");
1135 }
1136 if err.is_object_or_package_not_found() {
1137 state.object_or_package_not_found_stake += weight;
1142 }
1143 else if err.is_overload() {
1144 state.overloaded_stake += weight;
1151 }
1152 else if err.is_retryable_overload() {
1153 state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
1161 }
1162 else if !retryable {
1163 state.non_retryable_stake += weight;
1164 }
1165 state.errors.push((err, vec![name], weight));
1166
1167 }
1168 };
1169
1170 let retryable_stake = self.get_retryable_stake(&state);
1171 let good_stake = std::cmp::max(state.tx_signatures.total_votes(), state.effects_map.total_votes());
1172 if good_stake + retryable_stake < quorum_threshold {
1173 debug!(
1174 tx_digest = ?tx_digest,
1175 good_stake,
1176 retryable_stake,
1177 "No chance for any tx to get quorum, exiting. Conflicting_txes: {:?}",
1178 state.conflicting_tx_digests
1179 );
1180 state.retryable = false;
1182 return ReduceOutput::Failed(state);
1183 }
1184
1185 if state.non_retryable_stake >= validity_threshold
1187 || state.object_or_package_not_found_stake >= quorum_threshold || state.overloaded_stake >= quorum_threshold {
1189 state.retryable = false;
1192 ReduceOutput::Failed(state)
1193 } else {
1194 ReduceOutput::Continue(state)
1195 }
1196 })
1197 },
1198 self.timeouts.pre_quorum_timeout,
1200 )
1201 .await;
1202
1203 match result {
1204 Ok((result, _)) => Ok(result),
1205 Err(state) => {
1206 self.record_process_transaction_metrics(tx_digest, &state);
1207 let state = self.record_non_quorum_effects_maybe(tx_digest, state);
1208 Err(self.handle_process_transaction_error(state))
1209 }
1210 }
1211 }
1212
1213 fn record_rpc_error_maybe(metrics: Arc<AuthAggMetrics>, display_name: &str, error: &IotaError) {
1215 if let IotaError::Rpc(_message, code) = error {
1216 metrics
1217 .total_rpc_err
1218 .with_label_values(&[display_name, code.as_str()])
1219 .inc();
1220 }
1221 }
1222
1223 fn handle_process_transaction_error(
1225 &self,
1226 state: ProcessTransactionState,
1227 ) -> AggregatorProcessTransactionError {
1228 let quorum_threshold = self.committee.quorum_threshold();
1229
1230 if state.overloaded_stake >= quorum_threshold {
1232 return AggregatorProcessTransactionError::SystemOverload {
1233 overloaded_stake: state.overloaded_stake,
1234 errors: group_errors(state.errors),
1235 };
1236 }
1237
1238 if !state.retryable {
1239 if state.tx_finalized_with_different_user_sig
1240 || state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1241 self.committee.validity_threshold(),
1242 )
1243 {
1244 return AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures;
1245 }
1246
1247 if !state.conflicting_tx_digests.is_empty() {
1250 let good_stake = state.tx_signatures.total_votes();
1251 warn!(
1252 ?state.conflicting_tx_digests,
1253 original_tx_stake = good_stake,
1254 "Client double spend attempt detected!",
1255 );
1256 self.metrics
1257 .total_client_double_spend_attempts_detected
1258 .inc();
1259 return AggregatorProcessTransactionError::FatalConflictingTransaction {
1260 errors: group_errors(state.errors),
1261 conflicting_tx_digests: state.conflicting_tx_digests,
1262 };
1263 }
1264
1265 return AggregatorProcessTransactionError::FatalTransaction {
1266 errors: group_errors(state.errors),
1267 };
1268 }
1269
1270 if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
1277 >= quorum_threshold
1278 {
1279 let retry_after_secs = state
1280 .retryable_overload_info
1281 .get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
1282 .as_secs();
1283 return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
1284 overload_stake: state.retryable_overload_info.total_stake,
1285 errors: group_errors(state.errors),
1286 retry_after_secs,
1287 };
1288 }
1289
1290 AggregatorProcessTransactionError::RetryableTransaction {
1292 errors: group_errors(state.errors),
1293 }
1294 }
1295
1296 fn record_process_transaction_metrics(
1298 &self,
1299 tx_digest: &TransactionDigest,
1300 state: &ProcessTransactionState,
1301 ) {
1302 let num_signatures = state.tx_signatures.validator_sig_count();
1303 let good_stake = state.tx_signatures.total_votes();
1304 debug!(
1305 ?tx_digest,
1306 num_errors = state.errors.iter().map(|e| e.1.len()).sum::<usize>(),
1307 num_unique_errors = state.errors.len(),
1308 ?good_stake,
1309 non_retryable_stake = state.non_retryable_stake,
1310 ?num_signatures,
1311 "Received signatures response from validators handle_transaction"
1312 );
1313 if !state.errors.is_empty() {
1314 debug!(?tx_digest, "Errors received: {:?}", state.errors);
1315 }
1316 }
1317
1318 fn handle_process_transaction_response(
1320 &self,
1321 tx_digest: &TransactionDigest,
1322 state: &mut ProcessTransactionState,
1323 response: IotaResult<PlainTransactionInfoResponse>,
1324 name: AuthorityName,
1325 weight: StakeUnit,
1326 ) -> IotaResult<Option<ProcessTransactionResult>> {
1327 match response {
1328 Ok(PlainTransactionInfoResponse::Signed(signed)) => {
1329 debug!(?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction");
1330 self.handle_transaction_response_with_signed(state, signed)
1331 }
1332 Ok(PlainTransactionInfoResponse::ExecutedWithCert(cert, effects, events)) => {
1333 debug!(?tx_digest, name=?name.concise(), weight, "Received prev certificate and effects from validator handle_transaction");
1334 self.handle_transaction_response_with_executed(state, Some(cert), effects, events)
1335 }
1336 Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(_, effects, events)) => {
1337 debug!(?tx_digest, name=?name.concise(), weight, "Received prev effects from validator handle_transaction");
1338 self.handle_transaction_response_with_executed(state, None, effects, events)
1339 }
1340 Err(err) => Err(err),
1341 }
1342 }
1343
1344 fn handle_transaction_response_with_signed(
1347 &self,
1348 state: &mut ProcessTransactionState,
1349 plain_tx: SignedTransaction,
1350 ) -> IotaResult<Option<ProcessTransactionResult>> {
1351 match state.tx_signatures.insert(plain_tx.clone()) {
1352 InsertResult::NotEnoughVotes {
1353 bad_votes,
1354 bad_authorities,
1355 } => {
1356 state.non_retryable_stake += bad_votes;
1357 if bad_votes > 0 {
1358 state.errors.push((
1359 IotaError::InvalidSignature {
1360 error: "Individual signature verification failed".to_string(),
1361 },
1362 bad_authorities,
1363 bad_votes,
1364 ));
1365 }
1366 Ok(None)
1367 }
1368 InsertResult::Failed { error } => Err(error),
1369 InsertResult::QuorumReached(cert_sig) => {
1370 let certificate =
1371 CertifiedTransaction::new_from_data_and_sig(plain_tx.into_data(), cert_sig);
1372 certificate.verify_committee_sigs_only(&self.committee)?;
1373 Ok(Some(ProcessTransactionResult::Certified {
1374 certificate,
1375 newly_formed: true,
1376 }))
1377 }
1378 }
1379 }
1380
1381 fn handle_transaction_response_with_executed(
1385 &self,
1386 state: &mut ProcessTransactionState,
1387 certificate: Option<CertifiedTransaction>,
1388 plain_tx_effects: SignedTransactionEffects,
1389 events: TransactionEvents,
1390 ) -> IotaResult<Option<ProcessTransactionResult>> {
1391 match certificate {
1392 Some(certificate) if certificate.epoch() == self.committee.epoch => {
1393 Ok(Some(ProcessTransactionResult::Certified {
1397 certificate,
1398 newly_formed: false,
1399 }))
1400 }
1401 _ => {
1402 let digest = plain_tx_effects.data().digest();
1407 match state.effects_map.insert(digest, plain_tx_effects.clone()) {
1408 InsertResult::NotEnoughVotes {
1409 bad_votes,
1410 bad_authorities,
1411 } => {
1412 state.non_retryable_stake += bad_votes;
1413 if bad_votes > 0 {
1414 state.errors.push((
1415 IotaError::InvalidSignature {
1416 error: "Individual signature verification failed".to_string(),
1417 },
1418 bad_authorities,
1419 bad_votes,
1420 ));
1421 }
1422 Ok(None)
1423 }
1424 InsertResult::Failed { error } => Err(error),
1425 InsertResult::QuorumReached(cert_sig) => {
1426 let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1427 plain_tx_effects.into_data(),
1428 cert_sig,
1429 );
1430 Ok(Some(ProcessTransactionResult::Executed(
1431 ct.verify(&self.committee)?,
1432 events,
1433 )))
1434 }
1435 }
1436 }
1437 }
1438 }
1439
1440 fn record_non_quorum_effects_maybe(
1442 &self,
1443 tx_digest: &TransactionDigest,
1444 mut state: ProcessTransactionState,
1445 ) -> ProcessTransactionState {
1446 if state.effects_map.unique_key_count() > 0 {
1447 let non_quorum_effects = state.effects_map.get_all_unique_values();
1448 warn!(
1449 ?tx_digest,
1450 "Received signed Effects but not with a quorum {:?}", non_quorum_effects
1451 );
1452
1453 let (_most_staked_effects_digest, (_, most_staked_effects_digest_stake)) =
1456 non_quorum_effects
1457 .iter()
1458 .max_by_key(|&(_, (_, stake))| stake)
1459 .unwrap();
1460 if most_staked_effects_digest_stake + self.get_retryable_stake(&state)
1464 < self.committee.quorum_threshold()
1465 {
1466 state.retryable = false;
1467 if state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1468 self.committee.validity_threshold(),
1469 ) {
1470 state.tx_finalized_with_different_user_sig = true;
1471 } else {
1472 error!(
1474 "We have seen signed effects but unable to reach quorum threshold even including retriable stakes. This is very rare. Tx: {tx_digest:?}. Non-quorum effects: {non_quorum_effects:?}."
1475 );
1476 }
1477 }
1478
1479 let mut involved_validators = Vec::new();
1480 let mut total_stake = 0;
1481 for (validators, stake) in non_quorum_effects.values() {
1482 involved_validators.extend_from_slice(validators);
1483 total_stake += stake;
1484 }
1485 state.errors.push((
1489 IotaError::QuorumFailedToGetEffectsQuorumWhenProcessingTransaction {
1490 effects_map: non_quorum_effects,
1491 },
1492 involved_validators,
1493 total_stake,
1494 ));
1495 }
1496 state
1497 }
1498
1499 fn get_retryable_stake(&self, state: &ProcessTransactionState) -> StakeUnit {
1501 self.committee.total_votes()
1502 - state.non_retryable_stake
1503 - state.effects_map.total_votes()
1504 - state.tx_signatures.total_votes()
1505 }
1506
1507 #[instrument(level = "trace", skip_all)]
1510 pub async fn process_certificate(
1511 &self,
1512 request: HandleCertificateRequestV1,
1513 client_addr: Option<SocketAddr>,
1514 ) -> Result<QuorumDriverResponse, AggregatorProcessCertificateError> {
1515 let state = ProcessCertificateState {
1516 effects_map: MultiStakeAggregator::new(self.committee.clone()),
1517 non_retryable_stake: 0,
1518 non_retryable_errors: vec![],
1519 retryable_errors: vec![],
1520 retryable: true,
1521 events: None,
1522 input_objects: None,
1523 output_objects: None,
1524 auxiliary_data: None,
1525 request: request.clone(),
1526 };
1527
1528 let validators_to_sample =
1531 if request.include_input_objects || request.include_output_objects {
1532 const NUMBER_TO_SAMPLE: usize = 10;
1534
1535 self.committee
1536 .choose_multiple_weighted_iter(NUMBER_TO_SAMPLE)
1537 .cloned()
1538 .collect()
1539 } else {
1540 HashSet::new()
1541 };
1542
1543 let tx_digest = *request.certificate.digest();
1544 let timeout_after_quorum = self.timeouts.post_quorum_timeout;
1545
1546 let request_ref = request;
1547 let threshold = self.committee.quorum_threshold();
1548 let validity = self.committee.validity_threshold();
1549
1550 debug!(
1551 ?tx_digest,
1552 quorum_threshold = threshold,
1553 validity_threshold = validity,
1554 ?timeout_after_quorum,
1555 "Broadcasting certificate to authorities"
1556 );
1557 let committee: Arc<Committee> = self.committee.clone();
1558 let authority_clients = self.authority_clients.clone();
1559 let metrics = self.metrics.clone();
1560 let metrics_clone = metrics.clone();
1561 let validator_display_names = self.validator_display_names.clone();
1562 let (result, mut remaining_tasks) = quorum_map_then_reduce_with_timeout(
1563 committee.clone(),
1564 authority_clients.clone(),
1565 state,
1566 move |name, client| {
1567 Box::pin(async move {
1568 let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests);
1569 let concise_name = name.concise_owned();
1570 if request_ref.include_input_objects || request_ref.include_output_objects {
1571
1572 let req = if validators_to_sample.contains(&name) {
1574 request_ref
1575 } else {
1576 HandleCertificateRequestV1 {
1577 include_input_objects: false,
1578 include_output_objects: false,
1579 include_auxiliary_data: false,
1580 ..request_ref
1581 }
1582 };
1583
1584 client
1585 .handle_certificate_v1(req, client_addr)
1586 .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
1587 .await
1588 } else {
1589 client
1590 .handle_certificate_v1(HandleCertificateRequestV1::new(request_ref.certificate).with_events(), client_addr)
1591 .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
1592 .await
1593 .map(|response| HandleCertificateResponseV1 {
1594 signed_effects: response.signed_effects,
1595 events: response.events,
1596 input_objects: None,
1597 output_objects: None,
1598 auxiliary_data: None,
1599 })
1600 }
1601 })
1602 },
1603 move |mut state, name, weight, response| {
1604 let committee_clone = committee.clone();
1605 let metrics = metrics.clone();
1606 let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1607 Box::pin(async move {
1608 match AuthorityAggregator::<A>::handle_process_certificate_response(
1611 committee_clone,
1612 &metrics,
1613 &tx_digest, &mut state, response, name)
1614 {
1615 Ok(Some(effects)) => ReduceOutput::Success(effects),
1616 Ok(None) => {
1617 if state.non_retryable_stake >= validity {
1621 state.retryable = false;
1622 ReduceOutput::Failed(state)
1623 } else {
1624 ReduceOutput::Continue(state)
1625 }
1626 },
1627 Err(err) => {
1628 let concise_name = name.concise();
1629 debug!(?tx_digest, name=?concise_name, "Error processing certificate from validator: {:?}", err);
1630 metrics
1631 .process_cert_errors
1632 .with_label_values(&[display_name.as_str(), err.as_ref()])
1633 .inc();
1634 Self::record_rpc_error_maybe(metrics, &display_name, &err);
1635 let (retryable, categorized) = err.is_retryable();
1636 if !categorized {
1637 error!(?tx_digest, "[WATCHOUT] uncategorized tx error: {err}");
1640 }
1641 if !retryable {
1642 state.non_retryable_stake += weight;
1643 state.non_retryable_errors.push((err, vec![name], weight));
1644 } else {
1645 state.retryable_errors.push((err, vec![name], weight));
1646 }
1647 if state.non_retryable_stake >= validity {
1648 state.retryable = false;
1649 ReduceOutput::Failed(state)
1650 } else {
1651 ReduceOutput::Continue(state)
1652 }
1653 }
1654 }
1655 })
1656 },
1657 self.timeouts.pre_quorum_timeout,
1659 )
1660 .await
1661 .map_err(|state| {
1662 debug!(
1663 ?tx_digest,
1664 num_unique_effects = state.effects_map.unique_key_count(),
1665 non_retryable_stake = state.non_retryable_stake,
1666 "Received effects responses from validators"
1667 );
1668
1669 for (iota_err, _, _) in state.retryable_errors.iter().chain(state.non_retryable_errors.iter()) {
1671 self
1672 .metrics
1673 .total_aggregated_err
1674 .with_label_values(&[
1675 iota_err.as_ref(),
1676 if state.retryable {
1677 "recoverable"
1678 } else {
1679 "non-recoverable"
1680 },
1681 ])
1682 .inc();
1683 }
1684 if state.retryable {
1685 AggregatorProcessCertificateError::RetryableExecuteCertificate {
1686 retryable_errors: group_errors(state.retryable_errors),
1687 }
1688 } else {
1689 AggregatorProcessCertificateError::FatalExecuteCertificate {
1690 non_retryable_errors: group_errors(state.non_retryable_errors),
1691 }
1692 }
1693 })?;
1694
1695 let metrics = self.metrics.clone();
1696 metrics
1697 .remaining_tasks_when_reaching_cert_quorum
1698 .observe(remaining_tasks.len() as f64);
1699 if !remaining_tasks.is_empty() {
1700 spawn_monitored_task!(async move {
1702 let mut timeout = Box::pin(sleep(timeout_after_quorum));
1703 loop {
1704 tokio::select! {
1705 _ = &mut timeout => {
1706 debug!(?tx_digest, "Timed out in post quorum cert broadcasting: {:?}. Remaining tasks: {:?}", timeout_after_quorum, remaining_tasks.len());
1707 metrics.cert_broadcasting_post_quorum_timeout.inc();
1708 metrics.remaining_tasks_when_cert_broadcasting_post_quorum_timeout.observe(remaining_tasks.len() as f64);
1709 break;
1710 }
1711 res = remaining_tasks.next() => {
1712 if res.is_none() {
1713 break;
1714 }
1715 }
1716 }
1717 }
1718 });
1719 }
1720 Ok(result)
1721 }
1722
1723 fn handle_process_certificate_response(
1725 committee: Arc<Committee>,
1726 metrics: &AuthAggMetrics,
1727 tx_digest: &TransactionDigest,
1728 state: &mut ProcessCertificateState,
1729 response: IotaResult<HandleCertificateResponseV1>,
1730 name: AuthorityName,
1731 ) -> IotaResult<Option<QuorumDriverResponse>> {
1732 match response {
1733 Ok(HandleCertificateResponseV1 {
1734 signed_effects,
1735 events,
1736 input_objects,
1737 output_objects,
1738 auxiliary_data,
1739 }) => {
1740 debug!(
1741 ?tx_digest,
1742 name = ?name.concise(),
1743 "Validator handled certificate successfully",
1744 );
1745
1746 if events.is_some() && state.events.is_none() {
1747 state.events = events;
1748 }
1749
1750 if input_objects.is_some() && state.input_objects.is_none() {
1751 state.input_objects = input_objects;
1752 }
1753
1754 if output_objects.is_some() && state.output_objects.is_none() {
1755 state.output_objects = output_objects;
1756 }
1757
1758 if auxiliary_data.is_some() && state.auxiliary_data.is_none() {
1759 state.auxiliary_data = auxiliary_data;
1760 }
1761
1762 let effects_digest = *signed_effects.digest();
1763 match state.effects_map.insert(
1765 (signed_effects.epoch(), effects_digest),
1766 signed_effects.clone(),
1767 ) {
1768 InsertResult::NotEnoughVotes {
1769 bad_votes,
1770 bad_authorities,
1771 } => {
1772 state.non_retryable_stake += bad_votes;
1773 if bad_votes > 0 {
1774 state.non_retryable_errors.push((
1775 IotaError::InvalidSignature {
1776 error: "Individual signature verification failed".to_string(),
1777 },
1778 bad_authorities,
1779 bad_votes,
1780 ));
1781 }
1782 Ok(None)
1783 }
1784 InsertResult::Failed { error } => Err(error),
1785 InsertResult::QuorumReached(cert_sig) => {
1786 let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1787 signed_effects.into_data(),
1788 cert_sig,
1789 );
1790
1791 if (state.request.include_input_objects && state.input_objects.is_none())
1792 || (state.request.include_output_objects
1793 && state.output_objects.is_none())
1794 {
1795 metrics.quorum_reached_without_requested_objects.inc();
1796 debug!(
1797 ?tx_digest,
1798 "Quorum Reached but requested input/output objects were not returned"
1799 );
1800 }
1801
1802 ct.verify(&committee).map(|ct| {
1803 debug!(?tx_digest, "Got quorum for validators handle_certificate.");
1804 Some(QuorumDriverResponse {
1805 effects_cert: ct,
1806 events: state.events.take(),
1807 input_objects: state.input_objects.take(),
1808 output_objects: state.output_objects.take(),
1809 auxiliary_data: state.auxiliary_data.take(),
1810 })
1811 })
1812 }
1813 }
1814 }
1815 Err(err) => Err(err),
1816 }
1817 }
1818
1819 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1820 pub async fn execute_transaction_block(
1821 &self,
1822 transaction: &Transaction,
1823 client_addr: Option<SocketAddr>,
1824 ) -> Result<VerifiedCertifiedTransactionEffects, anyhow::Error> {
1825 let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
1826 let result = self
1827 .process_transaction(transaction.clone(), client_addr)
1828 .await?;
1829 let cert = match result {
1830 ProcessTransactionResult::Certified { certificate, .. } => certificate,
1831 ProcessTransactionResult::Executed(effects, _) => {
1832 return Ok(effects);
1833 }
1834 };
1835 self.metrics.total_tx_certificates_created.inc();
1836 drop(tx_guard);
1837
1838 let _cert_guard = GaugeGuard::acquire(&self.metrics.inflight_certificates);
1839 let response = self
1840 .process_certificate(
1841 HandleCertificateRequestV1 {
1842 certificate: cert.clone(),
1843 include_events: true,
1844 include_input_objects: false,
1845 include_output_objects: false,
1846 include_auxiliary_data: false,
1847 },
1848 client_addr,
1849 )
1850 .await?;
1851
1852 Ok(response.effects_cert)
1853 }
1854
1855 #[instrument(level = "trace", skip_all, fields(?tx_digest))]
1858 pub async fn handle_transaction_info_request_from_some_validators(
1859 &self,
1860 tx_digest: &TransactionDigest,
1861 validators: &BTreeSet<AuthorityName>,
1863 timeout_total: Option<Duration>,
1864 ) -> IotaResult<PlainTransactionInfoResponse> {
1865 self.quorum_once_with_timeout(
1866 None,
1867 Some(validators),
1868 |_authority, client| {
1869 Box::pin(async move {
1870 client
1871 .handle_transaction_info_request(TransactionInfoRequest {
1872 transaction_digest: *tx_digest,
1873 })
1874 .await
1875 })
1876 },
1877 Duration::from_secs(2),
1878 timeout_total,
1879 "handle_transaction_info_request_from_some_validators".to_string(),
1880 )
1881 .await
1882 }
1883
1884 #[instrument(level = "trace", skip_all)]
1888 pub async fn send_capability_notification_to_quorum(
1889 &self,
1890 request: HandleCapabilityNotificationRequestV1,
1891 ) -> Result<(), AggregatorSendCapabilityNotificationError> {
1892 #[derive(Debug, Default)]
1893 struct CapabilityNotificationState {
1894 good_responses: StakeUnit,
1895 non_retryable_errors: StakeUnit,
1896 retryable_errors: StakeUnit,
1897 errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
1898 }
1899
1900 let validity_threshold = self.committee.validity_threshold();
1901 let quorum_threshold = self.committee.quorum_threshold();
1902 let validator_display_names = self.validator_display_names.clone();
1903
1904 debug!(
1905 "Sending capability notification to committee validators with validity threshold: {}",
1906 validity_threshold
1907 );
1908
1909 let result = quorum_map_then_reduce_with_timeout(
1910 self.committee.clone(),
1911 self.authority_clients.clone(),
1912 CapabilityNotificationState::default(),
1913 |name, client| {
1914 Box::pin(async move {
1915 let concise_name = name.concise_owned();
1916 client
1917 .authority_client()
1918 .handle_capability_notification_v1(request.clone())
1919 .instrument(trace_span!("handle_capability_notification_v1", authority = ?concise_name))
1920 .await
1921 })
1922 },
1923 |mut state, name, weight, response| {
1924 let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1925 Box::pin(async move {
1926 match response {
1927 Ok(_) => {
1928 debug!(
1929 authority = ?name.concise(),
1930 weight,
1931 "Successfully sent capability notification to committee validator"
1932 );
1933 state.good_responses += weight;
1934 if state.good_responses >= validity_threshold {
1936 return ReduceOutput::Success(());
1937 }
1938 }
1939 Err(err) => {
1940 debug!(
1941 authority = ?name.concise(),
1942 weight,
1943 error = ?err,
1944 "Failed to send capability notification to committee validator"
1945 );
1946 Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
1947
1948 let (retryable, _categorized) = err.is_retryable();
1949 if retryable {
1950 state.retryable_errors += weight;
1952 } else {
1953 state.non_retryable_errors += weight;
1955 }
1956 state.errors.push((err, vec![name], weight));
1957
1958 if state.non_retryable_errors >= quorum_threshold || (state.non_retryable_errors + state.retryable_errors >= quorum_threshold && state.good_responses + state.retryable_errors >= validity_threshold) {
1960 return ReduceOutput::Failed(state);
1961 }
1962 }
1963 }
1964
1965 ReduceOutput::Continue(state)
1966 })
1967 },
1968 self.timeouts.pre_quorum_timeout,
1970 ).await;
1971
1972 match result {
1973 Ok(_) => {
1974 info!("Successfully sent capability notification to quorum of validators");
1975 self.metrics.capability_notification_success.inc();
1976 Ok(())
1977 }
1978 Err(state) => {
1979 warn!(
1980 good_responses = state.good_responses,
1981 non_retryable_errors = state.non_retryable_errors,
1982 retryable_errors = state.retryable_errors,
1983 validity_threshold,
1984 quorum_threshold,
1985 errors = ?state.errors,
1986 "Failed to reach validity threshold for capability notification"
1987 );
1988 self.metrics.capability_notification_errors.inc();
1989
1990 let grouped_errors = group_errors(state.errors);
1991
1992 if state.non_retryable_errors >= quorum_threshold {
1994 Err(
1995 AggregatorSendCapabilityNotificationError::NonRetryableNotification {
1996 errors: grouped_errors,
1997 },
1998 )
1999 } else {
2000 Err(
2001 AggregatorSendCapabilityNotificationError::RetryableNotification {
2002 errors: grouped_errors,
2003 },
2004 )
2005 }
2006 }
2007 }
2008 }
2009}
2010
/// Builder for [`AuthorityAggregator`].
///
/// Start from one of the `from_*` constructors, optionally chain the
/// `with_*` setters, then finish with one of the `build_*` methods.
#[derive(Default)]
pub struct AuthorityAggregatorBuilder<'a> {
    /// Network configuration whose genesis supplies the network committee.
    network_config: Option<&'a NetworkConfig>,
    /// Genesis used for the network committee when no network config is set.
    genesis: Option<&'a Genesis>,
    /// Explicit committee; when absent, the network committee is used.
    committee: Option<Committee>,
    /// Committee store; when absent, a testing store is created at build time.
    committee_store: Option<Arc<CommitteeStore>>,
    /// Prometheus registry for metrics; when absent, a fresh one is created.
    registry: Option<&'a Registry>,
    /// Timeout configuration; when absent, the default is used.
    timeouts_config: Option<TimeoutConfig>,
}
2022
2023impl<'a> AuthorityAggregatorBuilder<'a> {
2024 pub fn from_network_config(config: &'a NetworkConfig) -> Self {
2026 Self {
2027 network_config: Some(config),
2028 ..Default::default()
2029 }
2030 }
2031
2032 pub fn from_genesis(genesis: &'a Genesis) -> Self {
2034 Self {
2035 genesis: Some(genesis),
2036 ..Default::default()
2037 }
2038 }
2039
2040 pub fn from_committee(committee: Committee) -> Self {
2042 Self {
2043 committee: Some(committee),
2044 ..Default::default()
2045 }
2046 }
2047
2048 pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
2050 self.committee_store = Some(committee_store);
2051 self
2052 }
2053
2054 pub fn with_registry(mut self, registry: &'a Registry) -> Self {
2056 self.registry = Some(registry);
2057 self
2058 }
2059
2060 pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
2062 self.timeouts_config = Some(timeouts_config);
2063 self
2064 }
2065
2066 fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
2067 let genesis = if let Some(network_config) = self.network_config {
2068 &network_config.genesis
2069 } else if let Some(genesis) = self.genesis {
2070 genesis
2071 } else {
2072 panic!("need either NetworkConfig or Genesis.");
2073 };
2074 genesis.committee_with_network()
2075 }
2076
2077 fn get_committee(&self) -> Committee {
2078 self.committee
2079 .clone()
2080 .unwrap_or_else(|| self.get_network_committee().committee().clone())
2081 }
2082
2083 pub fn build_network_clients(
2084 self,
2085 ) -> (
2086 AuthorityAggregator<NetworkAuthorityClient>,
2087 BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
2088 ) {
2089 let network_committee = self.get_network_committee();
2090 let auth_clients = make_authority_clients_with_timeout_config(
2091 &network_committee,
2092 DEFAULT_CONNECT_TIMEOUT_SEC,
2093 DEFAULT_REQUEST_TIMEOUT_SEC,
2094 );
2095 let auth_agg = self.build_custom_clients(auth_clients.clone());
2096 (auth_agg, auth_clients)
2097 }
2098
2099 pub fn build_custom_clients<C: Clone>(
2100 self,
2101 authority_clients: BTreeMap<AuthorityName, C>,
2102 ) -> AuthorityAggregator<C> {
2103 let committee = self.get_committee();
2104 let registry = Registry::new();
2105 let registry = self.registry.unwrap_or(®istry);
2106 let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
2107 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(registry));
2108
2109 let committee_store = self
2110 .committee_store
2111 .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
2112
2113 let timeouts_config = self.timeouts_config.unwrap_or_default();
2114
2115 AuthorityAggregator::new(
2116 committee,
2117 committee_store,
2118 authority_clients,
2119 safe_client_metrics_base,
2120 auth_agg_metrics,
2121 Arc::new(HashMap::new()),
2122 timeouts_config,
2123 )
2124 }
2125}