Skip to main content

iota_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5// Transaction Orchestrator is a Node component that utilizes Quorum Driver (or
6// optionally TransactionDriver) to submit transactions to validators for
7// finality, and proactively executes finalized transactions locally.
8
9use std::{
10    collections::BTreeMap, net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration,
11};
12
13use futures::{
14    FutureExt,
15    future::{Either, Future, select},
16};
17use iota_common::sync::notify_read::NotifyRead;
18use iota_config::NodeConfig;
19use iota_metrics::{
20    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
21    spawn_logged_monitored_task, spawn_monitored_task,
22};
23use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use iota_types::{
25    base_types::TransactionDigest,
26    error::{IotaError, IotaResult},
27    iota_system_state::IotaSystemState,
28    messages_checkpoint::CheckpointSequenceNumber,
29    quorum_driver_types::{
30        EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV1,
31        ExecuteTransactionResponseV1, FinalizedEffects, IsTransactionExecutedLocally,
32        QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResponse,
33        QuorumDriverResult,
34    },
35    transaction::{TransactionData, VerifiedTransaction},
36    transaction_driver_types::{
37        EffectsFinalityInfo as TdEffectsFinalityInfo, FinalizedEffects as TdFinalizedEffects,
38    },
39    transaction_executor::{SimulateTransactionResult, VmChecks},
40};
41use prometheus::{
42    Histogram, Registry,
43    core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
44    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
45    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
46    register_int_gauge_with_registry,
47};
48use tokio::{
49    sync::broadcast::{Receiver, error::RecvError},
50    task::JoinHandle,
51    time::timeout,
52};
53use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
54
55use crate::{
56    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
57    authority_aggregator::AuthorityAggregator,
58    authority_client::{AuthorityAPI, NetworkAuthorityClient},
59    quorum_driver::{
60        QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
61        reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
62    },
63    transaction_driver::{
64        AggregatedRequestErrors, QuorumTransactionResponse, SubmitTransactionOptions,
65        TransactionDriver, TransactionDriverError, TransactionDriverMetrics,
66        reconfig_observer::OnsiteReconfigObserver as TdOnsiteReconfigObserver,
67    },
68    validator_client_monitor::ValidatorClientMetrics,
69};
70
// How long to wait for local execution (including parents) before a timeout
// is returned to client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

