iota_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
6// submit transactions to validators for finality, and proactively executes
7// finalized transactions locally, when possible.
8
9use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12    FutureExt,
13    future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18    histogram::{Histogram, HistogramVec},
19    spawn_logged_monitored_task, spawn_monitored_task,
20};
21use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
22use iota_types::{
23    base_types::TransactionDigest,
24    effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects},
25    error::{IotaError, IotaResult},
26    executable_transaction::VerifiedExecutableTransaction,
27    iota_system_state::IotaSystemState,
28    quorum_driver_types::{
29        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
30        FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
31        QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
32    },
33    transaction::VerifiedTransaction,
34};
35use prometheus::{
36    Registry,
37    core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
38    register_int_counter_vec_with_registry, register_int_counter_with_registry,
39    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
40};
41use tokio::{
42    sync::broadcast::{Receiver, error::RecvError},
43    task::JoinHandle,
44    time::timeout,
45};
46use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
47
48use crate::{
49    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
50    authority_aggregator::AuthorityAggregator,
51    authority_client::{AuthorityAPI, NetworkAuthorityClient},
52    quorum_driver::{
53        QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
54        reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
55    },
56};
57
// How long to wait for local execution (including parents) before a timeout
// is returned to client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

// How long `execute_transaction_impl` waits on the quorum driver ticket before
// giving up and returning `QuorumDriverError::TimeoutBeforeFinality`.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
63
/// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
/// submit transactions to validators for finality, and proactively executes
/// finalized transactions locally, when possible.
pub struct TransactionOrchestrator<A: Clone> {
    /// Handle used to submit transactions to the quorum driver and to
    /// subscribe to its effects queue.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state; provides the epoch store, the transaction cache
    /// reader and local execution of finalized certificates.
    validator_state: Arc<AuthorityState>,
    /// Join handle of the background task spawned in `new` that runs
    /// `loop_execute_finalized_tx_locally`. It is stored but not read within
    /// this file.
    _local_executor_handle: JoinHandle<()>,
    /// Log of in-flight transactions: written in `submit`, cleared by the
    /// background loop, and replayed by `schedule_txes_in_log` on construction.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier shared with the quorum driver (see `with_notifier` in `new`);
    /// `submit` registers one ticket per transaction digest on it.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics recorded by this orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
75
76impl TransactionOrchestrator<NetworkAuthorityClient> {
77    pub fn new_with_auth_aggregator(
78        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
79        validator_state: Arc<AuthorityState>,
80        reconfig_channel: Receiver<IotaSystemState>,
81        parent_path: &Path,
82        prometheus_registry: &Registry,
83    ) -> Self {
84        let observer = OnsiteReconfigObserver::new(
85            reconfig_channel,
86            validator_state.get_object_cache_reader().clone(),
87            validator_state.clone_committee_store(),
88            validators.safe_client_metrics_base.clone(),
89            validators.metrics.deref().clone(),
90        );
91
92        TransactionOrchestrator::new(
93            validators,
94            validator_state,
95            parent_path,
96            prometheus_registry,
97            observer,
98        )
99    }
100}
101
102impl<A> TransactionOrchestrator<A>
103where
104    A: AuthorityAPI + Send + Sync + 'static + Clone,
105    OnsiteReconfigObserver: ReconfigObserver<A>,
106{
107    pub fn new(
108        validators: Arc<AuthorityAggregator<A>>,
109        validator_state: Arc<AuthorityState>,
110        parent_path: &Path,
111        prometheus_registry: &Registry,
112        reconfig_observer: OnsiteReconfigObserver,
113    ) -> Self {
114        let notifier = Arc::new(NotifyRead::new());
115        let quorum_driver_handler = Arc::new(
116            QuorumDriverHandlerBuilder::new(
117                validators,
118                Arc::new(QuorumDriverMetrics::new(prometheus_registry)),
119            )
120            .with_notifier(notifier.clone())
121            .with_reconfig_observer(Arc::new(reconfig_observer))
122            .start(),
123        );
124
125        let effects_receiver = quorum_driver_handler.subscribe_to_effects();
126        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
127        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
128            parent_path.join("fullnode_pending_transactions"),
129        ));
130        let pending_tx_log_clone = pending_tx_log.clone();
131        let _local_executor_handle = {
132            spawn_monitored_task!(async move {
133                Self::loop_execute_finalized_tx_locally(effects_receiver, pending_tx_log_clone)
134                    .await;
135            })
136        };
137        Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138        Self {
139            quorum_driver_handler,
140            validator_state,
141            _local_executor_handle,
142            pending_tx_log,
143            notifier,
144            metrics,
145        }
146    }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151    A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153    #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "debug", skip_all,
154    fields(
155        tx_digest = ?request.transaction.digest(),
156        tx_type = ?request_type,
157    ),
158    err)]
159    pub async fn execute_transaction_block(
160        &self,
161        request: ExecuteTransactionRequestV1,
162        request_type: ExecuteTransactionRequestType,
163        client_addr: Option<SocketAddr>,
164    ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165    {
166        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168        let (transaction, response) = self
169            .execute_transaction_impl(&epoch_store, request, client_addr)
170            .await?;
171
172        let executed_locally = if matches!(
173            request_type,
174            ExecuteTransactionRequestType::WaitForLocalExecution
175        ) {
176            let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
177                transaction,
178                response.effects_cert.executed_epoch(),
179            );
180            let executed_locally = Self::execute_finalized_tx_locally_with_timeout(
181                &self.validator_state,
182                &epoch_store,
183                &executable_tx,
184                &response.effects_cert,
185                &self.metrics,
186            )
187            .await
188            .is_ok();
189            add_server_timing("local_execution");
190            executed_locally
191        } else {
192            false
193        };
194
195        let QuorumDriverResponse {
196            effects_cert,
197            events,
198            input_objects,
199            output_objects,
200            auxiliary_data,
201        } = response;
202
203        let response = ExecuteTransactionResponseV1 {
204            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
205            events,
206            input_objects,
207            output_objects,
208            auxiliary_data,
209        };
210
211        Ok((response, executed_locally))
212    }
213
214    // Utilize the handle_certificate_v1 validator api to request input/output
215    // objects
216    #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
217                 fields(tx_digest = ?request.transaction.digest()))]
218    pub async fn execute_transaction_v1(
219        &self,
220        request: ExecuteTransactionRequestV1,
221        client_addr: Option<SocketAddr>,
222    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
223        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
224
225        let QuorumDriverResponse {
226            effects_cert,
227            events,
228            input_objects,
229            output_objects,
230            auxiliary_data,
231        } = self
232            .execute_transaction_impl(&epoch_store, request, client_addr)
233            .await
234            .map(|(_, r)| r)?;
235
236        Ok(ExecuteTransactionResponseV1 {
237            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
238            events,
239            input_objects,
240            output_objects,
241            auxiliary_data,
242        })
243    }
244
    // TODO check if tx is already executed on this node.
    // Note: since EffectsCert is not stored today, we need to gather that from
    // validators (and maybe store it for caching purposes)
    /// Verifies `request.transaction`, submits it through the quorum driver and
    /// waits for the outcome.
    ///
    /// On success returns the verified transaction together with the quorum
    /// driver response.
    ///
    /// # Errors
    ///
    /// * `QuorumDriverError::InvalidUserSignature` if
    ///   `epoch_store.verify_transaction` rejects the transaction.
    /// * `QuorumDriverError::QuorumDriverInternal` if `submit` fails or the
    ///   awaited ticket resolves to an `IotaError`.
    /// * `QuorumDriverError::TimeoutBeforeFinality` if no result arrives
    ///   within `WAIT_FOR_FINALITY_TIMEOUT`.
    /// * Any `QuorumDriverError` reported by the quorum driver itself.
    pub async fn execute_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        request: ExecuteTransactionRequestV1,
        client_addr: Option<SocketAddr>,
    ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
        // `request` is still needed by `submit` below, hence the clone.
        let transaction = epoch_store
            .verify_transaction(request.transaction.clone())
            .map_err(QuorumDriverError::InvalidUserSignature)?;
        // The guard keeps the per-type in-flight gauge raised until this
        // function returns, on every exit path.
        let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
        let tx_digest = *transaction.digest();
        debug!(?tx_digest, "TO Received transaction execution request.");

        // Two timers per request, picked by transaction type: one for the
        // whole request, one for the wait-for-finality part only.
        let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
            (
                self.metrics.request_latency_shared_obj.start_timer(),
                self.metrics
                    .wait_for_finality_latency_shared_obj
                    .start_timer(),
            )
        } else {
            (
                self.metrics.request_latency_single_writer.start_timer(),
                self.metrics
                    .wait_for_finality_latency_single_writer
                    .start_timer(),
            )
        };

        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
        wait_for_finality_gauge.inc();
        // Decrements the gauge when dropped, including on the early returns
        // below.
        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
            in_flight.dec();
        });

        let ticket = self
            .submit(transaction.clone(), request, client_addr)
            .await
            .map_err(|e| {
                warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
                QuorumDriverError::QuorumDriverInternal(e)
            })?;

        let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
            debug!(?tx_digest, "Timeout waiting for transaction finality.");
            self.metrics.wait_for_finality_timeout.inc();
            return Err(QuorumDriverError::TimeoutBeforeFinality);
        };
        add_server_timing("wait_for_finality");

        // The wait for finality is over: release its timer and gauge now
        // rather than at the end of the function.
        drop(_txn_finality_timer);
        drop(_wait_for_finality_gauge);
        self.metrics.wait_for_finality_finished.inc();

        match result {
            // The ticket future itself failed.
            Err(err) => {
                warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
                Err(QuorumDriverError::QuorumDriverInternal(err))
            }
            // The quorum driver answered with an error.
            Ok(Err(err)) => Err(err),
            Ok(Ok(response)) => {
                good_response_metrics.inc();
                Ok((transaction, response))
            }
        }
    }
