1mod metrics;
6pub use metrics::*;
7
8pub mod reconfig_observer;
9
10use std::{
11 collections::{BTreeMap, BTreeSet},
12 fmt::{Debug, Formatter, Write},
13 net::SocketAddr,
14 sync::Arc,
15 time::Duration,
16};
17
18use arc_swap::ArcSwap;
19use iota_common::sync::notify_read::{NotifyRead, Registration};
20use iota_macros::fail_point;
21use iota_metrics::{
22 GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task,
23};
24use iota_types::{
25 base_types::{AuthorityName, ObjectRef, TransactionDigest},
26 committee::{Committee, EpochId, StakeUnit},
27 error::{IotaError, IotaResult},
28 messages_grpc::HandleCertificateRequestV1,
29 messages_safe_client::PlainTransactionInfoResponse,
30 quorum_driver_types::{
31 ExecuteTransactionRequestV1, QuorumDriverEffectsQueueResult, QuorumDriverError,
32 QuorumDriverResponse, QuorumDriverResult,
33 },
34 transaction::{CertifiedTransaction, Transaction},
35};
36use tap::TapFallible;
37use tokio::{
38 sync::{
39 Semaphore,
40 mpsc::{self, Receiver, Sender},
41 },
42 task::JoinHandle,
43 time::{Instant, sleep_until},
44};
45use tracing::{debug, error, info, instrument, trace_span, warn};
46
47use self::reconfig_observer::ReconfigObserver;
48use crate::{
49 authority_aggregator::{
50 AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator,
51 ProcessTransactionResult,
52 },
53 authority_client::AuthorityAPI,
54};
55
56#[cfg(test)]
57mod tests;
58
/// Capacity of the mpsc task channel feeding the queue processor; the same
/// value is used as the number of semaphore permits that bound how many tasks
/// are processed concurrently.
const TASK_QUEUE_SIZE: usize = 2000;
/// Capacity of the broadcast channel on which execution results are published.
const EFFECTS_QUEUE_SIZE: usize = 10000;
/// Default retry budget applied by `QuorumDriverHandlerBuilder::new`.
const TX_MAX_RETRY_TIMES: u32 = 10;
62
/// A unit of work placed on the quorum driver's task queue.
#[derive(Clone)]
pub struct QuorumDriverTask {
    /// The execution request this task is driving.
    pub request: ExecuteTransactionRequestV1,
    /// Certificate obtained by an earlier attempt, if any. When `None`, the
    /// processor first runs the transaction-processing step to obtain one.
    pub tx_cert: Option<CertifiedTransaction>,
    /// Number of retries performed so far; `0` for a freshly submitted task.
    pub retry_times: u32,
    /// Earliest instant at which the task should be processed. Tasks dequeued
    /// before this instant are put back on the queue.
    pub next_retry_after: Instant,
    /// Optional client socket address, forwarded to the authority aggregator.
    pub client_addr: Option<SocketAddr>,
    /// Span that was current when the task was created; used as the parent of
    /// the spans opened while the task is queued and processed.
    pub trace_span: Option<tracing::Span>,
}
72
73impl Debug for QuorumDriverTask {
74 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75 let mut writer = String::new();
76 write!(writer, "tx_digest={:?} ", self.request.transaction.digest())?;
77 write!(writer, "has_tx_cert={} ", self.tx_cert.is_some())?;
78 write!(writer, "retry_times={} ", self.retry_times)?;
79 write!(writer, "next_retry_after={:?} ", self.next_retry_after)?;
80 write!(f, "{}", writer)
81 }
82}
83
/// Drives transactions through the authority aggregator and publishes the
/// outcome to subscribers and to per-digest waiters.
pub struct QuorumDriver<A: Clone> {
    /// Current authority aggregator; swapped atomically by `update_validators`.
    validators: ArcSwap<AuthorityAggregator<A>>,
    /// Producer side of the task queue consumed by the queue processor.
    task_sender: Sender<QuorumDriverTask>,
    /// Broadcast channel on which every final result is published.
    effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
    /// Per-digest notifier used to hand results to callers holding a ticket.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Metrics sink for queue, retry and response counters.
    metrics: Arc<QuorumDriverMetrics>,
    /// Retry budget checked by `enqueue_again_maybe`.
    max_retry_times: u32,
}
92
93impl<A: Clone> QuorumDriver<A> {
    /// Assembles a `QuorumDriver` from its already-constructed parts.
    ///
    /// This only stores the arguments; it spawns no tasks.
    pub(crate) fn new(
        validators: ArcSwap<AuthorityAggregator<A>>,
        task_sender: Sender<QuorumDriverTask>,
        effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
        metrics: Arc<QuorumDriverMetrics>,
        max_retry_times: u32,
    ) -> Self {
        Self {
            validators,
            task_sender,
            effects_subscribe_sender,
            notifier,
            metrics,
            max_retry_times,
        }
    }
111
    /// Returns the swappable handle to the current authority aggregator.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        &self.validators
    }
