1use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12 FutureExt,
13 future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18 spawn_logged_monitored_task, spawn_monitored_task,
19};
20use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
21use iota_types::{
22 base_types::TransactionDigest,
23 effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects},
24 error::{IotaError, IotaResult},
25 executable_transaction::VerifiedExecutableTransaction,
26 iota_system_state::IotaSystemState,
27 quorum_driver_types::{
28 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
29 FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
30 QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
31 },
32 transaction::{TransactionData, VerifiedTransaction},
33 transaction_executor::SimulateTransactionResult,
34};
35use prometheus::{
36 Histogram, Registry,
37 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
38 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
39 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
40 register_int_gauge_with_registry,
41};
42use tokio::{
43 sync::broadcast::{Receiver, error::RecvError},
44 task::JoinHandle,
45 time::timeout,
46};
47use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
48
49use crate::{
50 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
51 authority_aggregator::AuthorityAggregator,
52 authority_client::{AuthorityAPI, NetworkAuthorityClient},
53 quorum_driver::{
54 QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
55 reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
56 },
57};
58
/// Maximum time the orchestrator waits for a finalized transaction to be
/// executed on the local node before giving up on local execution.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum time the orchestrator waits on the ticket returned by `submit`
/// before reporting `QuorumDriverError::TimeoutBeforeFinality`.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
64
/// Submits transactions to a quorum driver, waits for their finality and
/// optionally executes the finalized transaction on the local node.
pub struct TransactionOrchestrator<A: Clone> {
    /// Quorum driver used to submit transactions and to subscribe to the
    /// effects queue.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state: provides the epoch store used for transaction
    /// verification, local execution of finalized transactions and simulation.
    validator_state: Arc<AuthorityState>,
    /// Handle of the background task spawned in `new` that consumes the
    /// quorum driver effects queue. Only stored, never read.
    _local_executor_handle: JoinHandle<()>,
    /// Log of transactions that were submitted but are not finished yet; its
    /// content is re-submitted by `schedule_txes_in_log`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier on which `submit` registers one ticket per transaction digest;
    /// it is also handed to the quorum driver builder in `new`.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics of the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
76
77impl TransactionOrchestrator<NetworkAuthorityClient> {
78 pub fn new_with_auth_aggregator(
79 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
80 validator_state: Arc<AuthorityState>,
81 reconfig_channel: Receiver<IotaSystemState>,
82 parent_path: &Path,
83 prometheus_registry: &Registry,
84 ) -> Self {
85 let observer = OnsiteReconfigObserver::new(
86 reconfig_channel,
87 validator_state.get_object_cache_reader().clone(),
88 validator_state.clone_committee_store(),
89 validators.safe_client_metrics_base.clone(),
90 validators.metrics.deref().clone(),
91 );
92
93 TransactionOrchestrator::new(
94 validators,
95 validator_state,
96 parent_path,
97 prometheus_registry,
98 observer,
99 )
100 }
101}
102
103impl<A> TransactionOrchestrator<A>
104where
105 A: AuthorityAPI + Send + Sync + 'static + Clone,
106 OnsiteReconfigObserver: ReconfigObserver<A>,
107{
108 pub fn new(
109 validators: Arc<AuthorityAggregator<A>>,
110 validator_state: Arc<AuthorityState>,
111 parent_path: &Path,
112 prometheus_registry: &Registry,
113 reconfig_observer: OnsiteReconfigObserver,
114 ) -> Self {
115 let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
116 let notifier = Arc::new(NotifyRead::new());
117 let reconfig_observer = Arc::new(reconfig_observer);
118 let quorum_driver_handler = Arc::new(
119 QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
120 .with_notifier(notifier.clone())
121 .with_reconfig_observer(reconfig_observer.clone())
122 .start(),
123 );
124
125 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
126 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
127 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
128 parent_path.join("fullnode_pending_transactions"),
129 ));
130 let pending_tx_log_clone = pending_tx_log.clone();
131 let _local_executor_handle = {
132 spawn_monitored_task!(async move {
133 Self::loop_execute_finalized_tx_locally(effects_receiver, pending_tx_log_clone)
134 .await;
135 })
136 };
137 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138 Self {
139 quorum_driver_handler,
140 validator_state,
141 _local_executor_handle,
142 pending_tx_log,
143 notifier,
144 metrics,
145 }
146 }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151 A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153 #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "debug", skip_all,
154 fields(
155 tx_digest = ?request.transaction.digest(),
156 tx_type = ?request_type,
157 ),
158 err)]
159 pub async fn execute_transaction_block(
160 &self,
161 request: ExecuteTransactionRequestV1,
162 request_type: ExecuteTransactionRequestType,
163 client_addr: Option<SocketAddr>,
164 ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165 {
166 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168 let (transaction, response) = self
169 .execute_transaction_impl(&epoch_store, request, client_addr)
170 .await?;
171
172 let executed_locally = if matches!(
173 request_type,
174 ExecuteTransactionRequestType::WaitForLocalExecution
175 ) {
176 let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
177 transaction,
178 response.effects_cert.executed_epoch(),
179 );
180 let executed_locally = Self::execute_finalized_tx_locally_with_timeout(
181 &self.validator_state,
182 &epoch_store,
183 &executable_tx,
184 &response.effects_cert,
185 &self.metrics,
186 )
187 .await
188 .is_ok();
189 add_server_timing("local_execution");
190 executed_locally
191 } else {
192 false
193 };
194
195 let QuorumDriverResponse {
196 effects_cert,
197 events,
198 input_objects,
199 output_objects,
200 auxiliary_data,
201 } = response;
202
203 let response = ExecuteTransactionResponseV1 {
204 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
205 events,
206 input_objects,
207 output_objects,
208 auxiliary_data,
209 };
210
211 Ok((response, executed_locally))
212 }
213
214 #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
217 fields(tx_digest = ?request.transaction.digest()))]
218 pub async fn execute_transaction_v1(
219 &self,
220 request: ExecuteTransactionRequestV1,
221 client_addr: Option<SocketAddr>,
222 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
223 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
224
225 let QuorumDriverResponse {
226 effects_cert,
227 events,
228 input_objects,
229 output_objects,
230 auxiliary_data,
231 } = self
232 .execute_transaction_impl(&epoch_store, request, client_addr)
233 .await
234 .map(|(_, r)| r)?;
235
236 Ok(ExecuteTransactionResponseV1 {
237 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
238 events,
239 input_objects,
240 output_objects,
241 auxiliary_data,
242 })
243 }
244
245 pub async fn execute_transaction_impl(
249 &self,
250 epoch_store: &AuthorityPerEpochStore,
251 request: ExecuteTransactionRequestV1,
252 client_addr: Option<SocketAddr>,
253 ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
254 let transaction = epoch_store
255 .verify_transaction(request.transaction.clone())
256 .map_err(QuorumDriverError::InvalidUserSignature)?;
257 let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
258 let tx_digest = *transaction.digest();
259 debug!(?tx_digest, "TO Received transaction execution request.");
260
261 let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
262 (
263 self.metrics.request_latency_shared_obj.start_timer(),
264 self.metrics
265 .wait_for_finality_latency_shared_obj
266 .start_timer(),
267 )
268 } else {
269 (
270 self.metrics.request_latency_single_writer.start_timer(),
271 self.metrics
272 .wait_for_finality_latency_single_writer
273 .start_timer(),
274 )
275 };
276
277 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
279 wait_for_finality_gauge.inc();
280 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
281 in_flight.dec();
282 });
283
284 let ticket = self
285 .submit(transaction.clone(), request, client_addr)
286 .await
287 .map_err(|e| {
288 warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
289 QuorumDriverError::QuorumDriverInternal(e)
290 })?;
291
292 let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
293 debug!(?tx_digest, "Timeout waiting for transaction finality.");
294 self.metrics.wait_for_finality_timeout.inc();
295 return Err(QuorumDriverError::TimeoutBeforeFinality);
296 };
297 add_server_timing("wait_for_finality");
298
299 drop(_txn_finality_timer);
300 drop(_wait_for_finality_gauge);
301 self.metrics.wait_for_finality_finished.inc();
302
303 match result {
304 Err(err) => {
305 warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
306 Err(QuorumDriverError::QuorumDriverInternal(err))
307 }
308 Ok(Err(err)) => Err(err),
309 Ok(Ok(response)) => {
310 good_response_metrics.inc();
311 Ok((transaction, response))
312 }
313 }
314 }
315
316 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
319 async fn submit(
320 &self,
321 transaction: VerifiedTransaction,
322 request: ExecuteTransactionRequestV1,
323 client_addr: Option<SocketAddr>,
324 ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
325 let tx_digest = *transaction.digest();
326 let ticket = self.notifier.register_one(&tx_digest);
327 if self
330 .pending_tx_log
331 .write_pending_transaction_maybe(&transaction)
332 .await?
333 {
334 debug!(?tx_digest, "no pending request in flight, submitting.");
335 self.quorum_driver()
336 .submit_transaction_no_ticket(request.clone(), client_addr)
337 .await?;
338 }
339 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
345 let qd = self.clone_quorum_driver();
346 Ok(async move {
347 let digests = [tx_digest];
348 let effects_await = cache_reader.notify_read_executed_effects(&digests);
349 let res = match select(ticket, effects_await.boxed()).await {
351 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
352 Either::Right((_, unfinished_quorum_driver_task)) => {
353 debug!(
354 ?tx_digest,
355 "Effects are available in DB, use quorum driver to get a certificate"
356 );
357 qd.submit_transaction_no_ticket(request, client_addr)
358 .await?;
359 Ok(unfinished_quorum_driver_task.await)
360 }
361 };
362 res
363 })
364 }
365
366 #[instrument(name = "tx_orchestrator_execute_finalized_tx_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
367 async fn execute_finalized_tx_locally_with_timeout(
368 validator_state: &Arc<AuthorityState>,
369 epoch_store: &Arc<AuthorityPerEpochStore>,
370 transaction: &VerifiedExecutableTransaction,
371 effects_cert: &VerifiedCertifiedTransactionEffects,
372 metrics: &TransactionOrchestratorMetrics,
373 ) -> IotaResult {
374 let tx_digest = transaction.digest();
384 if validator_state.is_tx_already_executed(tx_digest)? {
385 return Ok(());
386 }
387 metrics.local_execution_in_flight.inc();
388 let _metrics_guard =
389 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
390 in_flight.dec();
391 });
392
393 let _guard = if transaction.contains_shared_object() {
394 metrics.local_execution_latency_shared_obj.start_timer()
395 } else {
396 metrics.local_execution_latency_single_writer.start_timer()
397 };
398 debug!(?tx_digest, "Executing finalized tx locally.");
399 match timeout(
400 LOCAL_EXECUTION_TIMEOUT,
401 validator_state.fullnode_execute_certificate_with_effects(
402 transaction,
403 effects_cert,
404 epoch_store,
405 ),
406 )
407 .instrument(error_span!(
408 "transaction_orchestrator::local_execution",
409 ?tx_digest
410 ))
411 .await
412 {
413 Err(_elapsed) => {
414 debug!(
415 ?tx_digest,
416 "Executing tx locally by orchestrator timed out within {:?}.",
417 LOCAL_EXECUTION_TIMEOUT
418 );
419 metrics.local_execution_timeout.inc();
420 Err(IotaError::Timeout)
421 }
422 Ok(Err(err)) => {
423 debug!(
424 ?tx_digest,
425 "Executing tx locally by orchestrator failed with error: {:?}", err
426 );
427 metrics.local_execution_failure.inc();
428 Err(IotaError::TransactionOrchestratorLocalExecution {
429 error: err.to_string(),
430 })
431 }
432 Ok(Ok(_)) => {
433 metrics.local_execution_success.inc();
434 Ok(())
435 }
436 }
437 }
438
439 async fn loop_execute_finalized_tx_locally(
440 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
441 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
442 ) {
443 loop {
444 match effects_receiver.recv().await {
445 Ok(Ok((transaction, ..))) => {
446 let tx_digest = transaction.digest();
447 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
448 error!(
449 ?tx_digest,
450 "Failed to finish transaction in pending transaction log: {err}"
451 );
452 }
453 }
454 Ok(Err((tx_digest, _err))) => {
455 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
456 error!(
457 ?tx_digest,
458 "Failed to finish transaction in pending transaction log: {err}"
459 );
460 }
461 }
462 Err(RecvError::Closed) => {
463 error!("Sender of effects subscriber queue has been dropped!");
464 return;
465 }
466 Err(RecvError::Lagged(skipped_count)) => {
467 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
468 }
469 }
470 }
471 }
472
/// Returns a reference to the quorum driver handler owned by this
/// orchestrator.
pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
    &self.quorum_driver_handler
}
476
477 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
478 self.quorum_driver_handler.clone()
479 }
480
481 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
482 self.quorum_driver().authority_aggregator().load_full()
483 }
484
485 pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
486 self.quorum_driver_handler.subscribe_to_effects()
487 }
488
489 fn update_metrics(
490 &'_ self,
491 transaction: &VerifiedTransaction,
492 ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
493 let (in_flight, good_response) = if transaction.contains_shared_object() {
494 self.metrics.total_req_received_shared_object.inc();
495 (
496 self.metrics.req_in_flight_shared_object.clone(),
497 &self.metrics.good_response_shared_object,
498 )
499 } else {
500 self.metrics.total_req_received_single_writer.inc();
501 (
502 self.metrics.req_in_flight_single_writer.clone(),
503 &self.metrics.good_response_single_writer,
504 )
505 };
506 in_flight.inc();
507 (
508 scopeguard::guard(in_flight, |in_flight| {
509 in_flight.dec();
510 }),
511 good_response,
512 )
513 }
514
515 fn schedule_txes_in_log(
516 pending_tx_log: Arc<WritePathPendingTransactionLog>,
517 quorum_driver: Arc<QuorumDriverHandler<A>>,
518 ) {
519 spawn_logged_monitored_task!(async move {
520 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
521 info!("Skipping loading pending transactions from pending_tx_log.");
522 return;
523 }
524 let pending_txes = pending_tx_log.load_all_pending_transactions();
525 info!(
526 "Recovering {} pending transactions from pending_tx_log.",
527 pending_txes.len()
528 );
529 for (i, tx) in pending_txes.into_iter().enumerate() {
530 let tx = tx.into_inner();
533 let tx_digest = *tx.digest();
534 if let Err(err) = quorum_driver
537 .submit_transaction_no_ticket(
538 ExecuteTransactionRequestV1 {
539 transaction: tx,
540 include_events: true,
541 include_input_objects: false,
542 include_output_objects: false,
543 include_auxiliary_data: false,
544 },
545 None,
546 )
547 .await
548 {
549 warn!(
550 ?tx_digest,
551 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
552 );
553 } else {
554 debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
555 if (i + 1) % 1000 == 0 {
556 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
557 }
558 }
559 }
560 });
564 }
565
/// Returns every transaction currently recorded in the pending transaction
/// log.
pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
    self.pending_tx_log.load_all_pending_transactions()
}
569}
570
/// Prometheus metrics of the transaction orchestrator.
///
/// Metrics with a `single_writer` / `shared_object` (`shared_obj`) suffix are
/// the two `tx_type` label values of the same underlying metric vector.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Execution requests received, by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Successful responses produced, by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Finality wait: currently waiting, finished before the timeout, timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local execution: currently running, succeeded, timed out, failed.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms, by transaction type.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
599
600impl TransactionOrchestratorMetrics {
604 pub fn new(registry: &Registry) -> Self {
605 let total_req_received = register_int_counter_vec_with_registry!(
606 "tx_orchestrator_total_req_received",
607 "Total number of executions request Transaction Orchestrator receives, group by tx type",
608 &["tx_type"],
609 registry
610 )
611 .unwrap();
612
613 let total_req_received_single_writer =
614 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
615 let total_req_received_shared_object =
616 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
617
618 let good_response = register_int_counter_vec_with_registry!(
619 "tx_orchestrator_good_response",
620 "Total number of good responses Transaction Orchestrator generates, group by tx type",
621 &["tx_type"],
622 registry
623 )
624 .unwrap();
625
626 let good_response_single_writer =
627 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
628 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
629
630 let req_in_flight = register_int_gauge_vec_with_registry!(
631 "tx_orchestrator_req_in_flight",
632 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
633 &["tx_type"],
634 registry
635 )
636 .unwrap();
637
638 let req_in_flight_single_writer =
639 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
640 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
641
642 let request_latency = register_histogram_vec_with_registry!(
643 "tx_orchestrator_request_latency",
644 "Time spent in processing one Transaction Orchestrator request",
645 &["tx_type"],
646 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
647 registry,
648 )
649 .unwrap();
650 let wait_for_finality_latency = register_histogram_vec_with_registry!(
651 "tx_orchestrator_wait_for_finality_latency",
652 "Time spent in waiting for one Transaction Orchestrator request gets finalized",
653 &["tx_type"],
654 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
655 registry,
656 )
657 .unwrap();
658 let local_execution_latency = register_histogram_vec_with_registry!(
659 "tx_orchestrator_local_execution_latency",
660 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
661 &["tx_type"],
662 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
663 registry,
664 )
665 .unwrap();
666
667 Self {
668 total_req_received_single_writer,
669 total_req_received_shared_object,
670 good_response_single_writer,
671 good_response_shared_object,
672 req_in_flight_single_writer,
673 req_in_flight_shared_object,
674 wait_for_finality_in_flight: register_int_gauge_with_registry!(
675 "tx_orchestrator_wait_for_finality_in_flight",
676 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
677 registry,
678 )
679 .unwrap(),
680 wait_for_finality_finished: register_int_counter_with_registry!(
681 "tx_orchestrator_wait_for_finality_finished",
682 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
683 registry,
684 )
685 .unwrap(),
686 wait_for_finality_timeout: register_int_counter_with_registry!(
687 "tx_orchestrator_wait_for_finality_timeout",
688 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
689 registry,
690 )
691 .unwrap(),
692 local_execution_in_flight: register_int_gauge_with_registry!(
693 "tx_orchestrator_local_execution_in_flight",
694 "Number of local execution txns in flights Transaction Orchestrator handles",
695 registry,
696 )
697 .unwrap(),
698 local_execution_success: register_int_counter_with_registry!(
699 "tx_orchestrator_local_execution_success",
700 "Total number of successful local execution txns Transaction Orchestrator handles",
701 registry,
702 )
703 .unwrap(),
704 local_execution_timeout: register_int_counter_with_registry!(
705 "tx_orchestrator_local_execution_timeout",
706 "Total number of timed-out local execution txns Transaction Orchestrator handles",
707 registry,
708 )
709 .unwrap(),
710 local_execution_failure: register_int_counter_with_registry!(
711 "tx_orchestrator_local_execution_failure",
712 "Total number of failed local execution txns Transaction Orchestrator handles",
713 registry,
714 )
715 .unwrap(),
716 request_latency_single_writer: request_latency
717 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
718 request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
719 wait_for_finality_latency_single_writer: wait_for_finality_latency
720 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
721 wait_for_finality_latency_shared_obj: wait_for_finality_latency
722 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
723 local_execution_latency_single_writer: local_execution_latency
724 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
725 local_execution_latency_shared_obj: local_execution_latency
726 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
727 }
728 }
729
730 pub fn new_for_tests() -> Self {
731 let registry = Registry::new();
732 Self::new(®istry)
733 }
734}
735
/// Exposes the orchestrator through the generic `TransactionExecutor`
/// interface by delegating to its own methods and to the local authority
/// state.
#[async_trait::async_trait]
impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Executes the transaction via `execute_transaction_v1`, i.e. without
    /// waiting for local execution.
    async fn execute_transaction(
        &self,
        request: ExecuteTransactionRequestV1,
        client_addr: Option<std::net::SocketAddr>,
    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
        self.execute_transaction_v1(request, client_addr).await
    }

    /// Simulates the transaction by forwarding it to the local authority
    /// state.
    fn simulate_transaction(
        &self,
        transaction: TransactionData,
    ) -> Result<SimulateTransactionResult, IotaError> {
        self.validator_state.simulate_transaction(transaction)
    }
}