iota_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
6// submit transactions to validators for finality, and proactively executes
7// finalized transactions locally, when possible.
8
9use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12    FutureExt,
13    future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18    spawn_logged_monitored_task, spawn_monitored_task,
19};
20use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
21use iota_types::{
22    base_types::TransactionDigest,
23    error::{IotaError, IotaResult},
24    iota_system_state::IotaSystemState,
25    quorum_driver_types::{
26        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
27        FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
28        QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
29    },
30    transaction::{TransactionData, VerifiedTransaction},
31    transaction_executor::{SimulateTransactionResult, VmChecks},
32};
33use prometheus::{
34    Histogram, Registry,
35    core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
36    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
37    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
38    register_int_gauge_with_registry,
39};
40use tokio::{
41    sync::broadcast::{Receiver, error::RecvError},
42    task::JoinHandle,
43    time::timeout,
44};
45use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
46
47use crate::{
48    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
49    authority_aggregator::AuthorityAggregator,
50    authority_client::{AuthorityAPI, NetworkAuthorityClient},
51    quorum_driver::{
52        QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
53        reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
54    },
55};
56
/// How long to wait for local execution (including parents) before a timeout
/// is returned to client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// How long to wait on the quorum driver ticket of a submitted transaction
/// before giving up with `QuorumDriverError::TimeoutBeforeFinality`.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
62
/// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
/// submit transactions to validators for finality, and proactively executes
/// finalized transactions locally, when possible.
pub struct TransactionOrchestrator<A: Clone> {
    /// Handler used to enqueue transactions with the quorum driver and to
    /// subscribe to its effects queue.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state; supplies the per-epoch store, the transaction
    /// cache reader and transaction simulation.
    validator_state: Arc<AuthorityState>,
    /// Handle of the background task running `loop_pending_transaction_log`.
    /// Nothing in this file awaits it; it is only kept alongside the
    /// orchestrator.
    _local_executor_handle: JoinHandle<()>,
    /// Log of submitted transactions, rooted at
    /// `<parent_path>/fullnode_pending_transactions`. Entries are written in
    /// `submit` and finished by `loop_pending_transaction_log`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier handed to the quorum driver at construction; `submit`
    /// registers one ticket per transaction digest on it.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics of the orchestrator itself.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
74
75impl TransactionOrchestrator<NetworkAuthorityClient> {
76    pub fn new_with_auth_aggregator(
77        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
78        validator_state: Arc<AuthorityState>,
79        reconfig_channel: Receiver<IotaSystemState>,
80        parent_path: &Path,
81        prometheus_registry: &Registry,
82    ) -> Self {
83        let observer = OnsiteReconfigObserver::new(
84            reconfig_channel,
85            validator_state.get_object_cache_reader().clone(),
86            validator_state.clone_committee_store(),
87            validators.safe_client_metrics_base.clone(),
88            validators.metrics.deref().clone(),
89        );
90
91        TransactionOrchestrator::new(
92            validators,
93            validator_state,
94            parent_path,
95            prometheus_registry,
96            observer,
97        )
98    }
99}
100
101impl<A> TransactionOrchestrator<A>
102where
103    A: AuthorityAPI + Send + Sync + 'static + Clone,
104    OnsiteReconfigObserver: ReconfigObserver<A>,
105{
106    pub fn new(
107        validators: Arc<AuthorityAggregator<A>>,
108        validator_state: Arc<AuthorityState>,
109        parent_path: &Path,
110        prometheus_registry: &Registry,
111        reconfig_observer: OnsiteReconfigObserver,
112    ) -> Self {
113        let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
114        let notifier = Arc::new(NotifyRead::new());
115        let reconfig_observer = Arc::new(reconfig_observer);
116        let quorum_driver_handler = Arc::new(
117            QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
118                .with_notifier(notifier.clone())
119                .with_reconfig_observer(reconfig_observer.clone())
120                .start(),
121        );
122
123        let effects_receiver = quorum_driver_handler.subscribe_to_effects();
124        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
125        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
126            parent_path.join("fullnode_pending_transactions"),
127        ));
128        let pending_tx_log_clone = pending_tx_log.clone();
129        let _local_executor_handle = {
130            spawn_monitored_task!(async move {
131                Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
132            })
133        };
134        Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
135        Self {
136            quorum_driver_handler,
137            validator_state,
138            _local_executor_handle,
139            pending_tx_log,
140            notifier,
141            metrics,
142        }
143    }
144}
145
146impl<A> TransactionOrchestrator<A>
147where
148    A: AuthorityAPI + Send + Sync + 'static + Clone,
149{
150    #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
151    fields(
152        tx_digest = ?request.transaction.digest(),
153        tx_type = ?request_type,
154    ),
155    err)]
156    pub async fn execute_transaction_block(
157        &self,
158        request: ExecuteTransactionRequestV1,
159        request_type: ExecuteTransactionRequestType,
160        client_addr: Option<SocketAddr>,
161    ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
162    {
163        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
164
165        let (transaction, response) = self
166            .execute_transaction_impl(&epoch_store, request, client_addr)
167            .await?;
168
169        let executed_locally = if matches!(
170            request_type,
171            ExecuteTransactionRequestType::WaitForLocalExecution
172        ) {
173            let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
174                &self.validator_state,
175                &transaction,
176                &self.metrics,
177            )
178            .await
179            .is_ok();
180            add_server_timing("local_execution");
181            executed_locally
182        } else {
183            false
184        };
185
186        let QuorumDriverResponse {
187            effects_cert,
188            events,
189            input_objects,
190            output_objects,
191            auxiliary_data,
192        } = response;
193
194        let response = ExecuteTransactionResponseV1 {
195            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
196            events,
197            input_objects,
198            output_objects,
199            auxiliary_data,
200        };
201
202        Ok((response, executed_locally))
203    }
204
205    // Utilize the handle_certificate_v1 validator api to request input/output
206    // objects
207    #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
208                 fields(tx_digest = ?request.transaction.digest()))]
209    pub async fn execute_transaction_v1(
210        &self,
211        request: ExecuteTransactionRequestV1,
212        client_addr: Option<SocketAddr>,
213    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
214        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
215
216        let QuorumDriverResponse {
217            effects_cert,
218            events,
219            input_objects,
220            output_objects,
221            auxiliary_data,
222        } = self
223            .execute_transaction_impl(&epoch_store, request, client_addr)
224            .await
225            .map(|(_, r)| r)?;
226
227        Ok(ExecuteTransactionResponseV1 {
228            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
229            events,
230            input_objects,
231            output_objects,
232            auxiliary_data,
233        })
234    }
235
236    // TODO check if tx is already executed on this node.
237    // Note: since EffectsCert is not stored today, we need to gather that from
238    // validators (and maybe store it for caching purposes)
239    #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
240    pub async fn execute_transaction_impl(
241        &self,
242        epoch_store: &AuthorityPerEpochStore,
243        request: ExecuteTransactionRequestV1,
244        client_addr: Option<SocketAddr>,
245    ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
246        let transaction = epoch_store
247            .verify_transaction(request.transaction.clone())
248            .map_err(QuorumDriverError::InvalidUserSignature)?;
249        let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
250        let tx_digest = *transaction.digest();
251        debug!(?tx_digest, "TO Received transaction execution request.");
252
253        let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
254            (
255                self.metrics.request_latency_shared_obj.start_timer(),
256                self.metrics
257                    .wait_for_finality_latency_shared_obj
258                    .start_timer(),
259            )
260        } else {
261            (
262                self.metrics.request_latency_single_writer.start_timer(),
263                self.metrics
264                    .wait_for_finality_latency_single_writer
265                    .start_timer(),
266            )
267        };
268
269        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
270        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
271        wait_for_finality_gauge.inc();
272        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
273            in_flight.dec();
274        });
275
276        let ticket = self
277            .submit(transaction.clone(), request, client_addr)
278            .await
279            .map_err(|e| {
280                warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
281                QuorumDriverError::QuorumDriverInternal(e)
282            })?;
283
284        let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
285            debug!(?tx_digest, "Timeout waiting for transaction finality.");
286            self.metrics.wait_for_finality_timeout.inc();
287            return Err(QuorumDriverError::TimeoutBeforeFinality);
288        };
289        add_server_timing("wait_for_finality");
290
291        drop(_txn_finality_timer);
292        drop(_wait_for_finality_gauge);
293        self.metrics.wait_for_finality_finished.inc();
294
295        match result {
296            Err(err) => {
297                warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
298                Err(QuorumDriverError::QuorumDriverInternal(err))
299            }
300            Ok(Err(err)) => Err(err),
301            Ok(Ok(response)) => {
302                good_response_metrics.inc();
303                Ok((transaction, response))
304            }
305        }
306    }
307
308    /// Submits the transaction to Quorum Driver for execution.
309    /// Returns an awaitable Future.
310    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
311    async fn submit(
312        &self,
313        transaction: VerifiedTransaction,
314        request: ExecuteTransactionRequestV1,
315        client_addr: Option<SocketAddr>,
316    ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
317        let tx_digest = *transaction.digest();
318        let ticket = self.notifier.register_one(&tx_digest);
319        // TODO(william) need to also write client adr to pending tx log below
320        // so that we can re-execute with this client addr if we restart
321        if self
322            .pending_tx_log
323            .write_pending_transaction_maybe(&transaction)
324            .await?
325        {
326            debug!(?tx_digest, "no pending request in flight, submitting.");
327            self.quorum_driver()
328                .submit_transaction_no_ticket(request.clone(), client_addr)
329                .await?;
330        }
331        // It's possible that the transaction effects is already stored in DB at this
332        // point. So we also subscribe to that. If we hear from `effects_await`
333        // first, it means the ticket misses the previous notification, and we
334        // want to ask quorum driver to form a certificate for us again, to
335        // serve this request.
336        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
337        let qd = self.clone_quorum_driver();
338        Ok(async move {
339            let digests = [tx_digest];
340            let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
341            // let-and-return necessary to satisfy borrow checker.
342            let res = match select(ticket, effects_await.boxed()).await {
343                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
344                Either::Right((_, unfinished_quorum_driver_task)) => {
345                    debug!(
346                        ?tx_digest,
347                        "Effects are available in DB, use quorum driver to get a certificate"
348                    );
349                    qd.submit_transaction_no_ticket(request, client_addr)
350                        .await?;
351                    Ok(unfinished_quorum_driver_task.await)
352                }
353            };
354            res
355        })
356    }
357
358    #[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
359    async fn wait_for_finalized_tx_executed_locally_with_timeout(
360        validator_state: &Arc<AuthorityState>,
361        transaction: &VerifiedTransaction,
362        metrics: &TransactionOrchestratorMetrics,
363    ) -> IotaResult {
364        let tx_digest = *transaction.digest();
365        metrics.local_execution_in_flight.inc();
366        let _metrics_guard =
367            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
368                in_flight.dec();
369            });
370
371        let _guard = if transaction.contains_shared_object() {
372            metrics.local_execution_latency_shared_obj.start_timer()
373        } else {
374            metrics.local_execution_latency_single_writer.start_timer()
375        };
376        debug!(
377            ?tx_digest,
378            "Waiting for finalized tx to be executed locally."
379        );
380        match timeout(
381            LOCAL_EXECUTION_TIMEOUT,
382            validator_state
383                .get_transaction_cache_reader()
384                .try_notify_read_executed_effects_digests(&[tx_digest]),
385        )
386        .instrument(trace_span!("local_execution"))
387        .await
388        {
389            Err(_elapsed) => {
390                debug!(
391                    ?tx_digest,
392                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
393                    LOCAL_EXECUTION_TIMEOUT
394                );
395                metrics.local_execution_timeout.inc();
396                Err(IotaError::Timeout)
397            }
398            Ok(Err(err)) => {
399                debug!(
400                    ?tx_digest,
401                    "Waiting for finalized tx to be executed locally failed with error: {:?}", err
402                );
403                metrics.local_execution_failure.inc();
404                Err(IotaError::TransactionOrchestratorLocalExecution {
405                    error: err.to_string(),
406                })
407            }
408            Ok(Ok(_)) => {
409                metrics.local_execution_success.inc();
410                Ok(())
411            }
412        }
413    }
414
415    // TODO: Potentially cleanup this function and pending transaction log.
416    async fn loop_pending_transaction_log(
417        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
418        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
419    ) {
420        loop {
421            match effects_receiver.recv().await {
422                Ok(Ok((transaction, ..))) => {
423                    let tx_digest = transaction.digest();
424                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
425                        error!(
426                            ?tx_digest,
427                            "Failed to finish transaction in pending transaction log: {err}"
428                        );
429                    }
430                }
431                Ok(Err((tx_digest, _err))) => {
432                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
433                        error!(
434                            ?tx_digest,
435                            "Failed to finish transaction in pending transaction log: {err}"
436                        );
437                    }
438                }
439                Err(RecvError::Closed) => {
440                    error!("Sender of effects subscriber queue has been dropped!");
441                    return;
442                }
443                Err(RecvError::Lagged(skipped_count)) => {
444                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
445                }
446            }
447        }
448    }
449
    /// Borrows the shared quorum driver handler without bumping its
    /// reference count; see `clone_quorum_driver` for an owned handle.
    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
        &self.quorum_driver_handler
    }