// Upper bound on how long a submitted transaction is awaited for finality.
// In the QuorumDriver path it wraps the ticket in `tokio::time::timeout`; in
// the TransactionDriver path it is handed to `drive_transaction` (presumably
// as that call's overall deadline — confirm against `TransactionDriver`).
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
76
/// Transaction Orchestrator is a Node component that supports both QuorumDriver
/// and TransactionDriver for submitting transactions to validators for
/// finality. It adds inflight deduplication, waiting for local execution,
/// recovery, and epoch change handling.
///
/// `TransactionOrchestrator::new` populates exactly one of
/// `quorum_driver_handler` / `transaction_driver`, depending on whether the
/// protocol config enables the white flag flow.
pub struct TransactionOrchestrator<A: Clone> {
    /// QuorumDriverHandler for the normal flow. Always present if white flag
    /// flow is disabled, and None if white flag flow is enabled.
    quorum_driver_handler: Option<Arc<QuorumDriverHandler<A>>>,
    /// Optional TransactionDriver for the white flag direct-to-consensus flow.
    transaction_driver: Option<Arc<TransactionDriver<A>>>,
    /// Local authority state. Used to load the current epoch store and to
    /// read the transaction cache when waiting for local execution.
    validator_state: Arc<AuthorityState>,
    /// Handle of the background task that runs
    /// `loop_pending_transaction_log`. `Some` only when the QuorumDriver is
    /// in use; the handle is stored but never awaited here.
    _local_executor_handle: Option<JoinHandle<()>>,
    /// Pending transaction log, created under
    /// `<parent_path>/fullnode_pending_transactions`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier shared with the QuorumDriverHandler. `submit` registers one
    /// ticket per transaction digest on it.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics of the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
93
94impl TransactionOrchestrator<NetworkAuthorityClient> {
95    pub fn new_with_auth_aggregator(
96        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
97        validator_state: Arc<AuthorityState>,
98        reconfig_channel: Receiver<IotaSystemState>,
99        parent_path: &Path,
100        prometheus_registry: &Registry,
101        node_config: Option<&NodeConfig>,
102    ) -> Self {
103        // Check protocol config to determine if white flag flow is enabled
104        let epoch_store = validator_state.load_epoch_store_one_call_per_task();
105        let use_transaction_driver = epoch_store.protocol_config().enable_white_flag_flow();
106
107        // Create TransactionDriver reconfig observer only if white flag is enabled
108        let td_reconfig_observer = if use_transaction_driver {
109            Some(TdOnsiteReconfigObserver::new(
110                reconfig_channel.resubscribe(),
111                validator_state.get_object_cache_reader().clone(),
112                validator_state.clone_committee_store(),
113                validators.safe_client_metrics_base.clone(),
114            ))
115        } else {
116            None
117        };
118
119        // Create QuorumDriver reconfig observer only if white flag is NOT enabled
120        let qd_reconfig_observer = if !use_transaction_driver {
121            Some(OnsiteReconfigObserver::new(
122                reconfig_channel.resubscribe(),
123                validator_state.get_object_cache_reader().clone(),
124                validator_state.clone_committee_store(),
125                validators.safe_client_metrics_base.clone(),
126                validators.metrics.deref().clone(),
127            ))
128        } else {
129            None
130        };
131
132        TransactionOrchestrator::new(
133            validators,
134            validator_state,
135            parent_path,
136            prometheus_registry,
137            qd_reconfig_observer,
138            td_reconfig_observer,
139            node_config,
140        )
141    }
142}
143
144impl<A> TransactionOrchestrator<A>
145where
146    A: AuthorityAPI + Send + Sync + 'static + Clone,
147    OnsiteReconfigObserver: ReconfigObserver<A>,
148    TdOnsiteReconfigObserver: crate::transaction_driver::reconfig_observer::ReconfigObserver<A>,
149{
150    pub fn new(
151        validators: Arc<AuthorityAggregator<A>>,
152        validator_state: Arc<AuthorityState>,
153        parent_path: &Path,
154        prometheus_registry: &Registry,
155        reconfig_observer: Option<OnsiteReconfigObserver>,
156        td_reconfig_observer: Option<TdOnsiteReconfigObserver>,
157        node_config: Option<&NodeConfig>,
158    ) -> Self {
159        // Check protocol config to determine if white flag flow is enabled
160        let epoch_store = validator_state.load_epoch_store_one_call_per_task();
161        let use_transaction_driver = epoch_store.protocol_config().enable_white_flag_flow();
162
163        let qd_metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
164        let notifier = Arc::new(NotifyRead::new());
165
166        // Create QuorumDriver only if white flag is NOT enabled
167        let (quorum_driver_handler, effects_receiver) = if !use_transaction_driver {
168            let reconfig_observer = Arc::new(
169                reconfig_observer
170                    .expect("QuorumDriver reconfig observer required when white flag is disabled"),
171            );
172            let handler = Arc::new(
173                QuorumDriverHandlerBuilder::new(validators.clone(), qd_metrics)
174                    .with_notifier(notifier.clone())
175                    .with_reconfig_observer(reconfig_observer)
176                    .start(),
177            );
178            let receiver = handler.subscribe_to_effects();
179            (Some(handler), Some(receiver))
180        } else {
181            (None, None)
182        };
183
184        // Create TransactionDriver only if white flag is enabled
185        let transaction_driver = if use_transaction_driver {
186            let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
187            let client_metrics = Arc::new(ValidatorClientMetrics::new(prometheus_registry));
188            let observer = td_reconfig_observer
189                .expect("TransactionDriver reconfig observer required when white flag is enabled");
190            Some(TransactionDriver::new(
191                validators,
192                Arc::new(observer),
193                td_metrics,
194                node_config,
195                client_metrics,
196            ))
197        } else {
198            None
199        };
200
201        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
202        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
203            parent_path.join("fullnode_pending_transactions"),
204        ));
205
206        // Start pending transaction log cleanup only if QuorumDriver is used
207        let _local_executor_handle =
208            if let (Some(handler), Some(receiver)) = (&quorum_driver_handler, effects_receiver) {
209                let pending_tx_log_clone = pending_tx_log.clone();
210                let res = Some(spawn_monitored_task!(async move {
211                    Self::loop_pending_transaction_log(receiver, pending_tx_log_clone).await;
212                }));
213
214                // Schedule pending transaction recovery (QuorumDriver mode only;
215                // TransactionDriver does not track pending certificates)
216                Self::schedule_txes_in_log(pending_tx_log.clone(), handler.clone());
217
218                res
219            } else {
220                // TransactionDriver mode: no pending tx log cleanup needed
221                // (transactions go directly to consensus, no certificate tracking)
222                None
223            };
224
225        Self {
226            quorum_driver_handler,
227            transaction_driver,
228            validator_state,
229            _local_executor_handle,
230            pending_tx_log,
231            notifier,
232            metrics,
233        }
234    }
235}
236
237impl<A> TransactionOrchestrator<A>
238where
239    A: AuthorityAPI + Send + Sync + 'static + Clone,
240{
241    #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
242    fields(
243        tx_digest = ?request.transaction.digest(),
244        tx_type = ?request_type,
245    ),
246    err)]
247    pub async fn execute_transaction_block(
248        &self,
249        request: ExecuteTransactionRequestV1,
250        request_type: ExecuteTransactionRequestType,
251        client_addr: Option<SocketAddr>,
252    ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
253    {
254        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
255
256        // Use TransactionDriver if configured, otherwise fall back to QuorumDriver.
257        let (transaction, response) = if let Some(td) = &self.transaction_driver {
258            self.submit_with_transaction_driver(td.clone(), &epoch_store, request, client_addr)
259                .await?
260        } else {
261            let (tx, qd_resp) = self
262                .execute_transaction_impl(&epoch_store, request, client_addr)
263                .await?;
264            let resp = quorum_driver_response_to_v1(qd_resp);
265            (tx, resp)
266        };
267
268        let executed_locally = if matches!(
269            request_type,
270            ExecuteTransactionRequestType::WaitForLocalExecution
271        ) {
272            let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
273                &self.validator_state,
274                &transaction,
275                &self.metrics,
276            )
277            .await
278            .is_ok();
279            add_server_timing("local_execution");
280            executed_locally
281        } else {
282            false
283        };
284
285        Ok((response, executed_locally))
286    }
287
288    // Utilize the handle_certificate_v1 validator api to request input/output
289    // objects
290    #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
291                 fields(tx_digest = ?request.transaction.digest()))]
292    pub async fn execute_transaction_v1(
293        &self,
294        request: ExecuteTransactionRequestV1,
295        client_addr: Option<SocketAddr>,
296    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
297        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
298
299        if let Some(td) = &self.transaction_driver {
300            let (_, response) = self
301                .submit_with_transaction_driver(td.clone(), &epoch_store, request, client_addr)
302                .await?;
303            return Ok(response);
304        }
305
306        let qd_resp = self
307            .execute_transaction_impl(&epoch_store, request, client_addr)
308            .await
309            .map(|(_, r)| r)?;
310
311        Ok(quorum_driver_response_to_v1(qd_resp))
312    }
313
314    /// Submit a transaction using the TransactionDriver (white flag flow).
315    #[instrument(name = "tx_orchestrator_submit_with_td", level = "trace", skip_all,
316                 fields(tx_digest = ?request.transaction.digest()))]
317    async fn submit_with_transaction_driver(
318        &self,
319        td: Arc<TransactionDriver<A>>,
320        epoch_store: &AuthorityPerEpochStore,
321        request: ExecuteTransactionRequestV1,
322        client_addr: Option<SocketAddr>,
323    ) -> Result<(VerifiedTransaction, ExecuteTransactionResponseV1), QuorumDriverError> {
324        let transaction = epoch_store
325            .verify_transaction(request.transaction.clone())
326            .map_err(QuorumDriverError::InvalidUserSignature)?;
327        let tx_digest = *transaction.digest();
328
329        // TODO: add transaction to some struct to prevent sending the same transaction
330        // multiple times in case client sends it multiple times if self
331        //     .pending_tx_log
332        //     .write_pending_transaction_maybe(&transaction)
333        //     .await
334        //     .map_err(|e| QuorumDriverError::QuorumDriverInternal(e))?
335        // {
336        //     debug!(?tx_digest, "no pending request in flight, submitting to
337        // TransactionDriver."); } else {
338        //     debug!(?tx_digest, "transaction already in flight, skipping duplicate
339        // submission."); }
340
341        let td_response = td
342            .drive_transaction(
343                Some(request.transaction.clone()),
344                SubmitTransactionOptions {
345                    forwarded_client_addr: client_addr,
346                    ..Default::default()
347                },
348                Some(WAIT_FOR_FINALITY_TIMEOUT),
349            )
350            .await
351            .map_err(map_td_error_to_qd)?;
352
353        debug!(
354            "TransactionOrchestrator: TransactionDriver submission succeeded for transaction {}",
355            tx_digest
356        );
357
358        let QuorumTransactionResponse {
359            effects: td_effects,
360            events,
361            input_objects,
362            output_objects,
363            auxiliary_data,
364        } = td_response;
365
366        let effects = convert_td_to_qd_effects(td_effects);
367        let response = ExecuteTransactionResponseV1 {
368            effects,
369            events: if request.include_events { events } else { None },
370            input_objects: if request.include_input_objects {
371                input_objects
372            } else {
373                None
374            },
375            output_objects: if request.include_output_objects {
376                output_objects
377            } else {
378                None
379            },
380            auxiliary_data: if request.include_auxiliary_data {
381                auxiliary_data
382            } else {
383                None
384            },
385        };
386
387        Ok((transaction, response))
388    }
389
390    // TODO check if tx is already executed on this node.
391    // Note: since EffectsCert is not stored today, we need to gather that from
392    // validators (and maybe store it for caching purposes)
393    #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
394    pub async fn execute_transaction_impl(
395        &self,
396        epoch_store: &AuthorityPerEpochStore,
397        request: ExecuteTransactionRequestV1,
398        client_addr: Option<SocketAddr>,
399    ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
400        // Reject malformed transactions before any code path inspects shared
401        // inputs or `MoveAuthenticator`
402        request
403            .transaction
404            .validity_check(epoch_store.protocol_config(), epoch_store.epoch())
405            .map_err(QuorumDriverError::InvalidTransaction)?;
406        let transaction = epoch_store
407            .verify_transaction(request.transaction.clone())
408            .map_err(QuorumDriverError::InvalidUserSignature)?;
409        let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
410        let tx_digest = *transaction.digest();
411        debug!(?tx_digest, "TO Received transaction execution request.");
412
413        let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
414            (
415                self.metrics.request_latency_shared_obj.start_timer(),
416                self.metrics
417                    .wait_for_finality_latency_shared_obj
418                    .start_timer(),
419            )
420        } else {
421            (
422                self.metrics.request_latency_single_writer.start_timer(),
423                self.metrics
424                    .wait_for_finality_latency_single_writer
425                    .start_timer(),
426            )
427        };
428
429        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
430        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
431        wait_for_finality_gauge.inc();
432        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
433            in_flight.dec();
434        });
435
436        let ticket = self
437            .submit(transaction.clone(), request, client_addr)
438            .await
439            .map_err(|e| {
440                warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
441                QuorumDriverError::QuorumDriverInternal(e)
442            })?;
443
444        let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
445            debug!(?tx_digest, "Timeout waiting for transaction finality.");
446            self.metrics.wait_for_finality_timeout.inc();
447            return Err(QuorumDriverError::TimeoutBeforeFinality);
448        };
449        add_server_timing("wait_for_finality");
450
451        drop(_txn_finality_timer);
452        drop(_wait_for_finality_gauge);
453        self.metrics.wait_for_finality_finished.inc();
454
455        match result {
456            Err(err) => {
457                warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
458                Err(QuorumDriverError::QuorumDriverInternal(err))
459            }
460            Ok(Err(err)) => Err(err),
461            Ok(Ok(response)) => {
462                good_response_metrics.inc();
463                Ok((transaction, response))
464            }
465        }
466    }
467
    /// Submits the transaction to Quorum Driver for execution.
    /// Returns an awaitable Future.
    ///
    /// A notifier ticket for the transaction digest is registered first. The
    /// transaction is then written to the pending transaction log, and the
    /// request is handed to the quorum driver only when
    /// `write_pending_transaction_maybe` returns `true` (per the log message
    /// below: no request for it is in flight yet).
    ///
    /// The returned future resolves with whichever comes first: the
    /// notifier ticket, or the locally executed effects becoming readable.
    ///
    /// # Errors
    /// Returns the error of the pending-log write or of the quorum driver
    /// submission. The returned future fails when the re-submission in the
    /// effects-first branch fails.
    ///
    /// # Panics
    /// Panics if the `QuorumDriverHandler` is not initialized (see
    /// `quorum_driver` / `clone_quorum_driver`).
    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
    async fn submit(
        &self,
        transaction: VerifiedTransaction,
        request: ExecuteTransactionRequestV1,
        client_addr: Option<SocketAddr>,
    ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
        let tx_digest = *transaction.digest();
        // Register before submitting so the notification cannot be missed.
        let ticket = self.notifier.register_one(&tx_digest);
        // TODO(william) need to also write client adr to pending tx log below
        // so that we can re-execute with this client addr if we restart
        if self
            .pending_tx_log
            .write_pending_transaction_maybe(&transaction)
            .await?
        {
            debug!(?tx_digest, "no pending request in flight, submitting.");
            // `request` is cloned because the future below may submit it again.
            self.quorum_driver()
                .submit_transaction_no_ticket(request.clone(), client_addr)
                .await?;
        }
        // It's possible that the transaction effects is already stored in DB at this
        // point. So we also subscribe to that. If we hear from `effects_await`
        // first, it means the ticket misses the previous notification, and we
        // want to ask quorum driver to form a certificate for us again, to
        // serve this request.
        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
        let qd = self.clone_quorum_driver();
        Ok(async move {
            let digests = [tx_digest];
            let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
            // let-and-return necessary to satisfy borrow checker.
            let res = match select(ticket, effects_await.boxed()).await {
                // The ticket resolved first: pass the quorum driver result on.
                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
                // Effects were readable first: re-submit, then keep waiting
                // on the still-pending ticket.
                Either::Right((_, unfinished_quorum_driver_task)) => {
                    debug!(
                        ?tx_digest,
                        "Effects are available in DB, use quorum driver to get a certificate"
                    );
                    qd.submit_transaction_no_ticket(request, client_addr)
                        .await?;
                    Ok(unfinished_quorum_driver_task.await)
                }
            };
            res
        })
    }
