iota_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
6// submit transactions to validators for finality, and proactively executes
7// finalized transactions locally, when possible.
8
9use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12    FutureExt,
13    future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18    spawn_logged_monitored_task, spawn_monitored_task,
19};
20use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
21use iota_types::{
22    base_types::TransactionDigest,
23    effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects},
24    error::{IotaError, IotaResult},
25    executable_transaction::VerifiedExecutableTransaction,
26    iota_system_state::IotaSystemState,
27    quorum_driver_types::{
28        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
29        FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
30        QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
31    },
32    transaction::{TransactionData, VerifiedTransaction},
33    transaction_executor::SimulateTransactionResult,
34};
35use prometheus::{
36    Histogram, Registry,
37    core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
38    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
39    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
40    register_int_gauge_with_registry,
41};
42use tokio::{
43    sync::broadcast::{Receiver, error::RecvError},
44    task::JoinHandle,
45    time::timeout,
46};
47use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
48
49use crate::{
50    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
51    authority_aggregator::AuthorityAggregator,
52    authority_client::{AuthorityAPI, NetworkAuthorityClient},
53    quorum_driver::{
54        QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
55        reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
56    },
57};
58
/// Upper bound on how long a request waits for local execution (including
/// parents) before a timeout is reported to the client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound on how long a request waits on its quorum driver ticket before
/// `QuorumDriverError::TimeoutBeforeFinality` is returned.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
64
65/// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
66/// submit transactions to validators for finality, and proactively executes
67/// finalized transactions locally, when possible.
68pub struct TransactionOrchestrator<A: Clone> {
69    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
70    validator_state: Arc<AuthorityState>,
71    _local_executor_handle: JoinHandle<()>,
72    pending_tx_log: Arc<WritePathPendingTransactionLog>,
73    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
74    metrics: Arc<TransactionOrchestratorMetrics>,
75}
76
77impl TransactionOrchestrator<NetworkAuthorityClient> {
78    pub fn new_with_auth_aggregator(
79        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
80        validator_state: Arc<AuthorityState>,
81        reconfig_channel: Receiver<IotaSystemState>,
82        parent_path: &Path,
83        prometheus_registry: &Registry,
84    ) -> Self {
85        let observer = OnsiteReconfigObserver::new(
86            reconfig_channel,
87            validator_state.get_object_cache_reader().clone(),
88            validator_state.clone_committee_store(),
89            validators.safe_client_metrics_base.clone(),
90            validators.metrics.deref().clone(),
91        );
92
93        TransactionOrchestrator::new(
94            validators,
95            validator_state,
96            parent_path,
97            prometheus_registry,
98            observer,
99        )
100    }
101}
102
103impl<A> TransactionOrchestrator<A>
104where
105    A: AuthorityAPI + Send + Sync + 'static + Clone,
106    OnsiteReconfigObserver: ReconfigObserver<A>,
107{
108    pub fn new(
109        validators: Arc<AuthorityAggregator<A>>,
110        validator_state: Arc<AuthorityState>,
111        parent_path: &Path,
112        prometheus_registry: &Registry,
113        reconfig_observer: OnsiteReconfigObserver,
114    ) -> Self {
115        let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
116        let notifier = Arc::new(NotifyRead::new());
117        let reconfig_observer = Arc::new(reconfig_observer);
118        let quorum_driver_handler = Arc::new(
119            QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
120                .with_notifier(notifier.clone())
121                .with_reconfig_observer(reconfig_observer.clone())
122                .start(),
123        );
124
125        let effects_receiver = quorum_driver_handler.subscribe_to_effects();
126        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
127        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
128            parent_path.join("fullnode_pending_transactions"),
129        ));
130        let pending_tx_log_clone = pending_tx_log.clone();
131        let _local_executor_handle = {
132            spawn_monitored_task!(async move {
133                Self::loop_execute_finalized_tx_locally(effects_receiver, pending_tx_log_clone)
134                    .await;
135            })
136        };
137        Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138        Self {
139            quorum_driver_handler,
140            validator_state,
141            _local_executor_handle,
142            pending_tx_log,
143            notifier,
144            metrics,
145        }
146    }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151    A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153    #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "debug", skip_all,
154    fields(
155        tx_digest = ?request.transaction.digest(),
156        tx_type = ?request_type,
157    ),
158    err)]
159    pub async fn execute_transaction_block(
160        &self,
161        request: ExecuteTransactionRequestV1,
162        request_type: ExecuteTransactionRequestType,
163        client_addr: Option<SocketAddr>,
164    ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165    {
166        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168        let (transaction, response) = self
169            .execute_transaction_impl(&epoch_store, request, client_addr)
170            .await?;
171
172        let executed_locally = if matches!(
173            request_type,
174            ExecuteTransactionRequestType::WaitForLocalExecution
175        ) {
176            let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
177                transaction,
178                response.effects_cert.executed_epoch(),
179            );
180            let executed_locally = Self::execute_finalized_tx_locally_with_timeout(
181                &self.validator_state,
182                &epoch_store,
183                &executable_tx,
184                &response.effects_cert,
185                &self.metrics,
186            )
187            .await
188            .is_ok();
189            add_server_timing("local_execution");
190            executed_locally
191        } else {
192            false
193        };
194
195        let QuorumDriverResponse {
196            effects_cert,
197            events,
198            input_objects,
199            output_objects,
200            auxiliary_data,
201        } = response;
202
203        let response = ExecuteTransactionResponseV1 {
204            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
205            events,
206            input_objects,
207            output_objects,
208            auxiliary_data,
209        };
210
211        Ok((response, executed_locally))
212    }
213
214    // Utilize the handle_certificate_v1 validator api to request input/output
215    // objects
216    #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
217                 fields(tx_digest = ?request.transaction.digest()))]
218    pub async fn execute_transaction_v1(
219        &self,
220        request: ExecuteTransactionRequestV1,
221        client_addr: Option<SocketAddr>,
222    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
223        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
224
225        let QuorumDriverResponse {
226            effects_cert,
227            events,
228            input_objects,
229            output_objects,
230            auxiliary_data,
231        } = self
232            .execute_transaction_impl(&epoch_store, request, client_addr)
233            .await
234            .map(|(_, r)| r)?;
235
236        Ok(ExecuteTransactionResponseV1 {
237            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
238            events,
239            input_objects,
240            output_objects,
241            auxiliary_data,
242        })
243    }
244
245    // TODO check if tx is already executed on this node.
246    // Note: since EffectsCert is not stored today, we need to gather that from
247    // validators (and maybe store it for caching purposes)
248    pub async fn execute_transaction_impl(
249        &self,
250        epoch_store: &AuthorityPerEpochStore,
251        request: ExecuteTransactionRequestV1,
252        client_addr: Option<SocketAddr>,
253    ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
254        let transaction = epoch_store
255            .verify_transaction(request.transaction.clone())
256            .map_err(QuorumDriverError::InvalidUserSignature)?;
257        let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
258        let tx_digest = *transaction.digest();
259        debug!(?tx_digest, "TO Received transaction execution request.");
260
261        let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
262            (
263                self.metrics.request_latency_shared_obj.start_timer(),
264                self.metrics
265                    .wait_for_finality_latency_shared_obj
266                    .start_timer(),
267            )
268        } else {
269            (
270                self.metrics.request_latency_single_writer.start_timer(),
271                self.metrics
272                    .wait_for_finality_latency_single_writer
273                    .start_timer(),
274            )
275        };
276
277        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
278        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
279        wait_for_finality_gauge.inc();
280        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
281            in_flight.dec();
282        });
283
284        let ticket = self
285            .submit(transaction.clone(), request, client_addr)
286            .await
287            .map_err(|e| {
288                warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
289                QuorumDriverError::QuorumDriverInternal(e)
290            })?;
291
292        let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
293            debug!(?tx_digest, "Timeout waiting for transaction finality.");
294            self.metrics.wait_for_finality_timeout.inc();
295            return Err(QuorumDriverError::TimeoutBeforeFinality);
296        };
297        add_server_timing("wait_for_finality");
298
299        drop(_txn_finality_timer);
300        drop(_wait_for_finality_gauge);
301        self.metrics.wait_for_finality_finished.inc();
302
303        match result {
304            Err(err) => {
305                warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
306                Err(QuorumDriverError::QuorumDriverInternal(err))
307            }
308            Ok(Err(err)) => Err(err),
309            Ok(Ok(response)) => {
310                good_response_metrics.inc();
311                Ok((transaction, response))
312            }
313        }
314    }
315
316    /// Submits the transaction to Quorum Driver for execution.
317    /// Returns an awaitable Future.
318    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
319    async fn submit(
320        &self,
321        transaction: VerifiedTransaction,
322        request: ExecuteTransactionRequestV1,
323        client_addr: Option<SocketAddr>,
324    ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
325        let tx_digest = *transaction.digest();
326        let ticket = self.notifier.register_one(&tx_digest);
327        // TODO(william) need to also write client adr to pending tx log below
328        // so that we can re-execute with this client addr if we restart
329        if self
330            .pending_tx_log
331            .write_pending_transaction_maybe(&transaction)
332            .await?
333        {
334            debug!(?tx_digest, "no pending request in flight, submitting.");
335            self.quorum_driver()
336                .submit_transaction_no_ticket(request.clone(), client_addr)
337                .await?;
338        }
339        // It's possible that the transaction effects is already stored in DB at this
340        // point. So we also subscribe to that. If we hear from `effects_await`
341        // first, it means the ticket misses the previous notification, and we
342        // want to ask quorum driver to form a certificate for us again, to
343        // serve this request.
344        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
345        let qd = self.clone_quorum_driver();
346        Ok(async move {
347            let digests = [tx_digest];
348            let effects_await = cache_reader.notify_read_executed_effects(&digests);
349            // let-and-return necessary to satisfy borrow checker.
350            let res = match select(ticket, effects_await.boxed()).await {
351                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
352                Either::Right((_, unfinished_quorum_driver_task)) => {
353                    debug!(
354                        ?tx_digest,
355                        "Effects are available in DB, use quorum driver to get a certificate"
356                    );
357                    qd.submit_transaction_no_ticket(request, client_addr)
358                        .await?;
359                    Ok(unfinished_quorum_driver_task.await)
360                }
361            };
362            res
363        })
364    }
365
366    #[instrument(name = "tx_orchestrator_execute_finalized_tx_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
367    async fn execute_finalized_tx_locally_with_timeout(
368        validator_state: &Arc<AuthorityState>,
369        epoch_store: &Arc<AuthorityPerEpochStore>,
370        transaction: &VerifiedExecutableTransaction,
371        effects_cert: &VerifiedCertifiedTransactionEffects,
372        metrics: &TransactionOrchestratorMetrics,
373    ) -> IotaResult {
374        // TODO: attempt a finalized tx at most once per request.
375        // Every WaitForLocalExecution request will be attempted to execute twice,
376        // one from the subscriber queue, one from the proactive execution before
377        // returning results to clients. This is not insanely bad because:
378        // 1. it's possible that one attempt finishes before the other, so there's zero
379        //    extra work except DB checks
380        // 2. an up-to-date fullnode should have minimal overhead to sync parents (for
381        //    one extra time)
382        // 3. at the end of day, the tx will be executed at most once per lock guard.
383        let tx_digest = transaction.digest();
384        if validator_state.is_tx_already_executed(tx_digest)? {
385            return Ok(());
386        }
387        metrics.local_execution_in_flight.inc();
388        let _metrics_guard =
389            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
390                in_flight.dec();
391            });
392
393        let _guard = if transaction.contains_shared_object() {
394            metrics.local_execution_latency_shared_obj.start_timer()
395        } else {
396            metrics.local_execution_latency_single_writer.start_timer()
397        };
398        debug!(?tx_digest, "Executing finalized tx locally.");
399        match timeout(
400            LOCAL_EXECUTION_TIMEOUT,
401            validator_state.fullnode_execute_certificate_with_effects(
402                transaction,
403                effects_cert,
404                epoch_store,
405            ),
406        )
407        .instrument(error_span!(
408            "transaction_orchestrator::local_execution",
409            ?tx_digest
410        ))
411        .await
412        {
413            Err(_elapsed) => {
414                debug!(
415                    ?tx_digest,
416                    "Executing tx locally by orchestrator timed out within {:?}.",
417                    LOCAL_EXECUTION_TIMEOUT
418                );
419                metrics.local_execution_timeout.inc();
420                Err(IotaError::Timeout)
421            }
422            Ok(Err(err)) => {
423                debug!(
424                    ?tx_digest,
425                    "Executing tx locally by orchestrator failed with error: {:?}", err
426                );
427                metrics.local_execution_failure.inc();
428                Err(IotaError::TransactionOrchestratorLocalExecution {
429                    error: err.to_string(),
430                })
431            }
432            Ok(Ok(_)) => {
433                metrics.local_execution_success.inc();
434                Ok(())
435            }
436        }
437    }
438
439    async fn loop_execute_finalized_tx_locally(
440        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
441        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
442    ) {
443        loop {
444            match effects_receiver.recv().await {
445                Ok(Ok((transaction, ..))) => {
446                    let tx_digest = transaction.digest();
447                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
448                        error!(
449                            ?tx_digest,
450                            "Failed to finish transaction in pending transaction log: {err}"
451                        );
452                    }
453                }
454                Ok(Err((tx_digest, _err))) => {
455                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
456                        error!(
457                            ?tx_digest,
458                            "Failed to finish transaction in pending transaction log: {err}"
459                        );
460                    }
461                }
462                Err(RecvError::Closed) => {
463                    error!("Sender of effects subscriber queue has been dropped!");
464                    return;
465                }
466                Err(RecvError::Lagged(skipped_count)) => {
467                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
468                }
469            }
470        }
471    }
472
    /// Borrows the shared handle to the quorum driver used by this
    /// orchestrator.
    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
        &self.quorum_driver_handler
    }
