1#[cfg(any(msim, test))]
6use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
7use std::{cmp::min, ops::Add, sync::Arc, time::Duration};
8
9use arc_swap::ArcSwap;
10use iota_metrics::LATENCY_SEC_BUCKETS;
11use iota_types::{
12 base_types::{AuthorityName, TransactionDigest},
13 transaction::VerifiedSignedTransaction,
14};
15use prometheus::{
16 Histogram, IntCounter, Registry, register_histogram_with_registry,
17 register_int_counter_with_registry,
18};
19use tokio::{select, time::Instant};
20use tracing::{debug, error, trace};
21
22use crate::{
23 authority::authority_per_epoch_store::AuthorityPerEpochStore,
24 authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI,
25 execution_cache::TransactionCacheRead,
26};
27
28struct ValidatorTxFinalizerMetrics {
29 num_finalization_attempts: IntCounter,
30 num_successful_finalizations: IntCounter,
31 finalization_latency: Histogram,
32 validator_tx_finalizer_attempt_delay: Histogram,
33 #[cfg(any(msim, test))]
34 num_finalization_attempts_for_testing: AtomicU64,
35 #[cfg(test)]
36 num_successful_finalizations_for_testing: AtomicU64,
37}
38
39impl ValidatorTxFinalizerMetrics {
40 fn new(registry: &Registry) -> Self {
41 Self {
42 num_finalization_attempts: register_int_counter_with_registry!(
43 "validator_tx_finalizer_num_finalization_attempts",
44 "Total number of attempts to finalize a transaction",
45 registry,
46 )
47 .unwrap(),
48 num_successful_finalizations: register_int_counter_with_registry!(
49 "validator_tx_finalizer_num_successful_finalizations",
50 "Number of transactions successfully finalized",
51 registry,
52 )
53 .unwrap(),
54 finalization_latency: register_histogram_with_registry!(
55 "validator_tx_finalizer_finalization_latency",
56 "Latency of transaction finalization",
57 LATENCY_SEC_BUCKETS.to_vec(),
58 registry,
59 )
60 .unwrap(),
61 validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
62 "validator_tx_finalizer_attempt_delay",
63 "Duration that a validator in the committee waited before attempting to finalize the transaction",
64 vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
65 registry,
66 )
67 .unwrap(),
68 #[cfg(any(msim, test))]
69 num_finalization_attempts_for_testing: AtomicU64::new(0),
70 #[cfg(test)]
71 num_successful_finalizations_for_testing: AtomicU64::new(0),
72 }
73 }
74
75 fn start_finalization(&self) -> Instant {
76 self.num_finalization_attempts.inc();
77 #[cfg(any(msim, test))]
78 self.num_finalization_attempts_for_testing
79 .fetch_add(1, Relaxed);
80 Instant::now()
81 }
82
83 fn finalization_succeeded(&self, start_time: Instant) {
84 let latency = start_time.elapsed();
85 self.num_successful_finalizations.inc();
86 self.finalization_latency.observe(latency.as_secs_f64());
87 #[cfg(test)]
88 self.num_successful_finalizations_for_testing
89 .fetch_add(1, Relaxed);
90 }
91}
92
93pub struct ValidatorTxFinalizerConfig {
94 pub tx_finalization_delay: Duration,
95 pub tx_finalization_timeout: Duration,
96 pub validator_delay_increments_sec: u64,
98 pub validator_max_delay: Duration,
99}
100
101#[cfg(not(any(msim, test)))]
102impl Default for ValidatorTxFinalizerConfig {
103 fn default() -> Self {
104 Self {
105 tx_finalization_delay: Duration::from_secs(60),
110 tx_finalization_timeout: Duration::from_secs(60),
112 validator_delay_increments_sec: 10,
113 validator_max_delay: Duration::from_secs(180),
114 }
115 }
116}
117
118#[cfg(any(msim, test))]
119impl Default for ValidatorTxFinalizerConfig {
120 fn default() -> Self {
121 Self {
122 tx_finalization_delay: Duration::from_secs(5),
123 tx_finalization_timeout: Duration::from_secs(60),
124 validator_delay_increments_sec: 1,
125 validator_max_delay: Duration::from_secs(15),
126 }
127 }
128}
129
130pub struct ValidatorTxFinalizer<C: Clone> {
135 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
136 name: AuthorityName,
137 config: Arc<ValidatorTxFinalizerConfig>,
138 metrics: Arc<ValidatorTxFinalizerMetrics>,
139}
140
141impl<C: Clone> ValidatorTxFinalizer<C> {
142 pub fn new(
143 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
144 name: AuthorityName,
145 registry: &Registry,
146 ) -> Self {
147 Self {
148 agg,
149 name,
150 config: Arc::new(ValidatorTxFinalizerConfig::default()),
151 metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
152 }
153 }
154
155 #[cfg(test)]
156 pub(crate) fn new_for_testing(
157 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
158 name: AuthorityName,
159 ) -> Self {
160 Self::new(agg, name, &Registry::new())
161 }
162
163 #[cfg(test)]
164 pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
165 &self.agg
166 }
167
168 #[cfg(any(msim, test))]
169 pub fn num_finalization_attempts_for_testing(&self) -> u64 {
170 self.metrics
171 .num_finalization_attempts_for_testing
172 .load(Relaxed)
173 }
174}
175
176impl<C> ValidatorTxFinalizer<C>
177where
178 C: Clone + AuthorityAPI + Send + Sync + 'static,
179{
180 pub async fn track_signed_tx(
181 &self,
182 cache_read: Arc<dyn TransactionCacheRead>,
183 epoch_store: &Arc<AuthorityPerEpochStore>,
184 tx: VerifiedSignedTransaction,
185 ) {
186 let tx_digest = *tx.digest();
187 trace!(?tx_digest, "Tracking signed transaction");
188 match self
189 .delay_and_finalize_tx(cache_read, epoch_store, tx)
190 .await
191 {
192 Ok(did_run) => {
193 if did_run {
194 debug!(?tx_digest, "Transaction finalized");
195 }
196 }
197 Err(err) => {
198 error!(?tx_digest, ?err, "Failed to finalize transaction");
199 }
200 }
201 }
202
203 async fn delay_and_finalize_tx(
204 &self,
205 cache_read: Arc<dyn TransactionCacheRead>,
206 epoch_store: &Arc<AuthorityPerEpochStore>,
207 tx: VerifiedSignedTransaction,
208 ) -> anyhow::Result<bool> {
209 let tx_digest = *tx.digest();
210 let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
211 return Ok(false);
212 };
213 let digests = [tx_digest];
214 select! {
215 _ = tokio::time::sleep(tx_finalization_delay) => {
216 trace!(?tx_digest, "Waking up to finalize transaction");
217 }
218 _ = cache_read.try_notify_read_executed_effects_digests(&digests) => {
219 trace!(?tx_digest, "Transaction already finalized");
220 return Ok(false);
221 }
222 }
223
224 if epoch_store.is_pending_consensus_certificate(&tx_digest) {
225 trace!(
226 ?tx_digest,
227 "Transaction has been submitted to consensus, no need to help drive finality"
228 );
229 return Ok(false);
230 }
231
232 self.metrics
233 .validator_tx_finalizer_attempt_delay
234 .observe(tx_finalization_delay.as_secs_f64());
235 let start_time = self.metrics.start_finalization();
236 debug!(
237 ?tx_digest,
238 "Invoking authority aggregator to finalize transaction"
239 );
240 tokio::time::timeout(
241 self.config.tx_finalization_timeout,
242 self.agg
243 .load()
244 .execute_transaction_block(tx.into_unsigned().inner(), None),
245 )
246 .await??;
247 self.metrics.finalization_succeeded(start_time);
248 Ok(true)
249 }
250
251 fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
259 let agg = self.agg.load();
260 let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
261 let Some(position) = order.iter().position(|&name| name == self.name) else {
262 error!("Validator {} not found in the committee", self.name);
266 return None;
267 };
268 let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
271 let delay = self
272 .config
273 .tx_finalization_delay
274 .add(Duration::from_secs(extra_delay));
275 Some(min(delay, self.config.validator_max_delay))
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use std::{
282 cmp::min,
283 collections::BTreeMap,
284 iter,
285 net::SocketAddr,
286 num::NonZeroUsize,
287 sync::{
288 Arc,
289 atomic::{AtomicBool, Ordering::Relaxed},
290 },
291 };
292
293 use arc_swap::ArcSwap;
294 use async_trait::async_trait;
295 use iota_macros::sim_test;
296 use iota_swarm_config::network_config_builder::ConfigBuilder;
297 use iota_test_transaction_builder::TestTransactionBuilder;
298 use iota_types::{
299 base_types::{AuthorityName, IotaAddress, ObjectID, TransactionDigest},
300 committee::{CommitteeTrait, StakeUnit},
301 crypto::{AccountKeyPair, get_account_key_pair},
302 effects::{TransactionEffectsAPI, TransactionEvents},
303 error::IotaError,
304 executable_transaction::VerifiedExecutableTransaction,
305 iota_system_state::IotaSystemState,
306 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
307 messages_grpc::{
308 HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
309 HandleCertificateRequestV1, HandleCertificateResponseV1,
310 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
311 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
312 TransactionInfoRequest, TransactionInfoResponse,
313 },
314 object::Object,
315 transaction::{
316 SignedTransaction, Transaction, VerifiedCertificate, VerifiedSignedTransaction,
317 VerifiedTransaction,
318 },
319 utils::to_sender_signed_transaction,
320 };
321
322 use crate::{
323 authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
324 authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder},
325 authority_client::AuthorityAPI,
326 validator_tx_finalizer::ValidatorTxFinalizer,
327 };
328
329 #[derive(Clone)]
330 struct MockAuthorityClient {
331 authority: Arc<AuthorityState>,
332 inject_fault: Arc<AtomicBool>,
333 }
334
335 #[async_trait]
336 impl AuthorityAPI for MockAuthorityClient {
337 async fn handle_transaction(
338 &self,
339 transaction: Transaction,
340 _client_addr: Option<SocketAddr>,
341 ) -> Result<HandleTransactionResponse, IotaError> {
342 if self.inject_fault.load(Relaxed) {
343 return Err(IotaError::Timeout);
344 }
345 let epoch_store = self.authority.epoch_store_for_testing();
346 self.authority
347 .handle_transaction(
348 &epoch_store,
349 VerifiedTransaction::new_unchecked(transaction),
350 )
351 .await
352 }
353
354 async fn handle_certificate_v1(
355 &self,
356 request: HandleCertificateRequestV1,
357 _client_addr: Option<SocketAddr>,
358 ) -> Result<HandleCertificateResponseV1, IotaError> {
359 let epoch_store = self.authority.epoch_store_for_testing();
360 let (effects, _) = self.authority.try_execute_immediately(
361 &VerifiedExecutableTransaction::new_from_certificate(
362 VerifiedCertificate::new_unchecked(request.certificate),
363 ),
364 None,
365 &epoch_store,
366 )?;
367 let events = if effects.events_digest().is_some() {
368 self.authority
369 .get_transaction_events(effects.transaction_digest())?
370 } else {
371 TransactionEvents::default()
372 };
373 let signed_effects = self
374 .authority
375 .sign_effects(effects, &epoch_store)?
376 .into_inner();
377 Ok(HandleCertificateResponseV1 {
378 signed_effects,
379 events: Some(events),
380 input_objects: None,
381 output_objects: None,
382 auxiliary_data: None,
383 })
384 }
385
386 async fn handle_soft_bundle_certificates_v1(
387 &self,
388 _request: HandleSoftBundleCertificatesRequestV1,
389 _client_addr: Option<SocketAddr>,
390 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
391 unimplemented!()
392 }
393
394 async fn handle_object_info_request(
395 &self,
396 _request: ObjectInfoRequest,
397 ) -> Result<ObjectInfoResponse, IotaError> {
398 unimplemented!()
399 }
400
401 async fn handle_transaction_info_request(
402 &self,
403 _request: TransactionInfoRequest,
404 ) -> Result<TransactionInfoResponse, IotaError> {
405 unimplemented!()
406 }
407
408 async fn handle_checkpoint(
409 &self,
410 _request: CheckpointRequest,
411 ) -> Result<CheckpointResponse, IotaError> {
412 unimplemented!()
413 }
414
415 async fn handle_system_state_object(
416 &self,
417 _request: SystemStateRequest,
418 ) -> Result<IotaSystemState, IotaError> {
419 unimplemented!()
420 }
421
422 async fn handle_capability_notification_v1(
423 &self,
424 _request: HandleCapabilityNotificationRequestV1,
425 ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
426 unimplemented!()
427 }
428 }
429
430 #[sim_test]
431 async fn test_validator_tx_finalizer_basic_flow() {
432 telemetry_subscribers::init_for_testing();
433 let (sender, keypair) = get_account_key_pair();
434 let gas_object = Object::with_owner_for_testing(sender);
435 let gas_object_id = gas_object.id();
436 let (states, auth_agg, clients) = create_validators(gas_object).await;
437 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
438 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
439 let tx_digest = *signed_tx.digest();
440 let cache_read = states[0].get_transaction_cache_reader().clone();
441 let epoch_store = states[0].epoch_store_for_testing();
442 let metrics = finalizer1.metrics.clone();
443 let handle = tokio::spawn(async move {
444 finalizer1
445 .track_signed_tx(cache_read, &epoch_store, signed_tx)
446 .await;
447 });
448 handle.await.unwrap();
449 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
450 assert_eq!(
451 metrics.num_finalization_attempts_for_testing.load(Relaxed),
452 1
453 );
454 assert_eq!(
455 metrics
456 .num_successful_finalizations_for_testing
457 .load(Relaxed),
458 1
459 );
460 }
461
462 #[tokio::test]
463 async fn test_validator_tx_finalizer_new_epoch() {
464 let (sender, keypair) = get_account_key_pair();
465 let gas_object = Object::with_owner_for_testing(sender);
466 let gas_object_id = gas_object.id();
467 let (states, auth_agg, clients) = create_validators(gas_object).await;
468 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
469 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
470 let tx_digest = *signed_tx.digest();
471 let epoch_store = states[0].epoch_store_for_testing();
472 let cache_read = states[0].get_transaction_cache_reader().clone();
473
474 let metrics = finalizer1.metrics.clone();
475 let handle = tokio::spawn(async move {
476 let _ = epoch_store
477 .within_alive_epoch(finalizer1.track_signed_tx(cache_read, &epoch_store, signed_tx))
478 .await;
479 });
480 states[0].reconfigure_for_testing().await;
481 handle.await.unwrap();
482 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
483 assert_eq!(
484 metrics.num_finalization_attempts_for_testing.load(Relaxed),
485 0
486 );
487 assert_eq!(
488 metrics
489 .num_successful_finalizations_for_testing
490 .load(Relaxed),
491 0
492 );
493 }
494
495 #[tokio::test]
496 async fn test_validator_tx_finalizer_auth_agg_reconfig() {
497 let (sender, _) = get_account_key_pair();
498 let gas_object = Object::with_owner_for_testing(sender);
499 let (states, auth_agg, _clients) = create_validators(gas_object).await;
500 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
501 let mut new_auth_agg = (**auth_agg.load()).clone();
502 let mut new_committee = (*new_auth_agg.committee).clone();
503 new_committee.epoch = 100;
504 new_auth_agg.committee = Arc::new(new_committee);
505 auth_agg.store(Arc::new(new_auth_agg));
506 assert_eq!(
507 finalizer1.auth_agg().load().committee.epoch,
508 100,
509 "AuthorityAggregator not updated"
510 );
511 }
512
513 #[tokio::test]
514 async fn test_validator_tx_finalizer_already_executed() {
515 telemetry_subscribers::init_for_testing();
516 let (sender, keypair) = get_account_key_pair();
517 let gas_object = Object::with_owner_for_testing(sender);
518 let gas_object_id = gas_object.id();
519 let (states, auth_agg, clients) = create_validators(gas_object).await;
520 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
521 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
522 let tx_digest = *signed_tx.digest();
523 let cache_read = states[0].get_transaction_cache_reader().clone();
524 let epoch_store = states[0].epoch_store_for_testing();
525
526 let metrics = finalizer1.metrics.clone();
527 let signed_tx_clone = signed_tx.clone();
528 let handle = tokio::spawn(async move {
529 finalizer1
530 .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
531 .await;
532 });
533 auth_agg
534 .load()
535 .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
536 .await
537 .unwrap();
538 handle.await.unwrap();
539 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
540 assert_eq!(
541 metrics.num_finalization_attempts_for_testing.load(Relaxed),
542 0
543 );
544 assert_eq!(
545 metrics
546 .num_successful_finalizations_for_testing
547 .load(Relaxed),
548 0
549 );
550 }
551
552 #[tokio::test]
553 async fn test_validator_tx_finalizer_timeout() {
554 telemetry_subscribers::init_for_testing();
555 let (sender, keypair) = get_account_key_pair();
556 let gas_object = Object::with_owner_for_testing(sender);
557 let gas_object_id = gas_object.id();
558 let (states, auth_agg, clients) = create_validators(gas_object).await;
559 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
560 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
561 let tx_digest = *signed_tx.digest();
562 let cache_read = states[0].get_transaction_cache_reader().clone();
563 let epoch_store = states[0].epoch_store_for_testing();
564 for client in clients.values() {
565 client.inject_fault.store(true, Relaxed);
566 }
567
568 let metrics = finalizer1.metrics.clone();
569 let signed_tx_clone = signed_tx.clone();
570 let handle = tokio::spawn(async move {
571 finalizer1
572 .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
573 .await;
574 });
575 handle.await.unwrap();
576 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
577 assert_eq!(
578 metrics.num_finalization_attempts_for_testing.load(Relaxed),
579 1
580 );
581 assert_eq!(
582 metrics
583 .num_successful_finalizations_for_testing
584 .load(Relaxed),
585 0
586 );
587 }
588
589 #[tokio::test]
590 async fn test_validator_tx_finalizer_determine_finalization_delay() {
591 const COMMITTEE_SIZE: usize = 15;
592 let network_config = ConfigBuilder::new_with_temp_dir()
593 .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
594 .build();
595 let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
596 .build_network_clients();
597 let auth_agg = Arc::new(auth_agg);
598 let finalizers = (0..COMMITTEE_SIZE)
599 .map(|idx| {
600 ValidatorTxFinalizer::new_for_testing(
601 Arc::new(ArcSwap::new(auth_agg.clone())),
602 auth_agg.committee.voting_rights[idx].0,
603 )
604 })
605 .collect::<Vec<_>>();
606 let config = finalizers[0].config.clone();
607 for _ in 0..100 {
608 let tx_digest = TransactionDigest::random();
609 let mut delays: Vec<_> = finalizers
610 .iter()
611 .map(|finalizer| {
612 finalizer
613 .determine_finalization_delay(&tx_digest)
614 .map(|delay| delay.as_secs())
615 .unwrap()
616 })
617 .collect();
618 delays.sort();
619 for (idx, delay) in delays.iter().enumerate() {
620 assert_eq!(
621 *delay,
622 min(
623 config.validator_max_delay.as_secs(),
624 config.tx_finalization_delay.as_secs()
625 + idx as u64 * config.validator_delay_increments_sec
626 )
627 );
628 }
629 }
630 }
631
632 async fn create_validators(
633 gas_object: Object,
634 ) -> (
635 Vec<Arc<AuthorityState>>,
636 Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
637 BTreeMap<AuthorityName, MockAuthorityClient>,
638 ) {
639 let network_config = ConfigBuilder::new_with_temp_dir()
640 .committee_size(NonZeroUsize::new(4).unwrap())
641 .with_objects(iter::once(gas_object))
642 .build();
643 let mut authority_states = vec![];
644 for idx in 0..4 {
645 let state = TestAuthorityBuilder::new()
646 .with_network_config(&network_config, idx)
647 .build()
648 .await;
649 authority_states.push(state);
650 }
651 let clients: BTreeMap<_, _> = authority_states
652 .iter()
653 .map(|state| {
654 (
655 state.name,
656 MockAuthorityClient {
657 authority: state.clone(),
658 inject_fault: Arc::new(AtomicBool::new(false)),
659 },
660 )
661 })
662 .collect();
663 let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
664 .build_custom_clients(clients.clone());
665 (
666 authority_states,
667 Arc::new(ArcSwap::new(Arc::new(auth_agg))),
668 clients,
669 )
670 }
671
672 async fn create_tx(
673 clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
674 state: &Arc<AuthorityState>,
675 sender: IotaAddress,
676 keypair: &AccountKeyPair,
677 gas_object_id: ObjectID,
678 ) -> VerifiedSignedTransaction {
679 let gas_object_ref = state
680 .get_object(&gas_object_id)
681 .await
682 .unwrap()
683 .compute_object_reference();
684 let tx_data = TestTransactionBuilder::new(
685 sender,
686 gas_object_ref,
687 state.reference_gas_price_for_testing().unwrap(),
688 )
689 .transfer_iota(None, sender)
690 .build();
691 let tx = to_sender_signed_transaction(tx_data, keypair);
692 let response = clients
693 .get(&state.name)
694 .unwrap()
695 .handle_transaction(tx.clone(), None)
696 .await
697 .unwrap();
698 VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
699 tx.into_data(),
700 response.status.into_signed_for_testing(),
701 ))
702 }
703
704 fn check_quorum_execution(
705 auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
706 clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
707 tx_digest: &TransactionDigest,
708 expected: bool,
709 ) {
710 let quorum = auth_agg.committee.quorum_threshold();
711 let executed_weight: StakeUnit = clients
712 .iter()
713 .filter_map(|(name, client)| {
714 client
715 .authority
716 .is_tx_already_executed(tx_digest)
717 .then_some(auth_agg.committee.weight(name))
718 })
719 .sum();
720 assert_eq!(executed_weight >= quorum, expected);
721 }
722}