iota_core/quorum_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod metrics;
6pub use metrics::*;
7
8pub mod reconfig_observer;
9
10use std::{
11    collections::{BTreeMap, BTreeSet},
12    fmt::{Debug, Formatter, Write},
13    net::SocketAddr,
14    sync::Arc,
15    time::Duration,
16};
17
18use arc_swap::ArcSwap;
19use iota_common::sync::notify_read::{NotifyRead, Registration};
20use iota_macros::fail_point;
21use iota_metrics::{
22    GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task,
23};
24use iota_types::{
25    base_types::{AuthorityName, ObjectRef, TransactionDigest},
26    committee::{Committee, EpochId, StakeUnit},
27    error::{IotaError, IotaResult},
28    messages_grpc::HandleCertificateRequestV1,
29    messages_safe_client::PlainTransactionInfoResponse,
30    quorum_driver_types::{
31        ExecuteTransactionRequestV1, QuorumDriverEffectsQueueResult, QuorumDriverError,
32        QuorumDriverResponse, QuorumDriverResult,
33    },
34    transaction::{CertifiedTransaction, Transaction},
35};
36use tap::TapFallible;
37use tokio::{
38    sync::{
39        Semaphore,
40        mpsc::{self, Receiver, Sender},
41    },
42    task::JoinHandle,
43    time::{Instant, sleep_until},
44};
45use tracing::{debug, error, info, instrument, trace_span, warn};
46
47use self::reconfig_observer::ReconfigObserver;
48use crate::{
49    authority_aggregator::{
50        AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator,
51        ProcessTransactionResult,
52    },
53    authority_client::AuthorityAPI,
54};
55
56#[cfg(test)]
57mod tests;
58
/// Capacity of the bounded mpsc channel that carries `QuorumDriverTask`s from
/// the submit/retry paths to the task queue processor.
const TASK_QUEUE_SIZE: usize = 2000;
/// Capacity of the broadcast channel on which execution results are published
/// to effects subscribers.
const EFFECTS_QUEUE_SIZE: usize = 10000;
/// Retry limit for a single transaction.
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the default value for `max_retry_times` — confirm against the builder.
const TX_MAX_RETRY_TIMES: u32 = 10;
62
/// One unit of work for the quorum driver: a transaction execution request
/// plus the bookkeeping needed to retry it.
#[derive(Clone)]
pub struct QuorumDriverTask {
    /// The execution request, including the transaction itself.
    pub request: ExecuteTransactionRequestV1,
    /// Certificate for the transaction, if one is already available. When
    /// `None`, processing starts by submitting the transaction to obtain one.
    pub tx_cert: Option<CertifiedTransaction>,
    /// Number of retries so far: 0 on first submission, incremented by one
    /// each time the task is enqueued again.
    pub retry_times: u32,
    /// Instant recorded when the task was enqueued: "now" for a first
    /// submission, or the backoff deadline for a retry.
    pub next_retry_after: Instant,
    /// Address of the client that submitted the request, when known.
    pub client_addr: Option<SocketAddr>,
    /// Span that was current when the task was created; used as the parent
    /// span while the task is processed.
    pub trace_span: Option<tracing::Span>,
}
72
73impl Debug for QuorumDriverTask {
74    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75        let mut writer = String::new();
76        write!(writer, "tx_digest={:?} ", self.request.transaction.digest())?;
77        write!(writer, "has_tx_cert={} ", self.tx_cert.is_some())?;
78        write!(writer, "retry_times={} ", self.retry_times)?;
79        write!(writer, "next_retry_after={:?} ", self.next_retry_after)?;
80        write!(f, "{}", writer)
81    }
82}
83
/// Drives transactions to finality against the current validator set:
/// queues tasks, talks to validators through an `AuthorityAggregator`, and
/// publishes the outcome to waiters and subscribers.
pub struct QuorumDriver<A: Clone> {
    /// The current authority aggregator; swapped atomically by
    /// `update_validators`.
    validators: ArcSwap<AuthorityAggregator<A>>,
    /// Sending half of the task queue.
    task_sender: Sender<QuorumDriverTask>,
    /// Broadcast channel on which every final result is published.
    effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
    /// Per-digest notifier used to hand the final result to ticket holders.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    metrics: Arc<QuorumDriverMetrics>,
    /// Once a task has been retried this many times, the next failure is
    /// reported instead of retried.
    max_retry_times: u32,
}
92
93impl<A: Clone> QuorumDriver<A> {
94    pub(crate) fn new(
95        validators: ArcSwap<AuthorityAggregator<A>>,
96        task_sender: Sender<QuorumDriverTask>,
97        effects_subscribe_sender: tokio::sync::broadcast::Sender<QuorumDriverEffectsQueueResult>,
98        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
99        metrics: Arc<QuorumDriverMetrics>,
100        max_retry_times: u32,
101    ) -> Self {
102        Self {
103            validators,
104            task_sender,
105            effects_subscribe_sender,
106            notifier,
107            metrics,
108            max_retry_times,
109        }
110    }
111
    /// Returns the swappable handle to the authority aggregator this driver
    /// currently uses.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        &self.validators
    }
