1use std::{
10 collections::BTreeMap, net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration,
11};
12
13use futures::{
14 FutureExt,
15 future::{Either, Future, select},
16};
17use iota_common::sync::notify_read::NotifyRead;
18use iota_metrics::{
19 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
20 spawn_logged_monitored_task, spawn_monitored_task,
21};
22use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
23use iota_types::{
24 base_types::TransactionDigest,
25 error::{IotaError, IotaResult},
26 iota_system_state::IotaSystemState,
27 messages_checkpoint::CheckpointSequenceNumber,
28 quorum_driver_types::{
29 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
30 FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
31 QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
32 },
33 transaction::{TransactionData, VerifiedTransaction},
34 transaction_executor::{SimulateTransactionResult, VmChecks},
35};
36use prometheus::{
37 Histogram, Registry,
38 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
39 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
40 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
41 register_int_gauge_with_registry,
42};
43use tokio::{
44 sync::broadcast::{Receiver, error::RecvError},
45 task::JoinHandle,
46 time::timeout,
47};
48use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
49
50use crate::{
51 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
52 authority_aggregator::AuthorityAggregator,
53 authority_client::{AuthorityAPI, NetworkAuthorityClient},
54 quorum_driver::{
55 QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
56 reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
57 },
58};
59
/// Maximum time `wait_for_finalized_tx_executed_locally_with_timeout` waits
/// for a finalized transaction's executed effects to show up locally.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum time `execute_transaction_impl` waits on the finality ticket
/// before failing with `QuorumDriverError::TimeoutBeforeFinality`.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
65
/// Drives transaction execution through a quorum driver while tracking
/// in-flight transactions in a pending-transaction log.
pub struct TransactionOrchestrator<A: Clone> {
    /// Handle used to submit transactions to the quorum driver and to
    /// subscribe to its effects queue.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state: source of the epoch store, the transaction
    /// cache reader, simulation and checkpoint-inclusion waits.
    validator_state: Arc<AuthorityState>,
    /// Join handle of the task spawned in `new` that runs
    /// `loop_pending_transaction_log`. It is stored but never read here.
    _local_executor_handle: JoinHandle<()>,
    /// Log of transactions that were submitted and are not yet finished.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier shared with the quorum driver; `submit` registers one ticket
    /// per transaction digest on it.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Prometheus metrics recorded by the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
}
77
78impl TransactionOrchestrator<NetworkAuthorityClient> {
79 pub fn new_with_auth_aggregator(
80 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
81 validator_state: Arc<AuthorityState>,
82 reconfig_channel: Receiver<IotaSystemState>,
83 parent_path: &Path,
84 prometheus_registry: &Registry,
85 ) -> Self {
86 let observer = OnsiteReconfigObserver::new(
87 reconfig_channel,
88 validator_state.get_object_cache_reader().clone(),
89 validator_state.clone_committee_store(),
90 validators.safe_client_metrics_base.clone(),
91 validators.metrics.deref().clone(),
92 );
93
94 TransactionOrchestrator::new(
95 validators,
96 validator_state,
97 parent_path,
98 prometheus_registry,
99 observer,
100 )
101 }
102}
103
104impl<A> TransactionOrchestrator<A>
105where
106 A: AuthorityAPI + Send + Sync + 'static + Clone,
107 OnsiteReconfigObserver: ReconfigObserver<A>,
108{
109 pub fn new(
110 validators: Arc<AuthorityAggregator<A>>,
111 validator_state: Arc<AuthorityState>,
112 parent_path: &Path,
113 prometheus_registry: &Registry,
114 reconfig_observer: OnsiteReconfigObserver,
115 ) -> Self {
116 let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
117 let notifier = Arc::new(NotifyRead::new());
118 let reconfig_observer = Arc::new(reconfig_observer);
119 let quorum_driver_handler = Arc::new(
120 QuorumDriverHandlerBuilder::new(validators, metrics)
121 .with_notifier(notifier.clone())
122 .with_reconfig_observer(reconfig_observer)
123 .start(),
124 );
125
126 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
127 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
128 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
129 parent_path.join("fullnode_pending_transactions"),
130 ));
131 let pending_tx_log_clone = pending_tx_log.clone();
132 let _local_executor_handle = {
133 spawn_monitored_task!(async move {
134 Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
135 })
136 };
137 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
138 Self {
139 quorum_driver_handler,
140 validator_state,
141 _local_executor_handle,
142 pending_tx_log,
143 notifier,
144 metrics,
145 }
146 }
147}
148
149impl<A> TransactionOrchestrator<A>
150where
151 A: AuthorityAPI + Send + Sync + 'static + Clone,
152{
153 #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
154 fields(
155 tx_digest = ?request.transaction.digest(),
156 tx_type = ?request_type,
157 ),
158 err)]
159 pub async fn execute_transaction_block(
160 &self,
161 request: ExecuteTransactionRequestV1,
162 request_type: ExecuteTransactionRequestType,
163 client_addr: Option<SocketAddr>,
164 ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
165 {
166 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
167
168 let (transaction, response) = self
169 .execute_transaction_impl(&epoch_store, request, client_addr)
170 .await?;
171
172 let executed_locally = if matches!(
173 request_type,
174 ExecuteTransactionRequestType::WaitForLocalExecution
175 ) {
176 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
177 &self.validator_state,
178 &transaction,
179 &self.metrics,
180 )
181 .await
182 .is_ok();
183 add_server_timing("local_execution");
184 executed_locally
185 } else {
186 false
187 };
188
189 let QuorumDriverResponse {
190 effects_cert,
191 events,
192 input_objects,
193 output_objects,
194 auxiliary_data,
195 } = response;
196
197 let response = ExecuteTransactionResponseV1 {
198 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
199 events,
200 input_objects,
201 output_objects,
202 auxiliary_data,
203 };
204
205 Ok((response, executed_locally))
206 }
207
208 #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
211 fields(tx_digest = ?request.transaction.digest()))]
212 pub async fn execute_transaction_v1(
213 &self,
214 request: ExecuteTransactionRequestV1,
215 client_addr: Option<SocketAddr>,
216 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
217 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
218
219 let QuorumDriverResponse {
220 effects_cert,
221 events,
222 input_objects,
223 output_objects,
224 auxiliary_data,
225 } = self
226 .execute_transaction_impl(&epoch_store, request, client_addr)
227 .await
228 .map(|(_, r)| r)?;
229
230 Ok(ExecuteTransactionResponseV1 {
231 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
232 events,
233 input_objects,
234 output_objects,
235 auxiliary_data,
236 })
237 }
238
239 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
243 pub async fn execute_transaction_impl(
244 &self,
245 epoch_store: &AuthorityPerEpochStore,
246 request: ExecuteTransactionRequestV1,
247 client_addr: Option<SocketAddr>,
248 ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
249 let transaction = epoch_store
250 .verify_transaction(request.transaction.clone())
251 .map_err(QuorumDriverError::InvalidUserSignature)?;
252 let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
253 let tx_digest = *transaction.digest();
254 debug!(?tx_digest, "TO Received transaction execution request.");
255
256 let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
257 (
258 self.metrics.request_latency_shared_obj.start_timer(),
259 self.metrics
260 .wait_for_finality_latency_shared_obj
261 .start_timer(),
262 )
263 } else {
264 (
265 self.metrics.request_latency_single_writer.start_timer(),
266 self.metrics
267 .wait_for_finality_latency_single_writer
268 .start_timer(),
269 )
270 };
271
272 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
274 wait_for_finality_gauge.inc();
275 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
276 in_flight.dec();
277 });
278
279 let ticket = self
280 .submit(transaction.clone(), request, client_addr)
281 .await
282 .map_err(|e| {
283 warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
284 QuorumDriverError::QuorumDriverInternal(e)
285 })?;
286
287 let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
288 debug!(?tx_digest, "Timeout waiting for transaction finality.");
289 self.metrics.wait_for_finality_timeout.inc();
290 return Err(QuorumDriverError::TimeoutBeforeFinality);
291 };
292 add_server_timing("wait_for_finality");
293
294 drop(_txn_finality_timer);
295 drop(_wait_for_finality_gauge);
296 self.metrics.wait_for_finality_finished.inc();
297
298 match result {
299 Err(err) => {
300 warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
301 Err(QuorumDriverError::QuorumDriverInternal(err))
302 }
303 Ok(Err(err)) => Err(err),
304 Ok(Ok(response)) => {
305 good_response_metrics.inc();
306 Ok((transaction, response))
307 }
308 }
309 }
310
311 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
314 async fn submit(
315 &self,
316 transaction: VerifiedTransaction,
317 request: ExecuteTransactionRequestV1,
318 client_addr: Option<SocketAddr>,
319 ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
320 let tx_digest = *transaction.digest();
321 let ticket = self.notifier.register_one(&tx_digest);
322 if self
325 .pending_tx_log
326 .write_pending_transaction_maybe(&transaction)
327 .await?
328 {
329 debug!(?tx_digest, "no pending request in flight, submitting.");
330 self.quorum_driver()
331 .submit_transaction_no_ticket(request.clone(), client_addr)
332 .await?;
333 }
334 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
340 let qd = self.clone_quorum_driver();
341 Ok(async move {
342 let digests = [tx_digest];
343 let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
344 let res = match select(ticket, effects_await.boxed()).await {
346 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
347 Either::Right((_, unfinished_quorum_driver_task)) => {
348 debug!(
349 ?tx_digest,
350 "Effects are available in DB, use quorum driver to get a certificate"
351 );
352 qd.submit_transaction_no_ticket(request, client_addr)
353 .await?;
354 Ok(unfinished_quorum_driver_task.await)
355 }
356 };
357 res
358 })
359 }
360
361 #[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
362 async fn wait_for_finalized_tx_executed_locally_with_timeout(
363 validator_state: &Arc<AuthorityState>,
364 transaction: &VerifiedTransaction,
365 metrics: &TransactionOrchestratorMetrics,
366 ) -> IotaResult {
367 let tx_digest = *transaction.digest();
368 metrics.local_execution_in_flight.inc();
369 let _metrics_guard =
370 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
371 in_flight.dec();
372 });
373
374 let _guard = if transaction.contains_shared_object() {
375 metrics.local_execution_latency_shared_obj.start_timer()
376 } else {
377 metrics.local_execution_latency_single_writer.start_timer()
378 };
379 debug!(
380 ?tx_digest,
381 "Waiting for finalized tx to be executed locally."
382 );
383 match timeout(
384 LOCAL_EXECUTION_TIMEOUT,
385 validator_state
386 .get_transaction_cache_reader()
387 .try_notify_read_executed_effects_digests(&[tx_digest]),
388 )
389 .instrument(trace_span!("local_execution"))
390 .await
391 {
392 Err(_elapsed) => {
393 debug!(
394 ?tx_digest,
395 "Waiting for finalized tx to be executed locally timed out within {:?}.",
396 LOCAL_EXECUTION_TIMEOUT
397 );
398 metrics.local_execution_timeout.inc();
399 Err(IotaError::Timeout)
400 }
401 Ok(Err(err)) => {
402 debug!(
403 ?tx_digest,
404 "Waiting for finalized tx to be executed locally failed with error: {:?}", err
405 );
406 metrics.local_execution_failure.inc();
407 Err(IotaError::TransactionOrchestratorLocalExecution {
408 error: err.to_string(),
409 })
410 }
411 Ok(Ok(_)) => {
412 metrics.local_execution_success.inc();
413 Ok(())
414 }
415 }
416 }
417
418 async fn loop_pending_transaction_log(
420 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
421 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
422 ) {
423 loop {
424 match effects_receiver.recv().await {
425 Ok(Ok((transaction, ..))) => {
426 let tx_digest = transaction.digest();
427 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
428 error!(
429 ?tx_digest,
430 "Failed to finish transaction in pending transaction log: {err}"
431 );
432 }
433 }
434 Ok(Err((tx_digest, _err))) => {
435 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
436 error!(
437 ?tx_digest,
438 "Failed to finish transaction in pending transaction log: {err}"
439 );
440 }
441 }
442 Err(RecvError::Closed) => {
443 error!("Sender of effects subscriber queue has been dropped!");
444 return;
445 }
446 Err(RecvError::Lagged(skipped_count)) => {
447 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
448 }
449 }
450 }
451 }
452
    /// Borrows the shared quorum driver handler.
    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
        &self.quorum_driver_handler
    }