115
116 pub fn clone_committee(&self) -> Arc<Committee> {
117 self.validators.load().committee.clone()
118 }
119
120 pub fn current_epoch(&self) -> EpochId {
121 self.validators.load().committee.epoch
122 }
123
124 async fn enqueue_task(&self, task: QuorumDriverTask) -> IotaResult<()> {
125 self.task_sender
126 .send(task.clone())
127 .await
128 .tap_err(|e| debug!(?task, "Failed to enqueue task: {:?}", e))
129 .tap_ok(|_| {
130 debug!(?task, "Enqueued task.");
131 self.metrics.current_requests_in_flight.inc();
132 self.metrics.total_enqueued.inc();
133 if task.retry_times > 0 {
134 if task.retry_times == 1 {
135 self.metrics.current_transactions_in_retry.inc();
136 }
137 self.metrics
138 .transaction_retry_count
139 .report(task.retry_times as u64);
140 }
141 })
142 .map_err(|e| IotaError::QuorumDriverCommunication {
143 error: e.to_string(),
144 })
145 }
146
147 async fn enqueue_again_maybe(
150 &self,
151 request: ExecuteTransactionRequestV1,
152 tx_cert: Option<CertifiedTransaction>,
153 old_retry_times: u32,
154 client_addr: Option<SocketAddr>,
155 ) -> IotaResult<()> {
156 if old_retry_times >= self.max_retry_times {
157 info!(tx_digest=?request.transaction.digest(), "Failed to reach finality after attempting for {} times", old_retry_times+1);
159 self.notify(
160 &request.transaction,
161 &Err(
162 QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
163 total_attempts: old_retry_times + 1,
164 },
165 ),
166 old_retry_times + 1,
167 );
168 return Ok(());
169 }
170 self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
171 .await
172 }
173
174 async fn backoff_and_enqueue(
178 &self,
179 request: ExecuteTransactionRequestV1,
180 tx_cert: Option<CertifiedTransaction>,
181 old_retry_times: u32,
182 client_addr: Option<SocketAddr>,
183 min_backoff_duration: Option<Duration>,
184 ) -> IotaResult<()> {
185 let next_retry_after = Instant::now()
186 + Duration::from_millis(200 * u64::pow(2, old_retry_times))
187 .max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
188 sleep_until(next_retry_after).await;
189
190 fail_point!("count_retry_times");
191
192 let tx_cert = match tx_cert {
193 Some(tx_cert) if tx_cert.epoch() == self.current_epoch() => Some(tx_cert),
197 _other => None,
198 };
199
200 self.enqueue_task(QuorumDriverTask {
201 request,
202 tx_cert,
203 retry_times: old_retry_times + 1,
204 next_retry_after,
205 client_addr,
206 trace_span: Some(tracing::Span::current()),
207 })
208 .await
209 }
210
211 pub fn notify(
212 &self,
213 transaction: &Transaction,
214 response: &QuorumDriverResult,
215 total_attempts: u32,
216 ) {
217 let tx_digest = transaction.digest();
218 let effects_queue_result = match &response {
219 Ok(resp) => {
220 self.metrics.total_ok_responses.inc();
221 self.metrics
222 .attempt_times_ok_response
223 .report(total_attempts as u64);
224 Ok((transaction.clone(), resp.clone()))
225 }
226 Err(err) => {
227 self.metrics
228 .total_err_responses
229 .with_label_values(&[err.as_ref()])
230 .inc();
231 Err((*tx_digest, err.clone()))
232 }
233 };
234 if total_attempts > 1 {
235 self.metrics.current_transactions_in_retry.dec();
236 }
237 if let Err(err) = self.effects_subscribe_sender.send(effects_queue_result) {
241 warn!(?tx_digest, "No subscriber found for effects: {}", err);
242 }
243 debug!(?tx_digest, "notify QuorumDriver task result");
244 self.notifier.notify(tx_digest, response);
245 }
246}
247
248impl<A> QuorumDriver<A>
249where
250 A: AuthorityAPI + Send + Sync + 'static + Clone,
251{
252 #[instrument(level = "trace", skip_all)]
253 pub async fn submit_transaction(
254 &self,
255 request: ExecuteTransactionRequestV1,
256 ) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
257 let tx_digest = request.transaction.digest();
258 debug!(?tx_digest, "Received transaction execution request.");
259 self.metrics.total_requests.inc();
260
261 let ticket = self.notifier.register_one(tx_digest);
262 self.enqueue_task(QuorumDriverTask {
263 request,
264 tx_cert: None,
265 retry_times: 0,
266 next_retry_after: Instant::now(),
267 client_addr: None,
268 trace_span: Some(tracing::Span::current()),
269 })
270 .await?;
271 Ok(ticket)
272 }
273
274 #[instrument(level = "trace", skip_all)]
278 pub async fn submit_transaction_no_ticket(
279 &self,
280 request: ExecuteTransactionRequestV1,
281 client_addr: Option<SocketAddr>,
282 ) -> IotaResult<()> {
283 let tx_digest = request.transaction.digest();
284 debug!(
285 ?tx_digest,
286 "Received transaction execution request, no ticket."
287 );
288 self.metrics.total_requests.inc();
289
290 self.enqueue_task(QuorumDriverTask {
291 request,
292 tx_cert: None,
293 retry_times: 0,
294 next_retry_after: Instant::now(),
295 client_addr,
296 trace_span: Some(tracing::Span::current()),
297 })
298 .await
299 }
300
301 #[instrument(level = "trace", skip_all)]
302 pub(crate) async fn process_transaction(
303 &self,
304 transaction: Transaction,
305 client_addr: Option<SocketAddr>,
306 ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
307 let auth_agg = self.validators.load();
308 let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
309 let tx_digest = *transaction.digest();
310 let result = auth_agg.process_transaction(transaction, client_addr).await;
311
312 self.process_transaction_result(result, tx_digest, client_addr)
313 .await
314 }
315
316 #[instrument(level = "trace", skip_all)]
317 async fn process_transaction_result(
318 &self,
319 result: Result<ProcessTransactionResult, AggregatorProcessTransactionError>,
320 tx_digest: TransactionDigest,
321 client_addr: Option<SocketAddr>,
322 ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
323 match result {
324 Ok(resp) => Ok(resp),
325 Err(AggregatorProcessTransactionError::RetryableConflictingTransaction {
326 conflicting_tx_digest_to_retry,
327 errors,
328 conflicting_tx_digests,
329 }) => {
330 self.metrics
331 .total_err_process_tx_responses_with_nonzero_conflicting_transactions
332 .inc();
333 debug!(
334 ?tx_digest,
335 "Observed {} conflicting transactions: {:?}",
336 conflicting_tx_digests.len(),
337 conflicting_tx_digests
338 );
339
340 if let Some(conflicting_tx_digest) = conflicting_tx_digest_to_retry {
341 self.process_conflicting_tx(
342 tx_digest,
343 conflicting_tx_digest,
344 conflicting_tx_digests,
345 client_addr,
346 )
347 .await
348 } else {
349 debug!(
353 ?errors,
354 "Observed Tx {tx_digest:} is still in retryable state. Conflicting Txes: {conflicting_tx_digests:?}",
355 );
356 Err(None)
357 }
358 }
359
360 Err(AggregatorProcessTransactionError::FatalConflictingTransaction {
361 errors,
362 conflicting_tx_digests,
363 }) => {
364 debug!(
365 ?errors,
366 "Observed Tx {tx_digest:} double spend attempted. Conflicting Txes: {conflicting_tx_digests:?}",
367 );
368 Err(Some(QuorumDriverError::ObjectsDoubleUsed {
369 conflicting_txes: conflicting_tx_digests,
370 retried_tx: None,
371 retried_tx_success: None,
372 }))
373 }
374
375 Err(AggregatorProcessTransactionError::FatalTransaction { errors }) => {
376 debug!(?tx_digest, ?errors, "Nonretryable transaction error");
377 Err(Some(QuorumDriverError::NonRecoverableTransactionError {
378 errors,
379 }))
380 }
381
382 Err(AggregatorProcessTransactionError::SystemOverload {
383 overloaded_stake,
384 errors,
385 }) => {
386 debug!(?tx_digest, ?errors, "System overload");
387 Err(Some(QuorumDriverError::SystemOverload {
388 overloaded_stake,
389 errors,
390 }))
391 }
392
393 Err(AggregatorProcessTransactionError::SystemOverloadRetryAfter {
394 overload_stake,
395 errors,
396 retry_after_secs,
397 }) => {
398 self.metrics.total_retryable_overload_errors.inc();
399 debug!(
400 ?tx_digest,
401 ?errors,
402 "System overload and retry after secs {retry_after_secs}",
403 );
404 Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
405 overload_stake,
406 errors,
407 retry_after_secs,
408 }))
409 }
410
411 Err(AggregatorProcessTransactionError::RetryableTransaction { errors }) => {
412 debug!(?tx_digest, ?errors, "Retryable transaction error");
413 Err(None)
414 }
415
416 Err(
417 AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures,
418 ) => {
419 debug!(
420 ?tx_digest,
421 "Transaction is already finalized with different user signatures"
422 );
423 Err(Some(
424 QuorumDriverError::TxAlreadyFinalizedWithDifferentUserSignatures,
425 ))
426 }
427 }
428 }
429
430 #[instrument(level = "trace", skip_all)]
431 async fn process_conflicting_tx(
432 &self,
433 tx_digest: TransactionDigest,
434 conflicting_tx_digest: TransactionDigest,
435 conflicting_tx_digests: BTreeMap<
436 TransactionDigest,
437 (Vec<(AuthorityName, ObjectRef)>, StakeUnit),
438 >,
439 client_addr: Option<SocketAddr>,
440 ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
441 let (validators, _) = conflicting_tx_digests.get(&conflicting_tx_digest).unwrap();
445 let attempt_result = self
446 .attempt_conflicting_transaction(
447 &conflicting_tx_digest,
448 &tx_digest,
449 validators.iter().map(|(pub_key, _)| *pub_key).collect(),
450 client_addr,
451 )
452 .await;
453 self.metrics
454 .total_attempts_retrying_conflicting_transaction
455 .inc();
456
457 match attempt_result {
458 Err(err) => {
459 debug!(
460 ?tx_digest,
461 ?conflicting_tx_digest,
462 "Encountered error while attempting conflicting transaction: {:?}",
463 err
464 );
465 Err(Some(QuorumDriverError::ObjectsDoubleUsed {
466 conflicting_txes: conflicting_tx_digests,
467 retried_tx: None,
468 retried_tx_success: None,
469 }))
470 }
471 Ok(success) => {
472 debug!(
473 ?tx_digest,
474 ?conflicting_tx_digest,
475 "Retried conflicting transaction. Success: {}",
476 success
477 );
478 if success {
479 self.metrics
480 .total_successful_attempts_retrying_conflicting_transaction
481 .inc();
482 }
483 Err(Some(QuorumDriverError::ObjectsDoubleUsed {
484 conflicting_txes: conflicting_tx_digests,
485 retried_tx: Some(conflicting_tx_digest),
486 retried_tx_success: Some(success),
487 }))
488 }
489 }
490 }
491
492 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))]
493 pub(crate) async fn process_certificate(
494 &self,
495 request: HandleCertificateRequestV1,
496 client_addr: Option<SocketAddr>,
497 ) -> Result<QuorumDriverResponse, Option<QuorumDriverError>> {
498 let auth_agg = self.validators.load();
499 let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
500 let tx_digest = *request.certificate.digest();
501 let response = auth_agg
502 .process_certificate(request.clone(), client_addr)
503 .await
504 .map_err(|agg_err| match agg_err {
505 AggregatorProcessCertificateError::FatalExecuteCertificate {
506 non_retryable_errors,
507 } => {
508 error!(
510 ?tx_digest,
511 ?non_retryable_errors,
512 "[WATCHOUT] Unexpected Fatal error for certificate"
513 );
514 Some(QuorumDriverError::NonRecoverableTransactionError {
515 errors: non_retryable_errors,
516 })
517 }
518 AggregatorProcessCertificateError::RetryableExecuteCertificate {
519 retryable_errors,
520 } => {
521 debug!(?retryable_errors, "Retryable certificate");
522 None
523 }
524 })?;
525
526 Ok(response)
527 }
528
529 pub async fn update_validators(&self, new_validators: Arc<AuthorityAggregator<A>>) {
530 info!(
531 "Quorum Driver updating AuthorityAggregator with committee {}",
532 new_validators.committee
533 );
534 self.validators.store(new_validators);
535 }
536
    /// Tries to finalize a transaction that conflicts with `original_tx_digest`.
    ///
    /// Queries the given validators for `tx_digest` and then:
    /// - if it was already executed with a certificate, re-submits that
    ///   certificate;
    /// - if it is only signed, verifies the committee signatures and executes
    ///   the unsigned transaction;
    /// - if it was executed without a certificate, executes the transaction.
    ///
    /// Returns `Ok(true)` when the follow-up execution succeeded and
    /// `Ok(false)` when it failed.
    ///
    /// # Errors
    /// Returns an error when the transaction-info request or the signature
    /// verification fails.
    #[instrument(level = "trace", skip_all)]
    async fn attempt_conflicting_transaction(
        &self,
        tx_digest: &TransactionDigest,
        original_tx_digest: &TransactionDigest,
        validators: BTreeSet<AuthorityName>,
        client_addr: Option<SocketAddr>,
    ) -> IotaResult<bool> {
        // Ask only the validators that reported the conflict, passing a
        // 10-second duration to the request.
        let response = self
            .validators
            .load()
            .handle_transaction_info_request_from_some_validators(
                tx_digest,
                &validators,
                Some(Duration::from_secs(10)),
            )
            .await?;

        let transaction = match response {
            PlainTransactionInfoResponse::ExecutedWithCert(cert, _, _) => {
                self.metrics
                    .total_times_conflicting_transaction_already_finalized_when_retrying
                    .inc();
                // A certificate already exists: re-submit it and report
                // whether that succeeded. The response itself is discarded.
                let result = self
                    .validators
                    .load()
                    .process_certificate(
                        HandleCertificateRequestV1 {
                            certificate: cert,
                            include_events: true,
                            include_input_objects: false,
                            include_output_objects: false,
                            include_auxiliary_data: false,
                        },
                        client_addr,
                    )
                    .await
                    .tap_ok(|_resp| {
                        debug!(
                            ?tx_digest,
                            ?original_tx_digest,
                            "Retry conflicting transaction certificate succeeded."
                        );
                    })
                    .tap_err(|err| {
                        debug!(
                            ?tx_digest,
                            ?original_tx_digest,
                            "Retry conflicting transaction certificate got an error: {:?}",
                            err
                        );
                    });
                return Ok(result.is_ok());
            }
            PlainTransactionInfoResponse::Signed(signed) => {
                // Check the committee signatures before trusting the payload.
                signed.verify_committee_sigs_only(&self.clone_committee())?;
                signed.into_unsigned()
            }
            PlainTransactionInfoResponse::ExecutedWithoutCert(transaction, _, _) => transaction,
        };
        // No certificate available: run the full execution flow for the
        // conflicting transaction and report whether it succeeded.
        let result = self
            .validators
            .load()
            .execute_transaction_block(&transaction, client_addr)
            .await
            .tap_ok(|_resp| {
                debug!(
                    ?tx_digest,
                    ?original_tx_digest,
                    "Retry conflicting transaction succeeded."
                );
            })
            .tap_err(|err| {
                debug!(
                    ?tx_digest,
                    ?original_tx_digest,
                    "Retry conflicting transaction got an error: {:?}",
                    err
                );
            });
        Ok(result.is_ok())
    }
