iota_core/
execution_driver.rs1use std::{
6 sync::{Arc, Weak},
7 time::Duration,
8};
9
10use iota_macros::fail_point_async;
11use iota_metrics::{monitored_scope, spawn_monitored_task};
12use iota_protocol_config::Chain;
13use rand::{
14 Rng, SeedableRng,
15 rngs::{OsRng, StdRng},
16};
17use tokio::{
18 sync::{Semaphore, mpsc::UnboundedReceiver, oneshot},
19 time::sleep,
20};
21use tracing::{Instrument, error, error_span, info, trace};
22
23use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
24
// Unit tests for this module live in a separate file; the `#[path]` attribute
// points this test-only module at `unit_tests/execution_driver_tests.rs`.
#[cfg(test)]
#[path = "unit_tests/execution_driver_tests.rs"]
mod execution_driver_tests;
28
/// Upper bound on execution attempts for a single certificate on mainnet;
/// once it is reached the execution task panics instead of retrying again.
pub const EXECUTION_MAX_ATTEMPTS: u32 = 10;
/// How long an execution task sleeps after a failed attempt before retrying.
const EXECUTION_FAILURE_RETRY_INTERVAL: Duration = Duration::from_millis(1_000);
/// Probability (5%) that a dispatched certificate has its queueing latency
/// reported to metrics.
const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 0.05;
34
35pub async fn execution_process(
38 authority_state: Weak<AuthorityState>,
39 mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
40 mut rx_execution_shutdown: oneshot::Receiver<()>,
41) {
42 info!("Starting pending certificates execution process.");
43
44 let limit = Arc::new(Semaphore::new(num_cpus::get()));
46 let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
47
48 let is_mainnet = {
49 let Some(state) = authority_state.upgrade() else {
50 info!("Authority state has shutdown. Exiting ...");
51 return;
52 };
53
54 state
55 .get_chain_identifier()
56 .map(|chain_id| chain_id.chain())
57 == Some(Chain::Mainnet)
58 };
59
60 loop {
62 let _scope = monitored_scope("ExecutionDriver::loop");
63
64 let certificate;
65 let expected_effects_digest;
66 let txn_ready_time;
67 tokio::select! {
68 result = rx_ready_certificates.recv() => {
69 if let Some(pending_cert) = result {
70 certificate = pending_cert.certificate;
71 expected_effects_digest = pending_cert.expected_effects_digest;
72 txn_ready_time = pending_cert.stats.ready_time.unwrap();
73 } else {
74 info!("No more certificate will be received. Exiting executor ...");
77 return;
78 };
79 }
80 _ = &mut rx_execution_shutdown => {
81 info!("Shutdown signal received. Exiting executor ...");
82 return;
83 }
84 };
85
86 let authority = if let Some(authority) = authority_state.upgrade() {
87 authority
88 } else {
89 info!("Authority state has shutdown. Exiting ...");
92 return;
93 };
94 authority.metrics.execution_driver_dispatch_queue.dec();
95
96 let epoch_store = authority.load_epoch_store_one_call_per_task();
99
100 let digest = *certificate.digest();
101 trace!(?digest, "Pending certificate execution activated.");
102
103 if epoch_store.epoch() != certificate.epoch() {
104 info!(
105 ?digest,
106 cur_epoch = epoch_store.epoch(),
107 cert_epoch = certificate.epoch(),
108 "Ignoring certificate from previous epoch."
109 );
110 continue;
111 }
112
113 let limit = limit.clone();
114 let permit = limit.acquire_owned().await.unwrap();
117
118 if rng.gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
119 authority
120 .metrics
121 .execution_queueing_latency
122 .report(txn_ready_time.elapsed());
123 if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
124 authority
125 .metrics
126 .execution_queueing_delay_s
127 .observe(latency.as_secs_f64());
128 }
129 }
130
131 authority.metrics.execution_rate_tracker.lock().record();
132
133 let epoch_store_clone = epoch_store.clone();
136 spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
137 let _scope = monitored_scope("ExecutionDriver::task");
138 let _guard = permit;
139 if let Ok(true) = authority.is_tx_already_executed(&digest) {
140 return;
141 }
142 let mut attempts = 0;
143 loop {
144 fail_point_async!("transaction_execution_delay");
145 attempts += 1;
146 let res = authority
147 .try_execute_immediately(&certificate, expected_effects_digest, &epoch_store_clone)
148 .await;
149 if let Err(e) = res {
150 if !is_mainnet || attempts == EXECUTION_MAX_ATTEMPTS {
153 panic!("Failed to execute certified transaction {digest:?} after {attempts} attempts! error={e} certificate={certificate:?}");
154 }
155 error!(tx_digest=?digest, "Failed to execute certified transaction {digest:?}! attempt {attempts}, {e}");
158 sleep(EXECUTION_FAILURE_RETRY_INTERVAL).await;
159 } else {
160 break;
161 }
162 }
163 authority
164 .metrics
165 .execution_driver_executed_transactions
166 .inc();
167 }.instrument(error_span!("execution_driver", tx_digest = ?digest))));
168 }
169}