456
457 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
458 self.quorum_driver_handler.clone()
459 }
460
461 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
462 self.quorum_driver().authority_aggregator().load_full()
463 }
464
465 pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
466 self.quorum_driver_handler.subscribe_to_effects()
467 }
468
469 fn update_metrics(
470 &'_ self,
471 transaction: &VerifiedTransaction,
472 ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
473 let (in_flight, good_response) = if transaction.contains_shared_object() {
474 self.metrics.total_req_received_shared_object.inc();
475 (
476 self.metrics.req_in_flight_shared_object.clone(),
477 &self.metrics.good_response_shared_object,
478 )
479 } else {
480 self.metrics.total_req_received_single_writer.inc();
481 (
482 self.metrics.req_in_flight_single_writer.clone(),
483 &self.metrics.good_response_single_writer,
484 )
485 };
486 in_flight.inc();
487 (
488 scopeguard::guard(in_flight, |in_flight| {
489 in_flight.dec();
490 }),
491 good_response,
492 )
493 }
494
495 fn schedule_txes_in_log(
496 pending_tx_log: Arc<WritePathPendingTransactionLog>,
497 quorum_driver: Arc<QuorumDriverHandler<A>>,
498 ) {
499 spawn_logged_monitored_task!(async move {
500 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
501 info!("Skipping loading pending transactions from pending_tx_log.");
502 return;
503 }
504 let pending_txes = pending_tx_log
505 .load_all_pending_transactions()
506 .expect("failed to load all pending transactions");
507 info!(
508 "Recovering {} pending transactions from pending_tx_log.",
509 pending_txes.len()
510 );
511 for (i, tx) in pending_txes.into_iter().enumerate() {
512 let tx = tx.into_inner();
515 let tx_digest = *tx.digest();
516 if let Err(err) = quorum_driver
519 .submit_transaction_no_ticket(
520 ExecuteTransactionRequestV1 {
521 transaction: tx,
522 include_events: true,
523 include_input_objects: false,
524 include_output_objects: false,
525 include_auxiliary_data: false,
526 },
527 None,
528 )
529 .await
530 {
531 warn!(
532 ?tx_digest,
533 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
534 );
535 } else {
536 debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
537 if (i + 1) % 1000 == 0 {
538 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
539 }
540 }
541 }
542 });
546 }
547
    /// Returns every transaction currently recorded in the pending
    /// transaction log, forwarding any error from the log.
    pub fn load_all_pending_transactions(&self) -> IotaResult<Vec<VerifiedTransaction>> {
        self.pending_tx_log.load_all_pending_transactions()
    }
