1#[cfg(any(msim, test))]
6use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
7use std::{cmp::min, ops::Add, sync::Arc, time::Duration};
8
9use arc_swap::ArcSwap;
10use iota_metrics::LATENCY_SEC_BUCKETS;
11use iota_types::{
12 base_types::{AuthorityName, TransactionDigest},
13 transaction::VerifiedSignedTransaction,
14};
15use prometheus::{
16 Histogram, IntCounter, Registry, register_histogram_with_registry,
17 register_int_counter_with_registry,
18};
19use tokio::{select, time::Instant};
20use tracing::{debug, error, trace};
21
22use crate::{
23 authority::authority_per_epoch_store::AuthorityPerEpochStore,
24 authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI,
25 execution_cache::TransactionCacheRead,
26};
27
28struct ValidatorTxFinalizerMetrics {
29 num_finalization_attempts: IntCounter,
30 num_successful_finalizations: IntCounter,
31 finalization_latency: Histogram,
32 validator_tx_finalizer_attempt_delay: Histogram,
33 #[cfg(any(msim, test))]
34 num_finalization_attempts_for_testing: AtomicU64,
35 #[cfg(test)]
36 num_successful_finalizations_for_testing: AtomicU64,
37}
38
39impl ValidatorTxFinalizerMetrics {
40 fn new(registry: &Registry) -> Self {
41 Self {
42 num_finalization_attempts: register_int_counter_with_registry!(
43 "validator_tx_finalizer_num_finalization_attempts",
44 "Total number of attempts to finalize a transaction",
45 registry,
46 )
47 .unwrap(),
48 num_successful_finalizations: register_int_counter_with_registry!(
49 "validator_tx_finalizer_num_successful_finalizations",
50 "Number of transactions successfully finalized",
51 registry,
52 )
53 .unwrap(),
54 finalization_latency: register_histogram_with_registry!(
55 "validator_tx_finalizer_finalization_latency",
56 "Latency of transaction finalization",
57 LATENCY_SEC_BUCKETS.to_vec(),
58 registry,
59 )
60 .unwrap(),
61 validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
62 "validator_tx_finalizer_attempt_delay",
63 "Duration that a validator in the committee waited before attempting to finalize the transaction",
64 vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
65 registry,
66 )
67 .unwrap(),
68 #[cfg(any(msim, test))]
69 num_finalization_attempts_for_testing: AtomicU64::new(0),
70 #[cfg(test)]
71 num_successful_finalizations_for_testing: AtomicU64::new(0),
72 }
73 }
74
75 fn start_finalization(&self) -> Instant {
76 self.num_finalization_attempts.inc();
77 #[cfg(any(msim, test))]
78 self.num_finalization_attempts_for_testing
79 .fetch_add(1, Relaxed);
80 Instant::now()
81 }
82
83 fn finalization_succeeded(&self, start_time: Instant) {
84 let latency = start_time.elapsed();
85 self.num_successful_finalizations.inc();
86 self.finalization_latency.observe(latency.as_secs_f64());
87 #[cfg(test)]
88 self.num_successful_finalizations_for_testing
89 .fetch_add(1, Relaxed);
90 }
91}
92
/// Timing parameters of the validator transaction finalizer.
pub struct ValidatorTxFinalizerConfig {
    /// Base delay a validator waits before it tries to finalize a transaction.
    pub tx_finalization_delay: Duration,
    /// Maximum time a single finalization attempt is allowed to run.
    pub tx_finalization_timeout: Duration,
    /// Seconds of extra delay added per position of the validator in the
    /// per-transaction ordering of the committee.
    pub validator_delay_increments_sec: u64,
    /// Upper bound on the total delay (base delay plus positional increment).
    pub validator_max_delay: Duration,
}

