1use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12 FutureExt,
13 future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18 spawn_logged_monitored_task, spawn_monitored_task,
19};
20use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
21use iota_types::{
22 base_types::TransactionDigest,
23 error::{IotaError, IotaResult},
24 iota_system_state::IotaSystemState,
25 quorum_driver_types::{
26 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
27 FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
28 QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
29 },
30 transaction::{TransactionData, VerifiedTransaction},
31 transaction_executor::{SimulateTransactionResult, VmChecks},
32};
33use prometheus::{
34 Histogram, Registry,
35 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
36 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
37 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
38 register_int_gauge_with_registry,
39};
40use tokio::{
41 sync::broadcast::{Receiver, error::RecvError},
42 task::JoinHandle,
43 time::timeout,
44};
45use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
46
47use crate::{
48 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
49 authority_aggregator::AuthorityAggregator,
50 authority_client::{AuthorityAPI, NetworkAuthorityClient},
51 quorum_driver::{
52 QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
53 reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
54 },
55};
56
/// Upper bound on how long `wait_for_finalized_tx_executed_locally_with_timeout`
/// waits for the executed-effects read on the local transaction cache.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound on how long `execute_transaction_impl` waits on the submission
/// ticket before returning `QuorumDriverError::TimeoutBeforeFinality`.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
62
/// Entry point for executing transactions through the quorum driver.
///
/// It submits transactions to the quorum driver, tracks them in a pending
/// transaction log, optionally waits for local execution, and records request
/// metrics.
pub struct TransactionOrchestrator<A: Clone> {
    /// Handle used to submit transactions and to subscribe to the quorum
    /// driver's effects queue.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state: source of the epoch store, the transaction cache
    /// reader and transaction simulation.
    validator_state: Arc<AuthorityState>,
    /// Handle of the background task running `loop_pending_transaction_log`.
    /// It is stored but never awaited in this file.
    _local_executor_handle: JoinHandle<()>,
    /// Log of pending transactions, created at
    /// `<parent_path>/fullnode_pending_transactions`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Per-digest notifier shared with the quorum driver; `submit` registers a
    /// ticket on it for every request.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics of the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
74
75impl TransactionOrchestrator<NetworkAuthorityClient> {
76 pub fn new_with_auth_aggregator(
77 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
78 validator_state: Arc<AuthorityState>,
79 reconfig_channel: Receiver<IotaSystemState>,
80 parent_path: &Path,
81 prometheus_registry: &Registry,
82 ) -> Self {
83 let observer = OnsiteReconfigObserver::new(
84 reconfig_channel,
85 validator_state.get_object_cache_reader().clone(),
86 validator_state.clone_committee_store(),
87 validators.safe_client_metrics_base.clone(),
88 validators.metrics.deref().clone(),
89 );
90
91 TransactionOrchestrator::new(
92 validators,
93 validator_state,
94 parent_path,
95 prometheus_registry,
96 observer,
97 )
98 }
99}
100
101impl<A> TransactionOrchestrator<A>
102where
103 A: AuthorityAPI + Send + Sync + 'static + Clone,
104 OnsiteReconfigObserver: ReconfigObserver<A>,
105{
106 pub fn new(
107 validators: Arc<AuthorityAggregator<A>>,
108 validator_state: Arc<AuthorityState>,
109 parent_path: &Path,
110 prometheus_registry: &Registry,
111 reconfig_observer: OnsiteReconfigObserver,
112 ) -> Self {
113 let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
114 let notifier = Arc::new(NotifyRead::new());
115 let reconfig_observer = Arc::new(reconfig_observer);
116 let quorum_driver_handler = Arc::new(
117 QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
118 .with_notifier(notifier.clone())
119 .with_reconfig_observer(reconfig_observer.clone())
120 .start(),
121 );
122
123 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
124 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
125 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
126 parent_path.join("fullnode_pending_transactions"),
127 ));
128 let pending_tx_log_clone = pending_tx_log.clone();
129 let _local_executor_handle = {
130 spawn_monitored_task!(async move {
131 Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
132 })
133 };
134 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
135 Self {
136 quorum_driver_handler,
137 validator_state,
138 _local_executor_handle,
139 pending_tx_log,
140 notifier,
141 metrics,
142 }
143 }
144}
145
146impl<A> TransactionOrchestrator<A>
147where
148 A: AuthorityAPI + Send + Sync + 'static + Clone,
149{
150 #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
151 fields(
152 tx_digest = ?request.transaction.digest(),
153 tx_type = ?request_type,
154 ),
155 err)]
156 pub async fn execute_transaction_block(
157 &self,
158 request: ExecuteTransactionRequestV1,
159 request_type: ExecuteTransactionRequestType,
160 client_addr: Option<SocketAddr>,
161 ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
162 {
163 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
164
165 let (transaction, response) = self
166 .execute_transaction_impl(&epoch_store, request, client_addr)
167 .await?;
168
169 let executed_locally = if matches!(
170 request_type,
171 ExecuteTransactionRequestType::WaitForLocalExecution
172 ) {
173 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
174 &self.validator_state,
175 &transaction,
176 &self.metrics,
177 )
178 .await
179 .is_ok();
180 add_server_timing("local_execution");
181 executed_locally
182 } else {
183 false
184 };
185
186 let QuorumDriverResponse {
187 effects_cert,
188 events,
189 input_objects,
190 output_objects,
191 auxiliary_data,
192 } = response;
193
194 let response = ExecuteTransactionResponseV1 {
195 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
196 events,
197 input_objects,
198 output_objects,
199 auxiliary_data,
200 };
201
202 Ok((response, executed_locally))
203 }
204
205 #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
208 fields(tx_digest = ?request.transaction.digest()))]
209 pub async fn execute_transaction_v1(
210 &self,
211 request: ExecuteTransactionRequestV1,
212 client_addr: Option<SocketAddr>,
213 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
214 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
215
216 let QuorumDriverResponse {
217 effects_cert,
218 events,
219 input_objects,
220 output_objects,
221 auxiliary_data,
222 } = self
223 .execute_transaction_impl(&epoch_store, request, client_addr)
224 .await
225 .map(|(_, r)| r)?;
226
227 Ok(ExecuteTransactionResponseV1 {
228 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
229 events,
230 input_objects,
231 output_objects,
232 auxiliary_data,
233 })
234 }
235
236 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
240 pub async fn execute_transaction_impl(
241 &self,
242 epoch_store: &AuthorityPerEpochStore,
243 request: ExecuteTransactionRequestV1,
244 client_addr: Option<SocketAddr>,
245 ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
246 let transaction = epoch_store
247 .verify_transaction(request.transaction.clone())
248 .map_err(QuorumDriverError::InvalidUserSignature)?;
249 let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
250 let tx_digest = *transaction.digest();
251 debug!(?tx_digest, "TO Received transaction execution request.");
252
253 let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
254 (
255 self.metrics.request_latency_shared_obj.start_timer(),
256 self.metrics
257 .wait_for_finality_latency_shared_obj
258 .start_timer(),
259 )
260 } else {
261 (
262 self.metrics.request_latency_single_writer.start_timer(),
263 self.metrics
264 .wait_for_finality_latency_single_writer
265 .start_timer(),
266 )
267 };
268
269 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
271 wait_for_finality_gauge.inc();
272 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
273 in_flight.dec();
274 });
275
276 let ticket = self
277 .submit(transaction.clone(), request, client_addr)
278 .await
279 .map_err(|e| {
280 warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
281 QuorumDriverError::QuorumDriverInternal(e)
282 })?;
283
284 let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
285 debug!(?tx_digest, "Timeout waiting for transaction finality.");
286 self.metrics.wait_for_finality_timeout.inc();
287 return Err(QuorumDriverError::TimeoutBeforeFinality);
288 };
289 add_server_timing("wait_for_finality");
290
291 drop(_txn_finality_timer);
292 drop(_wait_for_finality_gauge);
293 self.metrics.wait_for_finality_finished.inc();
294
295 match result {
296 Err(err) => {
297 warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
298 Err(QuorumDriverError::QuorumDriverInternal(err))
299 }
300 Ok(Err(err)) => Err(err),
301 Ok(Ok(response)) => {
302 good_response_metrics.inc();
303 Ok((transaction, response))
304 }
305 }
306 }
307
    /// Registers interest in the transaction's result and submits it to the
    /// quorum driver if needed.
    ///
    /// A notifier ticket is registered for the digest first. The transaction is
    /// then written to the pending transaction log; only when
    /// `write_pending_transaction_maybe` returns `true` is the request
    /// submitted to the quorum driver here.
    ///
    /// On success, returns a future that resolves to the quorum driver result
    /// for this digest.
    ///
    /// # Errors
    /// Returns the error of the pending-log write or of the initial submission.
    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
    async fn submit(
        &self,
        transaction: VerifiedTransaction,
        request: ExecuteTransactionRequestV1,
        client_addr: Option<SocketAddr>,
    ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
        let tx_digest = *transaction.digest();
        // Register the ticket before anything is submitted.
        let ticket = self.notifier.register_one(&tx_digest);
        if self
            .pending_tx_log
            .write_pending_transaction_maybe(&transaction)
            .await?
        {
            debug!(?tx_digest, "no pending request in flight, submitting.");
            self.quorum_driver()
                .submit_transaction_no_ticket(request.clone(), client_addr)
                .await?;
        }
        // Owned handles so the returned future can use them after this call
        // returns.
        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
        let qd = self.clone_quorum_driver();
        Ok(async move {
            let digests = [tx_digest];
            let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
            // Race the ticket against the local executed-effects read.
            // NOTE(review): `res` is bound and then returned instead of
            // returning the `match` directly; presumably this is deliberate
            // (drop order of temporaries borrowing `digests`) — confirm before
            // simplifying.
            let res = match select(ticket, effects_await.boxed()).await {
                // The ticket resolved first.
                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
                // The effects read finished first. Its output is discarded, so
                // this branch is taken whatever that read produced. The request
                // is submitted again and the still-pending ticket is awaited.
                Either::Right((_, unfinished_quorum_driver_task)) => {
                    debug!(
                        ?tx_digest,
                        "Effects are available in DB, use quorum driver to get a certificate"
                    );
                    qd.submit_transaction_no_ticket(request, client_addr)
                        .await?;
                    Ok(unfinished_quorum_driver_task.await)
                }
            };
            res
        })
    }