453
454    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
455        self.quorum_driver_handler.clone()
456    }
457
458    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
459        self.quorum_driver().authority_aggregator().load_full()
460    }
461
462    pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
463        self.quorum_driver_handler.subscribe_to_effects()
464    }
465
466    fn update_metrics(
467        &'_ self,
468        transaction: &VerifiedTransaction,
469    ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
470        let (in_flight, good_response) = if transaction.contains_shared_object() {
471            self.metrics.total_req_received_shared_object.inc();
472            (
473                self.metrics.req_in_flight_shared_object.clone(),
474                &self.metrics.good_response_shared_object,
475            )
476        } else {
477            self.metrics.total_req_received_single_writer.inc();
478            (
479                self.metrics.req_in_flight_single_writer.clone(),
480                &self.metrics.good_response_single_writer,
481            )
482        };
483        in_flight.inc();
484        (
485            scopeguard::guard(in_flight, |in_flight| {
486                in_flight.dec();
487            }),
488            good_response,
489        )
490    }
491
492    fn schedule_txes_in_log(
493        pending_tx_log: Arc<WritePathPendingTransactionLog>,
494        quorum_driver: Arc<QuorumDriverHandler<A>>,
495    ) {
496        spawn_logged_monitored_task!(async move {
497            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
498                info!("Skipping loading pending transactions from pending_tx_log.");
499                return;
500            }
501            let pending_txes = pending_tx_log.load_all_pending_transactions();
502            info!(
503                "Recovering {} pending transactions from pending_tx_log.",
504                pending_txes.len()
505            );
506            for (i, tx) in pending_txes.into_iter().enumerate() {
507                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
508                // requires a migration.
509                let tx = tx.into_inner();
510                let tx_digest = *tx.digest();
511                // It's not impossible we fail to enqueue a task but that's not the end of
512                // world. TODO(william) correctly extract client_addr from logs
513                if let Err(err) = quorum_driver
514                    .submit_transaction_no_ticket(
515                        ExecuteTransactionRequestV1 {
516                            transaction: tx,
517                            include_events: true,
518                            include_input_objects: false,
519                            include_output_objects: false,
520                            include_auxiliary_data: false,
521                        },
522                        None,
523                    )
524                    .await
525                {
526                    warn!(
527                        ?tx_digest,
528                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
529                    );
530                } else {
531                    debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
532                    if (i + 1) % 1000 == 0 {
533                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
534                    }
535                }
536            }
537            // Transactions will be cleaned up in
538            // loop_execute_finalized_tx_locally() after they
539            // produce effects.
540        });
541    }
542
    /// Returns every transaction currently held in the pending transaction
    /// log.
    pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
        self.pending_tx_log.load_all_pending_transactions()
    }
