iota_core/
validator_tx_finalizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(any(msim, test))]
6use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
7use std::{cmp::min, ops::Add, sync::Arc, time::Duration};
8
9use arc_swap::ArcSwap;
10use iota_metrics::LATENCY_SEC_BUCKETS;
11use iota_types::{
12    base_types::{AuthorityName, TransactionDigest},
13    transaction::VerifiedSignedTransaction,
14};
15use prometheus::{
16    Histogram, IntCounter, Registry, register_histogram_with_registry,
17    register_int_counter_with_registry,
18};
19use tokio::{select, time::Instant};
20use tracing::{debug, error, trace};
21
22use crate::{
23    authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI,
24    execution_cache::TransactionCacheRead,
25};
26
/// Metrics recorded by the validator transaction finalizer: Prometheus
/// counters/histograms plus plain atomic mirrors that only exist in test and
/// `msim` builds.
struct ValidatorTxFinalizerMetrics {
    /// Incremented by `start_finalization` each time an attempt begins.
    num_finalization_attempts: IntCounter,
    /// Incremented by `finalization_succeeded` each time an attempt succeeds.
    num_successful_finalizations: IntCounter,
    /// Seconds elapsed between `start_finalization` and
    /// `finalization_succeeded` for a successful attempt.
    finalization_latency: Histogram,
    /// Seconds this validator waited before attempting to finalize a
    /// transaction.
    validator_tx_finalizer_attempt_delay: Histogram,
    /// Atomic mirror of `num_finalization_attempts`; compiled only when
    /// `msim` or `test` is set.
    #[cfg(any(msim, test))]
    num_finalization_attempts_for_testing: AtomicU64,
    /// Atomic mirror of `num_successful_finalizations`; compiled only for
    /// tests.
    #[cfg(test)]
    num_successful_finalizations_for_testing: AtomicU64,
}
37
38impl ValidatorTxFinalizerMetrics {
39    fn new(registry: &Registry) -> Self {
40        Self {
41            num_finalization_attempts: register_int_counter_with_registry!(
42                "validator_tx_finalizer_num_finalization_attempts",
43                "Total number of attempts to finalize a transaction",
44                registry,
45            )
46            .unwrap(),
47            num_successful_finalizations: register_int_counter_with_registry!(
48                "validator_tx_finalizer_num_successful_finalizations",
49                "Number of transactions successfully finalized",
50                registry,
51            )
52            .unwrap(),
53            finalization_latency: register_histogram_with_registry!(
54                "validator_tx_finalizer_finalization_latency",
55                "Latency of transaction finalization",
56                LATENCY_SEC_BUCKETS.to_vec(),
57                registry,
58            )
59            .unwrap(),
60            validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
61                "validator_tx_finalizer_attempt_delay",
62                "Duration that a validator in the committee waited before attempting to finalize the transaction",
63                vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
64                registry,
65            )
66            .unwrap(),
67            #[cfg(any(msim, test))]
68            num_finalization_attempts_for_testing: AtomicU64::new(0),
69            #[cfg(test)]
70            num_successful_finalizations_for_testing: AtomicU64::new(0),
71        }
72    }
73
74    fn start_finalization(&self) -> Instant {
75        self.num_finalization_attempts.inc();
76        #[cfg(any(msim, test))]
77        self.num_finalization_attempts_for_testing
78            .fetch_add(1, Relaxed);
79        Instant::now()
80    }
81
82    fn finalization_succeeded(&self, start_time: Instant) {
83        let latency = start_time.elapsed();
84        self.num_successful_finalizations.inc();
85        self.finalization_latency.observe(latency.as_secs_f64());
86        #[cfg(test)]
87        self.num_successful_finalizations_for_testing
88            .fetch_add(1, Relaxed);
89    }
90}
91
/// Timing parameters that control when a validator tries to finalize a
/// transaction it has signed, and how long it keeps trying.
#[derive(Debug, Clone)]
pub struct ValidatorTxFinalizerConfig {
    /// Base delay every validator waits before attempting finalization.
    pub tx_finalization_delay: Duration,
    /// Maximum time a single finalization attempt may run before it is
    /// abandoned.
    pub tx_finalization_timeout: Duration,
    /// Incremental delay for validators to wake up to finalize a transaction.
    /// It is multiplied by the validator's position in the per-transaction
    /// ordering and added to `tx_finalization_delay`.
    pub validator_delay_increments_sec: u64,
    /// Upper bound on the total delay computed for any validator.
    pub validator_max_delay: Duration,
}
99
100#[cfg(not(any(msim, test)))]
101impl Default for ValidatorTxFinalizerConfig {
102    fn default() -> Self {
103        Self {
104            // Only wake up the transaction finalization task for a given transaction
105            // after 1 mins of seeing it. This gives plenty of time for the transaction
106            // to become final in the normal way. We also don't want this delay to be too long
107            // to reduce memory usage held up by the finalizer threads.
108            tx_finalization_delay: Duration::from_secs(60),
109            // If a transaction can not be finalized within 1 min of being woken up, give up.
110            tx_finalization_timeout: Duration::from_secs(60),
111            validator_delay_increments_sec: 10,
112            validator_max_delay: Duration::from_secs(180),
113        }
114    }
115}
116
#[cfg(any(msim, test))]
impl Default for ValidatorTxFinalizerConfig {
    /// Timing defaults used in tests and `msim` builds: a 5 s base delay,
    /// 1 s increments and a 15 s cap, with a 60 s attempt timeout.
    fn default() -> Self {
        let tx_finalization_delay = Duration::from_secs(5);
        let tx_finalization_timeout = Duration::from_secs(60);
        let validator_max_delay = Duration::from_secs(15);

        Self {
            tx_finalization_delay,
            tx_finalization_timeout,
            validator_delay_increments_sec: 1,
            validator_max_delay,
        }
    }
}
128
/// The `ValidatorTxFinalizer` is responsible for finalizing transactions that
/// have been signed by the validator. It does this by waiting for a delay
/// after the transaction has been signed, and then attempting to finalize
/// the transaction if it has not yet been done by a fullnode.
pub struct ValidatorTxFinalizer<C: Clone> {
    /// Swappable handle to the authority aggregator; it is loaded afresh on
    /// each use, so a newly stored aggregator is picked up.
    agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
    /// Name of this validator, used to find its position in the
    /// per-transaction committee ordering.
    name: AuthorityName,
    /// Delay and timeout settings.
    config: Arc<ValidatorTxFinalizerConfig>,
    /// Counters and histograms describing finalization attempts.
    metrics: Arc<ValidatorTxFinalizerMetrics>,
}
139
140impl<C: Clone> ValidatorTxFinalizer<C> {
141    pub fn new(
142        agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
143        name: AuthorityName,
144        registry: &Registry,
145    ) -> Self {
146        Self {
147            agg,
148            name,
149            config: Arc::new(ValidatorTxFinalizerConfig::default()),
150            metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
151        }
152    }
153
154    #[cfg(test)]
155    pub(crate) fn new_for_testing(
156        agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
157        name: AuthorityName,
158    ) -> Self {
159        Self::new(agg, name, &Registry::new())
160    }
161
162    #[cfg(test)]
163    pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
164        &self.agg
165    }
166
167    #[cfg(any(msim, test))]
168    pub fn num_finalization_attempts_for_testing(&self) -> u64 {
169        self.metrics
170            .num_finalization_attempts_for_testing
171            .load(Relaxed)
172    }
173}
174
175impl<C> ValidatorTxFinalizer<C>
176where
177    C: Clone + AuthorityAPI + Send + Sync + 'static,
178{
179    pub async fn track_signed_tx(
180        &self,
181        cache_read: Arc<dyn TransactionCacheRead>,
182        tx: VerifiedSignedTransaction,
183    ) {
184        let tx_digest = *tx.digest();
185        trace!(?tx_digest, "Tracking signed transaction");
186        match self.delay_and_finalize_tx(cache_read, tx).await {
187            Ok(did_run) => {
188                if did_run {
189                    debug!(?tx_digest, "Transaction finalized");
190                }
191            }
192            Err(err) => {
193                error!(?tx_digest, ?err, "Failed to finalize transaction");
194            }
195        }
196    }
197
198    async fn delay_and_finalize_tx(
199        &self,
200        cache_read: Arc<dyn TransactionCacheRead>,
201        tx: VerifiedSignedTransaction,
202    ) -> anyhow::Result<bool> {
203        let tx_digest = *tx.digest();
204        let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
205            return Ok(false);
206        };
207        let digests = [tx_digest];
208        select! {
209            _ = tokio::time::sleep(tx_finalization_delay) => {
210                trace!(?tx_digest, "Waking up to finalize transaction");
211            }
212            _ = cache_read.notify_read_executed_effects_digests(&digests) => {
213                trace!(?tx_digest, "Transaction already finalized");
214                return Ok(false);
215            }
216        }
217
218        self.metrics
219            .validator_tx_finalizer_attempt_delay
220            .observe(tx_finalization_delay.as_secs_f64());
221        let start_time = self.metrics.start_finalization();
222        debug!(
223            ?tx_digest,
224            "Invoking authority aggregator to finalize transaction"
225        );
226        tokio::time::timeout(
227            self.config.tx_finalization_timeout,
228            self.agg
229                .load()
230                .execute_transaction_block(tx.into_unsigned().inner(), None),
231        )
232        .await??;
233        self.metrics.finalization_succeeded(start_time);
234        Ok(true)
235    }
236
237    // We want to avoid all validators waking up at the same time to finalize the
238    // same transaction. That can lead to a waste of resource and flood the
239    // network unnecessarily. Here we use the transaction digest to determine an
240    // order of all validators. Validators will wake up one by one with
241    // incremental delays to finalize the transaction. The hope is that the
242    // first few should be able to finalize the transaction, and the rest will
243    // see it already executed and do not need to do anything.
244    fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
245        let agg = self.agg.load();
246        let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
247        let Some(position) = order.iter().position(|&name| name == self.name) else {
248            // Somehow the validator is not found in the committee. This should never
249            // happen. TODO: This is where we should report system invariant
250            // violation.
251            error!("Validator {} not found in the committee", self.name);
252            return None;
253        };
254        // TODO: As an optimization, we could also limit the number of validators that
255        // would do this.
256        let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
257        let delay = self
258            .config
259            .tx_finalization_delay
260            .add(Duration::from_secs(extra_delay));
261        Some(min(delay, self.config.validator_max_delay))
262    }
263}
264
265#[cfg(test)]
266mod tests {
267    use std::{
268        cmp::min,
269        collections::BTreeMap,
270        iter,
271        net::SocketAddr,
272        num::NonZeroUsize,
273        sync::{
274            Arc,
275            atomic::{AtomicBool, Ordering::Relaxed},
276        },
277    };
278
279    use arc_swap::ArcSwap;
280    use async_trait::async_trait;
281    use iota_macros::sim_test;
282    use iota_swarm_config::network_config_builder::ConfigBuilder;
283    use iota_test_transaction_builder::TestTransactionBuilder;
284    use iota_types::{
285        base_types::{AuthorityName, IotaAddress, ObjectID, TransactionDigest},
286        committee::{CommitteeTrait, StakeUnit},
287        crypto::{AccountKeyPair, get_account_key_pair},
288        effects::{TransactionEffectsAPI, TransactionEvents},
289        error::IotaError,
290        executable_transaction::VerifiedExecutableTransaction,
291        iota_system_state::IotaSystemState,
292        messages_checkpoint::{CheckpointRequest, CheckpointResponse},
293        messages_grpc::{
294            HandleCertificateRequestV1, HandleCertificateResponseV1,
295            HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
296            HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
297            TransactionInfoRequest, TransactionInfoResponse,
298        },
299        object::Object,
300        transaction::{
301            SignedTransaction, Transaction, VerifiedCertificate, VerifiedSignedTransaction,
302            VerifiedTransaction,
303        },
304        utils::to_sender_signed_transaction,
305    };
306
307    use crate::{
308        authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
309        authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder},
310        authority_client::AuthorityAPI,
311        validator_tx_finalizer::ValidatorTxFinalizer,
312    };
313
    /// In-process `AuthorityAPI` client that forwards requests to a local
    /// `AuthorityState`, with a switch that makes transaction handling fail.
    #[derive(Clone)]
    struct MockAuthorityClient {
        /// Authority that serves the forwarded requests.
        authority: Arc<AuthorityState>,
        /// While `true`, `handle_transaction` returns `IotaError::Timeout`
        /// without reaching the authority.
        inject_fault: Arc<AtomicBool>,
    }