357
358 #[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
359 async fn wait_for_finalized_tx_executed_locally_with_timeout(
360 validator_state: &Arc<AuthorityState>,
361 transaction: &VerifiedTransaction,
362 metrics: &TransactionOrchestratorMetrics,
363 ) -> IotaResult {
364 let tx_digest = *transaction.digest();
365 metrics.local_execution_in_flight.inc();
366 let _metrics_guard =
367 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
368 in_flight.dec();
369 });
370
371 let _guard = if transaction.contains_shared_object() {
372 metrics.local_execution_latency_shared_obj.start_timer()
373 } else {
374 metrics.local_execution_latency_single_writer.start_timer()
375 };
376 debug!(
377 ?tx_digest,
378 "Waiting for finalized tx to be executed locally."
379 );
380 match timeout(
381 LOCAL_EXECUTION_TIMEOUT,
382 validator_state
383 .get_transaction_cache_reader()
384 .try_notify_read_executed_effects_digests(&[tx_digest]),
385 )
386 .instrument(trace_span!("local_execution"))
387 .await
388 {
389 Err(_elapsed) => {
390 debug!(
391 ?tx_digest,
392 "Waiting for finalized tx to be executed locally timed out within {:?}.",
393 LOCAL_EXECUTION_TIMEOUT
394 );
395 metrics.local_execution_timeout.inc();
396 Err(IotaError::Timeout)
397 }
398 Ok(Err(err)) => {
399 debug!(
400 ?tx_digest,
401 "Waiting for finalized tx to be executed locally failed with error: {:?}", err
402 );
403 metrics.local_execution_failure.inc();
404 Err(IotaError::TransactionOrchestratorLocalExecution {
405 error: err.to_string(),
406 })
407 }
408 Ok(Ok(_)) => {
409 metrics.local_execution_success.inc();
410 Ok(())
411 }
412 }
413 }
414
415 async fn loop_pending_transaction_log(
417 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
418 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
419 ) {
420 loop {
421 match effects_receiver.recv().await {
422 Ok(Ok((transaction, ..))) => {
423 let tx_digest = transaction.digest();
424 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
425 error!(
426 ?tx_digest,
427 "Failed to finish transaction in pending transaction log: {err}"
428 );
429 }
430 }
431 Ok(Err((tx_digest, _err))) => {
432 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
433 error!(
434 ?tx_digest,
435 "Failed to finish transaction in pending transaction log: {err}"
436 );
437 }
438 }
439 Err(RecvError::Closed) => {
440 error!("Sender of effects subscriber queue has been dropped!");
441 return;
442 }
443 Err(RecvError::Lagged(skipped_count)) => {
444 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
445 }
446 }
447 }
448 }
449
    /// Borrows the shared quorum driver handler.
    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
        &self.quorum_driver_handler
    }
