1use std::{
10 collections::BTreeMap, net::SocketAddr, ops::Deref, path::Path, sync::Arc, time::Duration,
11};
12
13use futures::{
14 FutureExt,
15 future::{Either, Future, select},
16};
17use iota_common::sync::notify_read::NotifyRead;
18use iota_config::NodeConfig;
19use iota_metrics::{
20 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, add_server_timing,
21 spawn_logged_monitored_task, spawn_monitored_task,
22};
23use iota_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use iota_types::{
25 base_types::TransactionDigest,
26 error::{IotaError, IotaResult},
27 iota_system_state::IotaSystemState,
28 messages_checkpoint::CheckpointSequenceNumber,
29 quorum_driver_types::{
30 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV1,
31 ExecuteTransactionResponseV1, FinalizedEffects, IsTransactionExecutedLocally,
32 QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResponse,
33 QuorumDriverResult,
34 },
35 transaction::{TransactionData, VerifiedTransaction},
36 transaction_driver_types::{
37 EffectsFinalityInfo as TdEffectsFinalityInfo, FinalizedEffects as TdFinalizedEffects,
38 },
39 transaction_executor::{SimulateTransactionResult, VmChecks},
40};
41use prometheus::{
42 Histogram, Registry,
43 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
44 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
45 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
46 register_int_gauge_with_registry,
47};
48use tokio::{
49 sync::broadcast::{Receiver, error::RecvError},
50 task::JoinHandle,
51 time::timeout,
52};
53use tracing::{Instrument, debug, error, info, instrument, trace_span, warn};
54
55use crate::{
56 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
57 authority_aggregator::AuthorityAggregator,
58 authority_client::{AuthorityAPI, NetworkAuthorityClient},
59 quorum_driver::{
60 QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics,
61 reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver},
62 },
63 transaction_driver::{
64 AggregatedRequestErrors, QuorumTransactionResponse, SubmitTransactionOptions,
65 TransactionDriver, TransactionDriverError, TransactionDriverMetrics,
66 reconfig_observer::OnsiteReconfigObserver as TdOnsiteReconfigObserver,
67 },
68 validator_client_monitor::ValidatorClientMetrics,
69};
70
/// Maximum time to wait for a finalized transaction's effects to become
/// readable from the local transaction cache before giving up on local
/// execution.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum time to wait for a submitted transaction to reach finality. Used
/// both as the timeout on the quorum-driver ticket and as the timeout handed
/// to the transaction driver.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
76
77pub struct TransactionOrchestrator<A: Clone> {
82 quorum_driver_handler: Option<Arc<QuorumDriverHandler<A>>>,
85 transaction_driver: Option<Arc<TransactionDriver<A>>>,
87 validator_state: Arc<AuthorityState>,
88 _local_executor_handle: Option<JoinHandle<()>>,
89 pending_tx_log: Arc<WritePathPendingTransactionLog>,
90 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
91 metrics: Arc<TransactionOrchestratorMetrics>,
92}
93
94impl TransactionOrchestrator<NetworkAuthorityClient> {
95 pub fn new_with_auth_aggregator(
96 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
97 validator_state: Arc<AuthorityState>,
98 reconfig_channel: Receiver<IotaSystemState>,
99 parent_path: &Path,
100 prometheus_registry: &Registry,
101 node_config: Option<&NodeConfig>,
102 ) -> Self {
103 let epoch_store = validator_state.load_epoch_store_one_call_per_task();
105 let use_transaction_driver = epoch_store.protocol_config().enable_white_flag_flow();
106
107 let td_reconfig_observer = if use_transaction_driver {
109 Some(TdOnsiteReconfigObserver::new(
110 reconfig_channel.resubscribe(),
111 validator_state.get_object_cache_reader().clone(),
112 validator_state.clone_committee_store(),
113 validators.safe_client_metrics_base.clone(),
114 ))
115 } else {
116 None
117 };
118
119 let qd_reconfig_observer = if !use_transaction_driver {
121 Some(OnsiteReconfigObserver::new(
122 reconfig_channel.resubscribe(),
123 validator_state.get_object_cache_reader().clone(),
124 validator_state.clone_committee_store(),
125 validators.safe_client_metrics_base.clone(),
126 validators.metrics.deref().clone(),
127 ))
128 } else {
129 None
130 };
131
132 TransactionOrchestrator::new(
133 validators,
134 validator_state,
135 parent_path,
136 prometheus_registry,
137 qd_reconfig_observer,
138 td_reconfig_observer,
139 node_config,
140 )
141 }
142}
143
144impl<A> TransactionOrchestrator<A>
145where
146 A: AuthorityAPI + Send + Sync + 'static + Clone,
147 OnsiteReconfigObserver: ReconfigObserver<A>,
148 TdOnsiteReconfigObserver: crate::transaction_driver::reconfig_observer::ReconfigObserver<A>,
149{
150 pub fn new(
151 validators: Arc<AuthorityAggregator<A>>,
152 validator_state: Arc<AuthorityState>,
153 parent_path: &Path,
154 prometheus_registry: &Registry,
155 reconfig_observer: Option<OnsiteReconfigObserver>,
156 td_reconfig_observer: Option<TdOnsiteReconfigObserver>,
157 node_config: Option<&NodeConfig>,
158 ) -> Self {
159 let epoch_store = validator_state.load_epoch_store_one_call_per_task();
161 let use_transaction_driver = epoch_store.protocol_config().enable_white_flag_flow();
162
163 let qd_metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
164 let notifier = Arc::new(NotifyRead::new());
165
166 let (quorum_driver_handler, effects_receiver) = if !use_transaction_driver {
168 let reconfig_observer = Arc::new(
169 reconfig_observer
170 .expect("QuorumDriver reconfig observer required when white flag is disabled"),
171 );
172 let handler = Arc::new(
173 QuorumDriverHandlerBuilder::new(validators.clone(), qd_metrics)
174 .with_notifier(notifier.clone())
175 .with_reconfig_observer(reconfig_observer)
176 .start(),
177 );
178 let receiver = handler.subscribe_to_effects();
179 (Some(handler), Some(receiver))
180 } else {
181 (None, None)
182 };
183
184 let transaction_driver = if use_transaction_driver {
186 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
187 let client_metrics = Arc::new(ValidatorClientMetrics::new(prometheus_registry));
188 let observer = td_reconfig_observer
189 .expect("TransactionDriver reconfig observer required when white flag is enabled");
190 Some(TransactionDriver::new(
191 validators,
192 Arc::new(observer),
193 td_metrics,
194 node_config,
195 client_metrics,
196 ))
197 } else {
198 None
199 };
200
201 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
202 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
203 parent_path.join("fullnode_pending_transactions"),
204 ));
205
206 let _local_executor_handle =
208 if let (Some(handler), Some(receiver)) = (&quorum_driver_handler, effects_receiver) {
209 let pending_tx_log_clone = pending_tx_log.clone();
210 let res = Some(spawn_monitored_task!(async move {
211 Self::loop_pending_transaction_log(receiver, pending_tx_log_clone).await;
212 }));
213
214 Self::schedule_txes_in_log(pending_tx_log.clone(), handler.clone());
217
218 res
219 } else {
220 None
223 };
224
225 Self {
226 quorum_driver_handler,
227 transaction_driver,
228 validator_state,
229 _local_executor_handle,
230 pending_tx_log,
231 notifier,
232 metrics,
233 }
234 }
235}
236
237impl<A> TransactionOrchestrator<A>
238where
239 A: AuthorityAPI + Send + Sync + 'static + Clone,
240{
241 #[instrument(name = "tx_orchestrator_execute_transaction_block", level = "trace", skip_all,
242 fields(
243 tx_digest = ?request.transaction.digest(),
244 tx_type = ?request_type,
245 ),
246 err)]
247 pub async fn execute_transaction_block(
248 &self,
249 request: ExecuteTransactionRequestV1,
250 request_type: ExecuteTransactionRequestType,
251 client_addr: Option<SocketAddr>,
252 ) -> Result<(ExecuteTransactionResponseV1, IsTransactionExecutedLocally), QuorumDriverError>
253 {
254 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
255
256 let (transaction, response) = if let Some(td) = &self.transaction_driver {
258 self.submit_with_transaction_driver(td.clone(), &epoch_store, request, client_addr)
259 .await?
260 } else {
261 let (tx, qd_resp) = self
262 .execute_transaction_impl(&epoch_store, request, client_addr)
263 .await?;
264 let resp = quorum_driver_response_to_v1(qd_resp);
265 (tx, resp)
266 };
267
268 let executed_locally = if matches!(
269 request_type,
270 ExecuteTransactionRequestType::WaitForLocalExecution
271 ) {
272 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
273 &self.validator_state,
274 &transaction,
275 &self.metrics,
276 )
277 .await
278 .is_ok();
279 add_server_timing("local_execution");
280 executed_locally
281 } else {
282 false
283 };
284
285 Ok((response, executed_locally))
286 }
287
288 #[instrument(name = "tx_orchestrator_execute_transaction_v1", level = "trace", skip_all,
291 fields(tx_digest = ?request.transaction.digest()))]
292 pub async fn execute_transaction_v1(
293 &self,
294 request: ExecuteTransactionRequestV1,
295 client_addr: Option<SocketAddr>,
296 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
297 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
298
299 if let Some(td) = &self.transaction_driver {
300 let (_, response) = self
301 .submit_with_transaction_driver(td.clone(), &epoch_store, request, client_addr)
302 .await?;
303 return Ok(response);
304 }
305
306 let qd_resp = self
307 .execute_transaction_impl(&epoch_store, request, client_addr)
308 .await
309 .map(|(_, r)| r)?;
310
311 Ok(quorum_driver_response_to_v1(qd_resp))
312 }
313
314 #[instrument(name = "tx_orchestrator_submit_with_td", level = "trace", skip_all,
316 fields(tx_digest = ?request.transaction.digest()))]
317 async fn submit_with_transaction_driver(
318 &self,
319 td: Arc<TransactionDriver<A>>,
320 epoch_store: &AuthorityPerEpochStore,
321 request: ExecuteTransactionRequestV1,
322 client_addr: Option<SocketAddr>,
323 ) -> Result<(VerifiedTransaction, ExecuteTransactionResponseV1), QuorumDriverError> {
324 let transaction = epoch_store
325 .verify_transaction(request.transaction.clone())
326 .map_err(QuorumDriverError::InvalidUserSignature)?;
327 let tx_digest = *transaction.digest();
328
329 let td_response = td
342 .drive_transaction(
343 Some(request.transaction.clone()),
344 SubmitTransactionOptions {
345 forwarded_client_addr: client_addr,
346 ..Default::default()
347 },
348 Some(WAIT_FOR_FINALITY_TIMEOUT),
349 )
350 .await
351 .map_err(map_td_error_to_qd)?;
352
353 debug!(
354 "TransactionOrchestrator: TransactionDriver submission succeeded for transaction {}",
355 tx_digest
356 );
357
358 let QuorumTransactionResponse {
359 effects: td_effects,
360 events,
361 input_objects,
362 output_objects,
363 auxiliary_data,
364 } = td_response;
365
366 let effects = convert_td_to_qd_effects(td_effects);
367 let response = ExecuteTransactionResponseV1 {
368 effects,
369 events: if request.include_events { events } else { None },
370 input_objects: if request.include_input_objects {
371 input_objects
372 } else {
373 None
374 },
375 output_objects: if request.include_output_objects {
376 output_objects
377 } else {
378 None
379 },
380 auxiliary_data: if request.include_auxiliary_data {
381 auxiliary_data
382 } else {
383 None
384 },
385 };
386
387 Ok((transaction, response))
388 }
389
390 #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.transaction.digest()))]
394 pub async fn execute_transaction_impl(
395 &self,
396 epoch_store: &AuthorityPerEpochStore,
397 request: ExecuteTransactionRequestV1,
398 client_addr: Option<SocketAddr>,
399 ) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
400 request
403 .transaction
404 .validity_check(epoch_store.protocol_config(), epoch_store.epoch())
405 .map_err(QuorumDriverError::InvalidTransaction)?;
406 let transaction = epoch_store
407 .verify_transaction(request.transaction.clone())
408 .map_err(QuorumDriverError::InvalidUserSignature)?;
409 let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
410 let tx_digest = *transaction.digest();
411 debug!(?tx_digest, "TO Received transaction execution request.");
412
413 let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
414 (
415 self.metrics.request_latency_shared_obj.start_timer(),
416 self.metrics
417 .wait_for_finality_latency_shared_obj
418 .start_timer(),
419 )
420 } else {
421 (
422 self.metrics.request_latency_single_writer.start_timer(),
423 self.metrics
424 .wait_for_finality_latency_single_writer
425 .start_timer(),
426 )
427 };
428
429 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
431 wait_for_finality_gauge.inc();
432 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
433 in_flight.dec();
434 });
435
436 let ticket = self
437 .submit(transaction.clone(), request, client_addr)
438 .await
439 .map_err(|e| {
440 warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
441 QuorumDriverError::QuorumDriverInternal(e)
442 })?;
443
444 let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
445 debug!(?tx_digest, "Timeout waiting for transaction finality.");
446 self.metrics.wait_for_finality_timeout.inc();
447 return Err(QuorumDriverError::TimeoutBeforeFinality);
448 };
449 add_server_timing("wait_for_finality");
450
451 drop(_txn_finality_timer);
452 drop(_wait_for_finality_gauge);
453 self.metrics.wait_for_finality_finished.inc();
454
455 match result {
456 Err(err) => {
457 warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
458 Err(QuorumDriverError::QuorumDriverInternal(err))
459 }
460 Ok(Err(err)) => Err(err),
461 Ok(Ok(response)) => {
462 good_response_metrics.inc();
463 Ok((transaction, response))
464 }
465 }
466 }
467
468 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
471 async fn submit(
472 &self,
473 transaction: VerifiedTransaction,
474 request: ExecuteTransactionRequestV1,
475 client_addr: Option<SocketAddr>,
476 ) -> IotaResult<impl Future<Output = IotaResult<QuorumDriverResult>> + '_> {
477 let tx_digest = *transaction.digest();
478 let ticket = self.notifier.register_one(&tx_digest);
479 if self
482 .pending_tx_log
483 .write_pending_transaction_maybe(&transaction)
484 .await?
485 {
486 debug!(?tx_digest, "no pending request in flight, submitting.");
487 self.quorum_driver()
488 .submit_transaction_no_ticket(request.clone(), client_addr)
489 .await?;
490 }
491 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
497 let qd = self.clone_quorum_driver();
498 Ok(async move {
499 let digests = [tx_digest];
500 let effects_await = cache_reader.try_notify_read_executed_effects(&digests);
501 let res = match select(ticket, effects_await.boxed()).await {
503 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
504 Either::Right((_, unfinished_quorum_driver_task)) => {
505 debug!(
506 ?tx_digest,
507 "Effects are available in DB, use quorum driver to get a certificate"
508 );
509 qd.submit_transaction_no_ticket(request, client_addr)
510 .await?;
511 Ok(unfinished_quorum_driver_task.await)
512 }
513 };
514 res
515 })
516 }
517
518 #[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
519 async fn wait_for_finalized_tx_executed_locally_with_timeout(
520 validator_state: &Arc<AuthorityState>,
521 transaction: &VerifiedTransaction,
522 metrics: &TransactionOrchestratorMetrics,
523 ) -> IotaResult {
524 let tx_digest = *transaction.digest();
525 metrics.local_execution_in_flight.inc();
526 let _metrics_guard =
527 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
528 in_flight.dec();
529 });
530
531 let _guard = if transaction.contains_shared_object() {
532 metrics.local_execution_latency_shared_obj.start_timer()
533 } else {
534 metrics.local_execution_latency_single_writer.start_timer()
535 };
536 debug!(
537 ?tx_digest,
538 "Waiting for finalized tx to be executed locally."
539 );
540 match timeout(
541 LOCAL_EXECUTION_TIMEOUT,
542 validator_state
543 .get_transaction_cache_reader()
544 .try_notify_read_executed_effects_digests(&[tx_digest]),
545 )
546 .instrument(trace_span!("local_execution"))
547 .await
548 {
549 Err(_elapsed) => {
550 debug!(
551 ?tx_digest,
552 "Waiting for finalized tx to be executed locally timed out within {:?}.",
553 LOCAL_EXECUTION_TIMEOUT
554 );
555 metrics.local_execution_timeout.inc();
556 Err(IotaError::Timeout)
557 }
558 Ok(Err(err)) => {
559 debug!(
560 ?tx_digest,
561 "Waiting for finalized tx to be executed locally failed with error: {:?}", err
562 );
563 metrics.local_execution_failure.inc();
564 Err(IotaError::TransactionOrchestratorLocalExecution {
565 error: err.to_string(),
566 })
567 }
568 Ok(Ok(_)) => {
569 metrics.local_execution_success.inc();
570 Ok(())
571 }
572 }
573 }
574
575 async fn loop_pending_transaction_log(
577 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
578 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
579 ) {
580 loop {
581 match effects_receiver.recv().await {
582 Ok(Ok((transaction, ..))) => {
583 let tx_digest = transaction.digest();
584 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
585 error!(
586 ?tx_digest,
587 "Failed to finish transaction in pending transaction log: {err}"
588 );
589 }
590 }
591 Ok(Err((tx_digest, _err))) => {
592 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
593 error!(
594 ?tx_digest,
595 "Failed to finish transaction in pending transaction log: {err}"
596 );
597 }
598 }
599 Err(RecvError::Closed) => {
600 error!("Sender of effects subscriber queue has been dropped!");
601 return;
602 }
603 Err(RecvError::Lagged(skipped_count)) => {
604 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
605 }
606 }
607 }
608 }
609
610 pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
611 self.quorum_driver_handler
612 .as_ref()
613 .expect("QuorumDriverHandler is not initialized.")
614 }
615
616 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
617 self.quorum_driver_handler
618 .clone()
619 .expect("QuorumDriverHandler is not initialized.")
620 }
621
622 pub fn transaction_driver(&self) -> Option<&Arc<TransactionDriver<A>>> {
623 self.transaction_driver.as_ref()
624 }
625
626 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
627 self.quorum_driver().authority_aggregator().load_full()
628 }
629
630 pub fn subscribe_to_effects_queue(&self) -> Receiver<QuorumDriverEffectsQueueResult> {
631 if let Some(handler) = &self.quorum_driver_handler {
632 handler.subscribe_to_effects()
633 } else {
634 panic!("QuorumDriverHandler is not initialized, cannot subscribe to effects queue.");
635 }
636 }
637
638 fn update_metrics(
639 &'_ self,
640 transaction: &VerifiedTransaction,
641 ) -> (impl Drop, &'_ GenericCounter<AtomicU64>) {
642 let (in_flight, good_response) = if transaction.contains_shared_object() {
643 self.metrics.total_req_received_shared_object.inc();
644 (
645 self.metrics.req_in_flight_shared_object.clone(),
646 &self.metrics.good_response_shared_object,
647 )
648 } else {
649 self.metrics.total_req_received_single_writer.inc();
650 (
651 self.metrics.req_in_flight_single_writer.clone(),
652 &self.metrics.good_response_single_writer,
653 )
654 };
655 in_flight.inc();
656 (
657 scopeguard::guard(in_flight, |in_flight| {
658 in_flight.dec();
659 }),
660 good_response,
661 )
662 }
663
664 fn schedule_txes_in_log(
665 pending_tx_log: Arc<WritePathPendingTransactionLog>,
666 quorum_driver: Arc<QuorumDriverHandler<A>>,
667 ) {
668 spawn_logged_monitored_task!(async move {
669 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
670 info!("Skipping loading pending transactions from pending_tx_log.");
671 return;
672 }
673 let pending_txes = pending_tx_log
674 .load_all_pending_transactions()
675 .expect("failed to load all pending transactions");
676 info!(
677 "Recovering {} pending transactions from pending_tx_log.",
678 pending_txes.len()
679 );
680 for (i, tx) in pending_txes.into_iter().enumerate() {
681 let tx = tx.into_inner();
684 let tx_digest = *tx.digest();
685 if let Err(err) = quorum_driver
688 .submit_transaction_no_ticket(
689 ExecuteTransactionRequestV1 {
690 transaction: tx,
691 include_events: true,
692 include_input_objects: false,
693 include_output_objects: false,
694 include_auxiliary_data: false,
695 },
696 None,
697 )
698 .await
699 {
700 warn!(
701 ?tx_digest,
702 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
703 );
704 } else {
705 debug!(?tx_digest, "Enqueued transaction from pending_tx_log");
706 if (i + 1) % 1000 == 0 {
707 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
708 }
709 }
710 }
711 });
715 }
716
717 pub fn load_all_pending_transactions(&self) -> IotaResult<Vec<VerifiedTransaction>> {
718 self.pending_tx_log.load_all_pending_transactions()
719 }
720}
721
722fn quorum_driver_response_to_v1(response: QuorumDriverResponse) -> ExecuteTransactionResponseV1 {
726 let QuorumDriverResponse {
727 effects_cert,
728 events,
729 input_objects,
730 output_objects,
731 auxiliary_data,
732 } = response;
733 ExecuteTransactionResponseV1 {
734 effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
735 events,
736 input_objects,
737 output_objects,
738 auxiliary_data,
739 }
740}
741
742fn convert_td_to_qd_effects(td: TdFinalizedEffects) -> FinalizedEffects {
745 let finality_info = match td.finality_info {
746 TdEffectsFinalityInfo::Certified(sig) => EffectsFinalityInfo::Certified(sig),
747 TdEffectsFinalityInfo::Checkpointed(epoch, seq) => {
748 EffectsFinalityInfo::Checkpointed(epoch, seq)
749 }
750 TdEffectsFinalityInfo::QuorumExecuted(epoch) => EffectsFinalityInfo::QuorumExecuted(epoch),
751 };
752 FinalizedEffects {
753 effects: td.effects,
754 finality_info,
755 }
756}
757
758fn map_td_error_to_qd(e: TransactionDriverError) -> QuorumDriverError {
766 use TransactionDriverError::*;
767 match e {
768 ValidationFailed { error } => {
769 QuorumDriverError::InvalidUserSignature(IotaError::InvalidSignature { error })
770 }
771 TimeoutWithLastRetriableError { .. } => QuorumDriverError::TimeoutBeforeFinality,
772 RejectedByValidators {
773 submission_non_retriable_errors,
774 ..
775 } => {
776 let representative = submission_non_retriable_errors
781 .errors
782 .into_iter()
783 .next()
784 .map(|(msg, _, _, _)| msg)
785 .unwrap_or_else(|| "transaction rejected as invalid during submission".to_string());
786 QuorumDriverError::InvalidTransaction(IotaError::Unknown(format!(
787 "Transaction was rejected as invalid by more than 1/3 of validator stake \
788 during submission (non-retriable): {representative}"
789 )))
790 }
791 Aborted {
792 submission_retriable_errors,
793 submission_non_retriable_errors,
794 ..
795 } => {
796 let attempts = count_validator_attempts(&submission_retriable_errors)
801 + count_validator_attempts(&submission_non_retriable_errors);
802 QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts {
803 total_attempts: attempts,
804 }
805 }
806 other @ ForkedExecution { .. } => {
807 let msg = other.to_string();
811 error!("TransactionDriver observed forked execution: {msg}");
812 QuorumDriverError::QuorumDriverInternal(IotaError::Unknown(msg))
813 }
814 other @ ClientInternal { .. } => {
815 let msg = other.to_string();
816 warn!("TransactionDriver client-internal error: {msg}");
817 QuorumDriverError::QuorumDriverInternal(IotaError::Unknown(msg))
818 }
819 }
820}
821
822fn count_validator_attempts(errors: &AggregatedRequestErrors) -> u32 {
823 errors
824 .errors
825 .iter()
826 .map(|(_, authorities, _, _)| authorities.len() as u32)
827 .sum()
828}
829
830#[derive(Clone)]
832pub struct TransactionOrchestratorMetrics {
833 total_req_received_single_writer: GenericCounter<AtomicU64>,
834 total_req_received_shared_object: GenericCounter<AtomicU64>,
835
836 good_response_single_writer: GenericCounter<AtomicU64>,
837 good_response_shared_object: GenericCounter<AtomicU64>,
838
839 req_in_flight_single_writer: GenericGauge<AtomicI64>,
840 req_in_flight_shared_object: GenericGauge<AtomicI64>,
841
842 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
843 wait_for_finality_finished: GenericCounter<AtomicU64>,
844 wait_for_finality_timeout: GenericCounter<AtomicU64>,
845
846 local_execution_in_flight: GenericGauge<AtomicI64>,
847 local_execution_success: GenericCounter<AtomicU64>,
848 local_execution_timeout: GenericCounter<AtomicU64>,
849 local_execution_failure: GenericCounter<AtomicU64>,
850
851 request_latency_single_writer: Histogram,
852 request_latency_shared_obj: Histogram,
853 wait_for_finality_latency_single_writer: Histogram,
854 wait_for_finality_latency_shared_obj: Histogram,
855 local_execution_latency_single_writer: Histogram,
856 local_execution_latency_shared_obj: Histogram,
857}
858
859impl TransactionOrchestratorMetrics {
863 pub fn new(registry: &Registry) -> Self {
864 let total_req_received = register_int_counter_vec_with_registry!(
865 "tx_orchestrator_total_req_received",
866 "Total number of executions request Transaction Orchestrator receives, group by tx type",
867 &["tx_type"],
868 registry
869 )
870 .unwrap();
871
872 let total_req_received_single_writer =
873 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
874 let total_req_received_shared_object =
875 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
876
877 let good_response = register_int_counter_vec_with_registry!(
878 "tx_orchestrator_good_response",
879 "Total number of good responses Transaction Orchestrator generates, group by tx type",
880 &["tx_type"],
881 registry
882 )
883 .unwrap();
884
885 let good_response_single_writer =
886 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
887 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
888
889 let req_in_flight = register_int_gauge_vec_with_registry!(
890 "tx_orchestrator_req_in_flight",
891 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
892 &["tx_type"],
893 registry
894 )
895 .unwrap();
896
897 let req_in_flight_single_writer =
898 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
899 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
900
901 let request_latency = register_histogram_vec_with_registry!(
902 "tx_orchestrator_request_latency",
903 "Time spent in processing one Transaction Orchestrator request",
904 &["tx_type"],
905 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
906 registry,
907 )
908 .unwrap();
909 let wait_for_finality_latency = register_histogram_vec_with_registry!(
910 "tx_orchestrator_wait_for_finality_latency",
911 "Time spent in waiting for one Transaction Orchestrator request gets finalized",
912 &["tx_type"],
913 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
914 registry,
915 )
916 .unwrap();
917 let local_execution_latency = register_histogram_vec_with_registry!(
918 "tx_orchestrator_local_execution_latency",
919 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
920 &["tx_type"],
921 iota_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
922 registry,
923 )
924 .unwrap();
925
926 Self {
927 total_req_received_single_writer,
928 total_req_received_shared_object,
929 good_response_single_writer,
930 good_response_shared_object,
931 req_in_flight_single_writer,
932 req_in_flight_shared_object,
933 wait_for_finality_in_flight: register_int_gauge_with_registry!(
934 "tx_orchestrator_wait_for_finality_in_flight",
935 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
936 registry,
937 )
938 .unwrap(),
939 wait_for_finality_finished: register_int_counter_with_registry!(
940 "tx_orchestrator_wait_for_finality_finished",
941 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
942 registry,
943 )
944 .unwrap(),
945 wait_for_finality_timeout: register_int_counter_with_registry!(
946 "tx_orchestrator_wait_for_finality_timeout",
947 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
948 registry,
949 )
950 .unwrap(),
951 local_execution_in_flight: register_int_gauge_with_registry!(
952 "tx_orchestrator_local_execution_in_flight",
953 "Number of local execution txns in flights Transaction Orchestrator handles",
954 registry,
955 )
956 .unwrap(),
957 local_execution_success: register_int_counter_with_registry!(
958 "tx_orchestrator_local_execution_success",
959 "Total number of successful local execution txns Transaction Orchestrator handles",
960 registry,
961 )
962 .unwrap(),
963 local_execution_timeout: register_int_counter_with_registry!(
964 "tx_orchestrator_local_execution_timeout",
965 "Total number of timed-out local execution txns Transaction Orchestrator handles",
966 registry,
967 )
968 .unwrap(),
969 local_execution_failure: register_int_counter_with_registry!(
970 "tx_orchestrator_local_execution_failure",
971 "Total number of failed local execution txns Transaction Orchestrator handles",
972 registry,
973 )
974 .unwrap(),
975 request_latency_single_writer: request_latency
976 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
977 request_latency_shared_obj: request_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
978 wait_for_finality_latency_single_writer: wait_for_finality_latency
979 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
980 wait_for_finality_latency_shared_obj: wait_for_finality_latency
981 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
982 local_execution_latency_single_writer: local_execution_latency
983 .with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]),
984 local_execution_latency_shared_obj: local_execution_latency
985 .with_label_values(&[TX_TYPE_SHARED_OBJ_TX]),
986 }
987 }
988
989 pub fn new_for_tests() -> Self {
990 let registry = Registry::new();
991 Self::new(®istry)
992 }
993}
994
995#[async_trait::async_trait]
996impl<A> iota_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
997where
998 A: AuthorityAPI + Send + Sync + 'static + Clone,
999{
1000 async fn execute_transaction(
1001 &self,
1002 request: ExecuteTransactionRequestV1,
1003 client_addr: Option<std::net::SocketAddr>,
1004 ) -> Result<ExecuteTransactionResponseV1, QuorumDriverError> {
1005 self.execute_transaction_v1(request, client_addr).await
1006 }
1007
1008 fn simulate_transaction(
1009 &self,
1010 transaction: TransactionData,
1011 checks: VmChecks,
1012 ) -> Result<SimulateTransactionResult, IotaError> {
1013 self.validator_state
1014 .simulate_transaction(transaction, checks)
1015 }
1016
1017 async fn wait_for_checkpoint_inclusion(
1024 &self,
1025 digests: &[TransactionDigest],
1026 timeout: Duration,
1027 ) -> Result<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>, IotaError> {
1028 self.validator_state
1029 .wait_for_checkpoint_inclusion(digests, timeout)
1030 .await
1031 }
1032}