476
477    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
478        self.quorum_driver_handler.clone()
479    }
480
481    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
482        self.quorum_driver().authority_aggregator().load_full()
483    }
484
485    pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
486        self.quorum_driver_handler.subscribe_to_effects()
487    }
488
489    fn update_metrics(
490        &'_ self,
491        transaction: &VerifiedTransaction,
492    ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
493        let (in_flight, good_response) = if transaction.contains_shared_object() {
494            self.metrics.total_req_received_shared_object.inc();
495            (
496                self.metrics.req_in_flight_shared_object.clone(),
497                &self.metrics.good_response_shared_object,
498            )
499        } else {
500            self.metrics.total_req_received_single_writer.inc();
501            (
502                self.metrics.req_in_flight_single_writer.clone(),
503                &self.metrics.good_response_single_writer,
504            )
505        };
506        in_flight.inc();
507        (
508            scopeguard::guard(in_flight, |in_flight| {
509                in_flight.dec();
510            }),
511            good_response,
512        )
513    }
514
515    fn schedule_txes_in_log(
516        pending_tx_log: Arc<WritePathPendingTransactionLog>,
517        quorum_driver: Arc<QuorumDriverHandler<A>>,
518    ) {
519        spawn_logged_monitored_task!(async move {
520            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
521                info!("Skipping loading pending transactions from pending_tx_log.");
522                return;
523            }
524            let pending_txes = pending_tx_log.load_all_pending_transactions();
525            info!(
526                "Recovering {} pending transactions from pending_tx_log.",
527                pending_txes.len()
528            );
529            for (i, tx) in pending_txes.into_iter().enumerate() {
530                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
531                // requires a migration.
532                let tx = tx.into_inner();
533                let tx_digest = *tx.digest();
534                // It's not impossible we fail to enqueue a task but that's not the end of
535                // world. TODO(william) correctly extract client_addr from logs
536                if let Err(err) = quorum_driver
537                    .submit_transaction_no_ticket(
538                        ExecuteTransactionRequestV1 {
539                            transaction: tx,
540                            include_events: true,
541                            include_input_objects: false,
542                            include_output_objects: false,
543                            include_auxiliary_data: false,
544                        },
545                        None,
546                    )
547                    .await
548                {
549                    warn!(
550                        ?tx_digest,
551                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
552                    );
553                } else {
554                    debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
555                    if (i + 1) % 1000 == 0 {
556                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
557                    }
558                }
559            }
560            // Transactions will be cleaned up in
561            // loop_execute_finalized_tx_locally() after they
562            // produce effects.
563        });
564    }
565
    /// Returns every transaction currently recorded in the pending
    /// transaction log.
    pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
        self.pending_tx_log.load_all_pending_transactions()
    }