453
    /// Returns a new `Arc` pointing to the same quorum driver handler.
    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
        self.quorum_driver_handler.clone()
    }
457
    /// Returns the authority aggregator obtained from the quorum driver via
    /// `authority_aggregator().load_full()`.
    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
        self.quorum_driver().authority_aggregator().load_full()
    }
461
    /// Creates a new receiver on the quorum driver's effects queue.
    pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
        self.quorum_driver_handler.subscribe_to_effects()
    }
465
466 fn update_metrics(
467 &'_ self,
468 transaction: &VerifiedTransaction,
469 ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
470 let (in_flight, good_response) = if transaction.contains_shared_object() {
471 self.metrics.total_req_received_shared_object.inc();
472 (
473 self.metrics.req_in_flight_shared_object.clone(),
474 &self.metrics.good_response_shared_object,
475 )
476 } else {
477 self.metrics.total_req_received_single_writer.inc();
478 (
479 self.metrics.req_in_flight_single_writer.clone(),
480 &self.metrics.good_response_single_writer,
481 )
482 };
483 in_flight.inc();
484 (
485 scopeguard::guard(in_flight, |in_flight| {
486 in_flight.dec();
487 }),
488 good_response,
489 )
490 }
491
492 fn schedule_txes_in_log(
493 pending_tx_log: Arc<WritePathPendingTransactionLog>,
494 quorum_driver: Arc<QuorumDriverHandler<A>>,
495 ) {
496 spawn_logged_monitored_task!(async move {
497 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
498 info!("Skipping loading pending transactions from pending_tx_log.");
499 return;
500 }
501 let pending_txes = pending_tx_log
502 .load_all_pending_transactions()
503 .expect("failed to load all pending transactions");
504 info!(
505 "Recovering {} pending transactions from pending_tx_log.",
506 pending_txes.len()
507 );
508 for (i, tx) in pending_txes.into_iter().enumerate() {
509 let tx = tx.into_inner();
512 let tx_digest = *tx.digest();
513 if let Err(err) = quorum_driver
516 .submit_transaction_no_ticket(
517 ExecuteTransactionRequestV1 {
518 transaction: tx,
519 include_events: true,
520 include_input_objects: false,
521 include_output_objects: false,
522 include_auxiliary_data: false,
523 },
524 None,
525 )
526 .await
527 {
528 warn!(
529 ?tx_digest,
530 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
531 );
532 } else {
533 debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
534 if (i + 1) % 1000 == 0 {
535 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
536 }
537 }
538 }
539 });
543 }
544
    /// Returns all transactions currently recorded in the pending transaction
    /// log, as reported by the log itself.
    pub fn load_all_pending_transactions(&self) -> IotaResult<Vec<VerifiedTransaction>> {
        self.pending_tx_log.load_all_pending_transactions()
    }
