iota_core/
execution_driver.rs1use std::{
6 sync::{Arc, Weak},
7 time::Duration,
8};
9
10use iota_macros::fail_point_async;
11use iota_metrics::{monitored_scope, spawn_monitored_task};
12use rand::{
13 Rng, SeedableRng,
14 rngs::{OsRng, StdRng},
15};
16use tokio::{
17 sync::{Semaphore, mpsc::UnboundedReceiver, oneshot},
18 time::sleep,
19};
20use tracing::{Instrument, error, error_span, info, trace};
21
22use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
23
// Unit tests for this module are kept in a separate file. `#[cfg(test)]`
// compiles the module only in test builds, and `#[path]` resolves the file
// relative to this source file's directory instead of the default location.
#[cfg(test)]
#[path = "unit_tests/execution_driver_tests.rs"]
mod execution_driver_tests;
27
/// How many times a single certificate is tried by the execution driver; the
/// driver task panics when the last attempt also fails.
pub const EXECUTION_MAX_ATTEMPTS: u32 = 10;
/// Pause between two consecutive failed execution attempts of the same
/// certificate (one second).
const EXECUTION_FAILURE_RETRY_INTERVAL: Duration = Duration::from_millis(1_000);
/// Probability (5%) that a dispatched certificate has its queueing latency
/// recorded into the metrics.
const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 0.05;
33
34pub async fn execution_process(
37 authority_state: Weak<AuthorityState>,
38 mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
39 mut rx_execution_shutdown: oneshot::Receiver<()>,
40) {
41 info!("Starting pending certificates execution process.");
42
43 let limit = Arc::new(Semaphore::new(num_cpus::get()));
45 let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
46
47 loop {
49 let _scope = monitored_scope("ExecutionDriver::loop");
50
51 let certificate;
52 let expected_effects_digest;
53 let txn_ready_time;
54 tokio::select! {
55 result = rx_ready_certificates.recv() => {
56 if let Some(pending_cert) = result {
57 certificate = pending_cert.certificate;
58 expected_effects_digest = pending_cert.expected_effects_digest;
59 txn_ready_time = pending_cert.stats.ready_time.unwrap();
60 } else {
61 info!("No more certificate will be received. Exiting executor ...");
64 return;
65 };
66 }
67 _ = &mut rx_execution_shutdown => {
68 info!("Shutdown signal received. Exiting executor ...");
69 return;
70 }
71 };
72
73 let authority = if let Some(authority) = authority_state.upgrade() {
74 authority
75 } else {
76 info!("Authority state has shutdown. Exiting ...");
79 return;
80 };
81 authority.metrics.execution_driver_dispatch_queue.dec();
82
83 let epoch_store = authority.load_epoch_store_one_call_per_task();
86
87 let digest = *certificate.digest();
88 trace!(?digest, "Pending certificate execution activated.");
89
90 if epoch_store.epoch() != certificate.epoch() {
91 info!(
92 ?digest,
93 cur_epoch = epoch_store.epoch(),
94 cert_epoch = certificate.epoch(),
95 "Ignoring certificate from previous epoch."
96 );
97 continue;
98 }
99
100 let limit = limit.clone();
101 let permit = limit.acquire_owned().await.unwrap();
104
105 if rng.gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
106 authority
107 .metrics
108 .execution_queueing_latency
109 .report(txn_ready_time.elapsed());
110 if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
111 authority
112 .metrics
113 .execution_queueing_delay_s
114 .observe(latency.as_secs_f64());
115 }
116 }
117
118 authority.metrics.execution_rate_tracker.lock().record();
119
120 let epoch_store_clone = epoch_store.clone();
123 spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
124 let _scope = monitored_scope("ExecutionDriver::task");
125 let _guard = permit;
126 if let Ok(true) = authority.is_tx_already_executed(&digest) {
127 return;
128 }
129 let mut attempts = 0;
130 loop {
131 fail_point_async!("transaction_execution_delay");
132 attempts += 1;
133 let res = authority
134 .try_execute_immediately(&certificate, expected_effects_digest, &epoch_store_clone)
135 .await;
136 if let Err(e) = res {
137 if attempts == EXECUTION_MAX_ATTEMPTS {
138 panic!("Failed to execute certified transaction {digest:?} after {attempts} attempts! error={e} certificate={certificate:?}");
139 }
140 error!(tx_digest=?digest, "Failed to execute certified transaction {digest:?}! attempt {attempts}, {e}");
143 sleep(EXECUTION_FAILURE_RETRY_INTERVAL).await;
144 } else {
145 break;
146 }
147 }
148 authority
149 .metrics
150 .execution_driver_executed_transactions
151 .inc();
152 }.instrument(error_span!("execution_driver", tx_digest = ?digest))));
153 }
154}