1use std::{
7 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
8 convert::AsRef,
9 net::SocketAddr,
10 string::ToString,
11 sync::Arc,
12 time::Duration,
13};
14
15use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
16use iota_authority_aggregation::{AsyncResult, ReduceOutput, quorum_map_then_reduce_with_timeout};
17use iota_config::genesis::Genesis;
18use iota_metrics::{GaugeGuard, MonitorCancellation, monitored_future, spawn_monitored_task};
19use iota_network::{
20 DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_iota_network_config,
21};
22use iota_network_stack::config::Config;
23use iota_swarm_config::network_config::NetworkConfig;
24use iota_types::{
25 base_types::*,
26 committee::{Committee, CommitteeTrait, CommitteeWithNetworkMetadata, StakeUnit},
27 crypto::{AuthorityPublicKeyBytes, AuthoritySignInfo},
28 effects::{
29 CertifiedTransactionEffects, SignedTransactionEffects, TransactionEffects,
30 TransactionEvents, VerifiedCertifiedTransactionEffects,
31 },
32 error::{IotaError, IotaResult, UserInputError},
33 fp_ensure,
34 iota_system_state::{
35 IotaSystemState, IotaSystemStateTrait,
36 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
37 },
38 message_envelope::Message,
39 messages_grpc::{
40 HandleCertificateRequestV1, HandleCertificateResponseV1, LayoutGenerationOption,
41 ObjectInfoRequest, TransactionInfoRequest,
42 },
43 messages_safe_client::PlainTransactionInfoResponse,
44 object::Object,
45 quorum_driver_types::{GroupedErrors, QuorumDriverResponse},
46 transaction::*,
47};
48use prometheus::{
49 Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry,
50 register_int_counter_vec_with_registry, register_int_counter_with_registry,
51 register_int_gauge_with_registry,
52};
53use thiserror::Error;
54use tokio::time::{sleep, timeout};
55use tracing::{Instrument, debug, error, info, instrument, trace, trace_span, warn};
56
57use crate::{
58 authority_client::{
59 AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
60 make_network_authority_clients_with_network_config,
61 },
62 epoch::committee_store::CommitteeStore,
63 safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase},
64 stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator},
65};
66
/// Default retry count exported by this module.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// confirm which callers rely on it before changing the value.
pub const DEFAULT_RETRIES: usize = 4;
68
// Unit tests for this module live in a separate file and are only compiled in
// test builds.
#[cfg(test)]
#[path = "unit_tests/authority_aggregator_tests.rs"]
pub mod authority_aggregator_tests;
72
/// Timeouts used by the authority aggregator when talking to validators.
#[derive(Clone, Debug)]
pub struct TimeoutConfig {
    /// Timeout handed to the quorum map/reduce helper while a quorum is still
    /// being gathered.
    pub pre_quorum_timeout: Duration,
    /// Timeout applied once a quorum has been reached.
    ///
    /// NOTE(review): the consumer of this field is not in the visible part of
    /// the file; confirm the exact semantics there.
    pub post_quorum_timeout: Duration,

    /// How long `quorum_once_inner` waits before it starts a request to the
    /// next authority even though earlier requests are still in flight.
    pub serial_authority_request_interval: Duration,
}

impl Default for TimeoutConfig {
    /// Returns the default timeouts: 60s before quorum, 7s after quorum and a
    /// 1s interval between serially started authority requests.
    fn default() -> Self {
        Self {
            pre_quorum_timeout: Duration::from_secs(60),
            post_quorum_timeout: Duration::from_secs(7),
            serial_authority_request_interval: Duration::from_millis(1000),
        }
    }
}
94
/// Prometheus metrics recorded by the authority aggregator.
#[derive(Clone)]
pub struct AuthAggMetrics {
    /// Total number of certificates made in the authority aggregator.
    pub total_tx_certificates_created: IntCounter,
    /// Errors returned from validators when processing a transaction, labelled
    /// by validator name and error type.
    pub process_tx_errors: IntCounterVec,
    /// Errors returned from validators when processing a certificate, labelled
    /// by validator name and error type.
    pub process_cert_errors: IntCounterVec,
    /// Number of client double spend attempts that were detected.
    pub total_client_double_spend_attempts_detected: IntCounter,
    /// Errors returned from validators, labelled by error type and by whether
    /// the transaction is still recoverable.
    pub total_aggregated_err: IntCounterVec,
    /// RPC errors returned from validators, labelled by validator short name
    /// and RPC error message.
    pub total_rpc_err: IntCounterVec,
    /// Transactions currently gathering signatures.
    pub inflight_transactions: IntGauge,
    /// Certificates currently gathering effects.
    pub inflight_certificates: IntGauge,
    /// `handle_transaction` requests currently in flight.
    pub inflight_transaction_requests: IntGauge,
    /// `handle_certificate` requests currently in flight.
    pub inflight_certificate_requests: IntGauge,

