1#[cfg(any(msim, test))]
6use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
7use std::{cmp::min, ops::Add, sync::Arc, time::Duration};
8
9use arc_swap::ArcSwap;
10use iota_metrics::LATENCY_SEC_BUCKETS;
11use iota_types::{
12 base_types::{AuthorityName, TransactionDigest},
13 transaction::VerifiedSignedTransaction,
14};
15use prometheus::{
16 Histogram, IntCounter, Registry, register_histogram_with_registry,
17 register_int_counter_with_registry,
18};
19use tokio::{select, time::Instant};
20use tracing::{debug, error, trace};
21
22use crate::{
23 authority::authority_per_epoch_store::AuthorityPerEpochStore,
24 authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI,
25 execution_cache::TransactionCacheRead,
26};
27
28struct ValidatorTxFinalizerMetrics {
29 num_finalization_attempts: IntCounter,
30 num_successful_finalizations: IntCounter,
31 finalization_latency: Histogram,
32 validator_tx_finalizer_attempt_delay: Histogram,
33 #[cfg(any(msim, test))]
34 num_finalization_attempts_for_testing: AtomicU64,
35 #[cfg(test)]
36 num_successful_finalizations_for_testing: AtomicU64,
37}
38
39impl ValidatorTxFinalizerMetrics {
40 fn new(registry: &Registry) -> Self {
41 Self {
42 num_finalization_attempts: register_int_counter_with_registry!(
43 "validator_tx_finalizer_num_finalization_attempts",
44 "Total number of attempts to finalize a transaction",
45 registry,
46 )
47 .unwrap(),
48 num_successful_finalizations: register_int_counter_with_registry!(
49 "validator_tx_finalizer_num_successful_finalizations",
50 "Number of transactions successfully finalized",
51 registry,
52 )
53 .unwrap(),
54 finalization_latency: register_histogram_with_registry!(
55 "validator_tx_finalizer_finalization_latency",
56 "Latency of transaction finalization",
57 LATENCY_SEC_BUCKETS.to_vec(),
58 registry,
59 )
60 .unwrap(),
61 validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
62 "validator_tx_finalizer_attempt_delay",
63 "Duration that a validator in the committee waited before attempting to finalize the transaction",
64 vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
65 registry,
66 )
67 .unwrap(),
68 #[cfg(any(msim, test))]
69 num_finalization_attempts_for_testing: AtomicU64::new(0),
70 #[cfg(test)]
71 num_successful_finalizations_for_testing: AtomicU64::new(0),
72 }
73 }
74
75 fn start_finalization(&self) -> Instant {
76 self.num_finalization_attempts.inc();
77 #[cfg(any(msim, test))]
78 self.num_finalization_attempts_for_testing
79 .fetch_add(1, Relaxed);
80 Instant::now()
81 }
82
83 fn finalization_succeeded(&self, start_time: Instant) {
84 let latency = start_time.elapsed();
85 self.num_successful_finalizations.inc();
86 self.finalization_latency.observe(latency.as_secs_f64());
87 #[cfg(test)]
88 self.num_successful_finalizations_for_testing
89 .fetch_add(1, Relaxed);
90 }
91}
92
/// Timing parameters that control when a validator attempts to finalize a
/// transaction it has signed, and how long a single attempt may take.
///
/// `Debug` and `Clone` are derived so the configuration can be logged, printed
/// in assertion messages and copied.
#[derive(Debug, Clone)]
pub struct ValidatorTxFinalizerConfig {
    /// Base delay before a finalization attempt is made.
    pub tx_finalization_delay: Duration,
    /// Upper bound on the duration of a single finalization attempt.
    pub tx_finalization_timeout: Duration,
    /// Extra delay, in seconds, added per position of the validator in the
    /// per-transaction validator ordering.
    pub validator_delay_increments_sec: u64,
    /// Cap applied to the total delay (base delay plus per-position increments).
    pub validator_max_delay: Duration,
}

/// Defaults used outside of simulator (`msim`) and test builds.
#[cfg(not(any(msim, test)))]
impl Default for ValidatorTxFinalizerConfig {
    fn default() -> Self {
        Self {
            tx_finalization_delay: Duration::from_secs(60),
            tx_finalization_timeout: Duration::from_secs(60),
            validator_delay_increments_sec: 10,
            validator_max_delay: Duration::from_secs(180),
        }
    }
}

