iota_core/quorum_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod metrics;
6pub use metrics::*;
7
8pub mod reconfig_observer;
9
10use std::{
11    fmt::{Debug, Formatter, Write},
12    net::SocketAddr,
13    sync::Arc,
14    time::Duration,
15};
16
17use arc_swap::ArcSwap;
18use iota_common::sync::notify_read::{NotifyRead, Registration};
19use iota_macros::fail_point;
20use iota_metrics::{
21    GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task,
22};
23use iota_types::{
24    base_types::TransactionDigest,
25    committee::{Committee, EpochId},
26    error::{IotaError, IotaResult},
27    messages_grpc::HandleCertificateRequestV1,
28    quorum_driver_types::{
29        ExecuteTransactionRequestV1, QuorumDriverEffectsQueueResult, QuorumDriverError,
30        QuorumDriverResponse, QuorumDriverResult,
31    },
32    transaction::{CertifiedTransaction, Transaction},
33};
34use tap::TapFallible;
35use tokio::{
36    sync::{
37        Semaphore,
38        mpsc::{self, Receiver, Sender},
39    },
40    task::JoinHandle,
41    time::{Instant, sleep_until},
42};
43use tracing::{debug, error, info, instrument, trace_span, warn};
44
45use self::reconfig_observer::ReconfigObserver;
46use crate::{
47    authority_aggregator::{
48        AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator,
49        ProcessTransactionResult,
50    },
51    authority_client::AuthorityAPI,
52};
53
54#[cfg(test)]
55mod tests;
56
/// Capacity of the bounded task channel. The same value is used as the number
/// of semaphore permits that caps how many tasks are processed at once.
const TASK_QUEUE_SIZE: usize = 2_000;
/// Capacity of the broadcast channel that carries task results to effects
/// subscribers.
const EFFECTS_QUEUE_SIZE: usize = 10_000;
/// Retry limit used by `QuorumDriverHandlerBuilder` unless it is overridden.
const TX_MAX_RETRY_TIMES: u32 = 10;
60
61#[derive(Clone)]
62pub struct QuorumDriverTask {
63    pub request: ExecuteTransactionRequestV1,
64    pub tx_cert: Option<CertifiedTransaction>,
65    pub retry_times: u32,
66    pub next_retry_after: Instant,
67    pub client_addr: Option<SocketAddr>,
68    pub trace_span: Option<tracing::Span>,
69}
70
71impl Debug for QuorumDriverTask {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        let mut writer = String::new();
74        write!(writer, "tx_digest={:?} ", self.request.transaction.digest())?;
75        write!(writer, "has_tx_cert={} ", self.tx_cert.is_some())?;
76        write!(writer, "retry_times={} ", self.retry_times)?;
77        write!(writer, "next_retry_after={:?} ", self.next_retry_after)?;
78        write!(f, "{}", writer)
79    }
80}
81
82pub struct QuorumDriver<A: Clone> {
83    validators: ArcSwap<AuthorityAggregator<A>>,
84    task_sender: Sender<QuorumDriverTask>,
85    effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
86    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
87    metrics: Arc<QuorumDriverMetrics>,
88    max_retry_times: u32,
89}
90
91impl<A: Clone> QuorumDriver<A> {
92    pub(crate) fn new(
93        validators: ArcSwap<AuthorityAggregator<A>>,
94        task_sender: Sender<QuorumDriverTask>,
95        effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
96        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
97        metrics: Arc<QuorumDriverMetrics>,
98        max_retry_times: u32,
99    ) -> Self {
100        Self {
101            validators,
102            task_sender,
103            effects_subscribe_sender,
104            notifier,
105            metrics,
106            max_retry_times,
107        }
108    }
109
110    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
111        &self.validators
112    }
113
114    pub fn clone_committee(&self) -> Arc<Committee> {
115        self.validators.load().committee.clone()
116    }
117
118    pub fn current_epoch(&self) -> EpochId {
119        self.validators.load().committee.epoch
120    }
121
122    async fn enqueue_task(&self, task: QuorumDriverTask) -> IotaResult<()> {
123        self.task_sender
124            .send(task.clone())
125            .await
126            .tap_err(|e| debug!(?task, "Failed to enqueue task: {:?}", e))
127            .tap_ok(|_| {
128                debug!(?task, "Enqueued task.");
129                self.metrics.current_requests_in_flight.inc();
130                self.metrics.total_enqueued.inc();
131                if task.retry_times > 0 {
132                    if task.retry_times == 1 {
133                        self.metrics.current_transactions_in_retry.inc();
134                    }
135                    self.metrics
136                        .transaction_retry_count
137                        .observe(task.retry_times as f64);
138                }
139            })
140            .map_err(|e| IotaError::QuorumDriverCommunication {
141                error: e.to_string(),
142            })
143    }
144
145    /// Enqueue the task again if it hasn't maxed out the total retry attempts.
146    /// If it has, notify failure.
147    async fn enqueue_again_maybe(
148        &self,
149        request: ExecuteTransactionRequestV1,
150        tx_cert: Option<CertifiedTransaction>,
151        old_retry_times: u32,
152        client_addr: Option<SocketAddr>,
153    ) -> IotaResult<()> {
154        if old_retry_times >= self.max_retry_times {
155            // max out the retry times, notify failure
156            info!(tx_digest=?request.transaction.digest(), "Failed to reach finality after attempting for {} times", old_retry_times+1);
157            self.notify(
158                &request.transaction,
159                &Err(
160                    QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
161                        total_attempts: old_retry_times + 1,
162                    },
163                ),
164                old_retry_times + 1,
165            );
166            return Ok(());
167        }
168        self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
169            .await
170    }
171
172    /// Performs exponential backoff and enqueue the `transaction` to the
173    /// execution queue. When `min_backoff_duration` is provided, the
174    /// backoff duration will be at least `min_backoff_duration`.
175    async fn backoff_and_enqueue(
176        &self,
177        request: ExecuteTransactionRequestV1,
178        tx_cert: Option<CertifiedTransaction>,
179        old_retry_times: u32,
180        client_addr: Option<SocketAddr>,
181        min_backoff_duration: Option<Duration>,
182    ) -> IotaResult<()> {
183        let next_retry_after = Instant::now()
184            + Duration::from_millis(200 * u64::pow(2, old_retry_times))
185                .max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
186        sleep_until(next_retry_after).await;
187
188        fail_point!("count_retry_times");
189
190        let tx_cert = match tx_cert {
191            // TxCert is only valid when its epoch matches current epoch.
192            // Note, it's impossible that TxCert's epoch is larger than current epoch
193            // because the TxCert will be considered invalid and cannot reach here.
194            Some(tx_cert) if tx_cert.epoch() == self.current_epoch() => Some(tx_cert),
195            _other => None,
196        };
197
198        self.enqueue_task(QuorumDriverTask {
199            request,
200            tx_cert,
201            retry_times: old_retry_times + 1,
202            next_retry_after,
203            client_addr,
204            trace_span: Some(tracing::Span::current()),
205        })
206        .await
207    }
208
209    pub fn notify(
210        &self,
211        transaction: &Transaction,
212        response: &QuorumDriverResult,
213        total_attempts: u32,
214    ) {
215        let tx_digest = transaction.digest();
216        let effects_queue_result = match &response {
217            Ok(resp) => {
218                self.metrics.total_ok_responses.inc();
219                self.metrics
220                    .attempt_times_ok_response
221                    .observe(total_attempts as f64);
222                Ok((transaction.clone(), resp.clone()))
223            }
224            Err(err) => {
225                self.metrics
226                    .total_err_responses
227                    .with_label_values(&[err.as_ref()])
228                    .inc();
229                Err((*tx_digest, err.clone()))
230            }
231        };
232        if total_attempts > 1 {
233            self.metrics.current_transactions_in_retry.dec();
234        }
235        // On fullnode we expect the send to always succeed because
236        // TransactionOrchestrator should be subscribing to this queue all the
237        // time. However the if QuorumDriver is used elsewhere log may be noisy.
238        if let Err(err) = self.effects_subscribe_sender.send(effects_queue_result) {
239            warn!(?tx_digest, "No subscriber found for effects: {}", err);
240        }
241        debug!(?tx_digest, "notify QuorumDriver task result");
242        self.notifier.notify(tx_digest, response);
243    }
244}
245
246impl<A> QuorumDriver<A>
247where
248    A: AuthorityAPI + Send + Sync + 'static + Clone,
249{
250    #[instrument(level = "trace", skip_all)]
251    pub async fn submit_transaction(
252        &self,
253        request: ExecuteTransactionRequestV1,
254    ) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
255        let tx_digest = request.transaction.digest();
256        debug!(?tx_digest, "Received transaction execution request.");
257        self.metrics.total_requests.inc();
258
259        let ticket = self.notifier.register_one(tx_digest);
260        self.enqueue_task(QuorumDriverTask {
261            request,
262            tx_cert: None,
263            retry_times: 0,
264            next_retry_after: Instant::now(),
265            client_addr: None,
266            trace_span: Some(tracing::Span::current()),
267        })
268        .await?;
269        Ok(ticket)
270    }
271
272    // Used when the it is called in a component holding the notifier, and a ticket
273    // is already obtained prior to calling this function, for instance,
274    // TransactionOrchestrator
275    #[instrument(level = "trace", skip_all)]
276    pub async fn submit_transaction_no_ticket(
277        &self,
278        request: ExecuteTransactionRequestV1,
279        client_addr: Option<SocketAddr>,
280    ) -> IotaResult<()> {
281        let tx_digest = request.transaction.digest();
282        debug!(
283            ?tx_digest,
284            "Received transaction execution request, no ticket."
285        );
286        self.metrics.total_requests.inc();
287
288        self.enqueue_task(QuorumDriverTask {
289            request,
290            tx_cert: None,
291            retry_times: 0,
292            next_retry_after: Instant::now(),
293            client_addr,
294            trace_span: Some(tracing::Span::current()),
295        })
296        .await
297    }
298
299    #[instrument(level = "trace", skip_all)]
300    pub(crate) async fn process_transaction(
301        &self,
302        transaction: Transaction,
303        client_addr: Option<SocketAddr>,
304    ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
305        let auth_agg = self.validators.load();
306        let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
307        let tx_digest = *transaction.digest();
308        let result = auth_agg.process_transaction(transaction, client_addr).await;
309
310        self.process_transaction_result(result, tx_digest).await
311    }
312
313    #[instrument(level = "trace", skip_all)]
314    async fn process_transaction_result(
315        &self,
316        result: Result<ProcessTransactionResult, AggregatorProcessTransactionError>,
317        tx_digest: TransactionDigest,
318    ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
319        match result {
320            Ok(resp) => Ok(resp),
321
322            Err(AggregatorProcessTransactionError::FatalConflictingTransaction {
323                errors,
324                conflicting_tx_digests,
325            }) => {
326                debug!(
327                    ?errors,
328                    "Observed Tx {tx_digest:} double spend attempted. Conflicting Txes: {conflicting_tx_digests:?}",
329                );
330                Err(Some(QuorumDriverError::ObjectsDoubleUsed {
331                    conflicting_txes: conflicting_tx_digests,
332                }))
333            }
334
335            Err(AggregatorProcessTransactionError::FatalTransaction { errors }) => {
336                debug!(?tx_digest, ?errors, "Nonretryable transaction error");
337                Err(Some(QuorumDriverError::NonRecoverableTransactionError {
338                    errors,
339                }))
340            }
341
342            Err(AggregatorProcessTransactionError::SystemOverload {
343                overloaded_stake,
344                errors,
345            }) => {
346                debug!(?tx_digest, ?errors, "System overload");
347                Err(Some(QuorumDriverError::SystemOverload {
348                    overloaded_stake,
349                    errors,
350                }))
351            }
352
353            Err(AggregatorProcessTransactionError::SystemOverloadRetryAfter {
354                overload_stake,
355                errors,
356                retry_after_secs,
357            }) => {
358                self.metrics.total_retryable_overload_errors.inc();
359                debug!(
360                    ?tx_digest,
361                    ?errors,
362                    "System overload and retry after secs {retry_after_secs}",
363                );
364                Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
365                    overload_stake,
366                    errors,
367                    retry_after_secs,
368                }))
369            }
370
371            Err(AggregatorProcessTransactionError::RetryableTransaction { errors }) => {
372                debug!(?tx_digest, ?errors, "Retryable transaction error");
373                Err(None)
374            }
375
376            Err(
377                AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures,
378            ) => {
379                debug!(
380                    ?tx_digest,
381                    "Transaction is already finalized with different user signatures"
382                );
383                Err(Some(
384                    QuorumDriverError::TxAlreadyFinalizedWithDifferentUserSignatures,
385                ))
386            }
387        }
388    }
389
390    #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))]
391    pub(crate) async fn process_certificate(
392        &self,
393        request: HandleCertificateRequestV1,
394        client_addr: Option<SocketAddr>,
395    ) -> Result<QuorumDriverResponse, Option<QuorumDriverError>> {
396        let auth_agg = self.validators.load();
397        let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
398        let tx_digest = *request.certificate.digest();
399        let response = auth_agg
400            .process_certificate(request.clone(), client_addr)
401            .await
402            .map_err(|agg_err| match agg_err {
403                AggregatorProcessCertificateError::FatalExecuteCertificate {
404                    non_retryable_errors,
405                } => {
406                    // Normally a certificate shouldn't have fatal errors.
407                    error!(
408                        ?tx_digest,
409                        ?non_retryable_errors,
410                        "[WATCHOUT] Unexpected Fatal error for certificate"
411                    );
412                    Some(QuorumDriverError::NonRecoverableTransactionError {
413                        errors: non_retryable_errors,
414                    })
415                }
416                AggregatorProcessCertificateError::RetryableExecuteCertificate {
417                    retryable_errors,
418                } => {
419                    debug!(?retryable_errors, "Retryable certificate");
420                    None
421                }
422            })?;
423
424        Ok(response)
425    }
426
427    pub async fn update_validators(&self, new_validators: Arc<AuthorityAggregator<A>>) {
428        info!(
429            "Quorum Driver updating AuthorityAggregator with committee {}",
430            new_validators.committee
431        );
432        self.validators.store(new_validators);
433    }
434}
435
436pub struct QuorumDriverHandler<A: Clone> {
437    quorum_driver: Arc<QuorumDriver<A>>,
438    effects_subscriber: tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>,
439    quorum_driver_metrics: Arc<QuorumDriverMetrics>,
440    reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
441    _processor_handle: JoinHandle<()>,
442}
443
444impl<A> QuorumDriverHandler<A>
445where
446    A: AuthorityAPI + Send + Sync + 'static + Clone,
447{
448    pub(crate) fn new(
449        validators: Arc<AuthorityAggregator<A>>,
450        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
451        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
452        metrics: Arc<QuorumDriverMetrics>,
453        max_retry_times: u32,
454    ) -> Self {
455        let (task_tx, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
456        let (subscriber_tx, subscriber_rx) =
457            tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
458        let quorum_driver = Arc::new(QuorumDriver::new(
459            ArcSwap::new(validators),
460            task_tx,
461            subscriber_tx,
462            notifier,
463            metrics.clone(),
464            max_retry_times,
465        ));
466        let metrics_clone = metrics.clone();
467        let processor_handle = {
468            let quorum_driver_clone = quorum_driver.clone();
469            spawn_monitored_task!(Self::task_queue_processor(
470                quorum_driver_clone,
471                task_rx,
472                metrics_clone
473            ))
474        };
475        let reconfig_observer_clone = reconfig_observer.clone();
476        {
477            let quorum_driver_clone = quorum_driver.clone();
478            spawn_monitored_task!({
479                async move {
480                    let mut reconfig_observer_clone = reconfig_observer_clone.clone_boxed();
481                    reconfig_observer_clone.run(quorum_driver_clone).await;
482                }
483            });
484        };
485        Self {
486            quorum_driver,
487            effects_subscriber: subscriber_rx,
488            quorum_driver_metrics: metrics,
489            reconfig_observer,
490            _processor_handle: processor_handle,
491        }
492    }
493
494    // Used when the it is called in a component holding the notifier, and a ticket
495    // is already obtained prior to calling this function, for instance,
496    // TransactionOrchestrator
497    pub async fn submit_transaction_no_ticket(
498        &self,
499        request: ExecuteTransactionRequestV1,
500        client_addr: Option<SocketAddr>,
501    ) -> IotaResult<()> {
502        self.quorum_driver
503            .submit_transaction_no_ticket(request, client_addr)
504            .await
505    }
506
507    pub async fn submit_transaction(
508        &self,
509        request: ExecuteTransactionRequestV1,
510    ) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
511        self.quorum_driver.submit_transaction(request).await
512    }
513
514    /// Create a new `QuorumDriverHandler` based on the same
515    /// AuthorityAggregator. Note: the new `QuorumDriverHandler` will have a
516    /// new `ArcSwap<AuthorityAggregator>` that is NOT tied to the original
517    /// one. So if there are multiple QuorumDriver(Handler) then all of them
518    /// need to do reconfigs on their own.
519    pub fn clone_new(&self) -> Self {
520        let (task_sender, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
521        let (effects_subscribe_sender, subscriber_rx) =
522            tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
523        let validators = ArcSwap::new(self.quorum_driver.authority_aggregator().load_full());
524        let quorum_driver = Arc::new(QuorumDriver {
525            validators,
526            task_sender,
527            effects_subscribe_sender,
528            notifier: Arc::new(NotifyRead::new()),
529            metrics: self.quorum_driver_metrics.clone(),
530            max_retry_times: self.quorum_driver.max_retry_times,
531        });
532        let metrics = self.quorum_driver_metrics.clone();
533        let processor_handle = {
534            let quorum_driver_copy = quorum_driver.clone();
535            spawn_monitored_task!(Self::task_queue_processor(
536                quorum_driver_copy,
537                task_rx,
538                metrics,
539            ))
540        };
541        {
542            let quorum_driver_copy = quorum_driver.clone();
543            let reconfig_observer = self.reconfig_observer.clone();
544            spawn_monitored_task!({
545                async move {
546                    let mut reconfig_observer_clone = reconfig_observer.clone_boxed();
547                    reconfig_observer_clone.run(quorum_driver_copy).await;
548                }
549            })
550        };
551
552        Self {
553            quorum_driver,
554            effects_subscriber: subscriber_rx,
555            quorum_driver_metrics: self.quorum_driver_metrics.clone(),
556            reconfig_observer: self.reconfig_observer.clone(),
557            _processor_handle: processor_handle,
558        }
559    }
560
561    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
562        self.quorum_driver.clone()
563    }
564
565    pub fn subscribe_to_effects(
566        &self,
567    ) -> tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult> {
568        self.effects_subscriber.resubscribe()
569    }
570
571    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
572        self.quorum_driver.authority_aggregator()
573    }
574
575    pub fn current_epoch(&self) -> EpochId {
576        self.quorum_driver.current_epoch()
577    }
578
579    /// Process a QuorumDriverTask.
580    /// The function has no return value - the corresponding actions of task
581    /// result are performed in this call.
582    #[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)]
583    async fn process_task(quorum_driver: Arc<QuorumDriver<A>>, task: QuorumDriverTask) {
584        debug!(?task, "Quorum Driver processing task");
585        let QuorumDriverTask {
586            request,
587            tx_cert,
588            retry_times: old_retry_times,
589            client_addr,
590            ..
591        } = task;
592        let transaction = &request.transaction;
593        let tx_digest = *transaction.digest();
594        let is_single_writer_tx = !transaction.contains_shared_object();
595
596        let timer = Instant::now();
597        let (tx_cert, newly_formed) = match tx_cert {
598            None => match quorum_driver
599                .process_transaction(transaction.clone(), client_addr)
600                .await
601            {
602                Ok(ProcessTransactionResult::Certified {
603                    certificate,
604                    newly_formed,
605                }) => {
606                    debug!(?tx_digest, "Transaction processing succeeded");
607                    (certificate, newly_formed)
608                }
609                Ok(ProcessTransactionResult::Executed(effects_cert, events)) => {
610                    debug!(
611                        ?tx_digest,
612                        "Transaction processing succeeded with effects directly"
613                    );
614                    let response = QuorumDriverResponse {
615                        effects_cert,
616                        events: Some(events),
617                        input_objects: None,
618                        output_objects: None,
619                        auxiliary_data: None,
620                    };
621                    quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
622                    return;
623                }
624                Err(err) => {
625                    Self::handle_error(
626                        quorum_driver,
627                        request,
628                        err,
629                        None,
630                        old_retry_times,
631                        "get tx cert",
632                        client_addr,
633                    );
634                    return;
635                }
636            },
637            Some(tx_cert) => (tx_cert, false),
638        };
639
640        let response = match quorum_driver
641            .process_certificate(
642                HandleCertificateRequestV1 {
643                    certificate: tx_cert.clone(),
644                    include_events: request.include_events,
645                    include_input_objects: request.include_input_objects,
646                    include_output_objects: request.include_output_objects,
647                    include_auxiliary_data: request.include_auxiliary_data,
648                },
649                client_addr,
650            )
651            .await
652        {
653            Ok(response) => {
654                debug!(?tx_digest, "Certificate processing succeeded");
655                response
656            }
657            // Note: non retryable failure when processing a cert
658            // should be very rare.
659            Err(err) => {
660                Self::handle_error(
661                    quorum_driver,
662                    request,
663                    err,
664                    Some(tx_cert),
665                    old_retry_times,
666                    "get effects cert",
667                    client_addr,
668                );
669                return;
670            }
671        };
672        if newly_formed {
673            let settlement_finality_latency = timer.elapsed().as_secs_f64();
674            quorum_driver
675                .metrics
676                .settlement_finality_latency
677                .with_label_values(&[if is_single_writer_tx {
678                    TX_TYPE_SINGLE_WRITER_TX
679                } else {
680                    TX_TYPE_SHARED_OBJ_TX
681                }])
682                .observe(settlement_finality_latency);
683            let is_out_of_expected_range =
684                settlement_finality_latency >= 8.0 || settlement_finality_latency <= 0.1;
685            debug!(
686                ?tx_digest,
687                ?is_single_writer_tx,
688                ?is_out_of_expected_range,
689                "QuorumDriver settlement finality latency: {:.3} seconds",
690                settlement_finality_latency
691            );
692        }
693
694        quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
695    }
696
697    fn handle_error(
698        quorum_driver: Arc<QuorumDriver<A>>,
699        request: ExecuteTransactionRequestV1,
700        err: Option<QuorumDriverError>,
701        tx_cert: Option<CertifiedTransaction>,
702        old_retry_times: u32,
703        action: &'static str,
704        client_addr: Option<SocketAddr>,
705    ) {
706        let tx_digest = *request.transaction.digest();
707        match err {
708            None => {
709                debug!(?tx_digest, "Failed to {action} - Retrying");
710                spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
711                    request.clone(),
712                    tx_cert,
713                    old_retry_times,
714                    client_addr,
715                ));
716            }
717            Some(QuorumDriverError::SystemOverloadRetryAfter {
718                retry_after_secs, ..
719            }) => {
720                // Special case for SystemOverloadRetryAfter error. In this case, due to that
721                // objects are already locked inside validators, we need to
722                // perform continuous retry and ignore `max_retry_times`.
723                // TODO: the txn can potentially be retried unlimited times, therefore, we need
724                // to bound the number of on going transactions in a quorum
725                // driver. When the limit is reached, the quorum driver should
726                // reject any new transaction requests.
727                debug!(?tx_digest, "Failed to {action} - Retrying");
728                spawn_monitored_task!(quorum_driver.backoff_and_enqueue(
729                    request.clone(),
730                    tx_cert,
731                    old_retry_times,
732                    client_addr,
733                    Some(Duration::from_secs(retry_after_secs)),
734                ));
735            }
736            Some(qd_error) => {
737                debug!(?tx_digest, "Failed to {action}: {}", qd_error);
738                // non-retryable failure, this task reaches terminal state for now, notify
739                // waiter.
740                quorum_driver.notify(&request.transaction, &Err(qd_error), old_retry_times + 1);
741            }
742        }
743    }
744
745    async fn task_queue_processor(
746        quorum_driver: Arc<QuorumDriver<A>>,
747        mut task_receiver: Receiver<QuorumDriverTask>,
748        metrics: Arc<QuorumDriverMetrics>,
749    ) {
750        let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE));
751        while let Some(task) = task_receiver.recv().await {
752            let task_queue_span =
753                trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue");
754            let task_span_guard = task_queue_span.enter();
755
756            // hold semaphore permit until task completes. unwrap ok because we never close
757            // the semaphore in this context.
758            let limit = limit.clone();
759            let permit = limit.acquire_owned().await.unwrap();
760
761            // TODO check reconfig process here
762
763            debug!(?task, "Dequeued task");
764            if Instant::now()
765                .checked_duration_since(task.next_retry_after)
766                .is_none()
767            {
768                // Not ready for next attempt yet, re-enqueue
769                let _ = quorum_driver.enqueue_task(task).await;
770                continue;
771            }
772            metrics.current_requests_in_flight.dec();
773            let qd = quorum_driver.clone();
774            drop(task_span_guard);
775            spawn_monitored_task!(async move {
776                let _guard = permit;
777                QuorumDriverHandler::process_task(qd, task).await
778            });
779        }
780    }
781}
782
783pub struct QuorumDriverHandlerBuilder<A: Clone> {
784    validators: Arc<AuthorityAggregator<A>>,
785    metrics: Arc<QuorumDriverMetrics>,
786    notifier: Option<Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>>,
787    reconfig_observer: Option<Arc<dyn ReconfigObserver<A> + Sync + Send>>,
788    max_retry_times: u32,
789}
790
791impl<A> QuorumDriverHandlerBuilder<A>
792where
793    A: AuthorityAPI + Send + Sync + 'static + Clone,
794{
795    pub fn new(validators: Arc<AuthorityAggregator<A>>, metrics: Arc<QuorumDriverMetrics>) -> Self {
796        Self {
797            validators,
798            metrics,
799            notifier: None,
800            reconfig_observer: None,
801            max_retry_times: TX_MAX_RETRY_TIMES,
802        }
803    }
804
805    pub(crate) fn with_notifier(
806        mut self,
807        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
808    ) -> Self {
809        self.notifier = Some(notifier);
810        self
811    }
812
813    pub fn with_reconfig_observer(
814        mut self,
815        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
816    ) -> Self {
817        self.reconfig_observer = Some(reconfig_observer);
818        self
819    }
820
821    /// Used in tests when smaller number of retries is desired
822    pub fn with_max_retry_times(mut self, max_retry_times: u32) -> Self {
823        self.max_retry_times = max_retry_times;
824        self
825    }
826
827    pub fn start(self) -> QuorumDriverHandler<A> {
828        QuorumDriverHandler::new(
829            self.validators,
830            self.notifier.unwrap_or_else(|| {
831                Arc::new(NotifyRead::<TransactionDigest, QuorumDriverResult>::new())
832            }),
833            self.reconfig_observer
834                .expect("Reconfig observer is missing"),
835            self.metrics,
836            self.max_retry_times,
837        )
838    }
839}