629}
630
/// Owns a `QuorumDriver` together with the background tasks that serve it.
pub struct QuorumDriverHandler<A: Clone> {
    /// The driver shared with the queue processor and the reconfig observer.
    quorum_driver: Arc<QuorumDriver<A>>,
    /// Receiver kept so new effects subscriptions can be created from it.
    effects_subscriber: tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>,
    /// Metrics handle, reused when the handler is cloned via `clone_new`.
    quorum_driver_metrics: Arc<QuorumDriverMetrics>,
    /// Observer that is run against the driver in a background task.
    reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
    /// Join handle of the task-queue processor task.
    _processor_handle: JoinHandle<()>,
}
638
639impl<A> QuorumDriverHandler<A>
640where
641 A: AuthorityAPI + Send + Sync + 'static + Clone,
642{
643 pub(crate) fn new(
644 validators: Arc<AuthorityAggregator<A>>,
645 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
646 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
647 metrics: Arc<QuorumDriverMetrics>,
648 max_retry_times: u32,
649 ) -> Self {
650 let (task_tx, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
651 let (subscriber_tx, subscriber_rx) =
652 tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
653 let quorum_driver = Arc::new(QuorumDriver::new(
654 ArcSwap::new(validators),
655 task_tx,
656 subscriber_tx,
657 notifier,
658 metrics.clone(),
659 max_retry_times,
660 ));
661 let metrics_clone = metrics.clone();
662 let processor_handle = {
663 let quorum_driver_clone = quorum_driver.clone();
664 spawn_monitored_task!(Self::task_queue_processor(
665 quorum_driver_clone,
666 task_rx,
667 metrics_clone
668 ))
669 };
670 let reconfig_observer_clone = reconfig_observer.clone();
671 {
672 let quorum_driver_clone = quorum_driver.clone();
673 spawn_monitored_task!({
674 async move {
675 let mut reconfig_observer_clone = reconfig_observer_clone.clone_boxed();
676 reconfig_observer_clone.run(quorum_driver_clone).await;
677 }
678 });
679 };
680 Self {
681 quorum_driver,
682 effects_subscriber: subscriber_rx,
683 quorum_driver_metrics: metrics,
684 reconfig_observer,
685 _processor_handle: processor_handle,
686 }
687 }
688
689 pub async fn submit_transaction_no_ticket(
693 &self,
694 request: ExecuteTransactionRequestV1,
695 client_addr: Option<SocketAddr>,
696 ) -> IotaResult<()> {
697 self.quorum_driver
698 .submit_transaction_no_ticket(request, client_addr)
699 .await
700 }
701
702 pub async fn submit_transaction(
703 &self,
704 request: ExecuteTransactionRequestV1,
705 ) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
706 self.quorum_driver.submit_transaction(request).await
707 }
708
709 pub fn clone_new(&self) -> Self {
715 let (task_sender, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
716 let (effects_subscribe_sender, subscriber_rx) =
717 tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
718 let validators = ArcSwap::new(self.quorum_driver.authority_aggregator().load_full());
719 let quorum_driver = Arc::new(QuorumDriver {
720 validators,
721 task_sender,
722 effects_subscribe_sender,
723 notifier: Arc::new(NotifyRead::new()),
724 metrics: self.quorum_driver_metrics.clone(),
725 max_retry_times: self.quorum_driver.max_retry_times,
726 });
727 let metrics = self.quorum_driver_metrics.clone();
728 let processor_handle = {
729 let quorum_driver_copy = quorum_driver.clone();
730 spawn_monitored_task!(Self::task_queue_processor(
731 quorum_driver_copy,
732 task_rx,
733 metrics,
734 ))
735 };
736 {
737 let quorum_driver_copy = quorum_driver.clone();
738 let reconfig_observer = self.reconfig_observer.clone();
739 spawn_monitored_task!({
740 async move {
741 let mut reconfig_observer_clone = reconfig_observer.clone_boxed();
742 reconfig_observer_clone.run(quorum_driver_copy).await;
743 }
744 })
745 };
746
747 Self {
748 quorum_driver,
749 effects_subscriber: subscriber_rx,
750 quorum_driver_metrics: self.quorum_driver_metrics.clone(),
751 reconfig_observer: self.reconfig_observer.clone(),
752 _processor_handle: processor_handle,
753 }
754 }
755
756 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
757 self.quorum_driver.clone()
758 }
759
    /// Creates a new receiver on the effects broadcast channel by
    /// resubscribing the receiver held by this handler.
    pub fn subscribe_to_effects(
        &self,
    ) -> tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult> {
        self.effects_subscriber.resubscribe()
    }
765
    /// Returns the swappable authority-aggregator handle of the owned driver.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        self.quorum_driver.authority_aggregator()
    }
