iota_core/
authority_aggregator.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
8    convert::AsRef,
9    net::SocketAddr,
10    string::ToString,
11    sync::Arc,
12    time::Duration,
13};
14
15use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
16use iota_authority_aggregation::{AsyncResult, ReduceOutput, quorum_map_then_reduce_with_timeout};
17use iota_config::genesis::Genesis;
18use iota_metrics::{GaugeGuard, MonitorCancellation, monitored_future, spawn_monitored_task};
19use iota_network::{
20    DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_iota_network_config,
21};
22use iota_network_stack::config::Config;
23use iota_swarm_config::network_config::NetworkConfig;
24use iota_types::{
25    base_types::*,
26    committee::{Committee, CommitteeTrait, CommitteeWithNetworkMetadata, StakeUnit},
27    crypto::{AuthorityPublicKeyBytes, AuthoritySignInfo},
28    effects::{
29        CertifiedTransactionEffects, SignedTransactionEffects, TransactionEffects,
30        TransactionEvents, VerifiedCertifiedTransactionEffects,
31    },
32    error::{IotaError, IotaResult, UserInputError},
33    fp_ensure,
34    iota_system_state::{
35        IotaSystemState, IotaSystemStateTrait,
36        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
37    },
38    message_envelope::Message,
39    messages_grpc::{
40        HandleCapabilityNotificationRequestV1, HandleCertificateRequestV1,
41        HandleCertificateResponseV1, LayoutGenerationOption, ObjectInfoRequest,
42        TransactionInfoRequest,
43    },
44    messages_safe_client::PlainTransactionInfoResponse,
45    object::Object,
46    quorum_driver_types::{GroupedErrors, QuorumDriverResponse},
47    transaction::*,
48};
49use prometheus::{
50    Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry,
51    register_int_counter_vec_with_registry, register_int_counter_with_registry,
52    register_int_gauge_with_registry,
53};
54use thiserror::Error;
55use tokio::time::{sleep, timeout};
56use tracing::{Instrument, debug, error, info, instrument, trace, trace_span, warn};
57
58use crate::{
59    authority_client::{
60        AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
61        make_network_authority_clients_with_network_config,
62    },
63    epoch::committee_store::CommitteeStore,
64    safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase},
65    stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator},
66};
67
/// Default retry count (currently 4).
// NOTE(review): no consumer of this constant is visible in this part of the
// file — confirm which operations read it before changing the value.
pub const DEFAULT_RETRIES: usize = 4;
69
70#[cfg(test)]
71#[path = "unit_tests/authority_aggregator_tests.rs"]
72pub mod authority_aggregator_tests;
73
/// Configuration for timeouts in the authority aggregator.
///
/// The `Default` implementation supplies the stock values; the struct is
/// `Clone` so an aggregator can hand its settings to a recreated aggregator.
#[derive(Clone)]
pub struct TimeoutConfig {
    // NOTE(review): by name, the time budget while a quorum has not yet been
    // reached — the code that applies it is not visible here; confirm at the
    // call sites.
    pub pre_quorum_timeout: Duration,
    // NOTE(review): by name, the time budget for remaining work once a quorum
    // has been reached — confirm at the call sites.
    pub post_quorum_timeout: Duration,

    // Timeout used to determine when to start a second "serial" request for
    // quorum_once_with_timeout. If this is set to zero, then
    // quorum_once_with_timeout becomes completely parallelized.
    pub serial_authority_request_interval: Duration,
}
85
86impl Default for TimeoutConfig {
87    fn default() -> Self {
88        Self {
89            pre_quorum_timeout: Duration::from_secs(60),
90            post_quorum_timeout: Duration::from_secs(7),
91            serial_authority_request_interval: Duration::from_millis(1000),
92        }
93    }
94}
95
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
///
/// All handles are created by [`AuthAggMetrics::new`]; the per-field notes
/// below restate the help text registered there.
#[derive(Clone)]
pub struct AuthAggMetrics {
    /// Total number of certificates made in the authority aggregator.
    pub total_tx_certificates_created: IntCounter,
    /// Errors returned by validators when processing a transaction, labelled
    /// by `name` and `error`.
    pub process_tx_errors: IntCounterVec,
    /// Errors returned by validators when processing a certificate, labelled
    /// by `name` and `error`.
    pub process_cert_errors: IntCounterVec,
    /// Total number of detected client double spend attempts.
    pub total_client_double_spend_attempts_detected: IntCounter,
    /// Errors returned by validators, labelled by `error` and `tx_recoverable`.
    pub total_aggregated_err: IntCounterVec,
    /// RPC errors returned by validators, labelled by `name` and
    /// `error_message`.
    pub total_rpc_err: IntCounterVec,
    /// Inflight transactions gathering signatures.
    pub inflight_transactions: IntGauge,
    /// Inflight certificates gathering effects.
    pub inflight_certificates: IntGauge,
    /// Inflight `handle_transaction` requests.
    pub inflight_transaction_requests: IntGauge,
    /// Inflight `handle_certificate` requests.
    pub inflight_certificate_requests: IntGauge,

    /// Number of timeouts in certificate processing after quorum.
    pub cert_broadcasting_post_quorum_timeout: IntCounter,
    /// Number of remaining tasks when certificate quorum is reached.
    pub remaining_tasks_when_reaching_cert_quorum: Histogram,
    /// Number of remaining tasks when post-quorum certificate broadcasting
    /// times out.
    pub remaining_tasks_when_cert_broadcasting_post_quorum_timeout: Histogram,
    /// Number of times quorum was reached without any validator returning the
    /// requested objects.
    pub quorum_reached_without_requested_objects: IntCounter,