319
320    #[async_trait]
321    impl AuthorityAPI for MockAuthorityClient {
322        async fn handle_transaction(
323            &self,
324            transaction: Transaction,
325            _client_addr: Option<SocketAddr>,
326        ) -> Result<HandleTransactionResponse, IotaError> {
327            if self.inject_fault.load(Relaxed) {
328                return Err(IotaError::Timeout);
329            }
330            let epoch_store = self.authority.epoch_store_for_testing();
331            self.authority
332                .handle_transaction(
333                    &epoch_store,
334                    VerifiedTransaction::new_unchecked(transaction),
335                )
336                .await
337        }
338
339        async fn handle_certificate_v1(
340            &self,
341            request: HandleCertificateRequestV1,
342            _client_addr: Option<SocketAddr>,
343        ) -> Result<HandleCertificateResponseV1, IotaError> {
344            let epoch_store = self.authority.epoch_store_for_testing();
345            let (effects, _) = self
346                .authority
347                .try_execute_immediately(
348                    &VerifiedExecutableTransaction::new_from_certificate(
349                        VerifiedCertificate::new_unchecked(request.certificate),
350                    ),
351                    None,
352                    &epoch_store,
353                )
354                .await?;
355            let events = match effects.events_digest() {
356                None => TransactionEvents::default(),
357                Some(digest) => self.authority.get_transaction_events(digest)?,
358            };
359            let signed_effects = self
360                .authority
361                .sign_effects(effects, &epoch_store)?
362                .into_inner();
363            Ok(HandleCertificateResponseV1 {
364                signed_effects,
365                events: Some(events),
366                input_objects: None,
367                output_objects: None,
368                auxiliary_data: None,
369            })
370        }
371
372        async fn handle_soft_bundle_certificates_v1(
373            &self,
374            _request: HandleSoftBundleCertificatesRequestV1,
375            _client_addr: Option<SocketAddr>,
376        ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
377            unimplemented!()
378        }
379
380        async fn handle_object_info_request(
381            &self,
382            _request: ObjectInfoRequest,
383        ) -> Result<ObjectInfoResponse, IotaError> {
384            unimplemented!()
385        }
386
387        async fn handle_transaction_info_request(
388            &self,
389            _request: TransactionInfoRequest,
390        ) -> Result<TransactionInfoResponse, IotaError> {
391            unimplemented!()
392        }
393
394        async fn handle_checkpoint(
395            &self,
396            _request: CheckpointRequest,
397        ) -> Result<CheckpointResponse, IotaError> {
398            unimplemented!()
399        }
400
401        async fn handle_system_state_object(
402            &self,
403            _request: SystemStateRequest,
404        ) -> Result<IotaSystemState, IotaError> {
405            unimplemented!()
406        }
407    }
408
409    #[sim_test]
410    async fn test_validator_tx_finalizer_basic_flow() {
411        telemetry_subscribers::init_for_testing();
412        let (sender, keypair) = get_account_key_pair();
413        let gas_object = Object::with_owner_for_testing(sender);
414        let gas_object_id = gas_object.id();
415        let (states, auth_agg, clients) = create_validators(gas_object).await;
416        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
417        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
418        let tx_digest = *signed_tx.digest();
419        let cache_read = states[0].get_transaction_cache_reader().clone();
420        let metrics = finalizer1.metrics.clone();
421        let handle = tokio::spawn(async move {
422            finalizer1.track_signed_tx(cache_read, signed_tx).await;
423        });
424        handle.await.unwrap();
425        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
426        assert_eq!(
427            metrics.num_finalization_attempts_for_testing.load(Relaxed),
428            1
429        );
430        assert_eq!(
431            metrics
432                .num_successful_finalizations_for_testing
433                .load(Relaxed),
434            1
435        );
436    }
437
    /// Reconfigures the validator while the finalizer task is running and
    /// expects that no finalization attempt is made and that no quorum
    /// executes the transaction.
    #[tokio::test]
    async fn test_validator_tx_finalizer_new_epoch() {
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        // Captured before reconfiguration, so this is the store of the epoch
        // that is about to end.
        let epoch_store = states[0].epoch_store_for_testing();
        let cache_read = states[0].get_transaction_cache_reader().clone();

        let metrics = finalizer1.metrics.clone();
        let handle = tokio::spawn(async move {
            // NOTE(review): `within_alive_epoch` presumably stops driving the
            // wrapped future once the epoch ends - confirm against its
            // definition. Its result is deliberately ignored here.
            let _ = epoch_store
                .within_alive_epoch(finalizer1.track_signed_tx(cache_read, signed_tx))
                .await;
        });
        // Reconfigure after the task has been spawned; the task is joined
        // only afterwards.
        states[0].reconfigure_for_testing().await;
        handle.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            0
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            0
        );
    }