769
    /// Returns the epoch reported by the owned driver.
    pub fn current_epoch(&self) -> EpochId {
        self.quorum_driver.current_epoch()
    }
773
    /// Processes one dequeued task end to end.
    ///
    /// 1. If the task carries no certificate, the transaction is processed to
    ///    obtain one. If that step already yields executed effects, the
    ///    result is notified and the function returns.
    /// 2. The certificate is then processed to obtain the final response.
    /// 3. On success the result is published via `QuorumDriver::notify`; on
    ///    failure `handle_error` decides between retrying and notifying.
    ///
    /// The span is parented to the span captured when the task was created.
    #[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)]
    async fn process_task(quorum_driver: Arc<QuorumDriver<A>>, task: QuorumDriverTask) {
        debug!(?task, "Quorum Driver processing task");
        let QuorumDriverTask {
            request,
            tx_cert,
            retry_times: old_retry_times,
            client_addr,
            ..
        } = task;
        let transaction = &request.transaction;
        let tx_digest = *transaction.digest();
        let is_single_writer_tx = !transaction.contains_shared_object();

        // Started before certificate formation so the latency metric below
        // covers both steps of this attempt.
        let timer = Instant::now();
        let (tx_cert, newly_formed) = match tx_cert {
            None => match quorum_driver
                .process_transaction(transaction.clone(), client_addr)
                .await
            {
                Ok(ProcessTransactionResult::Certified {
                    certificate,
                    newly_formed,
                }) => {
                    debug!(?tx_digest, "Transaction processing succeeded");
                    (certificate, newly_formed)
                }
                Ok(ProcessTransactionResult::Executed(effects_cert, events)) => {
                    debug!(
                        ?tx_digest,
                        "Transaction processing succeeded with effects directly"
                    );
                    // Effects are already available, so the certificate step
                    // is skipped and the result is published right away.
                    let response = QuorumDriverResponse {
                        effects_cert,
                        events: Some(events),
                        input_objects: None,
                        output_objects: None,
                        auxiliary_data: None,
                    };
                    quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
                    return;
                }
                Err(err) => {
                    Self::handle_error(
                        quorum_driver,
                        request,
                        err,
                        None,
                        old_retry_times,
                        "get tx cert",
                        client_addr,
                    );
                    return;
                }
            },
            // A certificate carried over from an earlier attempt is never
            // counted as newly formed.
            Some(tx_cert) => (tx_cert, false),
        };

        let response = match quorum_driver
            .process_certificate(
                HandleCertificateRequestV1 {
                    certificate: tx_cert.clone(),
                    include_events: request.include_events,
                    include_input_objects: request.include_input_objects,
                    include_output_objects: request.include_output_objects,
                    include_auxiliary_data: request.include_auxiliary_data,
                },
                client_addr,
            )
            .await
        {
            Ok(response) => {
                debug!(?tx_digest, "Certificate processing succeeded");
                response
            }
            Err(err) => {
                // The certificate is handed over so a retry can reuse it.
                Self::handle_error(
                    quorum_driver,
                    request,
                    err,
                    Some(tx_cert),
                    old_retry_times,
                    "get effects cert",
                    client_addr,
                );
                return;
            }
        };
        // Latency is only recorded when this attempt formed the certificate.
        if newly_formed {
            let settlement_finality_latency = timer.elapsed().as_secs_f64();
            quorum_driver
                .metrics
                .settlement_finality_latency
                .with_label_values(&[if is_single_writer_tx {
                    TX_TYPE_SINGLE_WRITER_TX
                } else {
                    TX_TYPE_SHARED_OBJ_TX
                }])
                .observe(settlement_finality_latency);
            // Only used to flag unusually slow or fast samples in the log.
            let is_out_of_expected_range =
                settlement_finality_latency >= 8.0 || settlement_finality_latency <= 0.1;
            debug!(
                ?tx_digest,
                ?is_single_writer_tx,
                ?is_out_of_expected_range,
                "QuorumDriver settlement finality latency: {:.3} seconds",
                settlement_finality_latency
            );
        }

        quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
    }
