iota_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
6// submit transactions to validators for finality, and proactively executes
7// finalized transactions locally, when possible.
8
9use std::{
10    collections::BTreeMap, net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration,
11};
12
13use futures::{
14    FutureExt,
15    future::{Either, Future, select},
16};
17use iota_common::sync::notify_read::NotifyRead;
18use iota_metrics::{
19    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
20    spawn_logged_monitored_task, spawn_monitored_task,
21};
22use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
23use iota_types::{
24    base_types::TransactionDigest,
25    error::{IotaError, IotaResult},
26    iota_system_state::IotaSystemState,
27    messages_checkpoint::CheckpointSequenceNumber,
28    quorum_driver_types::{
29        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
30        FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
31        QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
32    },
33    transaction::{TransactionData, VerifiedTransaction},
34    transaction_executor::{SimulateTransactionResult, VmChecks},
35};
36use prometheus::{
37    Histogram, Registry,
38    core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
39    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
40    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
41    register_int_gauge_with_registry,
42};
43use tokio::{
44    sync::broadcast::{Receiver, error::RecvError},
45    task::JoinHandle,
46    time::timeout,
47};
48use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
49
50use crate::{
51    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
52    authority_aggregator::AuthorityAggregator,
53    authority_client::{AuthorityAPI, NetworkAuthorityClient},
54    quorum_driver::{
55        QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
56        reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
57    },
58};
59
/// How long to wait for local execution (including parents) before a timeout
/// is returned to client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// How long `execute_transaction_impl` waits on the quorum driver ticket
/// before giving up with `QuorumDriverError::TimeoutBeforeFinality`.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
65
/// Transaction Orchestrator is a Node component that utilizes Quorum Driver to
/// submit transactions to validators for finality, and proactively executes
/// finalized transactions locally, when possible.
pub struct TransactionOrchestrator<A: Clone> {
    /// Handle used to submit transactions to the quorum driver and to
    /// subscribe to its effects queue.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state; this type reads the epoch store and the
    /// transaction cache reader from it and forwards simulation and
    /// checkpoint-inclusion requests to it.
    validator_state: Arc<AuthorityState>,
    /// Join handle of the background task running
    /// `loop_pending_transaction_log`; it is stored but never awaited in this
    /// file.
    _local_executor_handle: JoinHandle<()>,
    /// Log of transactions submitted through this orchestrator; entries are
    /// finished by `loop_pending_transaction_log` once a result shows up on
    /// the effects queue.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Per-digest notifier shared with the quorum driver handler; `submit`
    /// registers on it to obtain the `QuorumDriverResult` of a transaction.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics recorded by the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
77
78impl TransactionOrchestrator<NetworkAuthorityClient> {
79    pub fn new_with_auth_aggregator(
80        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
81        validator_state: Arc<AuthorityState>,
82        reconfig_channel: Receiver<IotaSystemState>,
83        parent_path: &Path,
84        prometheus_registry: &Registry,
85    ) -> Self {
86        let observer = OnsiteReconfigObserver::new(
87            reconfig_channel,
88            validator_state.get_object_cache_reader().clone(),
89            validator_state.clone_committee_store(),
90            validators.safe_client_metrics_base.clone(),
91            validators.metrics.deref().clone(),
92        );
93
94        TransactionOrchestrator::new(
95            validators,
96            validator_state,
97            parent_path,
98            prometheus_registry,
99            observer,
100        )
101    }
102}
103
104impl<A> TransactionOrchestrator<A>
105where
106    A: AuthorityAPI + Send + Sync + 'static + Clone,
107    OnsiteReconfigObserver: ReconfigObserver<A>,
108{
109    pub fn new(
110        validators: Arc<AuthorityAggregator<A>>,
111        validator_state: Arc<AuthorityState>,
112        parent_path: &Path,
113        prometheus_registry: &Registry,
114        reconfig_observer: OnsiteReconfigObserver,
115    ) -> Self {
116        let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
117        let notifier = Arc::new(NotifyRead::new());
118        let reconfig_observer = Arc::new(reconfig_observer);
119        let quorum_driver_handler = Arc::new(
120            QuorumDriverHandlerBuilder::new(validators, metrics)
121                .with_notifier(notifier.clone())
122                .with_reconfig_observer(reconfig_observer)
123                .start(),
124        );
125
126        let effects_receiver = quorum_driver_handler.subscribe_to_effects();
127        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
128        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
129            parent_path.join("fullnode_pending_transactions"),
130        ));
131        let pending_tx_log_clone = pending_tx_log.clone();
132        let _local_executor_handle = {
133            spawn_monitored_task!(async move {
134                Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
135            })
136        };
137        Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138        Self {
139            quorum_driver_handler,
140            validator_state,
141            _local_executor_handle,
142            pending_tx_log,
143            notifier,
144            metrics,
145        }
146    }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151    A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153    #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
154    fields(
155        tx_digest = ?request.transaction.digest(),
156        tx_type = ?request_type,
157    ),
158    err)]
159    pub async fn execute_transaction_block(
160        &self,
161        request: ExecuteTransactionRequestV1,
162        request_type: ExecuteTransactionRequestType,
163        client_addr: Option<SocketAddr>,
164    ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165    {
166        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168        let (transaction, response) = self
169            .execute_transaction_impl(&epoch_store, request, client_addr)
170            .await?;
171
172        let executed_locally = if matches!(
173            request_type,
174            ExecuteTransactionRequestType::WaitForLocalExecution
175        ) {
176            let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
177                &self.validator_state,
178                &transaction,
179                &self.metrics,
180            )
181            .await
182            .is_ok();
183            add_server_timing("local_execution");
184            executed_locally
185        } else {
186            false
187        };
188
189        let QuorumDriverResponse {
190            effects_cert,
191            events,
192            input_objects,
193            output_objects,
194            auxiliary_data,
195        } = response;
196
197        let response = ExecuteTransactionResponseV1 {
198            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
199            events,
200            input_objects,
201            output_objects,
202            auxiliary_data,
203        };
204
205        Ok((response, executed_locally))
206    }
207
208    // Utilize the handle_certificate_v1 validator api to request input/output
209    // objects
210    #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
211                 fields(tx_digest = ?request.transaction.digest()))]
212    pub async fn execute_transaction_v1(
213        &self,
214        request: ExecuteTransactionRequestV1,
215        client_addr: Option<SocketAddr>,
216    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
217        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
218
219        let QuorumDriverResponse {
220            effects_cert,
221            events,
222            input_objects,
223            output_objects,
224            auxiliary_data,
225        } = self
226            .execute_transaction_impl(&epoch_store, request, client_addr)
227            .await
228            .map(|(_, r)| r)?;
229
230        Ok(ExecuteTransactionResponseV1 {
231            effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
232            events,
233            input_objects,
234            output_objects,
235            auxiliary_data,
236        })
237    }
238
239    // TODO check if tx is already executed on this node.
240    // Note: since EffectsCert is not stored today, we need to gather that from
241    // validators (and maybe store it for caching purposes)
242    #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
243    pub async fn execute_transaction_impl(
244        &self,
245        epoch_store: &AuthorityPerEpochStore,
246        request: ExecuteTransactionRequestV1,
247        client_addr: Option<SocketAddr>,
248    ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
249        let transaction = epoch_store
250            .verify_transaction(request.transaction.clone())
251            .map_err(QuorumDriverError::InvalidUserSignature)?;
252        let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
253        let tx_digest = *transaction.digest();
254        debug!(?tx_digest, "TO Received transaction execution request.");
255
256        let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
257            (
258                self.metrics.request_latency_shared_obj.start_timer(),
259                self.metrics
260                    .wait_for_finality_latency_shared_obj
261                    .start_timer(),
262            )
263        } else {
264            (
265                self.metrics.request_latency_single_writer.start_timer(),
266                self.metrics
267                    .wait_for_finality_latency_single_writer
268                    .start_timer(),
269            )
270        };
271
272        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
273        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
274        wait_for_finality_gauge.inc();
275        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
276            in_flight.dec();
277        });
278
279        let ticket = self
280            .submit(transaction.clone(), request, client_addr)
281            .await
282            .map_err(|e| {
283                warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
284                QuorumDriverError::QuorumDriverInternal(e)
285            })?;
286
287        let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
288            debug!(?tx_digest, "Timeout waiting for transaction finality.");
289            self.metrics.wait_for_finality_timeout.inc();
290            return Err(QuorumDriverError::TimeoutBeforeFinality);
291        };
292        add_server_timing("wait_for_finality");
293
294        drop(_txn_finality_timer);
295        drop(_wait_for_finality_gauge);
296        self.metrics.wait_for_finality_finished.inc();
297
298        match result {
299            Err(err) => {
300                warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
301                Err(QuorumDriverError::QuorumDriverInternal(err))
302            }
303            Ok(Err(err)) => Err(err),
304            Ok(Ok(response)) => {
305                good_response_metrics.inc();
306                Ok((transaction, response))
307            }
308        }
309    }
310
311    /// Submits the transaction to Quorum Driver for execution.
312    /// Returns an awaitable Future.
313    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
314    async fn submit(
315        &self,
316        transaction: VerifiedTransaction,
317        request: ExecuteTransactionRequestV1,
318        client_addr: Option<SocketAddr>,
319    ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
320        let tx_digest = *transaction.digest();
321        let ticket = self.notifier.register_one(&tx_digest);
322        // TODO(william) need to also write client adr to pending tx log below
323        // so that we can re-execute with this client addr if we restart
324        if self
325            .pending_tx_log
326            .write_pending_transaction_maybe(&transaction)
327            .await?
328        {
329            debug!(?tx_digest, "no pending request in flight, submitting.");
330            self.quorum_driver()
331                .submit_transaction_no_ticket(request.clone(), client_addr)
332                .await?;
333        }
334        // It's possible that the transaction effects is already stored in DB at this
335        // point. So we also subscribe to that. If we hear from `effects_await`
336        // first, it means the ticket misses the previous notification, and we
337        // want to ask quorum driver to form a certificate for us again, to
338        // serve this request.
339        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
340        let qd = self.clone_quorum_driver();
341        Ok(async move {
342            let digests = [tx_digest];
343            let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
344            // let-and-return necessary to satisfy borrow checker.
345            let res = match select(ticket, effects_await.boxed()).await {
346                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
347                Either::Right((_, unfinished_quorum_driver_task)) => {
348                    debug!(
349                        ?tx_digest,
350                        "Effects are available in DB, use quorum driver to get a certificate"
351                    );
352                    qd.submit_transaction_no_ticket(request, client_addr)
353                        .await?;
354                    Ok(unfinished_quorum_driver_task.await)
355                }
356            };
357            res
358        })
359    }
360
361    #[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
362    async fn wait_for_finalized_tx_executed_locally_with_timeout(
363        validator_state: &Arc<AuthorityState>,
364        transaction: &VerifiedTransaction,
365        metrics: &TransactionOrchestratorMetrics,
366    ) -> IotaResult {
367        let tx_digest = *transaction.digest();
368        metrics.local_execution_in_flight.inc();
369        let _metrics_guard =
370            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
371                in_flight.dec();
372            });
373
374        let _guard = if transaction.contains_shared_object() {
375            metrics.local_execution_latency_shared_obj.start_timer()
376        } else {
377            metrics.local_execution_latency_single_writer.start_timer()
378        };
379        debug!(
380            ?tx_digest,
381            "Waiting for finalized tx to be executed locally."
382        );
383        match timeout(
384            LOCAL_EXECUTION_TIMEOUT,
385            validator_state
386                .get_transaction_cache_reader()
387                .try_notify_read_executed_effects_digests(&[tx_digest]),
388        )
389        .instrument(trace_span!("local_execution"))
390        .await
391        {
392            Err(_elapsed) => {
393                debug!(
394                    ?tx_digest,
395                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
396                    LOCAL_EXECUTION_TIMEOUT
397                );
398                metrics.local_execution_timeout.inc();
399                Err(IotaError::Timeout)
400            }
401            Ok(Err(err)) => {
402                debug!(
403                    ?tx_digest,
404                    "Waiting for finalized tx to be executed locally failed with error: {:?}", err
405                );
406                metrics.local_execution_failure.inc();
407                Err(IotaError::TransactionOrchestratorLocalExecution {
408                    error: err.to_string(),
409                })
410            }
411            Ok(Ok(_)) => {
412                metrics.local_execution_success.inc();
413                Ok(())
414            }
415        }
416    }
417
418    // TODO: Potentially cleanup this function and pending transaction log.
419    async fn loop_pending_transaction_log(
420        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
421        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
422    ) {
423        loop {
424            match effects_receiver.recv().await {
425                Ok(Ok((transaction, ..))) => {
426                    let tx_digest = transaction.digest();
427                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
428                        error!(
429                            ?tx_digest,
430                            "Failed to finish transaction in pending transaction log: {err}"
431                        );
432                    }
433                }
434                Ok(Err((tx_digest, _err))) => {
435                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
436                        error!(
437                            ?tx_digest,
438                            "Failed to finish transaction in pending transaction log: {err}"
439                        );
440                    }
441                }
442                Err(RecvError::Closed) => {
443                    error!("Sender of effects subscriber queue has been dropped!");
444                    return;
445                }
446                Err(RecvError::Lagged(skipped_count)) => {
447                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
448                }
449            }
450        }
451    }
452
453    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
454        &self.quorum_driver_handler
455    }
456
457    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
458        self.quorum_driver_handler.clone()
459    }
460
461    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
462        self.quorum_driver().authority_aggregator().load_full()
463    }
464
465    pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
466        self.quorum_driver_handler.subscribe_to_effects()
467    }
468
469    fn update_metrics(
470        &'_ self,
471        transaction: &VerifiedTransaction,
472    ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
473        let (in_flight, good_response) = if transaction.contains_shared_object() {
474            self.metrics.total_req_received_shared_object.inc();
475            (
476                self.metrics.req_in_flight_shared_object.clone(),
477                &self.metrics.good_response_shared_object,
478            )
479        } else {
480            self.metrics.total_req_received_single_writer.inc();
481            (
482                self.metrics.req_in_flight_single_writer.clone(),
483                &self.metrics.good_response_single_writer,
484            )
485        };
486        in_flight.inc();
487        (
488            scopeguard::guard(in_flight, |in_flight| {
489                in_flight.dec();
490            }),
491            good_response,
492        )
493    }
494
495    fn schedule_txes_in_log(
496        pending_tx_log: Arc<WritePathPendingTransactionLog>,
497        quorum_driver: Arc<QuorumDriverHandler<A>>,
498    ) {
499        spawn_logged_monitored_task!(async move {
500            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
501                info!("Skipping loading pending transactions from pending_tx_log.");
502                return;
503            }
504            let pending_txes = pending_tx_log
505                .load_all_pending_transactions()
506                .expect("failed to load all pending transactions");
507            info!(
508                "Recovering {} pending transactions from pending_tx_log.",
509                pending_txes.len()
510            );
511            for (i, tx) in pending_txes.into_iter().enumerate() {
512                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
513                // requires a migration.
514                let tx = tx.into_inner();
515                let tx_digest = *tx.digest();
516                // It's not impossible we fail to enqueue a task but that's not the end of
517                // world. TODO(william) correctly extract client_addr from logs
518                if let Err(err) = quorum_driver
519                    .submit_transaction_no_ticket(
520                        ExecuteTransactionRequestV1 {
521                            transaction: tx,
522                            include_events: true,
523                            include_input_objects: false,
524                            include_output_objects: false,
525                            include_auxiliary_data: false,
526                        },
527                        None,
528                    )
529                    .await
530                {
531                    warn!(
532                        ?tx_digest,
533                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
534                    );
535                } else {
536                    debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
537                    if (i + 1) % 1000 == 0 {
538                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
539                    }
540                }
541            }
542            // Transactions will be cleaned up in
543            // loop_execute_finalized_tx_locally() after they
544            // produce effects.
545        });
546    }
547
548    pub fn load_all_pending_transactions(&self) -> IotaResult<Vec<VerifiedTransaction>> {
549        self.pending_tx_log.load_all_pending_transactions()
550    }
551}
552
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
///
/// Metrics that come in a `single_writer` / `shared_object` (`shared_obj`)
/// pair are the two `tx_type` label values of one underlying metric vector.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Successful responses produced, by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Wait-for-finality phase: in-flight gauge, count of waits that got a
    // response before the timeout, and count of waits that timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local-execution phase: in-flight gauge plus one counter per outcome.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms, by phase and transaction type.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
