iota_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
6// submit transactions to validators for finality, and proactively executes
7// finalized transactions locally, when possible.
8
9use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12    FutureExt,
13    future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18    spawn_logged_monitored_task, spawn_monitored_task,
19};
20use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
21use iota_types::{
22    base_types::TransactionDigest,
23    effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects},
24    error::{IotaError, IotaResult},
25    executable_transaction::VerifiedExecutableTransaction,
26    iota_system_state::IotaSystemState,
27    quorum_driver_types::{
28        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
29        FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
30        QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
31    },
32    transaction::{TransactionData, VerifiedTransaction},
33    transaction_executor::SimulateTransactionResult,
34};
35use prometheus::{
36    Histogram, Registry,
37    core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
38    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
39    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
40    register_int_gauge_with_registry,
41};
42use tokio::{
43    sync::broadcast::{Receiver, error::RecvError},
44    task::JoinHandle,
45    time::timeout,
46};
47use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
48
49use crate::{
50    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
51    authority_aggregator::AuthorityAggregator,
52    authority_client::{AuthorityAPI, NetworkAuthorityClient},
53    quorum_driver::{
54        QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
55        reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
56    },
57};
58
/// How long to wait for local execution (including parents) before a timeout
/// is returned to client. Applied per attempt in
/// `execute_finalized_tx_locally_with_timeout`, which yields
/// `IotaError::Timeout` once it elapses.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// How long `execute_transaction_impl` waits on the quorum driver ticket
/// before giving up with `QuorumDriverError::TimeoutBeforeFinality`.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
64
/// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
/// submit transactions to validators for finality, and proactively executes
/// finalized transactions locally, when possible.
pub struct TransactionOrchestrator<A: Clone> {
    /// Quorum driver used to submit transactions and to subscribe to the
    /// effects queue it produces.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state; supplies the epoch store, the transaction cache
    /// reader, transaction simulation and local execution of finalized
    /// transactions.
    validator_state: Arc<AuthorityState>,
    /// Handle of the background task spawned in `new` that runs
    /// `loop_execute_finalized_tx_locally`. It is stored but never awaited in
    /// this file.
    _local_executor_handle: JoinHandle<()>,
    /// Log of transactions that were submitted but have not produced a result
    /// yet; rooted at `<parent_path>/fullnode_pending_transactions`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier shared with the quorum driver (see `with_notifier` in `new`);
    /// `submit` registers one ticket per transaction digest on it.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics of the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
76
77impl TransactionOrchestrator<NetworkAuthorityClient> {
78    pub fn new_with_auth_aggregator(
79        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
80        validator_state: Arc<AuthorityState>,
81        reconfig_channel: Receiver<IotaSystemState>,
82        parent_path: &Path,
83        prometheus_registry: &Registry,
84    ) -> Self {
85        let observer = OnsiteReconfigObserver::new(
86            reconfig_channel,
87            validator_state.get_object_cache_reader().clone(),
88            validator_state.clone_committee_store(),
89            validators.safe_client_metrics_base.clone(),
90            validators.metrics.deref().clone(),
91        );
92
93        TransactionOrchestrator::new(
94            validators,
95            validator_state,
96            parent_path,
97            prometheus_registry,
98            observer,
99        )
100    }
101}
102
103impl<A> TransactionOrchestrator<A>
104where
105    A: AuthorityAPI + Send + Sync + 'static + Clone,
106    OnsiteReconfigObserver: ReconfigObserver<A>,
107{
108    pub fn new(
109        validators: Arc<AuthorityAggregator<A>>,
110        validator_state: Arc<AuthorityState>,
111        parent_path: &Path,
112        prometheus_registry: &Registry,
113        reconfig_observer: OnsiteReconfigObserver,
114    ) -> Self {
115        let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
116        let notifier = Arc::new(NotifyRead::new());
117        let reconfig_observer = Arc::new(reconfig_observer);
118        let quorum_driver_handler = Arc::new(
119            QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
120                .with_notifier(notifier.clone())
121                .with_reconfig_observer(reconfig_observer.clone())
122                .start(),
123        );
124
125        let effects_receiver = quorum_driver_handler.subscribe_to_effects();
126        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
127        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
128            parent_path.join("fullnode_pending_transactions"),
129        ));
130        let pending_tx_log_clone = pending_tx_log.clone();
131        let _local_executor_handle = {
132            spawn_monitored_task!(async move {
133                Self::loop_execute_finalized_tx_locally(effects_receiver, pending_tx_log_clone)
134                    .await;
135            })
136        };
137        Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138        Self {
139            quorum_driver_handler,
140            validator_state,
141            _local_executor_handle,
142            pending_tx_log,
143            notifier,
144            metrics,
145        }
146    }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151    A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153    #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
154    fields(
155        tx_digest = ?request.transaction.digest(),
156        tx_type = ?request_type,
157    ),
158    err)]
159    pub async fn execute_transaction_block(
160        &self,
161        request: ExecuteTransactionRequestV1,
162        request_type: ExecuteTransactionRequestType,
163        client_addr: Option<SocketAddr>,
164    ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165    {
166        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168        let (transaction, response) = self
169            .execute_transaction_impl(&epoch_store, request, client_addr)
170            .await?;
171
172        let executed_locally = if matches!(
173            request_type,
174            ExecuteTransactionRequestType::WaitForLocalExecution
175        ) {
176            let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
177                transaction,
178                response.effects_cert.executed_epoch(),
179            );
180            let executed_locally = Self::execute_finalized_tx_locally_with_timeout(
181                &self.validator_state,
182                &epoch_store,
183                &executable_tx,
184                &response.effects_cert,
185                &self.metrics,
186            )
187            .await
188            .is_ok();
189            add_server_timing("local_execution");
190            executed_locally
191        } else {
192            false
193        };
194
195        let QuorumDriverResponse {
196            effects_cert,
197            events,
198            input_objects,
199            output_objects,
200            auxiliary_data,
201        } = response;
202
203        let response = ExecuteTransactionResponseV1 {
204            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
205            events,
206            input_objects,
207            output_objects,
208            auxiliary_data,
209        };
210
211        Ok((response, executed_locally))
212    }
213
214    // Utilize the handle_certificate_v1 validator api to request input/output
215    // objects
216    #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
217                 fields(tx_digest = ?request.transaction.digest()))]
218    pub async fn execute_transaction_v1(
219        &self,
220        request: ExecuteTransactionRequestV1,
221        client_addr: Option<SocketAddr>,
222    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
223        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
224
225        let QuorumDriverResponse {
226            effects_cert,
227            events,
228            input_objects,
229            output_objects,
230            auxiliary_data,
231        } = self
232            .execute_transaction_impl(&epoch_store, request, client_addr)
233            .await
234            .map(|(_, r)| r)?;
235
236        Ok(ExecuteTransactionResponseV1 {
237            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
238            events,
239            input_objects,
240            output_objects,
241            auxiliary_data,
242        })
243    }
244
245    // TODO check if tx is already executed on this node.
246    // Note: since EffectsCert is not stored today, we need to gather that from
247    // validators (and maybe store it for caching purposes)
248    #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
249    pub async fn execute_transaction_impl(
250        &self,
251        epoch_store: &AuthorityPerEpochStore,
252        request: ExecuteTransactionRequestV1,
253        client_addr: Option<SocketAddr>,
254    ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
255        let transaction = epoch_store
256            .verify_transaction(request.transaction.clone())
257            .map_err(QuorumDriverError::InvalidUserSignature)?;
258        let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
259        let tx_digest = *transaction.digest();
260        debug!(?tx_digest, "TO Received transaction execution request.");
261
262        let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
263            (
264                self.metrics.request_latency_shared_obj.start_timer(),
265                self.metrics
266                    .wait_for_finality_latency_shared_obj
267                    .start_timer(),
268            )
269        } else {
270            (
271                self.metrics.request_latency_single_writer.start_timer(),
272                self.metrics
273                    .wait_for_finality_latency_single_writer
274                    .start_timer(),
275            )
276        };
277
278        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
279        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
280        wait_for_finality_gauge.inc();
281        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
282            in_flight.dec();
283        });
284
285        let ticket = self
286            .submit(transaction.clone(), request, client_addr)
287            .await
288            .map_err(|e| {
289                warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
290                QuorumDriverError::QuorumDriverInternal(e)
291            })?;
292
293        let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
294            debug!(?tx_digest, "Timeout waiting for transaction finality.");
295            self.metrics.wait_for_finality_timeout.inc();
296            return Err(QuorumDriverError::TimeoutBeforeFinality);
297        };
298        add_server_timing("wait_for_finality");
299
300        drop(_txn_finality_timer);
301        drop(_wait_for_finality_gauge);
302        self.metrics.wait_for_finality_finished.inc();
303
304        match result {
305            Err(err) => {
306                warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
307                Err(QuorumDriverError::QuorumDriverInternal(err))
308            }
309            Ok(Err(err)) => Err(err),
310            Ok(Ok(response)) => {
311                good_response_metrics.inc();
312                Ok((transaction, response))
313            }
314        }
315    }
316
317    /// Submits the transaction to Quorum Driver for execution.
318    /// Returns an awaitable Future.
319    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
320    async fn submit(
321        &self,
322        transaction: VerifiedTransaction,
323        request: ExecuteTransactionRequestV1,
324        client_addr: Option<SocketAddr>,
325    ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
326        let tx_digest = *transaction.digest();
327        let ticket = self.notifier.register_one(&tx_digest);
328        // TODO(william) need to also write client adr to pending tx log below
329        // so that we can re-execute with this client addr if we restart
330        if self
331            .pending_tx_log
332            .write_pending_transaction_maybe(&transaction)
333            .await?
334        {
335            debug!(?tx_digest, "no pending request in flight, submitting.");
336            self.quorum_driver()
337                .submit_transaction_no_ticket(request.clone(), client_addr)
338                .await?;
339        }
340        // It's possible that the transaction effects is already stored in DB at this
341        // point. So we also subscribe to that. If we hear from `effects_await`
342        // first, it means the ticket misses the previous notification, and we
343        // want to ask quorum driver to form a certificate for us again, to
344        // serve this request.
345        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
346        let qd = self.clone_quorum_driver();
347        Ok(async move {
348            let digests = [tx_digest];
349            let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
350            // let-and-return necessary to satisfy borrow checker.
351            let res = match select(ticket, effects_await.boxed()).await {
352                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
353                Either::Right((_, unfinished_quorum_driver_task)) => {
354                    debug!(
355                        ?tx_digest,
356                        "Effects are available in DB, use quorum driver to get a certificate"
357                    );
358                    qd.submit_transaction_no_ticket(request, client_addr)
359                        .await?;
360                    Ok(unfinished_quorum_driver_task.await)
361                }
362            };
363            res
364        })
365    }
366
367    #[instrument(name = "tx_orchestrator_execute_finalized_tx_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
368    async fn execute_finalized_tx_locally_with_timeout(
369        validator_state: &Arc<AuthorityState>,
370        epoch_store: &Arc<AuthorityPerEpochStore>,
371        transaction: &VerifiedExecutableTransaction,
372        effects_cert: &VerifiedCertifiedTransactionEffects,
373        metrics: &TransactionOrchestratorMetrics,
374    ) -> IotaResult {
375        // TODO: attempt a finalized tx at most once per request.
376        // Every WaitForLocalExecution request will be attempted to execute twice,
377        // one from the subscriber queue, one from the proactive execution before
378        // returning results to clients. This is not insanely bad because:
379        // 1. it's possible that one attempt finishes before the other, so there's zero
380        //    extra work except DB checks
381        // 2. an up-to-date fullnode should have minimal overhead to sync parents (for
382        //    one extra time)
383        // 3. at the end of day, the tx will be executed at most once per lock guard.
384        let tx_digest = transaction.digest();
385        if validator_state.try_is_tx_already_executed(tx_digest)? {
386            return Ok(());
387        }
388        metrics.local_execution_in_flight.inc();
389        let _metrics_guard =
390            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
391                in_flight.dec();
392            });
393
394        let _guard = if transaction.contains_shared_object() {
395            metrics.local_execution_latency_shared_obj.start_timer()
396        } else {
397            metrics.local_execution_latency_single_writer.start_timer()
398        };
399        debug!(?tx_digest, "Executing finalized tx locally.");
400        match timeout(
401            LOCAL_EXECUTION_TIMEOUT,
402            validator_state.fullnode_execute_certificate_with_effects(
403                transaction,
404                effects_cert,
405                epoch_store,
406            ),
407        )
408        .instrument(trace_span!("local_execution"))
409        .await
410        {
411            Err(_elapsed) => {
412                debug!(
413                    ?tx_digest,
414                    "Executing tx locally by orchestrator timed out within {:?}.",
415                    LOCAL_EXECUTION_TIMEOUT
416                );
417                metrics.local_execution_timeout.inc();
418                Err(IotaError::Timeout)
419            }
420            Ok(Err(err)) => {
421                debug!(
422                    ?tx_digest,
423                    "Executing tx locally by orchestrator failed with error: {:?}", err
424                );
425                metrics.local_execution_failure.inc();
426                Err(IotaError::TransactionOrchestratorLocalExecution {
427                    error: err.to_string(),
428                })
429            }
430            Ok(Ok(_)) => {
431                metrics.local_execution_success.inc();
432                Ok(())
433            }
434        }
435    }
436
437    async fn loop_execute_finalized_tx_locally(
438        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
439        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
440    ) {
441        loop {
442            match effects_receiver.recv().await {
443                Ok(Ok((transaction, ..))) => {
444                    let tx_digest = transaction.digest();
445                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
446                        error!(
447                            ?tx_digest,
448                            "Failed to finish transaction in pending transaction log: {err}"
449                        );
450                    }
451                }
452                Ok(Err((tx_digest, _err))) => {
453                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
454                        error!(
455                            ?tx_digest,
456                            "Failed to finish transaction in pending transaction log: {err}"
457                        );
458                    }
459                }
460                Err(RecvError::Closed) => {
461                    error!("Sender of effects subscriber queue has been dropped!");
462                    return;
463                }
464                Err(RecvError::Lagged(skipped_count)) => {
465                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
466                }
467            }
468        }
469    }
470
    /// Borrows the quorum driver handler owned by this orchestrator.
    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
        &self.quorum_driver_handler
    }
