1#[cfg(any(msim, test))]
6use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
7use std::{cmp::min, ops::Add, sync::Arc, time::Duration};
8
9use arc_swap::ArcSwap;
10use iota_metrics::LATENCY_SEC_BUCKETS;
11use iota_types::{
12 base_types::{AuthorityName, TransactionDigest},
13 transaction::VerifiedSignedTransaction,
14};
15use prometheus::{
16 Histogram, IntCounter, Registry, register_histogram_with_registry,
17 register_int_counter_with_registry,
18};
19use tokio::{select, time::Instant};
20use tracing::{debug, error, trace};
21
22use crate::{
23 authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI,
24 execution_cache::TransactionCacheRead,
25};
26
27struct ValidatorTxFinalizerMetrics {
28 num_finalization_attempts: IntCounter,
29 num_successful_finalizations: IntCounter,
30 finalization_latency: Histogram,
31 validator_tx_finalizer_attempt_delay: Histogram,
32 #[cfg(any(msim, test))]
33 num_finalization_attempts_for_testing: AtomicU64,
34 #[cfg(test)]
35 num_successful_finalizations_for_testing: AtomicU64,
36}
37
38impl ValidatorTxFinalizerMetrics {
39 fn new(registry: &Registry) -> Self {
40 Self {
41 num_finalization_attempts: register_int_counter_with_registry!(
42 "validator_tx_finalizer_num_finalization_attempts",
43 "Total number of attempts to finalize a transaction",
44 registry,
45 )
46 .unwrap(),
47 num_successful_finalizations: register_int_counter_with_registry!(
48 "validator_tx_finalizer_num_successful_finalizations",
49 "Number of transactions successfully finalized",
50 registry,
51 )
52 .unwrap(),
53 finalization_latency: register_histogram_with_registry!(
54 "validator_tx_finalizer_finalization_latency",
55 "Latency of transaction finalization",
56 LATENCY_SEC_BUCKETS.to_vec(),
57 registry,
58 )
59 .unwrap(),
60 validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
61 "validator_tx_finalizer_attempt_delay",
62 "Duration that a validator in the committee waited before attempting to finalize the transaction",
63 vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
64 registry,
65 )
66 .unwrap(),
67 #[cfg(any(msim, test))]
68 num_finalization_attempts_for_testing: AtomicU64::new(0),
69 #[cfg(test)]
70 num_successful_finalizations_for_testing: AtomicU64::new(0),
71 }
72 }
73
74 fn start_finalization(&self) -> Instant {
75 self.num_finalization_attempts.inc();
76 #[cfg(any(msim, test))]
77 self.num_finalization_attempts_for_testing
78 .fetch_add(1, Relaxed);
79 Instant::now()
80 }
81
82 fn finalization_succeeded(&self, start_time: Instant) {
83 let latency = start_time.elapsed();
84 self.num_successful_finalizations.inc();
85 self.finalization_latency.observe(latency.as_secs_f64());
86 #[cfg(test)]
87 self.num_successful_finalizations_for_testing
88 .fetch_add(1, Relaxed);
89 }
90}
91
/// Timing parameters used by the validator transaction finalizer.
pub struct ValidatorTxFinalizerConfig {
    /// Base delay before a finalization attempt; the per-position increment
    /// is added on top of this value.
    pub tx_finalization_delay: Duration,
    /// Timeout applied to a single finalization attempt once it has started.
    pub tx_finalization_timeout: Duration,
    /// Seconds of extra delay added for each position a validator occupies in
    /// the per-transaction committee ordering.
    pub validator_delay_increments_sec: u64,
    /// Upper bound on the total delay before an attempt.
    pub validator_max_delay: Duration,
}