    /// Successful capability notifications sent to committee validators.
    pub capability_notification_success: IntCounter,
    /// Errors returned by validators when sending capability notifications.
    pub capability_notification_errors: IntCounter,
}
118
119impl AuthAggMetrics {
120    /// Create a new instance of `AuthAggMetrics` with a Prometheus registry.
121    pub fn new(registry: &prometheus::Registry) -> Self {
122        Self {
123            total_tx_certificates_created: register_int_counter_with_registry!(
124                "total_tx_certificates_created",
125                "Total number of certificates made in the authority_aggregator",
126                registry,
127            )
128            .unwrap(),
129            process_tx_errors: register_int_counter_vec_with_registry!(
130                "process_tx_errors",
131                "Number of errors returned from validators when processing transaction, group by validator name and error type",
132                &["name","error"],
133                registry,
134            )
135            .unwrap(),
136            process_cert_errors: register_int_counter_vec_with_registry!(
137                "process_cert_errors",
138                "Number of errors returned from validators when processing certificate, group by validator name and error type",
139                &["name", "error"],
140                registry,
141            )
142            .unwrap(),
143            total_client_double_spend_attempts_detected: register_int_counter_with_registry!(
144                "total_client_double_spend_attempts_detected",
145                "Total number of client double spend attempts that are detected",
146                registry,
147            )
148            .unwrap(),
149            total_aggregated_err: register_int_counter_vec_with_registry!(
150                "total_aggregated_err",
151                "Total number of errors returned from validators, grouped by error type",
152                &["error", "tx_recoverable"],
153                registry,
154            )
155            .unwrap(),
156            total_rpc_err: register_int_counter_vec_with_registry!(
157                "total_rpc_err",
158                "Total number of rpc errors returned from validators, grouped by validator short name and RPC error message",
159                &["name", "error_message"],
160                registry,
161            )
162            .unwrap(),
163            inflight_transactions: register_int_gauge_with_registry!(
164                "auth_agg_inflight_transactions",
165                "Inflight transaction gathering signatures",
166                registry,
167            )
168            .unwrap(),
169            inflight_certificates: register_int_gauge_with_registry!(
170                "auth_agg_inflight_certificates",
171                "Inflight certificates gathering effects",
172                registry,
173            )
174            .unwrap(),
175            inflight_transaction_requests: register_int_gauge_with_registry!(
176                "auth_agg_inflight_transaction_requests",
177                "Inflight handle_transaction requests",
178                registry,
179            )
180            .unwrap(),
181            inflight_certificate_requests: register_int_gauge_with_registry!(
182                "auth_agg_inflight_certificate_requests",
183                "Inflight handle_certificate requests",
184                registry,
185            )
186            .unwrap(),
187            cert_broadcasting_post_quorum_timeout: register_int_counter_with_registry!(
188                "auth_agg_cert_broadcasting_post_quorum_timeout",
189                "Total number of timeout in cert processing post quorum",
190                registry,
191            )
192            .unwrap(),
193            remaining_tasks_when_reaching_cert_quorum: register_histogram_with_registry!(
194                "auth_agg_remaining_tasks_when_reaching_cert_quorum",
195                "Number of remaining tasks when reaching certificate quorum",
196                registry,
197            ).unwrap(),
198            remaining_tasks_when_cert_broadcasting_post_quorum_timeout: register_histogram_with_registry!(
199                "auth_agg_remaining_tasks_when_cert_broadcasting_post_quorum_timeout",
200                "Number of remaining tasks when post quorum certificate broadcasting times out",
201                registry,
202            ).unwrap(),
203            quorum_reached_without_requested_objects: register_int_counter_with_registry!(
204                "auth_agg_quorum_reached_without_requested_objects",
205                "Number of times quorum was reached without getting the requested objects back from at least 1 validator",
206                registry,
207            )
208            .unwrap(),
209            capability_notification_success: register_int_counter_with_registry!(
210                "capability_notification_success",
211                "Total number of successful capability notifications sent to committee validators",
212                registry,
213            )
214            .unwrap(),
215            capability_notification_errors: register_int_counter_with_registry!(
216                "capability_notification_errors",
217                "Number of errors returned from validators when sending capability notifications",
218                registry,
219            )
220            .unwrap(),
221        }
222    }
223
224    /// Creates a new instance of `AuthAggMetrics` for testing.
225    pub fn new_for_tests() -> Self {
226        let registry = prometheus::Registry::new();
227        Self::new(&registry)
228    }
229}
230
/// Errors that can occur when processing transactions in an aggregator.
///
/// Most variants carry the validator errors grouped as [`GroupedErrors`];
/// the `Display` text of each variant is defined by its `#[error]` attribute.
#[derive(Error, Debug, Eq, PartialEq)]
pub enum AggregatorProcessTransactionError {
    /// A quorum could not be reached because of non-retryable validator
    /// errors.
    #[error(
        "Failed to execute transaction on a quorum of validators due to non-retryable errors. Validator errors: {:?}",
        errors
    )]
    FatalTransaction { errors: GroupedErrors },

    /// A quorum was not reached, but the state is still retryable.
    #[error(
        "Failed to execute transaction on a quorum of validators but state is still retryable. Validator errors: {:?}",
        errors
    )]
    RetryableTransaction { errors: GroupedErrors },

    /// A quorum could not be reached because of conflicting transactions.
    /// `conflicting_tx_digests` maps each conflicting transaction digest to
    /// the `(validator, object reference)` lock records and a stake total.
    #[error(
        "Failed to execute transaction on a quorum of validators due to conflicting transactions. Locked objects: {:?}. Validator errors: {:?}",
        conflicting_tx_digests,
        errors
    )]
    FatalConflictingTransaction {
        errors: GroupedErrors,
        conflicting_tx_digests:
            BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    },

    /// `overloaded_stake` worth of validators are overloaded with
    /// transactions pending execution.
    #[error(
        "{} of the validators by stake are overloaded with transactions pending execution. Validator errors: {:?}",
        overloaded_stake,
        errors
    )]
    SystemOverload {
        overloaded_stake: StakeUnit,
        errors: GroupedErrors,
    },

    /// The transaction is already finalized, but with different user
    /// signatures.
    #[error("Transaction is already finalized but with different user signatures")]
    TxAlreadyFinalizedWithDifferentUserSignatures,

    /// `overload_stake` worth of validators are overloaded and asked the
    /// client to retry after `retry_after_secs` seconds.
    #[error(
        "{} of the validators by stake are overloaded and requested the client to retry after {} seconds. Validator errors: {:?}",
        overload_stake,
        retry_after_secs,
        errors
    )]
    SystemOverloadRetryAfter {
        overload_stake: StakeUnit,
        errors: GroupedErrors,
        retry_after_secs: u64,
    },
}
282
/// Errors that can occur when sending a capability notification to the
/// validators; each variant carries the grouped validator errors.
#[derive(Error, Debug)]
pub enum AggregatorSendCapabilityNotificationError {
    /// A quorum could not be notified because of non-retryable errors.
    #[error(
        "Failed to send capability notification to a quorum of validators due to non-retryable errors. Validator errors: {:?}",
        errors
    )]
    NonRetryableNotification { errors: GroupedErrors },

    /// A quorum was not notified, but the state is still retryable.
    #[error(
        "Failed to send capability notification to a quorum of validators but state is still retryable. Validator errors: {:?}",
        errors
    )]
    RetryableNotification { errors: GroupedErrors },
}
297
/// Errors that can occur when executing a certificate on the validators;
/// each variant carries the grouped validator errors of its category.
#[derive(Error, Debug)]
pub enum AggregatorProcessCertificateError {
    /// A quorum could not execute the certificate; carries the non-retryable
    /// errors.
    #[error(
        "Failed to execute certificate on a quorum of validators. Non-retryable errors: {:?}",
        non_retryable_errors
    )]
    FatalExecuteCertificate { non_retryable_errors: GroupedErrors },

    /// A quorum did not execute the certificate, but the state is still
    /// retryable; carries the retryable errors.
    #[error(
        "Failed to execute certificate on a quorum of validators but state is still retryable. Retryable errors: {:?}",
        retryable_errors
    )]
    RetryableExecuteCertificate { retryable_errors: GroupedErrors },
}
312
313/// Groups the errors by error type and stake.
314pub fn group_errors(errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>) -> GroupedErrors {
315    #[expect(clippy::mutable_key_type)]
316    let mut grouped_errors = HashMap::new();
317    for (error, names, stake) in errors {
318        let entry = grouped_errors.entry(error).or_insert((0, vec![]));
319        entry.0 += stake;
320        entry.1.extend(
321            names
322                .into_iter()
323                .map(|n| n.concise_owned())
324                .collect::<Vec<_>>(),
325        );
326    }
327    grouped_errors
328        .into_iter()
329        .map(|(e, (s, n))| (e, s, n))
330        .collect()
331}
332
/// `RetryableOverloadInfo` stores information about the state of overloaded
/// validators that request clients to retry operations.
///
/// The `Default` value has zero stake and no recorded retry durations.
#[derive(Debug, Default)]
pub struct RetryableOverloadInfo {
    // Total stake of validators that are overloaded and request client to retry.
    pub total_stake: StakeUnit,

    // Records requested retry duration by stakes: each key is a requested
    // retry duration and its value is the summed stake that requested it. The
    // `BTreeMap` keeps the durations in ascending order.
    pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
}
343
344impl RetryableOverloadInfo {
345    pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
346        self.total_stake += stake;
347        self.stake_requested_retry_after
348            .entry(retry_after)
349            .and_modify(|s| *s += stake)
350            .or_insert(stake);
351    }
352
353    // Gets the duration of retry requested by a quorum of validators with smallest
354    // retry durations.
355    pub fn get_quorum_retry_after(
356        &self,
357        good_stake: StakeUnit,
358        quorum_threshold: StakeUnit,
359    ) -> Duration {
360        if self.stake_requested_retry_after.is_empty() {
361            return Duration::from_secs(0);
362        }
363
364        let mut quorum_stake = good_stake;
365        for (retry_after, stake) in self.stake_requested_retry_after.iter() {
366            quorum_stake += *stake;
367            if quorum_stake >= quorum_threshold {
368                return *retry_after;
369            }
370        }
371        *self.stake_requested_retry_after.last_key_value().unwrap().0
372    }
373}
374
/// Mutable bookkeeping gathered while a transaction is being processed:
/// signatures, effects, validator errors and the stake totals used to
/// decide whether the outcome is still retryable.
#[derive(Debug)]
struct ProcessTransactionState {
    // The list of signatures gathered at any point
    tx_signatures: StakeAggregator<AuthoritySignInfo, true>,
    // Effects gathered so far, aggregated by effects digest.
    effects_map: MultiStakeAggregator<TransactionEffectsDigest, TransactionEffects, true>,
    // The list of errors gathered at any point
    errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    // This is exclusively non-retryable stake.
    non_retryable_stake: StakeUnit,
    // This includes both object and package not found iota errors.
    object_or_package_not_found_stake: StakeUnit,
    // Validators that are overloaded with txns pending execution.
    overloaded_stake: StakeUnit,
    // Validators that are overloaded and request client to retry.
    retryable_overload_info: RetryableOverloadInfo,
    // If there are conflicting transactions, we note them down to report to user.
    // Maps each conflicting transaction digest to the (validator, object
    // reference) lock records and the summed stake that reported it.
    conflicting_tx_digests:
        BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    // As long as none of the exit criteria are met we consider the state retryable
    // 1) >= 2f+1 signatures
    // 2) >= f+1 non-retryable errors
    // 3) >= 2f+1 object not found errors
    retryable: bool,
    // NOTE(review): presumably set from
    // `check_if_error_indicates_tx_finalized_with_different_user_sig` — the
    // assignment is not visible in this part of the file; confirm.
    tx_finalized_with_different_user_sig: bool,
}
400
401impl ProcessTransactionState {
402    /// Records the conflicting transaction, returns `true` if there is any, and
403    /// returns `false` otherwise.
404    pub fn record_conflicting_transaction_if_any(
405        &mut self,
406        validator_name: AuthorityName,
407        weight: StakeUnit,
408        err: &IotaError,
409    ) {
410        if let IotaError::ObjectLockConflict {
411            obj_ref,
412            pending_transaction: transaction,
413        } = err
414        {
415            let (lock_records, total_stake) = self
416                .conflicting_tx_digests
417                .entry(*transaction)
418                .or_insert((Vec::new(), 0));
419            lock_records.push((validator_name, *obj_ref));
420            *total_stake += weight;
421        }
422    }
423
424    /// Checks if the error indicates that the transaction is already finalized.
425    pub fn check_if_error_indicates_tx_finalized_with_different_user_sig(
426        &self,
427        validity_threshold: StakeUnit,
428    ) -> bool {
429        // In some edge cases, the client may send the same transaction multiple times
430        // but with different user signatures. When this happens, the "minority"
431        // tx will fail in safe_client because the certificate verification would fail
432        // and return Iota::FailedToVerifyTxCertWithExecutedEffects.
433        // Here, we check if there are f+1 validators return this error. If so, the
434        // transaction is already finalized with a different set of user
435        // signatures. It's not trivial to return the results of that successful
436        // transaction because we don't want fullnode to store the transaction
437        // with non-canonical user signatures. Given that this is very rare, we
438        // simply return an error here.
439        let invalid_sig_stake: StakeUnit = self
440            .errors
441            .iter()
442            .filter_map(|(e, _, stake)| {
443                if matches!(e, IotaError::FailedToVerifyTxCertWithExecutedEffects { .. }) {
444                    Some(stake)
445                } else {
446                    None
447                }
448            })
449            .sum();
450        invalid_sig_stake >= validity_threshold
451    }
452}
453
/// State for processing a certificate.
///
/// Tracks the effects and errors returned by validators, plus the optional
/// extended data (events, objects, auxiliary data) that only some
/// validators are asked to return.
struct ProcessCertificateState {
    // Different authorities could return different effects.  We want at least one effect to come
    // from 2f+1 authorities, which meets quorum and can be considered the approved effect.
    // The map here allows us to count the stake for each unique effect.
    effects_map:
        MultiStakeAggregator<(EpochId, TransactionEffectsDigest), TransactionEffects, true>,
    // Stake total kept alongside `non_retryable_errors`.
    non_retryable_stake: StakeUnit,
    // Each entry is (error, authorities, stake).
    non_retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    // Each entry is (error, authorities, stake).
    retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    // As long as none of the exit criteria are met we consider the state retryable
    // 1) >= 2f+1 signatures
    // 2) >= f+1 non-retryable errors
    retryable: bool,

