iota_core/
authority_aggregator.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
8    convert::AsRef,
9    net::SocketAddr,
10    string::ToString,
11    sync::Arc,
12    time::Duration,
13};
14
15use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
16use iota_authority_aggregation::{AsyncResult, ReduceOutput, quorum_map_then_reduce_with_timeout};
17use iota_config::genesis::Genesis;
18use iota_metrics::{GaugeGuard, MonitorCancellation, monitored_future, spawn_monitored_task};
19use iota_network::{
20    DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_iota_network_config,
21};
22use iota_network_stack::config::Config;
23use iota_swarm_config::network_config::NetworkConfig;
24use iota_types::{
25    base_types::*,
26    committee::{Committee, CommitteeTrait, CommitteeWithNetworkMetadata, StakeUnit},
27    crypto::{AuthorityPublicKeyBytes, AuthoritySignInfo},
28    effects::{
29        CertifiedTransactionEffects, SignedTransactionEffects, TransactionEffects,
30        TransactionEvents, VerifiedCertifiedTransactionEffects,
31    },
32    error::{IotaError, IotaResult, UserInputError},
33    fp_ensure,
34    iota_system_state::{
35        IotaSystemState, IotaSystemStateTrait,
36        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
37    },
38    message_envelope::Message,
39    messages_grpc::{
40        HandleCertificateRequestV1, HandleCertificateResponseV1, LayoutGenerationOption,
41        ObjectInfoRequest, TransactionInfoRequest,
42    },
43    messages_safe_client::PlainTransactionInfoResponse,
44    object::Object,
45    quorum_driver_types::{GroupedErrors, QuorumDriverResponse},
46    transaction::*,
47};
48use prometheus::{
49    Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry,
50    register_int_counter_vec_with_registry, register_int_counter_with_registry,
51    register_int_gauge_with_registry,
52};
53use thiserror::Error;
54use tokio::time::{sleep, timeout};
55use tracing::{Instrument, debug, error, info, instrument, trace, trace_span, warn};
56
57use crate::{
58    authority_client::{
59        AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
60        make_network_authority_clients_with_network_config,
61    },
62    epoch::committee_store::CommitteeStore,
63    safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase},
64    stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator},
65};
66
/// Default number of retries exported by this module.
///
/// NOTE(review): not referenced in the visible part of this file; confirm
/// which callers rely on this value before changing it.
pub const DEFAULT_RETRIES: usize = 4;
68
69#[cfg(test)]
70#[path = "unit_tests/authority_aggregator_tests.rs"]
71pub mod authority_aggregator_tests;
72
/// Configuration for timeouts in the authority aggregator.
///
/// `Debug` is derived so the configuration can be logged and inspected in
/// tests.
#[derive(Clone, Debug)]
pub struct TimeoutConfig {
    /// Timeout for the phase before a quorum of responses is reached.
    /// NOTE(review): the consumer of this value is not visible here; the
    /// meaning is inferred from the name — confirm at the use site.
    pub pre_quorum_timeout: Duration,
    /// Timeout for the phase after a quorum of responses has been reached.
    /// NOTE(review): inferred from the name — confirm at the use site.
    pub post_quorum_timeout: Duration,

    /// Timeout used to determine when to start a second "serial" request for
    /// `quorum_once_with_timeout`. If this is set to zero, then
    /// `quorum_once_with_timeout` becomes completely parallelized.
    pub serial_authority_request_interval: Duration,
}
84
85impl Default for TimeoutConfig {
86    fn default() -> Self {
87        Self {
88            pre_quorum_timeout: Duration::from_secs(60),
89            post_quorum_timeout: Duration::from_secs(7),
90            serial_authority_request_interval: Duration::from_millis(1000),
91        }
92    }
93}
94
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
#[derive(Clone)]
pub struct AuthAggMetrics {
    /// Total number of certificates made in the authority aggregator.
    pub total_tx_certificates_created: IntCounter,
    /// Errors returned from validators when processing a transaction,
    /// labelled by validator `name` and `error` type.
    pub process_tx_errors: IntCounterVec,
    /// Errors returned from validators when processing a certificate,
    /// labelled by validator `name` and `error` type.
    pub process_cert_errors: IntCounterVec,
    /// Total number of detected client double-spend attempts.
    pub total_client_double_spend_attempts_detected: IntCounter,
    /// Errors returned from validators, labelled by `error` type and
    /// `tx_recoverable`.
    pub total_aggregated_err: IntCounterVec,
    /// RPC errors returned from validators, labelled by validator `name` and
    /// `error_message`.
    pub total_rpc_err: IntCounterVec,
    /// Transactions currently gathering signatures.
    pub inflight_transactions: IntGauge,
    /// Certificates currently gathering effects.
    pub inflight_certificates: IntGauge,
    /// In-flight `handle_transaction` requests.
    pub inflight_transaction_requests: IntGauge,
    /// In-flight `handle_certificate` requests.
    pub inflight_certificate_requests: IntGauge,

    /// Number of timeouts in certificate processing after quorum was reached.
    pub cert_broadcasting_post_quorum_timeout: IntCounter,
    /// Number of tasks still pending when the certificate quorum was reached.
    pub remaining_tasks_when_reaching_cert_quorum: Histogram,
    /// Number of tasks still pending when post-quorum certificate
    /// broadcasting timed out.
    pub remaining_tasks_when_cert_broadcasting_post_quorum_timeout: Histogram,
    /// Number of times quorum was reached without at least one validator
    /// returning the requested objects.
    pub quorum_reached_without_requested_objects: IntCounter,
}
114
115impl AuthAggMetrics {
116    /// Create a new instance of `AuthAggMetrics` with a Prometheus registry.
117    pub fn new(registry: &prometheus::Registry) -> Self {
118        Self {
119            total_tx_certificates_created: register_int_counter_with_registry!(
120                "total_tx_certificates_created",
121                "Total number of certificates made in the authority_aggregator",
122                registry,
123            )
124            .unwrap(),
125            process_tx_errors: register_int_counter_vec_with_registry!(
126                "process_tx_errors",
127                "Number of errors returned from validators when processing transaction, group by validator name and error type",
128                &["name","error"],
129                registry,
130            )
131            .unwrap(),
132            process_cert_errors: register_int_counter_vec_with_registry!(
133                "process_cert_errors",
134                "Number of errors returned from validators when processing certificate, group by validator name and error type",
135                &["name", "error"],
136                registry,
137            )
138            .unwrap(),
139            total_client_double_spend_attempts_detected: register_int_counter_with_registry!(
140                "total_client_double_spend_attempts_detected",
141                "Total number of client double spend attempts that are detected",
142                registry,
143            )
144            .unwrap(),
145            total_aggregated_err: register_int_counter_vec_with_registry!(
146                "total_aggregated_err",
147                "Total number of errors returned from validators, grouped by error type",
148                &["error", "tx_recoverable"],
149                registry,
150            )
151            .unwrap(),
152            total_rpc_err: register_int_counter_vec_with_registry!(
153                "total_rpc_err",
154                "Total number of rpc errors returned from validators, grouped by validator short name and RPC error message",
155                &["name", "error_message"],
156                registry,
157            )
158            .unwrap(),
159            inflight_transactions: register_int_gauge_with_registry!(
160                "auth_agg_inflight_transactions",
161                "Inflight transaction gathering signatures",
162                registry,
163            )
164            .unwrap(),
165            inflight_certificates: register_int_gauge_with_registry!(
166                "auth_agg_inflight_certificates",
167                "Inflight certificates gathering effects",
168                registry,
169            )
170            .unwrap(),
171            inflight_transaction_requests: register_int_gauge_with_registry!(
172                "auth_agg_inflight_transaction_requests",
173                "Inflight handle_transaction requests",
174                registry,
175            )
176            .unwrap(),
177            inflight_certificate_requests: register_int_gauge_with_registry!(
178                "auth_agg_inflight_certificate_requests",
179                "Inflight handle_certificate requests",
180                registry,
181            )
182            .unwrap(),
183            cert_broadcasting_post_quorum_timeout: register_int_counter_with_registry!(
184                "auth_agg_cert_broadcasting_post_quorum_timeout",
185                "Total number of timeout in cert processing post quorum",
186                registry,
187            )
188            .unwrap(),
189            remaining_tasks_when_reaching_cert_quorum: register_histogram_with_registry!(
190                "auth_agg_remaining_tasks_when_reaching_cert_quorum",
191                "Number of remaining tasks when reaching certificate quorum",
192                registry,
193            ).unwrap(),
194            remaining_tasks_when_cert_broadcasting_post_quorum_timeout: register_histogram_with_registry!(
195                "auth_agg_remaining_tasks_when_cert_broadcasting_post_quorum_timeout",
196                "Number of remaining tasks when post quorum certificate broadcasting times out",
197                registry,
198            ).unwrap(),
199            quorum_reached_without_requested_objects: register_int_counter_with_registry!(
200                "auth_agg_quorum_reached_without_requested_objects",
201                "Number of times quorum was reached without getting the requested objects back from at least 1 validator",
202                registry,
203            )
204            .unwrap(),
205        }
206    }
207
208    /// Creates a new instance of `AuthAggMetrics` for testing.
209    pub fn new_for_tests() -> Self {
210        let registry = prometheus::Registry::new();
211        Self::new(&registry)
212    }
213}
214
/// Errors that can occur when processing transactions in an aggregator.
#[derive(Error, Debug, Eq, PartialEq)]
pub enum AggregatorProcessTransactionError {
    /// A quorum could not be reached because of non-retryable validator
    /// errors.
    #[error(
        "Failed to execute transaction on a quorum of validators due to non-retryable errors. Validator errors: {:?}",
        errors
    )]
    FatalTransaction { errors: GroupedErrors },

    /// A quorum could not be reached, but the state is still retryable.
    #[error(
        "Failed to execute transaction on a quorum of validators but state is still retryable. Validator errors: {:?}",
        errors
    )]
    RetryableTransaction { errors: GroupedErrors },

    /// A quorum could not be reached because of conflicting transactions.
    /// `conflicting_tx_digests` maps each conflicting transaction digest to
    /// the `(validator, locked object)` records that reported it and their
    /// accumulated stake.
    #[error(
        "Failed to execute transaction on a quorum of validators due to conflicting transactions. Locked objects: {:?}. Validator errors: {:?}",
        conflicting_tx_digests,
        errors
    )]
    FatalConflictingTransaction {
        errors: GroupedErrors,
        conflicting_tx_digests:
            BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    },

    /// Validators holding `overloaded_stake` are overloaded with transactions
    /// pending execution.
    #[error(
        "{} of the validators by stake are overloaded with transactions pending execution. Validator errors: {:?}",
        overloaded_stake,
        errors
    )]
    SystemOverload {
        overloaded_stake: StakeUnit,
        errors: GroupedErrors,
    },

    /// The transaction is already finalized, but with different user
    /// signatures.
    #[error("Transaction is already finalized but with different user signatures")]
    TxAlreadyFinalizedWithDifferentUserSignatures,

    /// Validators holding `overload_stake` are overloaded and asked the
    /// client to retry after `retry_after_secs` seconds.
    #[error(
        "{} of the validators by stake are overloaded and requested the client to retry after {} seconds. Validator errors: {:?}",
        overload_stake,
        retry_after_secs,
        errors
    )]
    SystemOverloadRetryAfter {
        overload_stake: StakeUnit,
        errors: GroupedErrors,
        retry_after_secs: u64,
    },
}
266
267#[derive(Error, Debug)]
268pub enum AggregatorProcessCertificateError {
269    #[error(
270        "Failed to execute certificate on a quorum of validators. Non-retryable errors: {:?}",
271        non_retryable_errors
272    )]
273    FatalExecuteCertificate { non_retryable_errors: GroupedErrors },
274
275    #[error(
276        "Failed to execute certificate on a quorum of validators but state is still retryable. Retryable errors: {:?}",
277        retryable_errors
278    )]
279    RetryableExecuteCertificate { retryable_errors: GroupedErrors },
280}
281
282/// Groups the errors by error type and stake.
283pub fn group_errors(errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>) -> GroupedErrors {
284    #[expect(clippy::mutable_key_type)]
285    let mut grouped_errors = HashMap::new();
286    for (error, names, stake) in errors {
287        let entry = grouped_errors.entry(error).or_insert((0, vec![]));
288        entry.0 += stake;
289        entry.1.extend(
290            names
291                .into_iter()
292                .map(|n| n.concise_owned())
293                .collect::<Vec<_>>(),
294        );
295    }
296    grouped_errors
297        .into_iter()
298        .map(|(e, (s, n))| (e, s, n))
299        .collect()
300}
301
/// `RetryableOverloadInfo` stores information about the state of overloaded
/// validators that request clients to retry operations.
#[derive(Debug, Default)]
pub struct RetryableOverloadInfo {
    /// Total stake of validators that are overloaded and request the client
    /// to retry.
    pub total_stake: StakeUnit,