517
518    #[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
519    async fn wait_for_finalized_tx_executed_locally_with_timeout(
520        validator_state: &Arc<AuthorityState>,
521        transaction: &VerifiedTransaction,
522        metrics: &TransactionOrchestratorMetrics,
523    ) -> IotaResult {
524        let tx_digest = *transaction.digest();
525        metrics.local_execution_in_flight.inc();
526        let _metrics_guard =
527            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
528                in_flight.dec();
529            });
530
531        let _guard = if transaction.contains_shared_object() {
532            metrics.local_execution_latency_shared_obj.start_timer()
533        } else {
534            metrics.local_execution_latency_single_writer.start_timer()
535        };
536        debug!(
537            ?tx_digest,
538            "Waiting for finalized tx to be executed locally."
539        );
540        match timeout(
541            LOCAL_EXECUTION_TIMEOUT,
542            validator_state
543                .get_transaction_cache_reader()
544                .try_notify_read_executed_effects_digests(&[tx_digest]),
545        )
546        .instrument(trace_span!("local_execution"))
547        .await
548        {
549            Err(_elapsed) => {
550                debug!(
551                    ?tx_digest,
552                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
553                    LOCAL_EXECUTION_TIMEOUT
554                );
555                metrics.local_execution_timeout.inc();
556                Err(IotaError::Timeout)
557            }
558            Ok(Err(err)) => {
559                debug!(
560                    ?tx_digest,
561                    "Waiting for finalized tx to be executed locally failed with error: {:?}", err
562                );
563                metrics.local_execution_failure.inc();
564                Err(IotaError::TransactionOrchestratorLocalExecution {
565                    error: err.to_string(),
566                })
567            }
568            Ok(Ok(_)) => {
569                metrics.local_execution_success.inc();
570                Ok(())
571            }
572        }
573    }
574
575    // TODO: Potentially cleanup this function and pending transaction log.
576    async fn loop_pending_transaction_log(
577        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
578        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
579    ) {
580        loop {
581            match effects_receiver.recv().await {
582                Ok(Ok((transaction, ..))) => {
583                    let tx_digest = transaction.digest();
584                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
585                        error!(
586                            ?tx_digest,
587                            "Failed to finish transaction in pending transaction log: {err}"
588                        );
589                    }
590                }
591                Ok(Err((tx_digest, _err))) => {
592                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
593                        error!(
594                            ?tx_digest,
595                            "Failed to finish transaction in pending transaction log: {err}"
596                        );
597                    }
598                }
599                Err(RecvError::Closed) => {
600                    error!("Sender of effects subscriber queue has been dropped!");
601                    return;
602                }
603                Err(RecvError::Lagged(skipped_count)) => {
604                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
605                }
606            }
607        }
608    }
609
610    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
611        self.quorum_driver_handler
612            .as_ref()
613            .expect("QuorumDriverHandler is not initialized.")
614    }
615
616    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
617        self.quorum_driver_handler
618            .clone()
619            .expect("QuorumDriverHandler is not initialized.")
620    }
621
    /// Returns the TransactionDriver, or `None` when the orchestrator was
    /// built without one (white flag flow disabled).
    pub fn transaction_driver(&self) -> Option<&Arc<TransactionDriver<A>>> {
        self.transaction_driver.as_ref()
    }