315
    /// Submits the transaction to Quorum Driver for execution.
    /// Returns an awaitable Future.
    ///
    /// A ticket for the transaction digest is registered on the notifier
    /// before anything else. The request is then handed to the quorum driver
    /// only if `write_pending_transaction_maybe` returns `true`, which the
    /// code below treats as "no pending request in flight".
    ///
    /// The returned future resolves to the quorum driver result for this
    /// digest. It races the ticket against a notification that executed
    /// effects are available locally; see the comment in the body.
    ///
    /// # Errors
    ///
    /// The outer `IotaResult` is an error when writing to the pending
    /// transaction log or the initial submission fails. The inner `IotaResult`
    /// is an error when the re-submission inside the returned future fails.
    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
    async fn submit(
        &self,
        transaction: VerifiedTransaction,
        request: ExecuteTransactionRequestV1,
        client_addr: Option<SocketAddr>,
    ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
        let tx_digest = *transaction.digest();
        let ticket = self.notifier.register_one(&tx_digest);
        // TODO(william) need to also write client adr to pending tx log below
        // so that we can re-execute with this client addr if we restart
        if self
            .pending_tx_log
            .write_pending_transaction_maybe(&transaction)
            .await?
        {
            debug!(?tx_digest, "no pending request in flight, submitting.");
            // `request` is moved into the returned future below, so a clone is
            // submitted here.
            self.quorum_driver()
                .submit_transaction_no_ticket(request.clone(), client_addr)
                .await?;
        }
        // It's possible that the transaction effects is already stored in DB at this
        // point. So we also subscribe to that. If we hear from `effects_await`
        // first, it means the ticket misses the previous notification, and we
        // want to ask quorum driver to form a certificate for us again, to
        // serve this request.
        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
        let qd = self.clone_quorum_driver();
        Ok(async move {
            let digests = [tx_digest];
            let effects_await = cache_reader.notify_read_executed_effects(&digests);
            // let-and-return necessary to satisfy borrow checker.
            let res = match select(ticket, effects_await.boxed()).await {
                // The quorum driver answered first: pass its result through.
                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
                // Local effects showed up first: re-submit, then keep waiting
                // on the same, still pending ticket.
                Either::Right((_, unfinished_quorum_driver_task)) => {
                    debug!(
                        ?tx_digest,
                        "Effects are available in DB, use quorum driver to get a certificate"
                    );
                    qd.submit_transaction_no_ticket(request, client_addr)
                        .await?;
                    Ok(unfinished_quorum_driver_task.await)
                }
            };
            #[expect(clippy::let_and_return)]
            res
        })
    }