474
475    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
476        self.quorum_driver_handler.clone()
477    }
478
479    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
480        self.quorum_driver().authority_aggregator().load_full()
481    }
482
483    pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
484        self.quorum_driver_handler.subscribe_to_effects()
485    }
486
487    fn update_metrics(
488        &'_ self,
489        transaction: &VerifiedTransaction,
490    ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
491        let (in_flight, good_response) = if transaction.contains_shared_object() {
492            self.metrics.total_req_received_shared_object.inc();
493            (
494                self.metrics.req_in_flight_shared_object.clone(),
495                &self.metrics.good_response_shared_object,
496            )
497        } else {
498            self.metrics.total_req_received_single_writer.inc();
499            (
500                self.metrics.req_in_flight_single_writer.clone(),
501                &self.metrics.good_response_single_writer,
502            )
503        };
504        in_flight.inc();
505        (
506            scopeguard::guard(in_flight, |in_flight| {
507                in_flight.dec();
508            }),
509            good_response,
510        )
511    }
512
513    fn schedule_txes_in_log(
514        pending_tx_log: Arc<WritePathPendingTransactionLog>,
515        quorum_driver: Arc<QuorumDriverHandler<A>>,
516    ) {
517        spawn_logged_monitored_task!(async move {
518            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
519                info!("Skipping loading pending transactions from pending_tx_log.");
520                return;
521            }
522            let pending_txes = pending_tx_log.load_all_pending_transactions();
523            info!(
524                "Recovering {} pending transactions from pending_tx_log.",
525                pending_txes.len()
526            );
527            for (i, tx) in pending_txes.into_iter().enumerate() {
528                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
529                // requires a migration.
530                let tx = tx.into_inner();
531                let tx_digest = *tx.digest();
532                // It's not impossible we fail to enqueue a task but that's not the end of
533                // world. TODO(william) correctly extract client_addr from logs
534                if let Err(err) = quorum_driver
535                    .submit_transaction_no_ticket(
536                        ExecuteTransactionRequestV1 {
537                            transaction: tx,
538                            include_events: true,
539                            include_input_objects: false,
540                            include_output_objects: false,
541                            include_auxiliary_data: false,
542                        },
543                        None,
544                    )
545                    .await
546                {
547                    warn!(
548                        ?tx_digest,
549                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
550                    );
551                } else {
552                    debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
553                    if (i + 1) % 1000 == 0 {
554                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
555                    }
556                }
557            }
558            // Transactions will be cleaned up in
559            // loop_execute_finalized_tx_locally() after they
560            // produce effects.
561        });
562    }
563
    /// Returns every transaction currently recorded in the pending
    /// transaction log.
    pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
        self.pending_tx_log.load_all_pending_transactions()
    }