625
626    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
627        self.quorum_driver().authority_aggregator().load_full()
628    }
629
630    pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
631        if let Some(handler) = &self.quorum_driver_handler {
632            handler.subscribe_to_effects()
633        } else {
634            panic!("QuorumDriverHandler is not initialized, cannot subscribe to effects queue.");
635        }
636    }
637
638    fn update_metrics(
639        &'_ self,
640        transaction: &VerifiedTransaction,
641    ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
642        let (in_flight, good_response) = if transaction.contains_shared_object() {
643            self.metrics.total_req_received_shared_object.inc();
644            (
645                self.metrics.req_in_flight_shared_object.clone(),
646                &self.metrics.good_response_shared_object,
647            )
648        } else {
649            self.metrics.total_req_received_single_writer.inc();
650            (
651                self.metrics.req_in_flight_single_writer.clone(),
652                &self.metrics.good_response_single_writer,
653            )
654        };
655        in_flight.inc();
656        (
657            scopeguard::guard(in_flight, |in_flight| {
658                in_flight.dec();
659            }),
660            good_response,
661        )
662    }
663
664    fn schedule_txes_in_log(
665        pending_tx_log: Arc<WritePathPendingTransactionLog>,
666        quorum_driver: Arc<QuorumDriverHandler<A>>,
667    ) {
668        spawn_logged_monitored_task!(async move {
669            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
670                info!("Skipping loading pending transactions from pending_tx_log.");
671                return;
672            }
673            let pending_txes = pending_tx_log
674                .load_all_pending_transactions()
675                .expect("failed to load all pending transactions");
676            info!(
677                "Recovering {} pending transactions from pending_tx_log.",
678                pending_txes.len()
679            );
680            for (i, tx) in pending_txes.into_iter().enumerate() {
681                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
682                // requires a migration.
683                let tx = tx.into_inner();
684                let tx_digest = *tx.digest();
685                // It's not impossible we fail to enqueue a task but that's not the end of
686                // world. TODO(william) correctly extract client_addr from logs
687                if let Err(err) = quorum_driver
688                    .submit_transaction_no_ticket(
689                        ExecuteTransactionRequestV1 {
690                            transaction: tx,
691                            include_events: true,
692                            include_input_objects: false,
693                            include_output_objects: false,
694                            include_auxiliary_data: false,
695                        },
696                        None,
697                    )
698                    .await
699                {
700                    warn!(
701                        ?tx_digest,
702                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
703                    );
704                } else {
705                    debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
706                    if (i + 1) % 1000 == 0 {
707                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
708                    }
709                }
710            }
711            // Transactions will be cleaned up in
712            // loop_execute_finalized_tx_locally() after they
713            // produce effects.
714        });
715    }
716
    /// Returns all transactions currently stored in the pending transaction
    /// log, forwarding any error reported by the log.
    pub fn load_all_pending_transactions(&self) -> IotaResult<Vec<VerifiedTransaction>> {
        self.pending_tx_log.load_all_pending_transactions()
    }