366
    /// Executes an already finalized transaction on the local node, bounded by
    /// `LOCAL_EXECUTION_TIMEOUT`.
    ///
    /// Returns `Ok(())` immediately, without touching any metric, when
    /// `validator_state` reports the transaction as already executed.
    /// Otherwise the in-flight gauge and a per-type latency timer are held for
    /// the duration of the attempt, and exactly one of the success / timeout /
    /// failure counters is incremented.
    ///
    /// # Errors
    ///
    /// * Whatever `is_tx_already_executed` returns on failure.
    /// * `IotaError::Timeout` if execution does not finish within
    ///   `LOCAL_EXECUTION_TIMEOUT`.
    /// * `IotaError::TransactionOrchestratorLocalExecution`, carrying the
    ///   stringified underlying error, if execution itself fails.
    #[instrument(name = "tx_orchestrator_execute_finalized_tx_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
    async fn execute_finalized_tx_locally_with_timeout(
        validator_state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: &VerifiedExecutableTransaction,
        effects_cert: &VerifiedCertifiedTransactionEffects,
        metrics: &TransactionOrchestratorMetrics,
    ) -> IotaResult {
        // TODO: attempt a finalized tx at most once per request.
        // Every WaitForLocalExecution request will be attempted to execute twice,
        // one from the subscriber queue, one from the proactive execution before
        // returning results to clients. This is not insanely bad because:
        // 1. it's possible that one attempt finishes before the other, so there's zero
        //    extra work except DB checks
        // 2. an up-to-date fullnode should have minimal overhead to sync parents (for
        //    one extra time)
        // 3. at the end of day, the tx will be executed at most once per lock guard.
        // NOTE(review): in this file `loop_execute_finalized_tx_locally` only
        // clears the pending transaction log and does not execute anything, so
        // the "executed twice" remark above may be outdated — confirm.
        let tx_digest = transaction.digest();
        if validator_state.is_tx_already_executed(tx_digest)? {
            return Ok(());
        }
        metrics.local_execution_in_flight.inc();
        // Decrements the in-flight gauge again when this function returns.
        let _metrics_guard =
            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
                in_flight.dec();
            });

        // Latency timer for the matching transaction type, held until return.
        let _guard = if transaction.contains_shared_object() {
            metrics.local_execution_latency_shared_obj.start_timer()
        } else {
            metrics.local_execution_latency_single_writer.start_timer()
        };
        debug!(?tx_digest, "Executing finalized tx locally.");
        match timeout(
            LOCAL_EXECUTION_TIMEOUT,
            validator_state.fullnode_execute_certificate_with_effects(
                transaction,
                effects_cert,
                epoch_store,
            ),
        )
        .instrument(error_span!(
            "transaction_orchestrator::local_execution",
            ?tx_digest
        ))
        .await
        {
            // The timeout elapsed before execution finished.
            Err(_elapsed) => {
                debug!(
                    ?tx_digest,
                    "Executing tx locally by orchestrator timed out within {:?}.",
                    LOCAL_EXECUTION_TIMEOUT
                );
                metrics.local_execution_timeout.inc();
                Err(IotaError::Timeout)
            }
            // Execution finished in time but reported an error.
            Ok(Err(err)) => {
                debug!(
                    ?tx_digest,
                    "Executing tx locally by orchestrator failed with error: {:?}", err
                );
                metrics.local_execution_failure.inc();
                Err(IotaError::TransactionOrchestratorLocalExecution {
                    error: err.to_string(),
                })
            }
            Ok(Ok(_)) => {
                metrics.local_execution_success.inc();
                Ok(())
            }
        }
    }