    // collection of extended data returned from the validators.
    // Not all validators will be asked to return this data so we need to hold onto it when one
    // validator has provided it
    events: Option<TransactionEvents>,
    input_objects: Option<Vec<Object>>,
    output_objects: Option<Vec<Object>>,
    auxiliary_data: Option<Vec<u8>>,
    // NOTE(review): presumably the certificate request being processed — the
    // code that reads it is not visible in this part of the file; confirm.
    request: HandleCertificateRequestV1,
}
478
/// The result of processing a transaction.
#[derive(Debug)]
pub enum ProcessTransactionResult {
    /// A certificate is available for the transaction.
    Certified {
        certificate: CertifiedTransaction,
        /// Whether this certificate is newly created by aggregating 2f+1
        /// signatures. If a validator returned a cert directly, this
        /// will be false. This is used to inform the quorum driver,
        /// which could make better decisions on telemetry
        /// such as settlement latency.
        newly_formed: bool,
    },
    /// Verified certified effects are available, together with the
    /// transaction events.
    Executed(VerifiedCertifiedTransactionEffects, TransactionEvents),
}
493
494impl ProcessTransactionResult {
495    /// Returns the `CertifiedTransaction` if it is a `Certified` variant.
496    pub fn into_cert_for_testing(self) -> CertifiedTransaction {
497        match self {
498            Self::Certified { certificate, .. } => certificate,
499            Self::Executed(..) => panic!("Wrong type"),
500        }
501    }
502
503    /// Returns the `VerifiedCertifiedTransactionEffects` if it is an
504    /// `Executed` variant.
505    pub fn into_effects_for_testing(self) -> VerifiedCertifiedTransactionEffects {
506        match self {
507            Self::Certified { .. } => panic!("Wrong type"),
508            Self::Executed(effects, ..) => effects,
509        }
510    }
511}
512
/// The AuthorityAggregator is responsible for aggregating the responses from
/// the validators and determining the final state of the transaction.
///
/// Cloning is cheap for the shared parts: the committee, the client map, the
/// metrics and the committee store are all held behind `Arc`.
#[derive(Clone)]
pub struct AuthorityAggregator<A: Clone> {
    /// Our IOTA committee.
    pub committee: Arc<Committee>,
    /// For more human readable metrics reporting.
    /// It's OK for this map to be empty or missing validators, it then defaults
    /// to use concise validator public keys.
    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
    /// How to talk to this committee.
    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
    /// Metrics
    pub metrics: Arc<AuthAggMetrics>,
    /// Metric base for the purpose of creating new safe clients during
    /// reconfiguration.
    pub safe_client_metrics_base: SafeClientMetricsBase,
    /// Timeout settings used when talking to the authorities.
    pub timeouts: TimeoutConfig,
    /// Store here for clone during re-config.
    pub committee_store: Arc<CommitteeStore>,
}
534
535impl<A: Clone> AuthorityAggregator<A> {
536    /// Create a new `AuthorityAggregator`.
537    pub fn new(
538        committee: Committee,
539        committee_store: Arc<CommitteeStore>,
540        authority_clients: BTreeMap<AuthorityName, A>,
541        safe_client_metrics_base: SafeClientMetricsBase,
542        auth_agg_metrics: Arc<AuthAggMetrics>,
543        validator_display_names: Arc<HashMap<AuthorityName, String>>,
544        timeouts: TimeoutConfig,
545    ) -> Self {
546        Self {
547            committee: Arc::new(committee),
548            authority_clients: create_safe_clients(
549                authority_clients,
550                &committee_store,
551                &safe_client_metrics_base,
552            ),
553            metrics: auth_agg_metrics,
554            safe_client_metrics_base,
555            timeouts,
556            committee_store,
557            validator_display_names,
558        }
559    }
560
561    /// This function recreates AuthorityAggregator with the given committee.
562    /// It also updates committee store which impacts other of its references.
563    /// When disallow_missing_intermediate_committees is true, it requires the
564    /// new committee needs to be current epoch + 1.
565    /// The function could be used along with `reconfig_from_genesis` to fill in
566    /// all previous epoch's committee info.
567    pub fn recreate_with_net_addresses(
568        &self,
569        committee: CommitteeWithNetworkMetadata,
570        network_config: &Config,
571        disallow_missing_intermediate_committees: bool,
572    ) -> IotaResult<AuthorityAggregator<NetworkAuthorityClient>> {
573        let network_clients =
574            make_network_authority_clients_with_network_config(&committee, network_config);
575
576        let safe_clients = network_clients
577            .into_iter()
578            .map(|(name, api)| {
579                (
580                    name,
581                    Arc::new(SafeClient::new(
582                        api,
583                        self.committee_store.clone(),
584                        name,
585                        SafeClientMetrics::new(&self.safe_client_metrics_base, name),
586                    )),
587                )
588            })
589            .collect::<BTreeMap<_, _>>();
590
591        // TODO: It's likely safer to do the following operations atomically, in case
592        // this function gets called from different threads. It cannot happen
593        // today, but worth the caution.
594        let new_committee = committee.committee().clone();
595        if disallow_missing_intermediate_committees {
596            fp_ensure!(
597                self.committee.epoch + 1 == new_committee.epoch,
598                IotaError::AdvanceEpoch {
599                    error: format!(
600                        "Trying to advance from epoch {} to epoch {}",
601                        self.committee.epoch, new_committee.epoch
602                    )
603                }
604            );
605        }
606        // This call may return error if this committee is already inserted,
607        // which is fine. We should continue to construct the new aggregator.
608        // This is because there may be multiple AuthorityAggregators
609        // or its containers (e.g. Quorum Drivers)  share the same committee
610        // store and all of them need to reconfigure.
611        let _ = self.committee_store.insert_new_committee(&new_committee);
612        Ok(AuthorityAggregator {
613            committee: Arc::new(new_committee),
614            authority_clients: Arc::new(safe_clients),
615            metrics: self.metrics.clone(),
616            timeouts: self.timeouts.clone(),
617            safe_client_metrics_base: self.safe_client_metrics_base.clone(),
618            committee_store: self.committee_store.clone(),
619            validator_display_names: Arc::new(HashMap::new()),
620        })
621    }
622
623    /// Gets the authority client for the given name.
624    pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
625        self.authority_clients.get(name)
626    }
627
628    /// Gets the cloned authority client for the given name.
629    pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
630    where
631        A: Clone,
632    {
633        self.authority_clients[name].clone()
634    }
635
636    /// Gets the cloned `CommitteeStore`.
637    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
638        self.committee_store.clone()
639    }
640
641    /// Gets the cloned `Committee`.
642    pub fn clone_inner_committee_test_only(&self) -> Committee {
643        (*self.committee).clone()
644    }
645
646    /// Get the cloned authority clients.
647    pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
648        (*self.authority_clients)
649            .clone()
650            .into_iter()
651            .map(|(k, v)| (k, (*v).clone()))
652            .collect()
653    }
654}
655
656/// Creates safe clients for each authority.
657fn create_safe_clients<A: Clone>(
658    authority_clients: BTreeMap<AuthorityName, A>,
659    committee_store: &Arc<CommitteeStore>,
660    safe_client_metrics_base: &SafeClientMetricsBase,
661) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
662    Arc::new(
663        authority_clients
664            .into_iter()
665            .map(|(name, api)| {
666                (
667                    name,
668                    Arc::new(SafeClient::new(
669                        api,
670                        committee_store.clone(),
671                        name,
672                        SafeClientMetrics::new(safe_client_metrics_base, name),
673                    )),
674                )
675            })
676            .collect(),
677    )
678}
679
680impl AuthorityAggregator<NetworkAuthorityClient> {
681    /// Create a new network authority aggregator by reading the committee and
682    /// network addresses information from the given epoch start system
683    /// state.
684    pub fn new_from_epoch_start_state(
685        epoch_start_state: &EpochStartSystemState,
686        committee_store: &Arc<CommitteeStore>,
687        safe_client_metrics_base: SafeClientMetricsBase,
688        auth_agg_metrics: Arc<AuthAggMetrics>,
689    ) -> Self {
690        let committee = epoch_start_state.get_iota_committee_with_network_metadata();
691        let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
692        Self::new_from_committee(
693            committee,
694            committee_store,
695            safe_client_metrics_base,
696            auth_agg_metrics,
697            Arc::new(validator_display_names),
698        )
699    }
700
701    /// Create a new AuthorityAggregator using information from the given epoch
702    /// start system state. This is typically used during reconfiguration to
703    /// create a new AuthorityAggregator with the new committee and network
704    /// addresses.
705    pub fn recreate_with_new_epoch_start_state(
706        &self,
707        epoch_start_state: &EpochStartSystemState,
708    ) -> Self {
709        Self::new_from_epoch_start_state(
710            epoch_start_state,
711            &self.committee_store,
712            self.safe_client_metrics_base.clone(),
713            self.metrics.clone(),
714        )
715    }
716
717    pub fn new_from_committee(
718        committee: CommitteeWithNetworkMetadata,
719        committee_store: &Arc<CommitteeStore>,
720        safe_client_metrics_base: SafeClientMetricsBase,
721        auth_agg_metrics: Arc<AuthAggMetrics>,
722        validator_display_names: Arc<HashMap<AuthorityName, String>>,
723    ) -> Self {
724        let net_config = default_iota_network_config();
725        let authority_clients =
726            make_network_authority_clients_with_network_config(&committee, &net_config);
727        Self::new(
728            committee.committee().clone(),
729            committee_store.clone(),
730            authority_clients,
731            safe_client_metrics_base,
732            auth_agg_metrics,
733            validator_display_names,
734            Default::default(),
735        )
736    }
737}
738
739impl<A> AuthorityAggregator<A>
740where
741    A: AuthorityAPI + Send + Sync + 'static + Clone,
742{
743    // Repeatedly calls the provided closure on a randomly selected validator until
744    // it succeeds. Once all validators have been attempted, starts over at the
745    // beginning. Intended for cases that must eventually succeed as long as the
746    // network is up (or comes back up) eventually.
747    async fn quorum_once_inner<'a, S, FMap>(
748        &'a self,
749        // try these authorities first
750        preferences: Option<&BTreeSet<AuthorityName>>,
751        // only attempt from these authorities.
752        restrict_to: Option<&BTreeSet<AuthorityName>>,
753        // The async function used to apply to each authority. It takes an authority name,
754        // and authority client parameter and returns a Result<V>.
755        map_each_authority: FMap,
756        timeout_each_authority: Duration,
757        authority_errors: &mut HashMap<AuthorityName, IotaError>,
758    ) -> Result<S, IotaError>
759    where
760        FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
761            + Send
762            + Clone
763            + 'a,
764        S: Send,
765    {
766        let start = tokio::time::Instant::now();
767        let mut delay = Duration::from_secs(1);
768        loop {
769            let authorities_shuffled = self.committee.shuffle_by_stake(preferences, restrict_to);
770            let mut authorities_shuffled = authorities_shuffled.iter();
771
772            type RequestResult<S> = Result<Result<S, IotaError>, tokio::time::error::Elapsed>;
773
774            #[expect(clippy::large_enum_variant)]
775            enum Event<S> {
776                StartNext,
777                Request(AuthorityName, RequestResult<S>),
778            }
779
780            let mut futures = FuturesUnordered::<BoxFuture<'a, Event<S>>>::new();
781
782            let start_req = |name: AuthorityName, client: Arc<SafeClient<A>>| {
783                let map_each_authority = map_each_authority.clone();
784                Box::pin(monitored_future!(async move {
785                    trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start, "new request");
786                    let map = map_each_authority(name, client);
787                    Event::Request(name, timeout(timeout_each_authority, map).await)
788                }))
789            };
790
791            let schedule_next = || {
792                let delay = self.timeouts.serial_authority_request_interval;
793                Box::pin(monitored_future!(async move {
794                    sleep(delay).await;
795                    Event::StartNext
796                }))
797            };
798
799            // This process is intended to minimize latency in the face of unreliable
800            // authorities, without creating undue load on authorities.
801            //
802            // The fastest possible process from the
803            // client's point of view would simply be to issue a concurrent request to every
804            // authority and then take the winner - this would create unnecessary load on
805            // authorities.
806            //
807            // The most efficient process from the network's point of view is to do one
808            // request at a time, however if the first validator that the client
809            // contacts is unavailable or slow, the client must wait for the
810            // serial_authority_request_interval period to elapse
811            // before starting its next request.
812            //
813            // So, this process is designed as a compromise between these two extremes.
814            // - We start one request, and schedule another request to begin after
815            //   serial_authority_request_interval.
816            // - Whenever a request finishes, if it succeeded, we return. if it failed, we
817            //   start a new request.
818            // - If serial_authority_request_interval elapses, we begin a new request even
819            //   if the previous one is not finished, and schedule another future request.
820
821            let name = authorities_shuffled.next().ok_or_else(|| {
822                error!(
823                    ?preferences,
824                    ?restrict_to,
825                    "Available authorities list is empty."
826                );
827                IotaError::from("Available authorities list is empty")
828            })?;
829            futures.push(start_req(*name, self.authority_clients[name].clone()));
830            futures.push(schedule_next());
831
832            while let Some(res) = futures.next().await {
833                match res {
834                    Event::StartNext => {
835                        trace!(now = ?tokio::time::Instant::now() - start, "eagerly beginning next request");
836                        futures.push(schedule_next());
837                    }
838                    Event::Request(name, res) => {
839                        match res {
840                            // timeout
841                            Err(_) => {
842                                debug!(name=?name.concise(), "authority request timed out");
843                                authority_errors.insert(name, IotaError::Timeout);
844                            }
845                            // request completed
846                            Ok(inner_res) => {
847                                trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start,
848                                       "request completed successfully");
849                                match inner_res {
850                                    Err(e) => authority_errors.insert(name, e),
851                                    Ok(res) => return Ok(res),
852                                };
853                            }
854                        };
855                    }
856                }
857
858                if let Some(next_authority) = authorities_shuffled.next() {
859                    futures.push(start_req(
860                        *next_authority,
861                        self.authority_clients[next_authority].clone(),
862                    ));
863                } else {
864                    break;
865                }
866            }
867
868            info!(
869                ?authority_errors,
870                "quorum_once_with_timeout failed on all authorities, retrying in {:?}", delay
871            );
872            sleep(delay).await;
873            delay = std::cmp::min(delay * 2, Duration::from_secs(5 * 60));
874        }
875    }
876
877    /// Like quorum_map_then_reduce_with_timeout, but for things that need only
878    /// a single successful response, such as fetching a Transaction from
879    /// some authority. This is intended for cases in which byzantine
880    /// authorities can time out or slow-loris, but can't give a false
881    /// answer, because e.g. the digest of the response is known, or a
882    /// quorum-signed object such as a checkpoint has been requested.
883    pub(crate) async fn quorum_once_with_timeout<'a, S, FMap>(
884        &'a self,
885        // try these authorities first
886        preferences: Option<&BTreeSet<AuthorityName>>,
887        // only attempt from these authorities.
888        restrict_to: Option<&BTreeSet<AuthorityName>>,
889        // The async function used to apply to each authority. It takes an authority name,
890        // and authority client parameter and returns a Result<V>.
891        map_each_authority: FMap,
892        timeout_each_authority: Duration,
893        // When to give up on the attempt entirely.
894        timeout_total: Option<Duration>,
895        // The behavior that authorities expect to perform, used for logging and error
896        description: String,
897    ) -> Result<S, IotaError>
898    where
899        FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
900            + Send
901            + Clone
902            + 'a,
903        S: Send,
904    {
905        let mut authority_errors = HashMap::new();
906
907        let fut = self.quorum_once_inner(
908            preferences,
909            restrict_to,
910            map_each_authority,
911            timeout_each_authority,
912            &mut authority_errors,
913        );
914
915        if let Some(t) = timeout_total {
916            timeout(t, fut).await.map_err(|_timeout_error| {
917                if authority_errors.is_empty() {
918                    IotaError::Timeout
919                } else {
920                    IotaError::TooManyIncorrectAuthorities {
921                        errors: authority_errors
922                            .iter()
923                            .map(|(a, b)| (*a, b.clone()))
924                            .collect(),
925                        action: description,
926                    }
927                }
928            })?
929        } else {
930            fut.await
931        }
932    }
933
934    /// Queries the object with highest version number from the authorities.
935    /// We stop after receiving responses from 2f+1 validators.
936    /// This function is untrusted because we simply assume each response is
937    /// valid and there are no byzantine validators.
938    /// Because of this, this function should only be used for testing or
939    /// benchmarking.
940    pub async fn get_latest_object_version_for_testing(
941        &self,
942        object_id: ObjectID,
943    ) -> IotaResult<Object> {
944        #[derive(Debug, Default)]
945        struct State {
946            latest_object_version: Option<Object>,
947            total_weight: StakeUnit,
948        }
949        let initial_state = State::default();
950        let result = quorum_map_then_reduce_with_timeout(
951                self.committee.clone(),
952                self.authority_clients.clone(),
953                initial_state,
954                |_name, client| {
955                    Box::pin(async move {
956                        let request =
957                            ObjectInfoRequest::latest_object_info_request(object_id, /* generate_layout */ LayoutGenerationOption::None);
958                        client.handle_object_info_request(request).await
959                    })
960                },
961                |mut state, name, weight, result| {
962                    Box::pin(async move {
963                        state.total_weight += weight;
964                        match result {
965                            Ok(object_info) => {
966                                debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
967                                if state.latest_object_version.as_ref().is_none_or(|latest| {
968                                    object_info.object.version() > latest.version()
969                                }) {
970                                    state.latest_object_version = Some(object_info.object);
971                                }
972                            }
973                            Err(err) => {
974                                debug!("Received error from validator {:?}: {:?}", name.concise(), err);
975                            }
976                        };
977                        if state.total_weight >= self.committee.quorum_threshold() {
978                            if let Some(object) = state.latest_object_version {
979                                return ReduceOutput::Success(object);
980                            } else {
981                                return ReduceOutput::Failed(state);
982                            }
983                        }
984                        ReduceOutput::Continue(state)
985                    })
986                },
987                // A long timeout before we hear back from a quorum
988                self.timeouts.pre_quorum_timeout,
989            )
990            .await.map_err(|_state| IotaError::from(UserInputError::ObjectNotFound {
991                object_id,
992                version: None,
993            }))?;
994        Ok(result.0)
995    }
996
997    /// Gets the latest system state object from the authorities.
998    /// This function assumes all validators are honest.
999    /// It should only be used for testing or benchmarking.
1000    pub async fn get_latest_system_state_object_for_testing(
1001        &self,
1002    ) -> anyhow::Result<IotaSystemState> {
1003        #[derive(Debug, Default)]
1004        struct State {
1005            latest_system_state: Option<IotaSystemState>,
1006            total_weight: StakeUnit,
1007        }
1008        let initial_state = State::default();
1009        let result = quorum_map_then_reduce_with_timeout(
1010            self.committee.clone(),
1011            self.authority_clients.clone(),
1012            initial_state,
1013            |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
1014            |mut state, name, weight, result| {
1015                Box::pin(async move {
1016                    state.total_weight += weight;
1017                    match result {
1018                        Ok(system_state) => {
1019                            debug!(
1020                                "Received system state object from validator {:?} with epoch: {:?}",
1021                                name.concise(),
1022                                system_state.epoch()
1023                            );
1024                            if state
1025                                .latest_system_state
1026                                .as_ref()
1027                                .is_none_or(|latest| system_state.epoch() > latest.epoch())
1028                            {
1029                                state.latest_system_state = Some(system_state);
1030                            }
1031                        }
1032                        Err(err) => {
1033                            debug!(
1034                                "Received error from validator {:?}: {:?}",
1035                                name.concise(),
1036                                err
1037                            );
1038                        }
1039                    };
1040                    if state.total_weight >= self.committee.quorum_threshold() {
1041                        if let Some(system_state) = state.latest_system_state {
1042                            return ReduceOutput::Success(system_state);
1043                        } else {
1044                            return ReduceOutput::Failed(state);
1045                        }
1046                    }
1047                    ReduceOutput::Continue(state)
1048                })
1049            },
1050            // A long timeout before we hear back from a quorum
1051            self.timeouts.pre_quorum_timeout,
1052        )
1053        .await
1054        .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
1055        Ok(result.0)
1056    }
1057
1058    /// Submits the transaction to a quorum of validators to make a certificate.
1059    #[instrument(level = "trace", skip_all)]
1060    pub async fn process_transaction(
1061        &self,
1062        transaction: Transaction,
1063        client_addr: Option<SocketAddr>,
1064    ) -> Result<ProcessTransactionResult, AggregatorProcessTransactionError> {
1065        // Now broadcast the transaction to all authorities.
1066        let tx_digest = transaction.digest();
1067        debug!(
1068            tx_digest = ?tx_digest,
1069            "Broadcasting transaction request to authorities"
1070        );
1071        trace!(
1072            "Transaction data: {:?}",
1073            transaction.data().intent_message().value
1074        );
1075        let committee = self.committee.clone();
1076        let state = ProcessTransactionState {
1077            tx_signatures: StakeAggregator::new(committee.clone()),
1078            effects_map: MultiStakeAggregator::new(committee.clone()),
1079            errors: vec![],
1080            object_or_package_not_found_stake: 0,
1081            non_retryable_stake: 0,
1082            overloaded_stake: 0,
1083            retryable_overload_info: Default::default(),
1084            retryable: true,
1085            conflicting_tx_digests: Default::default(),
1086            tx_finalized_with_different_user_sig: false,
1087        };
1088
1089        let transaction_ref = &transaction;
1090        let validity_threshold = committee.validity_threshold();
1091        let quorum_threshold = committee.quorum_threshold();
1092        let validator_display_names = self.validator_display_names.clone();
1093        let result = quorum_map_then_reduce_with_timeout(
1094                committee.clone(),
1095                self.authority_clients.clone(),
1096                state,
1097                |name, client| {
1098                    Box::pin(
1099                        async move {
1100                            let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
1101                            let concise_name = name.concise_owned();
1102                            client.handle_transaction(transaction_ref.clone(), client_addr)
1103                                .monitor_cancellation()
1104                                .instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name))
1105                                .await
1106                        },
1107                    )
1108                },
1109                |mut state, name, weight, response| {
1110                    let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1111                    Box::pin(async move {
1112                        match self.handle_process_transaction_response(
1113                            tx_digest, &mut state, response, name, weight,
1114                        ) {
1115                            Ok(Some(result)) => {
1116                                self.record_process_transaction_metrics(tx_digest, &state);
1117                                return ReduceOutput::Success(result);
1118                            }
1119                            Ok(None) => {},
1120                            Err(err) => {
1121                                let concise_name = name.concise();
1122                                debug!(?tx_digest, name=?concise_name, weight, "Error processing transaction from validator: {:?}", err);
1123                                self.metrics
1124                                    .process_tx_errors
1125                                    .with_label_values(&[display_name.as_str(), err.as_ref()])
1126                                    .inc();
1127                                Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
1128                                // Record conflicting transactions if any to report to user.
1129                                state.record_conflicting_transaction_if_any(name, weight, &err);
1130                                let (retryable, categorized) = err.is_retryable();
1131                                if !categorized {
1132                                    // TODO: Should minimize possible uncategorized errors here
1133                                    // use ERROR for now to make them easier to spot.
1134                                    error!(?tx_digest, "uncategorized tx error: {err}");
1135                                }
1136                                if err.is_object_or_package_not_found() {
1137                                    // Special case for object not found because we can
1138                                    // retry if we have < 2f+1 object not found errors.
1139                                    // However once we reach >= 2f+1 object not found errors
1140                                    // we cannot retry.
1141                                    state.object_or_package_not_found_stake += weight;
1142                                }
1143                                else if err.is_overload() {
1144                                    // Special case for validator overload too. Once we have >= 2f + 1
1145                                    // overloaded validators we consider the system overloaded so we exit
1146                                    // and notify the user.
1147                                    // Note that currently, this overload account for
1148                                    //   - per object queue overload
1149                                    //   - consensus overload
1150                                    state.overloaded_stake += weight;
1151                                }
1152                                else if err.is_retryable_overload() {
1153                                    // Different from above overload error, retryable overload targets authority overload (entire
1154                                    // authority server is overload). In this case, the retry behavior is different from
1155                                    // above that we may perform continuous retry due to that objects may have been locked
1156                                    // in the validator.
1157                                    //
1158                                    // TODO: currently retryable overload and above overload error look redundant. We want to have a unified
1159                                    // code path to handle both overload scenarios.
1160                                    state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
1161                                }
1162                                else if !retryable {
1163                                    state.non_retryable_stake += weight;
1164                                }
1165                                state.errors.push((err, vec![name], weight));
1166
1167                            }
1168                        };
1169
1170                        let retryable_stake = self.get_retryable_stake(&state);
1171                        let good_stake = std::cmp::max(state.tx_signatures.total_votes(), state.effects_map.total_votes());
1172                        if good_stake + retryable_stake < quorum_threshold {
1173                            debug!(
1174                                tx_digest = ?tx_digest,
1175                                good_stake,
1176                                retryable_stake,
1177                                "No chance for any tx to get quorum, exiting. Conflicting_txes: {:?}",
1178                                state.conflicting_tx_digests
1179                            );
1180                            // If there is no chance for any tx to get quorum, exit.
1181                            state.retryable = false;
1182                            return ReduceOutput::Failed(state);
1183                        }
1184
1185                        // TODO: add more comments to explain each condition.
1186                        if state.non_retryable_stake >= validity_threshold
1187                            || state.object_or_package_not_found_stake >= quorum_threshold // In normal case, object/package not found should be more than f+1
1188                            || state.overloaded_stake >= quorum_threshold {
1189                            // We have hit an exit condition, f+1 non-retryable err or 2f+1 object not found or overload,
1190                            // so we no longer consider the transaction state as retryable.
1191                            state.retryable = false;
1192                            ReduceOutput::Failed(state)
1193                        } else {
1194                            ReduceOutput::Continue(state)
1195                        }
1196                    })
1197                },
1198                // A long timeout before we hear back from a quorum
1199                self.timeouts.pre_quorum_timeout,
1200            )
1201            .await;
1202
1203        match result {
1204            Ok((result, _)) => Ok(result),
1205            Err(state) => {
1206                self.record_process_transaction_metrics(tx_digest, &state);
1207                let state = self.record_non_quorum_effects_maybe(tx_digest, state);
1208                Err(self.handle_process_transaction_error(state))
1209            }
1210        }
1211    }
1212
1213    /// Records the rpc error if it is.
1214    fn record_rpc_error_maybe(metrics: Arc<AuthAggMetrics>, display_name: &str, error: &IotaError) {
1215        if let IotaError::Rpc(_message, code) = error {
1216            metrics
1217                .total_rpc_err
1218                .with_label_values(&[display_name, code.as_str()])
1219                .inc();
1220        }
1221    }
1222
1223    /// Handles the transaction processing error.
1224    fn handle_process_transaction_error(
1225        &self,
1226        state: ProcessTransactionState,
1227    ) -> AggregatorProcessTransactionError {
1228        let quorum_threshold = self.committee.quorum_threshold();
1229
1230        // Return system overload error if we see >= 2f + 1 overloaded stake.
1231        if state.overloaded_stake >= quorum_threshold {
1232            return AggregatorProcessTransactionError::SystemOverload {
1233                overloaded_stake: state.overloaded_stake,
1234                errors: group_errors(state.errors),
1235            };
1236        }
1237
1238        if !state.retryable {
1239            if state.tx_finalized_with_different_user_sig
1240                || state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1241                    self.committee.validity_threshold(),
1242                )
1243            {
1244                return AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures;
1245            }
1246
1247            // Handle conflicts first as `FatalConflictingTransaction` which is
1248            // more meaningful than `FatalTransaction`
1249            if !state.conflicting_tx_digests.is_empty() {
1250                let good_stake = state.tx_signatures.total_votes();
1251                warn!(
1252                    ?state.conflicting_tx_digests,
1253                    original_tx_stake = good_stake,
1254                    "Client double spend attempt detected!",
1255                );
1256                self.metrics
1257                    .total_client_double_spend_attempts_detected
1258                    .inc();
1259                return AggregatorProcessTransactionError::FatalConflictingTransaction {
1260                    errors: group_errors(state.errors),
1261                    conflicting_tx_digests: state.conflicting_tx_digests,
1262                };
1263            }
1264
1265            return AggregatorProcessTransactionError::FatalTransaction {
1266                errors: group_errors(state.errors),
1267            };
1268        }
1269
1270        // When state is in a retryable state and process transaction was not
1271        // successful, it indicates that we have heard from *all* validators.
1272        // Check if any SystemOverloadRetryAfter error caused the txn
1273        // to fail. If so, return explicit SystemOverloadRetryAfter error for continuous
1274        // retry (since objects are locked in validators). If not, retry regular
1275        // RetryableTransaction error.
1276        if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
1277            >= quorum_threshold
1278        {
1279            let retry_after_secs = state
1280                .retryable_overload_info
1281                .get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
1282                .as_secs();
1283            return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
1284                overload_stake: state.retryable_overload_info.total_stake,
1285                errors: group_errors(state.errors),
1286                retry_after_secs,
1287            };
1288        }
1289
1290        // The system is not overloaded and transaction state is still retryable.
1291        AggregatorProcessTransactionError::RetryableTransaction {
1292            errors: group_errors(state.errors),
1293        }
1294    }
1295
1296    /// Debug logs the transaction processing metrics.
1297    fn record_process_transaction_metrics(
1298        &self,
1299        tx_digest: &TransactionDigest,
1300        state: &ProcessTransactionState,
1301    ) {
1302        let num_signatures = state.tx_signatures.validator_sig_count();
1303        let good_stake = state.tx_signatures.total_votes();
1304        debug!(
1305            ?tx_digest,
1306            num_errors = state.errors.iter().map(|e| e.1.len()).sum::<usize>(),
1307            num_unique_errors = state.errors.len(),
1308            ?good_stake,
1309            non_retryable_stake = state.non_retryable_stake,
1310            ?num_signatures,
1311            "Received signatures response from validators handle_transaction"
1312        );
1313        if !state.errors.is_empty() {
1314            debug!(?tx_digest, "Errors received: {:?}", state.errors);
1315        }
1316    }
1317
1318    /// Handles the `PlainTransactionInfoResponse` variants.
1319    fn handle_process_transaction_response(
1320        &self,
1321        tx_digest: &TransactionDigest,
1322        state: &mut ProcessTransactionState,
1323        response: IotaResult<PlainTransactionInfoResponse>,
1324        name: AuthorityName,
1325        weight: StakeUnit,
1326    ) -> IotaResult<Option<ProcessTransactionResult>> {
1327        match response {
1328            Ok(PlainTransactionInfoResponse::Signed(signed)) => {
1329                debug!(?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction");
1330                self.handle_transaction_response_with_signed(state, signed)
1331            }
1332            Ok(PlainTransactionInfoResponse::ExecutedWithCert(cert, effects, events)) => {
1333                debug!(?tx_digest, name=?name.concise(), weight, "Received prev certificate and effects from validator handle_transaction");
1334                self.handle_transaction_response_with_executed(state, Some(cert), effects, events)
1335            }
1336            Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(_, effects, events)) => {
1337                debug!(?tx_digest, name=?name.concise(), weight, "Received prev effects from validator handle_transaction");
1338                self.handle_transaction_response_with_executed(state, None, effects, events)
1339            }
1340            Err(err) => Err(err),
1341        }
1342    }
1343
1344    /// Handles the `SignedTransaction`. based on the transaction signature
1345    /// insertion result.
1346    fn handle_transaction_response_with_signed(
1347        &self,
1348        state: &mut ProcessTransactionState,
1349        plain_tx: SignedTransaction,
1350    ) -> IotaResult<Option<ProcessTransactionResult>> {
1351        match state.tx_signatures.insert(plain_tx.clone()) {
1352            InsertResult::NotEnoughVotes {
1353                bad_votes,
1354                bad_authorities,
1355            } => {
1356                state.non_retryable_stake += bad_votes;
1357                if bad_votes > 0 {
1358                    state.errors.push((
1359                        IotaError::InvalidSignature {
1360                            error: "Individual signature verification failed".to_string(),
1361                        },
1362                        bad_authorities,
1363                        bad_votes,
1364                    ));
1365                }
1366                Ok(None)
1367            }
1368            InsertResult::Failed { error } => Err(error),
1369            InsertResult::QuorumReached(cert_sig) => {
1370                let certificate =
1371                    CertifiedTransaction::new_from_data_and_sig(plain_tx.into_data(), cert_sig);
1372                certificate.verify_committee_sigs_only(&self.committee)?;
1373                Ok(Some(ProcessTransactionResult::Certified {
1374                    certificate,
1375                    newly_formed: true,
1376                }))
1377            }
1378        }
1379    }
1380
1381    /// Handles `CertifiedTransaction`. Use the certificate if it's in the same
1382    /// epoch. Or update the `ProcessTransactionState` by the
1383    /// `SignedTransactionEffects` if there is no certificate yet.
1384    fn handle_transaction_response_with_executed(
1385        &self,
1386        state: &mut ProcessTransactionState,
1387        certificate: Option<CertifiedTransaction>,
1388        plain_tx_effects: SignedTransactionEffects,
1389        events: TransactionEvents,
1390    ) -> IotaResult<Option<ProcessTransactionResult>> {
1391        match certificate {
1392            Some(certificate) if certificate.epoch() == self.committee.epoch => {
1393                // If we get a certificate in the same epoch, then we use it.
1394                // A certificate in a past epoch does not guarantee finality
1395                // and validators may reject to process it.
1396                Ok(Some(ProcessTransactionResult::Certified {
1397                    certificate,
1398                    newly_formed: false,
1399                }))
1400            }
1401            _ => {
1402                // If we get 2f+1 effects, it's a proof that the transaction
1403                // has already been finalized. This works because validators would re-sign
1404                // effects for transactions that were finalized in previous
1405                // epochs.
1406                let digest = plain_tx_effects.data().digest();
1407                match state.effects_map.insert(digest, plain_tx_effects.clone()) {
1408                    InsertResult::NotEnoughVotes {
1409                        bad_votes,
1410                        bad_authorities,
1411                    } => {
1412                        state.non_retryable_stake += bad_votes;
1413                        if bad_votes > 0 {
1414                            state.errors.push((
1415                                IotaError::InvalidSignature {
1416                                    error: "Individual signature verification failed".to_string(),
1417                                },
1418                                bad_authorities,
1419                                bad_votes,
1420                            ));
1421                        }
1422                        Ok(None)
1423                    }
1424                    InsertResult::Failed { error } => Err(error),
1425                    InsertResult::QuorumReached(cert_sig) => {
1426                        let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1427                            plain_tx_effects.into_data(),
1428                            cert_sig,
1429                        );
1430                        Ok(Some(ProcessTransactionResult::Executed(
1431                            ct.verify(&self.committee)?,
1432                            events,
1433                        )))
1434                    }
1435                }
1436            }
1437        }
1438    }
1439
1440    /// Checks if we have some signed TransactionEffects but not a quorum.
1441    fn record_non_quorum_effects_maybe(
1442        &self,
1443        tx_digest: &TransactionDigest,
1444        mut state: ProcessTransactionState,
1445    ) -> ProcessTransactionState {
1446        if state.effects_map.unique_key_count() > 0 {
1447            let non_quorum_effects = state.effects_map.get_all_unique_values();
1448            warn!(
1449                ?tx_digest,
1450                "Received signed Effects but not with a quorum {:?}", non_quorum_effects
1451            );
1452
1453            // Safe to unwrap because we know that there is at least one entry in the map
1454            // from the check above.
1455            let (_most_staked_effects_digest, (_, most_staked_effects_digest_stake)) =
1456                non_quorum_effects
1457                    .iter()
1458                    .max_by_key(|&(_, (_, stake))| stake)
1459                    .unwrap();
1460            // We check if we have enough retryable stake to get quorum for the most staked
1461            // effects digest, otherwise it indicates we have violated safety assumptions
1462            // or we have forked.
1463            if most_staked_effects_digest_stake + self.get_retryable_stake(&state)
1464                < self.committee.quorum_threshold()
1465            {
1466                state.retryable = false;
1467                if state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1468                    self.committee.validity_threshold(),
1469                ) {
1470                    state.tx_finalized_with_different_user_sig = true;
1471                } else {
1472                    // TODO: Figure out a more reliable way to detect invariance violations.
1473                    error!(
1474                        "We have seen signed effects but unable to reach quorum threshold even including retriable stakes. This is very rare. Tx: {tx_digest:?}. Non-quorum effects: {non_quorum_effects:?}."
1475                    );
1476                }
1477            }
1478
1479            let mut involved_validators = Vec::new();
1480            let mut total_stake = 0;
1481            for (validators, stake) in non_quorum_effects.values() {
1482                involved_validators.extend_from_slice(validators);
1483                total_stake += stake;
1484            }
1485            // TODO: Instead of pushing a new error, we should add more information about
1486            // the non-quorum effects in the final error if state is no longer
1487            // retryable
1488            state.errors.push((
1489                IotaError::QuorumFailedToGetEffectsQuorumWhenProcessingTransaction {
1490                    effects_map: non_quorum_effects,
1491                },
1492                involved_validators,
1493                total_stake,
1494            ));
1495        }
1496        state
1497    }
1498
1499    /// Gets the retryable stake for the transaction.
1500    fn get_retryable_stake(&self, state: &ProcessTransactionState) -> StakeUnit {
1501        self.committee.total_votes()
1502            - state.non_retryable_stake
1503            - state.effects_map.total_votes()
1504            - state.tx_signatures.total_votes()
1505    }
1506
1507    /// Processes a given certificate by broadcasting it to authorities and
1508    /// aggregating the results until reaching a quorum.
1509    #[instrument(level = "trace", skip_all)]
1510    pub async fn process_certificate(
1511        &self,
1512        request: HandleCertificateRequestV1,
1513        client_addr: Option<SocketAddr>,
1514    ) -> Result<QuorumDriverResponse, AggregatorProcessCertificateError> {
1515        let state = ProcessCertificateState {
1516            effects_map: MultiStakeAggregator::new(self.committee.clone()),
1517            non_retryable_stake: 0,
1518            non_retryable_errors: vec![],
1519            retryable_errors: vec![],
1520            retryable: true,
1521            events: None,
1522            input_objects: None,
1523            output_objects: None,
1524            auxiliary_data: None,
1525            request: request.clone(),
1526        };
1527
1528        // create a set of validators that we should sample to request input/output
1529        // objects from
1530        let validators_to_sample =
1531            if request.include_input_objects || request.include_output_objects {
1532                // Number of validators to request input/output objects from
1533                const NUMBER_TO_SAMPLE: usize = 10;
1534
1535                self.committee
1536                    .choose_multiple_weighted_iter(NUMBER_TO_SAMPLE)
1537                    .cloned()
1538                    .collect()
1539            } else {
1540                HashSet::new()
1541            };
1542
1543        let tx_digest = *request.certificate.digest();
1544        let timeout_after_quorum = self.timeouts.post_quorum_timeout;
1545
1546        let request_ref = request;
1547        let threshold = self.committee.quorum_threshold();
1548        let validity = self.committee.validity_threshold();
1549
1550        debug!(
1551            ?tx_digest,
1552            quorum_threshold = threshold,
1553            validity_threshold = validity,
1554            ?timeout_after_quorum,
1555            "Broadcasting certificate to authorities"
1556        );
1557        let committee: Arc<Committee> = self.committee.clone();
1558        let authority_clients = self.authority_clients.clone();
1559        let metrics = self.metrics.clone();
1560        let metrics_clone = metrics.clone();
1561        let validator_display_names = self.validator_display_names.clone();
1562        let (result, mut remaining_tasks) = quorum_map_then_reduce_with_timeout(
1563            committee.clone(),
1564            authority_clients.clone(),
1565            state,
1566            move |name, client| {
1567                Box::pin(async move {
1568                    let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests);
1569                    let concise_name = name.concise_owned();
1570                    if request_ref.include_input_objects || request_ref.include_output_objects {
1571
1572                        // adjust the request to validators we aren't planning on sampling
1573                        let req = if validators_to_sample.contains(&name) {
1574                            request_ref
1575                        } else {
1576                            HandleCertificateRequestV1 {
1577                                include_input_objects: false,
1578                                include_output_objects: false,
1579                                include_auxiliary_data: false,
1580                                ..request_ref
1581                            }
1582                        };
1583
1584                        client
1585                            .handle_certificate_v1(req, client_addr)
1586                            .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
1587                            .await
1588                    } else {
1589                        client
1590                            .handle_certificate_v1(HandleCertificateRequestV1::new(request_ref.certificate).with_events(), client_addr)
1591                            .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
1592                            .await
1593                            .map(|response| HandleCertificateResponseV1 {
1594                                signed_effects: response.signed_effects,
1595                                events: response.events,
1596                                input_objects: None,
1597                                output_objects: None,
1598                                auxiliary_data: None,
1599                            })
1600                    }
1601                })
1602            },
1603            move |mut state, name, weight, response| {
1604                let committee_clone = committee.clone();
1605                let metrics = metrics.clone();
1606                let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1607                Box::pin(async move {
1608                    // We aggregate the effects response, until we have more than 2f
1609                    // and return.
1610                    match AuthorityAggregator::<A>::handle_process_certificate_response(
1611                        committee_clone,
1612                        &metrics,
1613                        &tx_digest, &mut state, response, name)
1614                    {
1615                        Ok(Some(effects)) => ReduceOutput::Success(effects),
1616                        Ok(None) => {
1617                            // When the result is none, it is possible that the
1618                            // non_retryable_stake had been incremented due to
1619                            // failed individual signature verification.
1620                            if state.non_retryable_stake >= validity {
1621                                state.retryable = false;
1622                                ReduceOutput::Failed(state)
1623                            } else {
1624                                ReduceOutput::Continue(state)
1625                            }
1626                        },
1627                        Err(err) => {
1628                            let concise_name = name.concise();
1629                            debug!(?tx_digest, name=?concise_name, "Error processing certificate from validator: {:?}", err);
1630                            metrics
1631                                .process_cert_errors
1632                                .with_label_values(&[display_name.as_str(), err.as_ref()])
1633                                .inc();
1634                            Self::record_rpc_error_maybe(metrics, &display_name, &err);
1635                            let (retryable, categorized) = err.is_retryable();
1636                            if !categorized {
1637                                // TODO: Should minimize possible uncategorized errors here
1638                                // use ERROR for now to make them easier to spot.
1639                                error!(?tx_digest, "[WATCHOUT] uncategorized tx error: {err}");
1640                            }
1641                            if !retryable {
1642                                state.non_retryable_stake += weight;
1643                                state.non_retryable_errors.push((err, vec![name], weight));
1644                            } else {
1645                                state.retryable_errors.push((err, vec![name], weight));
1646                            }
1647                            if state.non_retryable_stake >= validity {
1648                                state.retryable = false;
1649                                ReduceOutput::Failed(state)
1650                            } else {
1651                                ReduceOutput::Continue(state)
1652                            }
1653                        }
1654                    }
1655                })
1656            },
1657            // A long timeout before we hear back from a quorum
1658            self.timeouts.pre_quorum_timeout,
1659        )
1660        .await
1661        .map_err(|state| {
1662            debug!(
1663                ?tx_digest,
1664                num_unique_effects = state.effects_map.unique_key_count(),
1665                non_retryable_stake = state.non_retryable_stake,
1666                "Received effects responses from validators"
1667            );
1668
1669            // record errors and tx retryable state
1670            for (iota_err, _, _) in state.retryable_errors.iter().chain(state.non_retryable_errors.iter()) {
1671                self
1672                    .metrics
1673                    .total_aggregated_err
1674                    .with_label_values(&[
1675                        iota_err.as_ref(),
1676                        if state.retryable {
1677                            "recoverable"
1678                        } else {
1679                            "non-recoverable"
1680                        },
1681                    ])
1682                    .inc();
1683            }
1684            if state.retryable {
1685                AggregatorProcessCertificateError::RetryableExecuteCertificate {
1686                    retryable_errors: group_errors(state.retryable_errors),
1687                }
1688            } else {
1689                AggregatorProcessCertificateError::FatalExecuteCertificate {
1690                    non_retryable_errors: group_errors(state.non_retryable_errors),
1691                }
1692            }
1693        })?;
1694
1695        let metrics = self.metrics.clone();
1696        metrics
1697            .remaining_tasks_when_reaching_cert_quorum
1698            .observe(remaining_tasks.len() as f64);
1699        if !remaining_tasks.is_empty() {
1700            // Use best efforts to send the cert to remaining validators.
1701            spawn_monitored_task!(async move {
1702                let mut timeout = Box::pin(sleep(timeout_after_quorum));
1703                loop {
1704                    tokio::select! {
1705                        _ = &mut timeout => {
1706                            debug!(?tx_digest, "Timed out in post quorum cert broadcasting: {:?}. Remaining tasks: {:?}", timeout_after_quorum, remaining_tasks.len());
1707                            metrics.cert_broadcasting_post_quorum_timeout.inc();
1708                            metrics.remaining_tasks_when_cert_broadcasting_post_quorum_timeout.observe(remaining_tasks.len() as f64);
1709                            break;
1710                        }
1711                        res = remaining_tasks.next() => {
1712                            if res.is_none() {
1713                                break;
1714                            }
1715                        }
1716                    }
1717                }
1718            });
1719        }
1720        Ok(result)
1721    }
1722
1723    /// Handles the `HandleCertificateResponseV1` variants.
1724    fn handle_process_certificate_response(
1725        committee: Arc<Committee>,
1726        metrics: &AuthAggMetrics,
1727        tx_digest: &TransactionDigest,
1728        state: &mut ProcessCertificateState,
1729        response: IotaResult<HandleCertificateResponseV1>,
1730        name: AuthorityName,
1731    ) -> IotaResult<Option<QuorumDriverResponse>> {
1732        match response {
1733            Ok(HandleCertificateResponseV1 {
1734                signed_effects,
1735                events,
1736                input_objects,
1737                output_objects,
1738                auxiliary_data,
1739            }) => {
1740                debug!(
1741                    ?tx_digest,
1742                    name = ?name.concise(),
1743                    "Validator handled certificate successfully",
1744                );
1745
1746                if events.is_some() && state.events.is_none() {
1747                    state.events = events;
1748                }
1749
1750                if input_objects.is_some() && state.input_objects.is_none() {
1751                    state.input_objects = input_objects;
1752                }
1753
1754                if output_objects.is_some() && state.output_objects.is_none() {
1755                    state.output_objects = output_objects;
1756                }
1757
1758                if auxiliary_data.is_some() && state.auxiliary_data.is_none() {
1759                    state.auxiliary_data = auxiliary_data;
1760                }
1761
1762                let effects_digest = *signed_effects.digest();
1763                // Note: here we aggregate votes by the hash of the effects structure
1764                match state.effects_map.insert(
1765                    (signed_effects.epoch(), effects_digest),
1766                    signed_effects.clone(),
1767                ) {
1768                    InsertResult::NotEnoughVotes {
1769                        bad_votes,
1770                        bad_authorities,
1771                    } => {
1772                        state.non_retryable_stake += bad_votes;
1773                        if bad_votes > 0 {
1774                            state.non_retryable_errors.push((
1775                                IotaError::InvalidSignature {
1776                                    error: "Individual signature verification failed".to_string(),
1777                                },
1778                                bad_authorities,
1779                                bad_votes,
1780                            ));
1781                        }
1782                        Ok(None)
1783                    }
1784                    InsertResult::Failed { error } => Err(error),
1785                    InsertResult::QuorumReached(cert_sig) => {
1786                        let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1787                            signed_effects.into_data(),
1788                            cert_sig,
1789                        );
1790
1791                        if (state.request.include_input_objects && state.input_objects.is_none())
1792                            || (state.request.include_output_objects
1793                                && state.output_objects.is_none())
1794                        {
1795                            metrics.quorum_reached_without_requested_objects.inc();
1796                            debug!(
1797                                ?tx_digest,
1798                                "Quorum Reached but requested input/output objects were not returned"
1799                            );
1800                        }
1801
1802                        ct.verify(&committee).map(|ct| {
1803                            debug!(?tx_digest, "Got quorum for validators handle_certificate.");
1804                            Some(QuorumDriverResponse {
1805                                effects_cert: ct,
1806                                events: state.events.take(),
1807                                input_objects: state.input_objects.take(),
1808                                output_objects: state.output_objects.take(),
1809                                auxiliary_data: state.auxiliary_data.take(),
1810                            })
1811                        })
1812                    }
1813                }
1814            }
1815            Err(err) => Err(err),
1816        }
1817    }
1818
1819    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1820    pub async fn execute_transaction_block(
1821        &self,
1822        transaction: &Transaction,
1823        client_addr: Option<SocketAddr>,
1824    ) -> Result<VerifiedCertifiedTransactionEffects, anyhow::Error> {
1825        let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
1826        let result = self
1827            .process_transaction(transaction.clone(), client_addr)
1828            .await?;
1829        let cert = match result {
1830            ProcessTransactionResult::Certified { certificate, .. } => certificate,
1831            ProcessTransactionResult::Executed(effects, _) => {
1832                return Ok(effects);
1833            }
1834        };
1835        self.metrics.total_tx_certificates_created.inc();
1836        drop(tx_guard);
1837
1838        let _cert_guard = GaugeGuard::acquire(&self.metrics.inflight_certificates);
1839        let response = self
1840            .process_certificate(
1841                HandleCertificateRequestV1 {
1842                    certificate: cert.clone(),
1843                    include_events: true,
1844                    include_input_objects: false,
1845                    include_output_objects: false,
1846                    include_auxiliary_data: false,
1847                },
1848                client_addr,
1849            )
1850            .await?;
1851
1852        Ok(response.effects_cert)
1853    }
1854
1855    /// This function tries to get SignedTransaction OR CertifiedTransaction
1856    /// from an given list of validators who are supposed to know about it.
1857    #[instrument(level = "trace", skip_all, fields(?tx_digest))]
1858    pub async fn handle_transaction_info_request_from_some_validators(
1859        &self,
1860        tx_digest: &TransactionDigest,
1861        // authorities known to have the transaction info we are requesting.
1862        validators: &BTreeSet<AuthorityName>,
1863        timeout_total: Option<Duration>,
1864    ) -> IotaResult<PlainTransactionInfoResponse> {
1865        self.quorum_once_with_timeout(
1866            None,
1867            Some(validators),
1868            |_authority, client| {
1869                Box::pin(async move {
1870                    client
1871                        .handle_transaction_info_request(TransactionInfoRequest {
1872                            transaction_digest: *tx_digest,
1873                        })
1874                        .await
1875                })
1876            },
1877            Duration::from_secs(2),
1878            timeout_total,
1879            "handle_transaction_info_request_from_some_validators".to_string(),
1880        )
1881        .await
1882    }
1883
1884    /// Sends signed capability notification to a quorum of committee
1885    /// validators. Uses validity threshold (f+1) to ensure at least one
1886    /// honest node processes it.
1887    #[instrument(level = "trace", skip_all)]
1888    pub async fn send_capability_notification_to_quorum(
1889        &self,
1890        request: HandleCapabilityNotificationRequestV1,
1891    ) -> Result<(), AggregatorSendCapabilityNotificationError> {
1892        #[derive(Debug, Default)]
1893        struct CapabilityNotificationState {
1894            good_responses: StakeUnit,
1895            non_retryable_errors: StakeUnit,
1896            retryable_errors: StakeUnit,
1897            errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
1898        }
1899
1900        let validity_threshold = self.committee.validity_threshold();
1901        let quorum_threshold = self.committee.quorum_threshold();
1902        let validator_display_names = self.validator_display_names.clone();
1903
1904        debug!(
1905            "Sending capability notification to committee validators with validity threshold: {}",
1906            validity_threshold
1907        );
1908
1909        let result = quorum_map_then_reduce_with_timeout(
1910            self.committee.clone(),
1911            self.authority_clients.clone(),
1912            CapabilityNotificationState::default(),
1913            |name, client| {
1914                Box::pin(async move {
1915                    let concise_name = name.concise_owned();
1916                    client
1917                        .authority_client()
1918                        .handle_capability_notification_v1(request.clone())
1919                        .instrument(trace_span!("handle_capability_notification_v1", authority = ?concise_name))
1920                        .await
1921                })
1922            },
1923            |mut state, name, weight, response| {
1924                let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1925                Box::pin(async move {
1926                    match response {
1927                        Ok(_) => {
1928                            debug!(
1929                                authority = ?name.concise(),
1930                                weight,
1931                                "Successfully sent capability notification to committee validator"
1932                            );
1933                            state.good_responses += weight;
1934                            // Check if we've reached validity threshold (f+1) for success
1935                            if state.good_responses >= validity_threshold {
1936                                return ReduceOutput::Success(());
1937                            }
1938                        }
1939                        Err(err) => {
1940                            debug!(
1941                                authority = ?name.concise(),
1942                                weight,
1943                                error = ?err,
1944                                "Failed to send capability notification to committee validator"
1945                            );
1946                            Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
1947
1948                            let (retryable, _categorized) = err.is_retryable();
1949                            if  retryable {
1950                                // Other retryable errors (timeouts, etc.)
1951                                state.retryable_errors += weight;
1952                            } else {
1953                                // Non-retryable errors
1954                                state.non_retryable_errors += weight;
1955                            }
1956                            state.errors.push((err, vec![name], weight));
1957
1958                            // Check if we have reached 2f+1 non-retryable errors OR we have reached 2f+1 total errors, and there is still a chance to reach the validity threshold with retryable errors and good responses.
1959                            if state.non_retryable_errors >= quorum_threshold || (state.non_retryable_errors + state.retryable_errors  >= quorum_threshold && state.good_responses + state.retryable_errors >= validity_threshold) {
1960                                return ReduceOutput::Failed(state);
1961                            }
1962                        }
1963                    }
1964
1965                    ReduceOutput::Continue(state)
1966                })
1967            },
1968            // Use pre_quorum_timeout for capability notifications
1969            self.timeouts.pre_quorum_timeout,
1970        ).await;
1971
1972        match result {
1973            Ok(_) => {
1974                info!("Successfully sent capability notification to quorum of validators");
1975                self.metrics.capability_notification_success.inc();
1976                Ok(())
1977            }
1978            Err(state) => {
1979                warn!(
1980                    good_responses = state.good_responses,
1981                    non_retryable_errors = state.non_retryable_errors,
1982                    retryable_errors = state.retryable_errors,
1983                    validity_threshold,
1984                    quorum_threshold,
1985                    errors = ?state.errors,
1986                    "Failed to reach validity threshold for capability notification"
1987                );
1988                self.metrics.capability_notification_errors.inc();
1989
1990                let grouped_errors = group_errors(state.errors);
1991
1992                // Determine an error type based on which condition was met
1993                if state.non_retryable_errors >= quorum_threshold {
1994                    Err(
1995                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
1996                            errors: grouped_errors,
1997                        },
1998                    )
1999                } else {
2000                    Err(
2001                        AggregatorSendCapabilityNotificationError::RetryableNotification {
2002                            errors: grouped_errors,
2003                        },
2004                    )
2005                }
2006            }
2007        }
2008    }
2009}
2010
2011/// `AuthorityAggregatorBuilder` is used to build an `AuthorityAggregator` with
2012/// customizable configurations for the IOTA network.
2013#[derive(Default)]
2014pub struct AuthorityAggregatorBuilder<'a> {
2015    network_config: Option<&'a NetworkConfig>,
2016    genesis: Option<&'a Genesis>,
2017    committee: Option<Committee>,
2018    committee_store: Option<Arc<CommitteeStore>>,
2019    registry: Option<&'a Registry>,
2020    timeouts_config: Option<TimeoutConfig>,
2021}
2022
2023impl<'a> AuthorityAggregatorBuilder<'a> {
2024    /// Creates a new `AuthorityAggregatorBuilder` from a `NetworkConfig`.
2025    pub fn from_network_config(config: &'a NetworkConfig) -> Self {
2026        Self {
2027            network_config: Some(config),
2028            ..Default::default()
2029        }
2030    }
2031
2032    /// Creates a new `AuthorityAggregatorBuilder` from a `Genesis`.
2033    pub fn from_genesis(genesis: &'a Genesis) -> Self {
2034        Self {
2035            genesis: Some(genesis),
2036            ..Default::default()
2037        }
2038    }
2039
2040    /// Creates a new `AuthorityAggregatorBuilder` from a `Committee`.
2041    pub fn from_committee(committee: Committee) -> Self {
2042        Self {
2043            committee: Some(committee),
2044            ..Default::default()
2045        }
2046    }
2047
2048    /// Sets the `CommitteeStore`.
2049    pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
2050        self.committee_store = Some(committee_store);
2051        self
2052    }
2053
2054    /// Sets the Prometheus registry.
2055    pub fn with_registry(mut self, registry: &'a Registry) -> Self {
2056        self.registry = Some(registry);
2057        self
2058    }
2059
2060    /// Sets the timeouts configuration.
2061    pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
2062        self.timeouts_config = Some(timeouts_config);
2063        self
2064    }
2065
2066    fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
2067        let genesis = if let Some(network_config) = self.network_config {
2068            &network_config.genesis
2069        } else if let Some(genesis) = self.genesis {
2070            genesis
2071        } else {
2072            panic!("need either NetworkConfig or Genesis.");
2073        };
2074        genesis.committee_with_network()
2075    }
2076
2077    fn get_committee(&self) -> Committee {
2078        self.committee
2079            .clone()
2080            .unwrap_or_else(|| self.get_network_committee().committee().clone())
2081    }
2082
2083    pub fn build_network_clients(
2084        self,
2085    ) -> (
2086        AuthorityAggregator<NetworkAuthorityClient>,
2087        BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
2088    ) {
2089        let network_committee = self.get_network_committee();
2090        let auth_clients = make_authority_clients_with_timeout_config(
2091            &network_committee,
2092            DEFAULT_CONNECT_TIMEOUT_SEC,
2093            DEFAULT_REQUEST_TIMEOUT_SEC,
2094        );
2095        let auth_agg = self.build_custom_clients(auth_clients.clone());
2096        (auth_agg, auth_clients)
2097    }
2098
2099    pub fn build_custom_clients<C: Clone>(
2100        self,
2101        authority_clients: BTreeMap<AuthorityName, C>,
2102    ) -> AuthorityAggregator<C> {
2103        let committee = self.get_committee();
2104        let registry = Registry::new();
2105        let registry = self.registry.unwrap_or(&registry);
2106        let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
2107        let auth_agg_metrics = Arc::new(AuthAggMetrics::new(registry));
2108
2109        let committee_store = self
2110            .committee_store
2111            .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
2112
2113        let timeouts_config = self.timeouts_config.unwrap_or_default();
2114
2115        AuthorityAggregator::new(
2116            committee,
2117            committee_store,
2118            authority_clients,
2119            safe_client_metrics_base,
2120            auth_agg_metrics,
2121            Arc::new(HashMap::new()),
2122            timeouts_config,
2123        )
2124    }
2125}