720}
721
722/// Convert a `QuorumDriverResponse` (contains
723/// `VerifiedCertifiedTransactionEffects`) to the V1 response format that uses
724/// `FinalizedEffects`.
725fn quorum_driver_response_to_v1(response: QuorumDriverResponse) -> ExecuteTransactionResponseV1 {
726    let QuorumDriverResponse {
727        effects_cert,
728        events,
729        input_objects,
730        output_objects,
731        auxiliary_data,
732    } = response;
733    ExecuteTransactionResponseV1 {
734        effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
735        events,
736        input_objects,
737        output_objects,
738        auxiliary_data,
739    }
740}
741
742/// Convert a `transaction_driver_types::FinalizedEffects` into a
743/// `quorum_driver_types::FinalizedEffects`.
744fn convert_td_to_qd_effects(td: TdFinalizedEffects) -> FinalizedEffects {
745    let finality_info = match td.finality_info {
746        TdEffectsFinalityInfo::Certified(sig) => EffectsFinalityInfo::Certified(sig),
747        TdEffectsFinalityInfo::Checkpointed(epoch, seq) => {
748            EffectsFinalityInfo::Checkpointed(epoch, seq)
749        }
750        TdEffectsFinalityInfo::QuorumExecuted(epoch) => EffectsFinalityInfo::QuorumExecuted(epoch),
751    };
752    FinalizedEffects {
753        effects: td.effects,
754        finality_info,
755    }
756}
757
758/// Map a `TransactionDriverError` to a `QuorumDriverError` for client
759/// reporting. The variant choice signals retriability: clients retry on
760/// `QuorumDriverInternal`, `FailedWithTransientErrorAfterMaximumAttempts`,
761/// and `TimeoutBeforeFinality`, but treat `InvalidTransaction` /
762/// `InvalidUserSignature` as terminal. Submission-time rejections that
763/// cannot succeed on resubmission must therefore not be reported as
764/// internal.
765fn map_td_error_to_qd(e: TransactionDriverError) -> QuorumDriverError {
766    use TransactionDriverError::*;
767    match e {
768        ValidationFailed { error } => {
769            QuorumDriverError::InvalidUserSignature(IotaError::InvalidSignature { error })
770        }
771        TimeoutWithLastRetriableError { .. } => QuorumDriverError::TimeoutBeforeFinality,
772        RejectedByValidators {
773            submission_non_retriable_errors,
774            ..
775        } => {
776            // f+1 stake of validators returned non-retriable errors during
777            // submission (bad signature, malformed tx, lock conflict, ...).
778            // f+1 means at least one honest validator considered this tx
779            // invalid, so resubmitting the same bytes cannot succeed.
780            let representative = submission_non_retriable_errors
781                .errors
782                .into_iter()
783                .next()
784                .map(|(msg, _, _, _)| msg)
785                .unwrap_or_else(|| "transaction rejected as invalid during submission".to_string());
786            QuorumDriverError::InvalidTransaction(IotaError::Unknown(format!(
787                "Transaction was rejected as invalid by more than 1/3 of validator stake \
788                 during submission (non-retriable): {representative}"
789            )))
790        }
791        Aborted {
792            submission_retriable_errors,
793            submission_non_retriable_errors,
794            ..
795        } => {
796            // Driver exhausted the validator list without reaching the f+1
797            // non-retriable threshold — most failures were transient
798            // (validator down, network, overload). Surface as retriable so
799            // the client can resubmit.
800            let attempts = count_validator_attempts(&submission_retriable_errors)
801                + count_validator_attempts(&submission_non_retriable_errors);
802            QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
803                total_attempts: attempts,
804            }
805        }
806        other @ ForkedExecution { .. } => {
807            // Validators disagree on effects digests — a protocol-level
808            // invariant violation, never a client retry case. Log loud so
809            // on-call sees it; surface as internal.
810            let msg = other.to_string();
811            error!("TransactionDriver observed forked execution: {msg}");
812            QuorumDriverError::QuorumDriverInternal(IotaError::Unknown(msg))
813        }
814        other @ ClientInternal { .. } => {
815            let msg = other.to_string();
816            warn!("TransactionDriver client-internal error: {msg}");
817            QuorumDriverError::QuorumDriverInternal(IotaError::Unknown(msg))
818        }
819    }
820}
821
822fn count_validator_attempts(errors: &AggregatedRequestErrors) -> u32 {
823    errors
824        .errors
825        .iter()
826        .map(|(_, authorities, _, _)| authorities.len() as u32)
827        .sum()
828}
829
830/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
831#[derive(Clone)]
832pub struct TransactionOrchestratorMetrics {
833    total_req_received_single_writer: GenericCounter<AtomicU64>,
834    total_req_received_shared_object: GenericCounter<AtomicU64>,
835
836    good_response_single_writer: GenericCounter<AtomicU64>,
837    good_response_shared_object: GenericCounter<AtomicU64>,
838
839    req_in_flight_single_writer: GenericGauge<AtomicI64>,
840    req_in_flight_shared_object: GenericGauge<AtomicI64>,
841
842    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
843    wait_for_finality_finished: GenericCounter<AtomicU64>,
844    wait_for_finality_timeout: GenericCounter<AtomicU64>,
845
846    local_execution_in_flight: GenericGauge<AtomicI64>,
847    local_execution_success: GenericCounter<AtomicU64>,
848    local_execution_timeout: GenericCounter<AtomicU64>,
849    local_execution_failure: GenericCounter<AtomicU64>,
850
851    request_latency_single_writer: Histogram,
852    request_latency_shared_obj: Histogram,
853    wait_for_finality_latency_single_writer: Histogram,
854    wait_for_finality_latency_shared_obj: Histogram,
855    local_execution_latency_single_writer: Histogram,
856    local_execution_latency_shared_obj: Histogram,
857}
858
859// Note that labeled-metrics are stored upfront individually
860// to mitigate the perf hit by MetricsVec.
861// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
862impl TransactionOrchestratorMetrics {
863    pub fn new(registry: &Registry) -> Self {
864        let total_req_received = register_int_counter_vec_with_registry!(
865            "tx_orchestrator_total_req_received",
866            "Total number of executions request Transaction Orchestrator receives, group by tx type",
867            &["tx_type"],
868            registry
869        )
870        .unwrap();
871
872        let total_req_received_single_writer =
873            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
874        let total_req_received_shared_object =
875            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
876
877        let good_response = register_int_counter_vec_with_registry!(
878            "tx_orchestrator_good_response",
879            "Total number of good responses Transaction Orchestrator generates, group by tx type",
880            &["tx_type"],
881            registry
882        )
883        .unwrap();
884
885        let good_response_single_writer =
886            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
887        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
888
889        let req_in_flight = register_int_gauge_vec_with_registry!(
890            "tx_orchestrator_req_in_flight",
891            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
892            &["tx_type"],
893            registry
894        )
895        .unwrap();
896
897        let req_in_flight_single_writer =
898            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
899        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
900
901        let request_latency = register_histogram_vec_with_registry!(
902            "tx_orchestrator_request_latency",
903            "Time spent in processing one Transaction Orchestrator request",
904            &["tx_type"],
905            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
906            registry,
907        )
908        .unwrap();
909        let wait_for_finality_latency = register_histogram_vec_with_registry!(
910            "tx_orchestrator_wait_for_finality_latency",
911            "Time spent in waiting for one Transaction Orchestrator request gets finalized",
912            &["tx_type"],
913            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
914            registry,
915        )
916        .unwrap();
917        let local_execution_latency = register_histogram_vec_with_registry!(
918            "tx_orchestrator_local_execution_latency",
919            "Time spent in waiting for one Transaction Orchestrator gets locally executed",
920            &["tx_type"],
921            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
922            registry,
923        )
924        .unwrap();
925
926        Self {
927            total_req_received_single_writer,
928            total_req_received_shared_object,
929            good_response_single_writer,
930            good_response_shared_object,
931            req_in_flight_single_writer,
932            req_in_flight_shared_object,
933            wait_for_finality_in_flight: register_int_gauge_with_registry!(
934                "tx_orchestrator_wait_for_finality_in_flight",
935                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
936                registry,
937            )
938            .unwrap(),
939            wait_for_finality_finished: register_int_counter_with_registry!(
940                "tx_orchestrator_wait_for_finality_finished",
941                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
942                registry,
943            )
944            .unwrap(),
945            wait_for_finality_timeout: register_int_counter_with_registry!(
946                "tx_orchestrator_wait_for_finality_timeout",
947                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
948                registry,
949            )
950            .unwrap(),
951            local_execution_in_flight: register_int_gauge_with_registry!(
952                "tx_orchestrator_local_execution_in_flight",
953                "Number of local execution txns in flights Transaction Orchestrator handles",
954                registry,
955            )
956            .unwrap(),
957            local_execution_success: register_int_counter_with_registry!(
958                "tx_orchestrator_local_execution_success",
959                "Total number of successful local execution txns Transaction Orchestrator handles",
960                registry,
961            )
962            .unwrap(),
963            local_execution_timeout: register_int_counter_with_registry!(
964                "tx_orchestrator_local_execution_timeout",
965                "Total number of timed-out local execution txns Transaction Orchestrator handles",
966                registry,
967            )
968            .unwrap(),
969            local_execution_failure: register_int_counter_with_registry!(
970                "tx_orchestrator_local_execution_failure",
971                "Total number of failed local execution txns Transaction Orchestrator handles",
972                registry,
973            )
974            .unwrap(),
975            request_latency_single_writer: request_latency
976                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
977            request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
978            wait_for_finality_latency_single_writer: wait_for_finality_latency
979                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
980            wait_for_finality_latency_shared_obj: wait_for_finality_latency
981                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
982            local_execution_latency_single_writer: local_execution_latency
983                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
984            local_execution_latency_shared_obj: local_execution_latency
985                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
986        }
987    }
988
989    pub fn new_for_tests() -> Self {
990        let registry = Registry::new();
991        Self::new(&registry)
992    }
993}
994
995#[async_trait::async_trait]
996impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
997where
998    A: AuthorityAPI + Send + Sync + 'static + Clone,
999{
1000    async fn execute_transaction(
1001        &self,
1002        request: ExecuteTransactionRequestV1,
1003        client_addr: Option<std::net::SocketAddr>,
1004    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
1005        self.execute_transaction_v1(request, client_addr).await
1006    }
1007
1008    fn simulate_transaction(
1009        &self,
1010        transaction: TransactionData,
1011        checks: VmChecks,
1012    ) -> Result<SimulateTransactionResult, IotaError> {
1013        self.validator_state
1014            .simulate_transaction(transaction, checks)
1015    }
1016
1017    /// Wait for the given transactions to be included in a checkpoint.
1018    ///
1019    /// Returns a mapping from transaction digest to
1020    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
1021    /// On timeout, returns partial results for any transactions that were
1022    /// already checkpointed.
1023    async fn wait_for_checkpoint_inclusion(
1024        &self,
1025        digests: &[TransactionDigest],
1026        timeout: Duration,
1027    ) -> Result<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>, IotaError> {
1028        self.validator_state
1029            .wait_for_checkpoint_inclusion(digests, timeout)
1030            .await
1031    }
1032}