impl Default for ValidatorTxFinalizerConfig {
    /// Builds the default configuration.
    ///
    /// Test and simulator builds (`cfg(any(msim, test))`) use a 5s base delay,
    /// 1s increments and a 15s cap; all other builds use 60s, 10s and 180s.
    /// The attempt timeout is 60s in both cases.
    fn default() -> Self {
        // (base delay, per-position increment, maximum delay), all in seconds.
        let (base_delay_secs, increment_secs, max_delay_secs) = if cfg!(any(msim, test)) {
            (5, 1, 15)
        } else {
            (60, 10, 180)
        };

        Self {
            tx_finalization_delay: Duration::from_secs(base_delay_secs),
            tx_finalization_timeout: Duration::from_secs(60),
            validator_delay_increments_sec: increment_secs,
            validator_max_delay: Duration::from_secs(max_delay_secs),
        }
    }
}
128
129pub struct ValidatorTxFinalizer<C: Clone> {
134 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
135 name: AuthorityName,
136 config: Arc<ValidatorTxFinalizerConfig>,
137 metrics: Arc<ValidatorTxFinalizerMetrics>,
138}
139
140impl<C: Clone> ValidatorTxFinalizer<C> {
141 pub fn new(
142 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
143 name: AuthorityName,
144 registry: &Registry,
145 ) -> Self {
146 Self {
147 agg,
148 name,
149 config: Arc::new(ValidatorTxFinalizerConfig::default()),
150 metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
151 }
152 }
153
154 #[cfg(test)]
155 pub(crate) fn new_for_testing(
156 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
157 name: AuthorityName,
158 ) -> Self {
159 Self::new(agg, name, &Registry::new())
160 }
161
162 #[cfg(test)]
163 pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
164 &self.agg
165 }
166
167 #[cfg(any(msim, test))]
168 pub fn num_finalization_attempts_for_testing(&self) -> u64 {
169 self.metrics
170 .num_finalization_attempts_for_testing
171 .load(Relaxed)
172 }
173}
174
175impl<C> ValidatorTxFinalizer<C>
176where
177 C: Clone + AuthorityAPI + Send + Sync + 'static,
178{
179 pub async fn track_signed_tx(
180 &self,
181 cache_read: Arc<dyn TransactionCacheRead>,
182 tx: VerifiedSignedTransaction,
183 ) {
184 let tx_digest = *tx.digest();
185 trace!(?tx_digest, "Tracking signed transaction");
186 match self.delay_and_finalize_tx(cache_read, tx).await {
187 Ok(did_run) => {
188 if did_run {
189 debug!(?tx_digest, "Transaction finalized");
190 }
191 }
192 Err(err) => {
193 error!(?tx_digest, ?err, "Failed to finalize transaction");
194 }
195 }
196 }
197
198 async fn delay_and_finalize_tx(
199 &self,
200 cache_read: Arc<dyn TransactionCacheRead>,
201 tx: VerifiedSignedTransaction,
202 ) -> anyhow::Result<bool> {
203 let tx_digest = *tx.digest();
204 let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
205 return Ok(false);
206 };
207 let digests = [tx_digest];
208 select! {
209 _ = tokio::time::sleep(tx_finalization_delay) => {
210 trace!(?tx_digest, "Waking up to finalize transaction");
211 }
212 _ = cache_read.notify_read_executed_effects_digests(&digests) => {
213 trace!(?tx_digest, "Transaction already finalized");
214 return Ok(false);
215 }
216 }
217
218 self.metrics
219 .validator_tx_finalizer_attempt_delay
220 .observe(tx_finalization_delay.as_secs_f64());
221 let start_time = self.metrics.start_finalization();
222 debug!(
223 ?tx_digest,
224 "Invoking authority aggregator to finalize transaction"
225 );
226 tokio::time::timeout(
227 self.config.tx_finalization_timeout,
228 self.agg
229 .load()
230 .execute_transaction_block(tx.into_unsigned().inner(), None),
231 )
232 .await??;
233 self.metrics.finalization_succeeded(start_time);
234 Ok(true)
235 }
236
237 fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
245 let agg = self.agg.load();
246 let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
247 let Some(position) = order.iter().position(|&name| name == self.name) else {
248 error!("Validator {} not found in the committee", self.name);
252 return None;
253 };
254 let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
257 let delay = self
258 .config
259 .tx_finalization_delay
260 .add(Duration::from_secs(extra_delay));
261 Some(min(delay, self.config.validator_max_delay))
262 }
263}
264
#[cfg(test)]
mod tests {
    use std::{
        cmp::min,
        collections::BTreeMap,
        iter,
        net::SocketAddr,
        num::NonZeroUsize,
        sync::{
            Arc,
            atomic::{AtomicBool, Ordering::Relaxed},
        },
    };

