iota_core/
execution_driver.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    sync::{Arc, Weak},
7    time::Duration,
8};
9
10use iota_macros::fail_point_async;
11use iota_metrics::{monitored_scope, spawn_monitored_task};
12use iota_protocol_config::Chain;
13use rand::{
14    Rng, SeedableRng,
15    rngs::{OsRng, StdRng},
16};
17use tokio::{
18    sync::{Semaphore, mpsc::UnboundedReceiver, oneshot},
19    time::sleep,
20};
21use tracing::{Instrument, error, error_span, info, trace};
22
23use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
24
25#[cfg(test)]
26#[path = "unit_tests/execution_driver_tests.rs"]
27mod execution_driver_tests;
28
// Execution is not expected to hit permanent failures, so every failure is
// treated as transient and retried.

/// Number of execution attempts after which a still-failing certificate makes
/// the execution task panic instead of retrying again.
pub const EXECUTION_MAX_ATTEMPTS: u32 = 10;

/// Pause between two consecutive execution attempts of the same certificate.
const EXECUTION_FAILURE_RETRY_INTERVAL: Duration = Duration::from_millis(1_000);

/// Fraction of dispatched certificates for which the queueing delay is
/// reported to the metrics.
const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 0.05;
34
35/// When a notification that a new pending transaction is received we activate
36/// processing the transaction in a loop.
37pub async fn execution_process(
38    authority_state: Weak<AuthorityState>,
39    mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
40    mut rx_execution_shutdown: oneshot::Receiver<()>,
41) {
42    info!("Starting pending certificates execution process.");
43
44    // Rate limit concurrent executions to # of cpus.
45    let limit = Arc::new(Semaphore::new(num_cpus::get()));
46    let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
47
48    let is_mainnet = {
49        let Some(state) = authority_state.upgrade() else {
50            info!("Authority state has shutdown. Exiting ...");
51            return;
52        };
53
54        state
55            .get_chain_identifier()
56            .map(|chain_id| chain_id.chain())
57            == Some(Chain::Mainnet)
58    };
59
60    // Loop whenever there is a signal that a new transactions is ready to process.
61    loop {
62        let _scope = monitored_scope("ExecutionDriver::loop");
63
64        let certificate;
65        let expected_effects_digest;
66        let txn_ready_time;
67        tokio::select! {
68            result = rx_ready_certificates.recv() => {
69                if let Some(pending_cert) = result {
70                    certificate = pending_cert.certificate;
71                    expected_effects_digest = pending_cert.expected_effects_digest;
72                    txn_ready_time = pending_cert.stats.ready_time.unwrap();
73                } else {
74                    // Should only happen after the AuthorityState has shut down and tx_ready_certificate
75                    // has been dropped by TransactionManager.
76                    info!("No more certificate will be received. Exiting executor ...");
77                    return;
78                };
79            }
80            _ = &mut rx_execution_shutdown => {
81                info!("Shutdown signal received. Exiting executor ...");
82                return;
83            }
84        };
85
86        let authority = if let Some(authority) = authority_state.upgrade() {
87            authority
88        } else {
89            // Terminate the execution if authority has already shutdown, even if there can
90            // be more items in rx_ready_certificates.
91            info!("Authority state has shutdown. Exiting ...");
92            return;
93        };
94        authority.metrics.execution_driver_dispatch_queue.dec();
95
96        // TODO: Ideally execution_driver should own a copy of epoch store and recreate
97        // each epoch.
98        let epoch_store = authority.load_epoch_store_one_call_per_task();
99
100        let digest = *certificate.digest();
101        trace!(?digest, "Pending certificate execution activated.");
102
103        if epoch_store.epoch() != certificate.epoch() {
104            info!(
105                ?digest,
106                cur_epoch = epoch_store.epoch(),
107                cert_epoch = certificate.epoch(),
108                "Ignoring certificate from previous epoch."
109            );
110            continue;
111        }
112
113        let limit = limit.clone();
114        // hold semaphore permit until task completes. unwrap ok because we never close
115        // the semaphore in this context.
116        let permit = limit.acquire_owned().await.unwrap();
117
118        if rng.gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
119            authority
120                .metrics
121                .execution_queueing_latency
122                .report(txn_ready_time.elapsed());
123            if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
124                authority
125                    .metrics
126                    .execution_queueing_delay_s
127                    .observe(latency.as_secs_f64());
128            }
129        }
130
131        authority.metrics.execution_rate_tracker.lock().record();
132
133        // Certificate execution can take significant time, so run it in a separate
134        // task.
135        let epoch_store_clone = epoch_store.clone();
136        spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
137            let _scope = monitored_scope("ExecutionDriver::task");
138            let _guard = permit;
139            if let Ok(true) = authority.is_tx_already_executed(&digest) {
140                return;
141            }
142            let mut attempts = 0;
143            loop {
144                fail_point_async!("transaction_execution_delay");
145                attempts += 1;
146                let res = authority
147                    .try_execute_immediately(&certificate, expected_effects_digest, &epoch_store_clone)
148                    .await;
149                if let Err(e) = res {
150                    // Tighten this check everywhere except mainnet - if we don't see an increase in
151                    // these crashes we will remove the retries.
152                    if !is_mainnet || attempts == EXECUTION_MAX_ATTEMPTS {
153                        panic!("Failed to execute certified transaction {digest:?} after {attempts} attempts! error={e} certificate={certificate:?}");
154                    }
155                    // Assume only transient failure can happen. Permanent failure is probably
156                    // a bug. There is nothing that can be done to recover from permanent failures.
157                    error!(tx_digest=?digest, "Failed to execute certified transaction {digest:?}! attempt {attempts}, {e}");
158                    sleep(EXECUTION_FAILURE_RETRY_INTERVAL).await;
159                } else {
160                    break;
161                }
162            }
163            authority
164                .metrics
165                .execution_driver_executed_transactions
166                .inc();
167        }.instrument(error_span!("execution_driver", tx_digest = ?digest))));
168    }
169}