Skip to main content

iota_core/
validator_tx_finalizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(any(msim, test))]
6use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
7use std::{cmp::min, ops::Add, sync::Arc, time::Duration};
8
9use arc_swap::ArcSwap;
10use iota_metrics::LATENCY_SEC_BUCKETS;
11use iota_types::{
12    base_types::{AuthorityName, TransactionDigest},
13    transaction::VerifiedSignedTransaction,
14};
15use prometheus::{
16    Histogram, IntCounter, Registry, register_histogram_with_registry,
17    register_int_counter_with_registry,
18};
19use tokio::{select, time::Instant};
20use tracing::{debug, error, trace};
21
22use crate::{
23    authority::authority_per_epoch_store::AuthorityPerEpochStore,
24    authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI,
25    execution_cache::TransactionCacheRead,
26};
27
28struct ValidatorTxFinalizerMetrics {
29    num_finalization_attempts: IntCounter,
30    num_successful_finalizations: IntCounter,
31    finalization_latency: Histogram,
32    validator_tx_finalizer_attempt_delay: Histogram,
33    #[cfg(any(msim, test))]
34    num_finalization_attempts_for_testing: AtomicU64,
35    #[cfg(test)]
36    num_successful_finalizations_for_testing: AtomicU64,
37}
38
39impl ValidatorTxFinalizerMetrics {
40    fn new(registry: &Registry) -> Self {
41        Self {
42            num_finalization_attempts: register_int_counter_with_registry!(
43                "validator_tx_finalizer_num_finalization_attempts",
44                "Total number of attempts to finalize a transaction",
45                registry,
46            )
47            .unwrap(),
48            num_successful_finalizations: register_int_counter_with_registry!(
49                "validator_tx_finalizer_num_successful_finalizations",
50                "Number of transactions successfully finalized",
51                registry,
52            )
53            .unwrap(),
54            finalization_latency: register_histogram_with_registry!(
55                "validator_tx_finalizer_finalization_latency",
56                "Latency of transaction finalization",
57                LATENCY_SEC_BUCKETS.to_vec(),
58                registry,
59            )
60            .unwrap(),
61            validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
62                "validator_tx_finalizer_attempt_delay",
63                "Duration that a validator in the committee waited before attempting to finalize the transaction",
64                vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
65                registry,
66            )
67            .unwrap(),
68            #[cfg(any(msim, test))]
69            num_finalization_attempts_for_testing: AtomicU64::new(0),
70            #[cfg(test)]
71            num_successful_finalizations_for_testing: AtomicU64::new(0),
72        }
73    }
74
75    fn start_finalization(&self) -> Instant {
76        self.num_finalization_attempts.inc();
77        #[cfg(any(msim, test))]
78        self.num_finalization_attempts_for_testing
79            .fetch_add(1, Relaxed);
80        Instant::now()
81    }
82
83    fn finalization_succeeded(&self, start_time: Instant) {
84        let latency = start_time.elapsed();
85        self.num_successful_finalizations.inc();
86        self.finalization_latency.observe(latency.as_secs_f64());
87        #[cfg(test)]
88        self.num_successful_finalizations_for_testing
89            .fetch_add(1, Relaxed);
90    }
91}
92
/// Timing parameters that control when a validator tries to finalize a
/// transaction it has signed.
pub struct ValidatorTxFinalizerConfig {
    /// Base wait applied before any finalization attempt.
    pub tx_finalization_delay: Duration,
    /// Upper bound on how long a single finalization attempt may run.
    pub tx_finalization_timeout: Duration,
    /// Incremental delay for validators to wake up to finalize a transaction.
    pub validator_delay_increments_sec: u64,
    /// Cap applied to the total (base + incremental) delay.
    pub validator_max_delay: Duration,
}

