1#[cfg(any(msim, test))]
6use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
7use std::{cmp::min, ops::Add, sync::Arc, time::Duration};
8
9use arc_swap::ArcSwap;
10use iota_metrics::LATENCY_SEC_BUCKETS;
11use iota_types::{
12 base_types::{AuthorityName, TransactionDigest},
13 transaction::VerifiedSignedTransaction,
14};
15use prometheus::{
16 Histogram, IntCounter, Registry, register_histogram_with_registry,
17 register_int_counter_with_registry,
18};
19use tokio::{select, time::Instant};
20use tracing::{debug, error, trace};
21
22use crate::{
23 authority::authority_per_epoch_store::AuthorityPerEpochStore,
24 authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI,
25 execution_cache::TransactionCacheRead,
26};
27
28struct ValidatorTxFinalizerMetrics {
29 num_finalization_attempts: IntCounter,
30 num_successful_finalizations: IntCounter,
31 finalization_latency: Histogram,
32 validator_tx_finalizer_attempt_delay: Histogram,
33 #[cfg(any(msim, test))]
34 num_finalization_attempts_for_testing: AtomicU64,
35 #[cfg(test)]
36 num_successful_finalizations_for_testing: AtomicU64,
37}
38
39impl ValidatorTxFinalizerMetrics {
40 fn new(registry: &Registry) -> Self {
41 Self {
42 num_finalization_attempts: register_int_counter_with_registry!(
43 "validator_tx_finalizer_num_finalization_attempts",
44 "Total number of attempts to finalize a transaction",
45 registry,
46 )
47 .unwrap(),
48 num_successful_finalizations: register_int_counter_with_registry!(
49 "validator_tx_finalizer_num_successful_finalizations",
50 "Number of transactions successfully finalized",
51 registry,
52 )
53 .unwrap(),
54 finalization_latency: register_histogram_with_registry!(
55 "validator_tx_finalizer_finalization_latency",
56 "Latency of transaction finalization",
57 LATENCY_SEC_BUCKETS.to_vec(),
58 registry,
59 )
60 .unwrap(),
61 validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
62 "validator_tx_finalizer_attempt_delay",
63 "Duration that a validator in the committee waited before attempting to finalize the transaction",
64 vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
65 registry,
66 )
67 .unwrap(),
68 #[cfg(any(msim, test))]
69 num_finalization_attempts_for_testing: AtomicU64::new(0),
70 #[cfg(test)]
71 num_successful_finalizations_for_testing: AtomicU64::new(0),
72 }
73 }
74
75 fn start_finalization(&self) -> Instant {
76 self.num_finalization_attempts.inc();
77 #[cfg(any(msim, test))]
78 self.num_finalization_attempts_for_testing
79 .fetch_add(1, Relaxed);
80 Instant::now()
81 }
82
83 fn finalization_succeeded(&self, start_time: Instant) {
84 let latency = start_time.elapsed();
85 self.num_successful_finalizations.inc();
86 self.finalization_latency.observe(latency.as_secs_f64());
87 #[cfg(test)]
88 self.num_successful_finalizations_for_testing
89 .fetch_add(1, Relaxed);
90 }
91}
92
/// Timing parameters for the validator transaction finalizer.
pub struct ValidatorTxFinalizerConfig {
    /// Base wait before a validator tries to finalize a transaction.
    pub tx_finalization_delay: Duration,
    /// Upper bound on how long a single finalization attempt may run.
    pub tx_finalization_timeout: Duration,
    /// Extra seconds of wait added per position in the per-transaction
    /// validator ordering.
    pub validator_delay_increments_sec: u64,
    /// Cap applied to the total wait (base delay plus per-position extra).
    pub validator_max_delay: Duration,
}

/// Defaults for regular (non-`msim`, non-test) builds.
#[cfg(not(any(msim, test)))]
impl Default for ValidatorTxFinalizerConfig {
    fn default() -> Self {
        ValidatorTxFinalizerConfig {
            // Wait one minute before stepping in.
            tx_finalization_delay: Duration::from_secs(60),
            // Give up on an attempt after one minute.
            tx_finalization_timeout: Duration::from_secs(60),
            // Each later validator in the ordering waits 10s more ...
            validator_delay_increments_sec: 10,
            // ... but never more than three minutes overall.
            validator_max_delay: Duration::from_secs(180),
        }
    }
}