439
440    async fn loop_execute_finalized_tx_locally(
441        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
442        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
443    ) {
444        loop {
445            match effects_receiver.recv().await {
446                Ok(Ok((transaction, ..))) => {
447                    let tx_digest = transaction.digest();
448                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
449                        error!(
450                            ?tx_digest,
451                            "Failed to finish transaction in pending transaction log: {err}"
452                        );
453                    }
454                }
455                Ok(Err((tx_digest, _err))) => {
456                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
457                        error!(
458                            ?tx_digest,
459                            "Failed to finish transaction in pending transaction log: {err}"
460                        );
461                    }
462                }
463                Err(RecvError::Closed) => {
464                    error!("Sender of effects subscriber queue has been dropped!");
465                    return;
466                }
467                Err(RecvError::Lagged(skipped_count)) => {
468                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
469                }
470            }
471        }
472    }
473
474    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
475        &self.quorum_driver_handler
476    }
477
478    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
479        self.quorum_driver_handler.clone()
480    }
481
482    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
483        self.quorum_driver().authority_aggregator().load_full()
484    }
485
486    pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
487        self.quorum_driver_handler.subscribe_to_effects()
488    }
489
490    fn update_metrics(
491        &'_ self,
492        transaction: &VerifiedTransaction,
493    ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
494        let (in_flight, good_response) = if transaction.contains_shared_object() {
495            self.metrics.total_req_received_shared_object.inc();
496            (
497                self.metrics.req_in_flight_shared_object.clone(),
498                &self.metrics.good_response_shared_object,
499            )
500        } else {
501            self.metrics.total_req_received_single_writer.inc();
502            (
503                self.metrics.req_in_flight_single_writer.clone(),
504                &self.metrics.good_response_single_writer,
505            )
506        };
507        in_flight.inc();
508        (
509            scopeguard::guard(in_flight, |in_flight| {
510                in_flight.dec();
511            }),
512            good_response,
513        )
514    }
515
516    fn schedule_txes_in_log(
517        pending_tx_log: Arc<WritePathPendingTransactionLog>,
518        quorum_driver: Arc<QuorumDriverHandler<A>>,
519    ) {
520        spawn_logged_monitored_task!(async move {
521            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
522                info!("Skipping loading pending transactions from pending_tx_log.");
523                return;
524            }
525            let pending_txes = pending_tx_log.load_all_pending_transactions();
526            info!(
527                "Recovering {} pending transactions from pending_tx_log.",
528                pending_txes.len()
529            );
530            for (i, tx) in pending_txes.into_iter().enumerate() {
531                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
532                // requires a migration.
533                let tx = tx.into_inner();
534                let tx_digest = *tx.digest();
535                // It's not impossible we fail to enqueue a task but that's not the end of
536                // world. TODO(william) correctly extract client_addr from logs
537                if let Err(err) = quorum_driver
538                    .submit_transaction_no_ticket(
539                        ExecuteTransactionRequestV1 {
540                            transaction: tx,
541                            include_events: true,
542                            include_input_objects: false,
543                            include_output_objects: false,
544                            include_auxiliary_data: false,
545                        },
546                        None,
547                    )
548                    .await
549                {
550                    warn!(
551                        ?tx_digest,
552                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
553                    );
554                } else {
555                    debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
556                    if (i + 1) % 1000 == 0 {
557                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
558                    }
559                }
560            }
561            // Transactions will be cleaned up in
562            // loop_execute_finalized_tx_locally() after they
563            // produce effects.
564        });
565    }
566
567    pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
568        self.pending_tx_log.load_all_pending_transactions()
569    }
570}
571
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Execution requests received, one counter per transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Good responses generated, one counter per transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, one gauge per transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Wait-for-finality bookkeeping: transactions currently waiting, those
    // that got a quorum driver response before the timeout (success or
    // failure), and those that timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local execution bookkeeping: attempts currently running and their
    // outcomes.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms, one per transaction type, for the whole request,
    // the wait for finality, and local execution.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