    /// Stake of those validators, bucketed by the retry-after duration each
    /// of them requested.
    pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
}
312
313impl RetryableOverloadInfo {
314    pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
315        self.total_stake += stake;
316        self.stake_requested_retry_after
317            .entry(retry_after)
318            .and_modify(|s| *s += stake)
319            .or_insert(stake);
320    }
321
322    // Gets the duration of retry requested by a quorum of validators with smallest
323    // retry durations.
324    pub fn get_quorum_retry_after(
325        &self,
326        good_stake: StakeUnit,
327        quorum_threshold: StakeUnit,
328    ) -> Duration {
329        if self.stake_requested_retry_after.is_empty() {
330            return Duration::from_secs(0);
331        }
332
333        let mut quorum_stake = good_stake;
334        for (retry_after, stake) in self.stake_requested_retry_after.iter() {
335            quorum_stake += *stake;
336            if quorum_stake >= quorum_threshold {
337                return *retry_after;
338            }
339        }
340        *self.stake_requested_retry_after.last_key_value().unwrap().0
341    }
342}
343
/// State accumulated while gathering validator responses for a transaction.
#[derive(Debug)]
struct ProcessTransactionState {
    // The list of signatures gathered at any point
    tx_signatures: StakeAggregator<AuthoritySignInfo, true>,
    // Transaction effects aggregated by stake, keyed by effects digest.
    effects_map: MultiStakeAggregator<TransactionEffectsDigest, TransactionEffects, true>,
    // The list of errors gathered at any point: each error with the
    // validators that reported it and their stake.
    errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    // This is exclusively non-retryable stake.
    non_retryable_stake: StakeUnit,
    // This includes both object and package not found iota errors.
    object_or_package_not_found_stake: StakeUnit,
    // Validators that are overloaded with txns pending execution.
    overloaded_stake: StakeUnit,
    // Validators that are overloaded and request client to retry.
    retryable_overload_info: RetryableOverloadInfo,
    // If there are conflicting transactions, we note them down to report to user.
    // Keyed by the conflicting transaction digest; the value holds the
    // (validator, locked object) records and their accumulated stake.
    conflicting_tx_digests:
        BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    // As long as none of the exit criteria are met we consider the state retryable
    // 1) >= 2f+1 signatures
    // 2) >= f+1 non-retryable errors
    // 3) >= 2f+1 object not found errors
    retryable: bool,
    // Set when the transaction is found to be finalized with different user
    // signatures. NOTE(review): the code that sets it is outside this view.
    tx_finalized_with_different_user_sig: bool,
}
369
370impl ProcessTransactionState {
371    /// Records the conflicting transaction, returns `true` if there is any, and
372    /// returns `false` otherwise.
373    pub fn record_conflicting_transaction_if_any(
374        &mut self,
375        validator_name: AuthorityName,
376        weight: StakeUnit,
377        err: &IotaError,
378    ) {
379        if let IotaError::ObjectLockConflict {
380            obj_ref,
381            pending_transaction: transaction,
382        } = err
383        {
384            let (lock_records, total_stake) = self
385                .conflicting_tx_digests
386                .entry(*transaction)
387                .or_insert((Vec::new(), 0));
388            lock_records.push((validator_name, *obj_ref));
389            *total_stake += weight;
390        }
391    }
392
393    /// Checks if the error indicates that the transaction is already finalized.
394    pub fn check_if_error_indicates_tx_finalized_with_different_user_sig(
395        &self,
396        validity_threshold: StakeUnit,
397    ) -> bool {
398        // In some edge cases, the client may send the same transaction multiple times
399        // but with different user signatures. When this happens, the "minority"
400        // tx will fail in safe_client because the certificate verification would fail
401        // and return Iota::FailedToVerifyTxCertWithExecutedEffects.
402        // Here, we check if there are f+1 validators return this error. If so, the
403        // transaction is already finalized with a different set of user
404        // signatures. It's not trivial to return the results of that successful
405        // transaction because we don't want fullnode to store the transaction
406        // with non-canonical user signatures. Given that this is very rare, we
407        // simply return an error here.
408        let invalid_sig_stake: StakeUnit = self
409            .errors
410            .iter()
411            .filter_map(|(e, _, stake)| {
412                if matches!(e, IotaError::FailedToVerifyTxCertWithExecutedEffects { .. }) {
413                    Some(stake)
414                } else {
415                    None
416                }
417            })
418            .sum();
419        invalid_sig_stake >= validity_threshold
420    }
421}
422
/// State for processing a certificate.
struct ProcessCertificateState {
    // Different authorities could return different effects. We want at least one effect to come
    // from 2f+1 authorities, which meets quorum and can be considered the approved effect.
    // The map here allows us to count the stake for each unique effect, keyed by
    // (epoch, effects digest).
    effects_map:
        MultiStakeAggregator<(EpochId, TransactionEffectsDigest), TransactionEffects, true>,
    // Accumulated stake of validators that returned non-retryable errors.
    non_retryable_stake: StakeUnit,
    // Non-retryable errors, each with the reporting validators and their stake.
    non_retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    // Retryable errors, each with the reporting validators and their stake.
    retryable_errors: Vec<(IotaError, Vec<AuthorityName>, StakeUnit)>,
    // As long as none of the exit criteria are met we consider the state retryable
    // 1) >= 2f+1 signatures
    // 2) >= f+1 non-retryable errors
    retryable: bool,

