iota_core/
execution_driver.rs1use std::sync::{Arc, Weak};
6
7use iota_common::fatal;
8use iota_macros::fail_point_async;
9use iota_metrics::{monitored_scope, spawn_monitored_task};
10use iota_types::error::IotaError;
11use rand::{
12 Rng, SeedableRng,
13 rngs::{OsRng, StdRng},
14};
15use tokio::sync::{Semaphore, mpsc::UnboundedReceiver, oneshot};
16use tracing::{Instrument, error_span, info, trace, warn};
17
18use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
19
20#[cfg(test)]
21#[path = "unit_tests/execution_driver_tests.rs"]
22mod execution_driver_tests;
23
/// Sampling probability for the queueing-delay metrics: in `execution_process`
/// a uniform draw from `0.0..1.0` is compared against this value, so roughly
/// 5% of dispatched certificates have their queueing latency reported.
24const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 0.05;
25
26pub async fn execution_process(
29 authority_state: Weak<AuthorityState>,
30 mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
31 mut rx_execution_shutdown: oneshot::Receiver<()>,
32) {
33 info!("Starting pending certificates execution process.");
34
35 let limit = Arc::new(Semaphore::new(num_cpus::get()));
37 let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
38
39 loop {
41 let _scope = monitored_scope("ExecutionDriver::loop");
42
43 let certificate;
44 let expected_effects_digest;
45 let txn_ready_time;
46 tokio::select! {
47 result = rx_ready_certificates.recv() => {
48 if let Some(pending_cert) = result {
49 certificate = pending_cert.certificate;
50 expected_effects_digest = pending_cert.expected_effects_digest;
51 txn_ready_time = pending_cert.stats.ready_time.unwrap();
52 } else {
53 info!("No more certificate will be received. Exiting executor ...");
56 return;
57 };
58 }
59 _ = &mut rx_execution_shutdown => {
60 info!("Shutdown signal received. Exiting executor ...");
61 return;
62 }
63 };
64
65 let authority = if let Some(authority) = authority_state.upgrade() {
66 authority
67 } else {
68 info!("Authority state has shutdown. Exiting ...");
71 return;
72 };
73 authority.metrics.execution_driver_dispatch_queue.dec();
74
75 let epoch_store = authority.load_epoch_store_one_call_per_task();
78
79 let digest = *certificate.digest();
80 trace!(?digest, "Pending certificate execution activated.");
81
82 if epoch_store.epoch() != certificate.epoch() {
83 info!(
84 ?digest,
85 cur_epoch = epoch_store.epoch(),
86 cert_epoch = certificate.epoch(),
87 "Ignoring certificate from previous epoch."
88 );
89 continue;
90 }
91
92 let limit = limit.clone();
93 let permit = limit.acquire_owned().await.unwrap();
96
97 if rng.gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
98 authority
99 .metrics
100 .execution_queueing_latency
101 .report(txn_ready_time.elapsed());
102 if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
103 authority
104 .metrics
105 .execution_queueing_delay_s
106 .observe(latency.as_secs_f64());
107 }
108 }
109
110 authority.metrics.execution_rate_tracker.lock().record();
111
112 let epoch_store_clone = epoch_store.clone();
115 spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
116 let _scope = monitored_scope("ExecutionDriver::task");
117 let _guard = permit;
118 if let Ok(true) = authority.try_is_tx_already_executed(&digest) {
119 return;
120 }
121
122 fail_point_async!("transaction_execution_delay");
123
124 match authority.try_execute_immediately(
125 &certificate,
126 expected_effects_digest,
127 &epoch_store_clone,
128 ) {
129 Err(IotaError::ValidatorHaltedAtEpochEnd) => {
130 warn!("Could not execute transaction {digest:?} because validator is halted at epoch end. certificate={certificate:?}");
131 return;
132 }
133 Err(e) => {
134 fatal!("Failed to execute certified transaction {digest:?}! error={e} certificate={certificate:?}");
135 }
136 _ => (),
137 }
138 authority
139 .metrics
140 .execution_driver_executed_transactions
141 .inc();
142 }.instrument(error_span!("execution_driver", tx_digest = ?digest))));
143 }
144}