470
471    #[tokio::test]
472    async fn test_validator_tx_finalizer_auth_agg_reconfig() {
473        let (sender, _) = get_account_key_pair();
474        let gas_object = Object::with_owner_for_testing(sender);
475        let (states, auth_agg, _clients) = create_validators(gas_object).await;
476        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
477        let mut new_auth_agg = (**auth_agg.load()).clone();
478        let mut new_committee = (*new_auth_agg.committee).clone();
479        new_committee.epoch = 100;
480        new_auth_agg.committee = Arc::new(new_committee);
481        auth_agg.store(Arc::new(new_auth_agg));
482        assert_eq!(
483            finalizer1.auth_agg().load().committee.epoch,
484            100,
485            "AuthorityAggregator not updated"
486        );
487    }
488
    /// Executes the transaction directly through the aggregator while the
    /// finalizer task is running, and expects the finalizer to make no
    /// attempt of its own even though a quorum has executed the transaction.
    #[tokio::test]
    async fn test_validator_tx_finalizer_already_executed() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();

        let metrics = finalizer1.metrics.clone();
        // The task takes a clone because the original is executed directly
        // below.
        let signed_tx_clone = signed_tx.clone();
        let handle = tokio::spawn(async move {
            finalizer1
                .track_signed_tx(cache_read, signed_tx_clone)
                .await;
        });
        // Execute the transaction ourselves; the test expects this to finish
        // before the finalizer's delay elapses.
        auth_agg
            .load()
            .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
            .await
            .unwrap();
        handle.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            0
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            0
        );
    }