    // collection of extended data returned from the validators.
    // Not all validators will be asked to return this data so we need to hold onto it when one
    // validator has provided it
    events: Option<TransactionEvents>,
    input_objects: Option<Vec<Object>>,
    output_objects: Option<Vec<Object>>,
    auxiliary_data: Option<Vec<u8>>,
    // The certificate request being processed. NOTE(review): its use is
    // outside this view — confirm how it is sent to validators.
    request: HandleCertificateRequestV1,
}
447
/// The result of processing a transaction.
#[derive(Debug)]
pub enum ProcessTransactionResult {
    /// The transaction is certified.
    Certified {
        /// The certified transaction.
        certificate: CertifiedTransaction,
        /// Whether this certificate is newly created by aggregating 2f+1
        /// signatures. If a validator returned a cert directly, this
        /// will be false. This is used to inform the quorum driver,
        /// which could make better decisions on telemetry
        /// such as settlement latency.
        newly_formed: bool,
    },
    /// The transaction has already been executed; carries its verified
    /// certified effects and its transaction events.
    Executed(VerifiedCertifiedTransactionEffects, TransactionEvents),
}
462
463impl ProcessTransactionResult {
464    /// Returns the `CertifiedTransaction` if it is a `Certified` variant.
465    pub fn into_cert_for_testing(self) -> CertifiedTransaction {
466        match self {
467            Self::Certified { certificate, .. } => certificate,
468            Self::Executed(..) => panic!("Wrong type"),
469        }
470    }
471
472    /// Returns the `VerifiedCertifiedTransactionEffects` if it is an
473    /// `Executed` variant.
474    pub fn into_effects_for_testing(self) -> VerifiedCertifiedTransactionEffects {
475        match self {
476            Self::Certified { .. } => panic!("Wrong type"),
477            Self::Executed(effects, ..) => effects,
478        }
479    }
480}
481
/// The AuthorityAggregator is responsible for aggregating the responses from
/// the validators and determining the final state of the transaction.
#[derive(Clone)]
pub struct AuthorityAggregator<A: Clone> {
    /// Our IOTA committee.
    pub committee: Arc<Committee>,
    /// For more human readable metrics reporting.
    /// It's OK for this map to be empty or missing validators, it then defaults
    /// to use concise validator public keys.
    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
    /// How to talk to this committee: one `SafeClient` per validator name.
    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
    /// Aggregator metrics.
    pub metrics: Arc<AuthAggMetrics>,
    /// Metric base for the purpose of creating new safe clients during
    /// reconfiguration.
    pub safe_client_metrics_base: SafeClientMetricsBase,
    /// Timeouts used by the aggregator.
    pub timeouts: TimeoutConfig,
    /// Store here for clone during re-config; it is shared with the safe
    /// clients and with aggregators recreated from this one.
    pub committee_store: Arc<CommitteeStore>,
}
503
504impl<A: Clone> AuthorityAggregator<A> {
505    /// Create a new `AuthorityAggregator`.
506    pub fn new(
507        committee: Committee,
508        committee_store: Arc<CommitteeStore>,
509        authority_clients: BTreeMap<AuthorityName, A>,
510        safe_client_metrics_base: SafeClientMetricsBase,
511        auth_agg_metrics: Arc<AuthAggMetrics>,
512        validator_display_names: Arc<HashMap<AuthorityName, String>>,
513        timeouts: TimeoutConfig,
514    ) -> Self {
515        Self {
516            committee: Arc::new(committee),
517            authority_clients: create_safe_clients(
518                authority_clients,
519                &committee_store,
520                &safe_client_metrics_base,
521            ),
522            metrics: auth_agg_metrics,
523            safe_client_metrics_base,
524            timeouts,
525            committee_store,
526            validator_display_names,
527        }
528    }
529
530    /// This function recreates AuthorityAggregator with the given committee.
531    /// It also updates committee store which impacts other of its references.
532    /// When disallow_missing_intermediate_committees is true, it requires the
533    /// new committee needs to be current epoch + 1.
534    /// The function could be used along with `reconfig_from_genesis` to fill in
535    /// all previous epoch's committee info.
536    pub fn recreate_with_net_addresses(
537        &self,
538        committee: CommitteeWithNetworkMetadata,
539        network_config: &Config,
540        disallow_missing_intermediate_committees: bool,
541    ) -> IotaResult<AuthorityAggregator<NetworkAuthorityClient>> {
542        let network_clients =
543            make_network_authority_clients_with_network_config(&committee, network_config);
544
545        let safe_clients = network_clients
546            .into_iter()
547            .map(|(name, api)| {
548                (
549                    name,
550                    Arc::new(SafeClient::new(
551                        api,
552                        self.committee_store.clone(),
553                        name,
554                        SafeClientMetrics::new(&self.safe_client_metrics_base, name),
555                    )),
556                )
557            })
558            .collect::<BTreeMap<_, _>>();
559
560        // TODO: It's likely safer to do the following operations atomically, in case
561        // this function gets called from different threads. It cannot happen
562        // today, but worth the caution.
563        let new_committee = committee.committee().clone();
564        if disallow_missing_intermediate_committees {
565            fp_ensure!(
566                self.committee.epoch + 1 == new_committee.epoch,
567                IotaError::AdvanceEpoch {
568                    error: format!(
569                        "Trying to advance from epoch {} to epoch {}",
570                        self.committee.epoch, new_committee.epoch
571                    )
572                }
573            );
574        }
575        // This call may return error if this committee is already inserted,
576        // which is fine. We should continue to construct the new aggregator.
577        // This is because there may be multiple AuthorityAggregators
578        // or its containers (e.g. Quorum Drivers)  share the same committee
579        // store and all of them need to reconfigure.
580        let _ = self.committee_store.insert_new_committee(&new_committee);
581        Ok(AuthorityAggregator {
582            committee: Arc::new(new_committee),
583            authority_clients: Arc::new(safe_clients),
584            metrics: self.metrics.clone(),
585            timeouts: self.timeouts.clone(),
586            safe_client_metrics_base: self.safe_client_metrics_base.clone(),
587            committee_store: self.committee_store.clone(),
588            validator_display_names: Arc::new(HashMap::new()),
589        })
590    }
591
592    /// Gets the authority client for the given name.
593    pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
594        self.authority_clients.get(name)
595    }
596
597    /// Gets the cloned authority client for the given name.
598    pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
599    where
600        A: Clone,
601    {
602        self.authority_clients[name].clone()
603    }
604
605    /// Gets the cloned `CommitteeStore`.
606    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
607        self.committee_store.clone()
608    }
609
610    /// Gets the cloned `Committee`.
611    pub fn clone_inner_committee_test_only(&self) -> Committee {
612        (*self.committee).clone()
613    }
614
615    /// Get the cloned authority clients.
616    pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
617        (*self.authority_clients)
618            .clone()
619            .into_iter()
620            .map(|(k, v)| (k, (*v).clone()))
621            .collect()
622    }
623}
624
625/// Creates safe clients for each authority.
626fn create_safe_clients<A: Clone>(
627    authority_clients: BTreeMap<AuthorityName, A>,
628    committee_store: &Arc<CommitteeStore>,
629    safe_client_metrics_base: &SafeClientMetricsBase,
630) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
631    Arc::new(
632        authority_clients
633            .into_iter()
634            .map(|(name, api)| {
635                (
636                    name,
637                    Arc::new(SafeClient::new(
638                        api,
639                        committee_store.clone(),
640                        name,
641                        SafeClientMetrics::new(safe_client_metrics_base, name),
642                    )),
643                )
644            })
645            .collect(),
646    )
647}
648
649impl AuthorityAggregator<NetworkAuthorityClient> {
650    /// Create a new network authority aggregator by reading the committee and
651    /// network addresses information from the given epoch start system
652    /// state.
653    pub fn new_from_epoch_start_state(
654        epoch_start_state: &EpochStartSystemState,
655        committee_store: &Arc<CommitteeStore>,
656        safe_client_metrics_base: SafeClientMetricsBase,
657        auth_agg_metrics: Arc<AuthAggMetrics>,
658    ) -> Self {
659        let committee = epoch_start_state.get_iota_committee_with_network_metadata();
660        let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
661        Self::new_from_committee(
662            committee,
663            committee_store,
664            safe_client_metrics_base,
665            auth_agg_metrics,
666            Arc::new(validator_display_names),
667        )
668    }
669
670    /// Create a new AuthorityAggregator using information from the given epoch
671    /// start system state. This is typically used during reconfiguration to
672    /// create a new AuthorityAggregator with the new committee and network
673    /// addresses.
674    pub fn recreate_with_new_epoch_start_state(
675        &self,
676        epoch_start_state: &EpochStartSystemState,
677    ) -> Self {
678        Self::new_from_epoch_start_state(
679            epoch_start_state,
680            &self.committee_store,
681            self.safe_client_metrics_base.clone(),
682            self.metrics.clone(),
683        )
684    }
685
686    pub fn new_from_committee(
687        committee: CommitteeWithNetworkMetadata,
688        committee_store: &Arc<CommitteeStore>,
689        safe_client_metrics_base: SafeClientMetricsBase,
690        auth_agg_metrics: Arc<AuthAggMetrics>,
691        validator_display_names: Arc<HashMap<AuthorityName, String>>,
692    ) -> Self {
693        let net_config = default_iota_network_config();
694        let authority_clients =
695            make_network_authority_clients_with_network_config(&committee, &net_config);
696        Self::new(
697            committee.committee().clone(),
698            committee_store.clone(),
699            authority_clients,
700            safe_client_metrics_base,
701            auth_agg_metrics,
702            validator_display_names,
703            Default::default(),
704        )
705    }
706}
707
708impl<A> AuthorityAggregator<A>
709where
710    A: AuthorityAPI + Send + Sync + 'static + Clone,
711{
712    // Repeatedly calls the provided closure on a randomly selected validator until
713    // it succeeds. Once all validators have been attempted, starts over at the
714    // beginning. Intended for cases that must eventually succeed as long as the
715    // network is up (or comes back up) eventually.
716    async fn quorum_once_inner<'a, S, FMap>(
717        &'a self,
718        // try these authorities first
719        preferences: Option<&BTreeSet<AuthorityName>>,
720        // only attempt from these authorities.
721        restrict_to: Option<&BTreeSet<AuthorityName>>,
722        // The async function used to apply to each authority. It takes an authority name,
723        // and authority client parameter and returns a Result<V>.
724        map_each_authority: FMap,
725        timeout_each_authority: Duration,
726        authority_errors: &mut HashMap<AuthorityName, IotaError>,
727    ) -> Result<S, IotaError>
728    where
729        FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
730            + Send
731            + Clone
732            + 'a,
733        S: Send,
734    {
735        let start = tokio::time::Instant::now();
736        let mut delay = Duration::from_secs(1);
737        loop {
738            let authorities_shuffled = self.committee.shuffle_by_stake(preferences, restrict_to);
739            let mut authorities_shuffled = authorities_shuffled.iter();
740
741            type RequestResult<S> = Result<Result<S, IotaError>, tokio::time::error::Elapsed>;
742
743            enum Event<S> {
744                StartNext,
745                Request(AuthorityName, RequestResult<S>),
746            }
747
748            let mut futures = FuturesUnordered::<BoxFuture<'a, Event<S>>>::new();
749
750            let start_req = |name: AuthorityName, client: Arc<SafeClient<A>>| {
751                let map_each_authority = map_each_authority.clone();
752                Box::pin(monitored_future!(async move {
753                    trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start, "new request");
754                    let map = map_each_authority(name, client);
755                    Event::Request(name, timeout(timeout_each_authority, map).await)
756                }))
757            };
758
759            let schedule_next = || {
760                let delay = self.timeouts.serial_authority_request_interval;
761                Box::pin(monitored_future!(async move {
762                    sleep(delay).await;
763                    Event::StartNext
764                }))
765            };
766
767            // This process is intended to minimize latency in the face of unreliable
768            // authorities, without creating undue load on authorities.
769            //
770            // The fastest possible process from the
771            // client's point of view would simply be to issue a concurrent request to every
772            // authority and then take the winner - this would create unnecessary load on
773            // authorities.
774            //
775            // The most efficient process from the network's point of view is to do one
776            // request at a time, however if the first validator that the client
777            // contacts is unavailable or slow, the client must wait for the
778            // serial_authority_request_interval period to elapse
779            // before starting its next request.
780            //
781            // So, this process is designed as a compromise between these two extremes.
782            // - We start one request, and schedule another request to begin after
783            //   serial_authority_request_interval.
784            // - Whenever a request finishes, if it succeeded, we return. if it failed, we
785            //   start a new request.
786            // - If serial_authority_request_interval elapses, we begin a new request even
787            //   if the previous one is not finished, and schedule another future request.
788
789            let name = authorities_shuffled.next().ok_or_else(|| {
790                error!(
791                    ?preferences,
792                    ?restrict_to,
793                    "Available authorities list is empty."
794                );
795                IotaError::from("Available authorities list is empty")
796            })?;
797            futures.push(start_req(*name, self.authority_clients[name].clone()));
798            futures.push(schedule_next());
799
800            while let Some(res) = futures.next().await {
801                match res {
802                    Event::StartNext => {
803                        trace!(now = ?tokio::time::Instant::now() - start, "eagerly beginning next request");
804                        futures.push(schedule_next());
805                    }
806                    Event::Request(name, res) => {
807                        match res {
808                            // timeout
809                            Err(_) => {
810                                debug!(name=?name.concise(), "authority request timed out");
811                                authority_errors.insert(name, IotaError::Timeout);
812                            }
813                            // request completed
814                            Ok(inner_res) => {
815                                trace!(name=?name.concise(), now = ?tokio::time::Instant::now() - start,
816                                       "request completed successfully");
817                                match inner_res {
818                                    Err(e) => authority_errors.insert(name, e),
819                                    Ok(res) => return Ok(res),
820                                };
821                            }
822                        };
823                    }
824                }
825
826                if let Some(next_authority) = authorities_shuffled.next() {
827                    futures.push(start_req(
828                        *next_authority,
829                        self.authority_clients[next_authority].clone(),
830                    ));
831                } else {
832                    break;
833                }
834            }
835
836            info!(
837                ?authority_errors,
838                "quorum_once_with_timeout failed on all authorities, retrying in {:?}", delay
839            );
840            sleep(delay).await;
841            delay = std::cmp::min(delay * 2, Duration::from_secs(5 * 60));
842        }
843    }
844
845    /// Like quorum_map_then_reduce_with_timeout, but for things that need only
846    /// a single successful response, such as fetching a Transaction from
847    /// some authority. This is intended for cases in which byzantine
848    /// authorities can time out or slow-loris, but can't give a false
849    /// answer, because e.g. the digest of the response is known, or a
850    /// quorum-signed object such as a checkpoint has been requested.
851    pub(crate) async fn quorum_once_with_timeout<'a, S, FMap>(
852        &'a self,
853        // try these authorities first
854        preferences: Option<&BTreeSet<AuthorityName>>,
855        // only attempt from these authorities.
856        restrict_to: Option<&BTreeSet<AuthorityName>>,
857        // The async function used to apply to each authority. It takes an authority name,
858        // and authority client parameter and returns a Result<V>.
859        map_each_authority: FMap,
860        timeout_each_authority: Duration,
861        // When to give up on the attempt entirely.
862        timeout_total: Option<Duration>,
863        // The behavior that authorities expect to perform, used for logging and error
864        description: String,
865    ) -> Result<S, IotaError>
866    where
867        FMap: Fn(AuthorityName, Arc<SafeClient<A>>) -> AsyncResult<'a, S, IotaError>
868            + Send
869            + Clone
870            + 'a,
871        S: Send,
872    {
873        let mut authority_errors = HashMap::new();
874
875        let fut = self.quorum_once_inner(
876            preferences,
877            restrict_to,
878            map_each_authority,
879            timeout_each_authority,
880            &mut authority_errors,
881        );
882
883        if let Some(t) = timeout_total {
884            timeout(t, fut).await.map_err(|_timeout_error| {
885                if authority_errors.is_empty() {
886                    IotaError::Timeout
887                } else {
888                    IotaError::TooManyIncorrectAuthorities {
889                        errors: authority_errors
890                            .iter()
891                            .map(|(a, b)| (*a, b.clone()))
892                            .collect(),
893                        action: description,
894                    }
895                }
896            })?
897        } else {
898            fut.await
899        }
900    }
901
902    /// Queries the object with highest version number from the authorities.
903    /// We stop after receiving responses from 2f+1 validators.
904    /// This function is untrusted because we simply assume each response is
905    /// valid and there are no byzantine validators.
906    /// Because of this, this function should only be used for testing or
907    /// benchmarking.
908    pub async fn get_latest_object_version_for_testing(
909        &self,
910        object_id: ObjectID,
911    ) -> IotaResult<Object> {
912        #[derive(Debug, Default)]
913        struct State {
914            latest_object_version: Option<Object>,
915            total_weight: StakeUnit,
916        }
917        let initial_state = State::default();
918        let result = quorum_map_then_reduce_with_timeout(
919                self.committee.clone(),
920                self.authority_clients.clone(),
921                initial_state,
922                |_name, client| {
923                    Box::pin(async move {
924                        let request =
925                            ObjectInfoRequest::latest_object_info_request(object_id, /* generate_layout */ LayoutGenerationOption::None);
926                        client.handle_object_info_request(request).await
927                    })
928                },
929                |mut state, name, weight, result| {
930                    Box::pin(async move {
931                        state.total_weight += weight;
932                        match result {
933                            Ok(object_info) => {
934                                debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
935                                if state.latest_object_version.as_ref().is_none_or(|latest| {
936                                    object_info.object.version() > latest.version()
937                                }) {
938                                    state.latest_object_version = Some(object_info.object);
939                                }
940                            }
941                            Err(err) => {
942                                debug!("Received error from validator {:?}: {:?}", name.concise(), err);
943                            }
944                        };
945                        if state.total_weight >= self.committee.quorum_threshold() {
946                            if let Some(object) = state.latest_object_version {
947                                return ReduceOutput::Success(object);
948                            } else {
949                                return ReduceOutput::Failed(state);
950                            }
951                        }
952                        ReduceOutput::Continue(state)
953                    })
954                },
955                // A long timeout before we hear back from a quorum
956                self.timeouts.pre_quorum_timeout,
957            )
958            .await.map_err(|_state| IotaError::from(UserInputError::ObjectNotFound {
959                object_id,
960                version: None,
961            }))?;
962        Ok(result.0)
963    }
964
965    /// Gets the latest system state object from the authorities.
966    /// This function assumes all validators are honest.
967    /// It should only be used for testing or benchmarking.
968    pub async fn get_latest_system_state_object_for_testing(
969        &self,
970    ) -> anyhow::Result<IotaSystemState> {
971        #[derive(Debug, Default)]
972        struct State {
973            latest_system_state: Option<IotaSystemState>,
974            total_weight: StakeUnit,
975        }
976        let initial_state = State::default();
977        let result = quorum_map_then_reduce_with_timeout(
978            self.committee.clone(),
979            self.authority_clients.clone(),
980            initial_state,
981            |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
982            |mut state, name, weight, result| {
983                Box::pin(async move {
984                    state.total_weight += weight;
985                    match result {
986                        Ok(system_state) => {
987                            debug!(
988                                "Received system state object from validator {:?} with epoch: {:?}",
989                                name.concise(),
990                                system_state.epoch()
991                            );
992                            if state
993                                .latest_system_state
994                                .as_ref()
995                                .is_none_or(|latest| system_state.epoch() > latest.epoch())
996                            {
997                                state.latest_system_state = Some(system_state);
998                            }
999                        }
1000                        Err(err) => {
1001                            debug!(
1002                                "Received error from validator {:?}: {:?}",
1003                                name.concise(),
1004                                err
1005                            );
1006                        }
1007                    };
1008                    if state.total_weight >= self.committee.quorum_threshold() {
1009                        if let Some(system_state) = state.latest_system_state {
1010                            return ReduceOutput::Success(system_state);
1011                        } else {
1012                            return ReduceOutput::Failed(state);
1013                        }
1014                    }
1015                    ReduceOutput::Continue(state)
1016                })
1017            },
1018            // A long timeout before we hear back from a quorum
1019            self.timeouts.pre_quorum_timeout,
1020        )
1021        .await
1022        .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
1023        Ok(result.0)
1024    }
1025
1026    /// Submits the transaction to a quorum of validators to make a certificate.
1027    #[instrument(level = "trace", skip_all)]
1028    pub async fn process_transaction(
1029        &self,
1030        transaction: Transaction,
1031        client_addr: Option<SocketAddr>,
1032    ) -> Result<ProcessTransactionResult, AggregatorProcessTransactionError> {
1033        // Now broadcast the transaction to all authorities.
1034        let tx_digest = transaction.digest();
1035        debug!(
1036            tx_digest = ?tx_digest,
1037            "Broadcasting transaction request to authorities"
1038        );
1039        trace!(
1040            "Transaction data: {:?}",
1041            transaction.data().intent_message().value
1042        );
1043        let committee = self.committee.clone();
1044        let state = ProcessTransactionState {
1045            tx_signatures: StakeAggregator::new(committee.clone()),
1046            effects_map: MultiStakeAggregator::new(committee.clone()),
1047            errors: vec![],
1048            object_or_package_not_found_stake: 0,
1049            non_retryable_stake: 0,
1050            overloaded_stake: 0,
1051            retryable_overload_info: Default::default(),
1052            retryable: true,
1053            conflicting_tx_digests: Default::default(),
1054            tx_finalized_with_different_user_sig: false,
1055        };
1056
1057        let transaction_ref = &transaction;
1058        let validity_threshold = committee.validity_threshold();
1059        let quorum_threshold = committee.quorum_threshold();
1060        let validator_display_names = self.validator_display_names.clone();
1061        let result = quorum_map_then_reduce_with_timeout(
1062                committee.clone(),
1063                self.authority_clients.clone(),
1064                state,
1065                |name, client| {
1066                    Box::pin(
1067                        async move {
1068                            let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
1069                            let concise_name = name.concise_owned();
1070                            client.handle_transaction(transaction_ref.clone(), client_addr)
1071                                .monitor_cancellation()
1072                                .instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name))
1073                                .await
1074                        },
1075                    )
1076                },
1077                |mut state, name, weight, response| {
1078                    let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1079                    Box::pin(async move {
1080                        match self.handle_process_transaction_response(
1081                            tx_digest, &mut state, response, name, weight,
1082                        ) {
1083                            Ok(Some(result)) => {
1084                                self.record_process_transaction_metrics(tx_digest, &state);
1085                                return ReduceOutput::Success(result);
1086                            }
1087                            Ok(None) => {},
1088                            Err(err) => {
1089                                let concise_name = name.concise();
1090                                debug!(?tx_digest, name=?concise_name, weight, "Error processing transaction from validator: {:?}", err);
1091                                self.metrics
1092                                    .process_tx_errors
1093                                    .with_label_values(&[display_name.as_str(), err.as_ref()])
1094                                    .inc();
1095                                Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
1096                                // Record conflicting transactions if any to report to user.
1097                                state.record_conflicting_transaction_if_any(name, weight, &err);
1098                                let (retryable, categorized) = err.is_retryable();
1099                                if !categorized {
1100                                    // TODO: Should minimize possible uncategorized errors here
1101                                    // use ERROR for now to make them easier to spot.
1102                                    error!(?tx_digest, "uncategorized tx error: {err}");
1103                                }
1104                                if err.is_object_or_package_not_found() {
1105                                    // Special case for object not found because we can
1106                                    // retry if we have < 2f+1 object not found errors.
1107                                    // However once we reach >= 2f+1 object not found errors
1108                                    // we cannot retry.
1109                                    state.object_or_package_not_found_stake += weight;
1110                                }
1111                                else if err.is_overload() {
1112                                    // Special case for validator overload too. Once we have >= 2f + 1
1113                                    // overloaded validators we consider the system overloaded so we exit
1114                                    // and notify the user.
1115                                    // Note that currently, this overload account for
1116                                    //   - per object queue overload
1117                                    //   - consensus overload
1118                                    state.overloaded_stake += weight;
1119                                }
1120                                else if err.is_retryable_overload() {
1121                                    // Different from above overload error, retryable overload targets authority overload (entire
1122                                    // authority server is overload). In this case, the retry behavior is different from
1123                                    // above that we may perform continuous retry due to that objects may have been locked
1124                                    // in the validator.
1125                                    //
1126                                    // TODO: currently retryable overload and above overload error look redundant. We want to have a unified
1127                                    // code path to handle both overload scenarios.
1128                                    state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
1129                                }
1130                                else if !retryable {
1131                                    state.non_retryable_stake += weight;
1132                                }
1133                                state.errors.push((err, vec![name], weight));
1134
1135                            }
1136                        };
1137
1138                        let retryable_stake = self.get_retryable_stake(&state);
1139                        let good_stake = std::cmp::max(state.tx_signatures.total_votes(), state.effects_map.total_votes());
1140                        if good_stake + retryable_stake < quorum_threshold {
1141                            debug!(
1142                                tx_digest = ?tx_digest,
1143                                good_stake,
1144                                retryable_stake,
1145                                "No chance for any tx to get quorum, exiting. Conflicting_txes: {:?}",
1146                                state.conflicting_tx_digests
1147                            );
1148                            // If there is no chance for any tx to get quorum, exit.
1149                            state.retryable = false;
1150                            return ReduceOutput::Failed(state);
1151                        }
1152
1153                        // TODO: add more comments to explain each condition.
1154                        if state.non_retryable_stake >= validity_threshold
1155                            || state.object_or_package_not_found_stake >= quorum_threshold // In normal case, object/package not found should be more than f+1
1156                            || state.overloaded_stake >= quorum_threshold {
1157                            // We have hit an exit condition, f+1 non-retryable err or 2f+1 object not found or overload,
1158                            // so we no longer consider the transaction state as retryable.
1159                            state.retryable = false;
1160                            ReduceOutput::Failed(state)
1161                        } else {
1162                            ReduceOutput::Continue(state)
1163                        }
1164                    })
1165                },
1166                // A long timeout before we hear back from a quorum
1167                self.timeouts.pre_quorum_timeout,
1168            )
1169            .await;
1170
1171        match result {
1172            Ok((result, _)) => Ok(result),
1173            Err(state) => {
1174                self.record_process_transaction_metrics(tx_digest, &state);
1175                let state = self.record_non_quorum_effects_maybe(tx_digest, state);
1176                Err(self.handle_process_transaction_error(state))
1177            }
1178        }
1179    }
1180
1181    /// Records the rpc error if it is.
1182    fn record_rpc_error_maybe(metrics: Arc<AuthAggMetrics>, display_name: &str, error: &IotaError) {
1183        if let IotaError::Rpc(_message, code) = error {
1184            metrics
1185                .total_rpc_err
1186                .with_label_values(&[display_name, code.as_str()])
1187                .inc();
1188        }
1189    }
1190
1191    /// Handles the transaction processing error.
1192    fn handle_process_transaction_error(
1193        &self,
1194        state: ProcessTransactionState,
1195    ) -> AggregatorProcessTransactionError {
1196        let quorum_threshold = self.committee.quorum_threshold();
1197
1198        // Return system overload error if we see >= 2f + 1 overloaded stake.
1199        if state.overloaded_stake >= quorum_threshold {
1200            return AggregatorProcessTransactionError::SystemOverload {
1201                overloaded_stake: state.overloaded_stake,
1202                errors: group_errors(state.errors),
1203            };
1204        }
1205
1206        if !state.retryable {
1207            if state.tx_finalized_with_different_user_sig
1208                || state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1209                    self.committee.validity_threshold(),
1210                )
1211            {
1212                return AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures;
1213            }
1214
1215            // Handle conflicts first as `FatalConflictingTransaction` which is
1216            // more meaningful than `FatalTransaction`
1217            if !state.conflicting_tx_digests.is_empty() {
1218                let good_stake = state.tx_signatures.total_votes();
1219                warn!(
1220                    ?state.conflicting_tx_digests,
1221                    original_tx_stake = good_stake,
1222                    "Client double spend attempt detected!",
1223                );
1224                self.metrics
1225                    .total_client_double_spend_attempts_detected
1226                    .inc();
1227                return AggregatorProcessTransactionError::FatalConflictingTransaction {
1228                    errors: group_errors(state.errors),
1229                    conflicting_tx_digests: state.conflicting_tx_digests,
1230                };
1231            }
1232
1233            return AggregatorProcessTransactionError::FatalTransaction {
1234                errors: group_errors(state.errors),
1235            };
1236        }
1237
1238        // When state is in a retryable state and process transaction was not
1239        // successful, it indicates that we have heard from *all* validators.
1240        // Check if any SystemOverloadRetryAfter error caused the txn
1241        // to fail. If so, return explicit SystemOverloadRetryAfter error for continuous
1242        // retry (since objects are locked in validators). If not, retry regular
1243        // RetryableTransaction error.
1244        if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
1245            >= quorum_threshold
1246        {
1247            let retry_after_secs = state
1248                .retryable_overload_info
1249                .get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
1250                .as_secs();
1251            return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
1252                overload_stake: state.retryable_overload_info.total_stake,
1253                errors: group_errors(state.errors),
1254                retry_after_secs,
1255            };
1256        }
1257
1258        // The system is not overloaded and transaction state is still retryable.
1259        AggregatorProcessTransactionError::RetryableTransaction {
1260            errors: group_errors(state.errors),
1261        }
1262    }
1263
1264    /// Debug logs the transaction processing metrics.
1265    fn record_process_transaction_metrics(
1266        &self,
1267        tx_digest: &TransactionDigest,
1268        state: &ProcessTransactionState,
1269    ) {
1270        let num_signatures = state.tx_signatures.validator_sig_count();
1271        let good_stake = state.tx_signatures.total_votes();
1272        debug!(
1273            ?tx_digest,
1274            num_errors = state.errors.iter().map(|e| e.1.len()).sum::<usize>(),
1275            num_unique_errors = state.errors.len(),
1276            ?good_stake,
1277            non_retryable_stake = state.non_retryable_stake,
1278            ?num_signatures,
1279            "Received signatures response from validators handle_transaction"
1280        );
1281        if !state.errors.is_empty() {
1282            debug!(?tx_digest, "Errors received: {:?}", state.errors);
1283        }
1284    }
1285
1286    /// Handles the `PlainTransactionInfoResponse` variants.
1287    fn handle_process_transaction_response(
1288        &self,
1289        tx_digest: &TransactionDigest,
1290        state: &mut ProcessTransactionState,
1291        response: IotaResult<PlainTransactionInfoResponse>,
1292        name: AuthorityName,
1293        weight: StakeUnit,
1294    ) -> IotaResult<Option<ProcessTransactionResult>> {
1295        match response {
1296            Ok(PlainTransactionInfoResponse::Signed(signed)) => {
1297                debug!(?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction");
1298                self.handle_transaction_response_with_signed(state, signed)
1299            }
1300            Ok(PlainTransactionInfoResponse::ExecutedWithCert(cert, effects, events)) => {
1301                debug!(?tx_digest, name=?name.concise(), weight, "Received prev certificate and effects from validator handle_transaction");
1302                self.handle_transaction_response_with_executed(state, Some(cert), effects, events)
1303            }
1304            Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(_, effects, events)) => {
1305                debug!(?tx_digest, name=?name.concise(), weight, "Received prev effects from validator handle_transaction");
1306                self.handle_transaction_response_with_executed(state, None, effects, events)
1307            }
1308            Err(err) => Err(err),
1309        }
1310    }
1311
1312    /// Handles the `SignedTransaction`. based on the transaction signature
1313    /// insertion result.
1314    fn handle_transaction_response_with_signed(
1315        &self,
1316        state: &mut ProcessTransactionState,
1317        plain_tx: SignedTransaction,
1318    ) -> IotaResult<Option<ProcessTransactionResult>> {
1319        match state.tx_signatures.insert(plain_tx.clone()) {
1320            InsertResult::NotEnoughVotes {
1321                bad_votes,
1322                bad_authorities,
1323            } => {
1324                state.non_retryable_stake += bad_votes;
1325                if bad_votes > 0 {
1326                    state.errors.push((
1327                        IotaError::InvalidSignature {
1328                            error: "Individual signature verification failed".to_string(),
1329                        },
1330                        bad_authorities,
1331                        bad_votes,
1332                    ));
1333                }
1334                Ok(None)
1335            }
1336            InsertResult::Failed { error } => Err(error),
1337            InsertResult::QuorumReached(cert_sig) => {
1338                let certificate =
1339                    CertifiedTransaction::new_from_data_and_sig(plain_tx.into_data(), cert_sig);
1340                certificate.verify_committee_sigs_only(&self.committee)?;
1341                Ok(Some(ProcessTransactionResult::Certified {
1342                    certificate,
1343                    newly_formed: true,
1344                }))
1345            }
1346        }
1347    }
1348
1349    /// Handles `CertifiedTransaction`. Use the certificate if it's in the same
1350    /// epoch. Or update the `ProcessTransactionState` by the
1351    /// `SignedTransactionEffects` if there is no certificate yet.
1352    fn handle_transaction_response_with_executed(
1353        &self,
1354        state: &mut ProcessTransactionState,
1355        certificate: Option<CertifiedTransaction>,
1356        plain_tx_effects: SignedTransactionEffects,
1357        events: TransactionEvents,
1358    ) -> IotaResult<Option<ProcessTransactionResult>> {
1359        match certificate {
1360            Some(certificate) if certificate.epoch() == self.committee.epoch => {
1361                // If we get a certificate in the same epoch, then we use it.
1362                // A certificate in a past epoch does not guarantee finality
1363                // and validators may reject to process it.
1364                Ok(Some(ProcessTransactionResult::Certified {
1365                    certificate,
1366                    newly_formed: false,
1367                }))
1368            }
1369            _ => {
1370                // If we get 2f+1 effects, it's a proof that the transaction
1371                // has already been finalized. This works because validators would re-sign
1372                // effects for transactions that were finalized in previous
1373                // epochs.
1374                let digest = plain_tx_effects.data().digest();
1375                match state.effects_map.insert(digest, plain_tx_effects.clone()) {
1376                    InsertResult::NotEnoughVotes {
1377                        bad_votes,
1378                        bad_authorities,
1379                    } => {
1380                        state.non_retryable_stake += bad_votes;
1381                        if bad_votes > 0 {
1382                            state.errors.push((
1383                                IotaError::InvalidSignature {
1384                                    error: "Individual signature verification failed".to_string(),
1385                                },
1386                                bad_authorities,
1387                                bad_votes,
1388                            ));
1389                        }
1390                        Ok(None)
1391                    }
1392                    InsertResult::Failed { error } => Err(error),
1393                    InsertResult::QuorumReached(cert_sig) => {
1394                        let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1395                            plain_tx_effects.into_data(),
1396                            cert_sig,
1397                        );
1398                        Ok(Some(ProcessTransactionResult::Executed(
1399                            ct.verify(&self.committee)?,
1400                            events,
1401                        )))
1402                    }
1403                }
1404            }
1405        }
1406    }
1407
1408    /// Checks if we have some signed TransactionEffects but not a quorum.
1409    fn record_non_quorum_effects_maybe(
1410        &self,
1411        tx_digest: &TransactionDigest,
1412        mut state: ProcessTransactionState,
1413    ) -> ProcessTransactionState {
1414        if state.effects_map.unique_key_count() > 0 {
1415            let non_quorum_effects = state.effects_map.get_all_unique_values();
1416            warn!(
1417                ?tx_digest,
1418                "Received signed Effects but not with a quorum {:?}", non_quorum_effects
1419            );
1420
1421            // Safe to unwrap because we know that there is at least one entry in the map
1422            // from the check above.
1423            let (_most_staked_effects_digest, (_, most_staked_effects_digest_stake)) =
1424                non_quorum_effects
1425                    .iter()
1426                    .max_by_key(|&(_, (_, stake))| stake)
1427                    .unwrap();
1428            // We check if we have enough retryable stake to get quorum for the most staked
1429            // effects digest, otherwise it indicates we have violated safety assumptions
1430            // or we have forked.
1431            if most_staked_effects_digest_stake + self.get_retryable_stake(&state)
1432                < self.committee.quorum_threshold()
1433            {
1434                state.retryable = false;
1435                if state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1436                    self.committee.validity_threshold(),
1437                ) {
1438                    state.tx_finalized_with_different_user_sig = true;
1439                } else {
1440                    // TODO: Figure out a more reliable way to detect invariance violations.
1441                    error!(
1442                        "We have seen signed effects but unable to reach quorum threshold even including retriable stakes. This is very rare. Tx: {tx_digest:?}. Non-quorum effects: {non_quorum_effects:?}."
1443                    );
1444                }
1445            }
1446
1447            let mut involved_validators = Vec::new();
1448            let mut total_stake = 0;
1449            for (validators, stake) in non_quorum_effects.values() {
1450                involved_validators.extend_from_slice(validators);
1451                total_stake += stake;
1452            }
1453            // TODO: Instead of pushing a new error, we should add more information about
1454            // the non-quorum effects in the final error if state is no longer
1455            // retryable
1456            state.errors.push((
1457                IotaError::QuorumFailedToGetEffectsQuorumWhenProcessingTransaction {
1458                    effects_map: non_quorum_effects,
1459                },
1460                involved_validators,
1461                total_stake,
1462            ));
1463        }
1464        state
1465    }
1466
1467    /// Gets the retryable stake for the transaction.
1468    fn get_retryable_stake(&self, state: &ProcessTransactionState) -> StakeUnit {
1469        self.committee.total_votes()
1470            - state.non_retryable_stake
1471            - state.effects_map.total_votes()
1472            - state.tx_signatures.total_votes()
1473    }
1474
1475    /// Processes a given certificate by broadcasting it to authorities and
1476    /// aggregating the results until reaching a quorum.
1477    #[instrument(level = "trace", skip_all)]
1478    pub async fn process_certificate(
1479        &self,
1480        request: HandleCertificateRequestV1,
1481        client_addr: Option<SocketAddr>,
1482    ) -> Result<QuorumDriverResponse, AggregatorProcessCertificateError> {
1483        let state = ProcessCertificateState {
1484            effects_map: MultiStakeAggregator::new(self.committee.clone()),
1485            non_retryable_stake: 0,
1486            non_retryable_errors: vec![],
1487            retryable_errors: vec![],
1488            retryable: true,
1489            events: None,
1490            input_objects: None,
1491            output_objects: None,
1492            auxiliary_data: None,
1493            request: request.clone(),
1494        };
1495
1496        // create a set of validators that we should sample to request input/output
1497        // objects from
1498        let validators_to_sample =
1499            if request.include_input_objects || request.include_output_objects {
1500                // Number of validators to request input/output objects from
1501                const NUMBER_TO_SAMPLE: usize = 10;
1502
1503                self.committee
1504                    .choose_multiple_weighted_iter(NUMBER_TO_SAMPLE)
1505                    .cloned()
1506                    .collect()
1507            } else {
1508                HashSet::new()
1509            };
1510
1511        let tx_digest = *request.certificate.digest();
1512        let timeout_after_quorum = self.timeouts.post_quorum_timeout;
1513
1514        let request_ref = request;
1515        let threshold = self.committee.quorum_threshold();
1516        let validity = self.committee.validity_threshold();
1517
1518        debug!(
1519            ?tx_digest,
1520            quorum_threshold = threshold,
1521            validity_threshold = validity,
1522            ?timeout_after_quorum,
1523            "Broadcasting certificate to authorities"
1524        );
1525        let committee: Arc<Committee> = self.committee.clone();
1526        let authority_clients = self.authority_clients.clone();
1527        let metrics = self.metrics.clone();
1528        let metrics_clone = metrics.clone();
1529        let validator_display_names = self.validator_display_names.clone();
1530        let (result, mut remaining_tasks) = quorum_map_then_reduce_with_timeout(
1531            committee.clone(),
1532            authority_clients.clone(),
1533            state,
1534            move |name, client| {
1535                Box::pin(async move {
1536                    let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests);
1537                    let concise_name = name.concise_owned();
1538                    if request_ref.include_input_objects || request_ref.include_output_objects {
1539
1540                        // adjust the request to validators we aren't planning on sampling
1541                        let req = if validators_to_sample.contains(&name) {
1542                            request_ref
1543                        } else {
1544                            HandleCertificateRequestV1 {
1545                                include_input_objects: false,
1546                                include_output_objects: false,
1547                                include_auxiliary_data: false,
1548                                ..request_ref
1549                            }
1550                        };
1551
1552                        client
1553                            .handle_certificate_v1(req, client_addr)
1554                            .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
1555                            .await
1556                    } else {
1557                        client
1558                            .handle_certificate_v1(HandleCertificateRequestV1::new(request_ref.certificate).with_events(), client_addr)
1559                            .instrument(trace_span!("handle_certificate_v1", authority =? concise_name))
1560                            .await
1561                            .map(|response| HandleCertificateResponseV1 {
1562                                signed_effects: response.signed_effects,
1563                                events: response.events,
1564                                input_objects: None,
1565                                output_objects: None,
1566                                auxiliary_data: None,
1567                            })
1568                    }
1569                })
1570            },
1571            move |mut state, name, weight, response| {
1572                let committee_clone = committee.clone();
1573                let metrics = metrics.clone();
1574                let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1575                Box::pin(async move {
1576                    // We aggregate the effects response, until we have more than 2f
1577                    // and return.
1578                    match AuthorityAggregator::<A>::handle_process_certificate_response(
1579                        committee_clone,
1580                        &metrics,
1581                        &tx_digest, &mut state, response, name)
1582                    {
1583                        Ok(Some(effects)) => ReduceOutput::Success(effects),
1584                        Ok(None) => {
1585                            // When the result is none, it is possible that the
1586                            // non_retryable_stake had been incremented due to
1587                            // failed individual signature verification.
1588                            if state.non_retryable_stake >= validity {
1589                                state.retryable = false;
1590                                ReduceOutput::Failed(state)
1591                            } else {
1592                                ReduceOutput::Continue(state)
1593                            }
1594                        },
1595                        Err(err) => {
1596                            let concise_name = name.concise();
1597                            debug!(?tx_digest, name=?concise_name, "Error processing certificate from validator: {:?}", err);
1598                            metrics
1599                                .process_cert_errors
1600                                .with_label_values(&[display_name.as_str(), err.as_ref()])
1601                                .inc();
1602                            Self::record_rpc_error_maybe(metrics, &display_name, &err);
1603                            let (retryable, categorized) = err.is_retryable();
1604                            if !categorized {
1605                                // TODO: Should minimize possible uncategorized errors here
1606                                // use ERROR for now to make them easier to spot.
1607                                error!(?tx_digest, "[WATCHOUT] uncategorized tx error: {err}");
1608                            }
1609                            if !retryable {
1610                                state.non_retryable_stake += weight;
1611                                state.non_retryable_errors.push((err, vec![name], weight));
1612                            } else {
1613                                state.retryable_errors.push((err, vec![name], weight));
1614                            }
1615                            if state.non_retryable_stake >= validity {
1616                                state.retryable = false;
1617                                ReduceOutput::Failed(state)
1618                            } else {
1619                                ReduceOutput::Continue(state)
1620                            }
1621                        }
1622                    }
1623                })
1624            },
1625            // A long timeout before we hear back from a quorum
1626            self.timeouts.pre_quorum_timeout,
1627        )
1628        .await
1629        .map_err(|state| {
1630            debug!(
1631                ?tx_digest,
1632                num_unique_effects = state.effects_map.unique_key_count(),
1633                non_retryable_stake = state.non_retryable_stake,
1634                "Received effects responses from validators"
1635            );
1636
1637            // record errors and tx retryable state
1638            for (iota_err, _, _) in state.retryable_errors.iter().chain(state.non_retryable_errors.iter()) {
1639                self
1640                    .metrics
1641                    .total_aggregated_err
1642                    .with_label_values(&[
1643                        iota_err.as_ref(),
1644                        if state.retryable {
1645                            "recoverable"
1646                        } else {
1647                            "non-recoverable"
1648                        },
1649                    ])
1650                    .inc();
1651            }
1652            if state.retryable {
1653                AggregatorProcessCertificateError::RetryableExecuteCertificate {
1654                    retryable_errors: group_errors(state.retryable_errors),
1655                }
1656            } else {
1657                AggregatorProcessCertificateError::FatalExecuteCertificate {
1658                    non_retryable_errors: group_errors(state.non_retryable_errors),
1659                }
1660            }
1661        })?;
1662
1663        let metrics = self.metrics.clone();
1664        metrics
1665            .remaining_tasks_when_reaching_cert_quorum
1666            .observe(remaining_tasks.len() as f64);
1667        if !remaining_tasks.is_empty() {
1668            // Use best efforts to send the cert to remaining validators.
1669            spawn_monitored_task!(async move {
1670                let mut timeout = Box::pin(sleep(timeout_after_quorum));
1671                loop {
1672                    tokio::select! {
1673                        _ = &mut timeout => {
1674                            debug!(?tx_digest, "Timed out in post quorum cert broadcasting: {:?}. Remaining tasks: {:?}", timeout_after_quorum, remaining_tasks.len());
1675                            metrics.cert_broadcasting_post_quorum_timeout.inc();
1676                            metrics.remaining_tasks_when_cert_broadcasting_post_quorum_timeout.observe(remaining_tasks.len() as f64);
1677                            break;
1678                        }
1679                        res = remaining_tasks.next() => {
1680                            if res.is_none() {
1681                                break;
1682                            }
1683                        }
1684                    }
1685                }
1686            });
1687        }
1688        Ok(result)
1689    }
1690
1691    /// Handles the `HandleCertificateResponseV1` variants.
1692    fn handle_process_certificate_response(
1693        committee: Arc<Committee>,
1694        metrics: &AuthAggMetrics,
1695        tx_digest: &TransactionDigest,
1696        state: &mut ProcessCertificateState,
1697        response: IotaResult<HandleCertificateResponseV1>,
1698        name: AuthorityName,
1699    ) -> IotaResult<Option<QuorumDriverResponse>> {
1700        match response {
1701            Ok(HandleCertificateResponseV1 {
1702                signed_effects,
1703                events,
1704                input_objects,
1705                output_objects,
1706                auxiliary_data,
1707            }) => {
1708                debug!(
1709                    ?tx_digest,
1710                    name = ?name.concise(),
1711                    "Validator handled certificate successfully",
1712                );
1713
1714                if events.is_some() && state.events.is_none() {
1715                    state.events = events;
1716                }
1717
1718                if input_objects.is_some() && state.input_objects.is_none() {
1719                    state.input_objects = input_objects;
1720                }
1721
1722                if output_objects.is_some() && state.output_objects.is_none() {
1723                    state.output_objects = output_objects;
1724                }
1725
1726                if auxiliary_data.is_some() && state.auxiliary_data.is_none() {
1727                    state.auxiliary_data = auxiliary_data;
1728                }
1729
1730                let effects_digest = *signed_effects.digest();
1731                // Note: here we aggregate votes by the hash of the effects structure
1732                match state.effects_map.insert(
1733                    (signed_effects.epoch(), effects_digest),
1734                    signed_effects.clone(),
1735                ) {
1736                    InsertResult::NotEnoughVotes {
1737                        bad_votes,
1738                        bad_authorities,
1739                    } => {
1740                        state.non_retryable_stake += bad_votes;
1741                        if bad_votes > 0 {
1742                            state.non_retryable_errors.push((
1743                                IotaError::InvalidSignature {
1744                                    error: "Individual signature verification failed".to_string(),
1745                                },
1746                                bad_authorities,
1747                                bad_votes,
1748                            ));
1749                        }
1750                        Ok(None)
1751                    }
1752                    InsertResult::Failed { error } => Err(error),
1753                    InsertResult::QuorumReached(cert_sig) => {
1754                        let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1755                            signed_effects.into_data(),
1756                            cert_sig,
1757                        );
1758
1759                        if (state.request.include_input_objects && state.input_objects.is_none())
1760                            || (state.request.include_output_objects
1761                                && state.output_objects.is_none())
1762                        {
1763                            metrics.quorum_reached_without_requested_objects.inc();
1764                            debug!(
1765                                ?tx_digest,
1766                                "Quorum Reached but requested input/output objects were not returned"
1767                            );
1768                        }
1769
1770                        ct.verify(&committee).map(|ct| {
1771                            debug!(?tx_digest, "Got quorum for validators handle_certificate.");
1772                            Some(QuorumDriverResponse {
1773                                effects_cert: ct,
1774                                events: state.events.take(),
1775                                input_objects: state.input_objects.take(),
1776                                output_objects: state.output_objects.take(),
1777                                auxiliary_data: state.auxiliary_data.take(),
1778                            })
1779                        })
1780                    }
1781                }
1782            }
1783            Err(err) => Err(err),
1784        }
1785    }
1786
1787    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1788    pub async fn execute_transaction_block(
1789        &self,
1790        transaction: &Transaction,
1791        client_addr: Option<SocketAddr>,
1792    ) -> Result<VerifiedCertifiedTransactionEffects, anyhow::Error> {
1793        let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
1794        let result = self
1795            .process_transaction(transaction.clone(), client_addr)
1796            .await?;
1797        let cert = match result {
1798            ProcessTransactionResult::Certified { certificate, .. } => certificate,
1799            ProcessTransactionResult::Executed(effects, _) => {
1800                return Ok(effects);
1801            }
1802        };
1803        self.metrics.total_tx_certificates_created.inc();
1804        drop(tx_guard);
1805
1806        let _cert_guard = GaugeGuard::acquire(&self.metrics.inflight_certificates);
1807        let response = self
1808            .process_certificate(
1809                HandleCertificateRequestV1 {
1810                    certificate: cert.clone(),
1811                    include_events: true,
1812                    include_input_objects: false,
1813                    include_output_objects: false,
1814                    include_auxiliary_data: false,
1815                },
1816                client_addr,
1817            )
1818            .await?;
1819
1820        Ok(response.effects_cert)
1821    }
1822
1823    /// This function tries to get SignedTransaction OR CertifiedTransaction
1824    /// from an given list of validators who are supposed to know about it.
1825    #[instrument(level = "trace", skip_all, fields(?tx_digest))]
1826    pub async fn handle_transaction_info_request_from_some_validators(
1827        &self,
1828        tx_digest: &TransactionDigest,
1829        // authorities known to have the transaction info we are requesting.
1830        validators: &BTreeSet<AuthorityName>,
1831        timeout_total: Option<Duration>,
1832    ) -> IotaResult<PlainTransactionInfoResponse> {
1833        self.quorum_once_with_timeout(
1834            None,
1835            Some(validators),
1836            |_authority, client| {
1837                Box::pin(async move {
1838                    client
1839                        .handle_transaction_info_request(TransactionInfoRequest {
1840                            transaction_digest: *tx_digest,
1841                        })
1842                        .await
1843                })
1844            },
1845            Duration::from_secs(2),
1846            timeout_total,
1847            "handle_transaction_info_request_from_some_validators".to_string(),
1848        )
1849        .await
1850    }
1851}
1852
/// `AuthorityAggregatorBuilder` is used to build an `AuthorityAggregator` with
/// customizable configurations for the IOTA network.
///
/// Start from `from_network_config`, `from_genesis` or `from_committee`,
/// optionally override settings with the `with_*` methods, and finish with
/// `build_network_clients` or `build_custom_clients`.
#[derive(Default)]
pub struct AuthorityAggregatorBuilder<'a> {
    // Network configuration whose `genesis` supplies the committee; checked
    // before `genesis` when resolving the network committee.
    network_config: Option<&'a NetworkConfig>,
    // Genesis used for the network committee when no `network_config` is set.
    genesis: Option<&'a Genesis>,
    // Explicit committee for the aggregator; when unset, the committee is taken
    // from the network committee.
    committee: Option<Committee>,
    // Committee store; when unset, `CommitteeStore::new_for_testing` is built
    // from the committee.
    committee_store: Option<Arc<CommitteeStore>>,
    // Prometheus registry for metrics; a fresh `Registry` is used when unset.
    registry: Option<&'a Registry>,
    // Timeout settings; `TimeoutConfig`'s default is used when unset.
    timeouts_config: Option<TimeoutConfig>,
}
1864
1865impl<'a> AuthorityAggregatorBuilder<'a> {
1866    /// Creates a new `AuthorityAggregatorBuilder` from a `NetworkConfig`.
1867    pub fn from_network_config(config: &'a NetworkConfig) -> Self {
1868        Self {
1869            network_config: Some(config),
1870            ..Default::default()
1871        }
1872    }
1873
1874    /// Creates a new `AuthorityAggregatorBuilder` from a `Genesis`.
1875    pub fn from_genesis(genesis: &'a Genesis) -> Self {
1876        Self {
1877            genesis: Some(genesis),
1878            ..Default::default()
1879        }
1880    }
1881
1882    /// Creates a new `AuthorityAggregatorBuilder` from a `Committee`.
1883    pub fn from_committee(committee: Committee) -> Self {
1884        Self {
1885            committee: Some(committee),
1886            ..Default::default()
1887        }
1888    }
1889
1890    /// Sets the `CommitteeStore`.
1891    pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
1892        self.committee_store = Some(committee_store);
1893        self
1894    }
1895
1896    /// Sets the Prometheus registry.
1897    pub fn with_registry(mut self, registry: &'a Registry) -> Self {
1898        self.registry = Some(registry);
1899        self
1900    }
1901
1902    /// Sets the timeouts configuration.
1903    pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
1904        self.timeouts_config = Some(timeouts_config);
1905        self
1906    }
1907
1908    fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
1909        let genesis = if let Some(network_config) = self.network_config {
1910            &network_config.genesis
1911        } else if let Some(genesis) = self.genesis {
1912            genesis
1913        } else {
1914            panic!("need either NetworkConfig or Genesis.");
1915        };
1916        genesis.committee_with_network()
1917    }
1918
1919    fn get_committee(&self) -> Committee {
1920        self.committee
1921            .clone()
1922            .unwrap_or_else(|| self.get_network_committee().committee().clone())
1923    }
1924
1925    pub fn build_network_clients(
1926        self,
1927    ) -> (
1928        AuthorityAggregator<NetworkAuthorityClient>,
1929        BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
1930    ) {
1931        let network_committee = self.get_network_committee();
1932        let auth_clients = make_authority_clients_with_timeout_config(
1933            &network_committee,
1934            DEFAULT_CONNECT_TIMEOUT_SEC,
1935            DEFAULT_REQUEST_TIMEOUT_SEC,
1936        );
1937        let auth_agg = self.build_custom_clients(auth_clients.clone());
1938        (auth_agg, auth_clients)
1939    }
1940
1941    pub fn build_custom_clients<C: Clone>(
1942        self,
1943        authority_clients: BTreeMap<AuthorityName, C>,
1944    ) -> AuthorityAggregator<C> {
1945        let committee = self.get_committee();
1946        let registry = Registry::new();
1947        let registry = self.registry.unwrap_or(&registry);
1948        let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
1949        let auth_agg_metrics = Arc::new(AuthAggMetrics::new(registry));
1950
1951        let committee_store = self
1952            .committee_store
1953            .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
1954
1955        let timeouts_config = self.timeouts_config.unwrap_or_default();
1956
1957        AuthorityAggregator::new(
1958            committee,
1959            committee_store,
1960            authority_clients,
1961            safe_client_metrics_base,
1962            auth_agg_metrics,
1963            Arc::new(HashMap::new()),
1964            timeouts_config,
1965        )
1966    }
1967}