567}
568
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
///
/// Metrics that exist per transaction type come in a `single_writer` and a
/// `shared_object` / `shared_obj` flavour; each is one labelled child of a
/// shared metric vector, resolved once in `new`.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, incremented in `update_metrics`.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Requests answered successfully by the quorum driver.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed; decremented by the guard returned
    // from `update_metrics`.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Bookkeeping of the wait for finality in `execute_transaction_impl`.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Bookkeeping of `execute_finalized_tx_locally_with_timeout`.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms, one per transaction type.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
597
598// Note that labeled-metrics are stored upfront individually
599// to mitigate the perf hit by MetricsVec.
600// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
601impl TransactionOrchestratorMetrics {
602    pub fn new(registry: &Registry) -> Self {
603        let total_req_received = register_int_counter_vec_with_registry!(
604            "tx_orchestrator_total_req_received",
605            "Total number of executions request Transaction Orchestrator receives, group by tx type",
606            &["tx_type"],
607            registry
608        )
609        .unwrap();
610
611        let total_req_received_single_writer =
612            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
613        let total_req_received_shared_object =
614            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
615
616        let good_response = register_int_counter_vec_with_registry!(
617            "tx_orchestrator_good_response",
618            "Total number of good responses Transaction Orchestrator generates, group by tx type",
619            &["tx_type"],
620            registry
621        )
622        .unwrap();
623
624        let good_response_single_writer =
625            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
626        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
627
628        let req_in_flight = register_int_gauge_vec_with_registry!(
629            "tx_orchestrator_req_in_flight",
630            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
631            &["tx_type"],
632            registry
633        )
634        .unwrap();
635
636        let req_in_flight_single_writer =
637            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
638        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
639
640        let request_latency = register_histogram_vec_with_registry!(
641            "tx_orchestrator_request_latency",
642            "Time spent in processing one Transaction Orchestrator request",
643            &["tx_type"],
644            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
645            registry,
646        )
647        .unwrap();
648        let wait_for_finality_latency = register_histogram_vec_with_registry!(
649            "tx_orchestrator_wait_for_finality_latency",
650            "Time spent in waiting for one Transaction Orchestrator request gets finalized",
651            &["tx_type"],
652            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
653            registry,
654        )
655        .unwrap();
656        let local_execution_latency = register_histogram_vec_with_registry!(
657            "tx_orchestrator_local_execution_latency",
658            "Time spent in waiting for one Transaction Orchestrator gets locally executed",
659            &["tx_type"],
660            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
661            registry,
662        )
663        .unwrap();
664
665        Self {
666            total_req_received_single_writer,
667            total_req_received_shared_object,
668            good_response_single_writer,
669            good_response_shared_object,
670            req_in_flight_single_writer,
671            req_in_flight_shared_object,
672            wait_for_finality_in_flight: register_int_gauge_with_registry!(
673                "tx_orchestrator_wait_for_finality_in_flight",
674                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
675                registry,
676            )
677            .unwrap(),
678            wait_for_finality_finished: register_int_counter_with_registry!(
679                "tx_orchestrator_wait_for_finality_finished",
680                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
681                registry,
682            )
683            .unwrap(),
684            wait_for_finality_timeout: register_int_counter_with_registry!(
685                "tx_orchestrator_wait_for_finality_timeout",
686                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
687                registry,
688            )
689            .unwrap(),
690            local_execution_in_flight: register_int_gauge_with_registry!(
691                "tx_orchestrator_local_execution_in_flight",
692                "Number of local execution txns in flights Transaction Orchestrator handles",
693                registry,
694            )
695            .unwrap(),
696            local_execution_success: register_int_counter_with_registry!(
697                "tx_orchestrator_local_execution_success",
698                "Total number of successful local execution txns Transaction Orchestrator handles",
699                registry,
700            )
701            .unwrap(),
702            local_execution_timeout: register_int_counter_with_registry!(
703                "tx_orchestrator_local_execution_timeout",
704                "Total number of timed-out local execution txns Transaction Orchestrator handles",
705                registry,
706            )
707            .unwrap(),
708            local_execution_failure: register_int_counter_with_registry!(
709                "tx_orchestrator_local_execution_failure",
710                "Total number of failed local execution txns Transaction Orchestrator handles",
711                registry,
712            )
713            .unwrap(),
714            request_latency_single_writer: request_latency
715                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
716            request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
717            wait_for_finality_latency_single_writer: wait_for_finality_latency
718                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
719            wait_for_finality_latency_shared_obj: wait_for_finality_latency
720                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
721            local_execution_latency_single_writer: local_execution_latency
722                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
723            local_execution_latency_shared_obj: local_execution_latency
724                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
725        }
726    }
727
728    pub fn new_for_tests() -> Self {
729        let registry = Registry::new();
730        Self::new(&registry)
731    }
732}
733
734#[async_trait::async_trait]
735impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
736where
737    A: AuthorityAPI + Send + Sync + 'static + Clone,
738{
739    async fn execute_transaction(
740        &self,
741        request: ExecuteTransactionRequestV1,
742        client_addr: Option<std::net::SocketAddr>,
743    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
744        self.execute_transaction_v1(request, client_addr).await
745    }
746
747    fn simulate_transaction(
748        &self,
749        transaction: TransactionData,
750    ) -> Result<SimulateTransactionResult, IotaError> {
751        self.validator_state.simulate_transaction(transaction)
752    }
753}