548}
549
/// Prometheus metrics of the transaction orchestrator.
///
/// Fields ending in `_single_writer` and `_shared_object` / `_shared_obj` are
/// the two `tx_type` label values of one underlying metric vector.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Requests answered successfully, by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Wait-for-finality phase: currently waiting, answered before the timeout,
    // and timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local-execution phase: currently waiting, and one counter per outcome.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms, by transaction type.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
578
579impl TransactionOrchestratorMetrics {
583 pub fn new(registry: &Registry) -> Self {
584 let total_req_received = register_int_counter_vec_with_registry!(
585 "tx_orchestrator_total_req_received",
586 "Total number of executions request Transaction Orchestrator receives, group by tx type",
587 &["tx_type"],
588 registry
589 )
590 .unwrap();
591
592 let total_req_received_single_writer =
593 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
594 let total_req_received_shared_object =
595 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
596
597 let good_response = register_int_counter_vec_with_registry!(
598 "tx_orchestrator_good_response",
599 "Total number of good responses Transaction Orchestrator generates, group by tx type",
600 &["tx_type"],
601 registry
602 )
603 .unwrap();
604
605 let good_response_single_writer =
606 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
607 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
608
609 let req_in_flight = register_int_gauge_vec_with_registry!(
610 "tx_orchestrator_req_in_flight",
611 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
612 &["tx_type"],
613 registry
614 )
615 .unwrap();
616
617 let req_in_flight_single_writer =
618 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
619 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
620
621 let request_latency = register_histogram_vec_with_registry!(
622 "tx_orchestrator_request_latency",
623 "Time spent in processing one Transaction Orchestrator request",
624 &["tx_type"],
625 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
626 registry,
627 )
628 .unwrap();
629 let wait_for_finality_latency = register_histogram_vec_with_registry!(
630 "tx_orchestrator_wait_for_finality_latency",
631 "Time spent in waiting for one Transaction Orchestrator request gets finalized",
632 &["tx_type"],
633 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
634 registry,
635 )
636 .unwrap();
637 let local_execution_latency = register_histogram_vec_with_registry!(
638 "tx_orchestrator_local_execution_latency",
639 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
640 &["tx_type"],
641 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
642 registry,
643 )
644 .unwrap();
645
646 Self {
647 total_req_received_single_writer,
648 total_req_received_shared_object,
649 good_response_single_writer,
650 good_response_shared_object,
651 req_in_flight_single_writer,
652 req_in_flight_shared_object,
653 wait_for_finality_in_flight: register_int_gauge_with_registry!(
654 "tx_orchestrator_wait_for_finality_in_flight",
655 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
656 registry,
657 )
658 .unwrap(),
659 wait_for_finality_finished: register_int_counter_with_registry!(
660 "tx_orchestrator_wait_for_finality_finished",
661 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
662 registry,
663 )
664 .unwrap(),
665 wait_for_finality_timeout: register_int_counter_with_registry!(
666 "tx_orchestrator_wait_for_finality_timeout",
667 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
668 registry,
669 )
670 .unwrap(),
671 local_execution_in_flight: register_int_gauge_with_registry!(
672 "tx_orchestrator_local_execution_in_flight",
673 "Number of local execution txns in flights Transaction Orchestrator handles",
674 registry,
675 )
676 .unwrap(),
677 local_execution_success: register_int_counter_with_registry!(
678 "tx_orchestrator_local_execution_success",
679 "Total number of successful local execution txns Transaction Orchestrator handles",
680 registry,
681 )
682 .unwrap(),
683 local_execution_timeout: register_int_counter_with_registry!(
684 "tx_orchestrator_local_execution_timeout",
685 "Total number of timed-out local execution txns Transaction Orchestrator handles",
686 registry,
687 )
688 .unwrap(),
689 local_execution_failure: register_int_counter_with_registry!(
690 "tx_orchestrator_local_execution_failure",
691 "Total number of failed local execution txns Transaction Orchestrator handles",
692 registry,
693 )
694 .unwrap(),
695 request_latency_single_writer: request_latency
696 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
697 request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
698 wait_for_finality_latency_single_writer: wait_for_finality_latency
699 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
700 wait_for_finality_latency_shared_obj: wait_for_finality_latency
701 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
702 local_execution_latency_single_writer: local_execution_latency
703 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
704 local_execution_latency_shared_obj: local_execution_latency
705 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
706 }
707 }
708
709 pub fn new_for_tests() -> Self {
710 let registry = Registry::new();
711 Self::new(®istry)
712 }
713}
714
/// Exposes the orchestrator through the `TransactionExecutor` trait by
/// delegating to its inherent methods and to the local authority state.
#[async_trait::async_trait]
impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Delegates to `execute_transaction_v1`.
    async fn execute_transaction(
        &self,
        request: ExecuteTransactionRequestV1,
        client_addr: Option<std::net::SocketAddr>,
    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
        self.execute_transaction_v1(request, client_addr).await
    }

    /// Delegates to `simulate_transaction` on the local authority state.
    fn simulate_transaction(
        &self,
        transaction: TransactionData,
        checks: VmChecks,
    ) -> Result<SimulateTransactionResult, IotaError> {
        self.validator_state
            .simulate_transaction(transaction, checks)
    }
}