581
582// Note that labeled-metrics are stored upfront individually
583// to mitigate the perf hit by MetricsVec.
584// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
585impl TransactionOrchestratorMetrics {
586    pub fn new(registry: &Registry) -> Self {
587        let total_req_received = register_int_counter_vec_with_registry!(
588            "tx_orchestrator_total_req_received",
589            "Total number of executions request Transaction Orchestrator receives, group by tx type",
590            &["tx_type"],
591            registry
592        )
593        .unwrap();
594
595        let total_req_received_single_writer =
596            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
597        let total_req_received_shared_object =
598            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
599
600        let good_response = register_int_counter_vec_with_registry!(
601            "tx_orchestrator_good_response",
602            "Total number of good responses Transaction Orchestrator generates, group by tx type",
603            &["tx_type"],
604            registry
605        )
606        .unwrap();
607
608        let good_response_single_writer =
609            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
610        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
611
612        let req_in_flight = register_int_gauge_vec_with_registry!(
613            "tx_orchestrator_req_in_flight",
614            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
615            &["tx_type"],
616            registry
617        )
618        .unwrap();
619
620        let req_in_flight_single_writer =
621            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
622        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
623
624        let request_latency = register_histogram_vec_with_registry!(
625            "tx_orchestrator_request_latency",
626            "Time spent in processing one Transaction Orchestrator request",
627            &["tx_type"],
628            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
629            registry,
630        )
631        .unwrap();
632        let wait_for_finality_latency = register_histogram_vec_with_registry!(
633            "tx_orchestrator_wait_for_finality_latency",
634            "Time spent in waiting for one Transaction Orchestrator request gets finalized",
635            &["tx_type"],
636            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
637            registry,
638        )
639        .unwrap();
640        let local_execution_latency = register_histogram_vec_with_registry!(
641            "tx_orchestrator_local_execution_latency",
642            "Time spent in waiting for one Transaction Orchestrator gets locally executed",
643            &["tx_type"],
644            iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
645            registry,
646        )
647        .unwrap();
648
649        Self {
650            total_req_received_single_writer,
651            total_req_received_shared_object,
652            good_response_single_writer,
653            good_response_shared_object,
654            req_in_flight_single_writer,
655            req_in_flight_shared_object,
656            wait_for_finality_in_flight: register_int_gauge_with_registry!(
657                "tx_orchestrator_wait_for_finality_in_flight",
658                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
659                registry,
660            )
661            .unwrap(),
662            wait_for_finality_finished: register_int_counter_with_registry!(
663                "tx_orchestrator_wait_for_finality_finished",
664                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
665                registry,
666            )
667            .unwrap(),
668            wait_for_finality_timeout: register_int_counter_with_registry!(
669                "tx_orchestrator_wait_for_finality_timeout",
670                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
671                registry,
672            )
673            .unwrap(),
674            local_execution_in_flight: register_int_gauge_with_registry!(
675                "tx_orchestrator_local_execution_in_flight",
676                "Number of local execution txns in flights Transaction Orchestrator handles",
677                registry,
678            )
679            .unwrap(),
680            local_execution_success: register_int_counter_with_registry!(
681                "tx_orchestrator_local_execution_success",
682                "Total number of successful local execution txns Transaction Orchestrator handles",
683                registry,
684            )
685            .unwrap(),
686            local_execution_timeout: register_int_counter_with_registry!(
687                "tx_orchestrator_local_execution_timeout",
688                "Total number of timed-out local execution txns Transaction Orchestrator handles",
689                registry,
690            )
691            .unwrap(),
692            local_execution_failure: register_int_counter_with_registry!(
693                "tx_orchestrator_local_execution_failure",
694                "Total number of failed local execution txns Transaction Orchestrator handles",
695                registry,
696            )
697            .unwrap(),
698            request_latency_single_writer: request_latency
699                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
700            request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
701            wait_for_finality_latency_single_writer: wait_for_finality_latency
702                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
703            wait_for_finality_latency_shared_obj: wait_for_finality_latency
704                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
705            local_execution_latency_single_writer: local_execution_latency
706                .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
707            local_execution_latency_shared_obj: local_execution_latency
708                .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
709        }
710    }
711
712    pub fn new_for_tests() -> Self {
713        let registry = Registry::new();
714        Self::new(&registry)
715    }
716}
717
718#[async_trait::async_trait]
719impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
720where
721    A: AuthorityAPI + Send + Sync + 'static + Clone,
722{
723    async fn execute_transaction(
724        &self,
725        request: ExecuteTransactionRequestV1,
726        client_addr: Option<std::net::SocketAddr>,
727    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
728        self.execute_transaction_v1(request, client_addr).await
729    }
730
731    fn simulate_transaction(
732        &self,
733        transaction: TransactionData,
734        checks: VmChecks,
735    ) -> Result<SimulateTransactionResult, IotaError> {
736        self.validator_state
737            .simulate_transaction(transaction, checks)
738    }
739
740    /// Wait for the given transactions to be included in a checkpoint.
741    ///
742    /// Returns a mapping from transaction digest to
743    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
744    /// On timeout, returns partial results for any transactions that were
745    /// already checkpointed.
746    async fn wait_for_checkpoint_inclusion(
747        &self,
748        digests: &[TransactionDigest],
749        timeout: Duration,
750    ) -> Result<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>, IotaError> {
751        self.validator_state
752            .wait_for_checkpoint_inclusion(digests, timeout)
753            .await
754    }
755}