/// Shorter defaults used in simulator (`msim`) and test builds.
#[cfg(any(msim, test))]
impl Default for ValidatorTxFinalizerConfig {
    fn default() -> Self {
        Self {
            tx_finalization_delay: Duration::from_secs(5),
            tx_finalization_timeout: Duration::from_secs(60),
            validator_delay_increments_sec: 1,
            validator_max_delay: Duration::from_secs(15),
        }
    }
}
129
130pub struct ValidatorTxFinalizer<C: Clone> {
135 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
136 name: AuthorityName,
137 config: Arc<ValidatorTxFinalizerConfig>,
138 metrics: Arc<ValidatorTxFinalizerMetrics>,
139}
140
141impl<C: Clone> ValidatorTxFinalizer<C> {
142 pub fn new(
143 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
144 name: AuthorityName,
145 registry: &Registry,
146 ) -> Self {
147 Self {
148 agg,
149 name,
150 config: Arc::new(ValidatorTxFinalizerConfig::default()),
151 metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
152 }
153 }
154
155 #[cfg(test)]
156 pub(crate) fn new_for_testing(
157 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
158 name: AuthorityName,
159 ) -> Self {
160 Self::new(agg, name, &Registry::new())
161 }
162
163 #[cfg(test)]
164 pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
165 &self.agg
166 }
167
168 #[cfg(any(msim, test))]
169 pub fn num_finalization_attempts_for_testing(&self) -> u64 {
170 self.metrics
171 .num_finalization_attempts_for_testing
172 .load(Relaxed)
173 }
174}
175
176impl<C> ValidatorTxFinalizer<C>
177where
178 C: Clone + AuthorityAPI + Send + Sync + 'static,
179{
180 pub async fn track_signed_tx(
181 &self,
182 cache_read: Arc<dyn TransactionCacheRead>,
183 epoch_store: &Arc<AuthorityPerEpochStore>,
184 tx: VerifiedSignedTransaction,
185 ) {
186 let tx_digest = *tx.digest();
187 trace!(?tx_digest, "Tracking signed transaction");
188 match self
189 .delay_and_finalize_tx(cache_read, epoch_store, tx)
190 .await
191 {
192 Ok(did_run) => {
193 if did_run {
194 debug!(?tx_digest, "Transaction finalized");
195 }
196 }
197 Err(err) => {
198 error!(?tx_digest, ?err, "Failed to finalize transaction");
199 }
200 }
201 }
202
203 async fn delay_and_finalize_tx(
204 &self,
205 cache_read: Arc<dyn TransactionCacheRead>,
206 epoch_store: &Arc<AuthorityPerEpochStore>,
207 tx: VerifiedSignedTransaction,
208 ) -> anyhow::Result<bool> {
209 let tx_digest = *tx.digest();
210 let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
211 return Ok(false);
212 };
213 let digests = [tx_digest];
214 select! {
215 _ = tokio::time::sleep(tx_finalization_delay) => {
216 trace!(?tx_digest, "Waking up to finalize transaction");
217 }
218 _ = cache_read.try_notify_read_executed_effects_digests(&digests) => {
219 trace!(?tx_digest, "Transaction already finalized");
220 return Ok(false);
221 }
222 }
223
224 if epoch_store.is_pending_consensus_certificate(&tx_digest) {
225 trace!(
226 ?tx_digest,
227 "Transaction has been submitted to consensus, no need to help drive finality"
228 );
229 return Ok(false);
230 }
231
232 self.metrics
233 .validator_tx_finalizer_attempt_delay
234 .observe(tx_finalization_delay.as_secs_f64());
235 let start_time = self.metrics.start_finalization();
236 debug!(
237 ?tx_digest,
238 "Invoking authority aggregator to finalize transaction"
239 );
240 tokio::time::timeout(
241 self.config.tx_finalization_timeout,
242 self.agg
243 .load()
244 .execute_transaction_block(tx.into_unsigned().inner(), None),
245 )
246 .await??;
247 self.metrics.finalization_succeeded(start_time);
248 Ok(true)
249 }
250
251 fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
259 let agg = self.agg.load();
260 let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
261 let Some(position) = order.iter().position(|&name| name == self.name) else {
262 error!("Validator {} not found in the committee", self.name);
266 return None;
267 };
268 let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
271 let delay = self
272 .config
273 .tx_finalization_delay
274 .add(Duration::from_secs(extra_delay));
275 Some(min(delay, self.config.validator_max_delay))
276 }
277}
278
#[cfg(test)]
mod tests {
    use std::{
        cmp::min,
        collections::BTreeMap,
        iter,
        net::SocketAddr,
        num::NonZeroUsize,
        sync::{
            Arc,
            atomic::{AtomicBool, Ordering::Relaxed},
        },
    };