526
527    #[tokio::test]
528    async fn test_validator_tx_finalizer_timeout() {
529        telemetry_subscribers::init_for_testing();
530        let (sender, keypair) = get_account_key_pair();
531        let gas_object = Object::with_owner_for_testing(sender);
532        let gas_object_id = gas_object.id();
533        let (states, auth_agg, clients) = create_validators(gas_object).await;
534        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
535        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
536        let tx_digest = *signed_tx.digest();
537        let cache_read = states[0].get_transaction_cache_reader().clone();
538        for client in clients.values() {
539            client.inject_fault.store(true, Relaxed);
540        }
541
542        let metrics = finalizer1.metrics.clone();
543        let signed_tx_clone = signed_tx.clone();
544        let handle = tokio::spawn(async move {
545            finalizer1
546                .track_signed_tx(cache_read, signed_tx_clone)
547                .await;
548        });
549        handle.await.unwrap();
550        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
551        assert_eq!(
552            metrics.num_finalization_attempts_for_testing.load(Relaxed),
553            1
554        );
555        assert_eq!(
556            metrics
557                .num_successful_finalizations_for_testing
558                .load(Relaxed),
559            0
560        );
561    }
562
563    #[tokio::test]
564    async fn test_validator_tx_finalizer_determine_finalization_delay() {
565        const COMMITTEE_SIZE: usize = 15;
566        let network_config = ConfigBuilder::new_with_temp_dir()
567            .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
568            .build();
569        let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
570            .build_network_clients();
571        let auth_agg = Arc::new(auth_agg);
572        let finalizers = (0..COMMITTEE_SIZE)
573            .map(|idx| {
574                ValidatorTxFinalizer::new_for_testing(
575                    Arc::new(ArcSwap::new(auth_agg.clone())),
576                    auth_agg.committee.voting_rights[idx].0,
577                )
578            })
579            .collect::<Vec<_>>();
580        let config = finalizers[0].config.clone();
581        for _ in 0..100 {
582            let tx_digest = TransactionDigest::random();
583            let mut delays: Vec<_> = finalizers
584                .iter()
585                .map(|finalizer| {
586                    finalizer
587                        .determine_finalization_delay(&tx_digest)
588                        .map(|delay| delay.as_secs())
589                        .unwrap()
590                })
591                .collect();
592            delays.sort();
593            for (idx, delay) in delays.iter().enumerate() {
594                assert_eq!(
595                    *delay,
596                    min(
597                        config.validator_max_delay.as_secs(),
598                        config.tx_finalization_delay.as_secs()
599                            + idx as u64 * config.validator_delay_increments_sec
600                    )
601                );
602            }
603        }
604    }
605
606    async fn create_validators(
607        gas_object: Object,
608    ) -> (
609        Vec<Arc<AuthorityState>>,
610        Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
611        BTreeMap<AuthorityName, MockAuthorityClient>,
612    ) {
613        let network_config = ConfigBuilder::new_with_temp_dir()
614            .committee_size(NonZeroUsize::new(4).unwrap())
615            .with_objects(iter::once(gas_object))
616            .build();
617        let mut authority_states = vec![];
618        for idx in 0..4 {
619            let state = TestAuthorityBuilder::new()
620                .with_network_config(&network_config, idx)
621                .build()
622                .await;
623            authority_states.push(state);
624        }
625        let clients: BTreeMap<_, _> = authority_states
626            .iter()
627            .map(|state| {
628                (
629                    state.name,
630                    MockAuthorityClient {
631                        authority: state.clone(),
632                        inject_fault: Arc::new(AtomicBool::new(false)),
633                    },
634                )
635            })
636            .collect();
637        let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
638            .build_custom_clients(clients.clone());
639        (
640            authority_states,
641            Arc::new(ArcSwap::new(Arc::new(auth_agg))),
642            clients,
643        )
644    }
645
646    async fn create_tx(
647        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
648        state: &Arc<AuthorityState>,
649        sender: IotaAddress,
650        keypair: &AccountKeyPair,
651        gas_object_id: ObjectID,
652    ) -> VerifiedSignedTransaction {
653        let gas_object_ref = state
654            .get_object(&gas_object_id)
655            .await
656            .unwrap()
657            .unwrap()
658            .compute_object_reference();
659        let tx_data = TestTransactionBuilder::new(
660            sender,
661            gas_object_ref,
662            state.reference_gas_price_for_testing().unwrap(),
663        )
664        .transfer_iota(None, sender)
665        .build();
666        let tx = to_sender_signed_transaction(tx_data, keypair);
667        let response = clients
668            .get(&state.name)
669            .unwrap()
670            .handle_transaction(tx.clone(), None)
671            .await
672            .unwrap();
673        VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
674            tx.into_data(),
675            response.status.into_signed_for_testing(),
676        ))
677    }
678
679    fn check_quorum_execution(
680        auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
681        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
682        tx_digest: &TransactionDigest,
683        expected: bool,
684    ) {
685        let quorum = auth_agg.committee.quorum_threshold();
686        let executed_weight: StakeUnit = clients
687            .iter()
688            .filter_map(|(name, client)| {
689                client
690                    .authority
691                    .is_tx_already_executed(tx_digest)
692                    .unwrap()
693                    .then_some(auth_agg.committee.weight(name))
694            })
695            .sum();
696        assert_eq!(executed_weight >= quorum, expected);
697    }
698}