1use std::{net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration};
10
11use futures::{
12 FutureExt,
13 future::{Either, Future, select},
14};
15use iota_common::sync::notify_read::NotifyRead;
16use iota_metrics::{
17 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
18 spawn_logged_monitored_task, spawn_monitored_task,
19};
20use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
21use iota_types::{
22 base_types::TransactionDigest,
23 error::{IotaError, IotaResult},
24 iota_system_state::IotaSystemState,
25 quorum_driver_types::{
26 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
27 FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
28 QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
29 },
30 transaction::{TransactionData, VerifiedTransaction},
31 transaction_executor::{SimulateTransactionResult, VmChecks},
32};
33use prometheus::{
34 Histogram, Registry,
35 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
36 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
37 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
38 register_int_gauge_with_registry,
39};
40use tokio::{
41 sync::broadcast::{Receiver, error::RecvError},
42 task::JoinHandle,
43 time::timeout,
44};
45use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
46
47use crate::{
48 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
49 authority_aggregator::AuthorityAggregator,
50 authority_client::{AuthorityAPI, NetworkAuthorityClient},
51 quorum_driver::{
52 QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
53 reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
54 },
55};
56
57const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
60
61const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
62
63pub struct TransactionOrchestrator<A: Clone> {
67 quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
68 validator_state: Arc<AuthorityState>,
69 _local_executor_handle: JoinHandle<()>,
70 pending_tx_log: Arc<WritePathPendingTransactionLog>,
71 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
72 metrics: Arc<TransactionOrchestratorMetrics>,
73}
74
75impl TransactionOrchestrator<NetworkAuthorityClient> {
76 pub fn new_with_auth_aggregator(
77 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
78 validator_state: Arc<AuthorityState>,
79 reconfig_channel: Receiver<IotaSystemState>,
80 parent_path: &Path,
81 prometheus_registry: &Registry,
82 ) -> Self {
83 let observer = OnsiteReconfigObserver::new(
84 reconfig_channel,
85 validator_state.get_object_cache_reader().clone(),
86 validator_state.clone_committee_store(),
87 validators.safe_client_metrics_base.clone(),
88 validators.metrics.deref().clone(),
89 );
90
91 TransactionOrchestrator::new(
92 validators,
93 validator_state,
94 parent_path,
95 prometheus_registry,
96 observer,
97 )
98 }
99}
100
101impl<A> TransactionOrchestrator<A>
102where
103 A: AuthorityAPI + Send + Sync + 'static + Clone,
104 OnsiteReconfigObserver: ReconfigObserver<A>,
105{
106 pub fn new(
107 validators: Arc<AuthorityAggregator<A>>,
108 validator_state: Arc<AuthorityState>,
109 parent_path: &Path,
110 prometheus_registry: &Registry,
111 reconfig_observer: OnsiteReconfigObserver,
112 ) -> Self {
113 let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
114 let notifier = Arc::new(NotifyRead::new());
115 let reconfig_observer = Arc::new(reconfig_observer);
116 let quorum_driver_handler = Arc::new(
117 QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
118 .with_notifier(notifier.clone())
119 .with_reconfig_observer(reconfig_observer.clone())
120 .start(),
121 );
122
123 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
124 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
125 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
126 parent_path.join("fullnode_pending_transactions"),
127 ));
128 let pending_tx_log_clone = pending_tx_log.clone();
129 let _local_executor_handle = {
130 spawn_monitored_task!(async move {
131 Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
132 })
133 };
134 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
135 Self {
136 quorum_driver_handler,
137 validator_state,
138 _local_executor_handle,
139 pending_tx_log,
140 notifier,
141 metrics,
142 }
143 }
144}
145
146impl<A> TransactionOrchestrator<A>
147where
148 A: AuthorityAPI + Send + Sync + 'static + Clone,
149{
150 #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
151 fields(
152 tx_digest = ?request.transaction.digest(),
153 tx_type = ?request_type,
154 ),
155 err)]
156 pub async fn execute_transaction_block(
157 &self,
158 request: ExecuteTransactionRequestV1,
159 request_type: ExecuteTransactionRequestType,
160 client_addr: Option<SocketAddr>,
161 ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
162 {
163 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
164
165 let (transaction, response) = self
166 .execute_transaction_impl(&epoch_store, request, client_addr)
167 .await?;
168
169 let executed_locally = if matches!(
170 request_type,
171 ExecuteTransactionRequestType::WaitForLocalExecution
172 ) {
173 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
174 &self.validator_state,
175 &transaction,
176 &self.metrics,
177 )
178 .await
179 .is_ok();
180 add_server_timing("local_execution");
181 executed_locally
182 } else {
183 false
184 };
185
186 let QuorumDriverResponse {
187 effects_cert,
188 events,
189 input_objects,
190 output_objects,
191 auxiliary_data,
192 } = response;
193
194 let response = ExecuteTransactionResponseV1 {
195 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
196 events,
197 input_objects,
198 output_objects,
199 auxiliary_data,
200 };
201
202 Ok((response, executed_locally))
203 }
204
205 #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
208 fields(tx_digest = ?request.transaction.digest()))]
209 pub async fn execute_transaction_v1(
210 &self,
211 request: ExecuteTransactionRequestV1,
212 client_addr: Option<SocketAddr>,
213 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
214 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
215
216 let QuorumDriverResponse {
217 effects_cert,
218 events,
219 input_objects,
220 output_objects,
221 auxiliary_data,
222 } = self
223 .execute_transaction_impl(&epoch_store, request, client_addr)
224 .await
225 .map(|(_, r)| r)?;
226
227 Ok(ExecuteTransactionResponseV1 {
228 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
229 events,
230 input_objects,
231 output_objects,
232 auxiliary_data,
233 })
234 }
235
236 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
240 pub async fn execute_transaction_impl(
241 &self,
242 epoch_store: &AuthorityPerEpochStore,
243 request: ExecuteTransactionRequestV1,
244 client_addr: Option<SocketAddr>,
245 ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
246 let transaction = epoch_store
247 .verify_transaction(request.transaction.clone())
248 .map_err(QuorumDriverError::InvalidUserSignature)?;
249 let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
250 let tx_digest = *transaction.digest();
251 debug!(?tx_digest, "TO Received transaction execution request.");
252
253 let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
254 (
255 self.metrics.request_latency_shared_obj.start_timer(),
256 self.metrics
257 .wait_for_finality_latency_shared_obj
258 .start_timer(),
259 )
260 } else {
261 (
262 self.metrics.request_latency_single_writer.start_timer(),
263 self.metrics
264 .wait_for_finality_latency_single_writer
265 .start_timer(),
266 )
267 };
268
269 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
271 wait_for_finality_gauge.inc();
272 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
273 in_flight.dec();
274 });
275
276 let ticket = self
277 .submit(transaction.clone(), request, client_addr)
278 .await
279 .map_err(|e| {
280 warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
281 QuorumDriverError::QuorumDriverInternal(e)
282 })?;
283
284 let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
285 debug!(?tx_digest, "Timeout waiting for transaction finality.");
286 self.metrics.wait_for_finality_timeout.inc();
287 return Err(QuorumDriverError::TimeoutBeforeFinality);
288 };
289 add_server_timing("wait_for_finality");
290
291 drop(_txn_finality_timer);
292 drop(_wait_for_finality_gauge);
293 self.metrics.wait_for_finality_finished.inc();
294
295 match result {
296 Err(err) => {
297 warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
298 Err(QuorumDriverError::QuorumDriverInternal(err))
299 }
300 Ok(Err(err)) => Err(err),
301 Ok(Ok(response)) => {
302 good_response_metrics.inc();
303 Ok((transaction, response))
304 }
305 }
306 }
307
308 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
311 async fn submit(
312 &self,
313 transaction: VerifiedTransaction,
314 request: ExecuteTransactionRequestV1,
315 client_addr: Option<SocketAddr>,
316 ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
317 let tx_digest = *transaction.digest();
318 let ticket = self.notifier.register_one(&tx_digest);
319 if self
322 .pending_tx_log
323 .write_pending_transaction_maybe(&transaction)
324 .await?
325 {
326 debug!(?tx_digest, "no pending request in flight, submitting.");
327 self.quorum_driver()
328 .submit_transaction_no_ticket(request.clone(), client_addr)
329 .await?;
330 }
331 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
337 let qd = self.clone_quorum_driver();
338 Ok(async move {
339 let digests = [tx_digest];
340 let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
341 let res = match select(ticket, effects_await.boxed()).await {
343 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
344 Either::Right((_, unfinished_quorum_driver_task)) => {
345 debug!(
346 ?tx_digest,
347 "Effects are available in DB, use quorum driver to get a certificate"
348 );
349 qd.submit_transaction_no_ticket(request, client_addr)
350 .await?;
351 Ok(unfinished_quorum_driver_task.await)
352 }
353 };
354 res
355 })
356 }
357
358 #[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
359 async fn wait_for_finalized_tx_executed_locally_with_timeout(
360 validator_state: &Arc<AuthorityState>,
361 transaction: &VerifiedTransaction,
362 metrics: &TransactionOrchestratorMetrics,
363 ) -> IotaResult {
364 let tx_digest = *transaction.digest();
365 metrics.local_execution_in_flight.inc();
366 let _metrics_guard =
367 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
368 in_flight.dec();
369 });
370
371 let _guard = if transaction.contains_shared_object() {
372 metrics.local_execution_latency_shared_obj.start_timer()
373 } else {
374 metrics.local_execution_latency_single_writer.start_timer()
375 };
376 debug!(
377 ?tx_digest,
378 "Waiting for finalized tx to be executed locally."
379 );
380 match timeout(
381 LOCAL_EXECUTION_TIMEOUT,
382 validator_state
383 .get_transaction_cache_reader()
384 .try_notify_read_executed_effects_digests(&[tx_digest]),
385 )
386 .instrument(trace_span!("local_execution"))
387 .await
388 {
389 Err(_elapsed) => {
390 debug!(
391 ?tx_digest,
392 "Waiting for finalized tx to be executed locally timed out within {:?}.",
393 LOCAL_EXECUTION_TIMEOUT
394 );
395 metrics.local_execution_timeout.inc();
396 Err(IotaError::Timeout)
397 }
398 Ok(Err(err)) => {
399 debug!(
400 ?tx_digest,
401 "Waiting for finalized tx to be executed locally failed with error: {:?}", err
402 );
403 metrics.local_execution_failure.inc();
404 Err(IotaError::TransactionOrchestratorLocalExecution {
405 error: err.to_string(),
406 })
407 }
408 Ok(Ok(_)) => {
409 metrics.local_execution_success.inc();
410 Ok(())
411 }
412 }
413 }
414
415 async fn loop_pending_transaction_log(
417 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
418 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
419 ) {
420 loop {
421 match effects_receiver.recv().await {
422 Ok(Ok((transaction, ..))) => {
423 let tx_digest = transaction.digest();
424 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
425 error!(
426 ?tx_digest,
427 "Failed to finish transaction in pending transaction log: {err}"
428 );
429 }
430 }
431 Ok(Err((tx_digest, _err))) => {
432 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
433 error!(
434 ?tx_digest,
435 "Failed to finish transaction in pending transaction log: {err}"
436 );
437 }
438 }
439 Err(RecvError::Closed) => {
440 error!("Sender of effects subscriber queue has been dropped!");
441 return;
442 }
443 Err(RecvError::Lagged(skipped_count)) => {
444 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
445 }
446 }
447 }
448 }
449
450 pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
451 &self.quorum_driver_handler
452 }
453
454 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
455 self.quorum_driver_handler.clone()
456 }
457
458 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
459 self.quorum_driver().authority_aggregator().load_full()
460 }
461
462 pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
463 self.quorum_driver_handler.subscribe_to_effects()
464 }
465
466 fn update_metrics(
467 &'_ self,
468 transaction: &VerifiedTransaction,
469 ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
470 let (in_flight, good_response) = if transaction.contains_shared_object() {
471 self.metrics.total_req_received_shared_object.inc();
472 (
473 self.metrics.req_in_flight_shared_object.clone(),
474 &self.metrics.good_response_shared_object,
475 )
476 } else {
477 self.metrics.total_req_received_single_writer.inc();
478 (
479 self.metrics.req_in_flight_single_writer.clone(),
480 &self.metrics.good_response_single_writer,
481 )
482 };
483 in_flight.inc();
484 (
485 scopeguard::guard(in_flight, |in_flight| {
486 in_flight.dec();
487 }),
488 good_response,
489 )
490 }
491
492 fn schedule_txes_in_log(
493 pending_tx_log: Arc<WritePathPendingTransactionLog>,
494 quorum_driver: Arc<QuorumDriverHandler<A>>,
495 ) {
496 spawn_logged_monitored_task!(async move {
497 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
498 info!("Skipping loading pending transactions from pending_tx_log.");
499 return;
500 }
501 let pending_txes = pending_tx_log.load_all_pending_transactions();
502 info!(
503 "Recovering {} pending transactions from pending_tx_log.",
504 pending_txes.len()
505 );
506 for (i, tx) in pending_txes.into_iter().enumerate() {
507 let tx = tx.into_inner();
510 let tx_digest = *tx.digest();
511 if let Err(err) = quorum_driver
514 .submit_transaction_no_ticket(
515 ExecuteTransactionRequestV1 {
516 transaction: tx,
517 include_events: true,
518 include_input_objects: false,
519 include_output_objects: false,
520 include_auxiliary_data: false,
521 },
522 None,
523 )
524 .await
525 {
526 warn!(
527 ?tx_digest,
528 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
529 );
530 } else {
531 debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
532 if (i + 1) % 1000 == 0 {
533 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
534 }
535 }
536 }
537 });
541 }
542
543 pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
544 self.pending_tx_log.load_all_pending_transactions()
545 }
546}
547
548#[derive(Clone)]
550pub struct TransactionOrchestratorMetrics {
551 total_req_received_single_writer: GenericCounter<AtomicU64>,
552 total_req_received_shared_object: GenericCounter<AtomicU64>,
553
554 good_response_single_writer: GenericCounter<AtomicU64>,
555 good_response_shared_object: GenericCounter<AtomicU64>,
556
557 req_in_flight_single_writer: GenericGauge<AtomicI64>,
558 req_in_flight_shared_object: GenericGauge<AtomicI64>,
559
560 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
561 wait_for_finality_finished: GenericCounter<AtomicU64>,
562 wait_for_finality_timeout: GenericCounter<AtomicU64>,
563
564 local_execution_in_flight: GenericGauge<AtomicI64>,
565 local_execution_success: GenericCounter<AtomicU64>,
566 local_execution_timeout: GenericCounter<AtomicU64>,
567 local_execution_failure: GenericCounter<AtomicU64>,
568
569 request_latency_single_writer: Histogram,
570 request_latency_shared_obj: Histogram,
571 wait_for_finality_latency_single_writer: Histogram,
572 wait_for_finality_latency_shared_obj: Histogram,
573 local_execution_latency_single_writer: Histogram,
574 local_execution_latency_shared_obj: Histogram,
575}
576
577impl TransactionOrchestratorMetrics {
581 pub fn new(registry: &Registry) -> Self {
582 let total_req_received = register_int_counter_vec_with_registry!(
583 "tx_orchestrator_total_req_received",
584 "Total number of executions request Transaction Orchestrator receives, group by tx type",
585 &["tx_type"],
586 registry
587 )
588 .unwrap();
589
590 let total_req_received_single_writer =
591 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
592 let total_req_received_shared_object =
593 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
594
595 let good_response = register_int_counter_vec_with_registry!(
596 "tx_orchestrator_good_response",
597 "Total number of good responses Transaction Orchestrator generates, group by tx type",
598 &["tx_type"],
599 registry
600 )
601 .unwrap();
602
603 let good_response_single_writer =
604 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
605 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
606
607 let req_in_flight = register_int_gauge_vec_with_registry!(
608 "tx_orchestrator_req_in_flight",
609 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
610 &["tx_type"],
611 registry
612 )
613 .unwrap();
614
615 let req_in_flight_single_writer =
616 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
617 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
618
619 let request_latency = register_histogram_vec_with_registry!(
620 "tx_orchestrator_request_latency",
621 "Time spent in processing one Transaction Orchestrator request",
622 &["tx_type"],
623 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
624 registry,
625 )
626 .unwrap();
627 let wait_for_finality_latency = register_histogram_vec_with_registry!(
628 "tx_orchestrator_wait_for_finality_latency",
629 "Time spent in waiting for one Transaction Orchestrator request gets finalized",
630 &["tx_type"],
631 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
632 registry,
633 )
634 .unwrap();
635 let local_execution_latency = register_histogram_vec_with_registry!(
636 "tx_orchestrator_local_execution_latency",
637 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
638 &["tx_type"],
639 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
640 registry,
641 )
642 .unwrap();
643
644 Self {
645 total_req_received_single_writer,
646 total_req_received_shared_object,
647 good_response_single_writer,
648 good_response_shared_object,
649 req_in_flight_single_writer,
650 req_in_flight_shared_object,
651 wait_for_finality_in_flight: register_int_gauge_with_registry!(
652 "tx_orchestrator_wait_for_finality_in_flight",
653 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
654 registry,
655 )
656 .unwrap(),
657 wait_for_finality_finished: register_int_counter_with_registry!(
658 "tx_orchestrator_wait_for_finality_finished",
659 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
660 registry,
661 )
662 .unwrap(),
663 wait_for_finality_timeout: register_int_counter_with_registry!(
664 "tx_orchestrator_wait_for_finality_timeout",
665 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
666 registry,
667 )
668 .unwrap(),
669 local_execution_in_flight: register_int_gauge_with_registry!(
670 "tx_orchestrator_local_execution_in_flight",
671 "Number of local execution txns in flights Transaction Orchestrator handles",
672 registry,
673 )
674 .unwrap(),
675 local_execution_success: register_int_counter_with_registry!(
676 "tx_orchestrator_local_execution_success",
677 "Total number of successful local execution txns Transaction Orchestrator handles",
678 registry,
679 )
680 .unwrap(),
681 local_execution_timeout: register_int_counter_with_registry!(
682 "tx_orchestrator_local_execution_timeout",
683 "Total number of timed-out local execution txns Transaction Orchestrator handles",
684 registry,
685 )
686 .unwrap(),
687 local_execution_failure: register_int_counter_with_registry!(
688 "tx_orchestrator_local_execution_failure",
689 "Total number of failed local execution txns Transaction Orchestrator handles",
690 registry,
691 )
692 .unwrap(),
693 request_latency_single_writer: request_latency
694 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
695 request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
696 wait_for_finality_latency_single_writer: wait_for_finality_latency
697 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
698 wait_for_finality_latency_shared_obj: wait_for_finality_latency
699 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
700 local_execution_latency_single_writer: local_execution_latency
701 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
702 local_execution_latency_shared_obj: local_execution_latency
703 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
704 }
705 }
706
707 pub fn new_for_tests() -> Self {
708 let registry = Registry::new();
709 Self::new(®istry)
710 }
711}
712
713#[async_trait::async_trait]
714impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
715where
716 A: AuthorityAPI + Send + Sync + 'static + Clone,
717{
718 async fn execute_transaction(
719 &self,
720 request: ExecuteTransactionRequestV1,
721 client_addr: Option<std::net::SocketAddr>,
722 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
723 self.execute_transaction_v1(request, client_addr).await
724 }
725
726 fn simulate_transaction(
727 &self,
728 transaction: TransactionData,
729 checks: VmChecks,
730 ) -> Result<SimulateTransactionResult, IotaError> {
731 self.validator_state
732 .simulate_transaction(transaction, checks)
733 }
734}