891
892 fn handle_error(
893 quorum_driver: Arc<QuorumDriver<A>>,
894 request: ExecuteTransactionRequestV1,
895 err: Option<QuorumDriverError>,
896 tx_cert: Option<CertifiedTransaction>,
897 old_retry_times: u32,
898 action: &'static str,
899 client_addr: Option<SocketAddr>,
900 ) {
901 let tx_digest = *request.transaction.digest();
902 match err {
903 None => {
904 debug!(?tx_digest, "Failed to {action} - Retrying");
905 spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
906 request.clone(),
907 tx_cert,
908 old_retry_times,
909 client_addr,
910 ));
911 }
912 Some(QuorumDriverError::SystemOverloadRetryAfter {
913 retry_after_secs, ..
914 }) => {
915 debug!(?tx_digest, "Failed to {action} - Retrying");
923 spawn_monitored_task!(quorum_driver.backoff_and_enqueue(
924 request.clone(),
925 tx_cert,
926 old_retry_times,
927 client_addr,
928 Some(Duration::from_secs(retry_after_secs)),
929 ));
930 }
931 Some(qd_error) => {
932 debug!(?tx_digest, "Failed to {action}: {}", qd_error);
933 quorum_driver.notify(&request.transaction, &Err(qd_error), old_retry_times + 1);
936 }
937 }
938 }
939
    /// Main loop of the task queue: receives tasks and spawns one processing
    /// task per item, bounded by a semaphore with `TASK_QUEUE_SIZE` permits.
    ///
    /// The loop ends when the task channel yields `None`.
    async fn task_queue_processor(
        quorum_driver: Arc<QuorumDriver<A>>,
        mut task_receiver: Receiver<QuorumDriverTask>,
        metrics: Arc<QuorumDriverMetrics>,
    ) {
        let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE));
        while let Some(task) = task_receiver.recv().await {
            // Parent the queue span to the span captured at enqueue time.
            let task_queue_span =
                trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue");
            // NOTE(review): this guard stays entered across the `.await`
            // points below; tracing's documentation advises against holding
            // an entered guard across awaits in async code — consider
            // `Instrument` or `in_scope` instead.
            let task_span_guard = task_queue_span.enter();

            let limit = limit.clone();
            // The permit is held until the spawned task finishes. The unwrap
            // can only fail if the semaphore is closed, which this function
            // never does.
            let permit = limit.acquire_owned().await.unwrap();

            debug!(?task, "Dequeued task");
            // `checked_duration_since` is `None` when `next_retry_after` is
            // still in the future, i.e. the task is not due yet.
            if Instant::now()
                .checked_duration_since(task.next_retry_after)
                .is_none()
            {
                // Put the task back on the queue; a failed send is ignored.
                // NOTE(review): `enqueue_task` increments the in-flight gauge
                // again while this path skips the decrement below — confirm
                // this is intended.
                let _ = quorum_driver.enqueue_task(task).await;
                continue;
            }
            metrics.current_requests_in_flight.dec();
            let qd = quorum_driver.clone();
            // Leave the queue span before handing the task to its own task.
            drop(task_span_guard);
            spawn_monitored_task!(async move {
                // Keep the permit alive for the whole processing run.
                let _guard = permit;
                QuorumDriverHandler::process_task(qd, task).await
            });
        }
    }