569}
570
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Execution requests received, per transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Successful responses produced, per transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, per transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Waiting-for-finality phase: currently waiting, finished before the
    // timeout, and timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local execution phase: currently running, and outcome counters.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms for each phase, per transaction type.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
599
600// Note that labeled-metrics are stored upfront individually
601// to mitigate the perf hit by MetricsVec.
602// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
603impl TransactionOrchestratorMetrics {
604    pub fn new(registry: &Registry) -> Self {
605        let total_req_received = register_int_counter_vec_with_registry!(
606            "tx_orchestrator_total_req_received",
607            "Total number of executions request Transaction Orchestrator receives, group by tx type",
608            &["tx_type"],
609            registry
610        )
611        .unwrap();
612
613        let total_req_received_single_writer =
614            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
615        let total_req_received_shared_object =
616            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
617
618        let good_response = register_int_counter_vec_with_registry!(
619            "tx_orchestrator_good_response",
620            "Total number of good responses Transaction Orchestrator generates, group by tx type",
621            &["tx_type"],
622            registry
623        )
624        .unwrap();
625
626        let good_response_single_writer =
627            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
628        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
629
630        let req_in_flight = register_int_gauge_vec_with_registry!(
631            "tx_orchestrator_req_in_flight",
632            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
633            &["tx_type"],
634            registry
635        )
636        .unwrap();
637
638        let req_in_flight_single_writer =
639            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
640        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
641
642        let request_latency = register_histogram_vec_with_registry!(
643            "tx_orchestrator_request_latency",
644            "Time spent in processing one Transaction Orchestrator request",
645            &["tx_type"],
646            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
647            registry,
648        )
649        .unwrap();
650        let wait_for_finality_latency = register_histogram_vec_with_registry!(
651            "tx_orchestrator_wait_for_finality_latency",
652            "Time spent in waiting for one Transaction Orchestrator request gets finalized",
653            &["tx_type"],
654            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
655            registry,
656        )
657        .unwrap();
658        let local_execution_latency = register_histogram_vec_with_registry!(
659            "tx_orchestrator_local_execution_latency",
660            "Time spent in waiting for one Transaction Orchestrator gets locally executed",
661            &["tx_type"],
662            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
663            registry,
664        )
665        .unwrap();
666
667        Self {
668            total_req_received_single_writer,
669            total_req_received_shared_object,
670            good_response_single_writer,
671            good_response_shared_object,
672            req_in_flight_single_writer,
673            req_in_flight_shared_object,
674            wait_for_finality_in_flight: register_int_gauge_with_registry!(
675                "tx_orchestrator_wait_for_finality_in_flight",
676                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
677                registry,
678            )
679            .unwrap(),
680            wait_for_finality_finished: register_int_counter_with_registry!(
681                "tx_orchestrator_wait_for_finality_finished",
682                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
683                registry,
684            )
685            .unwrap(),
686            wait_for_finality_timeout: register_int_counter_with_registry!(
687                "tx_orchestrator_wait_for_finality_timeout",
688                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
689                registry,
690            )
691            .unwrap(),
692            local_execution_in_flight: register_int_gauge_with_registry!(
693                "tx_orchestrator_local_execution_in_flight",
694                "Number of local execution txns in flights Transaction Orchestrator handles",
695                registry,
696            )
697            .unwrap(),
698            local_execution_success: register_int_counter_with_registry!(
699                "tx_orchestrator_local_execution_success",
700                "Total number of successful local execution txns Transaction Orchestrator handles",
701                registry,
702            )
703            .unwrap(),
704            local_execution_timeout: register_int_counter_with_registry!(
705                "tx_orchestrator_local_execution_timeout",
706                "Total number of timed-out local execution txns Transaction Orchestrator handles",
707                registry,
708            )
709            .unwrap(),
710            local_execution_failure: register_int_counter_with_registry!(
711                "tx_orchestrator_local_execution_failure",
712                "Total number of failed local execution txns Transaction Orchestrator handles",
713                registry,
714            )
715            .unwrap(),
716            request_latency_single_writer: request_latency
717                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
718            request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
719            wait_for_finality_latency_single_writer: wait_for_finality_latency
720                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
721            wait_for_finality_latency_shared_obj: wait_for_finality_latency
722                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
723            local_execution_latency_single_writer: local_execution_latency
724                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
725            local_execution_latency_shared_obj: local_execution_latency
726                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
727        }
728    }
729
730    pub fn new_for_tests() -> Self {
731        let registry = Registry::new();
732        Self::new(&registry)
733    }
734}
735
736#[async_trait::async_trait]
737impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
738where
739    A: AuthorityAPI + Send + Sync + 'static + Clone,
740{
741    async fn execute_transaction(
742        &self,
743        request: ExecuteTransactionRequestV1,
744        client_addr: Option<std::net::SocketAddr>,
745    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
746        self.execute_transaction_v1(request, client_addr).await
747    }
748
749    fn simulate_transaction(
750        &self,
751        transaction: TransactionData,
752    ) -> Result<SimulateTransactionResult, IotaError> {
753        self.validator_state.simulate_transaction(transaction)
754    }
755}