1use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12 FutureExt,
13 future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18 histogram::{Histogram, HistogramVec},
19 spawn_logged_monitored_task, spawn_monitored_task,
20};
21use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
22use iota_types::{
23 base_types::TransactionDigest,
24 effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects},
25 error::{IotaError, IotaResult},
26 executable_transaction::VerifiedExecutableTransaction,
27 iota_system_state::IotaSystemState,
28 quorum_driver_types::{
29 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
30 FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
31 QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
32 },
33 transaction::VerifiedTransaction,
34};
35use prometheus::{
36 Registry,
37 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
38 register_int_counter_vec_with_registry, register_int_counter_with_registry,
39 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
40};
41use tokio::{
42 sync::broadcast::{Receiver, error::RecvError},
43 task::JoinHandle,
44 time::timeout,
45};
46use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
47
48use crate::{
49 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
50 authority_aggregator::AuthorityAggregator,
51 authority_client::{AuthorityAPI, NetworkAuthorityClient},
52 quorum_driver::{
53 QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
54 reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
55 },
56};
57
58const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
61
62const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
63
64pub struct TransactionOrchestrator<A: Clone> {
68 quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
69 validator_state: Arc<AuthorityState>,
70 _local_executor_handle: JoinHandle<()>,
71 pending_tx_log: Arc<WritePathPendingTransactionLog>,
72 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
73 metrics: Arc<TransactionOrchestratorMetrics>,
74}
75
76impl TransactionOrchestrator<NetworkAuthorityClient> {
77 pub fn new_with_auth_aggregator(
78 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
79 validator_state: Arc<AuthorityState>,
80 reconfig_channel: Receiver<IotaSystemState>,
81 parent_path: &Path,
82 prometheus_registry: &Registry,
83 ) -> Self {
84 let observer = OnsiteReconfigObserver::new(
85 reconfig_channel,
86 validator_state.get_object_cache_reader().clone(),
87 validator_state.clone_committee_store(),
88 validators.safe_client_metrics_base.clone(),
89 validators.metrics.deref().clone(),
90 );
91
92 TransactionOrchestrator::new(
93 validators,
94 validator_state,
95 parent_path,
96 prometheus_registry,
97 observer,
98 )
99 }
100}
101
102impl<A> TransactionOrchestrator<A>
103where
104 A: AuthorityAPI + Send + Sync + 'static + Clone,
105 OnsiteReconfigObserver: ReconfigObserver<A>,
106{
107 pub fn new(
108 validators: Arc<AuthorityAggregator<A>>,
109 validator_state: Arc<AuthorityState>,
110 parent_path: &Path,
111 prometheus_registry: &Registry,
112 reconfig_observer: OnsiteReconfigObserver,
113 ) -> Self {
114 let notifier = Arc::new(NotifyRead::new());
115 let quorum_driver_handler = Arc::new(
116 QuorumDriverHandlerBuilder::new(
117 validators,
118 Arc::new(QuorumDriverMetrics::new(prometheus_registry)),
119 )
120 .with_notifier(notifier.clone())
121 .with_reconfig_observer(Arc::new(reconfig_observer))
122 .start(),
123 );
124
125 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
126 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
127 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
128 parent_path.join("fullnode_pending_transactions"),
129 ));
130 let pending_tx_log_clone = pending_tx_log.clone();
131 let _local_executor_handle = {
132 spawn_monitored_task!(async move {
133 Self::loop_execute_finalized_tx_locally(effects_receiver, pending_tx_log_clone)
134 .await;
135 })
136 };
137 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138 Self {
139 quorum_driver_handler,
140 validator_state,
141 _local_executor_handle,
142 pending_tx_log,
143 notifier,
144 metrics,
145 }
146 }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151 A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153 #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "debug", skip_all,
154 fields(
155 tx_digest = ?request.transaction.digest(),
156 tx_type = ?request_type,
157 ),
158 err)]
159 pub async fn execute_transaction_block(
160 &self,
161 request: ExecuteTransactionRequestV1,
162 request_type: ExecuteTransactionRequestType,
163 client_addr: Option<SocketAddr>,
164 ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165 {
166 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168 let (transaction, response) = self
169 .execute_transaction_impl(&epoch_store, request, client_addr)
170 .await?;
171
172 let executed_locally = if matches!(
173 request_type,
174 ExecuteTransactionRequestType::WaitForLocalExecution
175 ) {
176 let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
177 transaction,
178 response.effects_cert.executed_epoch(),
179 );
180 let executed_locally = Self::execute_finalized_tx_locally_with_timeout(
181 &self.validator_state,
182 &epoch_store,
183 &executable_tx,
184 &response.effects_cert,
185 &self.metrics,
186 )
187 .await
188 .is_ok();
189 add_server_timing("local_execution");
190 executed_locally
191 } else {
192 false
193 };
194
195 let QuorumDriverResponse {
196 effects_cert,
197 events,
198 input_objects,
199 output_objects,
200 auxiliary_data,
201 } = response;
202
203 let response = ExecuteTransactionResponseV1 {
204 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
205 events,
206 input_objects,
207 output_objects,
208 auxiliary_data,
209 };
210
211 Ok((response, executed_locally))
212 }
213
214 #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
217 fields(tx_digest = ?request.transaction.digest()))]
218 pub async fn execute_transaction_v1(
219 &self,
220 request: ExecuteTransactionRequestV1,
221 client_addr: Option<SocketAddr>,
222 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
223 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
224
225 let QuorumDriverResponse {
226 effects_cert,
227 events,
228 input_objects,
229 output_objects,
230 auxiliary_data,
231 } = self
232 .execute_transaction_impl(&epoch_store, request, client_addr)
233 .await
234 .map(|(_, r)| r)?;
235
236 Ok(ExecuteTransactionResponseV1 {
237 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
238 events,
239 input_objects,
240 output_objects,
241 auxiliary_data,
242 })
243 }
244
245 pub async fn execute_transaction_impl(
249 &self,
250 epoch_store: &AuthorityPerEpochStore,
251 request: ExecuteTransactionRequestV1,
252 client_addr: Option<SocketAddr>,
253 ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
254 let transaction = epoch_store
255 .verify_transaction(request.transaction.clone())
256 .map_err(QuorumDriverError::InvalidUserSignature)?;
257 let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
258 let tx_digest = *transaction.digest();
259 debug!(?tx_digest, "TO Received transaction execution request.");
260
261 let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
262 (
263 self.metrics.request_latency_shared_obj.start_timer(),
264 self.metrics
265 .wait_for_finality_latency_shared_obj
266 .start_timer(),
267 )
268 } else {
269 (
270 self.metrics.request_latency_single_writer.start_timer(),
271 self.metrics
272 .wait_for_finality_latency_single_writer
273 .start_timer(),
274 )
275 };
276
277 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
279 wait_for_finality_gauge.inc();
280 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
281 in_flight.dec();
282 });
283
284 let ticket = self
285 .submit(transaction.clone(), request, client_addr)
286 .await
287 .map_err(|e| {
288 warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
289 QuorumDriverError::QuorumDriverInternal(e)
290 })?;
291
292 let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
293 debug!(?tx_digest, "Timeout waiting for transaction finality.");
294 self.metrics.wait_for_finality_timeout.inc();
295 return Err(QuorumDriverError::TimeoutBeforeFinality);
296 };
297 add_server_timing("wait_for_finality");
298
299 drop(_txn_finality_timer);
300 drop(_wait_for_finality_gauge);
301 self.metrics.wait_for_finality_finished.inc();
302
303 match result {
304 Err(err) => {
305 warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
306 Err(QuorumDriverError::QuorumDriverInternal(err))
307 }
308 Ok(Err(err)) => Err(err),
309 Ok(Ok(response)) => {
310 good_response_metrics.inc();
311 Ok((transaction, response))
312 }
313 }
314 }
315
316 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
319 async fn submit(
320 &self,
321 transaction: VerifiedTransaction,
322 request: ExecuteTransactionRequestV1,
323 client_addr: Option<SocketAddr>,
324 ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
325 let tx_digest = *transaction.digest();
326 let ticket = self.notifier.register_one(&tx_digest);
327 if self
330 .pending_tx_log
331 .write_pending_transaction_maybe(&transaction)
332 .await?
333 {
334 debug!(?tx_digest, "no pending request in flight, submitting.");
335 self.quorum_driver()
336 .submit_transaction_no_ticket(request.clone(), client_addr)
337 .await?;
338 }
339 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
345 let qd = self.clone_quorum_driver();
346 Ok(async move {
347 let digests = [tx_digest];
348 let effects_await = cache_reader.notify_read_executed_effects(&digests);
349 let res = match select(ticket, effects_await.boxed()).await {
351 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
352 Either::Right((_, unfinished_quorum_driver_task)) => {
353 debug!(
354 ?tx_digest,
355 "Effects are available in DB, use quorum driver to get a certificate"
356 );
357 qd.submit_transaction_no_ticket(request, client_addr)
358 .await?;
359 Ok(unfinished_quorum_driver_task.await)
360 }
361 };
362 #[expect(clippy::let_and_return)]
363 res
364 })
365 }
366
367 #[instrument(name = "tx_orchestrator_execute_finalized_tx_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
368 async fn execute_finalized_tx_locally_with_timeout(
369 validator_state: &Arc<AuthorityState>,
370 epoch_store: &Arc<AuthorityPerEpochStore>,
371 transaction: &VerifiedExecutableTransaction,
372 effects_cert: &VerifiedCertifiedTransactionEffects,
373 metrics: &TransactionOrchestratorMetrics,
374 ) -> IotaResult {
375 let tx_digest = transaction.digest();
385 if validator_state.is_tx_already_executed(tx_digest)? {
386 return Ok(());
387 }
388 metrics.local_execution_in_flight.inc();
389 let _metrics_guard =
390 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
391 in_flight.dec();
392 });
393
394 let _guard = if transaction.contains_shared_object() {
395 metrics.local_execution_latency_shared_obj.start_timer()
396 } else {
397 metrics.local_execution_latency_single_writer.start_timer()
398 };
399 debug!(?tx_digest, "Executing finalized tx locally.");
400 match timeout(
401 LOCAL_EXECUTION_TIMEOUT,
402 validator_state.fullnode_execute_certificate_with_effects(
403 transaction,
404 effects_cert,
405 epoch_store,
406 ),
407 )
408 .instrument(error_span!(
409 "transaction_orchestrator::local_execution",
410 ?tx_digest
411 ))
412 .await
413 {
414 Err(_elapsed) => {
415 debug!(
416 ?tx_digest,
417 "Executing tx locally by orchestrator timed out within {:?}.",
418 LOCAL_EXECUTION_TIMEOUT
419 );
420 metrics.local_execution_timeout.inc();
421 Err(IotaError::Timeout)
422 }
423 Ok(Err(err)) => {
424 debug!(
425 ?tx_digest,
426 "Executing tx locally by orchestrator failed with error: {:?}", err
427 );
428 metrics.local_execution_failure.inc();
429 Err(IotaError::TransactionOrchestratorLocalExecution {
430 error: err.to_string(),
431 })
432 }
433 Ok(Ok(_)) => {
434 metrics.local_execution_success.inc();
435 Ok(())
436 }
437 }
438 }
439
440 async fn loop_execute_finalized_tx_locally(
441 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
442 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
443 ) {
444 loop {
445 match effects_receiver.recv().await {
446 Ok(Ok((transaction, ..))) => {
447 let tx_digest = transaction.digest();
448 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
449 error!(
450 ?tx_digest,
451 "Failed to finish transaction in pending transaction log: {err}"
452 );
453 }
454 }
455 Ok(Err((tx_digest, _err))) => {
456 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
457 error!(
458 ?tx_digest,
459 "Failed to finish transaction in pending transaction log: {err}"
460 );
461 }
462 }
463 Err(RecvError::Closed) => {
464 error!("Sender of effects subscriber queue has been dropped!");
465 return;
466 }
467 Err(RecvError::Lagged(skipped_count)) => {
468 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
469 }
470 }
471 }
472 }
473
474 pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
475 &self.quorum_driver_handler
476 }
477
478 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
479 self.quorum_driver_handler.clone()
480 }
481
482 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
483 self.quorum_driver().authority_aggregator().load_full()
484 }
485
486 pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
487 self.quorum_driver_handler.subscribe_to_effects()
488 }
489
490 fn update_metrics(
491 &'_ self,
492 transaction: &VerifiedTransaction,
493 ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
494 let (in_flight, good_response) = if transaction.contains_shared_object() {
495 self.metrics.total_req_received_shared_object.inc();
496 (
497 self.metrics.req_in_flight_shared_object.clone(),
498 &self.metrics.good_response_shared_object,
499 )
500 } else {
501 self.metrics.total_req_received_single_writer.inc();
502 (
503 self.metrics.req_in_flight_single_writer.clone(),
504 &self.metrics.good_response_single_writer,
505 )
506 };
507 in_flight.inc();
508 (
509 scopeguard::guard(in_flight, |in_flight| {
510 in_flight.dec();
511 }),
512 good_response,
513 )
514 }
515
516 fn schedule_txes_in_log(
517 pending_tx_log: Arc<WritePathPendingTransactionLog>,
518 quorum_driver: Arc<QuorumDriverHandler<A>>,
519 ) {
520 spawn_logged_monitored_task!(async move {
521 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
522 info!("Skipping loading pending transactions from pending_tx_log.");
523 return;
524 }
525 let pending_txes = pending_tx_log.load_all_pending_transactions();
526 info!(
527 "Recovering {} pending transactions from pending_tx_log.",
528 pending_txes.len()
529 );
530 for (i, tx) in pending_txes.into_iter().enumerate() {
531 let tx = tx.into_inner();
534 let tx_digest = *tx.digest();
535 if let Err(err) = quorum_driver
538 .submit_transaction_no_ticket(
539 ExecuteTransactionRequestV1 {
540 transaction: tx,
541 include_events: true,
542 include_input_objects: false,
543 include_output_objects: false,
544 include_auxiliary_data: false,
545 },
546 None,
547 )
548 .await
549 {
550 warn!(
551 ?tx_digest,
552 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
553 );
554 } else {
555 debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
556 if (i + 1) % 1000 == 0 {
557 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
558 }
559 }
560 }
561 });
565 }
566
567 pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
568 self.pending_tx_log.load_all_pending_transactions()
569 }
570}
571
572#[derive(Clone)]
574pub struct TransactionOrchestratorMetrics {
575 total_req_received_single_writer: GenericCounter<AtomicU64>,
576 total_req_received_shared_object: GenericCounter<AtomicU64>,
577
578 good_response_single_writer: GenericCounter<AtomicU64>,
579 good_response_shared_object: GenericCounter<AtomicU64>,
580
581 req_in_flight_single_writer: GenericGauge<AtomicI64>,
582 req_in_flight_shared_object: GenericGauge<AtomicI64>,
583
584 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
585 wait_for_finality_finished: GenericCounter<AtomicU64>,
586 wait_for_finality_timeout: GenericCounter<AtomicU64>,
587
588 local_execution_in_flight: GenericGauge<AtomicI64>,
589 local_execution_success: GenericCounter<AtomicU64>,
590 local_execution_timeout: GenericCounter<AtomicU64>,
591 local_execution_failure: GenericCounter<AtomicU64>,
592
593 request_latency_single_writer: Histogram,
594 request_latency_shared_obj: Histogram,
595 wait_for_finality_latency_single_writer: Histogram,
596 wait_for_finality_latency_shared_obj: Histogram,
597 local_execution_latency_single_writer: Histogram,
598 local_execution_latency_shared_obj: Histogram,
599}
600
601impl TransactionOrchestratorMetrics {
605 pub fn new(registry: &Registry) -> Self {
606 let total_req_received = register_int_counter_vec_with_registry!(
607 "tx_orchestrator_total_req_received",
608 "Total number of executions request Transaction Orchestrator receives, group by tx type",
609 &["tx_type"],
610 registry
611 )
612 .unwrap();
613
614 let total_req_received_single_writer =
615 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
616 let total_req_received_shared_object =
617 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
618
619 let good_response = register_int_counter_vec_with_registry!(
620 "tx_orchestrator_good_response",
621 "Total number of good responses Transaction Orchestrator generates, group by tx type",
622 &["tx_type"],
623 registry
624 )
625 .unwrap();
626
627 let good_response_single_writer =
628 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
629 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
630
631 let req_in_flight = register_int_gauge_vec_with_registry!(
632 "tx_orchestrator_req_in_flight",
633 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
634 &["tx_type"],
635 registry
636 )
637 .unwrap();
638
639 let req_in_flight_single_writer =
640 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
641 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
642
643 let request_latency = HistogramVec::new_in_registry(
644 "tx_orchestrator_request_latency",
645 "Time spent in processing one Transaction Orchestrator request",
646 &["tx_type"],
647 registry,
648 );
649 let wait_for_finality_latency = HistogramVec::new_in_registry(
650 "tx_orchestrator_wait_for_finality_latency",
651 "Time spent in waiting for one Transaction Orchestrator request gets finalized",
652 &["tx_type"],
653 registry,
654 );
655 let local_execution_latency = HistogramVec::new_in_registry(
656 "tx_orchestrator_local_execution_latency",
657 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
658 &["tx_type"],
659 registry,
660 );
661
662 Self {
663 total_req_received_single_writer,
664 total_req_received_shared_object,
665 good_response_single_writer,
666 good_response_shared_object,
667 req_in_flight_single_writer,
668 req_in_flight_shared_object,
669 wait_for_finality_in_flight: register_int_gauge_with_registry!(
670 "tx_orchestrator_wait_for_finality_in_flight",
671 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
672 registry,
673 )
674 .unwrap(),
675 wait_for_finality_finished: register_int_counter_with_registry!(
676 "tx_orchestrator_wait_for_finality_finished",
677 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
678 registry,
679 )
680 .unwrap(),
681 wait_for_finality_timeout: register_int_counter_with_registry!(
682 "tx_orchestrator_wait_for_finality_timeout",
683 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
684 registry,
685 )
686 .unwrap(),
687 local_execution_in_flight: register_int_gauge_with_registry!(
688 "tx_orchestrator_local_execution_in_flight",
689 "Number of local execution txns in flights Transaction Orchestrator handles",
690 registry,
691 )
692 .unwrap(),
693 local_execution_success: register_int_counter_with_registry!(
694 "tx_orchestrator_local_execution_success",
695 "Total number of successful local execution txns Transaction Orchestrator handles",
696 registry,
697 )
698 .unwrap(),
699 local_execution_timeout: register_int_counter_with_registry!(
700 "tx_orchestrator_local_execution_timeout",
701 "Total number of timed-out local execution txns Transaction Orchestrator handles",
702 registry,
703 )
704 .unwrap(),
705 local_execution_failure: register_int_counter_with_registry!(
706 "tx_orchestrator_local_execution_failure",
707 "Total number of failed local execution txns Transaction Orchestrator handles",
708 registry,
709 )
710 .unwrap(),
711 request_latency_single_writer: request_latency
712 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
713 request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
714 wait_for_finality_latency_single_writer: wait_for_finality_latency
715 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
716 wait_for_finality_latency_shared_obj: wait_for_finality_latency
717 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
718 local_execution_latency_single_writer: local_execution_latency
719 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
720 local_execution_latency_shared_obj: local_execution_latency
721 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
722 }
723 }
724
725 pub fn new_for_tests() -> Self {
726 let registry = Registry::new();
727 Self::new(®istry)
728 }
729}
730
731#[async_trait::async_trait]
732impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
733where
734 A: AuthorityAPI + Send + Sync + 'static + Clone,
735{
736 async fn execute_transaction(
737 &self,
738 request: ExecuteTransactionRequestV1,
739 client_addr: Option<std::net::SocketAddr>,
740 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
741 self.execute_transaction_v1(request, client_addr).await
742 }
743}