    use arc_swap::ArcSwap;
    use async_trait::async_trait;
    use iota_macros::sim_test;
    use iota_swarm_config::network_config_builder::ConfigBuilder;
    use iota_test_transaction_builder::TestTransactionBuilder;
    use iota_types::{
        base_types::{AuthorityName, IotaAddress, ObjectID, TransactionDigest},
        committee::{CommitteeTrait, StakeUnit},
        crypto::{AccountKeyPair, get_account_key_pair},
        effects::{TransactionEffectsAPI, TransactionEvents},
        error::IotaError,
        executable_transaction::VerifiedExecutableTransaction,
        iota_system_state::IotaSystemState,
        messages_checkpoint::{CheckpointRequest, CheckpointResponse},
        messages_grpc::{
            HandleCertificateRequestV1, HandleCertificateResponseV1,
            HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
            HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
            TransactionInfoRequest, TransactionInfoResponse,
        },
        object::Object,
        transaction::{
            SignedTransaction, Transaction, VerifiedCertificate, VerifiedSignedTransaction,
            VerifiedTransaction,
        },
        utils::to_sender_signed_transaction,
    };

    use crate::{
        authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
        authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder},
        authority_client::AuthorityAPI,
        validator_tx_finalizer::ValidatorTxFinalizer,
    };

    /// In-process client that forwards requests straight to a local
    /// `AuthorityState`, with a switch to make transaction handling fail.
    #[derive(Clone)]
    struct MockAuthorityClient {
        authority: Arc<AuthorityState>,
        /// While `true`, `handle_transaction` returns `IotaError::Timeout`.
        inject_fault: Arc<AtomicBool>,
    }

    #[async_trait]
    impl AuthorityAPI for MockAuthorityClient {
        /// Signs the transaction on the wrapped authority, unless a fault is
        /// injected.
        async fn handle_transaction(
            &self,
            transaction: Transaction,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleTransactionResponse, IotaError> {
            let fault_injected = self.inject_fault.load(Relaxed);
            if fault_injected {
                return Err(IotaError::Timeout);
            }
            let epoch_store = self.authority.epoch_store_for_testing();
            let verified_tx = VerifiedTransaction::new_unchecked(transaction);
            self.authority
                .handle_transaction(&epoch_store, verified_tx)
                .await
        }

        /// Executes the certificate immediately on the wrapped authority and
        /// returns signed effects together with the emitted events.
        async fn handle_certificate_v1(
            &self,
            request: HandleCertificateRequestV1,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleCertificateResponseV1, IotaError> {
            let epoch_store = self.authority.epoch_store_for_testing();
            let certificate = VerifiedCertificate::new_unchecked(request.certificate);
            let executable = VerifiedExecutableTransaction::new_from_certificate(certificate);
            let (effects, _) = self
                .authority
                .try_execute_immediately(&executable, None, &epoch_store)
                .await?;

            // No events digest means the transaction emitted no events.
            let events = if let Some(digest) = effects.events_digest() {
                self.authority.get_transaction_events(digest)?
            } else {
                TransactionEvents::default()
            };

            let signed_effects = self
                .authority
                .sign_effects(effects, &epoch_store)?
                .into_inner();

            Ok(HandleCertificateResponseV1 {
                signed_effects,
                events: Some(events),
                input_objects: None,
                output_objects: None,
                auxiliary_data: None,
            })
        }

        async fn handle_soft_bundle_certificates_v1(
            &self,
            _request: HandleSoftBundleCertificatesRequestV1,
            _client_addr: Option<SocketAddr>,
        ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
            unimplemented!()
        }

        async fn handle_object_info_request(
            &self,
            _request: ObjectInfoRequest,
        ) -> Result<ObjectInfoResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_transaction_info_request(
            &self,
            _request: TransactionInfoRequest,
        ) -> Result<TransactionInfoResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_checkpoint(
            &self,
            _request: CheckpointRequest,
        ) -> Result<CheckpointResponse, IotaError> {
            unimplemented!()
        }

        async fn handle_system_state_object(
            &self,
            _request: SystemStateRequest,
        ) -> Result<IotaSystemState, IotaError> {
            unimplemented!()
        }
    }

    /// Asserts the test-only attempt and success counters of the finalizer.
    fn assert_finalization_counts(
        metrics: &super::ValidatorTxFinalizerMetrics,
        expected_attempts: u64,
        expected_successes: u64,
    ) {
        assert_eq!(
            metrics.num_finalization_attempts_for_testing.load(Relaxed),
            expected_attempts
        );
        assert_eq!(
            metrics
                .num_successful_finalizations_for_testing
                .load(Relaxed),
            expected_successes
        );
    }

    /// With nobody else executing the transaction, the finalizer wakes up
    /// once and brings it to quorum execution.
    #[sim_test]
    async fn test_validator_tx_finalizer_basic_flow() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let metrics = finalizer.metrics.clone();

        tokio::spawn(async move {
            finalizer.track_signed_tx(cache_read, signed_tx).await;
        })
        .await
        .unwrap();

        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
        assert_finalization_counts(&metrics, 1, 0 + 1);
    }

    /// Reconfiguring the authority while the finalizer is still waiting ends
    /// the tracking task without any finalization attempt.
    #[tokio::test]
    async fn test_validator_tx_finalizer_new_epoch() {
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let epoch_store = states[0].epoch_store_for_testing();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let metrics = finalizer.metrics.clone();

        // Start tracking first, then reconfigure while the task is pending.
        let task = tokio::spawn(async move {
            let tracking = finalizer.track_signed_tx(cache_read, signed_tx);
            let _ = epoch_store.within_alive_epoch(tracking).await;
        });
        states[0].reconfigure_for_testing().await;
        task.await.unwrap();

        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
        assert_finalization_counts(&metrics, 0, 0);
    }

    /// Storing a new aggregator into the shared `ArcSwap` is visible through
    /// the finalizer's handle.
    #[tokio::test]
    async fn test_validator_tx_finalizer_auth_agg_reconfig() {
        let (sender, _) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let (states, auth_agg, _clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);

        let mut replacement_agg = (**auth_agg.load()).clone();
        let mut replacement_committee = (*replacement_agg.committee).clone();
        replacement_committee.epoch = 100;
        replacement_agg.committee = Arc::new(replacement_committee);
        auth_agg.store(Arc::new(replacement_agg));

        assert_eq!(
            finalizer.auth_agg().load().committee.epoch,
            100,
            "AuthorityAggregator not updated"
        );
    }

    /// When the transaction is executed through the aggregator while the
    /// finalizer is still waiting, the finalizer makes no attempt of its own.
    #[tokio::test]
    async fn test_validator_tx_finalizer_already_executed() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();
        let metrics = finalizer.metrics.clone();

        let tracked_tx = signed_tx.clone();
        let task = tokio::spawn(async move {
            finalizer.track_signed_tx(cache_read, tracked_tx).await;
        });

        // Execute directly while the tracking task is still in its delay.
        auth_agg
            .load()
            .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
            .await
            .unwrap();
        task.await.unwrap();

        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
        assert_finalization_counts(&metrics, 0, 0);
    }

    /// With every client failing `handle_transaction`, the finalizer makes
    /// one attempt that does not succeed.
    #[tokio::test]
    async fn test_validator_tx_finalizer_timeout() {
        telemetry_subscribers::init_for_testing();
        let (sender, keypair) = get_account_key_pair();
        let gas_object = Object::with_owner_for_testing(sender);
        let gas_object_id = gas_object.id();
        let (states, auth_agg, clients) = create_validators(gas_object).await;
        let finalizer = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
        let tx_digest = *signed_tx.digest();
        let cache_read = states[0].get_transaction_cache_reader().clone();

        // Faults are switched on only after the transaction has been signed.
        clients
            .values()
            .for_each(|client| client.inject_fault.store(true, Relaxed));

        let metrics = finalizer.metrics.clone();
        tokio::spawn(async move {
            finalizer.track_signed_tx(cache_read, signed_tx).await;
        })
        .await
        .unwrap();

        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
        assert_finalization_counts(&metrics, 1, 0);
    }

    /// Across random digests, the sorted delays of all committee members are
    /// the base delay plus one increment per rank, capped at the maximum.
    #[tokio::test]
    async fn test_validator_tx_finalizer_determine_finalization_delay() {
        const COMMITTEE_SIZE: usize = 15;
        let network_config = ConfigBuilder::new_with_temp_dir()
            .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
            .build();
        let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
            .build_network_clients();
        let auth_agg = Arc::new(auth_agg);

        let mut finalizers = Vec::with_capacity(COMMITTEE_SIZE);
        for idx in 0..COMMITTEE_SIZE {
            finalizers.push(ValidatorTxFinalizer::new_for_testing(
                Arc::new(ArcSwap::new(auth_agg.clone())),
                auth_agg.committee.voting_rights[idx].0,
            ));
        }

        let config = finalizers[0].config.clone();
        let base_secs = config.tx_finalization_delay.as_secs();
        let max_secs = config.validator_max_delay.as_secs();
        let step_secs = config.validator_delay_increments_sec;
        let expected_delays: Vec<u64> = (0..COMMITTEE_SIZE as u64)
            .map(|rank| min(max_secs, base_secs + rank * step_secs))
            .collect();

        for _ in 0..100 {
            let tx_digest = TransactionDigest::random();
            let mut delays = Vec::with_capacity(finalizers.len());
            for finalizer in &finalizers {
                let delay = finalizer.determine_finalization_delay(&tx_digest).unwrap();
                delays.push(delay.as_secs());
            }
            delays.sort_unstable();
            assert_eq!(delays, expected_delays);
        }
    }

    /// Builds a four-validator network that owns `gas_object`, returning the
    /// authority states, a swappable aggregator wired to mock clients, and
    /// the mock clients keyed by authority name.
    async fn create_validators(
        gas_object: Object,
    ) -> (
        Vec<Arc<AuthorityState>>,
        Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
        BTreeMap<AuthorityName, MockAuthorityClient>,
    ) {
        let network_config = ConfigBuilder::new_with_temp_dir()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_objects(iter::once(gas_object))
            .build();

        let mut authority_states = vec![];
        for idx in 0..4 {
            authority_states.push(
                TestAuthorityBuilder::new()
                    .with_network_config(&network_config, idx)
                    .build()
                    .await,
            );
        }

        let mut clients = BTreeMap::new();
        for state in &authority_states {
            let client = MockAuthorityClient {
                authority: state.clone(),
                inject_fault: Arc::new(AtomicBool::new(false)),
            };
            clients.insert(state.name, client);
        }

        let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
            .build_custom_clients(clients.clone());
        let shared_agg = Arc::new(ArcSwap::new(Arc::new(auth_agg)));

        (authority_states, shared_agg, clients)
    }

    /// Builds a transfer transaction paid for with `gas_object_id`, has
    /// `state`'s mock client sign it, and returns the signed transaction.
    async fn create_tx(
        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
        state: &Arc<AuthorityState>,
        sender: IotaAddress,
        keypair: &AccountKeyPair,
        gas_object_id: ObjectID,
    ) -> VerifiedSignedTransaction {
        let gas_object = state.get_object(&gas_object_id).await.unwrap().unwrap();
        let gas_object_ref = gas_object.compute_object_reference();
        let gas_price = state.reference_gas_price_for_testing().unwrap();

        let tx_data = TestTransactionBuilder::new(sender, gas_object_ref, gas_price)
            .transfer_iota(None, sender)
            .build();
        let tx = to_sender_signed_transaction(tx_data, keypair);

        let signer = clients.get(&state.name).unwrap();
        let response = signer.handle_transaction(tx.clone(), None).await.unwrap();
        let authority_sig = response.status.into_signed_for_testing();

        VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
            tx.into_data(),
            authority_sig,
        ))
    }

    /// Asserts that whether the stake of the authorities that have executed
    /// `tx_digest` reaches the quorum threshold equals `expected`.
    fn check_quorum_execution(
        auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
        tx_digest: &TransactionDigest,
        expected: bool,
    ) {
        let quorum = auth_agg.committee.quorum_threshold();
        let mut executed_weight: StakeUnit = 0;
        for (name, client) in clients {
            let weight = auth_agg.committee.weight(name);
            if client.authority.is_tx_already_executed(tx_digest).unwrap() {
                executed_weight += weight;
            }
        }
        assert_eq!(executed_weight >= quorum, expected);
    }
}