1use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12 FutureExt,
13 future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18 spawn_logged_monitored_task, spawn_monitored_task,
19};
20use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
21use iota_types::{
22 base_types::TransactionDigest,
23 effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects},
24 error::{IotaError, IotaResult},
25 executable_transaction::VerifiedExecutableTransaction,
26 iota_system_state::IotaSystemState,
27 quorum_driver_types::{
28 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
29 FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
30 QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
31 },
32 transaction::{TransactionData, VerifiedTransaction},
33 transaction_executor::SimulateTransactionResult,
34};
35use prometheus::{
36 Histogram, Registry,
37 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
38 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
39 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
40 register_int_gauge_with_registry,
41};
42use tokio::{
43 sync::broadcast::{Receiver, error::RecvError},
44 task::JoinHandle,
45 time::timeout,
46};
47use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
48
49use crate::{
50 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
51 authority_aggregator::AuthorityAggregator,
52 authority_client::{AuthorityAPI, NetworkAuthorityClient},
53 quorum_driver::{
54 QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
55 reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
56 },
57};
58
59const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
62
63const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
64
65pub struct TransactionOrchestrator<A: Clone> {
69 quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
70 validator_state: Arc<AuthorityState>,
71 _local_executor_handle: JoinHandle<()>,
72 pending_tx_log: Arc<WritePathPendingTransactionLog>,
73 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
74 metrics: Arc<TransactionOrchestratorMetrics>,
75}
76
77impl TransactionOrchestrator<NetworkAuthorityClient> {
78 pub fn new_with_auth_aggregator(
79 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
80 validator_state: Arc<AuthorityState>,
81 reconfig_channel: Receiver<IotaSystemState>,
82 parent_path: &Path,
83 prometheus_registry: &Registry,
84 ) -> Self {
85 let observer = OnsiteReconfigObserver::new(
86 reconfig_channel,
87 validator_state.get_object_cache_reader().clone(),
88 validator_state.clone_committee_store(),
89 validators.safe_client_metrics_base.clone(),
90 validators.metrics.deref().clone(),
91 );
92
93 TransactionOrchestrator::new(
94 validators,
95 validator_state,
96 parent_path,
97 prometheus_registry,
98 observer,
99 )
100 }
101}
102
103impl<A> TransactionOrchestrator<A>
104where
105 A: AuthorityAPI + Send + Sync + 'static + Clone,
106 OnsiteReconfigObserver: ReconfigObserver<A>,
107{
108 pub fn new(
109 validators: Arc<AuthorityAggregator<A>>,
110 validator_state: Arc<AuthorityState>,
111 parent_path: &Path,
112 prometheus_registry: &Registry,
113 reconfig_observer: OnsiteReconfigObserver,
114 ) -> Self {
115 let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
116 let notifier = Arc::new(NotifyRead::new());
117 let reconfig_observer = Arc::new(reconfig_observer);
118 let quorum_driver_handler = Arc::new(
119 QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
120 .with_notifier(notifier.clone())
121 .with_reconfig_observer(reconfig_observer.clone())
122 .start(),
123 );
124
125 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
126 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
127 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
128 parent_path.join("fullnode_pending_transactions"),
129 ));
130 let pending_tx_log_clone = pending_tx_log.clone();
131 let _local_executor_handle = {
132 spawn_monitored_task!(async move {
133 Self::loop_execute_finalized_tx_locally(effects_receiver, pending_tx_log_clone)
134 .await;
135 })
136 };
137 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138 Self {
139 quorum_driver_handler,
140 validator_state,
141 _local_executor_handle,
142 pending_tx_log,
143 notifier,
144 metrics,
145 }
146 }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151 A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153 #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
154 fields(
155 tx_digest = ?request.transaction.digest(),
156 tx_type = ?request_type,
157 ),
158 err)]
159 pub async fn execute_transaction_block(
160 &self,
161 request: ExecuteTransactionRequestV1,
162 request_type: ExecuteTransactionRequestType,
163 client_addr: Option<SocketAddr>,
164 ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165 {
166 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168 let (transaction, response) = self
169 .execute_transaction_impl(&epoch_store, request, client_addr)
170 .await?;
171
172 let executed_locally = if matches!(
173 request_type,
174 ExecuteTransactionRequestType::WaitForLocalExecution
175 ) {
176 let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
177 transaction,
178 response.effects_cert.executed_epoch(),
179 );
180 let executed_locally = Self::execute_finalized_tx_locally_with_timeout(
181 &self.validator_state,
182 &epoch_store,
183 &executable_tx,
184 &response.effects_cert,
185 &self.metrics,
186 )
187 .await
188 .is_ok();
189 add_server_timing("local_execution");
190 executed_locally
191 } else {
192 false
193 };
194
195 let QuorumDriverResponse {
196 effects_cert,
197 events,
198 input_objects,
199 output_objects,
200 auxiliary_data,
201 } = response;
202
203 let response = ExecuteTransactionResponseV1 {
204 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
205 events,
206 input_objects,
207 output_objects,
208 auxiliary_data,
209 };
210
211 Ok((response, executed_locally))
212 }
213
214 #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
217 fields(tx_digest = ?request.transaction.digest()))]
218 pub async fn execute_transaction_v1(
219 &self,
220 request: ExecuteTransactionRequestV1,
221 client_addr: Option<SocketAddr>,
222 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
223 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
224
225 let QuorumDriverResponse {
226 effects_cert,
227 events,
228 input_objects,
229 output_objects,
230 auxiliary_data,
231 } = self
232 .execute_transaction_impl(&epoch_store, request, client_addr)
233 .await
234 .map(|(_, r)| r)?;
235
236 Ok(ExecuteTransactionResponseV1 {
237 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
238 events,
239 input_objects,
240 output_objects,
241 auxiliary_data,
242 })
243 }
244
245 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
249 pub async fn execute_transaction_impl(
250 &self,
251 epoch_store: &AuthorityPerEpochStore,
252 request: ExecuteTransactionRequestV1,
253 client_addr: Option<SocketAddr>,
254 ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
255 let transaction = epoch_store
256 .verify_transaction(request.transaction.clone())
257 .map_err(QuorumDriverError::InvalidUserSignature)?;
258 let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
259 let tx_digest = *transaction.digest();
260 debug!(?tx_digest, "TO Received transaction execution request.");
261
262 let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
263 (
264 self.metrics.request_latency_shared_obj.start_timer(),
265 self.metrics
266 .wait_for_finality_latency_shared_obj
267 .start_timer(),
268 )
269 } else {
270 (
271 self.metrics.request_latency_single_writer.start_timer(),
272 self.metrics
273 .wait_for_finality_latency_single_writer
274 .start_timer(),
275 )
276 };
277
278 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
280 wait_for_finality_gauge.inc();
281 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
282 in_flight.dec();
283 });
284
285 let ticket = self
286 .submit(transaction.clone(), request, client_addr)
287 .await
288 .map_err(|e| {
289 warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
290 QuorumDriverError::QuorumDriverInternal(e)
291 })?;
292
293 let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
294 debug!(?tx_digest, "Timeout waiting for transaction finality.");
295 self.metrics.wait_for_finality_timeout.inc();
296 return Err(QuorumDriverError::TimeoutBeforeFinality);
297 };
298 add_server_timing("wait_for_finality");
299
300 drop(_txn_finality_timer);
301 drop(_wait_for_finality_gauge);
302 self.metrics.wait_for_finality_finished.inc();
303
304 match result {
305 Err(err) => {
306 warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
307 Err(QuorumDriverError::QuorumDriverInternal(err))
308 }
309 Ok(Err(err)) => Err(err),
310 Ok(Ok(response)) => {
311 good_response_metrics.inc();
312 Ok((transaction, response))
313 }
314 }
315 }
316
317 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
320 async fn submit(
321 &self,
322 transaction: VerifiedTransaction,
323 request: ExecuteTransactionRequestV1,
324 client_addr: Option<SocketAddr>,
325 ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
326 let tx_digest = *transaction.digest();
327 let ticket = self.notifier.register_one(&tx_digest);
328 if self
331 .pending_tx_log
332 .write_pending_transaction_maybe(&transaction)
333 .await?
334 {
335 debug!(?tx_digest, "no pending request in flight, submitting.");
336 self.quorum_driver()
337 .submit_transaction_no_ticket(request.clone(), client_addr)
338 .await?;
339 }
340 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
346 let qd = self.clone_quorum_driver();
347 Ok(async move {
348 let digests = [tx_digest];
349 let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
350 let res = match select(ticket, effects_await.boxed()).await {
352 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
353 Either::Right((_, unfinished_quorum_driver_task)) => {
354 debug!(
355 ?tx_digest,
356 "Effects are available in DB, use quorum driver to get a certificate"
357 );
358 qd.submit_transaction_no_ticket(request, client_addr)
359 .await?;
360 Ok(unfinished_quorum_driver_task.await)
361 }
362 };
363 res
364 })
365 }
366
367 #[instrument(name = "tx_orchestrator_execute_finalized_tx_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
368 async fn execute_finalized_tx_locally_with_timeout(
369 validator_state: &Arc<AuthorityState>,
370 epoch_store: &Arc<AuthorityPerEpochStore>,
371 transaction: &VerifiedExecutableTransaction,
372 effects_cert: &VerifiedCertifiedTransactionEffects,
373 metrics: &TransactionOrchestratorMetrics,
374 ) -> IotaResult {
375 let tx_digest = transaction.digest();
385 if validator_state.try_is_tx_already_executed(tx_digest)? {
386 return Ok(());
387 }
388 metrics.local_execution_in_flight.inc();
389 let _metrics_guard =
390 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
391 in_flight.dec();
392 });
393
394 let _guard = if transaction.contains_shared_object() {
395 metrics.local_execution_latency_shared_obj.start_timer()
396 } else {
397 metrics.local_execution_latency_single_writer.start_timer()
398 };
399 debug!(?tx_digest, "Executing finalized tx locally.");
400 match timeout(
401 LOCAL_EXECUTION_TIMEOUT,
402 validator_state.fullnode_execute_certificate_with_effects(
403 transaction,
404 effects_cert,
405 epoch_store,
406 ),
407 )
408 .instrument(trace_span!("local_execution"))
409 .await
410 {
411 Err(_elapsed) => {
412 debug!(
413 ?tx_digest,
414 "Executing tx locally by orchestrator timed out within {:?}.",
415 LOCAL_EXECUTION_TIMEOUT
416 );
417 metrics.local_execution_timeout.inc();
418 Err(IotaError::Timeout)
419 }
420 Ok(Err(err)) => {
421 debug!(
422 ?tx_digest,
423 "Executing tx locally by orchestrator failed with error: {:?}", err
424 );
425 metrics.local_execution_failure.inc();
426 Err(IotaError::TransactionOrchestratorLocalExecution {
427 error: err.to_string(),
428 })
429 }
430 Ok(Ok(_)) => {
431 metrics.local_execution_success.inc();
432 Ok(())
433 }
434 }
435 }
436
437 async fn loop_execute_finalized_tx_locally(
438 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
439 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
440 ) {
441 loop {
442 match effects_receiver.recv().await {
443 Ok(Ok((transaction, ..))) => {
444 let tx_digest = transaction.digest();
445 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
446 error!(
447 ?tx_digest,
448 "Failed to finish transaction in pending transaction log: {err}"
449 );
450 }
451 }
452 Ok(Err((tx_digest, _err))) => {
453 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
454 error!(
455 ?tx_digest,
456 "Failed to finish transaction in pending transaction log: {err}"
457 );
458 }
459 }
460 Err(RecvError::Closed) => {
461 error!("Sender of effects subscriber queue has been dropped!");
462 return;
463 }
464 Err(RecvError::Lagged(skipped_count)) => {
465 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
466 }
467 }
468 }
469 }
470
471 pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
472 &self.quorum_driver_handler
473 }
474
475 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
476 self.quorum_driver_handler.clone()
477 }
478
479 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
480 self.quorum_driver().authority_aggregator().load_full()
481 }
482
483 pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
484 self.quorum_driver_handler.subscribe_to_effects()
485 }
486
487 fn update_metrics(
488 &'_ self,
489 transaction: &VerifiedTransaction,
490 ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
491 let (in_flight, good_response) = if transaction.contains_shared_object() {
492 self.metrics.total_req_received_shared_object.inc();
493 (
494 self.metrics.req_in_flight_shared_object.clone(),
495 &self.metrics.good_response_shared_object,
496 )
497 } else {
498 self.metrics.total_req_received_single_writer.inc();
499 (
500 self.metrics.req_in_flight_single_writer.clone(),
501 &self.metrics.good_response_single_writer,
502 )
503 };
504 in_flight.inc();
505 (
506 scopeguard::guard(in_flight, |in_flight| {
507 in_flight.dec();
508 }),
509 good_response,
510 )
511 }
512
513 fn schedule_txes_in_log(
514 pending_tx_log: Arc<WritePathPendingTransactionLog>,
515 quorum_driver: Arc<QuorumDriverHandler<A>>,
516 ) {
517 spawn_logged_monitored_task!(async move {
518 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
519 info!("Skipping loading pending transactions from pending_tx_log.");
520 return;
521 }
522 let pending_txes = pending_tx_log.load_all_pending_transactions();
523 info!(
524 "Recovering {} pending transactions from pending_tx_log.",
525 pending_txes.len()
526 );
527 for (i, tx) in pending_txes.into_iter().enumerate() {
528 let tx = tx.into_inner();
531 let tx_digest = *tx.digest();
532 if let Err(err) = quorum_driver
535 .submit_transaction_no_ticket(
536 ExecuteTransactionRequestV1 {
537 transaction: tx,
538 include_events: true,
539 include_input_objects: false,
540 include_output_objects: false,
541 include_auxiliary_data: false,
542 },
543 None,
544 )
545 .await
546 {
547 warn!(
548 ?tx_digest,
549 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
550 );
551 } else {
552 debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
553 if (i + 1) % 1000 == 0 {
554 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
555 }
556 }
557 }
558 });
562 }
563
564 pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
565 self.pending_tx_log.load_all_pending_transactions()
566 }
567}
568
569#[derive(Clone)]
571pub struct TransactionOrchestratorMetrics {
572 total_req_received_single_writer: GenericCounter<AtomicU64>,
573 total_req_received_shared_object: GenericCounter<AtomicU64>,
574
575 good_response_single_writer: GenericCounter<AtomicU64>,
576 good_response_shared_object: GenericCounter<AtomicU64>,
577
578 req_in_flight_single_writer: GenericGauge<AtomicI64>,
579 req_in_flight_shared_object: GenericGauge<AtomicI64>,
580
581 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
582 wait_for_finality_finished: GenericCounter<AtomicU64>,
583 wait_for_finality_timeout: GenericCounter<AtomicU64>,
584
585 local_execution_in_flight: GenericGauge<AtomicI64>,
586 local_execution_success: GenericCounter<AtomicU64>,
587 local_execution_timeout: GenericCounter<AtomicU64>,
588 local_execution_failure: GenericCounter<AtomicU64>,
589
590 request_latency_single_writer: Histogram,
591 request_latency_shared_obj: Histogram,
592 wait_for_finality_latency_single_writer: Histogram,
593 wait_for_finality_latency_shared_obj: Histogram,
594 local_execution_latency_single_writer: Histogram,
595 local_execution_latency_shared_obj: Histogram,
596}
597
598impl TransactionOrchestratorMetrics {
602 pub fn new(registry: &Registry) -> Self {
603 let total_req_received = register_int_counter_vec_with_registry!(
604 "tx_orchestrator_total_req_received",
605 "Total number of executions request Transaction Orchestrator receives, group by tx type",
606 &["tx_type"],
607 registry
608 )
609 .unwrap();
610
611 let total_req_received_single_writer =
612 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
613 let total_req_received_shared_object =
614 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
615
616 let good_response = register_int_counter_vec_with_registry!(
617 "tx_orchestrator_good_response",
618 "Total number of good responses Transaction Orchestrator generates, group by tx type",
619 &["tx_type"],
620 registry
621 )
622 .unwrap();
623
624 let good_response_single_writer =
625 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
626 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
627
628 let req_in_flight = register_int_gauge_vec_with_registry!(
629 "tx_orchestrator_req_in_flight",
630 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
631 &["tx_type"],
632 registry
633 )
634 .unwrap();
635
636 let req_in_flight_single_writer =
637 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
638 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
639
640 let request_latency = register_histogram_vec_with_registry!(
641 "tx_orchestrator_request_latency",
642 "Time spent in processing one Transaction Orchestrator request",
643 &["tx_type"],
644 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
645 registry,
646 )
647 .unwrap();
648 let wait_for_finality_latency = register_histogram_vec_with_registry!(
649 "tx_orchestrator_wait_for_finality_latency",
650 "Time spent in waiting for one Transaction Orchestrator request gets finalized",
651 &["tx_type"],
652 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
653 registry,
654 )
655 .unwrap();
656 let local_execution_latency = register_histogram_vec_with_registry!(
657 "tx_orchestrator_local_execution_latency",
658 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
659 &["tx_type"],
660 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
661 registry,
662 )
663 .unwrap();
664
665 Self {
666 total_req_received_single_writer,
667 total_req_received_shared_object,
668 good_response_single_writer,
669 good_response_shared_object,
670 req_in_flight_single_writer,
671 req_in_flight_shared_object,
672 wait_for_finality_in_flight: register_int_gauge_with_registry!(
673 "tx_orchestrator_wait_for_finality_in_flight",
674 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
675 registry,
676 )
677 .unwrap(),
678 wait_for_finality_finished: register_int_counter_with_registry!(
679 "tx_orchestrator_wait_for_finality_finished",
680 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
681 registry,
682 )
683 .unwrap(),
684 wait_for_finality_timeout: register_int_counter_with_registry!(
685 "tx_orchestrator_wait_for_finality_timeout",
686 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
687 registry,
688 )
689 .unwrap(),
690 local_execution_in_flight: register_int_gauge_with_registry!(
691 "tx_orchestrator_local_execution_in_flight",
692 "Number of local execution txns in flights Transaction Orchestrator handles",
693 registry,
694 )
695 .unwrap(),
696 local_execution_success: register_int_counter_with_registry!(
697 "tx_orchestrator_local_execution_success",
698 "Total number of successful local execution txns Transaction Orchestrator handles",
699 registry,
700 )
701 .unwrap(),
702 local_execution_timeout: register_int_counter_with_registry!(
703 "tx_orchestrator_local_execution_timeout",
704 "Total number of timed-out local execution txns Transaction Orchestrator handles",
705 registry,
706 )
707 .unwrap(),
708 local_execution_failure: register_int_counter_with_registry!(
709 "tx_orchestrator_local_execution_failure",
710 "Total number of failed local execution txns Transaction Orchestrator handles",
711 registry,
712 )
713 .unwrap(),
714 request_latency_single_writer: request_latency
715 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
716 request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
717 wait_for_finality_latency_single_writer: wait_for_finality_latency
718 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
719 wait_for_finality_latency_shared_obj: wait_for_finality_latency
720 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
721 local_execution_latency_single_writer: local_execution_latency
722 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
723 local_execution_latency_shared_obj: local_execution_latency
724 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
725 }
726 }
727
728 pub fn new_for_tests() -> Self {
729 let registry = Registry::new();
730 Self::new(®istry)
731 }
732}
733
734#[async_trait::async_trait]
735impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
736where
737 A: AuthorityAPI + Send + Sync + 'static + Clone,
738{
739 async fn execute_transaction(
740 &self,
741 request: ExecuteTransactionRequestV1,
742 client_addr: Option<std::net::SocketAddr>,
743 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
744 self.execute_transaction_v1(request, client_addr).await
745 }
746
747 fn simulate_transaction(
748 &self,
749 transaction: TransactionData,
750 ) -> Result<SimulateTransactionResult, IotaError> {
751 self.validator_state.simulate_transaction(transaction)
752 }
753}