    /// Number of timeouts hit in certificate processing after quorum.
    pub cert_broadcasting_post_quorum_timeout: IntCounter,
    /// Number of remaining tasks at the moment certificate quorum is reached.
    pub remaining_tasks_when_reaching_cert_quorum: Histogram,
    /// Number of remaining tasks when post-quorum certificate broadcasting
    /// times out.
    pub remaining_tasks_when_cert_broadcasting_post_quorum_timeout: Histogram,
    /// Number of times quorum was reached without any validator returning the
    /// requested objects.
    pub quorum_reached_without_requested_objects: IntCounter,
}
114
115impl AuthAggMetrics {
116 pub fn new(registry: &prometheus::Registry) -> Self {
118 Self {
119 total_tx_certificates_created: register_int_counter_with_registry!(
120 "total_tx_certificates_created",
121 "Total number of certificates made in the authority_aggregator",
122 registry,
123 )
124 .unwrap(),
125 process_tx_errors: register_int_counter_vec_with_registry!(
126 "process_tx_errors",
127 "Number of errors returned from validators when processing transaction, group by validator name and error type",
128 &["name","error"],
129 registry,
130 )
131 .unwrap(),
132 process_cert_errors: register_int_counter_vec_with_registry!(
133 "process_cert_errors",
134 "Number of errors returned from validators when processing certificate, group by validator name and error type",
135 &["name", "error"],
136 registry,
137 )
138 .unwrap(),
139 total_client_double_spend_attempts_detected: register_int_counter_with_registry!(
140 "total_client_double_spend_attempts_detected",
141 "Total number of client double spend attempts that are detected",
142 registry,
143 )
144 .unwrap(),
145 total_aggregated_err: register_int_counter_vec_with_registry!(
146 "total_aggregated_err",
147 "Total number of errors returned from validators, grouped by error type",
148 &["error", "tx_recoverable"],
149 registry,
150 )
151 .unwrap(),
152 total_rpc_err: register_int_counter_vec_with_registry!(
153 "total_rpc_err",
154 "Total number of rpc errors returned from validators, grouped by validator short name and RPC error message",
155 &["name", "error_message"],
156 registry,
157 )
158 .unwrap(),
159 inflight_transactions: register_int_gauge_with_registry!(
160 "auth_agg_inflight_transactions",
161 "Inflight transaction gathering signatures",
162 registry,
163 )
164 .unwrap(),
165 inflight_certificates: register_int_gauge_with_registry!(
166 "auth_agg_inflight_certificates",
167 "Inflight certificates gathering effects",
168 registry,
169 )
170 .unwrap(),
171 inflight_transaction_requests: register_int_gauge_with_registry!(
172 "auth_agg_inflight_transaction_requests",
173 "Inflight handle_transaction requests",
174 registry,
175 )
176 .unwrap(),
177 inflight_certificate_requests: register_int_gauge_with_registry!(
178 "auth_agg_inflight_certificate_requests",
179 "Inflight handle_certificate requests",
180 registry,
181 )
182 .unwrap(),
183 cert_broadcasting_post_quorum_timeout: register_int_counter_with_registry!(
184 "auth_agg_cert_broadcasting_post_quorum_timeout",
185 "Total number of timeout in cert processing post quorum",
186 registry,
187 )
188 .unwrap(),
189 remaining_tasks_when_reaching_cert_quorum: register_histogram_with_registry!(
190 "auth_agg_remaining_tasks_when_reaching_cert_quorum",
191 "Number of remaining tasks when reaching certificate quorum",
192 registry,
193 ).unwrap(),
194 remaining_tasks_when_cert_broadcasting_post_quorum_timeout: register_histogram_with_registry!(
195 "auth_agg_remaining_tasks_when_cert_broadcasting_post_quorum_timeout",
196 "Number of remaining tasks when post quorum certificate broadcasting times out",
197 registry,
198 ).unwrap(),
199 quorum_reached_without_requested_objects: register_int_counter_with_registry!(
200 "auth_agg_quorum_reached_without_requested_objects",
201 "Number of times quorum was reached without getting the requested objects back from at least 1 validator",
202 registry,
203 )
204 .unwrap(),
205 }
206 }
207
208 pub fn new_for_tests() -> Self {
210 let registry = prometheus::Registry::new();
211 Self::new(®istry)
212 }
213}
214
/// Errors returned by the aggregator when a transaction could not be
/// certified or executed on a quorum of validators.
#[derive(Error, Debug, Eq, PartialEq)]
pub enum AggregatorProcessTransactionError {
    /// The transaction failed with non-retryable errors; `errors` holds the
    /// grouped validator errors.
    #[error(
        "Failed to execute transaction on a quorum of validators due to non-retryable errors. Validator errors: {:?}",
        errors
    )]
    FatalTransaction { errors: GroupedErrors },

    /// No quorum was reached, but the collected state still allows a retry.
    #[error(
        "Failed to execute transaction on a quorum of validators but state is still retryable. Validator errors: {:?}",
        errors
    )]
    RetryableTransaction { errors: GroupedErrors },

    /// The transaction failed because input objects are locked by other
    /// transactions. `conflicting_tx_digests` maps each conflicting
    /// transaction to the (validator, object) locks reported for it and the
    /// total stake of the validators that reported them.
    #[error(
        "Failed to execute transaction on a quorum of validators due to conflicting transactions. Locked objects: {:?}. Validator errors: {:?}",
        conflicting_tx_digests,
        errors
    )]
    FatalConflictingTransaction {
        errors: GroupedErrors,
        conflicting_tx_digests:
            BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    },

    /// At least a quorum of stake reported being overloaded with transactions
    /// pending execution.
    #[error(
        "{} of the validators by stake are overloaded with transactions pending execution. Validator errors: {:?}",
        overloaded_stake,
        errors
    )]
    SystemOverload {
        overloaded_stake: StakeUnit,
        errors: GroupedErrors,
    },

    /// The transaction was already finalized, but with different user
    /// signatures than the ones submitted here.
    #[error("Transaction is already finalized but with different user signatures")]
    TxAlreadyFinalizedWithDifferentUserSignatures,

    /// Validators are overloaded and asked the client to retry after
    /// `retry_after_secs` seconds.
    #[error(
        "{} of the validators by stake are overloaded and requested the client to retry after {} seconds. Validator errors: {:?}",
        overload_stake,
        retry_after_secs,
        errors
    )]
    SystemOverloadRetryAfter {
        overload_stake: StakeUnit,
        errors: GroupedErrors,
        retry_after_secs: u64,
    },
}
266
/// Errors returned by the aggregator when a certificate could not be executed
/// on a quorum of validators.
#[derive(Error, Debug)]
pub enum AggregatorProcessCertificateError {
    /// Execution failed with errors that cannot be retried.
    #[error(
        "Failed to execute certificate on a quorum of validators. Non-retryable errors: {:?}",
        non_retryable_errors
    )]
    FatalExecuteCertificate { non_retryable_errors: GroupedErrors },

    /// Execution failed, but the collected errors still allow a retry.
    #[error(
        "Failed to execute certificate on a quorum of validators but state is still retryable. Retryable errors: {:?}",
        retryable_errors
    )]
    RetryableExecuteCertificate { retryable_errors: GroupedErrors },
}
281
282pub fn group_errors(errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>) -> GroupedErrors {
284 #[expect(clippy::mutable_key_type)]
285 let mut grouped_errors = HashMap::new();
286 for (error, names, stake) in errors {
287 let entry = grouped_errors.entry(error).or_insert((0, vec![]));
288 entry.0 += stake;
289 entry.1.extend(
290 names
291 .into_iter()
292 .map(|n| n.concise_owned())
293 .collect::<Vec<_>>(),
294 );
295 }
296 grouped_errors
297 .into_iter()
298 .map(|(e, (s, n))| (e, s, n))
299 .collect()
300}
301
302#[derive(Debug, Default)]
305pub struct RetryableOverloadInfo {
306 pub total_stake: StakeUnit,
308
309 pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
311}
312
313impl RetryableOverloadInfo {
314 pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
315 self.total_stake += stake;
316 self.stake_requested_retry_after
317 .entry(retry_after)
318 .and_modify(|s| *s += stake)
319 .or_insert(stake);
320 }
321
322 pub fn get_quorum_retry_after(
325 &self,
326 good_stake: StakeUnit,
327 quorum_threshold: StakeUnit,
328 ) -> Duration {
329 if self.stake_requested_retry_after.is_empty() {
330 return Duration::from_secs(0);
331 }
332
333 let mut quorum_stake = good_stake;
334 for (retry_after, stake) in self.stake_requested_retry_after.iter() {
335 quorum_stake += *stake;
336 if quorum_stake >= quorum_threshold {
337 return *retry_after;
338 }
339 }
340 *self.stake_requested_retry_after.last_key_value().unwrap().0
341 }
342}
343
/// Mutable state threaded through the reducer of `process_transaction` while
/// validator responses arrive.
#[derive(Debug)]
struct ProcessTransactionState {
    /// Validator signatures gathered so far for the transaction.
    tx_signatures: StakeAggregator<AuthoritySignInfo, true>,
    /// Effects gathered so far, aggregated by effects digest.
    effects_map: MultiStakeAggregator<TransactionEffectsDigest, TransactionEffects, true>,
    /// Every error received so far, with the reporting validators and their stake.
    errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    /// Stake of validators that returned a non-retryable error which did not
    /// fall into one of the more specific buckets below.
    non_retryable_stake: StakeUnit,
    /// Stake of validators that reported an object or package as not found.
    object_or_package_not_found_stake: StakeUnit,
    /// Stake of validators that reported being overloaded.
    overloaded_stake: StakeUnit,
    /// Validators that are overloaded but asked the client to retry later.
    retryable_overload_info: RetryableOverloadInfo,
    /// Conflicting transactions reported through object lock conflicts, with
    /// the reporting (validator, object) pairs and their total stake.
    conflicting_tx_digests:
        BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    /// Starts as `true`; set to `false` once the reducer decides that no
    /// retry can succeed.
    retryable: bool,
    /// Whether the transaction was found to be finalized with different user
    /// signatures. NOTE(review): it is set outside the visible part of the
    /// file; confirm where.
    tx_finalized_with_different_user_sig: bool,
}
369
370impl ProcessTransactionState {
371 pub fn record_conflicting_transaction_if_any(
374 &mut self,
375 validator_name: AuthorityName,
376 weight: StakeUnit,
377 err: &IotaError,
378 ) {
379 if let IotaError::ObjectLockConflict {
380 obj_ref,
381 pending_transaction: transaction,
382 } = err
383 {
384 let (lock_records, total_stake) = self
385 .conflicting_tx_digests
386 .entry(*transaction)
387 .or_insert((Vec::new(), 0));
388 lock_records.push((validator_name, *obj_ref));
389 *total_stake += weight;
390 }
391 }
392
393 pub fn check_if_error_indicates_tx_finalized_with_different_user_sig(
395 &self,
396 validity_threshold: StakeUnit,
397 ) -> bool {
398 let invalid_sig_stake: StakeUnit = self
409 .errors
410 .iter()
411 .filter_map(|(e, _, stake)| {
412 if matches!(e, IotaError::FailedToVerifyTxCertWithExecutedEffects { .. }) {
413 Some(stake)
414 } else {
415 None
416 }
417 })
418 .sum();
419 invalid_sig_stake >= validity_threshold
420 }
421}
422
/// State collected while a certificate is being processed by the validators.
///
/// NOTE(review): the code that drives this state is outside the visible part
/// of the file; the field notes below only restate what the types show.
struct ProcessCertificateState {
    /// Effects received so far, aggregated by (epoch, effects digest).
    effects_map:
        MultiStakeAggregator<(EpochId, TransactionEffectsDigest), TransactionEffects, true>,
    /// Stake that reported a non-retryable error.
    non_retryable_stake: StakeUnit,
    /// Non-retryable errors with the reporting validators and their stake.
    non_retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    /// Retryable errors with the reporting validators and their stake.
    retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    /// Whether the certificate can still be retried.
    retryable: bool,