551}
552
/// Prometheus metrics recorded by the transaction orchestrator.
///
/// Per-type metrics come in pairs, one child per `tx_type` label value
/// (single-writer and shared-object).
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Execution requests received, by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Successful responses produced, by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Finality wait: currently waiting, finished before the timeout,
    // and timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local execution wait: currently waiting, and the three outcomes.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,
    local_execution_failure: GenericCounter<AtomicU64>,

    // Latency histograms, by transaction type: whole request, finality
    // wait, and local execution wait.
    request_latency_single_writer: Histogram,
    request_latency_shared_obj: Histogram,
    wait_for_finality_latency_single_writer: Histogram,
    wait_for_finality_latency_shared_obj: Histogram,
    local_execution_latency_single_writer: Histogram,
    local_execution_latency_shared_obj: Histogram,
}
581
582impl TransactionOrchestratorMetrics {
586 pub fn new(registry: &Registry) -> Self {
587 let total_req_received = register_int_counter_vec_with_registry!(
588 "tx_orchestrator_total_req_received",
589 "Total number of executions request Transaction Orchestrator receives, group by tx type",
590 &["tx_type"],
591 registry
592 )
593 .unwrap();
594
595 let total_req_received_single_writer =
596 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
597 let total_req_received_shared_object =
598 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
599
600 let good_response = register_int_counter_vec_with_registry!(
601 "tx_orchestrator_good_response",
602 "Total number of good responses Transaction Orchestrator generates, group by tx type",
603 &["tx_type"],
604 registry
605 )
606 .unwrap();
607
608 let good_response_single_writer =
609 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
610 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
611
612 let req_in_flight = register_int_gauge_vec_with_registry!(
613 "tx_orchestrator_req_in_flight",
614 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
615 &["tx_type"],
616 registry
617 )
618 .unwrap();
619
620 let req_in_flight_single_writer =
621 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
622 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
623
624 let request_latency = register_histogram_vec_with_registry!(
625 "tx_orchestrator_request_latency",
626 "Time spent in processing one Transaction Orchestrator request",
627 &["tx_type"],
628 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
629 registry,
630 )
631 .unwrap();
632 let wait_for_finality_latency = register_histogram_vec_with_registry!(
633 "tx_orchestrator_wait_for_finality_latency",
634 "Time spent in waiting for one Transaction Orchestrator request gets finalized",
635 &["tx_type"],
636 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
637 registry,
638 )
639 .unwrap();
640 let local_execution_latency = register_histogram_vec_with_registry!(
641 "tx_orchestrator_local_execution_latency",
642 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
643 &["tx_type"],
644 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
645 registry,
646 )
647 .unwrap();
648
649 Self {
650 total_req_received_single_writer,
651 total_req_received_shared_object,
652 good_response_single_writer,
653 good_response_shared_object,
654 req_in_flight_single_writer,
655 req_in_flight_shared_object,
656 wait_for_finality_in_flight: register_int_gauge_with_registry!(
657 "tx_orchestrator_wait_for_finality_in_flight",
658 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
659 registry,
660 )
661 .unwrap(),
662 wait_for_finality_finished: register_int_counter_with_registry!(
663 "tx_orchestrator_wait_for_finality_finished",
664 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
665 registry,
666 )
667 .unwrap(),
668 wait_for_finality_timeout: register_int_counter_with_registry!(
669 "tx_orchestrator_wait_for_finality_timeout",
670 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
671 registry,
672 )
673 .unwrap(),
674 local_execution_in_flight: register_int_gauge_with_registry!(
675 "tx_orchestrator_local_execution_in_flight",
676 "Number of local execution txns in flights Transaction Orchestrator handles",
677 registry,
678 )
679 .unwrap(),
680 local_execution_success: register_int_counter_with_registry!(
681 "tx_orchestrator_local_execution_success",
682 "Total number of successful local execution txns Transaction Orchestrator handles",
683 registry,
684 )
685 .unwrap(),
686 local_execution_timeout: register_int_counter_with_registry!(
687 "tx_orchestrator_local_execution_timeout",
688 "Total number of timed-out local execution txns Transaction Orchestrator handles",
689 registry,
690 )
691 .unwrap(),
692 local_execution_failure: register_int_counter_with_registry!(
693 "tx_orchestrator_local_execution_failure",
694 "Total number of failed local execution txns Transaction Orchestrator handles",
695 registry,
696 )
697 .unwrap(),
698 request_latency_single_writer: request_latency
699 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
700 request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
701 wait_for_finality_latency_single_writer: wait_for_finality_latency
702 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
703 wait_for_finality_latency_shared_obj: wait_for_finality_latency
704 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
705 local_execution_latency_single_writer: local_execution_latency
706 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
707 local_execution_latency_shared_obj: local_execution_latency
708 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
709 }
710 }
711
712 pub fn new_for_tests() -> Self {
713 let registry = Registry::new();
714 Self::new(®istry)
715 }
716}
717
/// `TransactionExecutor` implementation that forwards each call to the
/// orchestrator or to its local `AuthorityState`.
#[async_trait::async_trait]
impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Executes the transaction via `execute_transaction_v1`.
    async fn execute_transaction(
        &self,
        request: ExecuteTransactionRequestV1,
        client_addr: Option<std::net::SocketAddr>,
    ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
        self.execute_transaction_v1(request, client_addr).await
    }

    /// Simulates the transaction on the local authority state with the
    /// requested VM checks.
    fn simulate_transaction(
        &self,
        transaction: TransactionData,
        checks: VmChecks,
    ) -> Result<SimulateTransactionResult, IotaError> {
        self.validator_state
            .simulate_transaction(transaction, checks)
    }

    /// Delegates to the local authority state's
    /// `wait_for_checkpoint_inclusion` for `digests`, passing `timeout`
    /// through unchanged.
    async fn wait_for_checkpoint_inclusion(
        &self,
        digests: &[TransactionDigest],
        timeout: Duration,
    ) -> Result<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>, IotaError> {
        self.validator_state
            .wait_for_checkpoint_inclusion(digests, timeout)
            .await
    }
}