/// Shortened defaults for `msim` and test builds.
#[cfg(any(msim, test))]
impl Default for ValidatorTxFinalizerConfig {
    fn default() -> Self {
        ValidatorTxFinalizerConfig {
            tx_finalization_delay: Duration::from_secs(5),
            tx_finalization_timeout: Duration::from_secs(60),
            validator_delay_increments_sec: 1,
            validator_max_delay: Duration::from_secs(15),
        }
    }
}
129
130pub struct ValidatorTxFinalizer<C: Clone> {
135 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
136 name: AuthorityName,
137 config: Arc<ValidatorTxFinalizerConfig>,
138 metrics: Arc<ValidatorTxFinalizerMetrics>,
139}
140
141impl<C: Clone> ValidatorTxFinalizer<C> {
142 pub fn new(
143 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
144 name: AuthorityName,
145 registry: &Registry,
146 ) -> Self {
147 Self {
148 agg,
149 name,
150 config: Arc::new(ValidatorTxFinalizerConfig::default()),
151 metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
152 }
153 }
154
155 #[cfg(test)]
156 pub(crate) fn new_for_testing(
157 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
158 name: AuthorityName,
159 ) -> Self {
160 Self::new(agg, name, &Registry::new())
161 }
162
163 #[cfg(test)]
164 pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
165 &self.agg
166 }
167
168 #[cfg(any(msim, test))]
169 pub fn num_finalization_attempts_for_testing(&self) -> u64 {
170 self.metrics
171 .num_finalization_attempts_for_testing
172 .load(Relaxed)
173 }
174}
175
176impl<C> ValidatorTxFinalizer<C>
177where
178 C: Clone + AuthorityAPI + Send + Sync + 'static,
179{
180 pub async fn track_signed_tx(
181 &self,
182 cache_read: Arc<dyn TransactionCacheRead>,
183 epoch_store: &Arc<AuthorityPerEpochStore>,
184 tx: VerifiedSignedTransaction,
185 ) {
186 let tx_digest = *tx.digest();
187 trace!(?tx_digest, "Tracking signed transaction");
188 match self
189 .delay_and_finalize_tx(cache_read, epoch_store, tx)
190 .await
191 {
192 Ok(did_run) => {
193 if did_run {
194 debug!(?tx_digest, "Transaction finalized");
195 }
196 }
197 Err(err) => {
198 error!(?tx_digest, ?err, "Failed to finalize transaction");
199 }
200 }
201 }
202
203 async fn delay_and_finalize_tx(
204 &self,
205 cache_read: Arc<dyn TransactionCacheRead>,
206 epoch_store: &Arc<AuthorityPerEpochStore>,
207 tx: VerifiedSignedTransaction,
208 ) -> anyhow::Result<bool> {
209 let tx_digest = *tx.digest();
210 let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
211 return Ok(false);
212 };
213 let digests = [tx_digest];
214 select! {
215 _ = tokio::time::sleep(tx_finalization_delay) => {
216 trace!(?tx_digest, "Waking up to finalize transaction");
217 }
218 _ = cache_read.notify_read_executed_effects_digests(&digests) => {
219 trace!(?tx_digest, "Transaction already finalized");
220 return Ok(false);
221 }
222 }
223
224 if epoch_store.is_pending_consensus_certificate(&tx_digest) {
225 trace!(
226 ?tx_digest,
227 "Transaction has been submitted to consensus, no need to help drive finality"
228 );
229 return Ok(false);
230 }
231
232 self.metrics
233 .validator_tx_finalizer_attempt_delay
234 .observe(tx_finalization_delay.as_secs_f64());
235 let start_time = self.metrics.start_finalization();
236 debug!(
237 ?tx_digest,
238 "Invoking authority aggregator to finalize transaction"
239 );
240 tokio::time::timeout(
241 self.config.tx_finalization_timeout,
242 self.agg
243 .load()
244 .execute_transaction_block(tx.into_unsigned().inner(), None),
245 )
246 .await??;
247 self.metrics.finalization_succeeded(start_time);
248 Ok(true)
249 }
250
251 fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
259 let agg = self.agg.load();
260 let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
261 let Some(position) = order.iter().position(|&name| name == self.name) else {
262 error!("Validator {} not found in the committee", self.name);
266 return None;
267 };
268 let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
271 let delay = self
272 .config
273 .tx_finalization_delay
274 .add(Duration::from_secs(extra_delay));
275 Some(min(delay, self.config.validator_max_delay))
276 }
277}
278
// Tests for `ValidatorTxFinalizer`, run against a four-validator in-process
// committee whose network clients are replaced by `MockAuthorityClient`.
#[cfg(test)]
mod tests {
    use std::{
        cmp::min,
        collections::BTreeMap,
        iter,
        net::SocketAddr,
        num::NonZeroUsize,
        sync::{
            Arc,
            atomic::{AtomicBool, Ordering::Relaxed},
        },
    };

