1mod metrics;
6pub use metrics::*;
7
8pub mod reconfig_observer;
9
10use std::{
11 fmt::{Debug, Formatter, Write},
12 net::SocketAddr,
13 sync::Arc,
14 time::Duration,
15};
16
17use arc_swap::ArcSwap;
18use iota_common::sync::notify_read::{NotifyRead, Registration};
19use iota_macros::fail_point;
20use iota_metrics::{
21 GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task,
22};
23use iota_types::{
24 base_types::TransactionDigest,
25 committee::{Committee, EpochId},
26 error::{IotaError, IotaResult},
27 messages_grpc::HandleCertificateRequestV1,
28 quorum_driver_types::{
29 ExecuteTransactionRequestV1, QuorumDriverEffectsQueueResult, QuorumDriverError,
30 QuorumDriverResponse, QuorumDriverResult,
31 },
32 transaction::{CertifiedTransaction, Transaction},
33};
34use tap::TapFallible;
35use tokio::{
36 sync::{
37 Semaphore,
38 mpsc::{self, Receiver, Sender},
39 },
40 task::JoinHandle,
41 time::{Instant, sleep_until},
42};
43use tracing::{debug, error, info, instrument, trace_span, warn};
44
45use self::reconfig_observer::ReconfigObserver;
46use crate::{
47 authority_aggregator::{
48 AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator,
49 ProcessTransactionResult,
50 },
51 authority_client::AuthorityAPI,
52};
53
54#[cfg(test)]
55mod tests;
56
/// Capacity of the bounded task channel; the same value is used as the number
/// of permits of the semaphore that limits concurrently processed tasks.
const TASK_QUEUE_SIZE: usize = 2000;
/// Capacity of the broadcast channel on which effects results are published.
const EFFECTS_QUEUE_SIZE: usize = 10000;
/// Default `max_retry_times` used by `QuorumDriverHandlerBuilder::new`.
const TX_MAX_RETRY_TIMES: u32 = 10;
60
/// A unit of work for the quorum driver: one transaction execution request
/// together with its retry bookkeeping.
#[derive(Clone)]
pub struct QuorumDriverTask {
    /// The execution request being driven.
    pub request: ExecuteTransactionRequestV1,
    /// Certificate for the transaction, if one is already available. When this
    /// is `Some`, `process_task` skips the transaction-processing step and goes
    /// straight to certificate processing.
    pub tx_cert: Option<CertifiedTransaction>,
    /// Number of retries performed so far (0 for a freshly submitted request).
    pub retry_times: u32,
    /// Earliest instant at which the task should be processed; the queue
    /// processor re-enqueues a task that is dequeued before this instant.
    pub next_retry_after: Instant,
    /// Client address forwarded to the authority aggregator calls, if known.
    pub client_addr: Option<SocketAddr>,
    /// Span captured when the task was created; used as the parent of the
    /// spans opened while the task is dequeued and processed.
    pub trace_span: Option<tracing::Span>,
}
70
71impl Debug for QuorumDriverTask {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 let mut writer = String::new();
74 write!(writer, "tx_digest={:?} ", self.request.transaction.digest())?;
75 write!(writer, "has_tx_cert={} ", self.tx_cert.is_some())?;
76 write!(writer, "retry_times={} ", self.retry_times)?;
77 write!(writer, "next_retry_after={:?} ", self.next_retry_after)?;
78 write!(f, "{}", writer)
79 }
80}
81
/// Drives transactions to finality against the current set of validators and
/// publishes the outcome to waiters and subscribers.
pub struct QuorumDriver<A: Clone> {
    /// Swappable authority aggregator; replaced by `update_validators`.
    validators: ArcSwap<AuthorityAggregator<A>>,
    /// Sending half of the task queue consumed by the task queue processor.
    task_sender: Sender<QuorumDriverTask>,
    /// Broadcast channel on which `notify` publishes every final result.
    effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
    /// Notifies waiters registered for a transaction digest of its result.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Metrics updated as tasks are enqueued, retried and completed.
    metrics: Arc<QuorumDriverMetrics>,
    /// Retry limit checked by `enqueue_again_maybe` before re-enqueueing a task.
    max_retry_times: u32,
}
90
91impl<A: Clone> QuorumDriver<A> {
92 pub(crate) fn new(
93 validators: ArcSwap<AuthorityAggregator<A>>,
94 task_sender: Sender<QuorumDriverTask>,
95 effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
96 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
97 metrics: Arc<QuorumDriverMetrics>,
98 max_retry_times: u32,
99 ) -> Self {
100 Self {
101 validators,
102 task_sender,
103 effects_subscribe_sender,
104 notifier,
105 metrics,
106 max_retry_times,
107 }
108 }
109
/// Returns the swappable handle to the authority aggregator currently in use.
pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
    &self.validators
}
113
114 pub fn clone_committee(&self) -> Arc<Committee> {
115 self.validators.load().committee.clone()
116 }
117
118 pub fn current_epoch(&self) -> EpochId {
119 self.validators.load().committee.epoch
120 }
121
122 async fn enqueue_task(&self, task: QuorumDriverTask) -> IotaResult<()> {
123 self.task_sender
124 .send(task.clone())
125 .await
126 .tap_err(|e| debug!(?task, "Failed to enqueue task: {:?}", e))
127 .tap_ok(|_| {
128 debug!(?task, "Enqueued task.");
129 self.metrics.current_requests_in_flight.inc();
130 self.metrics.total_enqueued.inc();
131 if task.retry_times > 0 {
132 if task.retry_times == 1 {
133 self.metrics.current_transactions_in_retry.inc();
134 }
135 self.metrics
136 .transaction_retry_count
137 .observe(task.retry_times as f64);
138 }
139 })
140 .map_err(|e| IotaError::QuorumDriverCommunication {
141 error: e.to_string(),
142 })
143 }
144
145 async fn enqueue_again_maybe(
148 &self,
149 request: ExecuteTransactionRequestV1,
150 tx_cert: Option<CertifiedTransaction>,
151 old_retry_times: u32,
152 client_addr: Option<SocketAddr>,
153 ) -> IotaResult<()> {
154 if old_retry_times >= self.max_retry_times {
155 info!(tx_digest=?request.transaction.digest(), "Failed to reach finality after attempting for {} times", old_retry_times+1);
157 self.notify(
158 &request.transaction,
159 &Err(
160 QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
161 total_attempts: old_retry_times + 1,
162 },
163 ),
164 old_retry_times + 1,
165 );
166 return Ok(());
167 }
168 self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
169 .await
170 }
171
172 async fn backoff_and_enqueue(
176 &self,
177 request: ExecuteTransactionRequestV1,
178 tx_cert: Option<CertifiedTransaction>,
179 old_retry_times: u32,
180 client_addr: Option<SocketAddr>,
181 min_backoff_duration: Option<Duration>,
182 ) -> IotaResult<()> {
183 let next_retry_after = Instant::now()
184 + Duration::from_millis(200 * u64::pow(2, old_retry_times))
185 .max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
186 sleep_until(next_retry_after).await;
187
188 fail_point!("count_retry_times");
189
190 let tx_cert = match tx_cert {
191 Some(tx_cert) if tx_cert.epoch() == self.current_epoch() => Some(tx_cert),
195 _other => None,
196 };
197
198 self.enqueue_task(QuorumDriverTask {
199 request,
200 tx_cert,
201 retry_times: old_retry_times + 1,
202 next_retry_after,
203 client_addr,
204 trace_span: Some(tracing::Span::current()),
205 })
206 .await
207 }
208
209 pub fn notify(
210 &self,
211 transaction: &Transaction,
212 response: &QuorumDriverResult,
213 total_attempts: u32,
214 ) {
215 let tx_digest = transaction.digest();
216 let effects_queue_result = match &response {
217 Ok(resp) => {
218 self.metrics.total_ok_responses.inc();
219 self.metrics
220 .attempt_times_ok_response
221 .observe(total_attempts as f64);
222 Ok((transaction.clone(), resp.clone()))
223 }
224 Err(err) => {
225 self.metrics
226 .total_err_responses
227 .with_label_values(&[err.as_ref()])
228 .inc();
229 Err((*tx_digest, err.clone()))
230 }
231 };
232 if total_attempts > 1 {
233 self.metrics.current_transactions_in_retry.dec();
234 }
235 if let Err(err) = self.effects_subscribe_sender.send(effects_queue_result) {
239 warn!(?tx_digest, "No subscriber found for effects: {}", err);
240 }
241 debug!(?tx_digest, "notify QuorumDriver task result");
242 self.notifier.notify(tx_digest, response);
243 }
244}
245
246impl<A> QuorumDriver<A>
247where
248 A: AuthorityAPI + Send + Sync + 'static + Clone,
249{
250 #[instrument(level = "trace", skip_all)]
251 pub async fn submit_transaction(
252 &self,
253 request: ExecuteTransactionRequestV1,
254 ) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
255 let tx_digest = request.transaction.digest();
256 debug!(?tx_digest, "Received transaction execution request.");
257 self.metrics.total_requests.inc();
258
259 let ticket = self.notifier.register_one(tx_digest);
260 self.enqueue_task(QuorumDriverTask {
261 request,
262 tx_cert: None,
263 retry_times: 0,
264 next_retry_after: Instant::now(),
265 client_addr: None,
266 trace_span: Some(tracing::Span::current()),
267 })
268 .await?;
269 Ok(ticket)
270 }
271
272 #[instrument(level = "trace", skip_all)]
276 pub async fn submit_transaction_no_ticket(
277 &self,
278 request: ExecuteTransactionRequestV1,
279 client_addr: Option<SocketAddr>,
280 ) -> IotaResult<()> {
281 let tx_digest = request.transaction.digest();
282 debug!(
283 ?tx_digest,
284 "Received transaction execution request, no ticket."
285 );
286 self.metrics.total_requests.inc();
287
288 self.enqueue_task(QuorumDriverTask {
289 request,
290 tx_cert: None,
291 retry_times: 0,
292 next_retry_after: Instant::now(),
293 client_addr,
294 trace_span: Some(tracing::Span::current()),
295 })
296 .await
297 }
298
299 #[instrument(level = "trace", skip_all)]
300 pub(crate) async fn process_transaction(
301 &self,
302 transaction: Transaction,
303 client_addr: Option<SocketAddr>,
304 ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
305 let auth_agg = self.validators.load();
306 let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
307 let tx_digest = *transaction.digest();
308 let result = auth_agg.process_transaction(transaction, client_addr).await;
309
310 self.process_transaction_result(result, tx_digest).await
311 }
312
313 #[instrument(level = "trace", skip_all)]
314 async fn process_transaction_result(
315 &self,
316 result: Result<ProcessTransactionResult, AggregatorProcessTransactionError>,
317 tx_digest: TransactionDigest,
318 ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
319 match result {
320 Ok(resp) => Ok(resp),
321
322 Err(AggregatorProcessTransactionError::FatalConflictingTransaction {
323 errors,
324 conflicting_tx_digests,
325 }) => {
326 debug!(
327 ?errors,
328 "Observed Tx {tx_digest:} double spend attempted. Conflicting Txes: {conflicting_tx_digests:?}",
329 );
330 Err(Some(QuorumDriverError::ObjectsDoubleUsed {
331 conflicting_txes: conflicting_tx_digests,
332 }))
333 }
334
335 Err(AggregatorProcessTransactionError::FatalTransaction { errors }) => {
336 debug!(?tx_digest, ?errors, "Nonretryable transaction error");
337 Err(Some(QuorumDriverError::NonRecoverableTransactionError {
338 errors,
339 }))
340 }
341
342 Err(AggregatorProcessTransactionError::SystemOverload {
343 overloaded_stake,
344 errors,
345 }) => {
346 debug!(?tx_digest, ?errors, "System overload");
347 Err(Some(QuorumDriverError::SystemOverload {
348 overloaded_stake,
349 errors,
350 }))
351 }
352
353 Err(AggregatorProcessTransactionError::SystemOverloadRetryAfter {
354 overload_stake,
355 errors,
356 retry_after_secs,
357 }) => {
358 self.metrics.total_retryable_overload_errors.inc();
359 debug!(
360 ?tx_digest,
361 ?errors,
362 "System overload and retry after secs {retry_after_secs}",
363 );
364 Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
365 overload_stake,
366 errors,
367 retry_after_secs,
368 }))
369 }
370
371 Err(AggregatorProcessTransactionError::RetryableTransaction { errors }) => {
372 debug!(?tx_digest, ?errors, "Retryable transaction error");
373 Err(None)
374 }
375
376 Err(
377 AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures,
378 ) => {
379 debug!(
380 ?tx_digest,
381 "Transaction is already finalized with different user signatures"
382 );
383 Err(Some(
384 QuorumDriverError::TxAlreadyFinalizedWithDifferentUserSignatures,
385 ))
386 }
387 }
388 }
389
390 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))]
391 pub(crate) async fn process_certificate(
392 &self,
393 request: HandleCertificateRequestV1,
394 client_addr: Option<SocketAddr>,
395 ) -> Result<QuorumDriverResponse, Option<QuorumDriverError>> {
396 let auth_agg = self.validators.load();
397 let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
398 let tx_digest = *request.certificate.digest();
399 let response = auth_agg
400 .process_certificate(request.clone(), client_addr)
401 .await
402 .map_err(|agg_err| match agg_err {
403 AggregatorProcessCertificateError::FatalExecuteCertificate {
404 non_retryable_errors,
405 } => {
406 error!(
408 ?tx_digest,
409 ?non_retryable_errors,
410 "[WATCHOUT] Unexpected Fatal error for certificate"
411 );
412 Some(QuorumDriverError::NonRecoverableTransactionError {
413 errors: non_retryable_errors,
414 })
415 }
416 AggregatorProcessCertificateError::RetryableExecuteCertificate {
417 retryable_errors,
418 } => {
419 debug!(?retryable_errors, "Retryable certificate");
420 None
421 }
422 })?;
423
424 Ok(response)
425 }
426
427 pub async fn update_validators(&self, new_validators: Arc<AuthorityAggregator<A>>) {
428 info!(
429 "Quorum Driver updating AuthorityAggregator with committee {}",
430 new_validators.committee
431 );
432 self.validators.store(new_validators);
433 }
434}
435
/// Owns a `QuorumDriver` together with the background task that processes
/// its task queue.
pub struct QuorumDriverHandler<A: Clone> {
    /// The driver shared with the spawned processor and reconfig observer tasks.
    quorum_driver: Arc<QuorumDriver<A>>,
    /// Receiver kept so that new effects subscribers can be created from it
    /// via `resubscribe`.
    effects_subscriber: tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>,
    /// Metrics handle, reused when the handler is cloned with `clone_new`.
    quorum_driver_metrics: Arc<QuorumDriverMetrics>,
    /// Observer whose boxed clone is run in a spawned task with the driver.
    reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
    /// Join handle of the spawned task queue processor.
    _processor_handle: JoinHandle<()>,
}
443
444impl<A> QuorumDriverHandler<A>
445where
446 A: AuthorityAPI + Send + Sync + 'static + Clone,
447{
448 pub(crate) fn new(
449 validators: Arc<AuthorityAggregator<A>>,
450 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
451 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
452 metrics: Arc<QuorumDriverMetrics>,
453 max_retry_times: u32,
454 ) -> Self {
455 let (task_tx, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
456 let (subscriber_tx, subscriber_rx) =
457 tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
458 let quorum_driver = Arc::new(QuorumDriver::new(
459 ArcSwap::new(validators),
460 task_tx,
461 subscriber_tx,
462 notifier,
463 metrics.clone(),
464 max_retry_times,
465 ));
466 let metrics_clone = metrics.clone();
467 let processor_handle = {
468 let quorum_driver_clone = quorum_driver.clone();
469 spawn_monitored_task!(Self::task_queue_processor(
470 quorum_driver_clone,
471 task_rx,
472 metrics_clone
473 ))
474 };
475 let reconfig_observer_clone = reconfig_observer.clone();
476 {
477 let quorum_driver_clone = quorum_driver.clone();
478 spawn_monitored_task!({
479 async move {
480 let mut reconfig_observer_clone = reconfig_observer_clone.clone_boxed();
481 reconfig_observer_clone.run(quorum_driver_clone).await;
482 }
483 });
484 };
485 Self {
486 quorum_driver,
487 effects_subscriber: subscriber_rx,
488 quorum_driver_metrics: metrics,
489 reconfig_observer,
490 _processor_handle: processor_handle,
491 }
492 }
493
/// Submits `request` to the underlying quorum driver without registering a
/// waiter for the result; delegates to
/// `QuorumDriver::submit_transaction_no_ticket`.
pub async fn submit_transaction_no_ticket(
    &self,
    request: ExecuteTransactionRequestV1,
    client_addr: Option<SocketAddr>,
) -> IotaResult<()> {
    self.quorum_driver
        .submit_transaction_no_ticket(request, client_addr)
        .await
}
506
/// Submits `request` to the underlying quorum driver and returns the
/// registration for its result; delegates to `QuorumDriver::submit_transaction`.
pub async fn submit_transaction(
    &self,
    request: ExecuteTransactionRequestV1,
) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
    self.quorum_driver.submit_transaction(request).await
}
513
514 pub fn clone_new(&self) -> Self {
520 let (task_sender, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
521 let (effects_subscribe_sender, subscriber_rx) =
522 tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
523 let validators = ArcSwap::new(self.quorum_driver.authority_aggregator().load_full());
524 let quorum_driver = Arc::new(QuorumDriver {
525 validators,
526 task_sender,
527 effects_subscribe_sender,
528 notifier: Arc::new(NotifyRead::new()),
529 metrics: self.quorum_driver_metrics.clone(),
530 max_retry_times: self.quorum_driver.max_retry_times,
531 });
532 let metrics = self.quorum_driver_metrics.clone();
533 let processor_handle = {
534 let quorum_driver_copy = quorum_driver.clone();
535 spawn_monitored_task!(Self::task_queue_processor(
536 quorum_driver_copy,
537 task_rx,
538 metrics,
539 ))
540 };
541 {
542 let quorum_driver_copy = quorum_driver.clone();
543 let reconfig_observer = self.reconfig_observer.clone();
544 spawn_monitored_task!({
545 async move {
546 let mut reconfig_observer_clone = reconfig_observer.clone_boxed();
547 reconfig_observer_clone.run(quorum_driver_copy).await;
548 }
549 })
550 };
551
552 Self {
553 quorum_driver,
554 effects_subscriber: subscriber_rx,
555 quorum_driver_metrics: self.quorum_driver_metrics.clone(),
556 reconfig_observer: self.reconfig_observer.clone(),
557 _processor_handle: processor_handle,
558 }
559 }
560
561 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
562 self.quorum_driver.clone()
563 }
564
/// Returns a new receiver for the effects broadcast channel, created by
/// resubscribing the receiver held by this handler.
pub fn subscribe_to_effects(
    &self,
) -> tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult> {
    self.effects_subscriber.resubscribe()
}
570
/// Returns the swappable authority aggregator handle of the underlying driver.
pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
    self.quorum_driver.authority_aggregator()
}
574
/// Returns the current epoch as reported by the underlying driver.
pub fn current_epoch(&self) -> EpochId {
    self.quorum_driver.current_epoch()
}
578
579 #[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)]
583 async fn process_task(quorum_driver: Arc<QuorumDriver<A>>, task: QuorumDriverTask) {
584 debug!(?task, "Quorum Driver processing task");
585 let QuorumDriverTask {
586 request,
587 tx_cert,
588 retry_times: old_retry_times,
589 client_addr,
590 ..
591 } = task;
592 let transaction = &request.transaction;
593 let tx_digest = *transaction.digest();
594 let is_single_writer_tx = !transaction.contains_shared_object();
595
596 let timer = Instant::now();
597 let (tx_cert, newly_formed) = match tx_cert {
598 None => match quorum_driver
599 .process_transaction(transaction.clone(), client_addr)
600 .await
601 {
602 Ok(ProcessTransactionResult::Certified {
603 certificate,
604 newly_formed,
605 }) => {
606 debug!(?tx_digest, "Transaction processing succeeded");
607 (certificate, newly_formed)
608 }
609 Ok(ProcessTransactionResult::Executed(effects_cert, events)) => {
610 debug!(
611 ?tx_digest,
612 "Transaction processing succeeded with effects directly"
613 );
614 let response = QuorumDriverResponse {
615 effects_cert,
616 events: Some(events),
617 input_objects: None,
618 output_objects: None,
619 auxiliary_data: None,
620 };
621 quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
622 return;
623 }
624 Err(err) => {
625 Self::handle_error(
626 quorum_driver,
627 request,
628 err,
629 None,
630 old_retry_times,
631 "get tx cert",
632 client_addr,
633 );
634 return;
635 }
636 },
637 Some(tx_cert) => (tx_cert, false),
638 };
639
640 let response = match quorum_driver
641 .process_certificate(
642 HandleCertificateRequestV1 {
643 certificate: tx_cert.clone(),
644 include_events: request.include_events,
645 include_input_objects: request.include_input_objects,
646 include_output_objects: request.include_output_objects,
647 include_auxiliary_data: request.include_auxiliary_data,
648 },
649 client_addr,
650 )
651 .await
652 {
653 Ok(response) => {
654 debug!(?tx_digest, "Certificate processing succeeded");
655 response
656 }
657 Err(err) => {
660 Self::handle_error(
661 quorum_driver,
662 request,
663 err,
664 Some(tx_cert),
665 old_retry_times,
666 "get effects cert",
667 client_addr,
668 );
669 return;
670 }
671 };
672 if newly_formed {
673 let settlement_finality_latency = timer.elapsed().as_secs_f64();
674 quorum_driver
675 .metrics
676 .settlement_finality_latency
677 .with_label_values(&[if is_single_writer_tx {
678 TX_TYPE_SINGLE_WRITER_TX
679 } else {
680 TX_TYPE_SHARED_OBJ_TX
681 }])
682 .observe(settlement_finality_latency);
683 let is_out_of_expected_range =
684 settlement_finality_latency >= 8.0 || settlement_finality_latency <= 0.1;
685 debug!(
686 ?tx_digest,
687 ?is_single_writer_tx,
688 ?is_out_of_expected_range,
689 "QuorumDriver settlement finality latency: {:.3} seconds",
690 settlement_finality_latency
691 );
692 }
693
694 quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
695 }
696
697 fn handle_error(
698 quorum_driver: Arc<QuorumDriver<A>>,
699 request: ExecuteTransactionRequestV1,
700 err: Option<QuorumDriverError>,
701 tx_cert: Option<CertifiedTransaction>,
702 old_retry_times: u32,
703 action: &'static str,
704 client_addr: Option<SocketAddr>,
705 ) {
706 let tx_digest = *request.transaction.digest();
707 match err {
708 None => {
709 debug!(?tx_digest, "Failed to {action} - Retrying");
710 spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
711 request.clone(),
712 tx_cert,
713 old_retry_times,
714 client_addr,
715 ));
716 }
717 Some(QuorumDriverError::SystemOverloadRetryAfter {
718 retry_after_secs, ..
719 }) => {
720 debug!(?tx_digest, "Failed to {action} - Retrying");
728 spawn_monitored_task!(quorum_driver.backoff_and_enqueue(
729 request.clone(),
730 tx_cert,
731 old_retry_times,
732 client_addr,
733 Some(Duration::from_secs(retry_after_secs)),
734 ));
735 }
736 Some(qd_error) => {
737 debug!(?tx_digest, "Failed to {action}: {}", qd_error);
738 quorum_driver.notify(&request.transaction, &Err(qd_error), old_retry_times + 1);
741 }
742 }
743 }
744
745 async fn task_queue_processor(
746 quorum_driver: Arc<QuorumDriver<A>>,
747 mut task_receiver: Receiver<QuorumDriverTask>,
748 metrics: Arc<QuorumDriverMetrics>,
749 ) {
750 let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE));
751 while let Some(task) = task_receiver.recv().await {
752 let task_queue_span =
753 trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue");
754 let task_span_guard = task_queue_span.enter();
755
756 let limit = limit.clone();
759 let permit = limit.acquire_owned().await.unwrap();
760
761 debug!(?task, "Dequeued task");
764 if Instant::now()
765 .checked_duration_since(task.next_retry_after)
766 .is_none()
767 {
768 let _ = quorum_driver.enqueue_task(task).await;
770 continue;
771 }
772 metrics.current_requests_in_flight.dec();
773 let qd = quorum_driver.clone();
774 drop(task_span_guard);
775 spawn_monitored_task!(async move {
776 let _guard = permit;
777 QuorumDriverHandler::process_task(qd, task).await
778 });
779 }
780 }
781}
782
/// Builder for `QuorumDriverHandler`.
pub struct QuorumDriverHandlerBuilder<A: Clone> {
    /// Authority aggregator the driver starts with.
    validators: Arc<AuthorityAggregator<A>>,
    /// Metrics handle passed to the handler.
    metrics: Arc<QuorumDriverMetrics>,
    /// Optional notifier; `start` creates a fresh one when this is `None`.
    notifier: Option<Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>>,
    /// Reconfig observer; `start` panics when this is `None`.
    reconfig_observer: Option<Arc<dyn ReconfigObserver<A> + Sync + Send>>,
    /// Retry limit; defaults to `TX_MAX_RETRY_TIMES`.
    max_retry_times: u32,
}
790
791impl<A> QuorumDriverHandlerBuilder<A>
792where
793 A: AuthorityAPI + Send + Sync + 'static + Clone,
794{
795 pub fn new(validators: Arc<AuthorityAggregator<A>>, metrics: Arc<QuorumDriverMetrics>) -> Self {
796 Self {
797 validators,
798 metrics,
799 notifier: None,
800 reconfig_observer: None,
801 max_retry_times: TX_MAX_RETRY_TIMES,
802 }
803 }
804
805 pub(crate) fn with_notifier(
806 mut self,
807 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
808 ) -> Self {
809 self.notifier = Some(notifier);
810 self
811 }
812
813 pub fn with_reconfig_observer(
814 mut self,
815 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
816 ) -> Self {
817 self.reconfig_observer = Some(reconfig_observer);
818 self
819 }
820
821 pub fn with_max_retry_times(mut self, max_retry_times: u32) -> Self {
823 self.max_retry_times = max_retry_times;
824 self
825 }
826
827 pub fn start(self) -> QuorumDriverHandler<A> {
828 QuorumDriverHandler::new(
829 self.validators,
830 self.notifier.unwrap_or_else(|| {
831 Arc::new(NotifyRead::<TransactionDigest, QuorumDriverResult>::new())
832 }),
833 self.reconfig_observer
834 .expect("Reconfig observer is missing"),
835 self.metrics,
836 self.max_retry_times,
837 )
838 }
839}