impl Default for ValidatorTxFinalizerConfig {
    /// Returns the default timings.
    ///
    /// Simulator and test builds use much shorter delays (5s base, 1s
    /// increments, 15s cap) than regular builds (60s base, 10s increments,
    /// 180s cap). The attempt timeout is 60s in both.
    fn default() -> Self {
        // (base delay, per-position increment, maximum delay), all in seconds.
        let (delay_secs, increment_secs, max_delay_secs) = if cfg!(any(msim, test)) {
            (5, 1, 15)
        } else {
            (60, 10, 180)
        };
        Self {
            tx_finalization_delay: Duration::from_secs(delay_secs),
            tx_finalization_timeout: Duration::from_secs(60),
            validator_delay_increments_sec: increment_secs,
            validator_max_delay: Duration::from_secs(max_delay_secs),
        }
    }
}
129
130pub struct ValidatorTxFinalizer<C: Clone> {
135 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
136 name: AuthorityName,
137 config: Arc<ValidatorTxFinalizerConfig>,
138 metrics: Arc<ValidatorTxFinalizerMetrics>,
139}
140
141impl<C: Clone> ValidatorTxFinalizer<C> {
142 pub fn new(
143 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
144 name: AuthorityName,
145 registry: &Registry,
146 ) -> Self {
147 Self {
148 agg,
149 name,
150 config: Arc::new(ValidatorTxFinalizerConfig::default()),
151 metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
152 }
153 }
154
155 #[cfg(test)]
156 pub(crate) fn new_for_testing(
157 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
158 name: AuthorityName,
159 ) -> Self {
160 Self::new(agg, name, &Registry::new())
161 }
162
163 #[cfg(test)]
164 pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
165 &self.agg
166 }
167
168 #[cfg(any(msim, test))]
169 pub fn num_finalization_attempts_for_testing(&self) -> u64 {
170 self.metrics
171 .num_finalization_attempts_for_testing
172 .load(Relaxed)
173 }
174}
175
176impl<C> ValidatorTxFinalizer<C>
177where
178 C: Clone + AuthorityAPI + Send + Sync + 'static,
179{
180 pub async fn track_signed_tx(
181 &self,
182 cache_read: Arc<dyn TransactionCacheRead>,
183 epoch_store: &Arc<AuthorityPerEpochStore>,
184 tx: VerifiedSignedTransaction,
185 ) {
186 let tx_digest = *tx.digest();
187 trace!(?tx_digest, "Tracking signed transaction");
188 match self
189 .delay_and_finalize_tx(cache_read, epoch_store, tx)
190 .await
191 {
192 Ok(did_run) => {
193 if did_run {
194 debug!(?tx_digest, "Transaction finalized");
195 }
196 }
197 Err(err) => {
198 error!(?tx_digest, ?err, "Failed to finalize transaction");
199 }
200 }
201 }
202
203 async fn delay_and_finalize_tx(
204 &self,
205 cache_read: Arc<dyn TransactionCacheRead>,
206 epoch_store: &Arc<AuthorityPerEpochStore>,
207 tx: VerifiedSignedTransaction,
208 ) -> anyhow::Result<bool> {
209 let tx_digest = *tx.digest();
210 let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
211 return Ok(false);
212 };
213 let digests = [tx_digest];
214 select! {
215 _ = tokio::time::sleep(tx_finalization_delay) => {
216 trace!(?tx_digest, "Waking up to finalize transaction");
217 }
218 _ = cache_read.try_notify_read_executed_effects_digests(&digests) => {
219 trace!(?tx_digest, "Transaction already finalized");
220 return Ok(false);
221 }
222 }
223
224 if epoch_store.is_pending_consensus_certificate(&tx_digest) {
225 trace!(
226 ?tx_digest,
227 "Transaction has been submitted to consensus, no need to help drive finality"
228 );
229 return Ok(false);
230 }
231
232 self.metrics
233 .validator_tx_finalizer_attempt_delay
234 .observe(tx_finalization_delay.as_secs_f64());
235 let start_time = self.metrics.start_finalization();
236 debug!(
237 ?tx_digest,
238 "Invoking authority aggregator to finalize transaction"
239 );
240 tokio::time::timeout(
241 self.config.tx_finalization_timeout,
242 self.agg
243 .load()
244 .execute_transaction_block(tx.into_unsigned().inner(), None),
245 )
246 .await??;
247 self.metrics.finalization_succeeded(start_time);
248 Ok(true)
249 }
250
251 fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
259 let agg = self.agg.load();
260 let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
261 let Some(position) = order.iter().position(|&name| name == self.name) else {
262 error!("Validator {} not found in the committee", self.name);
266 return None;
267 };
268 let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
271 let delay = self
272 .config
273 .tx_finalization_delay
274 .add(Duration::from_secs(extra_delay));
275 Some(min(delay, self.config.validator_max_delay))
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use std::{
282 cmp::min,
283 collections::BTreeMap,
284 iter,
285 net::SocketAddr,
286 num::NonZeroUsize,
287 sync::{
288 Arc,
289 atomic::{AtomicBool, Ordering::Relaxed},
290 },
291 };
292
293 use arc_swap::ArcSwap;
294 use async_trait::async_trait;
295 use iota_macros::sim_test;
296 use iota_swarm_config::network_config_builder::ConfigBuilder;
297 use iota_test_transaction_builder::TestTransactionBuilder;
298 use iota_types::{
299 base_types::{AuthorityName, IotaAddress, ObjectID, TransactionDigest},
300 committee::{CommitteeTrait, StakeUnit},
301 crypto::{AccountKeyPair, get_account_key_pair},
302 effects::{TransactionEffectsAPI, TransactionEvents},
303 error::IotaError,
304 executable_transaction::VerifiedExecutableTransaction,
305 iota_system_state::IotaSystemState,
306 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
307 messages_grpc::{
308 HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
309 HandleCertificateRequestV1, HandleCertificateResponseV1,
310 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
311 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
312 TransactionInfoRequest, TransactionInfoResponse,
313 },
314 object::Object,
315 transaction::{
316 SignedTransaction, Transaction, VerifiedCertificate, VerifiedSignedTransaction,
317 VerifiedTransaction,
318 },
319 utils::to_sender_signed_transaction,
320 };
321
322 use crate::{
323 authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
324 authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder},
325 authority_client::AuthorityAPI,
326 validator_tx_finalizer::ValidatorTxFinalizer,
327 };
328
329 #[derive(Clone)]
330 struct MockAuthorityClient {
331 authority: Arc<AuthorityState>,
332 inject_fault: Arc<AtomicBool>,
333 }
334
335 #[async_trait]
336 impl AuthorityAPI for MockAuthorityClient {
337 async fn handle_transaction(
338 &self,
339 transaction: Transaction,
340 _client_addr: Option<SocketAddr>,
341 ) -> Result<HandleTransactionResponse, IotaError> {
342 if self.inject_fault.load(Relaxed) {
343 return Err(IotaError::Timeout);
344 }
345 let epoch_store = self.authority.epoch_store_for_testing();
346 self.authority
347 .handle_transaction(
348 &epoch_store,
349 VerifiedTransaction::new_unchecked(transaction),
350 )
351 .await
352 }
353
354 async fn handle_certificate_v1(
355 &self,
356 request: HandleCertificateRequestV1,
357 _client_addr: Option<SocketAddr>,
358 ) -> Result<HandleCertificateResponseV1, IotaError> {
359 let epoch_store = self.authority.epoch_store_for_testing();
360 let (effects, _) = self.authority.try_execute_immediately(
361 &VerifiedExecutableTransaction::new_from_certificate(
362 VerifiedCertificate::new_unchecked(request.certificate),
363 ),
364 None,
365 &epoch_store,
366 )?;
367 let events = match effects.events_digest() {
368 None => TransactionEvents::default(),
369 Some(digest) => self.authority.get_transaction_events(digest)?,
370 };
371 let signed_effects = self
372 .authority
373 .sign_effects(effects, &epoch_store)?
374 .into_inner();
375 Ok(HandleCertificateResponseV1 {
376 signed_effects,
377 events: Some(events),
378 input_objects: None,
379 output_objects: None,
380 auxiliary_data: None,
381 })
382 }
383
384 async fn handle_soft_bundle_certificates_v1(
385 &self,
386 _request: HandleSoftBundleCertificatesRequestV1,
387 _client_addr: Option<SocketAddr>,
388 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
389 unimplemented!()
390 }
391
392 async fn handle_object_info_request(
393 &self,
394 _request: ObjectInfoRequest,
395 ) -> Result<ObjectInfoResponse, IotaError> {
396 unimplemented!()
397 }
398
399 async fn handle_transaction_info_request(
400 &self,
401 _request: TransactionInfoRequest,
402 ) -> Result<TransactionInfoResponse, IotaError> {
403 unimplemented!()
404 }
405
406 async fn handle_checkpoint(
407 &self,
408 _request: CheckpointRequest,
409 ) -> Result<CheckpointResponse, IotaError> {
410 unimplemented!()
411 }
412
413 async fn handle_system_state_object(
414 &self,
415 _request: SystemStateRequest,
416 ) -> Result<IotaSystemState, IotaError> {
417 unimplemented!()
418 }
419
420 async fn handle_capability_notification_v1(
421 &self,
422 _request: HandleCapabilityNotificationRequestV1,
423 ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
424 unimplemented!()
425 }
426 }
427
428 #[sim_test]
429 async fn test_validator_tx_finalizer_basic_flow() {
430 telemetry_subscribers::init_for_testing();
431 let (sender, keypair) = get_account_key_pair();
432 let gas_object = Object::with_owner_for_testing(sender);
433 let gas_object_id = gas_object.id();
434 let (states, auth_agg, clients) = create_validators(gas_object).await;
435 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
436 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
437 let tx_digest = *signed_tx.digest();
438 let cache_read = states[0].get_transaction_cache_reader().clone();
439 let epoch_store = states[0].epoch_store_for_testing();
440 let metrics = finalizer1.metrics.clone();
441 let handle = tokio::spawn(async move {
442 finalizer1
443 .track_signed_tx(cache_read, &epoch_store, signed_tx)
444 .await;
445 });
446 handle.await.unwrap();
447 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
448 assert_eq!(
449 metrics.num_finalization_attempts_for_testing.load(Relaxed),
450 1
451 );
452 assert_eq!(
453 metrics
454 .num_successful_finalizations_for_testing
455 .load(Relaxed),
456 1
457 );
458 }
459
460 #[tokio::test]
461 async fn test_validator_tx_finalizer_new_epoch() {
462 let (sender, keypair) = get_account_key_pair();
463 let gas_object = Object::with_owner_for_testing(sender);
464 let gas_object_id = gas_object.id();
465 let (states, auth_agg, clients) = create_validators(gas_object).await;
466 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
467 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
468 let tx_digest = *signed_tx.digest();
469 let epoch_store = states[0].epoch_store_for_testing();
470 let cache_read = states[0].get_transaction_cache_reader().clone();
471
472 let metrics = finalizer1.metrics.clone();
473 let handle = tokio::spawn(async move {
474 let _ = epoch_store
475 .within_alive_epoch(finalizer1.track_signed_tx(cache_read, &epoch_store, signed_tx))
476 .await;
477 });
478 states[0].reconfigure_for_testing().await;
479 handle.await.unwrap();
480 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
481 assert_eq!(
482 metrics.num_finalization_attempts_for_testing.load(Relaxed),
483 0
484 );
485 assert_eq!(
486 metrics
487 .num_successful_finalizations_for_testing
488 .load(Relaxed),
489 0
490 );
491 }
492
493 #[tokio::test]
494 async fn test_validator_tx_finalizer_auth_agg_reconfig() {
495 let (sender, _) = get_account_key_pair();
496 let gas_object = Object::with_owner_for_testing(sender);
497 let (states, auth_agg, _clients) = create_validators(gas_object).await;
498 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
499 let mut new_auth_agg = (**auth_agg.load()).clone();
500 let mut new_committee = (*new_auth_agg.committee).clone();
501 new_committee.epoch = 100;
502 new_auth_agg.committee = Arc::new(new_committee);
503 auth_agg.store(Arc::new(new_auth_agg));
504 assert_eq!(
505 finalizer1.auth_agg().load().committee.epoch,
506 100,
507 "AuthorityAggregator not updated"
508 );
509 }
510
511 #[tokio::test]
512 async fn test_validator_tx_finalizer_already_executed() {
513 telemetry_subscribers::init_for_testing();
514 let (sender, keypair) = get_account_key_pair();
515 let gas_object = Object::with_owner_for_testing(sender);
516 let gas_object_id = gas_object.id();
517 let (states, auth_agg, clients) = create_validators(gas_object).await;
518 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
519 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
520 let tx_digest = *signed_tx.digest();
521 let cache_read = states[0].get_transaction_cache_reader().clone();
522 let epoch_store = states[0].epoch_store_for_testing();
523
524 let metrics = finalizer1.metrics.clone();
525 let signed_tx_clone = signed_tx.clone();
526 let handle = tokio::spawn(async move {
527 finalizer1
528 .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
529 .await;
530 });
531 auth_agg
532 .load()
533 .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
534 .await
535 .unwrap();
536 handle.await.unwrap();
537 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
538 assert_eq!(
539 metrics.num_finalization_attempts_for_testing.load(Relaxed),
540 0
541 );
542 assert_eq!(
543 metrics
544 .num_successful_finalizations_for_testing
545 .load(Relaxed),
546 0
547 );
548 }
549
550 #[tokio::test]
551 async fn test_validator_tx_finalizer_timeout() {
552 telemetry_subscribers::init_for_testing();
553 let (sender, keypair) = get_account_key_pair();
554 let gas_object = Object::with_owner_for_testing(sender);
555 let gas_object_id = gas_object.id();
556 let (states, auth_agg, clients) = create_validators(gas_object).await;
557 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
558 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
559 let tx_digest = *signed_tx.digest();
560 let cache_read = states[0].get_transaction_cache_reader().clone();
561 let epoch_store = states[0].epoch_store_for_testing();
562 for client in clients.values() {
563 client.inject_fault.store(true, Relaxed);
564 }
565
566 let metrics = finalizer1.metrics.clone();
567 let signed_tx_clone = signed_tx.clone();
568 let handle = tokio::spawn(async move {
569 finalizer1
570 .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
571 .await;
572 });
573 handle.await.unwrap();
574 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
575 assert_eq!(
576 metrics.num_finalization_attempts_for_testing.load(Relaxed),
577 1
578 );
579 assert_eq!(
580 metrics
581 .num_successful_finalizations_for_testing
582 .load(Relaxed),
583 0
584 );
585 }
586
587 #[tokio::test]
588 async fn test_validator_tx_finalizer_determine_finalization_delay() {
589 const COMMITTEE_SIZE: usize = 15;
590 let network_config = ConfigBuilder::new_with_temp_dir()
591 .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
592 .build();
593 let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
594 .build_network_clients();
595 let auth_agg = Arc::new(auth_agg);
596 let finalizers = (0..COMMITTEE_SIZE)
597 .map(|idx| {
598 ValidatorTxFinalizer::new_for_testing(
599 Arc::new(ArcSwap::new(auth_agg.clone())),
600 auth_agg.committee.voting_rights[idx].0,
601 )
602 })
603 .collect::<Vec<_>>();
604 let config = finalizers[0].config.clone();
605 for _ in 0..100 {
606 let tx_digest = TransactionDigest::random();
607 let mut delays: Vec<_> = finalizers
608 .iter()
609 .map(|finalizer| {
610 finalizer
611 .determine_finalization_delay(&tx_digest)
612 .map(|delay| delay.as_secs())
613 .unwrap()
614 })
615 .collect();
616 delays.sort();
617 for (idx, delay) in delays.iter().enumerate() {
618 assert_eq!(
619 *delay,
620 min(
621 config.validator_max_delay.as_secs(),
622 config.tx_finalization_delay.as_secs()
623 + idx as u64 * config.validator_delay_increments_sec
624 )
625 );
626 }
627 }
628 }
629
630 async fn create_validators(
631 gas_object: Object,
632 ) -> (
633 Vec<Arc<AuthorityState>>,
634 Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
635 BTreeMap<AuthorityName, MockAuthorityClient>,
636 ) {
637 let network_config = ConfigBuilder::new_with_temp_dir()
638 .committee_size(NonZeroUsize::new(4).unwrap())
639 .with_objects(iter::once(gas_object))
640 .build();
641 let mut authority_states = vec![];
642 for idx in 0..4 {
643 let state = TestAuthorityBuilder::new()
644 .with_network_config(&network_config, idx)
645 .build()
646 .await;
647 authority_states.push(state);
648 }
649 let clients: BTreeMap<_, _> = authority_states
650 .iter()
651 .map(|state| {
652 (
653 state.name,
654 MockAuthorityClient {
655 authority: state.clone(),
656 inject_fault: Arc::new(AtomicBool::new(false)),
657 },
658 )
659 })
660 .collect();
661 let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
662 .build_custom_clients(clients.clone());
663 (
664 authority_states,
665 Arc::new(ArcSwap::new(Arc::new(auth_agg))),
666 clients,
667 )
668 }
669
670 async fn create_tx(
671 clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
672 state: &Arc<AuthorityState>,
673 sender: IotaAddress,
674 keypair: &AccountKeyPair,
675 gas_object_id: ObjectID,
676 ) -> VerifiedSignedTransaction {
677 let gas_object_ref = state
678 .get_object(&gas_object_id)
679 .await
680 .unwrap()
681 .compute_object_reference();
682 let tx_data = TestTransactionBuilder::new(
683 sender,
684 gas_object_ref,
685 state.reference_gas_price_for_testing().unwrap(),
686 )
687 .transfer_iota(None, sender)
688 .build();
689 let tx = to_sender_signed_transaction(tx_data, keypair);
690 let response = clients
691 .get(&state.name)
692 .unwrap()
693 .handle_transaction(tx.clone(), None)
694 .await
695 .unwrap();
696 VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
697 tx.into_data(),
698 response.status.into_signed_for_testing(),
699 ))
700 }
701
702 fn check_quorum_execution(
703 auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
704 clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
705 tx_digest: &TransactionDigest,
706 expected: bool,
707 ) {
708 let quorum = auth_agg.committee.quorum_threshold();
709 let executed_weight: StakeUnit = clients
710 .iter()
711 .filter_map(|(name, client)| {
712 client
713 .authority
714 .is_tx_already_executed(tx_digest)
715 .then_some(auth_agg.committee.weight(name))
716 })
717 .sum();
718 assert_eq!(executed_weight >= quorum, expected);
719 }
720}