    use arc_swap::ArcSwap;
    use async_trait::async_trait;
    use iota_macros::sim_test;
    use iota_swarm_config::network_config_builder::ConfigBuilder;
    use iota_test_transaction_builder::TestTransactionBuilder;
    use iota_types::{
        base_types::{AuthorityName, IotaAddress, ObjectID, TransactionDigest},
        committee::{CommitteeTrait, StakeUnit},
        crypto::{AccountKeyPair, get_account_key_pair},
        effects::{TransactionEffectsAPI, TransactionEvents},
        error::IotaError,
        executable_transaction::VerifiedExecutableTransaction,
        iota_system_state::IotaSystemState,
        messages_checkpoint::{CheckpointRequest, CheckpointResponse},
        messages_grpc::{
            HandleCertificateRequestV1, HandleCertificateResponseV1,
            HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
            HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
            TransactionInfoRequest, TransactionInfoResponse,
        },
        object::Object,
        transaction::{
            SignedTransaction, Transaction, VerifiedCertificate, VerifiedSignedTransaction,
            VerifiedTransaction,
        },
        utils::to_sender_signed_transaction,
    };

    use crate::{
        authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
        authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder},
        authority_client::AuthorityAPI,
        validator_tx_finalizer::ValidatorTxFinalizer,
    };

    /// Authority client that calls straight into an in-process
    /// [`AuthorityState`] instead of going over the network.
    #[derive(Clone)]
    struct MockAuthorityClient {
        /// The authority served by this client.
        authority: Arc<AuthorityState>,
        /// While set, `handle_transaction` fails with `IotaError::Timeout`.
        inject_fault: Arc<AtomicBool>,
    }

    #[async_trait]
    impl AuthorityAPI for MockAuthorityClient {
        /// Forwards the transaction to the wrapped authority, unless a fault
        /// is being injected.
        async fn handle_transaction(
            &self,
            transaction: Transaction,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleTransactionResponse, IotaError> {
            if self.inject_fault.load(Relaxed) {
                return Err(IotaError::Timeout);
            }
            let epoch_store = self.authority.epoch_store_for_testing();
            let verified_tx = VerifiedTransaction::new_unchecked(transaction);
            self.authority
                .handle_transaction(&epoch_store, verified_tx)
                .await
        }

        /// Executes the certificate on the wrapped authority and returns the
        /// signed effects together with the emitted events.
        async fn handle_certificate_v1(
            &self,
            request: HandleCertificateRequestV1,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleCertificateResponseV1, IotaError> {
            let epoch_store = self.authority.epoch_store_for_testing();
            let certificate = VerifiedCertificate::new_unchecked(request.certificate);
            let executable = VerifiedExecutableTransaction::new_from_certificate(certificate);
            let (effects, _) =
                self.authority
                    .try_execute_immediately(&executable, None, &epoch_store)?;
            // Effects without an events digest map to an empty event list.
            let events = if let Some(digest) = effects.events_digest() {
                self.authority.get_transaction_events(digest)?
            } else {
                TransactionEvents::default()
            };
            let signed_effects = self
                .authority
                .sign_effects(effects, &epoch_store)?
                .into_inner();
            Ok(HandleCertificateResponseV1 {
                signed_effects,
                events: Some(events),
                input_objects: None,
                output_objects: None,
                auxiliary_data: None,
            })
        }

        async fn handle_soft_bundle_certificates_v1(
            &self,
            _request: HandleSoftBundleCertificatesRequestV1,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
            unimplemented!()
        }

        async fn handle_object_info_request(
            &self,
            _request: ObjectInfoRequest,
        ) -> Result<ObjectInfoResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_transaction_info_request(
            &self,
            _request: TransactionInfoRequest,
        ) -> Result<TransactionInfoResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_checkpoint(
            &self,
            _request: CheckpointRequest,
        ) -> Result<CheckpointResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_system_state_object(
            &self,
            _request: SystemStateRequest,
        ) -> Result<IotaSystemState, IotaError> {
            unimplemented!()
        }
    }

    /// Asserts the values of the test-only attempt and success counters.
    #[track_caller]
    fn assert_finalization_counts(
        metrics: &super::ValidatorTxFinalizerMetrics,
        expected_attempts: u64,
        expected_successes: u64,
    ) {
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            expected_attempts
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            expected_successes
        );
    }

    /// A tracked transaction that nobody else executes is finalized by the
    /// finalizer: one attempt, one success, quorum execution.
    #[sim_test]
    async fn test_validator_tx_finalizer_basic_flow() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let epoch_store = states[0].epoch_store_for_testing();
        let metrics = finalizer.metrics.clone();
        let tracking_task = tokio::spawn(async move {
            finalizer
                .track_signed_tx(cache_read, &epoch_store, signed_tx)
                .await;
        });
        tracking_task.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
        assert_finalization_counts(&metrics, 1, 0 + 1);
    }

    /// Tracking wrapped in `within_alive_epoch` makes no finalization attempt
    /// when the validator is reconfigured while the finalizer is waiting.
    #[tokio::test]
    async fn test_validator_tx_finalizer_new_epoch() {
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let epoch_store = states[0].epoch_store_for_testing();
        let cache_read = states[0].get_transaction_cache_reader().clone();

        let metrics = finalizer.metrics.clone();
        let tracking_task = tokio::spawn(async move {
            let _ = epoch_store
                .within_alive_epoch(finalizer.track_signed_tx(cache_read, &epoch_store, signed_tx))
                .await;
        });
        states[0].reconfigure_for_testing().await;
        tracking_task.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
        assert_finalization_counts(&metrics, 0, 0);
    }

    /// Storing a new aggregator into the shared `ArcSwap` is visible through
    /// the finalizer's own handle.
    #[tokio::test]
    async fn test_validator_tx_finalizer_auth_agg_reconfig() {
        let (sender, _) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let (states, auth_agg, _clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let mut new_auth_agg = (**auth_agg.load()).clone();
        let mut new_committee = (*new_auth_agg.committee).clone();
        new_committee.epoch = 100;
        new_auth_agg.committee = Arc::new(new_committee);
        auth_agg.store(Arc::new(new_auth_agg));
        assert_eq!(
            finalizer.auth_agg().load().committee.epoch,
            100,
            "AuthorityAggregator not updated"
        );
    }

    /// When the transaction is executed through the aggregator while the
    /// finalizer is still waiting, the finalizer makes no attempt of its own.
    #[tokio::test]
    async fn test_validator_tx_finalizer_already_executed() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let epoch_store = states[0].epoch_store_for_testing();

        let metrics = finalizer.metrics.clone();
        let tracked_tx = signed_tx.clone();
        let tracking_task = tokio::spawn(async move {
            finalizer
                .track_signed_tx(cache_read, &epoch_store, tracked_tx)
                .await;
        });
        auth_agg
            .load()
            .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
            .await
            .unwrap();
        tracking_task.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
        assert_finalization_counts(&metrics, 0, 0);
    }

    /// With every client failing `handle_transaction`, the finalizer makes
    /// one attempt that does not succeed.
    #[tokio::test]
    async fn test_validator_tx_finalizer_timeout() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let epoch_store = states[0].epoch_store_for_testing();
        // Faults are injected only after the transaction has been signed.
        for client in clients.values() {
            client.inject_fault.store(true, Relaxed);
        }

        let metrics = finalizer.metrics.clone();
        let tracked_tx = signed_tx.clone();
        let tracking_task = tokio::spawn(async move {
            finalizer
                .track_signed_tx(cache_read, &epoch_store, tracked_tx)
                .await;
        });
        tracking_task.await.unwrap();
        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
        assert_finalization_counts(&metrics, 1, 0);
    }

    /// For random digests, the sorted delays of all committee members form
    /// the sequence `base + idx * increment`, capped at the maximum delay.
    #[tokio::test]
    async fn test_validator_tx_finalizer_determine_finalization_delay() {
        const COMMITTEE_SIZE: usize = 15;
        let network_config = ConfigBuilder::new_with_temp_dir()
            .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
            .build();
        let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
            .build_network_clients();
        let auth_agg = Arc::new(auth_agg);
        // One finalizer per committee member, all sharing the same aggregator.
        let finalizers = (0..COMMITTEE_SIZE)
            .map(|idx| {
                ValidatorTxFinalizer::new_for_testing(
                    Arc::new(ArcSwap::new(auth_agg.clone())),
                    auth_agg.committee.voting_rights[idx].0,
                )
            })
            .collect::<Vec<_>>();
        let config = finalizers[0].config.clone();
        for _ in 0..100 {
            let tx_digest = TransactionDigest::random();
            let mut delays: Vec<_> = finalizers
                .iter()
                .map(|finalizer| {
                    finalizer
                        .determine_finalization_delay(&tx_digest)
                        .map(|delay| delay.as_secs())
                        .unwrap()
                })
                .collect();
            delays.sort();
            for (idx, delay) in delays.iter().enumerate() {
                let uncapped_secs = config.tx_finalization_delay.as_secs()
                    + idx as u64 * config.validator_delay_increments_sec;
                assert_eq!(
                    *delay,
                    min(config.validator_max_delay.as_secs(), uncapped_secs)
                );
            }
        }
    }

    /// Builds a four-validator network that owns `gas_object`, and returns
    /// the authority states, an aggregator over mock clients, and the mock
    /// clients keyed by authority name.
    async fn create_validators(
        gas_object: Object,
    ) -> (
        Vec<Arc<AuthorityState>>,
        Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
        BTreeMap<AuthorityName, MockAuthorityClient>,
    ) {
        let network_config = ConfigBuilder::new_with_temp_dir()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_objects(iter::once(gas_object))
            .build();
        let mut authority_states = vec![];
        for validator_idx in 0..4 {
            let authority = TestAuthorityBuilder::new()
                .with_network_config(&network_config, validator_idx)
                .build()
                .await;
            authority_states.push(authority);
        }
        let clients: BTreeMap<_, _> = authority_states
            .iter()
            .map(|authority| {
                let client = MockAuthorityClient {
                    authority: authority.clone(),
                    inject_fault: Arc::new(AtomicBool::new(false)),
                };
                (authority.name, client)
            })
            .collect();
        let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
            .build_custom_clients(clients.clone());
        let shared_agg = Arc::new(ArcSwap::new(Arc::new(auth_agg)));
        (authority_states, shared_agg, clients)
    }

    /// Builds a transfer from `sender` to itself paid with `gas_object_id`,
    /// has `state`'s own client sign it, and returns the signed transaction.
    async fn create_tx(
        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
        state: &Arc<AuthorityState>,
        sender: IotaAddress,
        keypair: &AccountKeyPair,
        gas_object_id: ObjectID,
    ) -> VerifiedSignedTransaction {
        let gas_object_ref = state
            .get_object(&gas_object_id)
            .await
            .unwrap()
            .compute_object_reference();
        let gas_price = state.reference_gas_price_for_testing().unwrap();
        let tx_data = TestTransactionBuilder::new(sender, gas_object_ref, gas_price)
            .transfer_iota(None, sender)
            .build();
        let tx = to_sender_signed_transaction(tx_data, keypair);
        let signing_client = clients.get(&state.name).unwrap();
        let response = signing_client
            .handle_transaction(tx.clone(), None)
            .await
            .unwrap();
        let signed = SignedTransaction::new_from_data_and_sig(
            tx.into_data(),
            response.status.into_signed_for_testing(),
        );
        VerifiedSignedTransaction::new_unchecked(signed)
    }

    /// Asserts whether the validators that have executed `tx_digest` hold at
    /// least a quorum of stake (`expected == true`) or not.
    fn check_quorum_execution(
        auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
        tx_digest: &TransactionDigest,
        expected: bool,
    ) {
        let quorum = auth_agg.committee.quorum_threshold();
        let executed_weight: StakeUnit = clients
            .iter()
            .filter(|(_, client)| client.authority.is_tx_already_executed(tx_digest))
            .map(|(name, _)| auth_agg.committee.weight(name))
            .sum();
        assert_eq!(executed_weight >= quorum, expected);
    }
}