    /// Optional data returned alongside the effects; presumably filled from
    /// the first validator response that carries it (TODO confirm).
    events: Option<TransactionEvents>,
    input_objects: Option<Vec<Object>>,
    output_objects: Option<Vec<Object>>,
    auxiliary_data: Option<Vec<u8>>,
    /// The request sent to the validators.
    request: HandleCertificateRequestV1,
}
447
/// Successful outcome of `AuthorityAggregator::process_transaction`.
#[derive(Debug)]
pub enum ProcessTransactionResult {
    /// A certificate for the transaction is available.
    Certified {
        certificate: CertifiedTransaction,
        /// Presumably `true` when the certificate was assembled by this call
        /// rather than received ready-made from a validator. TODO confirm
        /// against the response handler.
        newly_formed: bool,
    },
    /// The transaction was already executed; carries the certified effects
    /// and the events.
    Executed(VerifiedCertifiedTransactionEffects, TransactionEvents),
}
462
463impl ProcessTransactionResult {
464 pub fn into_cert_for_testing(self) -> CertifiedTransaction {
466 match self {
467 Self::Certified { certificate, .. } => certificate,
468 Self::Executed(..) => panic!("Wrong type"),
469 }
470 }
471
472 pub fn into_effects_for_testing(self) -> VerifiedCertifiedTransactionEffects {
475 match self {
476 Self::Certified { .. } => panic!("Wrong type"),
477 Self::Executed(effects, ..) => effects,
478 }
479 }
480}
481
/// Client-side aggregator that talks to the authorities of one committee
/// through one `SafeClient` per authority.
#[derive(Clone)]
pub struct AuthorityAggregator<A: Clone> {
    /// The committee whose stake thresholds drive quorum decisions.
    pub committee: Arc<Committee>,
    /// Human-readable validator names, used as metric labels; authorities
    /// without an entry fall back to their concise name.
    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
    /// One safe client per authority, keyed by authority name.
    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
    /// Metrics recorded by the aggregator itself.
    pub metrics: Arc<AuthAggMetrics>,
    /// Base from which the per-authority `SafeClientMetrics` are derived.
    pub safe_client_metrics_base: SafeClientMetricsBase,
    /// Timeouts applied to quorum operations.
    pub timeouts: TimeoutConfig,
    /// Committee store shared with every safe client.
    pub committee_store: Arc<CommitteeStore>,
}
503
504impl<A: Clone> AuthorityAggregator<A> {
505 pub fn new(
507 committee: Committee,
508 committee_store: Arc<CommitteeStore>,
509 authority_clients: BTreeMap<AuthorityName, A>,
510 safe_client_metrics_base: SafeClientMetricsBase,
511 auth_agg_metrics: Arc<AuthAggMetrics>,
512 validator_display_names: Arc<HashMap<AuthorityName, String>>,
513 timeouts: TimeoutConfig,
514 ) -> Self {
515 Self {
516 committee: Arc::new(committee),
517 authority_clients: create_safe_clients(
518 authority_clients,
519 &committee_store,
520 &safe_client_metrics_base,
521 ),
522 metrics: auth_agg_metrics,
523 safe_client_metrics_base,
524 timeouts,
525 committee_store,
526 validator_display_names,
527 }
528 }
529
530 pub fn recreate_with_net_addresses(
537 &self,
538 committee: CommitteeWithNetworkMetadata,
539 network_config: &Config,
540 disallow_missing_intermediate_committees: bool,
541 ) -> IotaResult<AuthorityAggregator<NetworkAuthorityClient>> {
542 let network_clients =
543 make_network_authority_clients_with_network_config(&committee, network_config);
544
545 let safe_clients = network_clients
546 .into_iter()
547 .map(|(name, api)| {
548 (
549 name,
550 Arc::new(SafeClient::new(
551 api,
552 self.committee_store.clone(),
553 name,
554 SafeClientMetrics::new(&self.safe_client_metrics_base, name),
555 )),
556 )
557 })
558 .collect::<BTreeMap<_, _>>();
559
560 let new_committee = committee.committee().clone();
564 if disallow_missing_intermediate_committees {
565 fp_ensure!(
566 self.committee.epoch + 1 == new_committee.epoch,
567 IotaError::AdvanceEpoch {
568 error: format!(
569 "Trying to advance from epoch {} to epoch {}",
570 self.committee.epoch, new_committee.epoch
571 )
572 }
573 );
574 }
575 let _ = self.committee_store.insert_new_committee(&new_committee);
581 Ok(AuthorityAggregator {
582 committee: Arc::new(new_committee),
583 authority_clients: Arc::new(safe_clients),
584 metrics: self.metrics.clone(),
585 timeouts: self.timeouts.clone(),
586 safe_client_metrics_base: self.safe_client_metrics_base.clone(),
587 committee_store: self.committee_store.clone(),
588 validator_display_names: Arc::new(HashMap::new()),
589 })
590 }
591
592 pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
594 self.authority_clients.get(name)
595 }
596
597 pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
599 where
600 A: Clone,
601 {
602 self.authority_clients[name].clone()
603 }
604
605 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
607 self.committee_store.clone()
608 }
609
610 pub fn clone_inner_committee_test_only(&self) -> Committee {
612 (*self.committee).clone()
613 }
614
615 pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
617 (*self.authority_clients)
618 .clone()
619 .into_iter()
620 .map(|(k, v)| (k, (*v).clone()))
621 .collect()
622 }
623}
624
625fn create_safe_clients<A: Clone>(
627 authority_clients: BTreeMap<AuthorityName, A>,
628 committee_store: &Arc<CommitteeStore>,
629 safe_client_metrics_base: &SafeClientMetricsBase,
630) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
631 Arc::new(
632 authority_clients
633 .into_iter()
634 .map(|(name, api)| {
635 (
636 name,
637 Arc::new(SafeClient::new(
638 api,
639 committee_store.clone(),
640 name,
641 SafeClientMetrics::new(safe_client_metrics_base, name),
642 )),
643 )
644 })
645 .collect(),
646 )
647}
648
649impl AuthorityAggregator<NetworkAuthorityClient> {
650 pub fn new_from_epoch_start_state(
654 epoch_start_state: &EpochStartSystemState,
655 committee_store: &Arc<CommitteeStore>,
656 safe_client_metrics_base: SafeClientMetricsBase,
657 auth_agg_metrics: Arc<AuthAggMetrics>,
658 ) -> Self {
659 let committee = epoch_start_state.get_iota_committee_with_network_metadata();
660 let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
661 Self::new_from_committee(
662 committee,
663 committee_store,
664 safe_client_metrics_base,
665 auth_agg_metrics,
666 Arc::new(validator_display_names),
667 )
668 }
669
670 pub fn recreate_with_new_epoch_start_state(
675 &self,
676 epoch_start_state: &EpochStartSystemState,
677 ) -> Self {
678 Self::new_from_epoch_start_state(
679 epoch_start_state,
680 &self.committee_store,
681 self.safe_client_metrics_base.clone(),
682 self.metrics.clone(),
683 )
684 }
685
686 pub fn new_from_committee(
687 committee: CommitteeWithNetworkMetadata,
688 committee_store: &Arc<CommitteeStore>,
689 safe_client_metrics_base: SafeClientMetricsBase,
690 auth_agg_metrics: Arc<AuthAggMetrics>,
691 validator_display_names: Arc<HashMap<AuthorityName, String>>,
692 ) -> Self {
693 let net_config = default_iota_network_config();
694 let authority_clients =
695 make_network_authority_clients_with_network_config(&committee, &net_config);
696 Self::new(
697 committee.committee().clone(),
698 committee_store.clone(),
699 authority_clients,
700 safe_client_metrics_base,
701 auth_agg_metrics,
702 validator_display_names,
703 Default::default(),
704 )
705 }
706}
707
708impl<A> AuthorityAggregator<A>
709where
710 A: AuthorityAPI + Send + Sync + 'static + Clone,
711{
    /// Queries authorities until one of them answers successfully.
    ///
    /// Authorities are contacted in the order returned by
    /// `Committee::shuffle_by_stake(preferences, restrict_to)`. One request is
    /// started immediately, together with a timer of
    /// `serial_authority_request_interval`. Every time the timer fires or a
    /// request finishes without success, a request to the next authority is
    /// started, so slow authorities do not block progress while fast ones keep
    /// the number of parallel requests low.
    ///
    /// Each request is limited to `timeout_each_authority`. Failures and
    /// timeouts are recorded in `authority_errors`, keyed by authority.
    ///
    /// When the shuffled list is exhausted without a success, the function
    /// sleeps (starting at 1s, doubling up to 5 minutes) and starts over with a
    /// new shuffle. It therefore only returns on success or when the list of
    /// available authorities is empty; callers bound the total time
    /// externally.
    async fn quorum_once_inner<'a, S, FMap>(
        &'a self,
        preferences: Option<&BTreeSet<AuthorityName>>,
        restrict_to: Option<&BTreeSet<AuthorityName>>,
        map_each_authority: FMap,
        timeout_each_authority: Duration,
        authority_errors: &mut HashMap<AuthorityName, IotaError>,
    ) -> Result<S, IotaError>
    where
        FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
            + Send
            + Clone
            + 'a,
        S: Send,
    {
        let start = tokio::time::Instant::now();
        // Back-off between full rounds over all authorities.
        let mut delay = Duration::from_secs(1);
        loop {
            let authorities_shuffled = self.committee.shuffle_by_stake(preferences, restrict_to);
            let mut authorities_shuffled = authorities_shuffled.iter();

            type RequestResult<S> = Result<Result<S, IotaError>, tokio::time::error::Elapsed>;

            // Events produced by the futures polled below.
            enum Event<S> {
                // The serial-request timer fired.
                StartNext,
                // A request to the named authority finished (or timed out).
                Request(AuthorityName, RequestResult<S>),
            }

            let mut futures = FuturesUnordered::<BoxFuture<'a, Event<S>>>::new();

            // Builds the future that sends one request, bounded by the
            // per-authority timeout.
            let start_req = |name: AuthorityName, client: Arc<SafeClient<A>>| {
                let map_each_authority = map_each_authority.clone();
                Box::pin(monitored_future!(async move {
                    trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start, "new request");
                    let map = map_each_authority(name, client);
                    Event::Request(name, timeout(timeout_each_authority, map).await)
                }))
            };

            // Builds the timer future that triggers the next serial request.
            let schedule_next = || {
                let delay = self.timeouts.serial_authority_request_interval;
                Box::pin(monitored_future!(async move {
                    sleep(delay).await;
                    Event::StartNext
                }))
            };

            // An empty authority list is the only error this function returns.
            let name = authorities_shuffled.next().ok_or_else(|| {
                error!(
                    ?preferences,
                    ?restrict_to,
                    "Available authorities list is empty."
                );
                IotaError::from("Available authorities list is empty")
            })?;
            futures.push(start_req(*name, self.authority_clients[name].clone()));
            futures.push(schedule_next());

            while let Some(res) = futures.next().await {
                match res {
                    Event::StartNext => {
                        trace!(now = ?tokio::time::Instant::now() - start, "eagerly beginning next request");
                        futures.push(schedule_next());
                    }
                    Event::Request(name, res) => {
                        match res {
                            // The per-authority timeout elapsed.
                            Err(_) => {
                                debug!(name=?name.concise(), "authority request timed out");
                                authority_errors.insert(name, IotaError::Timeout);
                            }
                            // The request completed; it may still carry an error.
                            Ok(inner_res) => {
                                trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start,
                                       "request completed successfully");
                                match inner_res {
                                    Err(e) => authority_errors.insert(name, e),
                                    Ok(res) => return Ok(res),
                                };
                            }
                        };
                    }
                }

                // Reached after a timer tick or a failed request: start the
                // next authority, or end this round once the list is
                // exhausted.
                // NOTE(review): the round ends here even if earlier requests
                // are still pending in `futures`.
                if let Some(next_authority) = authorities_shuffled.next() {
                    futures.push(start_req(
                        *next_authority,
                        self.authority_clients[next_authority].clone(),
                    ));
                } else {
                    break;
                }
            }

            info!(
                ?authority_errors,
                "quorum_once_with_timeout failed on all authorities, retrying in {:?}", delay
            );
            sleep(delay).await;
            // Exponential back-off, capped at five minutes.
            delay = std::cmp::min(delay * 2, Duration::from_secs(5 * 60));
        }
    }