    use arc_swap::ArcSwap;
    use async_trait::async_trait;
    use iota_macros::sim_test;
    use iota_swarm_config::network_config_builder::ConfigBuilder;
    use iota_test_transaction_builder::TestTransactionBuilder;
    use iota_types::{
        base_types::{AuthorityName, IotaAddress, ObjectID, TransactionDigest},
        committee::{CommitteeTrait, StakeUnit},
        crypto::{AccountKeyPair, get_account_key_pair},
        effects::{TransactionEffectsAPI, TransactionEvents},
        error::IotaError,
        executable_transaction::VerifiedExecutableTransaction,
        iota_system_state::IotaSystemState,
        messages_checkpoint::{CheckpointRequest, CheckpointResponse},
        messages_grpc::{
            HandleCertificateRequestV1, HandleCertificateResponseV1,
            HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
            HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
            TransactionInfoRequest, TransactionInfoResponse,
        },
        object::Object,
        transaction::{
            SignedTransaction, Transaction, VerifiedCertificate, VerifiedSignedTransaction,
            VerifiedTransaction,
        },
        utils::to_sender_signed_transaction,
    };

    use crate::{
        authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
        authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder},
        authority_client::AuthorityAPI,
        validator_tx_finalizer::ValidatorTxFinalizer,
    };

    // In-process stand-in for a validator's network client: requests are
    // served directly by the wrapped `AuthorityState`.
    #[derive(Clone)]
    struct MockAuthorityClient {
        authority: Arc<AuthorityState>,
        // When set, `handle_transaction` fails with `IotaError::Timeout`.
        inject_fault: Arc<AtomicBool>,
    }

    #[async_trait]
    impl AuthorityAPI for MockAuthorityClient {
        // Signs the transaction via the local authority, or fails right away
        // when a fault has been injected.
        async fn handle_transaction(
            &self,
            transaction: Transaction,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleTransactionResponse, IotaError> {
            if self.inject_fault.load(Relaxed) {
                return Err(IotaError::Timeout);
            }
            let epoch_store = self.authority.epoch_store_for_testing();
            self.authority
                .handle_transaction(
                    &epoch_store,
                    VerifiedTransaction::new_unchecked(transaction),
                )
                .await
        }

        // Executes the certificate immediately on the local authority and
        // returns the signed effects together with the emitted events.
        async fn handle_certificate_v1(
            &self,
            request: HandleCertificateRequestV1,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleCertificateResponseV1, IotaError> {
            let epoch_store = self.authority.epoch_store_for_testing();
            let (effects, _) = self
                .authority
                .try_execute_immediately(
                    &VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(request.certificate),
                    ),
                    None,
                    &epoch_store,
                )
                .await?;
            // No events digest means the transaction emitted no events.
            let events = match effects.events_digest() {
                None => TransactionEvents::default(),
                Some(digest) => self.authority.get_transaction_events(digest)?,
            };
            let signed_effects = self
                .authority
                .sign_effects(effects, &epoch_store)?
                .into_inner();
            Ok(HandleCertificateResponseV1 {
                signed_effects,
                events: Some(events),
                input_objects: None,
                output_objects: None,
                auxiliary_data: None,
            })
        }

        // The remaining endpoints are not exercised by these tests.
        async fn handle_soft_bundle_certificates_v1(
            &self,
            _request: HandleSoftBundleCertificatesRequestV1,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
            unimplemented!()
        }

        async fn handle_object_info_request(
            &self,
            _request: ObjectInfoRequest,
        ) -> Result<ObjectInfoResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_transaction_info_request(
            &self,
            _request: TransactionInfoRequest,
        ) -> Result<TransactionInfoResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_checkpoint(
            &self,
            _request: CheckpointRequest,
        ) -> Result<CheckpointResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_system_state_object(
            &self,
            _request: SystemStateRequest,
        ) -> Result<IotaSystemState, IotaError> {
            unimplemented!()
        }
    }

    // Happy path: nobody else executes the transaction, so the finalizer
    // makes exactly one attempt, it succeeds, and a quorum has executed.
    #[sim_test]
    async fn test_validator_tx_finalizer_basic_flow() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let epoch_store = states[0].epoch_store_for_testing();
        // Keep a handle on the metrics; the finalizer is moved into the task.
        let metrics = finalizer1.metrics.clone();
        let handle = tokio::spawn(async move {
            finalizer1
                .track_signed_tx(cache_read, &epoch_store, signed_tx)
                .await;
        });
        handle.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            1
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            1
        );
    }

    // Reconfiguring the validator while the finalizer is still waiting must
    // leave the transaction unexecuted and record no attempt.
    #[tokio::test]
    async fn test_validator_tx_finalizer_new_epoch() {
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let epoch_store = states[0].epoch_store_for_testing();
        let cache_read = states[0].get_transaction_cache_reader().clone();

        let metrics = finalizer1.metrics.clone();
        let handle = tokio::spawn(async move {
            // NOTE(review): `within_alive_epoch` presumably stops driving the
            // future once the epoch ends; the zero-attempt assertions below
            // rely on that. Confirm against `AuthorityPerEpochStore`.
            let _ = epoch_store
                .within_alive_epoch(finalizer1.track_signed_tx(cache_read, &epoch_store, signed_tx))
                .await;
        });
        states[0].reconfigure_for_testing().await;
        handle.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            0
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            0
        );
    }

    // Storing a new aggregator into the shared `ArcSwap` must be visible
    // through the finalizer's own handle.
    #[tokio::test]
    async fn test_validator_tx_finalizer_auth_agg_reconfig() {
        let (sender, _) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let (states, auth_agg, _clients) = create_validators(gas_object).await;
        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        // Clone the current aggregator and give it a recognizable epoch.
        let mut new_auth_agg = (**auth_agg.load()).clone();
        let mut new_committee = (*new_auth_agg.committee).clone();
        new_committee.epoch = 100;
        new_auth_agg.committee = Arc::new(new_committee);
        auth_agg.store(Arc::new(new_auth_agg));
        assert_eq!(
            finalizer1.auth_agg().load().committee.epoch,
            100,
            "AuthorityAggregator not updated"
        );
    }

    // The transaction is executed through the aggregator while the finalizer
    // is tracking it, so the finalizer must not make an attempt of its own.
    #[tokio::test]
    async fn test_validator_tx_finalizer_already_executed() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let epoch_store = states[0].epoch_store_for_testing();

        let metrics = finalizer1.metrics.clone();
        let signed_tx_clone = signed_tx.clone();
        let handle = tokio::spawn(async move {
            finalizer1
                .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
                .await;
        });
        // Execute the transaction from the test itself, outside the finalizer.
        auth_agg
            .load()
            .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
            .await
            .unwrap();
        handle.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            0
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            0
        );
    }

    // With every client rejecting `handle_transaction`, the finalizer makes
    // one attempt, it does not succeed, and no quorum executes.
    #[tokio::test]
    async fn test_validator_tx_finalizer_timeout() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let epoch_store = states[0].epoch_store_for_testing();
        // Faults are injected only after `create_tx` obtained its signature.
        for client in clients.values() {
            client.inject_fault.store(true, Relaxed);
        }

        let metrics = finalizer1.metrics.clone();
        let signed_tx_clone = signed_tx.clone();
        let handle = tokio::spawn(async move {
            finalizer1
                .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
                .await;
        });
        handle.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            1
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            0
        );
    }

    // For 100 random digests, the sorted delays of a 15-validator committee
    // must be the base delay plus one increment per position, capped at the
    // configured maximum.
    #[tokio::test]
    async fn test_validator_tx_finalizer_determine_finalization_delay() {
        const COMMITTEE_SIZE: usize = 15;
        let network_config = ConfigBuilder::new_with_temp_dir()
            .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
            .build();
        let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
            .build_network_clients();
        let auth_agg = Arc::new(auth_agg);
        // One finalizer per committee member, all sharing the same aggregator.
        let finalizers = (0..COMMITTEE_SIZE)
            .map(|idx| {
                ValidatorTxFinalizer::new_for_testing(
                    Arc::new(ArcSwap::new(auth_agg.clone())),
                    auth_agg.committee.voting_rights[idx].0,
                )
            })
            .collect::<Vec<_>>();
        let config = finalizers[0].config.clone();
        for _ in 0..100 {
            let tx_digest = TransactionDigest::random();
            let mut delays: Vec<_> = finalizers
                .iter()
                .map(|finalizer| {
                    finalizer
                        .determine_finalization_delay(&tx_digest)
                        .map(|delay| delay.as_secs())
                        .unwrap()
                })
                .collect();
            // Sorting turns the per-validator delays into per-position delays.
            delays.sort();
            for (idx, delay) in delays.iter().enumerate() {
                assert_eq!(
                    *delay,
                    min(
                        config.validator_max_delay.as_secs(),
                        config.tx_finalization_delay.as_secs()
                            + idx as u64 * config.validator_delay_increments_sec
                    )
                );
            }
        }
    }

    // Builds a four-validator committee that owns `gas_object` and returns
    // the authority states, a swappable aggregator wired to mock clients,
    // and the mock clients keyed by authority name.
    async fn create_validators(
        gas_object: Object,
    ) -> (
        Vec<Arc<AuthorityState>>,
        Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
        BTreeMap<AuthorityName, MockAuthorityClient>,
    ) {
        let network_config = ConfigBuilder::new_with_temp_dir()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_objects(iter::once(gas_object))
            .build();
        let mut authority_states = vec![];
        for idx in 0..4 {
            let state = TestAuthorityBuilder::new()
                .with_network_config(&network_config, idx)
                .build()
                .await;
            authority_states.push(state);
        }
        let clients: BTreeMap<_, _> = authority_states
            .iter()
            .map(|state| {
                (
                    state.name,
                    MockAuthorityClient {
                        authority: state.clone(),
                        inject_fault: Arc::new(AtomicBool::new(false)),
                    },
                )
            })
            .collect();
        let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
            .build_custom_clients(clients.clone());
        (
            authority_states,
            Arc::new(ArcSwap::new(Arc::new(auth_agg))),
            clients,
        )
    }

    // Builds a transfer transaction from `sender` to itself, paid for with
    // `gas_object_id`, and has `state`'s own mock client sign it.
    async fn create_tx(
        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
        state: &Arc<AuthorityState>,
        sender: IotaAddress,
        keypair: &AccountKeyPair,
        gas_object_id: ObjectID,
    ) -> VerifiedSignedTransaction {
        let gas_object_ref = state
            .get_object(&gas_object_id)
            .await
            .unwrap()
            .unwrap()
            .compute_object_reference();
        let tx_data = TestTransactionBuilder::new(
            sender,
            gas_object_ref,
            state.reference_gas_price_for_testing().unwrap(),
        )
        .transfer_iota(None, sender)
        .build();
        let tx = to_sender_signed_transaction(tx_data, keypair);
        let response = clients
            .get(&state.name)
            .unwrap()
            .handle_transaction(tx.clone(), None)
            .await
            .unwrap();
        VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
            tx.into_data(),
            response.status.into_signed_for_testing(),
        ))
    }

    // Asserts that "the validators that executed `tx_digest` hold at least
    // quorum stake" equals `expected`.
    fn check_quorum_execution(
        auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
        tx_digest: &TransactionDigest,
        expected: bool,
    ) {
        let quorum = auth_agg.committee.quorum_threshold();
        // Sum the stake of every validator that has executed the transaction.
        let executed_weight: StakeUnit = clients
            .iter()
            .filter_map(|(name, client)| {
                client
                    .authority
                    .is_tx_already_executed(tx_digest)
                    .unwrap()
                    .then_some(auth_agg.committee.weight(name))
            })
            .sum();
        assert_eq!(executed_weight >= quorum, expected);
    }
}