iota_core/
execution_driver.rs1use std::sync::{Arc, Weak};
6
7use iota_common::{fatal, random::get_rng};
8use iota_macros::fail_point_async;
9use iota_metrics::{monitored_scope, spawn_monitored_task};
10use iota_types::error::IotaError;
11use rand::Rng;
12use tokio::sync::{Semaphore, mpsc::UnboundedReceiver, oneshot};
13use tracing::{Instrument, error_span, info, instrument, warn};
14
15use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
16
17#[cfg(test)]
18#[path = "unit_tests/execution_driver_tests.rs"]
19mod execution_driver_tests;
20
/// Probability (in `[0, 1)`) with which a dispatched certificate has its
/// queueing latency sampled and reported to the metrics; the execution loop
/// draws a uniform random number per certificate and samples when it falls
/// below this ratio.
const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 0.05;
22
23#[instrument("start_execute_pending_certs", level = "trace", skip_all)]
26pub async fn execution_process(
27 authority_state: Weak<AuthorityState>,
28 mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
29 mut rx_execution_shutdown: oneshot::Receiver<()>,
30) {
31 info!("Starting pending certificates execution process.");
32
33 let limit = Arc::new(Semaphore::new(num_cpus::get()));
35
36 loop {
38 let _scope = monitored_scope("ExecutionDriver::loop");
39
40 let certificate;
41 let expected_effects_digest;
42 let txn_ready_time;
43 tokio::select! {
44 result = rx_ready_certificates.recv() => {
45 if let Some(pending_cert) = result {
46 certificate = pending_cert.certificate;
47 expected_effects_digest = pending_cert.expected_effects_digest;
48 txn_ready_time = pending_cert.stats.ready_time.unwrap();
49 } else {
50 info!("No more certificate will be received. Exiting executor ...");
53 return;
54 };
55 }
56 _ = &mut rx_execution_shutdown => {
57 info!("Shutdown signal received. Exiting executor ...");
58 return;
59 }
60 };
61
62 let authority = if let Some(authority) = authority_state.upgrade() {
63 authority
64 } else {
65 info!("Authority state has shutdown. Exiting ...");
68 return;
69 };
70 authority.metrics.execution_driver_dispatch_queue.dec();
71
72 let epoch_store = authority.load_epoch_store_one_call_per_task();
75
76 let digest = *certificate.digest();
77
78 if epoch_store.epoch() != certificate.epoch() {
79 info!(
80 ?digest,
81 cur_epoch = epoch_store.epoch(),
82 cert_epoch = certificate.epoch(),
83 "Ignoring certificate from previous epoch."
84 );
85 continue;
86 }
87
88 let limit = limit.clone();
89 let permit = limit.acquire_owned().await.unwrap();
92
93 if get_rng().gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
94 authority
95 .metrics
96 .execution_queueing_latency
97 .report(txn_ready_time.elapsed());
98 if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
99 authority
100 .metrics
101 .execution_queueing_delay_s
102 .observe(latency.as_secs_f64());
103 }
104 }
105
106 authority.metrics.execution_rate_tracker.lock().record();
107
108 let epoch_store_clone = epoch_store.clone();
111 spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
112 let _scope = monitored_scope("ExecutionDriver::task");
113 let _guard = permit;
114 if let Ok(true) = authority.try_is_tx_already_executed(&digest) {
115 return;
116 }
117
118 fail_point_async!("transaction_execution_delay");
119
120 match authority.try_execute_immediately(
121 &certificate,
122 expected_effects_digest,
123 &epoch_store_clone,
124 ) {
125 Err(IotaError::ValidatorHaltedAtEpochEnd) => {
126 warn!("Could not execute transaction {digest} because validator is halted at epoch end. certificate={certificate:?}");
127 return;
128 }
129 Err(e) => {
130 fatal!("Failed to execute certified transaction {digest}! error={e} certificate={certificate:?}");
131 }
132 _ => (),
133 }
134 authority
135 .metrics
136 .execution_driver_executed_transactions
137 .inc();
138 }.instrument(error_span!("executing_pending_cert", tx_digest = ?digest))));
139 }
140}