546}
547
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Execution requests received, per transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Requests answered successfully, per transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, per transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Finality wait: currently waiting / answered before timeout / timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local execution wait: currently waiting and the three possible
    // outcomes.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms, per transaction type: whole request, finality
    // wait, local execution wait.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
576
577// Note that labeled-metrics are stored upfront individually
578// to mitigate the perf hit by MetricsVec.
579// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
580impl TransactionOrchestratorMetrics {
581    pub fn new(registry: &Registry) -> Self {
582        let total_req_received = register_int_counter_vec_with_registry!(
583            "tx_orchestrator_total_req_received",
584            "Total number of executions request Transaction Orchestrator receives, group by tx type",
585            &["tx_type"],
586            registry
587        )
588        .unwrap();
589
590        let total_req_received_single_writer =
591            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
592        let total_req_received_shared_object =
593            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
594
595        let good_response = register_int_counter_vec_with_registry!(
596            "tx_orchestrator_good_response",
597            "Total number of good responses Transaction Orchestrator generates, group by tx type",
598            &["tx_type"],
599            registry
600        )
601        .unwrap();
602
603        let good_response_single_writer =
604            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
605        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
606
607        let req_in_flight = register_int_gauge_vec_with_registry!(
608            "tx_orchestrator_req_in_flight",
609            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
610            &["tx_type"],
611            registry
612        )
613        .unwrap();
614
615        let req_in_flight_single_writer =
616            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
617        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
618
619        let request_latency = register_histogram_vec_with_registry!(
620            "tx_orchestrator_request_latency",
621            "Time spent in processing one Transaction Orchestrator request",
622            &["tx_type"],
623            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
624            registry,
625        )
626        .unwrap();
627        let wait_for_finality_latency = register_histogram_vec_with_registry!(
628            "tx_orchestrator_wait_for_finality_latency",
629            "Time spent in waiting for one Transaction Orchestrator request gets finalized",
630            &["tx_type"],
631            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
632            registry,
633        )
634        .unwrap();
635        let local_execution_latency = register_histogram_vec_with_registry!(
636            "tx_orchestrator_local_execution_latency",
637            "Time spent in waiting for one Transaction Orchestrator gets locally executed",
638            &["tx_type"],
639            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
640            registry,
641        )
642        .unwrap();
643
644        Self {
645            total_req_received_single_writer,
646            total_req_received_shared_object,
647            good_response_single_writer,
648            good_response_shared_object,
649            req_in_flight_single_writer,
650            req_in_flight_shared_object,
651            wait_for_finality_in_flight: register_int_gauge_with_registry!(
652                "tx_orchestrator_wait_for_finality_in_flight",
653                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
654                registry,
655            )
656            .unwrap(),
657            wait_for_finality_finished: register_int_counter_with_registry!(
658                "tx_orchestrator_wait_for_finality_finished",
659                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
660                registry,
661            )
662            .unwrap(),
663            wait_for_finality_timeout: register_int_counter_with_registry!(
664                "tx_orchestrator_wait_for_finality_timeout",
665                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
666                registry,
667            )
668            .unwrap(),
669            local_execution_in_flight: register_int_gauge_with_registry!(
670                "tx_orchestrator_local_execution_in_flight",
671                "Number of local execution txns in flights Transaction Orchestrator handles",
672                registry,
673            )
674            .unwrap(),
675            local_execution_success: register_int_counter_with_registry!(
676                "tx_orchestrator_local_execution_success",
677                "Total number of successful local execution txns Transaction Orchestrator handles",
678                registry,
679            )
680            .unwrap(),
681            local_execution_timeout: register_int_counter_with_registry!(
682                "tx_orchestrator_local_execution_timeout",
683                "Total number of timed-out local execution txns Transaction Orchestrator handles",
684                registry,
685            )
686            .unwrap(),
687            local_execution_failure: register_int_counter_with_registry!(
688                "tx_orchestrator_local_execution_failure",
689                "Total number of failed local execution txns Transaction Orchestrator handles",
690                registry,
691            )
692            .unwrap(),
693            request_latency_single_writer: request_latency
694                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
695            request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
696            wait_for_finality_latency_single_writer: wait_for_finality_latency
697                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
698            wait_for_finality_latency_shared_obj: wait_for_finality_latency
699                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
700            local_execution_latency_single_writer: local_execution_latency
701                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
702            local_execution_latency_shared_obj: local_execution_latency
703                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
704        }
705    }
706
707    pub fn new_for_tests() -> Self {
708        let registry = Registry::new();
709        Self::new(&registry)
710    }
711}
712
713#[async_trait::async_trait]
714impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
715where
716    A: AuthorityAPI + Send + Sync + 'static + Clone,
717{
718    async fn execute_transaction(
719        &self,
720        request: ExecuteTransactionRequestV1,
721        client_addr: Option<std::net::SocketAddr>,
722    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
723        self.execute_transaction_v1(request, client_addr).await
724    }
725
726    fn simulate_transaction(
727        &self,
728        transaction: TransactionData,
729        checks: VmChecks,
730    ) -> Result<SimulateTransactionResult, IotaError> {
731        self.validator_state
732            .simulate_transaction(transaction, checks)
733    }
734}