844
845 pub(crate) async fn quorum_once_with_timeout<'a, S, FMap>(
852 &'a self,
853 preferences: Option<&BTreeSet<AuthorityName>>,
855 restrict_to: Option<&BTreeSet<AuthorityName>>,
857 map_each_authority: FMap,
860 timeout_each_authority: Duration,
861 timeout_total: Option<Duration>,
863 description: String,
865 ) -> Result<S, IotaError>
866 where
867 FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
868 + Send
869 + Clone
870 + 'a,
871 S: Send,
872 {
873 let mut authority_errors = HashMap::new();
874
875 let fut = self.quorum_once_inner(
876 preferences,
877 restrict_to,
878 map_each_authority,
879 timeout_each_authority,
880 &mut authority_errors,
881 );
882
883 if let Some(t) = timeout_total {
884 timeout(t, fut).await.map_err(|_timeout_error| {
885 if authority_errors.is_empty() {
886 IotaError::Timeout
887 } else {
888 IotaError::TooManyIncorrectAuthorities {
889 errors: authority_errors
890 .iter()
891 .map(|(a, b)| (*a, b.clone()))
892 .collect(),
893 action: description,
894 }
895 }
896 })?
897 } else {
898 fut.await
899 }
900 }
901
902 pub async fn get_latest_object_version_for_testing(
909 &self,
910 object_id: ObjectID,
911 ) -> IotaResult<Object> {
912 #[derive(Debug, Default)]
913 struct State {
914 latest_object_version: Option<Object>,
915 total_weight: StakeUnit,
916 }
917 let initial_state = State::default();
918 let result = quorum_map_then_reduce_with_timeout(
919 self.committee.clone(),
920 self.authority_clients.clone(),
921 initial_state,
922 |_name, client| {
923 Box::pin(async move {
924 let request =
925 ObjectInfoRequest::latest_object_info_request(object_id, LayoutGenerationOption::None);
926 client.handle_object_info_request(request).await
927 })
928 },
929 |mut state, name, weight, result| {
930 Box::pin(async move {
931 state.total_weight += weight;
932 match result {
933 Ok(object_info) => {
934 debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
935 if state.latest_object_version.as_ref().is_none_or(|latest| {
936 object_info.object.version() > latest.version()
937 }) {
938 state.latest_object_version = Some(object_info.object);
939 }
940 }
941 Err(err) => {
942 debug!("Received error from validator {:?}: {:?}", name.concise(), err);
943 }
944 };
945 if state.total_weight >= self.committee.quorum_threshold() {
946 if let Some(object) = state.latest_object_version {
947 return ReduceOutput::Success(object);
948 } else {
949 return ReduceOutput::Failed(state);
950 }
951 }
952 ReduceOutput::Continue(state)
953 })
954 },
955 self.timeouts.pre_quorum_timeout,
957 )
958 .await.map_err(|_state| IotaError::from(UserInputError::ObjectNotFound {
959 object_id,
960 version: None,
961 }))?;
962 Ok(result.0)
963 }
964
965 pub async fn get_latest_system_state_object_for_testing(
969 &self,
970 ) -> anyhow::Result<IotaSystemState> {
971 #[derive(Debug, Default)]
972 struct State {
973 latest_system_state: Option<IotaSystemState>,
974 total_weight: StakeUnit,
975 }
976 let initial_state = State::default();
977 let result = quorum_map_then_reduce_with_timeout(
978 self.committee.clone(),
979 self.authority_clients.clone(),
980 initial_state,
981 |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
982 |mut state, name, weight, result| {
983 Box::pin(async move {
984 state.total_weight += weight;
985 match result {
986 Ok(system_state) => {
987 debug!(
988 "Received system state object from validator {:?} with epoch: {:?}",
989 name.concise(),
990 system_state.epoch()
991 );
992 if state
993 .latest_system_state
994 .as_ref()
995 .is_none_or(|latest| system_state.epoch() > latest.epoch())
996 {
997 state.latest_system_state = Some(system_state);
998 }
999 }
1000 Err(err) => {
1001 debug!(
1002 "Received error from validator {:?}: {:?}",
1003 name.concise(),
1004 err
1005 );
1006 }
1007 };
1008 if state.total_weight >= self.committee.quorum_threshold() {
1009 if let Some(system_state) = state.latest_system_state {
1010 return ReduceOutput::Success(system_state);
1011 } else {
1012 return ReduceOutput::Failed(state);
1013 }
1014 }
1015 ReduceOutput::Continue(state)
1016 })
1017 },
1018 self.timeouts.pre_quorum_timeout,
1020 )
1021 .await
1022 .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
1023 Ok(result.0)
1024 }
1025
1026 #[instrument(level = "trace", skip_all)]
1028 pub async fn process_transaction(
1029 &self,
1030 transaction: Transaction,
1031 client_addr: Option<SocketAddr>,
1032 ) -> Result<ProcessTransactionResult, AggregatorProcessTransactionError> {
1033 let tx_digest = transaction.digest();
1035 debug!(
1036 tx_digest = ?tx_digest,
1037 "Broadcasting transaction request to authorities"
1038 );
1039 trace!(
1040 "Transaction data: {:?}",
1041 transaction.data().intent_message().value
1042 );
1043 let committee = self.committee.clone();
1044 let state = ProcessTransactionState {
1045 tx_signatures: StakeAggregator::new(committee.clone()),
1046 effects_map: MultiStakeAggregator::new(committee.clone()),
1047 errors: vec![],
1048 object_or_package_not_found_stake: 0,
1049 non_retryable_stake: 0,
1050 overloaded_stake: 0,
1051 retryable_overload_info: Default::default(),
1052 retryable: true,
1053 conflicting_tx_digests: Default::default(),
1054 tx_finalized_with_different_user_sig: false,
1055 };
1056
1057 let transaction_ref = &transaction;
1058 let validity_threshold = committee.validity_threshold();
1059 let quorum_threshold = committee.quorum_threshold();
1060 let validator_display_names = self.validator_display_names.clone();
1061 let result = quorum_map_then_reduce_with_timeout(
1062 committee.clone(),
1063 self.authority_clients.clone(),
1064 state,
1065 |name, client| {
1066 Box::pin(
1067 async move {
1068 let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
1069 let concise_name = name.concise_owned();
1070 client.handle_transaction(transaction_ref.clone(), client_addr)
1071 .monitor_cancellation()
1072 .instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name))
1073 .await
1074 },
1075 )
1076 },
1077 |mut state, name, weight, response| {
1078 let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1079 Box::pin(async move {
1080 match self.handle_process_transaction_response(
1081 tx_digest, &mut state, response, name, weight,
1082 ) {
1083 Ok(Some(result)) => {
1084 self.record_process_transaction_metrics(tx_digest, &state);
1085 return ReduceOutput::Success(result);
1086 }
1087 Ok(None) => {},
1088 Err(err) => {
1089 let concise_name = name.concise();
1090 debug!(?tx_digest, name=?concise_name, weight, "Error processing transaction from validator: {:?}", err);
1091 self.metrics
1092 .process_tx_errors
1093 .with_label_values(&[display_name.as_str(), err.as_ref()])
1094 .inc();
1095 Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
1096 state.record_conflicting_transaction_if_any(name, weight, &err);
1098 let (retryable, categorized) = err.is_retryable();
1099 if !categorized {
1100 error!(?tx_digest, "uncategorized tx error: {err}");
1103 }
1104 if err.is_object_or_package_not_found() {
1105 state.object_or_package_not_found_stake += weight;
1110 }
1111 else if err.is_overload() {
1112 state.overloaded_stake += weight;
1119 }
1120 else if err.is_retryable_overload() {
1121 state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
1129 }
1130 else if !retryable {
1131 state.non_retryable_stake += weight;
1132 }
1133 state.errors.push((err, vec![name], weight));
1134
1135 }
1136 };
1137
1138 let retryable_stake = self.get_retryable_stake(&state);
1139 let good_stake = std::cmp::max(state.tx_signatures.total_votes(), state.effects_map.total_votes());
1140 if good_stake + retryable_stake < quorum_threshold {
1141 debug!(
1142 tx_digest = ?tx_digest,
1143 good_stake,
1144 retryable_stake,
1145 "No chance for any tx to get quorum, exiting. Conflicting_txes: {:?}",
1146 state.conflicting_tx_digests
1147 );
1148 state.retryable = false;
1150 return ReduceOutput::Failed(state);
1151 }
1152
1153 if state.non_retryable_stake >= validity_threshold
1155 || state.object_or_package_not_found_stake >= quorum_threshold || state.overloaded_stake >= quorum_threshold {
1157 state.retryable = false;
1160 ReduceOutput::Failed(state)
1161 } else {
1162 ReduceOutput::Continue(state)
1163 }
1164 })
1165 },
1166 self.timeouts.pre_quorum_timeout,
1168 )
1169 .await;
1170
1171 match result {
1172 Ok((result, _)) => Ok(result),
1173 Err(state) => {
1174 self.record_process_transaction_metrics(tx_digest, &state);
1175 let state = self.record_non_quorum_effects_maybe(tx_digest, state);
1176 Err(self.handle_process_transaction_error(state))
1177 }
1178 }
1179 }
1180
1181 fn record_rpc_error_maybe(metrics: Arc<AuthAggMetrics>, display_name: &str, error: &IotaError) {
1183 if let IotaError::Rpc(_message, code) = error {
1184 metrics
1185 .total_rpc_err
1186 .with_label_values(&[display_name, code.as_str()])
1187 .inc();
1188 }
1189 }
1190
1191 fn handle_process_transaction_error(
1193 &self,
1194 state: ProcessTransactionState,
1195 ) -> AggregatorProcessTransactionError {
1196 let quorum_threshold = self.committee.quorum_threshold();
1197
1198 if state.overloaded_stake >= quorum_threshold {
1200 return AggregatorProcessTransactionError::SystemOverload {
1201 overloaded_stake: state.overloaded_stake,
1202 errors: group_errors(state.errors),
1203 };
1204 }
1205
1206 if !state.retryable {
1207 if state.tx_finalized_with_different_user_sig
1208 || state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1209 self.committee.validity_threshold(),
1210 )
1211 {
1212 return AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures;
1213 }
1214
1215 if !state.conflicting_tx_digests.is_empty() {
1218 let good_stake = state.tx_signatures.total_votes();
1219 warn!(
1220 ?state.conflicting_tx_digests,
1221 original_tx_stake = good_stake,
1222 "Client double spend attempt detected!",
1223 );
1224 self.metrics
1225 .total_client_double_spend_attempts_detected
1226 .inc();
1227 return AggregatorProcessTransactionError::FatalConflictingTransaction {
1228 errors: group_errors(state.errors),
1229 conflicting_tx_digests: state.conflicting_tx_digests,
1230 };
1231 }
1232
1233 return AggregatorProcessTransactionError::FatalTransaction {
1234 errors: group_errors(state.errors),
1235 };
1236 }
1237
1238 if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
1245 >= quorum_threshold
1246 {
1247 let retry_after_secs = state
1248 .retryable_overload_info
1249 .get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
1250 .as_secs();
1251 return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
1252 overload_stake: state.retryable_overload_info.total_stake,
1253 errors: group_errors(state.errors),
1254 retry_after_secs,
1255 };
1256 }
1257
1258 AggregatorProcessTransactionError::RetryableTransaction {
1260 errors: group_errors(state.errors),
1261 }
1262 }
1263
1264 fn record_process_transaction_metrics(
1266 &self,
1267 tx_digest: &TransactionDigest,
1268 state: &ProcessTransactionState,
1269 ) {
1270 let num_signatures = state.tx_signatures.validator_sig_count();
1271 let good_stake = state.tx_signatures.total_votes();
1272 debug!(
1273 ?tx_digest,
1274 num_errors = state.errors.iter().map(|e| e.1.len()).sum::<usize>(),
1275 num_unique_errors = state.errors.len(),
1276 ?good_stake,
1277 non_retryable_stake = state.non_retryable_stake,
1278 ?num_signatures,
1279 "Received signatures response from validators handle_transaction"
1280 );
1281 if !state.errors.is_empty() {
1282 debug!(?tx_digest, "Errors received: {:?}", state.errors);
1283 }
1284 }
1285
1286 fn handle_process_transaction_response(
1288 &self,
1289 tx_digest: &TransactionDigest,
1290 state: &mut ProcessTransactionState,
1291 response: IotaResult<PlainTransactionInfoResponse>,
1292 name: AuthorityName,
1293 weight: StakeUnit,
1294 ) -> IotaResult<Option<ProcessTransactionResult>> {
1295 match response {
1296 Ok(PlainTransactionInfoResponse::Signed(signed)) => {
1297 debug!(?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction");
1298 self.handle_transaction_response_with_signed(state, signed)
1299 }
1300 Ok(PlainTransactionInfoResponse::ExecutedWithCert(cert, effects, events)) => {
1301 debug!(?tx_digest, name=?name.concise(), weight, "Received prev certificate and effects from validator handle_transaction");
1302 self.handle_transaction_response_with_executed(state, Some(cert), effects, events)
1303 }
1304 Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(_, effects, events)) => {
1305 debug!(?tx_digest, name=?name.concise(), weight, "Received prev effects from validator handle_transaction");
1306 self.handle_transaction_response_with_executed(state, None, effects, events)
1307 }
1308 Err(err) => Err(err),
1309 }
1310 }
1311
1312 fn handle_transaction_response_with_signed(
1315 &self,
1316 state: &mut ProcessTransactionState,
1317 plain_tx: SignedTransaction,
1318 ) -> IotaResult<Option<ProcessTransactionResult>> {
1319 match state.tx_signatures.insert(plain_tx.clone()) {
1320 InsertResult::NotEnoughVotes {
1321 bad_votes,
1322 bad_authorities,
1323 } => {
1324 state.non_retryable_stake += bad_votes;
1325 if bad_votes > 0 {
1326 state.errors.push((
1327 IotaError::InvalidSignature {
1328 error: "Individual signature verification failed".to_string(),
1329 },
1330 bad_authorities,
1331 bad_votes,
1332 ));
1333 }
1334 Ok(None)
1335 }
1336 InsertResult::Failed { error } => Err(error),
1337 InsertResult::QuorumReached(cert_sig) => {
1338 let certificate =
1339 CertifiedTransaction::new_from_data_and_sig(plain_tx.into_data(), cert_sig);
1340 certificate.verify_committee_sigs_only(&self.committee)?;
1341 Ok(Some(ProcessTransactionResult::Certified {
1342 certificate,
1343 newly_formed: true,
1344 }))
1345 }
1346 }
1347 }
1348
1349 fn handle_transaction_response_with_executed(
1353 &self,
1354 state: &mut ProcessTransactionState,
1355 certificate: Option<CertifiedTransaction>,
1356 plain_tx_effects: SignedTransactionEffects,
1357 events: TransactionEvents,
1358 ) -> IotaResult<Option<ProcessTransactionResult>> {
1359 match certificate {
1360 Some(certificate) if certificate.epoch() == self.committee.epoch => {
1361 Ok(Some(ProcessTransactionResult::Certified {
1365 certificate,
1366 newly_formed: false,
1367 }))
1368 }
1369 _ => {
1370 let digest = plain_tx_effects.data().digest();
1375 match state.effects_map.insert(digest, plain_tx_effects.clone()) {
1376 InsertResult::NotEnoughVotes {
1377 bad_votes,
1378 bad_authorities,
1379 } => {
1380 state.non_retryable_stake += bad_votes;
1381 if bad_votes > 0 {
1382 state.errors.push((
1383 IotaError::InvalidSignature {
1384 error: "Individual signature verification failed".to_string(),
1385 },
1386 bad_authorities,
1387 bad_votes,
1388 ));
1389 }
1390 Ok(None)
1391 }
1392 InsertResult::Failed { error } => Err(error),
1393 InsertResult::QuorumReached(cert_sig) => {
1394 let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1395 plain_tx_effects.into_data(),
1396 cert_sig,
1397 );
1398 Ok(Some(ProcessTransactionResult::Executed(
1399 ct.verify(&self.committee)?,
1400 events,
1401 )))
1402 }
1403 }
1404 }
1405 }
1406 }
1407
1408 fn record_non_quorum_effects_maybe(
1410 &self,
1411 tx_digest: &TransactionDigest,
1412 mut state: ProcessTransactionState,
1413 ) -> ProcessTransactionState {
1414 if state.effects_map.unique_key_count() > 0 {
1415 let non_quorum_effects = state.effects_map.get_all_unique_values();
1416 warn!(
1417 ?tx_digest,
1418 "Received signed Effects but not with a quorum {:?}", non_quorum_effects
1419 );
1420
1421 let (_most_staked_effects_digest, (_, most_staked_effects_digest_stake)) =
1424 non_quorum_effects
1425 .iter()
1426 .max_by_key(|&(_, (_, stake))| stake)
1427 .unwrap();
1428 if most_staked_effects_digest_stake + self.get_retryable_stake(&state)
1432 < self.committee.quorum_threshold()
1433 {
1434 state.retryable = false;
1435 if state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1436 self.committee.validity_threshold(),
1437 ) {
1438 state.tx_finalized_with_different_user_sig = true;
1439 } else {
1440 error!(
1442 "We have seen signed effects but unable to reach quorum threshold even including retriable stakes. This is very rare. Tx: {tx_digest:?}. Non-quorum effects: {non_quorum_effects:?}."
1443 );
1444 }
1445 }
1446
1447 let mut involved_validators = Vec::new();
1448 let mut total_stake = 0;
1449 for (validators, stake) in non_quorum_effects.values() {
1450 involved_validators.extend_from_slice(validators);
1451 total_stake += stake;
1452 }
1453 state.errors.push((
1457 IotaError::QuorumFailedToGetEffectsQuorumWhenProcessingTransaction {
1458 effects_map: non_quorum_effects,
1459 },
1460 involved_validators,
1461 total_stake,
1462 ));
1463 }
1464 state
1465 }
1466
1467 fn get_retryable_stake(&self, state: &ProcessTransactionState) -> StakeUnit {
1469 self.committee.total_votes()
1470 - state.non_retryable_stake
1471 - state.effects_map.total_votes()
1472 - state.tx_signatures.total_votes()
1473 }
1474
/// Sends a certificate to the committee's authorities and aggregates their
/// signed effects into a [`QuorumDriverResponse`].
///
/// Each reply is fed through `handle_process_certificate_response`; the call
/// succeeds as soon as that helper reports a result (an effects quorum that
/// verifies against the committee). Aggregation is abandoned as non-retryable
/// once the accumulated non-retryable stake reaches the committee's validity
/// threshold.
///
/// When input or output objects are requested, only a sampled subset of
/// validators is asked for them; the rest receive the request with the
/// object and auxiliary-data flags cleared.
///
/// After a successful result, requests that are still pending are polled in
/// a spawned task until they finish or `post_quorum_timeout` elapses.
///
/// # Errors
///
/// * `RetryableExecuteCertificate` when aggregation fails while the state is
///   still retryable (carries the grouped retryable errors).
/// * `FatalExecuteCertificate` when aggregation fails after the state was
///   marked non-retryable (carries the grouped non-retryable errors).
#[instrument(level = "trace", skip_all)]
pub async fn process_certificate(
    &self,
    request: HandleCertificateRequestV1,
    client_addr: Option<SocketAddr>,
) -> Result<QuorumDriverResponse, AggregatorProcessCertificateError> {
    // Fresh aggregation state; it keeps its own copy of the request so the
    // response handler can check which objects were asked for.
    let state = ProcessCertificateState {
        effects_map: MultiStakeAggregator::new(self.committee.clone()),
        non_retryable_stake: 0,
        non_retryable_errors: vec![],
        retryable_errors: vec![],
        retryable: true,
        events: None,
        input_objects: None,
        output_objects: None,
        auxiliary_data: None,
        request: request.clone(),
    };

    // Validators that will be asked for input/output objects. Empty when
    // neither kind of object was requested.
    let validators_to_sample =
        if request.include_input_objects || request.include_output_objects {
            // Upper bound passed to `choose_multiple_weighted_iter`.
            const NUMBER_TO_SAMPLE: usize = 10;

            self.committee
                .choose_multiple_weighted_iter(NUMBER_TO_SAMPLE)
                .cloned()
                .collect()
        } else {
            HashSet::new()
        };

    let tx_digest = *request.certificate.digest();
    let timeout_after_quorum = self.timeouts.post_quorum_timeout;

    // `request` is moved into `request_ref`, which the map closure below
    // captures by value.
    let request_ref = request;
    // `threshold` is only used for logging; `validity` is the exit condition
    // checked by the reduce closure.
    let threshold = self.committee.quorum_threshold();
    let validity = self.committee.validity_threshold();

    debug!(
        ?tx_digest,
        quorum_threshold = threshold,
        validity_threshold = validity,
        ?timeout_after_quorum,
        "Broadcasting certificate to authorities"
    );
    // Owned handles for the closures: `metrics_clone` goes to the map
    // closure, `metrics` to the reduce closure.
    let committee: Arc<Committee> = self.committee.clone();
    let authority_clients = self.authority_clients.clone();
    let metrics = self.metrics.clone();
    let metrics_clone = metrics.clone();
    let validator_display_names = self.validator_display_names.clone();
    let (result, mut remaining_tasks) = quorum_map_then_reduce_with_timeout(
        committee.clone(),
        authority_clients.clone(),
        state,
        // Map step: issue the certificate request to a single validator.
        move |name, client| {
            Box::pin(async move {
                // Held for the duration of the request to this validator.
                let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests);
                let concise_name = name.concise_owned();
                if request_ref.include_input_objects || request_ref.include_output_objects {

                    // Sampled validators get the request as-is; all others
                    // get it with the object and auxiliary-data flags
                    // cleared (the remaining fields are kept).
                    let req = if validators_to_sample.contains(&name) {
                        request_ref
                    } else {
                        HandleCertificateRequestV1 {
                            include_input_objects: false,
                            include_output_objects: false,
                            include_auxiliary_data: false,
                            ..request_ref
                        }
                    };

                    client
                        .handle_certificate_v1(req, client_addr)
                        .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
                        .await
                } else {
                    // No objects requested: a new request is built from the
                    // certificate alone (plus `with_events()`), and the
                    // object and auxiliary fields of the reply are forced to
                    // `None`.
                    client
                        .handle_certificate_v1(HandleCertificateRequestV1::new(request_ref.certificate).with_events(), client_addr)
                        .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
                        .await
                        .map(|response| HandleCertificateResponseV1 {
                            signed_effects: response.signed_effects,
                            events: response.events,
                            input_objects: None,
                            output_objects: None,
                            auxiliary_data: None,
                        })
                }
            })
        },
        // Reduce step: fold one validator's reply into the state.
        move |mut state, name, weight, response| {
            let committee_clone = committee.clone();
            let metrics = metrics.clone();
            // Metric label: the configured display name, falling back to the
            // validator's concise name.
            let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
            Box::pin(async move {
                match AuthorityAggregator::<A>::handle_process_certificate_response(
                    committee_clone,
                    &metrics,
                    &tx_digest, &mut state, response, name)
                {
                    Ok(Some(effects)) => ReduceOutput::Success(effects),
                    Ok(None) => {
                        // The handler may have raised `non_retryable_stake`
                        // (bad votes), so the exit condition is re-checked.
                        if state.non_retryable_stake >= validity {
                            state.retryable = false;
                            ReduceOutput::Failed(state)
                        } else {
                            ReduceOutput::Continue(state)
                        }
                    },
                    Err(err) => {
                        let concise_name = name.concise();
                        debug!(?tx_digest, name=?concise_name, "Error processing certificate from validator: {:?}", err);
                        metrics
                            .process_cert_errors
                            .with_label_values(&[display_name.as_str(), err.as_ref()])
                            .inc();
                        Self::record_rpc_error_maybe(metrics, &display_name, &err);
                        let (retryable, categorized) = err.is_retryable();
                        if !categorized {
                            // Logged at error level so uncategorized errors
                            // stand out.
                            error!(?tx_digest, "[WATCHOUT] uncategorized tx error: {err}");
                        }
                        // Only non-retryable errors contribute stake towards
                        // the exit condition.
                        if !retryable {
                            state.non_retryable_stake += weight;
                            state.non_retryable_errors.push((err, vec![name], weight));
                        } else {
                            state.retryable_errors.push((err, vec![name], weight));
                        }
                        if state.non_retryable_stake >= validity {
                            state.retryable = false;
                            ReduceOutput::Failed(state)
                        } else {
                            ReduceOutput::Continue(state)
                        }
                    }
                }
            })
        },
        self.timeouts.pre_quorum_timeout,
    )
    .await
    .map_err(|state| {
        debug!(
            ?tx_digest,
            num_unique_effects = state.effects_map.unique_key_count(),
            non_retryable_stake = state.non_retryable_stake,
            "Received effects responses from validators"
        );

        // Count every collected error, labelled with whether the overall
        // state is still retryable.
        for (iota_err, _, _) in state.retryable_errors.iter().chain(state.non_retryable_errors.iter()) {
            self
                .metrics
                .total_aggregated_err
                .with_label_values(&[
                    iota_err.as_ref(),
                    if state.retryable {
                        "recoverable"
                    } else {
                        "non-recoverable"
                    },
                ])
                .inc();
        }
        if state.retryable {
            AggregatorProcessCertificateError::RetryableExecuteCertificate {
                retryable_errors: group_errors(state.retryable_errors),
            }
        } else {
            AggregatorProcessCertificateError::FatalExecuteCertificate {
                non_retryable_errors: group_errors(state.non_retryable_errors),
            }
        }
    })?;

    let metrics = self.metrics.clone();
    metrics
        .remaining_tasks_when_reaching_cert_quorum
        .observe(remaining_tasks.len() as f64);
    if !remaining_tasks.is_empty() {
        // Keep polling the still-pending validator requests in a spawned
        // task, bounded by `timeout_after_quorum`.
        spawn_monitored_task!(async move {
            let mut timeout = Box::pin(sleep(timeout_after_quorum));
            loop {
                tokio::select! {
                    _ = &mut timeout => {
                        debug!(?tx_digest, "Timed out in post quorum cert broadcasting: {:?}. Remaining tasks: {:?}", timeout_after_quorum, remaining_tasks.len());
                        metrics.cert_broadcasting_post_quorum_timeout.inc();
                        metrics.remaining_tasks_when_cert_broadcasting_post_quorum_timeout.observe(remaining_tasks.len() as f64);
                        break;
                    }
                    res = remaining_tasks.next() => {
                        // `None` means every pending request has completed.
                        if res.is_none() {
                            break;
                        }
                    }
                }
            }
        });
    }
    Ok(result)
}
1690
1691 fn handle_process_certificate_response(
1693 committee: Arc<Committee>,
1694 metrics: &AuthAggMetrics,
1695 tx_digest: &TransactionDigest,
1696 state: &mut ProcessCertificateState,
1697 response: IotaResult<HandleCertificateResponseV1>,
1698 name: AuthorityName,
1699 ) -> IotaResult<Option<QuorumDriverResponse>> {
1700 match response {
1701 Ok(HandleCertificateResponseV1 {
1702 signed_effects,
1703 events,
1704 input_objects,
1705 output_objects,
1706 auxiliary_data,
1707 }) => {
1708 debug!(
1709 ?tx_digest,
1710 name = ?name.concise(),
1711 "Validator handled certificate successfully",
1712 );
1713
1714 if events.is_some() && state.events.is_none() {
1715 state.events = events;
1716 }
1717
1718 if input_objects.is_some() && state.input_objects.is_none() {
1719 state.input_objects = input_objects;
1720 }
1721
1722 if output_objects.is_some() && state.output_objects.is_none() {
1723 state.output_objects = output_objects;
1724 }
1725
1726 if auxiliary_data.is_some() && state.auxiliary_data.is_none() {
1727 state.auxiliary_data = auxiliary_data;
1728 }
1729
1730 let effects_digest = *signed_effects.digest();
1731 match state.effects_map.insert(
1733 (signed_effects.epoch(), effects_digest),
1734 signed_effects.clone(),
1735 ) {
1736 InsertResult::NotEnoughVotes {
1737 bad_votes,
1738 bad_authorities,
1739 } => {
1740 state.non_retryable_stake += bad_votes;
1741 if bad_votes > 0 {
1742 state.non_retryable_errors.push((
1743 IotaError::InvalidSignature {
1744 error: "Individual signature verification failed".to_string(),
1745 },
1746 bad_authorities,
1747 bad_votes,
1748 ));
1749 }
1750 Ok(None)
1751 }
1752 InsertResult::Failed { error } => Err(error),
1753 InsertResult::QuorumReached(cert_sig) => {
1754 let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1755 signed_effects.into_data(),
1756 cert_sig,
1757 );
1758
1759 if (state.request.include_input_objects && state.input_objects.is_none())
1760 || (state.request.include_output_objects
1761 && state.output_objects.is_none())
1762 {
1763 metrics.quorum_reached_without_requested_objects.inc();
1764 debug!(
1765 ?tx_digest,
1766 "Quorum Reached but requested input/output objects were not returned"
1767 );
1768 }
1769
1770 ct.verify(&committee).map(|ct| {
1771 debug!(?tx_digest, "Got quorum for validators handle_certificate.");
1772 Some(QuorumDriverResponse {
1773 effects_cert: ct,
1774 events: state.events.take(),
1775 input_objects: state.input_objects.take(),
1776 output_objects: state.output_objects.take(),
1777 auxiliary_data: state.auxiliary_data.take(),
1778 })
1779 })
1780 }
1781 }
1782 }
1783 Err(err) => Err(err),
1784 }
1785 }
1786
1787 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1788 pub async fn execute_transaction_block(
1789 &self,
1790 transaction: &Transaction,
1791 client_addr: Option<SocketAddr>,
1792 ) -> Result<VerifiedCertifiedTransactionEffects, anyhow::Error> {
1793 let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
1794 let result = self
1795 .process_transaction(transaction.clone(), client_addr)
1796 .await?;
1797 let cert = match result {
1798 ProcessTransactionResult::Certified { certificate, .. } => certificate,
1799 ProcessTransactionResult::Executed(effects, _) => {
1800 return Ok(effects);
1801 }
1802 };
1803 self.metrics.total_tx_certificates_created.inc();
1804 drop(tx_guard);
1805
1806 let _cert_guard = GaugeGuard::acquire(&self.metrics.inflight_certificates);
1807 let response = self
1808 .process_certificate(
1809 HandleCertificateRequestV1 {
1810 certificate: cert.clone(),
1811 include_events: true,
1812 include_input_objects: false,
1813 include_output_objects: false,
1814 include_auxiliary_data: false,
1815 },
1816 client_addr,
1817 )
1818 .await?;
1819
1820 Ok(response.effects_cert)
1821 }
1822
1823 #[instrument(level = "trace", skip_all, fields(?tx_digest))]
1826 pub async fn handle_transaction_info_request_from_some_validators(
1827 &self,
1828 tx_digest: &TransactionDigest,
1829 validators: &BTreeSet<AuthorityName>,
1831 timeout_total: Option<Duration>,
1832 ) -> IotaResult<PlainTransactionInfoResponse> {
1833 self.quorum_once_with_timeout(
1834 None,
1835 Some(validators),
1836 |_authority, client| {
1837 Box::pin(async move {
1838 client
1839 .handle_transaction_info_request(TransactionInfoRequest {
1840 transaction_digest: *tx_digest,
1841 })
1842 .await
1843 })
1844 },
1845 Duration::from_secs(2),
1846 timeout_total,
1847 "handle_transaction_info_request_from_some_validators".to_string(),
1848 )
1849 .await
1850 }
1851}
1852
/// Builder for [`AuthorityAggregator`].
///
/// Start from one of `from_network_config`, `from_genesis` or
/// `from_committee`, optionally refine it with the `with_*` methods, then
/// call `build_network_clients` or `build_custom_clients`.
#[derive(Default)]
pub struct AuthorityAggregatorBuilder<'a> {
    /// Network configuration; its genesis is the preferred source of the
    /// network committee.
    network_config: Option<&'a NetworkConfig>,
    /// Genesis used for the network committee when no network config is set.
    genesis: Option<&'a Genesis>,
    /// Explicit committee; when absent, the committee is taken from the
    /// network committee.
    committee: Option<Committee>,
    /// Committee store; when absent, `CommitteeStore::new_for_testing` is used.
    committee_store: Option<Arc<CommitteeStore>>,
    /// Metrics registry; when absent, a fresh `Registry` is created at build
    /// time.
    registry: Option<&'a Registry>,
    /// Timeout settings; when absent, `TimeoutConfig::default()` is used.
    timeouts_config: Option<TimeoutConfig>,
}
1864
1865impl<'a> AuthorityAggregatorBuilder<'a> {
1866 pub fn from_network_config(config: &'a NetworkConfig) -> Self {
1868 Self {
1869 network_config: Some(config),
1870 ..Default::default()
1871 }
1872 }
1873
1874 pub fn from_genesis(genesis: &'a Genesis) -> Self {
1876 Self {
1877 genesis: Some(genesis),
1878 ..Default::default()
1879 }
1880 }
1881
1882 pub fn from_committee(committee: Committee) -> Self {
1884 Self {
1885 committee: Some(committee),
1886 ..Default::default()
1887 }
1888 }
1889
1890 pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
1892 self.committee_store = Some(committee_store);
1893 self
1894 }
1895
1896 pub fn with_registry(mut self, registry: &'a Registry) -> Self {
1898 self.registry = Some(registry);
1899 self
1900 }
1901
1902 pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
1904 self.timeouts_config = Some(timeouts_config);
1905 self
1906 }
1907
1908 fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
1909 let genesis = if let Some(network_config) = self.network_config {
1910 &network_config.genesis
1911 } else if let Some(genesis) = self.genesis {
1912 genesis
1913 } else {
1914 panic!("need either NetworkConfig or Genesis.");
1915 };
1916 genesis.committee_with_network()
1917 }
1918
1919 fn get_committee(&self) -> Committee {
1920 self.committee
1921 .clone()
1922 .unwrap_or_else(|| self.get_network_committee().committee().clone())
1923 }
1924
1925 pub fn build_network_clients(
1926 self,
1927 ) -> (
1928 AuthorityAggregator<NetworkAuthorityClient>,
1929 BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
1930 ) {
1931 let network_committee = self.get_network_committee();
1932 let auth_clients = make_authority_clients_with_timeout_config(
1933 &network_committee,
1934 DEFAULT_CONNECT_TIMEOUT_SEC,
1935 DEFAULT_REQUEST_TIMEOUT_SEC,
1936 );
1937 let auth_agg = self.build_custom_clients(auth_clients.clone());
1938 (auth_agg, auth_clients)
1939 }
1940
1941 pub fn build_custom_clients<C: Clone>(
1942 self,
1943 authority_clients: BTreeMap<AuthorityName, C>,
1944 ) -> AuthorityAggregator<C> {
1945 let committee = self.get_committee();
1946 let registry = Registry::new();
1947 let registry = self.registry.unwrap_or(®istry);
1948 let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
1949 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(registry));
1950
1951 let committee_store = self
1952 .committee_store
1953 .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
1954
1955 let timeouts_config = self.timeouts_config.unwrap_or_default();
1956
1957 AuthorityAggregator::new(
1958 committee,
1959 committee_store,
1960 authority_clients,
1961 safe_client_metrics_base,
1962 auth_agg_metrics,
1963 Arc::new(HashMap::new()),
1964 timeouts_config,
1965 )
1966 }
1967}