115
116    pub fn clone_committee(&self) -> Arc<Committee> {
117        self.validators.load().committee.clone()
118    }
119
120    pub fn current_epoch(&self) -> EpochId {
121        self.validators.load().committee.epoch
122    }
123
124    async fn enqueue_task(&self, task: QuorumDriverTask) -> IotaResult<()> {
125        self.task_sender
126            .send(task.clone())
127            .await
128            .tap_err(|e| debug!(?task, "Failed to enqueue task: {:?}", e))
129            .tap_ok(|_| {
130                debug!(?task, "Enqueued task.");
131                self.metrics.current_requests_in_flight.inc();
132                self.metrics.total_enqueued.inc();
133                if task.retry_times > 0 {
134                    if task.retry_times == 1 {
135                        self.metrics.current_transactions_in_retry.inc();
136                    }
137                    self.metrics
138                        .transaction_retry_count
139                        .report(task.retry_times as u64);
140                }
141            })
142            .map_err(|e| IotaError::QuorumDriverCommunication {
143                error: e.to_string(),
144            })
145    }
146
147    /// Enqueue the task again if it hasn't maxed out the total retry attempts.
148    /// If it has, notify failure.
149    async fn enqueue_again_maybe(
150        &self,
151        request: ExecuteTransactionRequestV1,
152        tx_cert: Option<CertifiedTransaction>,
153        old_retry_times: u32,
154        client_addr: Option<SocketAddr>,
155    ) -> IotaResult<()> {
156        if old_retry_times >= self.max_retry_times {
157            // max out the retry times, notify failure
158            info!(tx_digest=?request.transaction.digest(), "Failed to reach finality after attempting for {} times", old_retry_times+1);
159            self.notify(
160                &request.transaction,
161                &Err(
162                    QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
163                        total_attempts: old_retry_times + 1,
164                    },
165                ),
166                old_retry_times + 1,
167            );
168            return Ok(());
169        }
170        self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
171            .await
172    }
173
174    /// Performs exponential backoff and enqueue the `transaction` to the
175    /// execution queue. When `min_backoff_duration` is provided, the
176    /// backoff duration will be at least `min_backoff_duration`.
177    async fn backoff_and_enqueue(
178        &self,
179        request: ExecuteTransactionRequestV1,
180        tx_cert: Option<CertifiedTransaction>,
181        old_retry_times: u32,
182        client_addr: Option<SocketAddr>,
183        min_backoff_duration: Option<Duration>,
184    ) -> IotaResult<()> {
185        let next_retry_after = Instant::now()
186            + Duration::from_millis(200 * u64::pow(2, old_retry_times))
187                .max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
188        sleep_until(next_retry_after).await;
189
190        fail_point!("count_retry_times");
191
192        let tx_cert = match tx_cert {
193            // TxCert is only valid when its epoch matches current epoch.
194            // Note, it's impossible that TxCert's epoch is larger than current epoch
195            // because the TxCert will be considered invalid and cannot reach here.
196            Some(tx_cert) if tx_cert.epoch() == self.current_epoch() => Some(tx_cert),
197            _other => None,
198        };
199
200        self.enqueue_task(QuorumDriverTask {
201            request,
202            tx_cert,
203            retry_times: old_retry_times + 1,
204            next_retry_after,
205            client_addr,
206            trace_span: Some(tracing::Span::current()),
207        })
208        .await
209    }
210
211    pub fn notify(
212        &self,
213        transaction: &Transaction,
214        response: &QuorumDriverResult,
215        total_attempts: u32,
216    ) {
217        let tx_digest = transaction.digest();
218        let effects_queue_result = match &response {
219            Ok(resp) => {
220                self.metrics.total_ok_responses.inc();
221                self.metrics
222                    .attempt_times_ok_response
223                    .report(total_attempts as u64);
224                Ok((transaction.clone(), resp.clone()))
225            }
226            Err(err) => {
227                self.metrics
228                    .total_err_responses
229                    .with_label_values(&[err.as_ref()])
230                    .inc();
231                Err((*tx_digest, err.clone()))
232            }
233        };
234        if total_attempts > 1 {
235            self.metrics.current_transactions_in_retry.dec();
236        }
237        // On fullnode we expect the send to always succeed because
238        // TransactionOrchestrator should be subscribing to this queue all the
239        // time. However the if QuorumDriver is used elsewhere log may be noisy.
240        if let Err(err) = self.effects_subscribe_sender.send(effects_queue_result) {
241            warn!(?tx_digest, "No subscriber found for effects: {}", err);
242        }
243        debug!(?tx_digest, "notify QuorumDriver task result");
244        self.notifier.notify(tx_digest, response);
245    }
246}
247
248impl<A> QuorumDriver<A>
249where
250    A: AuthorityAPI + Send + Sync + 'static + Clone,
251{
252    #[instrument(level = "trace", skip_all)]
253    pub async fn submit_transaction(
254        &self,
255        request: ExecuteTransactionRequestV1,
256    ) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
257        let tx_digest = request.transaction.digest();
258        debug!(?tx_digest, "Received transaction execution request.");
259        self.metrics.total_requests.inc();
260
261        let ticket = self.notifier.register_one(tx_digest);
262        self.enqueue_task(QuorumDriverTask {
263            request,
264            tx_cert: None,
265            retry_times: 0,
266            next_retry_after: Instant::now(),
267            client_addr: None,
268            trace_span: Some(tracing::Span::current()),
269        })
270        .await?;
271        Ok(ticket)
272    }
273
274    // Used when the it is called in a component holding the notifier, and a ticket
275    // is already obtained prior to calling this function, for instance,
276    // TransactionOrchestrator
277    #[instrument(level = "trace", skip_all)]
278    pub async fn submit_transaction_no_ticket(
279        &self,
280        request: ExecuteTransactionRequestV1,
281        client_addr: Option<SocketAddr>,
282    ) -> IotaResult<()> {
283        let tx_digest = request.transaction.digest();
284        debug!(
285            ?tx_digest,
286            "Received transaction execution request, no ticket."
287        );
288        self.metrics.total_requests.inc();
289
290        self.enqueue_task(QuorumDriverTask {
291            request,
292            tx_cert: None,
293            retry_times: 0,
294            next_retry_after: Instant::now(),
295            client_addr,
296            trace_span: Some(tracing::Span::current()),
297        })
298        .await
299    }
300
301    #[instrument(level = "trace", skip_all)]
302    pub(crate) async fn process_transaction(
303        &self,
304        transaction: Transaction,
305        client_addr: Option<SocketAddr>,
306    ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
307        let auth_agg = self.validators.load();
308        let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
309        let tx_digest = *transaction.digest();
310        let result = auth_agg.process_transaction(transaction, client_addr).await;
311
312        self.process_transaction_result(result, tx_digest, client_addr)
313            .await
314    }
315
316    #[instrument(level = "trace", skip_all)]
317    async fn process_transaction_result(
318        &self,
319        result: Result<ProcessTransactionResult, AggregatorProcessTransactionError>,
320        tx_digest: TransactionDigest,
321        client_addr: Option<SocketAddr>,
322    ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
323        match result {
324            Ok(resp) => Ok(resp),
325            Err(AggregatorProcessTransactionError::RetryableConflictingTransaction {
326                conflicting_tx_digest_to_retry,
327                errors,
328                conflicting_tx_digests,
329            }) => {
330                self.metrics
331                    .total_err_process_tx_responses_with_nonzero_conflicting_transactions
332                    .inc();
333                debug!(
334                    ?tx_digest,
335                    "Observed {} conflicting transactions: {:?}",
336                    conflicting_tx_digests.len(),
337                    conflicting_tx_digests
338                );
339
340                if let Some(conflicting_tx_digest) = conflicting_tx_digest_to_retry {
341                    self.process_conflicting_tx(
342                        tx_digest,
343                        conflicting_tx_digest,
344                        conflicting_tx_digests,
345                        client_addr,
346                    )
347                    .await
348                } else {
349                    // If no retryable conflicting transaction was returned that means we have >=
350                    // 2f+1 good stake for the original transaction + retryable
351                    // stake. Will continue to retry the original transaction.
352                    debug!(
353                        ?errors,
354                        "Observed Tx {tx_digest:} is still in retryable state. Conflicting Txes: {conflicting_tx_digests:?}",
355                    );
356                    Err(None)
357                }
358            }
359
360            Err(AggregatorProcessTransactionError::FatalConflictingTransaction {
361                errors,
362                conflicting_tx_digests,
363            }) => {
364                debug!(
365                    ?errors,
366                    "Observed Tx {tx_digest:} double spend attempted. Conflicting Txes: {conflicting_tx_digests:?}",
367                );
368                Err(Some(QuorumDriverError::ObjectsDoubleUsed {
369                    conflicting_txes: conflicting_tx_digests,
370                    retried_tx: None,
371                    retried_tx_success: None,
372                }))
373            }
374
375            Err(AggregatorProcessTransactionError::FatalTransaction { errors }) => {
376                debug!(?tx_digest, ?errors, "Nonretryable transaction error");
377                Err(Some(QuorumDriverError::NonRecoverableTransactionError {
378                    errors,
379                }))
380            }
381
382            Err(AggregatorProcessTransactionError::SystemOverload {
383                overloaded_stake,
384                errors,
385            }) => {
386                debug!(?tx_digest, ?errors, "System overload");
387                Err(Some(QuorumDriverError::SystemOverload {
388                    overloaded_stake,
389                    errors,
390                }))
391            }
392
393            Err(AggregatorProcessTransactionError::SystemOverloadRetryAfter {
394                overload_stake,
395                errors,
396                retry_after_secs,
397            }) => {
398                self.metrics.total_retryable_overload_errors.inc();
399                debug!(
400                    ?tx_digest,
401                    ?errors,
402                    "System overload and retry after secs {retry_after_secs}",
403                );
404                Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
405                    overload_stake,
406                    errors,
407                    retry_after_secs,
408                }))
409            }
410
411            Err(AggregatorProcessTransactionError::RetryableTransaction { errors }) => {
412                debug!(?tx_digest, ?errors, "Retryable transaction error");
413                Err(None)
414            }
415
416            Err(
417                AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures,
418            ) => {
419                debug!(
420                    ?tx_digest,
421                    "Transaction is already finalized with different user signatures"
422                );
423                Err(Some(
424                    QuorumDriverError::TxAlreadyFinalizedWithDifferentUserSignatures,
425                ))
426            }
427        }
428    }
429
430    #[instrument(level = "trace", skip_all)]
431    async fn process_conflicting_tx(
432        &self,
433        tx_digest: TransactionDigest,
434        conflicting_tx_digest: TransactionDigest,
435        conflicting_tx_digests: BTreeMap<
436            TransactionDigest,
437            (Vec<(AuthorityName, ObjectRef)>, StakeUnit),
438        >,
439        client_addr: Option<SocketAddr>,
440    ) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
441        // Safe to unwrap because tx_digest_to_retry is generated from
442        // conflicting_tx_digests
443        // in ProcessTransactionState::conflicting_tx_digest_with_most_stake()
444        let (validators, _) = conflicting_tx_digests.get(&conflicting_tx_digest).unwrap();
445        let attempt_result = self
446            .attempt_conflicting_transaction(
447                &conflicting_tx_digest,
448                &tx_digest,
449                validators.iter().map(|(pub_key, _)| *pub_key).collect(),
450                client_addr,
451            )
452            .await;
453        self.metrics
454            .total_attempts_retrying_conflicting_transaction
455            .inc();
456
457        match attempt_result {
458            Err(err) => {
459                debug!(
460                    ?tx_digest,
461                    ?conflicting_tx_digest,
462                    "Encountered error while attempting conflicting transaction: {:?}",
463                    err
464                );
465                Err(Some(QuorumDriverError::ObjectsDoubleUsed {
466                    conflicting_txes: conflicting_tx_digests,
467                    retried_tx: None,
468                    retried_tx_success: None,
469                }))
470            }
471            Ok(success) => {
472                debug!(
473                    ?tx_digest,
474                    ?conflicting_tx_digest,
475                    "Retried conflicting transaction. Success: {}",
476                    success
477                );
478                if success {
479                    self.metrics
480                        .total_successful_attempts_retrying_conflicting_transaction
481                        .inc();
482                }
483                Err(Some(QuorumDriverError::ObjectsDoubleUsed {
484                    conflicting_txes: conflicting_tx_digests,
485                    retried_tx: Some(conflicting_tx_digest),
486                    retried_tx_success: Some(success),
487                }))
488            }
489        }
490    }
491
492    #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))]
493    pub(crate) async fn process_certificate(
494        &self,
495        request: HandleCertificateRequestV1,
496        client_addr: Option<SocketAddr>,
497    ) -> Result<QuorumDriverResponse, Option<QuorumDriverError>> {
498        let auth_agg = self.validators.load();
499        let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
500        let tx_digest = *request.certificate.digest();
501        let response = auth_agg
502            .process_certificate(request.clone(), client_addr)
503            .await
504            .map_err(|agg_err| match agg_err {
505                AggregatorProcessCertificateError::FatalExecuteCertificate {
506                    non_retryable_errors,
507                } => {
508                    // Normally a certificate shouldn't have fatal errors.
509                    error!(
510                        ?tx_digest,
511                        ?non_retryable_errors,
512                        "[WATCHOUT] Unexpected Fatal error for certificate"
513                    );
514                    Some(QuorumDriverError::NonRecoverableTransactionError {
515                        errors: non_retryable_errors,
516                    })
517                }
518                AggregatorProcessCertificateError::RetryableExecuteCertificate {
519                    retryable_errors,
520                } => {
521                    debug!(?retryable_errors, "Retryable certificate");
522                    None
523                }
524            })?;
525
526        Ok(response)
527    }
528
529    pub async fn update_validators(&self, new_validators: Arc<AuthorityAggregator<A>>) {
530        info!(
531            "Quorum Driver updating AuthorityAggregator with committee {}",
532            new_validators.committee
533        );
534        self.validators.store(new_validators);
535    }
536
537    /// Returns Some(true) if the conflicting transaction is executed
538    /// successfully (or already executed), or Some(false) if it did not.
539    #[instrument(level = "trace", skip_all)]
540    async fn attempt_conflicting_transaction(
541        &self,
542        tx_digest: &TransactionDigest,
543        original_tx_digest: &TransactionDigest,
544        validators: BTreeSet<AuthorityName>,
545        client_addr: Option<SocketAddr>,
546    ) -> IotaResult<bool> {
547        let response = self
548            .validators
549            .load()
550            .handle_transaction_info_request_from_some_validators(
551                tx_digest,
552                &validators,
553                Some(Duration::from_secs(10)),
554            )
555            .await?;
556
557        // If we are able to get a certificate right away, we use it and execute the
558        // cert; otherwise, we have to re-form a cert and execute it.
559        let transaction = match response {
560            PlainTransactionInfoResponse::ExecutedWithCert(cert, _, _) => {
561                self.metrics
562                    .total_times_conflicting_transaction_already_finalized_when_retrying
563                    .inc();
564                // We still want to ask validators to execute this certificate in case this
565                // certificate is not known to the rest of them (e.g. when
566                // *this* validator is bad).
567                let result = self
568                    .validators
569                    .load()
570                    .process_certificate(
571                        HandleCertificateRequestV1 {
572                            certificate: cert,
573                            include_events: true,
574                            include_input_objects: false,
575                            include_output_objects: false,
576                            include_auxiliary_data: false,
577                        },
578                        client_addr,
579                    )
580                    .await
581                    .tap_ok(|_resp| {
582                        debug!(
583                            ?tx_digest,
584                            ?original_tx_digest,
585                            "Retry conflicting transaction certificate succeeded."
586                        );
587                    })
588                    .tap_err(|err| {
589                        debug!(
590                            ?tx_digest,
591                            ?original_tx_digest,
592                            "Retry conflicting transaction certificate got an error: {:?}",
593                            err
594                        );
595                    });
596                // We only try it once.
597                return Ok(result.is_ok());
598            }
599            PlainTransactionInfoResponse::Signed(signed) => {
600                signed.verify_committee_sigs_only(&self.clone_committee())?;
601                signed.into_unsigned()
602            }
603            PlainTransactionInfoResponse::ExecutedWithoutCert(transaction, _, _) => transaction,
604        };
605        // Now ask validators to execute this transaction.
606        let result = self
607            .validators
608            .load()
609            .execute_transaction_block(&transaction, client_addr)
610            .await
611            .tap_ok(|_resp| {
612                debug!(
613                    ?tx_digest,
614                    ?original_tx_digest,
615                    "Retry conflicting transaction succeeded."
616                );
617            })
618            .tap_err(|err| {
619                debug!(
620                    ?tx_digest,
621                    ?original_tx_digest,
622                    "Retry conflicting transaction got an error: {:?}",
623                    err
624                );
625            });
626        // We only try it once
627        Ok(result.is_ok())
628    }
629}
630
/// Owner-side handle for a `QuorumDriver`: holds the shared driver, a
/// receiver on its effects broadcast channel, and the handle of the spawned
/// task queue processor.
pub struct QuorumDriverHandler<A: Clone> {
    /// The shared driver that performs the actual work.
    quorum_driver: Arc<QuorumDriver<A>>,
    /// Receiver created together with the effects broadcast channel; new
    /// subscriptions are derived from it via `resubscribe`.
    effects_subscriber: tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>,
    quorum_driver_metrics: Arc<QuorumDriverMetrics>,
    /// Observer that is cloned and run in a spawned task for each driver
    /// created by this handler.
    reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
    /// Join handle of the spawned task queue processor.
    _processor_handle: JoinHandle<()>,
}
638
639impl<A> QuorumDriverHandler<A>
640where
641    A: AuthorityAPI + Send + Sync + 'static + Clone,
642{
643    pub(crate) fn new(
644        validators: Arc<AuthorityAggregator<A>>,
645        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
646        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
647        metrics: Arc<QuorumDriverMetrics>,
648        max_retry_times: u32,
649    ) -> Self {
650        let (task_tx, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
651        let (subscriber_tx, subscriber_rx) =
652            tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
653        let quorum_driver = Arc::new(QuorumDriver::new(
654            ArcSwap::new(validators),
655            task_tx,
656            subscriber_tx,
657            notifier,
658            metrics.clone(),
659            max_retry_times,
660        ));
661        let metrics_clone = metrics.clone();
662        let processor_handle = {
663            let quorum_driver_clone = quorum_driver.clone();
664            spawn_monitored_task!(Self::task_queue_processor(
665                quorum_driver_clone,
666                task_rx,
667                metrics_clone
668            ))
669        };
670        let reconfig_observer_clone = reconfig_observer.clone();
671        {
672            let quorum_driver_clone = quorum_driver.clone();
673            spawn_monitored_task!({
674                async move {
675                    let mut reconfig_observer_clone = reconfig_observer_clone.clone_boxed();
676                    reconfig_observer_clone.run(quorum_driver_clone).await;
677                }
678            });
679        };
680        Self {
681            quorum_driver,
682            effects_subscriber: subscriber_rx,
683            quorum_driver_metrics: metrics,
684            reconfig_observer,
685            _processor_handle: processor_handle,
686        }
687    }
688
689    // Used when the it is called in a component holding the notifier, and a ticket
690    // is already obtained prior to calling this function, for instance,
691    // TransactionOrchestrator
692    pub async fn submit_transaction_no_ticket(
693        &self,
694        request: ExecuteTransactionRequestV1,
695        client_addr: Option<SocketAddr>,
696    ) -> IotaResult<()> {
697        self.quorum_driver
698            .submit_transaction_no_ticket(request, client_addr)
699            .await
700    }
701
702    pub async fn submit_transaction(
703        &self,
704        request: ExecuteTransactionRequestV1,
705    ) -> IotaResult<Registration<TransactionDigest, QuorumDriverResult>> {
706        self.quorum_driver.submit_transaction(request).await
707    }
708
709    /// Create a new `QuorumDriverHandler` based on the same
710    /// AuthorityAggregator. Note: the new `QuorumDriverHandler` will have a
711    /// new `ArcSwap<AuthorityAggregator>` that is NOT tied to the original
712    /// one. So if there are multiple QuorumDriver(Handler) then all of them
713    /// need to do reconfigs on their own.
714    pub fn clone_new(&self) -> Self {
715        let (task_sender, task_rx) = mpsc::channel::<QuorumDriverTask>(TASK_QUEUE_SIZE);
716        let (effects_subscribe_sender, subscriber_rx) =
717            tokio::sync::broadcast::channel::<_>(EFFECTS_QUEUE_SIZE);
718        let validators = ArcSwap::new(self.quorum_driver.authority_aggregator().load_full());
719        let quorum_driver = Arc::new(QuorumDriver {
720            validators,
721            task_sender,
722            effects_subscribe_sender,
723            notifier: Arc::new(NotifyRead::new()),
724            metrics: self.quorum_driver_metrics.clone(),
725            max_retry_times: self.quorum_driver.max_retry_times,
726        });
727        let metrics = self.quorum_driver_metrics.clone();
728        let processor_handle = {
729            let quorum_driver_copy = quorum_driver.clone();
730            spawn_monitored_task!(Self::task_queue_processor(
731                quorum_driver_copy,
732                task_rx,
733                metrics,
734            ))
735        };
736        {
737            let quorum_driver_copy = quorum_driver.clone();
738            let reconfig_observer = self.reconfig_observer.clone();
739            spawn_monitored_task!({
740                async move {
741                    let mut reconfig_observer_clone = reconfig_observer.clone_boxed();
742                    reconfig_observer_clone.run(quorum_driver_copy).await;
743                }
744            })
745        };
746
747        Self {
748            quorum_driver,
749            effects_subscriber: subscriber_rx,
750            quorum_driver_metrics: self.quorum_driver_metrics.clone(),
751            reconfig_observer: self.reconfig_observer.clone(),
752            _processor_handle: processor_handle,
753        }
754    }
755
756    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
757        self.quorum_driver.clone()
758    }
759
    /// Returns a new receiver on the effects broadcast channel, obtained by
    /// resubscribing the receiver this handler holds.
    pub fn subscribe_to_effects(
        &self,
    ) -> tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult> {
        self.effects_subscriber.resubscribe()
    }
765
    /// Returns the underlying driver's swappable authority aggregator handle.
    pub fn authority_aggregator(&self) -> &ArcSwap<AuthorityAggregator<A>> {
        self.quorum_driver.authority_aggregator()
    }
769
    /// Returns the epoch reported by the underlying driver.
    pub fn current_epoch(&self) -> EpochId {
        self.quorum_driver.current_epoch()
    }
773
774    /// Process a QuorumDriverTask.
775    /// The function has no return value - the corresponding actions of task
776    /// result are performed in this call.
777    #[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)]
778    async fn process_task(quorum_driver: Arc<QuorumDriver<A>>, task: QuorumDriverTask) {
779        debug!(?task, "Quorum Driver processing task");
780        let QuorumDriverTask {
781            request,
782            tx_cert,
783            retry_times: old_retry_times,
784            client_addr,
785            ..
786        } = task;
787        let transaction = &request.transaction;
788        let tx_digest = *transaction.digest();
789        let is_single_writer_tx = !transaction.contains_shared_object();
790
791        let timer = Instant::now();
792        let (tx_cert, newly_formed) = match tx_cert {
793            None => match quorum_driver
794                .process_transaction(transaction.clone(), client_addr)
795                .await
796            {
797                Ok(ProcessTransactionResult::Certified {
798                    certificate,
799                    newly_formed,
800                }) => {
801                    debug!(?tx_digest, "Transaction processing succeeded");
802                    (certificate, newly_formed)
803                }
804                Ok(ProcessTransactionResult::Executed(effects_cert, events)) => {
805                    debug!(
806                        ?tx_digest,
807                        "Transaction processing succeeded with effects directly"
808                    );
809                    let response = QuorumDriverResponse {
810                        effects_cert,
811                        events: Some(events),
812                        input_objects: None,
813                        output_objects: None,
814                        auxiliary_data: None,
815                    };
816                    quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
817                    return;
818                }
819                Err(err) => {
820                    Self::handle_error(
821                        quorum_driver,
822                        request,
823                        err,
824                        None,
825                        old_retry_times,
826                        "get tx cert",
827                        client_addr,
828                    );
829                    return;
830                }
831            },
832            Some(tx_cert) => (tx_cert, false),
833        };
834
835        let response = match quorum_driver
836            .process_certificate(
837                HandleCertificateRequestV1 {
838                    certificate: tx_cert.clone(),
839                    include_events: request.include_events,
840                    include_input_objects: request.include_input_objects,
841                    include_output_objects: request.include_output_objects,
842                    include_auxiliary_data: request.include_auxiliary_data,
843                },
844                client_addr,
845            )
846            .await
847        {
848            Ok(response) => {
849                debug!(?tx_digest, "Certificate processing succeeded");
850                response
851            }
852            // Note: non retryable failure when processing a cert
853            // should be very rare.
854            Err(err) => {
855                Self::handle_error(
856                    quorum_driver,
857                    request,
858                    err,
859                    Some(tx_cert),
860                    old_retry_times,
861                    "get effects cert",
862                    client_addr,
863                );
864                return;
865            }
866        };
867        if newly_formed {
868            let settlement_finality_latency = timer.elapsed().as_secs_f64();
869            quorum_driver
870                .metrics
871                .settlement_finality_latency
872                .with_label_values(&[if is_single_writer_tx {
873                    TX_TYPE_SINGLE_WRITER_TX
874                } else {
875                    TX_TYPE_SHARED_OBJ_TX
876                }])
877                .observe(settlement_finality_latency);
878            let is_out_of_expected_range =
879                settlement_finality_latency >= 8.0 || settlement_finality_latency <= 0.1;
880            debug!(
881                ?tx_digest,
882                ?is_single_writer_tx,
883                ?is_out_of_expected_range,
884                "QuorumDriver settlement finality latency: {:.3} seconds",
885                settlement_finality_latency
886            );
887        }
888
889        quorum_driver.notify(transaction, &Ok(response), old_retry_times + 1);
890    }
891
892    fn handle_error(
893        quorum_driver: Arc<QuorumDriver<A>>,
894        request: ExecuteTransactionRequestV1,
895        err: Option<QuorumDriverError>,
896        tx_cert: Option<CertifiedTransaction>,
897        old_retry_times: u32,
898        action: &'static str,
899        client_addr: Option<SocketAddr>,
900    ) {
901        let tx_digest = *request.transaction.digest();
902        match err {
903            None => {
904                debug!(?tx_digest, "Failed to {action} - Retrying");
905                spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
906                    request.clone(),
907                    tx_cert,
908                    old_retry_times,
909                    client_addr,
910                ));
911            }
912            Some(QuorumDriverError::SystemOverloadRetryAfter {
913                retry_after_secs, ..
914            }) => {
915                // Special case for SystemOverloadRetryAfter error. In this case, due to that
916                // objects are already locked inside validators, we need to
917                // perform continuous retry and ignore `max_retry_times`.
918                // TODO: the txn can potentially be retried unlimited times, therefore, we need
919                // to bound the number of on going transactions in a quorum
920                // driver. When the limit is reached, the quorum driver should
921                // reject any new transaction requests.
922                debug!(?tx_digest, "Failed to {action} - Retrying");
923                spawn_monitored_task!(quorum_driver.backoff_and_enqueue(
924                    request.clone(),
925                    tx_cert,
926                    old_retry_times,
927                    client_addr,
928                    Some(Duration::from_secs(retry_after_secs)),
929                ));
930            }
931            Some(qd_error) => {
932                debug!(?tx_digest, "Failed to {action}: {}", qd_error);
933                // non-retryable failure, this task reaches terminal state for now, notify
934                // waiter.
935                quorum_driver.notify(&request.transaction, &Err(qd_error), old_retry_times + 1);
936            }
937        }
938    }
939
940    async fn task_queue_processor(
941        quorum_driver: Arc<QuorumDriver<A>>,
942        mut task_receiver: Receiver<QuorumDriverTask>,
943        metrics: Arc<QuorumDriverMetrics>,
944    ) {
945        let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE));
946        while let Some(task) = task_receiver.recv().await {
947            let task_queue_span =
948                trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue");
949            let task_span_guard = task_queue_span.enter();
950
951            // hold semaphore permit until task completes. unwrap ok because we never close
952            // the semaphore in this context.
953            let limit = limit.clone();
954            let permit = limit.acquire_owned().await.unwrap();
955
956            // TODO check reconfig process here
957
958            debug!(?task, "Dequeued task");
959            if Instant::now()
960                .checked_duration_since(task.next_retry_after)
961                .is_none()
962            {
963                // Not ready for next attempt yet, re-enqueue
964                let _ = quorum_driver.enqueue_task(task).await;
965                continue;
966            }
967            metrics.current_requests_in_flight.dec();
968            let qd = quorum_driver.clone();
969            drop(task_span_guard);
970            spawn_monitored_task!(async move {
971                let _guard = permit;
972                QuorumDriverHandler::process_task(qd, task).await
973            });
974        }
975    }
976}
977
/// Builder for [`QuorumDriverHandler`].
///
/// It collects the arguments that `start` passes to
/// `QuorumDriverHandler::new`.
pub struct QuorumDriverHandlerBuilder<A: Clone> {
    /// Authority aggregator handed to the handler.
    validators: Arc<AuthorityAggregator<A>>,
    /// Metrics handed to the handler.
    metrics: Arc<QuorumDriverMetrics>,
    /// Optional externally supplied notifier; when unset, `start` creates a
    /// fresh `NotifyRead`.
    notifier: Option<Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>>,
    /// Reconfiguration observer; it must be set before `start` is called,
    /// which panics otherwise.
    reconfig_observer: Option<Arc<dyn ReconfigObserver<A> + Sync + Send>>,
    /// Retry limit handed to the handler; `new` initialises it to
    /// `TX_MAX_RETRY_TIMES`.
    max_retry_times: u32,
}
985
986impl<A> QuorumDriverHandlerBuilder<A>
987where
988    A: AuthorityAPI + Send + Sync + 'static + Clone,
989{
990    pub fn new(validators: Arc<AuthorityAggregator<A>>, metrics: Arc<QuorumDriverMetrics>) -> Self {
991        Self {
992            validators,
993            metrics,
994            notifier: None,
995            reconfig_observer: None,
996            max_retry_times: TX_MAX_RETRY_TIMES,
997        }
998    }
999
1000    pub(crate) fn with_notifier(
1001        mut self,
1002        notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
1003    ) -> Self {
1004        self.notifier = Some(notifier);
1005        self
1006    }
1007
1008    pub fn with_reconfig_observer(
1009        mut self,
1010        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
1011    ) -> Self {
1012        self.reconfig_observer = Some(reconfig_observer);
1013        self
1014    }
1015
1016    /// Used in tests when smaller number of retries is desired
1017    pub fn with_max_retry_times(mut self, max_retry_times: u32) -> Self {
1018        self.max_retry_times = max_retry_times;
1019        self
1020    }
1021
1022    pub fn start(self) -> QuorumDriverHandler<A> {
1023        QuorumDriverHandler::new(
1024            self.validators,
1025            self.notifier.unwrap_or_else(|| {
1026                Arc::new(NotifyRead::<TransactionDigest, QuorumDriverResult>::new())
1027            }),
1028            self.reconfig_observer
1029                .expect("Reconfig observer is missing"),
1030            self.metrics,
1031            self.max_retry_times,
1032        )
1033    }
1034}