976}
977
/// Builder for `QuorumDriverHandler`.
pub struct QuorumDriverHandlerBuilder<A: Clone> {
    /// Authority aggregator the driver starts with.
    validators: Arc<AuthorityAggregator<A>>,
    /// Metrics handle passed to the driver.
    metrics: Arc<QuorumDriverMetrics>,
    /// Optional notifier; a fresh one is created at `start` when unset.
    notifier: Option<Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>>,
    /// Reconfig observer; must be set before `start`, which panics otherwise.
    reconfig_observer: Option<Arc<dyn ReconfigObserver<A> + Sync + Send>>,
    /// Retry budget; defaults to `TX_MAX_RETRY_TIMES`.
    max_retry_times: u32,
}
985
986impl<A> QuorumDriverHandlerBuilder<A>
987where
988 A: AuthorityAPI + Send + Sync + 'static + Clone,
989{
990 pub fn new(validators: Arc<AuthorityAggregator<A>>, metrics: Arc<QuorumDriverMetrics>) -> Self {
991 Self {
992 validators,
993 metrics,
994 notifier: None,
995 reconfig_observer: None,
996 max_retry_times: TX_MAX_RETRY_TIMES,
997 }
998 }
999
1000 pub(crate) fn with_notifier(
1001 mut self,
1002 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
1003 ) -> Self {
1004 self.notifier = Some(notifier);
1005 self
1006 }
1007
1008 pub fn with_reconfig_observer(
1009 mut self,
1010 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
1011 ) -> Self {
1012 self.reconfig_observer = Some(reconfig_observer);
1013 self
1014 }
1015
1016 pub fn with_max_retry_times(mut self, max_retry_times: u32) -> Self {
1018 self.max_retry_times = max_retry_times;
1019 self
1020 }
1021
1022 pub fn start(self) -> QuorumDriverHandler<A> {
1023 QuorumDriverHandler::new(
1024 self.validators,
1025 self.notifier.unwrap_or_else(|| {
1026 Arc::new(NotifyRead::<TransactionDigest, QuorumDriverResult>::new())
1027 }),
1028 self.reconfig_observer
1029 .expect("Reconfig observer is missing"),
1030 self.metrics,
1031 self.max_retry_times,
1032 )
1033 }
1034}