impl Default for ValidatorTxFinalizerConfig {
    /// Returns the production timings, or much shorter ones when built for
    /// tests or the simulator (`cfg(any(msim, test))`).
    fn default() -> Self {
        // All values are in seconds:
        // (base delay, attempt timeout, per-position increment, max delay).
        let (delay_secs, timeout_secs, increment_secs, max_delay_secs) =
            if cfg!(any(msim, test)) {
                (5, 60, 1, 15)
            } else {
                // Production: only wake up the finalization task for a given
                // transaction 1 min after seeing it. This gives plenty of time
                // for the transaction to become final in the normal way, while
                // not holding finalizer tasks (and their memory) for too long.
                // If a transaction cannot be finalized within 1 min of being
                // woken up, give up.
                (60, 60, 10, 180)
            };
        Self {
            tx_finalization_delay: Duration::from_secs(delay_secs),
            tx_finalization_timeout: Duration::from_secs(timeout_secs),
            validator_delay_increments_sec: increment_secs,
            validator_max_delay: Duration::from_secs(max_delay_secs),
        }
    }
}
129
130/// The `ValidatorTxFinalizer` is responsible for finalizing transactions that
131/// have been signed by the validator. It does this by waiting for a delay
132/// after the transaction has been signed, and then attempting to finalize
133/// the transaction if it has not yet been done by a fullnode.
134pub struct ValidatorTxFinalizer<C: Clone> {
135    agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
136    name: AuthorityName,
137    config: Arc<ValidatorTxFinalizerConfig>,
138    metrics: Arc<ValidatorTxFinalizerMetrics>,
139}
140
141impl<C: Clone> ValidatorTxFinalizer<C> {
142    pub fn new(
143        agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
144        name: AuthorityName,
145        registry: &Registry,
146    ) -> Self {
147        Self {
148            agg,
149            name,
150            config: Arc::new(ValidatorTxFinalizerConfig::default()),
151            metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
152        }
153    }
154
155    #[cfg(test)]
156    pub(crate) fn new_for_testing(
157        agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
158        name: AuthorityName,
159    ) -> Self {
160        Self::new(agg, name, &Registry::new())
161    }
162
163    #[cfg(test)]
164    pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
165        &self.agg
166    }
167
168    #[cfg(any(msim, test))]
169    pub fn num_finalization_attempts_for_testing(&self) -> u64 {
170        self.metrics
171            .num_finalization_attempts_for_testing
172            .load(Relaxed)
173    }
174}
175
176impl<C> ValidatorTxFinalizer<C>
177where
178    C: Clone + AuthorityAPI + Send + Sync + 'static,
179{
180    pub async fn track_signed_tx(
181        &self,
182        cache_read: Arc<dyn TransactionCacheRead>,
183        epoch_store: &Arc<AuthorityPerEpochStore>,
184        tx: VerifiedSignedTransaction,
185    ) {
186        let tx_digest = *tx.digest();
187        trace!(?tx_digest, "Tracking signed transaction");
188        match self
189            .delay_and_finalize_tx(cache_read, epoch_store, tx)
190            .await
191        {
192            Ok(did_run) => {
193                if did_run {
194                    debug!(?tx_digest, "Transaction finalized");
195                }
196            }
197            Err(err) => {
198                error!(?tx_digest, ?err, "Failed to finalize transaction");
199            }
200        }
201    }
202
203    async fn delay_and_finalize_tx(
204        &self,
205        cache_read: Arc<dyn TransactionCacheRead>,
206        epoch_store: &Arc<AuthorityPerEpochStore>,
207        tx: VerifiedSignedTransaction,
208    ) -> anyhow::Result<bool> {
209        let tx_digest = *tx.digest();
210        let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
211            return Ok(false);
212        };
213        let digests = [tx_digest];
214        select! {
215            _ = tokio::time::sleep(tx_finalization_delay) => {
216                trace!(?tx_digest, "Waking up to finalize transaction");
217            }
218            _ = cache_read.try_notify_read_executed_effects_digests(&digests) => {
219                trace!(?tx_digest, "Transaction already finalized");
220                return Ok(false);
221            }
222        }
223
224        if epoch_store.is_pending_consensus_certificate(&tx_digest) {
225            trace!(
226                ?tx_digest,
227                "Transaction has been submitted to consensus, no need to help drive finality"
228            );
229            return Ok(false);
230        }
231
232        self.metrics
233            .validator_tx_finalizer_attempt_delay
234            .observe(tx_finalization_delay.as_secs_f64());
235        let start_time = self.metrics.start_finalization();
236        debug!(
237            ?tx_digest,
238            "Invoking authority aggregator to finalize transaction"
239        );
240        tokio::time::timeout(
241            self.config.tx_finalization_timeout,
242            self.agg
243                .load()
244                .execute_transaction_block(tx.into_unsigned().inner(), None),
245        )
246        .await??;
247        self.metrics.finalization_succeeded(start_time);
248        Ok(true)
249    }
250
251    // We want to avoid all validators waking up at the same time to finalize the
252    // same transaction. That can lead to a waste of resource and flood the
253    // network unnecessarily. Here we use the transaction digest to determine an
254    // order of all validators. Validators will wake up one by one with
255    // incremental delays to finalize the transaction. The hope is that the
256    // first few should be able to finalize the transaction, and the rest will
257    // see it already executed and do not need to do anything.
258    fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
259        let agg = self.agg.load();
260        let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
261        let Some(position) = order.iter().position(|&name| name == self.name) else {
262            // Somehow the validator is not found in the committee. This should never
263            // happen. TODO: This is where we should report system invariant
264            // violation.
265            error!("Validator {} not found in the committee", self.name);
266            return None;
267        };
268        // TODO: As an optimization, we could also limit the number of validators that
269        // would do this.
270        let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
271        let delay = self
272            .config
273            .tx_finalization_delay
274            .add(Duration::from_secs(extra_delay));
275        Some(min(delay, self.config.validator_max_delay))
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use std::{
282        cmp::min,
283        collections::BTreeMap,
284        iter,
285        net::SocketAddr,
286        num::NonZeroUsize,
287        sync::{
288            Arc,
289            atomic::{AtomicBool, Ordering::Relaxed},
290        },
291    };
292
293    use arc_swap::ArcSwap;
294    use async_trait::async_trait;
295    use iota_macros::sim_test;
296    use iota_swarm_config::network_config_builder::ConfigBuilder;
297    use iota_test_transaction_builder::TestTransactionBuilder;
298    use iota_types::{
299        base_types::{AuthorityName, IotaAddress, ObjectID, TransactionDigest},
300        committee::{CommitteeTrait, StakeUnit},
301        crypto::{AccountKeyPair, get_account_key_pair},
302        effects::{TransactionEffectsAPI, TransactionEvents},
303        error::IotaError,
304        executable_transaction::VerifiedExecutableTransaction,
305        iota_system_state::IotaSystemState,
306        messages_checkpoint::{CheckpointRequest, CheckpointResponse},
307        messages_grpc::{
308            HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
309            HandleCertificateRequestV1, HandleCertificateResponseV1,
310            HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
311            HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
312            TransactionInfoRequest, TransactionInfoResponse,
313        },
314        object::Object,
315        transaction::{
316            SignedTransaction, Transaction, VerifiedCertificate, VerifiedSignedTransaction,
317            VerifiedTransaction,
318        },
319        utils::to_sender_signed_transaction,
320    };
321
322    use crate::{
323        authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
324        authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder},
325        authority_client::AuthorityAPI,
326        validator_tx_finalizer::ValidatorTxFinalizer,
327    };
328
329    #[derive(Clone)]
330    struct MockAuthorityClient {
331        authority: Arc<AuthorityState>,
332        inject_fault: Arc<AtomicBool>,
333    }
334
335    #[async_trait]
336    impl AuthorityAPI for MockAuthorityClient {
337        async fn handle_transaction(
338            &self,
339            transaction: Transaction,
340            _client_addr: Option<SocketAddr>,
341        ) -> Result<HandleTransactionResponse, IotaError> {
342            if self.inject_fault.load(Relaxed) {
343                return Err(IotaError::Timeout);
344            }
345            let epoch_store = self.authority.epoch_store_for_testing();
346            self.authority
347                .handle_transaction(
348                    &epoch_store,
349                    VerifiedTransaction::new_unchecked(transaction),
350                )
351                .await
352        }
353
354        async fn handle_certificate_v1(
355            &self,
356            request: HandleCertificateRequestV1,
357            _client_addr: Option<SocketAddr>,
358        ) -> Result<HandleCertificateResponseV1, IotaError> {
359            let epoch_store = self.authority.epoch_store_for_testing();
360            let (effects, _) = self.authority.try_execute_immediately(
361                &VerifiedExecutableTransaction::new_from_certificate(
362                    VerifiedCertificate::new_unchecked(request.certificate),
363                ),
364                None,
365                &epoch_store,
366            )?;
367            let events = if effects.events_digest().is_some() {
368                self.authority
369                    .get_transaction_events(effects.transaction_digest())?
370            } else {
371                TransactionEvents::default()
372            };
373            let signed_effects = self
374                .authority
375                .sign_effects(effects, &epoch_store)?
376                .into_inner();
377            Ok(HandleCertificateResponseV1 {
378                signed_effects,
379                events: Some(events),
380                input_objects: None,
381                output_objects: None,
382                auxiliary_data: None,
383            })
384        }
385
386        async fn handle_soft_bundle_certificates_v1(
387            &self,
388            _request: HandleSoftBundleCertificatesRequestV1,
389            _client_addr: Option<SocketAddr>,
390        ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
391            unimplemented!()
392        }
393
394        async fn handle_object_info_request(
395            &self,
396            _request: ObjectInfoRequest,
397        ) -> Result<ObjectInfoResponse, IotaError> {
398            unimplemented!()
399        }
400
401        async fn handle_transaction_info_request(
402            &self,
403            _request: TransactionInfoRequest,
404        ) -> Result<TransactionInfoResponse, IotaError> {
405            unimplemented!()
406        }
407
408        async fn handle_checkpoint(
409            &self,
410            _request: CheckpointRequest,
411        ) -> Result<CheckpointResponse, IotaError> {
412            unimplemented!()
413        }
414
415        async fn handle_system_state_object(
416            &self,
417            _request: SystemStateRequest,
418        ) -> Result<IotaSystemState, IotaError> {
419            unimplemented!()
420        }
421
422        async fn handle_capability_notification_v1(
423            &self,
424            _request: HandleCapabilityNotificationRequestV1,
425        ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
426            unimplemented!()
427        }
428    }
429
430    #[sim_test]
431    async fn test_validator_tx_finalizer_basic_flow() {
432        telemetry_subscribers::init_for_testing();
433        let (sender, keypair) = get_account_key_pair();
434        let gas_object = Object::with_owner_for_testing(sender);
435        let gas_object_id = gas_object.id();
436        let (states, auth_agg, clients) = create_validators(gas_object).await;
437        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
438        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
439        let tx_digest = *signed_tx.digest();
440        let cache_read = states[0].get_transaction_cache_reader().clone();
441        let epoch_store = states[0].epoch_store_for_testing();
442        let metrics = finalizer1.metrics.clone();
443        let handle = tokio::spawn(async move {
444            finalizer1
445                .track_signed_tx(cache_read, &epoch_store, signed_tx)
446                .await;
447        });
448        handle.await.unwrap();
449        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
450        assert_eq!(
451            metrics.num_finalization_attempts_for_testing.load(Relaxed),
452            1
453        );
454        assert_eq!(
455            metrics
456                .num_successful_finalizations_for_testing
457                .load(Relaxed),
458            1
459        );
460    }
461
462    #[tokio::test]
463    async fn test_validator_tx_finalizer_new_epoch() {
464        let (sender, keypair) = get_account_key_pair();
465        let gas_object = Object::with_owner_for_testing(sender);
466        let gas_object_id = gas_object.id();
467        let (states, auth_agg, clients) = create_validators(gas_object).await;
468        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
469        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
470        let tx_digest = *signed_tx.digest();
471        let epoch_store = states[0].epoch_store_for_testing();
472        let cache_read = states[0].get_transaction_cache_reader().clone();
473
474        let metrics = finalizer1.metrics.clone();
475        let handle = tokio::spawn(async move {
476            let _ = epoch_store
477                .within_alive_epoch(finalizer1.track_signed_tx(cache_read, &epoch_store, signed_tx))
478                .await;
479        });
480        states[0].reconfigure_for_testing().await;
481        handle.await.unwrap();
482        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
483        assert_eq!(
484            metrics.num_finalization_attempts_for_testing.load(Relaxed),
485            0
486        );
487        assert_eq!(
488            metrics
489                .num_successful_finalizations_for_testing
490                .load(Relaxed),
491            0
492        );
493    }
494
495    #[tokio::test]
496    async fn test_validator_tx_finalizer_auth_agg_reconfig() {
497        let (sender, _) = get_account_key_pair();
498        let gas_object = Object::with_owner_for_testing(sender);
499        let (states, auth_agg, _clients) = create_validators(gas_object).await;
500        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
501        let mut new_auth_agg = (**auth_agg.load()).clone();
502        let mut new_committee = (*new_auth_agg.committee).clone();
503        new_committee.epoch = 100;
504        new_auth_agg.committee = Arc::new(new_committee);
505        auth_agg.store(Arc::new(new_auth_agg));
506        assert_eq!(
507            finalizer1.auth_agg().load().committee.epoch,
508            100,
509            "AuthorityAggregator not updated"
510        );
511    }
512
513    #[tokio::test]
514    async fn test_validator_tx_finalizer_already_executed() {
515        telemetry_subscribers::init_for_testing();
516        let (sender, keypair) = get_account_key_pair();
517        let gas_object = Object::with_owner_for_testing(sender);
518        let gas_object_id = gas_object.id();
519        let (states, auth_agg, clients) = create_validators(gas_object).await;
520        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
521        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
522        let tx_digest = *signed_tx.digest();
523        let cache_read = states[0].get_transaction_cache_reader().clone();
524        let epoch_store = states[0].epoch_store_for_testing();
525
526        let metrics = finalizer1.metrics.clone();
527        let signed_tx_clone = signed_tx.clone();
528        let handle = tokio::spawn(async move {
529            finalizer1
530                .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
531                .await;
532        });
533        auth_agg
534            .load()
535            .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
536            .await
537            .unwrap();
538        handle.await.unwrap();
539        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
540        assert_eq!(
541            metrics.num_finalization_attempts_for_testing.load(Relaxed),
542            0
543        );
544        assert_eq!(
545            metrics
546                .num_successful_finalizations_for_testing
547                .load(Relaxed),
548            0
549        );
550    }
551
552    #[tokio::test]
553    async fn test_validator_tx_finalizer_timeout() {
554        telemetry_subscribers::init_for_testing();
555        let (sender, keypair) = get_account_key_pair();
556        let gas_object = Object::with_owner_for_testing(sender);
557        let gas_object_id = gas_object.id();
558        let (states, auth_agg, clients) = create_validators(gas_object).await;
559        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
560        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
561        let tx_digest = *signed_tx.digest();
562        let cache_read = states[0].get_transaction_cache_reader().clone();
563        let epoch_store = states[0].epoch_store_for_testing();
564        for client in clients.values() {
565            client.inject_fault.store(true, Relaxed);
566        }
567
568        let metrics = finalizer1.metrics.clone();
569        let signed_tx_clone = signed_tx.clone();
570        let handle = tokio::spawn(async move {
571            finalizer1
572                .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
573                .await;
574        });
575        handle.await.unwrap();
576        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
577        assert_eq!(
578            metrics.num_finalization_attempts_for_testing.load(Relaxed),
579            1
580        );
581        assert_eq!(
582            metrics
583                .num_successful_finalizations_for_testing
584                .load(Relaxed),
585            0
586        );
587    }
588
589    #[tokio::test]
590    async fn test_validator_tx_finalizer_determine_finalization_delay() {
591        const COMMITTEE_SIZE: usize = 15;
592        let network_config = ConfigBuilder::new_with_temp_dir()
593            .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
594            .build();
595        let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
596            .build_network_clients();
597        let auth_agg = Arc::new(auth_agg);
598        let finalizers = (0..COMMITTEE_SIZE)
599            .map(|idx| {
600                ValidatorTxFinalizer::new_for_testing(
601                    Arc::new(ArcSwap::new(auth_agg.clone())),
602                    auth_agg.committee.voting_rights[idx].0,
603                )
604            })
605            .collect::<Vec<_>>();
606        let config = finalizers[0].config.clone();
607        for _ in 0..100 {
608            let tx_digest = TransactionDigest::random();
609            let mut delays: Vec<_> = finalizers
610                .iter()
611                .map(|finalizer| {
612                    finalizer
613                        .determine_finalization_delay(&tx_digest)
614                        .map(|delay| delay.as_secs())
615                        .unwrap()
616                })
617                .collect();
618            delays.sort();
619            for (idx, delay) in delays.iter().enumerate() {
620                assert_eq!(
621                    *delay,
622                    min(
623                        config.validator_max_delay.as_secs(),
624                        config.tx_finalization_delay.as_secs()
625                            + idx as u64 * config.validator_delay_increments_sec
626                    )
627                );
628            }
629        }
630    }
631
632    async fn create_validators(
633        gas_object: Object,
634    ) -> (
635        Vec<Arc<AuthorityState>>,
636        Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
637        BTreeMap<AuthorityName, MockAuthorityClient>,
638    ) {
639        let network_config = ConfigBuilder::new_with_temp_dir()
640            .committee_size(NonZeroUsize::new(4).unwrap())
641            .with_objects(iter::once(gas_object))
642            .build();
643        let mut authority_states = vec![];
644        for idx in 0..4 {
645            let state = TestAuthorityBuilder::new()
646                .with_network_config(&network_config, idx)
647                .build()
648                .await;
649            authority_states.push(state);
650        }
651        let clients: BTreeMap<_, _> = authority_states
652            .iter()
653            .map(|state| {
654                (
655                    state.name,
656                    MockAuthorityClient {
657                        authority: state.clone(),
658                        inject_fault: Arc::new(AtomicBool::new(false)),
659                    },
660                )
661            })
662            .collect();
663        let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
664            .build_custom_clients(clients.clone());
665        (
666            authority_states,
667            Arc::new(ArcSwap::new(Arc::new(auth_agg))),
668            clients,
669        )
670    }
671
672    async fn create_tx(
673        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
674        state: &Arc<AuthorityState>,
675        sender: IotaAddress,
676        keypair: &AccountKeyPair,
677        gas_object_id: ObjectID,
678    ) -> VerifiedSignedTransaction {
679        let gas_object_ref = state
680            .get_object(&gas_object_id)
681            .await
682            .unwrap()
683            .compute_object_reference();
684        let tx_data = TestTransactionBuilder::new(
685            sender,
686            gas_object_ref,
687            state.reference_gas_price_for_testing().unwrap(),
688        )
689        .transfer_iota(None, sender)
690        .build();
691        let tx = to_sender_signed_transaction(tx_data, keypair);
692        let response = clients
693            .get(&state.name)
694            .unwrap()
695            .handle_transaction(tx.clone(), None)
696            .await
697            .unwrap();
698        VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
699            tx.into_data(),
700            response.status.into_signed_for_testing(),
701        ))
702    }
703
704    fn check_quorum_execution(
705        auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
706        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
707        tx_digest: &TransactionDigest,
708        expected: bool,
709    ) {
710        let quorum = auth_agg.committee.quorum_threshold();
711        let executed_weight: StakeUnit = clients
712            .iter()
713            .filter_map(|(name, client)| {
714                client
715                    .authority
716                    .is_tx_already_executed(tx_digest)
717                    .then_some(auth_agg.committee.weight(name))
718            })
719            .sum();
720        assert_eq!(executed_weight >= quorum, expected);
721    }
722}