600
601// Note that labeled-metrics are stored upfront individually
602// to mitigate the perf hit by MetricsVec.
603// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
604impl TransactionOrchestratorMetrics {
605    pub fn new(registry: &Registry) -> Self {
606        let total_req_received = register_int_counter_vec_with_registry!(
607            "tx_orchestrator_total_req_received",
608            "Total number of executions request Transaction Orchestrator receives, group by tx type",
609            &["tx_type"],
610            registry
611        )
612        .unwrap();
613
614        let total_req_received_single_writer =
615            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
616        let total_req_received_shared_object =
617            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
618
619        let good_response = register_int_counter_vec_with_registry!(
620            "tx_orchestrator_good_response",
621            "Total number of good responses Transaction Orchestrator generates, group by tx type",
622            &["tx_type"],
623            registry
624        )
625        .unwrap();
626
627        let good_response_single_writer =
628            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
629        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
630
631        let req_in_flight = register_int_gauge_vec_with_registry!(
632            "tx_orchestrator_req_in_flight",
633            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
634            &["tx_type"],
635            registry
636        )
637        .unwrap();
638
639        let req_in_flight_single_writer =
640            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
641        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
642
643        let request_latency = HistogramVec::new_in_registry(
644            "tx_orchestrator_request_latency",
645            "Time spent in processing one Transaction Orchestrator request",
646            &["tx_type"],
647            registry,
648        );
649        let wait_for_finality_latency = HistogramVec::new_in_registry(
650            "tx_orchestrator_wait_for_finality_latency",
651            "Time spent in waiting for one Transaction Orchestrator request gets finalized",
652            &["tx_type"],
653            registry,
654        );
655        let local_execution_latency = HistogramVec::new_in_registry(
656            "tx_orchestrator_local_execution_latency",
657            "Time spent in waiting for one Transaction Orchestrator gets locally executed",
658            &["tx_type"],
659            registry,
660        );
661
662        Self {
663            total_req_received_single_writer,
664            total_req_received_shared_object,
665            good_response_single_writer,
666            good_response_shared_object,
667            req_in_flight_single_writer,
668            req_in_flight_shared_object,
669            wait_for_finality_in_flight: register_int_gauge_with_registry!(
670                "tx_orchestrator_wait_for_finality_in_flight",
671                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
672                registry,
673            )
674            .unwrap(),
675            wait_for_finality_finished: register_int_counter_with_registry!(
676                "tx_orchestrator_wait_for_finality_finished",
677                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
678                registry,
679            )
680            .unwrap(),
681            wait_for_finality_timeout: register_int_counter_with_registry!(
682                "tx_orchestrator_wait_for_finality_timeout",
683                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
684                registry,
685            )
686            .unwrap(),
687            local_execution_in_flight: register_int_gauge_with_registry!(
688                "tx_orchestrator_local_execution_in_flight",
689                "Number of local execution txns in flights Transaction Orchestrator handles",
690                registry,
691            )
692            .unwrap(),
693            local_execution_success: register_int_counter_with_registry!(
694                "tx_orchestrator_local_execution_success",
695                "Total number of successful local execution txns Transaction Orchestrator handles",
696                registry,
697            )
698            .unwrap(),
699            local_execution_timeout: register_int_counter_with_registry!(
700                "tx_orchestrator_local_execution_timeout",
701                "Total number of timed-out local execution txns Transaction Orchestrator handles",
702                registry,
703            )
704            .unwrap(),
705            local_execution_failure: register_int_counter_with_registry!(
706                "tx_orchestrator_local_execution_failure",
707                "Total number of failed local execution txns Transaction Orchestrator handles",
708                registry,
709            )
710            .unwrap(),
711            request_latency_single_writer: request_latency
712                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
713            request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
714            wait_for_finality_latency_single_writer: wait_for_finality_latency
715                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
716            wait_for_finality_latency_shared_obj: wait_for_finality_latency
717                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
718            local_execution_latency_single_writer: local_execution_latency
719                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
720            local_execution_latency_shared_obj: local_execution_latency
721                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
722        }
723    }
724
725    pub fn new_for_tests() -> Self {
726        let registry = Registry::new();
727        Self::new(&registry)
728    }
729}
730
731#[async_trait::async_trait]
732impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
733where
734    A: AuthorityAPI + Send + Sync + 'static + Clone,
735{
736    async fn execute_transaction(
737        &self,
738        request: ExecuteTransactionRequestV1,
739        client_addr: Option<std::net::SocketAddr>,
740    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
741        self.execute_transaction_v1(request, client_addr).await
742    }
743}